1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/SendServer.hpp" 6 #include "hmbdc/app/tcpcast/Messages.hpp" 7 #include "hmbdc/app/mcast/SendTransportEngine.hpp" 8 #include "hmbdc//Traits.hpp" 9 #include "hmbdc/time/Time.hpp" 10 #include "hmbdc/time/Rater.hpp" 11 #include "hmbdc/numeric/BitMath.hpp" 13 #include <boost/circular_buffer.hpp> 16 #include <type_traits> 18 namespace hmbdc {
namespace app {
namespace tcpcast {
20 namespace send_detail {
23 using pattern::MonoLockFreeBuffer;
30 using ptr = std::shared_ptr<SendTransport>;
37 template <
typename... Messages>
38 void queue(
Topic const& t, Messages&&... msgs) {
39 auto n =
sizeof...(msgs);
40 auto it = buffer_.claim(n);
41 queue(it, t, std::forward<Messages>(msgs)...);
42 buffer_.commit(it, n);
45 template <
typename... Messages>
46 bool tryQueue(
Topic const& t, Messages&&... msgs) {
47 auto n =
sizeof...(msgs);
48 auto it = buffer_.tryClaim(n);
50 queue(it, t, std::forward<Messages>(msgs)...);
51 buffer_.commit(it, n);
57 template <
typename Message,
typename ... Args>
58 void queueInPlace(
Topic const& t, Args&&... args) {
59 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
61 ,
"hasMemoryAttachment has to the first base for Message");
62 auto s = buffer_.claim();
63 char* addr =
static_cast<char*
>(*s);
65 h->flag = calculateFlag<Message>();
69 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
72 HMBDC_THROW(std::out_of_range
73 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
78 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len);
82 bool match(
Topic const& t)
const {
83 return boost::regex_match(t.c_str(), topicRegex_);
88 boost::regex topicRegex_;
91 size_t maxMessageSize_;
92 size_t minRecvToStart_;
93 MonoLockFreeBuffer buffer_;
95 unique_ptr<mcast::SendTransport> mcSendTransport_;
101 template <
typename Message>
103 uint8_t calculateFlag() {
104 if (is_base_of<hasMemoryAttachment, Message>::value)
return hasMemoryAttachment::flag;
108 template<
typename M,
typename ... Messages>
109 void queue(MonoLockFreeBuffer::iterator it,
Topic const& t
110 , M&& m, Messages&&... msgs) {
111 using Message =
typename std::remove_reference<M>::type;
112 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
114 ,
"hasMemoryAttachment has to the first base for Message");
117 char* addr =
static_cast<char*
>(s);
119 h->flag = calculateFlag<Message>();
123 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
126 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
128 queue(++it, t, std::forward<Messages>(msgs)...);
131 void queue(MonoLockFreeBuffer::iterator it
139 ,
Client<SendTransportEngine> {
142 using SendTransport::hmbdcName;
143 using SendTransport::schedSpec;
146 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
148 mcSendTransport_->runOnce(
true);
178 if (stopped_)
return 0;
179 if (server_ && !minRecvToStart_) {
180 return server_->readySessionCount();
182 return numeric_limits<size_t>::max();
186 auto seq = buffer_.readSeq();
187 if (seq == lastSeq_ && buffer_.isFull()) {
188 server_->killSlowestSession();
196 size_t maxSendBatch_;
197 bool waitForSlowReceivers_;
198 MonoLockFreeBuffer::iterator begin_;
202 unique_ptr<SendServer> server_;
capture the transportation mechanism
Definition: SendTransportEngine.hpp:28
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:152
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
Definition: Transport.hpp:64
Definition: Traits.hpp:35
Definition: SendTransportEngine.hpp:135
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:146
size_t sessionsRemainingActive() const
check how many recipient sessions are still active
Definition: SendTransportEngine.hpp:177
SendTransport(Config const &, size_t)
ctor
Definition: Message.hpp:72
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object can participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
Definition: Timers.hpp:104