Overview

The Core Middleware is a topic-based publish/subscribe middleware (see Wikipedia) that makes it possible to speed up the software development by leveraging on code reuse.

Applications are built connecting together logical blocks (nodes) by means of named communication channels (topics). Each topic has its own specific message type. Each node can transmit (publish) on and can receive (subscribe) from any channel (topic).

In this document a more in-depth description of the following objects will be given.

  1. Messages

  2. Publishers and Subscribers

  3. Nodes

  4. configurations

  5. Node manager

  6. Module

Messages

A message is the datatype of a topic, that is the data that nodes can share by means of the publish/subscribe middleware.

From the source code point of view a message is a subclass of core::mw::Message.

The subclass must be defined as a packed struct using the CORE_PACKED macro.
Example 1. A message derived from core::mw::Message
1
2
3
4
5
6
7
8
9
#pragma once

#include <core/mw/Message.hpp>

struct example:
          public core::mw::Message (1)
{
          uint32_t value;
} CORE_PACKED; (2)
1 extend core::mw::Message
2 make it a packed struct
Even though it is possible to directly write the message code, it is better to use the facilities provided by Core Message.

Publishers and Subscribers

Publisher

A publisher is defined as an instance of core::mw::Publisher<typename MessageType> class.

The class template must be specialized using the message type of the topic it will publish to.

Example 2. How to publish data
1
2
3
4
5
6
7
8
9
10
11
12
13
core::common_msgs::Led* msg;

core::mw::Publisher<core::common_msgs::Led> publisher; (1)

if (publisher.alloc(msg)) { (2)

        msgp->led   = 4;    (3)
        msgp->value = 0;

        if (!publisher.publish(*msg)) { (4)
                ...
        }
}
1 specialize with the message
2 ask the publisher to allocate a message
3 valorize the message
4 publish it

The core::mw::Publisher::alloc() method will return false where there are no subscribers to the topic.
The core::mw::Publisher::publish() method will return false when TODO.

It is up to the node to tell the system that the publisher exists. This will be discussed in the Nodes section.

Subscriber

A subscriber is defined as an instance of core::mw::Subscriber<typename MessageType, unsigned QUEUE_LENGTH> class.

The class template must be specialized using the message type of the topic it will subscribe to and the length of the message queue. The message queue length (QUEUE_LENGTH) must be at least 2.

There are 2 ways to consume the data received:

  • registering a callback

  • using core::mw::Subscriber::fetch()

It is up to the node to tell the system that the subscriber exists. This will be discussed in the Nodes section.

Callback

It is possible to register a subscriber callback the node will call on its spin() method (more on this later).

The callback function signature for a core::mw::Subscriber<typename MessageType, unsigned QUEUE_LENGTH> subscriber is:

Callback data type
1
2
3
4
typedef bool (* Callback)(
        const MessageType& msg,
        Node*              node
);

The callback function can be registered using core::mw::Subscriber::set_callback() method.

The node parameter is required to easily allow multiple instance of the subscriber (as the callback is a C function-pointer).

Example 3. Callback function
1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
subscriber.set_callback(LedSubscriber::callback); (1)
...

bool
LedSubscriber::callback(
                const core::common_msgs::Led& msg,
                core::mw::Node*         node
)
{
        LedSubscriber* _this = static_cast<LedSubscriber*>(node); (2)

        _this->setLedValue(msg.value);
}
1 register the callback
2 pointer to instance of the node
If the callback is a class member function, it MUST be made static (to exclude the parameter this from its signature).

fetch()

Example 4. fetch()
1
2
3
4
5
core::common_msgs::Led* msg;

if (subscriber.fetch(msg)) {
        setLedValue(msg.value);
}

The core::mw::Subscriber::fetch() method will return false where there are no messages in the queue.

Nodes

A node is nothing but a process that runs on a module and that communicate with other nodes using a topic based publish/subscribe pattern (see Wikipedia).

From the source code point of view a node can be defined in two different ways:

  • as subclass of core::mw::CoreNode

  • as an OS thread

The first approach permits to control the execution flow and is the preferred way to implement a node.
The latter approach, whilst apparently simpler, does not offer any control on the Node lifecycle.

Nodes defined as subclasses of core::mw::CoreNode share other important features:

  • they can be described using a JSON file, allowing automatic code generation

  • they can be easily configured by means of a configuration system

CoreNode

ICoreNode and the node lifecycle

The core::mw::ICoreNode interface describes the lifecycle of a node.

