Overview
The Core Middleware is a topic-based publish/subscribe middleware (see Wikipedia) that makes it possible to speed up the software development by leveraging on code reuse.
Applications are built connecting together logical blocks (nodes) by means of named communication channels (topics). Each topic has its own specific message type. Each node can transmit (publish) on and can receive (subscribe) from any channel (topic).
In this document a more in-depth description of the following objects will be given.
-
Messages
-
Publishers and Subscribers
-
Nodes
-
configurations
-
Node manager
-
Module
Messages
A message is the datatype of a topic, that is the data that nodes can share by means of the publish/subscribe middleware.
From the source code point of view a message is a subclass of core::mw::Message
.
The subclass must be defined as a packed struct using the CORE_PACKED macro.
|
core::mw::Message
1
2
3
4
5
6
7
8
9
#pragma once
#include <core/mw/Message.hpp>
struct example:
public core::mw::Message (1)
{
uint32_t value;
} CORE_PACKED; (2)
1 | extend core::mw::Message |
2 | make it a packed struct |
Even though it is possible to directly write the message code, it is better to use the facilities provided by Core Message. |
Publishers and Subscribers
Publisher
A publisher is defined as an instance of core::mw::Publisher<typename MessageType>
class.
The class template must be specialized using the message type of the topic it will publish to.
1
2
3
4
5
6
7
8
9
10
11
12
13
core::common_msgs::Led* msg;
core::mw::Publisher<core::common_msgs::Led> publisher; (1)
if (publisher.alloc(msg)) { (2)
msgp->led = 4; (3)
msgp->value = 0;
if (!publisher.publish(*msg)) { (4)
...
}
}
1 | specialize with the message |
2 | ask the publisher to allocate a message |
3 | valorize the message |
4 | publish it |
The core::mw::Publisher::alloc()
method will return false where there are no subscribers to the topic.
The core::mw::Publisher::publish()
method will return false when TODO.
It is up to the node to tell the system that the publisher exists. This will be discussed in the Nodes section. |
Subscriber
A subscriber is defined as an instance of core::mw::Subscriber<typename MessageType, unsigned QUEUE_LENGTH>
class.
The class template must be specialized using the message type of the topic it will subscribe to and the length of the message queue.
The message queue length (QUEUE_LENGTH
) must be at least 2.
There are 2 ways to consume the data received:
-
registering a callback
-
using
core::mw::Subscriber::fetch()
It is up to the node to tell the system that the subscriber exists. This will be discussed in the Nodes section. |
Callback
It is possible to register a subscriber callback the node will call on its spin() method (more on this later).
The callback function signature for a core::mw::Subscriber<typename MessageType, unsigned QUEUE_LENGTH>
subscriber is:
1
2
3
4
typedef bool (* Callback)(
const MessageType& msg,
Node* node
);
The callback function can be registered using core::mw::Subscriber::set_callback()
method.
The node parameter is required to easily allow multiple instance of the subscriber (as the callback is a C function-pointer).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
subscriber.set_callback(LedSubscriber::callback); (1)
...
bool
LedSubscriber::callback(
const core::common_msgs::Led& msg,
core::mw::Node* node
)
{
LedSubscriber* _this = static_cast<LedSubscriber*>(node); (2)
_this->setLedValue(msg.value);
}
1 | register the callback |
2 | pointer to instance of the node |
If the callback is a class member function, it MUST be made static (to exclude the parameter this from its signature).
|
fetch()
1
2
3
4
5
core::common_msgs::Led* msg;
if (subscriber.fetch(msg)) {
setLedValue(msg.value);
}
The core::mw::Subscriber::fetch()
method will return false where there are no messages in the queue.
Nodes
A node is nothing but a process that runs on a module and that communicate with other nodes using a topic based publish/subscribe pattern (see Wikipedia).
From the source code point of view a node can be defined in two different ways:
-
as subclass of
core::mw::CoreNode
-
as an OS thread
The first approach permits to control the execution flow and is the preferred way to implement a node.
The latter approach, whilst apparently simpler, does not offer any control on the Node lifecycle.
Nodes defined as subclasses of core::mw::CoreNode
share other important features:
-
they can be described using a JSON file, allowing automatic code generation
-
they can be easily configured by means of a configuration system
CoreNode
ICoreNode and the node lifecycle
The core::mw::ICoreNode
interface describes the lifecycle of a node.
ICoreNode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class ICoreNode
{
public:
enum class State { (1)
NONE, // Node has not been set up
SET_UP, // wait
INITIALIZING, // -> onInitialize
INITIALIZED, // wait
CONFIGURING, // -> onConfigure
CONFIGURED, // wait
PREPARING_HW, // -> onPrepareHW
HW_READY, // wait
PREPARING_MW, // -> onPrepareMW
MW_READY, // wait
IDLE, // wait
STARTING, // -> onStart
LOOPING, // -> onLoop
STOPPING, // -> onStop
FINALIZING, // -> onFinalize
ERROR, // wait forever
TEARING_DOWN // killing
};
enum class Action { (2)
INITIALIZE,
CONFIGURE,
PREPARE_HW,
PREPARE_MW,
START,
STOP,
FINALIZE
};
virtual bool (3)
setup() = 0;
virtual bool (4)
execute(
Action what
) = 0;
virtual bool (5)
teardown() = 0;
virtual State
state() = 0;
virtual ~ICoreNode() {}
protected: (6)
virtual bool
onInitialize() = 0;
virtual bool
onConfigure() = 0;
virtual bool
onPrepareHW() = 0;
virtual bool
onPrepareMW() = 0;
virtual bool
onStart() = 0;
virtual bool
onLoop() = 0;
virtual bool
onStop() = 0;
virtual bool
onError() = 0;
virtual bool
onFinalize() = 0;
protected:
ICoreNode() :
_currentState(State::NONE)
{}
State _currentState;
};
1 | node states |
2 | actions on the node |
3 | setup the node |
4 | change state |
5 | teardown the node |
6 | event handlers |
When is firstly created, an core::mw::ICoreNode
derived object must be in a ICoreNode::State::NONE
state.
Calling ICoreNode::setup()
will set up the node execution.
State will change to ICoreNode::State::SET_UP
once the node is ready.
Then is it possible to cycle all the states using ICoreNode::execute()
.
The order of the actions to execute must be exactly:
-
ICoreNode::Action::INITIALIZE
-
ICoreNode::Action::CONFIGURE
-
ICoreNode::Action::PREPARE_HW
-
ICoreNode::Action::PREPARE_MW
-
ICoreNode::Action::START
-
ICoreNode::Action::STOP
-
ICoreNode::Action::FINALIZE
ICoreNode::execute()
must return false
if an out of order action is requested.
Action | Event handler | Meaning |
---|---|---|
|
|
memory allocation, default memeber value assignement, … |
|
|
configuration, … |
|
|
hardware setup, device probing, … |
|
|
publishers advertising and subscribers subscribing, callback settings, … |
|
|
hardware starting, … |
|
|
hardware stopping, … |
|
|
memory deallocation, … |
Two more events are defined: ICoreNode::onLoop()
and ICoreNode::onError()
.
ICoreNode::onLoop()
is the main node loop.
Whenever an unrecoverable error occurs (i.e.: when an event member function returns false
), the state of the node
must become ICoreNode::State::ERROR
and ICoreNode::onError()
event must be called.
The only way to recover must then be to tear down the node, calling ICoreNode::teardown()
.
The node must switch to a ICoreNode::State::TEARING_DOWN
state and then, to ICoreNode::State::NONE
.
ICoreNode::teardown()
must also be called when a `ICoreNode `object is destroyed.
See ICoreNode
state machine for a description of the state transitions.
There is no reference to core::mw::ICoreNode::teardown()
, but it must always be possible to call it in every moment.
ICoreNode
state machineCoreNode
CoreNode
header file
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#pragma once
#include <core/mw/namespace.hpp>
#include <core/os/Thread.hpp>
#include <core/os/Condition.hpp>
#include <core/os/Mutex.hpp>
#include <core/mw/Node.hpp>
#include <core/mw/ICoreNode.hpp>
NAMESPACE_CORE_MW_BEGIN
class CoreNode:
public ICoreNode,
protected core::mw::Node
{
public:
virtual ~CoreNode() {}
CoreNode(
const char* name,
core::mw::Thread::PriorityEnum priority = core::mw::Thread::PriorityEnum::NORMAL
);
...
The core::mw::CoreNode
class offers the developer the possibility to cleanly implement all the possible aspects related to a node by implementing the core::mw::ICoreNode
interface.
Calling CoreNode::setup()
will create the node execution thread.
State will change to CoreNode::State::SET_UP
once the thread has been created.
Implementing a new node requires to extend core::mw::CoreNode
, and to override the default event member functions as required.
core::mw::CoreNode
constructor takes 2 arguments, the node name and the node thread priority.
The event member functions by default return true , so it is not needed to implement these in core::mw::CoreNode derived classes.
|
core::mw::CoreNode
extends a lower-level core::mw::Node
class.
This class offers methods to notify the system that topic are going to be published and/or subscribed.
core::mw::Node
advertise and publish methods
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template <typename MT>
bool
advertise(
Publisher<MT>& pub,
const char* namep,
const Time& publish_timeout = Time::INFINITE
);
template <typename MT>
bool
subscribe(
SubscriberExtBuf<MT>& sub,
const char* namep,
MT mgpool_buf[]
);
template <typename MT, unsigned QL>
bool
subscribe(
Subscriber<MT, QL>& sub,
const char* namep
);
A core::mw::CoreNode
object can be added to a core::mw::CoreNodeManager
(such as a core::mw::CoreModule
).
In this way all the nodes lifecycles are syncronized.
It is also easier and more concise than calling all the execute()
while checking for the return value and states.
CoreNode
led::Publisher
node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#pragma once
#include <core/mw/CoreNode.hpp>
#include <core/mw/Publisher.hpp>
#include <core/common_msgs/Led.hpp>
#include <core/led/PublisherConfiguration.hpp>
#include <array>
namespace core {
namespace led {
class Publisher:
public core::mw::CoreNode
{
public: // CONSTRUCTOR and DESTRUCTOR
Publisher(
const char* name,
core::mw::Thread::PriorityEnum priority = core::mw::Thread::PriorityEnum::NORMAL
);
virtual
~Publisher();
public: // CONFIGURATION
PublisherConfiguration configuration;
private: // PUBLISHERS and SUBSCRIBERS
core::mw::Publisher<common_msgs::Led> _publisher;
private: // PRIVATE MEMBERS
uint32_t _toggle;
private: // EVENTS
bool
onPrepareMW();
bool
onLoop();
};
}
}
OS thread
In this approach a node is defined directly as a thread.
The main node loop must call node Node::spin()
member function, that dispatches the messages around.
core::mw::Node
spin method
1
2
3
4
bool
spin(
const Time& timeout = Time::INFINITE
);
Node::spin()
returns false if no messages where received after timeout
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void
test_subscriber_node(
void* arg
)
{
common_msgs::String64* msgp;
core::mw::Node node("test_sub"); (1)
core::mw::Subscriber<core::common_msgs::String64, 5> sub; (2)
node.subscribe(sub, "test"); (3)
for (;;) {
node.spin(core::mw::Time::ms(1000)); (4)
if (sub.fetch(msgp)) {
module.stream.printf("%s\r\n", msgp->data);
sub.release(*msgp);
}
}
}
1 | create the node |
2 | create a Subscriber |
3 | subscribe sub to topic test |
4 | spin |
And later on, create a thread with it.
1
2
3
...
core::os::Thread::create_heap(NULL, THD_WORKING_AREA_SIZE(1024), NORMALPRIO, test_subscriber_node, nullptr);
...
Configurations
core::mw::CoreConfiguration
class implements a kind-of reflection system, which allows to access derived class members by their name:
1
2
3
core::led::LedPublisherConfiguration configuration;
configuration.led = 5;
is the same as
1
2
3
core::led::LedPublisherConfiguration configuration;
configuration["led"] = 5;
This is useful for a runtime configuration of the node.
Even though it is possible to directly write the configuration code, it is better to use the facilities provided by Core Configuration. |
As this system relies on type deduction for template argument and return value types, special care must be used in assignments using the It is better to always force the type by using an explicit cast or the user defined literals
|
Node Manager
core::mw::ICoreNodeManager
provides a simple way to synchronize a set on nodes by grouping lifecycle steps together
and calling the proper CoreNode
methods in a coordinated way on a list of nodes, while checking for the success of the operations.
ICoreNodeManager
interface
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#pragma once
#include <core/mw/namespace.hpp>
#include <core/mw/CoreNode.hpp>
NAMESPACE_CORE_MW_BEGIN
class ICoreNodeManager
{
public:
virtual void
add(
const CoreNode& node
) = 0;
virtual bool
setup() = 0;
virtual bool
run() = 0;
virtual bool
stop() = 0;
virtual bool
teardown() = 0;
virtual bool
isOk() = 0;
};
NAMESPACE_CORE_MW_END
ICoreNodeManager::add()
permits to add a node to the list of nodes to synchronize.
ICoreNodeManager | CoreNode |
---|---|
|
|
|
|
|
|
|
|
All the methods listed above must return false
if any of the nodes reaches an ICoreNode::State::ERROR
state during the execution.
ICoreNodeManager::isOk()
must also return false
if any of the nodes is in a ICoreNode::State::ERROR
state.
core::mw::CoreNodeManager
implements core::mw::ICoreNodeManager
.
Module
core::mw::CoreModule
abstracts a Core module.
Each module must inherit from this class.
CoreModule
header file
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Led::toggle and Led::write are to be implemented in derived Module.
#pragma once
#include <core/mw/namespace.hpp>
#include <core/mw/common.hpp>
#include <core/mw/CoreNodeManager.hpp>
NAMESPACE_CORE_MW_BEGIN
class CoreModule:
public CoreNodeManager (1)
{
public:
class Led (2)
{
public:
void
toggle();
void
write(
unsigned on
);
};
static Led& led;
bool
initialize();
static const void
halt(
const char* message
);
CoreModule() {}
virtual ~CoreModule() {}
};
NAMESPACE_CORE_MW_END
1 | inherits from core::mw::CoreNodeManager |
2 | led class |
Lines 15-25 define the class that control the status led. The implementation of the class is left to the derived module.
CoreNode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#pragma once
#include <core/mw/CoreModule.hpp>
#include <ModuleConfiguration.hpp>
namespace core {
namespace hw {
class Pad;
class SPIDevice;
class I2CMaster;
class EXTChannel;
}
}
class Module:
public core::mw::CoreModule (1)
{
public:
static bool
initialize(); (2)
// --- DEVICES ----------------------------------------------------------------
static core::hw::Pad& d0;
static core::hw::Pad& d1;
static core::hw::Pad& d2;
static core::hw::Pad& d3;
static core::hw::Pad& d4;
static core::hw::Pad& d5;
static core::hw::Pad& d6;
static core::hw::Pad& d7;
static core::hw::SPIDevice& spi;
static core::hw::I2CMaster& i2c;
// ----------------------------------------------------------------------------
Module();
virtual ~Module() {}
};
1 | inherit from CoreModule |
2 | override CoreModule::intialize() |
CoreNode
The following is the main.cpp
from a target using the IO module.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <ModuleConfiguration.hpp>
#include <Module.hpp> (1)
// MESSAGES
#include <core/common_msgs/Led.hpp>
// NODES
#include <core/led/Subscriber.hpp>
// BOARD IMPL
#include <core/hw/EXT.hpp> (2)
#include <core/hw/GPIO.hpp>
#include <core/hw/SPI.hpp>
#include <core/hw/I2C.hpp>
// *** DO NOT MOVE ***
Module module; (3)
// TYPES
// NODES
core::led::Subscriber led_subscriber("led_subscriber", core::os::Thread::PriorityEnum::LOWEST); (4)
// MAIN
extern "C" {
int
main()
{
module.initialize(); (5)
// Led subscriber node
core::led::SubscriberConfiguration led_subscriber_configuration;
led_subscriber_configuration.topic = "led";
led_subscriber.setConfiguration(led_subscriber_configuration); (6)
module.add(led_subscriber); (7)
// Setup and run
module.setup(); (8)
module.run(); (9)
// Is everything going well?
for (;;) {
if (!module.isOk()) { (10)
module.halt("This must not happen!");
}
core::os::Thread::sleep(core::os::Time::ms(500));
}
return core::os::Thread::OK;
}
}
1 | include the header file for the derived module |
2 | include the implementation of the forward declarations in Module.hpp |
3 | module instance |
4 | nodes instances |
5 | initialize the module |
6 | configure the nodes |
7 | add nodes to the module |
8 | setup the nodes |
9 | run the nodes |
10 | check that everything is ok |