Skip to content

Commit dac08c3

Browse files
committed
Add cross thread buss
to integrate simplex in another system (eg: legacy, IO, slow task, ...)
1 parent be83de3 commit dac08c3

File tree

9 files changed

+1198
-0
lines changed

9 files changed

+1198
-0
lines changed

include/trz/pattern/bus/channel.hpp

+219
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <typeindex>
5+
#include <typeinfo>
6+
#include <unordered_map>
7+
#include <vector>
8+
9+
#include "data.hpp"
10+
#include "ringbuffer.hpp"
11+
12+
13+
namespace xthreadbus {
14+
/**
15+
* @brief holds chanel's specific publisher, consumer and transfer buffer
16+
*
17+
* @tparam _MaxSize of exchange buffer
18+
*/
19+
template <std::size_t _MaxSize> class Channel
20+
{
21+
public:
22+
/**
23+
* @brief communication sending part
24+
*
25+
*/
26+
class Publisher
27+
{
28+
Channel &m_channel;
29+
30+
public:
31+
/**
32+
* @brief Construct a new Publisher
33+
*
34+
* @param channel to publish on
35+
*/
36+
Publisher(Channel &channel) : m_channel(channel), m_sendBuffer(_MaxSize) { m_sendBuffer.clear(); }
37+
38+
/**
39+
* @brief store data pointer for later send
40+
*
41+
* @tparam _TDataType data pointer type
42+
* @param data pointer to publish
43+
* @return _TDataType& ref to data
44+
*/
45+
template <typename _TDataType> _TDataType &prepare(_TDataType *data)
46+
{
47+
static_assert(std::is_base_of<Data, _TDataType>::value, "_TDataType must extend Data");
48+
m_sendBuffer.push_back(data);
49+
return *data;
50+
}
51+
52+
/**
53+
* @brief directly publish pointer data in transfer buffer
54+
* when transfer buffer is full, store data pointer to a later push
55+
*
56+
* @tparam _TDataType data pointer type
57+
* @param data pointer to publish
58+
* @param storeOnFail when send fails, store for later send
59+
* @return [sent]
60+
*/
61+
template <typename _TDataType> bool publishOne(_TDataType *data, bool storeOnFail = true)
62+
{
63+
static_assert(std::is_base_of<Data, _TDataType>::value, "_TDataType must extend Data");
64+
const bool sent = m_channel.m_ringbuffer.enqueue(data);
65+
if (!sent && storeOnFail)
66+
{
67+
// TODO? push front?
68+
m_sendBuffer.push_back(data);
69+
}
70+
return sent;
71+
}
72+
73+
/**
74+
* @brief send all/maximum (until transfer buffer is full)
75+
* prepared/stored data pointer all in a row.
76+
*
77+
* @return size_t [data pointer left to be send]
78+
*/
79+
std::size_t publishAll()
80+
{
81+
const std::size_t dataToSendSize = m_sendBuffer.size();
82+
if (!dataToSendSize)
83+
{
84+
return 0;
85+
}
86+
const std::size_t sent = m_channel.m_ringbuffer.enqueue(m_sendBuffer.data(), dataToSendSize);
87+
const std::size_t leftToSend = dataToSendSize - sent;
88+
if (leftToSend)
89+
{
90+
memcpy(m_sendBuffer.data(), m_sendBuffer.data() + sent, leftToSend);
91+
}
92+
m_sendBuffer.erase(m_sendBuffer.begin() + leftToSend, m_sendBuffer.end());
93+
return leftToSend;
94+
}
95+
96+
private:
97+
std::vector<Data *> m_sendBuffer;
98+
};
99+
100+
/**
101+
* @brief communication receiving part
102+
*
103+
*/
104+
class Subscriber
105+
{
106+
/**
107+
* @brief helper casting data pointer type into subscribed type
108+
*
109+
* @tparam _TDataType data pointer type
110+
* @tparam _TSubscriberType handler type having onData(const _TDataType &) method
111+
*/
112+
template <class _TDataType, class _TSubscriberType> class StaticEventHandler
113+
{
114+
public:
115+
static void staticRead(void *subscriber, const Data &data)
116+
{
117+
reinterpret_cast<_TSubscriberType *>(subscriber)->onData(static_cast<const _TDataType &>(data));
118+
}
119+
};
120+
121+
public:
122+
/**
123+
* @brief Construct a new Subscriber
124+
*
125+
* @param channel
126+
*/
127+
Subscriber(Channel &channel) : m_channel(channel) {}
128+
129+
/**
130+
* @brief subscribe handler to data pointer type
131+
*
132+
* @tparam _TDataType data pointer type
133+
* @tparam _TSubscriberType handler type having onData(const _TDataType &) method
134+
* @param subscriber handler instance address
135+
*/
136+
template <class _TDataType, class _TSubscriberType> void subscribe(_TSubscriberType *subscriber)
137+
{
138+
auto dataTypeId = std::type_index(typeid(_TDataType));
139+
if (m_subscribers.count(dataTypeId)) return ; // TODO ? : erase previously subscribed handler ?
140+
static_assert(std::is_base_of<Data, _TDataType>::value, "_TDataType must extend Data");
141+
void (*staticRead)(void *, const Data &) = &StaticEventHandler<_TDataType, _TSubscriberType>::staticRead;
142+
std::tuple<void *, void (*)(void *, const Data &)> consumeFunc = std::make_tuple(subscriber, staticRead);
143+
m_subscribers.emplace(dataTypeId, consumeFunc);
144+
}
145+
146+
/**
147+
* @brief get next data pointer in transfer buffer
148+
* then callback handler onData method
149+
*
150+
* @return [data pointer found]
151+
*/
152+
bool readOne()
153+
{
154+
const bool msgRead = m_channel.m_ringbuffer.dequeue(m_readBuffer.data(), 1)==1;
155+
std::size_t index = 0;
156+
if (msgRead && nullptr != m_readBuffer[index])
157+
{
158+
const auto it = m_subscribers.find(std::type_index(typeid(*m_readBuffer[index])));
159+
if (it != m_subscribers.end())
160+
{
161+
auto subscriber = std::get<0>(it->second);
162+
auto onData = std::get<1>(it->second);
163+
onData(subscriber, *m_readBuffer[index]);
164+
}
165+
delete m_readBuffer[index];
166+
167+
}
168+
return msgRead;
169+
}
170+
171+
/**
172+
* @brief get all data pointer in transfer buffer
173+
* then callback all handlers onData method
174+
*
175+
* @return std::size_t amount of data pointer read
176+
*/
177+
std::size_t readAll()
178+
{
179+
const std::size_t size = m_channel.m_ringbuffer.dequeue(m_readBuffer.data(), _MaxSize);
180+
for (std::size_t index = 0; index < size; ++index)
181+
{
182+
if (nullptr != m_readBuffer[index])
183+
{
184+
const auto it = m_subscribers.find(std::type_index(typeid(*m_readBuffer[index])));
185+
if (it != m_subscribers.end())
186+
{
187+
auto subscriber = std::get<0>(it->second);
188+
auto onData = std::get<1>(it->second);
189+
onData(subscriber, *m_readBuffer[index]);
190+
}
191+
delete m_readBuffer[index];
192+
}
193+
}
194+
return size;
195+
}
196+
197+
private:
198+
Channel & m_channel;
199+
std::unordered_map<std::type_index, std::tuple<void *, void (*)(void *, const Data &)>> m_subscribers;
200+
std::array<Data *, _MaxSize> m_readBuffer;
201+
};
202+
203+
public:
204+
/**
205+
* @brief Construct a new Channel
206+
*
207+
*/
208+
Channel() : m_publisher(*this), m_consumer(*this) {}
209+
210+
Publisher & getPublisher() { return m_publisher; }
211+
Subscriber &getSubscriber() { return m_consumer; }
212+
213+
private:
214+
Publisher m_publisher;
215+
Subscriber m_consumer;
216+
ringbuffer<Data *, _MaxSize> m_ringbuffer;
217+
};
218+
219+
} // namespace xthreadbus

include/trz/pattern/bus/data.hpp

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
2+
#pragma once
3+
4+
5+
namespace xthreadbus {
6+
/**
7+
* @brief base data type
8+
*
9+
*/
10+
class Data
11+
{
12+
public:
13+
virtual ~Data() = default;
14+
};
15+
16+
} // namespace xthreadbus

0 commit comments

Comments
 (0)