Interface ICoreNode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class ICoreNode
{
public:
        enum class State { (1)
                NONE, // Node has not been set up
                SET_UP, // wait
                INITIALIZING, // -> onInitialize
                INITIALIZED, // wait
                CONFIGURING, // -> onConfigure
                CONFIGURED, // wait
                PREPARING_HW, // -> onPrepareHW
                HW_READY, // wait
                PREPARING_MW, // -> onPrepareMW
                MW_READY, // wait
                IDLE, // wait
                STARTING, // -> onStart
                LOOPING, // -> onLoop
                STOPPING, // -> onStop
                FINALIZING, // -> onFinalize
                ERROR, // wait forever
                TEARING_DOWN // killing
        };

        enum class Action { (2)
                INITIALIZE,
                CONFIGURE,
                PREPARE_HW,
                PREPARE_MW,
                START,
                STOP,
                FINALIZE
        };

        virtual bool (3)
        setup() = 0;

        virtual bool (4)
        execute(
                Action what
        ) = 0;

        virtual bool (5)
        teardown() = 0;

        virtual State
        state() = 0;

        virtual ~ICoreNode() {}

protected: (6)
        virtual bool
        onInitialize() = 0;

        virtual bool
        onConfigure() = 0;

        virtual bool
        onPrepareHW() = 0;

        virtual bool
        onPrepareMW() = 0;

        virtual bool
        onStart() = 0;

        virtual bool
        onLoop() = 0;

        virtual bool
        onStop() = 0;

        virtual bool
        onError() = 0;

        virtual bool
        onFinalize() = 0;


protected:
        ICoreNode() :
                _currentState(State::NONE)
        {}

        State _currentState;
};
1 node states
2 actions on the node
3 setup the node
4 change state
5 teardown the node
6 event handlers

When is firstly created, an core::mw::ICoreNode derived object must be in a ICoreNode::State::NONE state.

Calling ICoreNode::setup() will set up the node execution. State will change to ICoreNode::State::SET_UP once the node is ready.

Then is it possible to cycle all the states using ICoreNode::execute().
The order of the actions to execute must be exactly:

  1. ICoreNode::Action::INITIALIZE

  2. ICoreNode::Action::CONFIGURE

  3. ICoreNode::Action::PREPARE_HW

  4. ICoreNode::Action::PREPARE_MW

  5. ICoreNode::Action::START

  6. ICoreNode::Action::STOP

  7. ICoreNode::Action::FINALIZE

ICoreNode::execute() must return false if an out of order action is requested.

Table 1. To every action corresponds an event
Action Event handler Meaning

INITIALIZE

onInitialize()

memory allocation, default memeber value assignement, …​

CONFIGURE

onConfigure()

configuration, …​

PREPARE_HW

onPrepareHW()

hardware setup, device probing, …​

PREPARE_MW

onPrepareMW()

publishers advertising and subscribers subscribing, callback settings, …​

START

onStart()

hardware starting, …​

STOP

onStop()

hardware stopping, …​

FINALIZE

onFinalize()

memory deallocation, …​

Two more events are defined: ICoreNode::onLoop() and ICoreNode::onError().

ICoreNode::onLoop() is the main node loop.

Whenever an unrecoverable error occurs (i.e.: when an event member function returns false), the state of the node must become ICoreNode::State::ERROR and ICoreNode::onError() event must be called.

The only way to recover must then be to tear down the node, calling ICoreNode::teardown().
The node must switch to a ICoreNode::State::TEARING_DOWN state and then, to ICoreNode::State::NONE. ICoreNode::teardown() must also be called when a `ICoreNode `object is destroyed.

See ICoreNode state machine for a description of the state transitions. There is no reference to core::mw::ICoreNode::teardown(), but it must always be possible to call it in every moment.

ICoreNode state machine
ICoreNode state machine

CoreNode

CoreNode header file
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#pragma once

#include <core/mw/namespace.hpp>

#include <core/os/Thread.hpp>
#include <core/os/Condition.hpp>
#include <core/os/Mutex.hpp>
#include <core/mw/Node.hpp>

#include <core/mw/ICoreNode.hpp>

NAMESPACE_CORE_MW_BEGIN

