1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/udpcast/Transport.hpp" 4 #include "hmbdc/app/udpcast/Messages.hpp" 5 #include "hmbdc/app/Base.hpp" 6 #include "hmbdc/comm/inet/Endpoint.hpp" 7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 8 #include "hmbdc/time/Time.hpp" 9 #include "hmbdc/time/Rater.hpp" 10 #include "hmbdc/numeric/BitMath.hpp" 12 #include <boost/regex.hpp> 17 #include <sys/epoll.h> 18 namespace hmbdc {
namespace app {
namespace udpcast {
20 namespace sendtransportengine_detail {
28 using ptr = std::shared_ptr<SendTransport>;
32 bool match(
Topic const& t)
const {
33 return boost::regex_match(t.c_str(), topicRegex_);
36 template <
typename... Messages>
37 void queue(
Topic const& t, Messages&&... msgs) {
38 auto n =
sizeof...(msgs);
39 auto it = buffer_.claim(n);
40 queue(it, t, std::forward<Messages>(msgs)...);
41 buffer_.commit(it, n);
44 template <
typename... Messages>
45 bool tryQueue(
Topic const& t, Messages&&... msgs) {
46 auto n =
sizeof...(msgs);
47 auto it = buffer_.tryClaim(n);
49 queue(it, t, std::forward<Messages>(msgs)...);
50 buffer_.commit(it, n);
56 template <
typename Message,
typename ... Args>
57 void queueInPlace(
Topic const& t, Args&&... args) {
58 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
59 auto s = buffer_.claim();
60 char* addr =
static_cast<char*
>(*s);
64 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
68 HMBDC_THROW(std::out_of_range
69 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
74 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len);
76 void runOnce(
bool alwaysPoll =
false) HMBDC_RESTRICT {
78 if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
79 utils::EpollTask::instance().poll();
85 using Message =
typename std::remove_reference<M>::type;
86 char* addr =
static_cast<char*
>(buf);
90 if (hmbdc_likely(
sizeof(Message) <= bufLen)) {
94 HMBDC_THROW(std::out_of_range,
"bufLen too small to hold a message");
103 boost::regex topicRegex_;
104 size_t maxMessageSize_;
108 size_t maxSendBatch_;
111 size_t toSendMsgsHead_;
112 size_t toSendMsgsTail_;
113 mmsghdr* toSendPkts_;
114 size_t toSendPktsHead_;
115 size_t toSendPktsTail_;
116 std::vector<comm::inet::Endpoint> udpcastDests_;
119 outBufferSizePower2();
121 template<
typename M,
typename ... Messages>
123 , M&& m, Messages&&... msgs) {
124 using Message =
typename std::remove_reference<M>::type;
125 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
127 char* addr =
static_cast<char*
>(s);
131 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
135 HMBDC_THROW(std::out_of_range
136 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
138 queue(++it, t, std::forward<Messages>(msgs)...);
148 ,
Client<SendTransportEngine> {
149 using SendTransport::SendTransport;
150 using SendTransport::hmbdcName;
151 using SendTransport::schedSpec;
Definition: MonoLockFreeBuffer.hpp:15
Definition: Transport.hpp:39
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
Definition: BlockingBuffer.hpp:10
Definition: SendTransportEngine.hpp:146
void invokedCb(uint16_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:154
Definition: SendTransportEngine.hpp:26
Definition: Message.hpp:76
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:45
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:157
Definition: LockFreeBufferMisc.hpp:74