1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/SendServer.hpp" 6 #include "hmbdc/app/tcpcast/Messages.hpp" 7 #include "hmbdc/app/udpcast/SendTransportEngine.hpp" 8 #include "hmbdc//Traits.hpp" 9 #include "hmbdc/time/Time.hpp" 10 #include "hmbdc/time/Rater.hpp" 11 #include "hmbdc/numeric/BitMath.hpp" 13 #include <boost/circular_buffer.hpp> 16 #include <type_traits> 18 namespace hmbdc {
namespace app {
namespace tcpcast {
20 namespace send_detail {
23 using pattern::MonoLockFreeBuffer;
30 using ptr = std::shared_ptr<SendTransport>;
37 template <
typename... Messages>
38 void queue(
Topic const& t, Messages&&... msgs) {
39 auto n =
sizeof...(msgs);
40 auto it = buffer_.claim(n);
41 queue(it, t, std::forward<Messages>(msgs)...);
42 buffer_.commit(it, n);
45 template <
typename... Messages>
46 bool tryQueue(
Topic const& t, Messages&&... msgs) {
47 auto n =
sizeof...(msgs);
48 auto it = buffer_.tryClaim(n);
50 queue(it, t, std::forward<Messages>(msgs)...);
51 buffer_.commit(it, n);
57 template <
typename Message,
typename ... Args>
58 void queueInPlace(
Topic const& t, Args&&... args) {
59 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
60 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
62 ,
"hasMemoryAttachment has to the first base for Message");
63 auto s = buffer_.claim();
64 char* addr =
static_cast<char*
>(*s);
66 h->flag = calculateFlag<Message>();
70 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
73 HMBDC_THROW(std::out_of_range
74 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
79 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len);
83 bool match(
Topic const& t)
const {
84 return boost::regex_match(t.c_str(), topicRegex_);
89 boost::regex topicRegex_;
92 size_t maxMessageSize_;
93 size_t minRecvToStart_;
96 unique_ptr<udpcast::SendTransport> mcSendTransport_;
102 template <
typename Message>
104 uint8_t calculateFlag() {
105 if (is_base_of<hasMemoryAttachment, Message>::value)
return hasMemoryAttachment::flag;
109 template<
typename M,
typename ... Messages>
111 , M&& m, Messages&&... msgs) {
112 using Message =
typename std::remove_reference<M>::type;
113 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
114 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
116 ,
"hasMemoryAttachment has to the first base for Message");
119 char* addr =
static_cast<char*
>(s);
121 h->flag = calculateFlag<Message>();
125 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
128 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
130 queue(++it, t, std::forward<Messages>(msgs)...);
141 ,
Client<SendTransportEngine> {
143 using SendTransport::hmbdcName;
144 using SendTransport::schedSpec;
147 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
149 mcSendTransport_->runOnce(
true);
165 if (stopped_)
return 0;
166 if (server_ && !minRecvToStart_) {
167 return server_->readySessionCount();
169 return numeric_limits<size_t>::max();
173 auto seq = buffer_.readSeq();
174 if (seq == lastSeq_ && buffer_.isFull()) {
175 server_->killSlowestSession();
183 size_t maxSendBatch_;
184 bool waitForSlowReceivers_;
189 unique_ptr<SendServer> server_;
Definition: MonoLockFreeBuffer.hpp:15
capture the transportation mechanism
Definition: SendTransportEngine.hpp:28
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:153
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
Definition: Transport.hpp:69
Definition: Traits.hpp:35
Definition: SendTransportEngine.hpp:137
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:147
size_t sessionsRemainingActive() const
check how many recipient sessions are still active
Definition: SendTransportEngine.hpp:164
Definition: Message.hpp:76
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:45
Definition: Timers.hpp:104
Definition: LockFreeBufferMisc.hpp:74