class CoreNode:
        public ICoreNode,
        protected core::mw::Node
{
public:
        virtual ~CoreNode() {}

        CoreNode(
                        const char*                    name,
                        core::mw::Thread::PriorityEnum priority = core::mw::Thread::PriorityEnum::NORMAL
        );

...

The core::mw::CoreNode class offers the developer the possibility to cleanly implement all the possible aspects related to a node by implementing the core::mw::ICoreNode interface.

Calling CoreNode::setup() will create the node execution thread. State will change to CoreNode::State::SET_UP once the thread has been created.

Implementing a new node requires to extend core::mw::CoreNode, and to override the default event member functions as required.
core::mw::CoreNode constructor takes 2 arguments, the node name and the node thread priority.

The event member functions by default return true, so it is not needed to implement these in core::mw::CoreNode derived classes.

core::mw::CoreNode extends a lower-level core::mw::Node class.
This class offers methods to notify the system that topic are going to be published and/or subscribed.

core::mw::Node advertise and publish methods
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
        template <typename MT>
        bool
        advertise(
                        Publisher<MT>& pub,
                        const char*    namep,
                        const Time&    publish_timeout = Time::INFINITE
        );


        template <typename MT>
        bool
        subscribe(
                        SubscriberExtBuf<MT>& sub,
                        const char*           namep,
                        MT                    mgpool_buf[]
        );


        template <typename MT, unsigned QL>
        bool
        subscribe(
                        Subscriber<MT, QL>& sub,
                        const char* namep
        );

A core::mw::CoreNode object can be added to a core::mw::CoreNodeManager (such as a core::mw::CoreModule).
In this way all the nodes lifecycles are syncronized. It is also easier and more concise than calling all the execute() while checking for the return value and states.

Example 5. A node derived from CoreNode
Header file for led::Publisher node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#pragma once

#include <core/mw/CoreNode.hpp>
#include <core/mw/Publisher.hpp>

#include <core/common_msgs/Led.hpp>
#include <core/led/PublisherConfiguration.hpp>

#include <array>

namespace core {
namespace led {
        class Publisher:
                public core::mw::CoreNode
        {
public: // CONSTRUCTOR and DESTRUCTOR
                Publisher(
                                const char*                    name,
                                core::mw::Thread::PriorityEnum priority = core::mw::Thread::PriorityEnum::NORMAL
                );
                virtual
                ~Publisher();

public: // CONFIGURATION
                PublisherConfiguration configuration;

private: // PUBLISHERS and SUBSCRIBERS
                core::mw::Publisher<common_msgs::Led> _publisher;

private: // PRIVATE MEMBERS
                uint32_t _toggle;

private: // EVENTS
                bool
                onPrepareMW();

                bool
                onLoop();
        };
}
}

OS thread

In this approach a node is defined directly as a thread.

The main node loop must call node Node::spin() member function, that dispatches the messages around.

core::mw::Node spin method
1
2
3
4
bool
        spin(
                        const Time& timeout = Time::INFINITE
        );

Node::spin() returns false if no messages where received after timeout.

Example 6. A node defined as an OS thread
Thread Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void
test_subscriber_node(
                void* arg
)
{
        common_msgs::String64* msgp;

        core::mw::Node node("test_sub"); (1)
        core::mw::Subscriber<core::common_msgs::String64, 5> sub; (2)
        node.subscribe(sub, "test"); (3)

        for (;;) {
                node.spin(core::mw::Time::ms(1000)); (4)

                if (sub.fetch(msgp)) {
                        module.stream.printf("%s\r\n", msgp->data);
                        sub.release(*msgp);
                }
        }
}
1 create the node
2 create a Subscriber
3 subscribe sub to topic test
4 spin

And later on, create a thread with it.

Thread creation
1
2
3
...
core::os::Thread::create_heap(NULL, THD_WORKING_AREA_SIZE(1024), NORMALPRIO, test_subscriber_node, nullptr);
...

Configurations

core::mw::CoreConfiguration class implements a kind-of reflection system, which allows to access derived class members by their name:

1
2
3
core::led::LedPublisherConfiguration configuration;

configuration.led = 5;

is the same as

1
2
3
core::led::LedPublisherConfiguration configuration;

configuration["led"] = 5;

This is useful for a runtime configuration of the node.

Even though it is possible to directly write the configuration code, it is better to use the facilities provided by Core Configuration.

As this system relies on type deduction for template argument and return value types, special care must be used in assignments using the [] operator. This is specially true in assignments of numeric literals.

It is better to always force the type by using an explicit cast or the user defined literals _s8, _u8, _s16, _u16, _s32, _u32, _s64, _u64, _f32, _f64.

1
2
3
4
core::led::LedPublisherConfiguration configuration;

configuration["led"] = (uint32_t)5; (1)
configuration["led"] = 5_u32; (2)
1 explicit cast
2 user defined literal

Node Manager

core::mw::ICoreNodeManager provides a simple way to synchronize a set on nodes by grouping lifecycle steps together and calling the proper CoreNode methods in a coordinated way on a list of nodes, while checking for the success of the operations.

ICoreNodeManager interface
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#pragma once

#include <core/mw/namespace.hpp>

#include <core/mw/CoreNode.hpp>

NAMESPACE_CORE_MW_BEGIN

class ICoreNodeManager
{
public:
        virtual void
        add(
                        const CoreNode& node
        ) = 0;

        virtual bool
        setup() = 0;

        virtual bool
        run() = 0;

        virtual bool
        stop() = 0;

        virtual bool
        teardown() = 0;

        virtual bool
        isOk() = 0;

};

NAMESPACE_CORE_MW_END

ICoreNodeManager::add() permits to add a node to the list of nodes to synchronize.

Table 2. ICoreNodeManager → CoreNode relation
ICoreNodeManager CoreNode

setup()

setup()

run()

execute(INITIALIZE)execute(CONFIGURE)execute(PREPARE_HW)execute(PREPARE_MW)execute(START)

stop()

execute(STOP)execute(FINALIZE)

teardown()

teardown()

All the methods listed above must return false if any of the nodes reaches an ICoreNode::State::ERROR state during the execution.
ICoreNodeManager::isOk() must also return false if any of the nodes is in a ICoreNode::State::ERROR state.

core::mw::CoreNodeManager implements core::mw::ICoreNodeManager.

Module

core::mw::CoreModule abstracts a Core module.

Each module must inherit from this class.

CoreModule header file
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Led::toggle and Led::write are to be implemented in derived Module.

#pragma once

#include <core/mw/namespace.hpp>
#include <core/mw/common.hpp>
#include <core/mw/CoreNodeManager.hpp>

NAMESPACE_CORE_MW_BEGIN

class CoreModule:
        public CoreNodeManager (1)
{
public:
        class Led (2)
        {
        public:
                void
                toggle();

                void
                write(
                        unsigned on
                );
        };

        static Led& led;


        bool
        initialize();

        static const void
        halt(
                const char* message
        );


        CoreModule() {}

        virtual ~CoreModule() {}
};

NAMESPACE_CORE_MW_END
1 inherits from core::mw::CoreNodeManager
2 led class

Lines 15-25 define the class that control the status led. The implementation of the class is left to the derived module.

Example 7. A module derived from CoreNode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#pragma once

#include <core/mw/CoreModule.hpp>
#include <ModuleConfiguration.hpp>

namespace core {
namespace hw {
class Pad;
class SPIDevice;
class I2CMaster;
class EXTChannel;
}
}

class Module:
        public core::mw::CoreModule (1)
{
public:
        static bool
        initialize(); (2)

// --- DEVICES ----------------------------------------------------------------
        static core::hw::Pad& d0;
        static core::hw::Pad& d1;
        static core::hw::Pad& d2;
        static core::hw::Pad& d3;
        static core::hw::Pad& d4;
        static core::hw::Pad& d5;
        static core::hw::Pad& d6;
        static core::hw::Pad& d7;

        static core::hw::SPIDevice& spi;
        static core::hw::I2CMaster& i2c;
// ----------------------------------------------------------------------------

        Module();
        virtual ~Module() {}
};
1 inherit from CoreModule
2 override CoreModule::intialize()
Example 8. Using a module derived from CoreNode

The following is the main.cpp from a target using the IO module.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <ModuleConfiguration.hpp>
#include <Module.hpp> (1)

// MESSAGES
#include <core/common_msgs/Led.hpp>

// NODES
#include <core/led/Subscriber.hpp>

// BOARD IMPL
#include <core/hw/EXT.hpp> (2)
#include <core/hw/GPIO.hpp>
#include <core/hw/SPI.hpp>
#include <core/hw/I2C.hpp>

// *** DO NOT MOVE ***
Module module; (3)

// TYPES

// NODES
core::led::Subscriber led_subscriber("led_subscriber", core::os::Thread::PriorityEnum::LOWEST); (4)

// MAIN
extern "C" {
        int
        main()
        {
                module.initialize(); (5)

                // Led subscriber node
                core::led::SubscriberConfiguration led_subscriber_configuration;
                led_subscriber_configuration.topic = "led";
                led_subscriber.setConfiguration(led_subscriber_configuration); (6)
                module.add(led_subscriber); (7)

                // Setup and run
                module.setup(); (8)
                module.run(); (9)

                // Is everything going well?
                for (;;) {
                        if (!module.isOk()) { (10)
                                module.halt("This must not happen!");
                        }

                        core::os::Thread::sleep(core::os::Time::ms(500));
                }

                return core::os::Thread::OK;
            }
}

1 include the header file for the derived module
2 include the implementation of the forward declarations in Module.hpp
3 module instance
4 nodes instances
5 initialize the module
6 configure the nodes
7 add nodes to the module
8 setup the nodes
9 run the nodes
10 check that everything is ok