hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/SendServer.hpp"
6 #include "hmbdc/app/tcpcast/Messages.hpp"
7 #include "hmbdc/app/mcast/SendTransportEngine.hpp"
8 #include "hmbdc//Traits.hpp"
9 #include "hmbdc/time/Time.hpp"
10 #include "hmbdc/time/Rater.hpp"
11 #include "hmbdc/numeric/BitMath.hpp"
12 
13 #include <boost/circular_buffer.hpp>
14 #include <memory>
15 #include <tuple>
16 #include <type_traits>
17 
18 #include <iostream>
19 
20 namespace hmbdc { namespace app { namespace tcpcast {
21 
22 namespace send_detail {
23 using namespace hmbdc::time;
24 using namespace boost::asio;
25 using namespace std;
26 using pattern::MonoLockFreeBuffer;
27 /**
28  * @brief capture the transportation mechanism
29  *
30  */
32 : Transport {
33  using ptr = std::shared_ptr<SendTransport>;
34  /**
35  * @brief ctor
36  * @param cfg jason specifing the transport - see example, perf-tcpcast.cpp
37  * @param maxMessageSize max messafe size in bytes to be sent
38  */
39  SendTransport(Config const& cfg
40  , size_t maxMessageSize)
41  : Transport(cfg)
42  , topic_(cfg.getExt<string>("topicRegex"))
43  , topicRegex_(topic_)
44  , maxMessageSize_(maxMessageSize)
45  , minRecvToStart_(cfg.getExt<size_t>("minRecvToStart"))
46  , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(Topic) + sizeof(MessageHead)
47  , config_.getExt<uint16_t>("outBufferSizePower2")
48  ?config_.getExt<uint16_t>("outBufferSizePower2")
49  :hmbdc::numeric::log2Upper(128ul * 1024ul / maxMessageSize)
50  )
51  , rater_(Duration::seconds(1u)
52  , config_.getExt<size_t>("sendBytesPerSec")
53  , config_.getExt<size_t>("sendBytesBurst")
54  , config_.getExt<size_t>("sendBytesBurst") != 0ul //no rate control by default
55  )
56  , mcConfig_(cfg) {
57  mcConfig_.put("loopback", true); //always allow advertising to loopback, so other process
58  //on the same machine get them
59  mcConfig_.put("topicRegex", tcpcastAdTopic_.c_str());
60  // mcConfig_.put("ifaceAddr", cfg.getExt<string>("ifaceAddr"));
61  mcConfig_.put("outBufferSizePower2", 3u);
62  // mcConfig_.put("mcastPort", cfg.getExt<uint16_t>("mcastPort"));
63  // mcConfig_.put("mcastAddr", cfg.getExt<string>("mcastAddr"));
64  // mcConfig_.put("ttl", cfg.getExt<int>("ttl"));
65 
66  mcSendTransport_.reset(new mcast::SendTransport(
67  mcConfig_, sizeof(MessageWrap<TopicSource>)));
68  }
69 
70  template <typename... Messages>
71  void queue(Topic const& t, Messages&&... msgs) {
72  auto n = sizeof...(msgs);
73  auto it = buffer_.claim(n);
74  queue(it, t, std::forward<Messages>(msgs)...);
75  buffer_.commit(it, n);
76  }
77 
78  template <typename... Messages>
79  bool tryQueue(Topic const& t, Messages&&... msgs) {
80  auto n = sizeof...(msgs);
81  auto it = buffer_.tryClaim(n);
82  if (it) {
83  queue(it, t, std::forward<Messages>(msgs)...);
84  buffer_.commit(it, n);
85  return true;
86  }
87  return false;
88  }
89 
90  template <typename Message, typename ... Args>
91  void queueInPlace(Topic const& t, Args&&... args) {
92  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
94  , "hasMemoryAttachment has to the first base for Message");
95  auto s = buffer_.claim();
96  char* addr = static_cast<char*>(*s);
97  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
98  h->flag = calculateFlag<Message>();
99  h->messagePayloadLen = sizeof(MessageWrap<Message>);
100  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
101  h->topicLen = tl;
102  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
103  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
104  } else {
105  HMBDC_THROW(std::out_of_range
106  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
107  }
108  buffer_.commit(s);
109  }
110 
111  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len) {
112  auto s = buffer_.claim();
113  char* addr = static_cast<char*>(*s);
114  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
115  h->flag = 0;
116  h->messagePayloadLen = sizeof(MessageWrap<JustBytes>) - 1 + len;
117  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
118  h->topicLen = tl;
119  if (hmbdc_likely(len <= maxMessageSize_)) {
120  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<JustBytes>(tag, bytes, len);
121  } else {
122  HMBDC_THROW(std::out_of_range
123  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
124  }
125  buffer_.commit(s);
126  }
127 
128  void stop() {
129  buffer_.reset();
130  };
131 
132  void initInThread() {
133  mcSendTransport_->initInThread();
134  Transport::initInThread();
135  }
136 
137  bool match(Topic const& t) const {
138  return boost::regex_match(t.c_str(), topicRegex_);
139  }
140 
141 private:
142  std::string topic_;
143  boost::regex topicRegex_;
144 
145 protected:
146  size_t maxMessageSize_;
147  size_t minRecvToStart_;
148  MonoLockFreeBuffer buffer_;
149 
150  unique_ptr<mcast::SendTransport> mcSendTransport_;
151  Rater rater_;
152  Config mcConfig_;
153 
154 private:
155 
156  template <typename Message>
157  static
158  uint8_t calculateFlag() {
159  if (is_base_of<hasMemoryAttachment, Message>::value) return hasMemoryAttachment::flag;
160  return 0;
161  }
162 
163  template<typename M, typename ... Messages>
164  void queue(MonoLockFreeBuffer::iterator it, Topic const& t
165  , M&& m, Messages&&... msgs) {
166  using Message = typename std::remove_reference<M>::type;
167  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
169  , "hasMemoryAttachment has to the first base for Message");
170 
171  auto s = *it;
172  char* addr = static_cast<char*>(s);
173  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
174  h->flag = calculateFlag<Message>();
175  h->messagePayloadLen = sizeof(MessageWrap<Message>);
176  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
177  h->topicLen = tl;
178  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
179  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
180  } else {
181  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
182  }
183  queue(++it, t, std::forward<Messages>(msgs)...);
184  }
185 
186  void queue(MonoLockFreeBuffer::iterator it
187  , Topic const& t) {}
188 };
189 
192 , TimerManager
195 , Client<SendTransportEngine> {
196  SendTransportEngine(Config const& cfg
197  , size_t maxMessageSize)
198  : SendTransport(cfg, maxMessageSize)
199  , ReoccuringTimer(Duration::seconds(cfg.getExt<size_t>("topicAdvertisePeriodSeconds")))
200  , mtu_(cfg.getExt<size_t>("mtu") - 20 - 20) //20 bytes ip header and 20 bytes tcp header
201  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch"))
202  , waitForSlowReceivers_(cfg.getExt<bool>("waitForSlowReceivers"))
203  , lastSeq_(0)
204  , stopped_(false) {
205  auto totalHead = sizeof(app::MessageHead)
206  + sizeof(Topic) + sizeof(TransportMessageHeader);
207  if (maxMessageSize_ + totalHead > mtu_) {
208  HMBDC_THROW(std::out_of_range, "maxMessageSize needs <= " << mtu_ - totalHead);
209  }
210  toSend_.reserve(maxSendBatch_);
211  // inititialize begin_ to first item
212  MonoLockFreeBuffer::iterator end;
213  buffer_.peek(begin_, end, 1u);
214  buffer_.wasteAfterPeek(begin_, 0, true);
215  }
217 
218  char const* hmbdcName() const {
219  return this->hmbdcName_.c_str();
220  }
221 
222  std::tuple<char const*, int> schedSpec() const {
223  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
224  }
225 
226  /*virtual*/
227  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
228  //only START to send when enough sessions are ready to receive or
229  //buffer is full
230  if (hmbdc_likely(!minRecvToStart_
231  || asyncServer_->readySessionCount() >= minRecvToStart_)) {
232  minRecvToStart_ = 0; //now started no going back
233  runAsync();
234  }
235  mcSendTransport_->runOnce();
236 
237  if (hmbdc_unlikely(pIos_->stopped())) pIos_->reset();
238  boost::system::error_code ec;
239  pIos_->poll(ec);
240  if (hmbdc_unlikely(ec)) HMBDC_LOG_C("io_service error=", ec);
241  }
242 
243  /*virtual*/
244  bool droppedCb() override {
245  stop();
246  stopped_ = true;
247  return true;
248  };
249 
250  /*virtual*/
251  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
252  this->initInThread();
253  asyncServer_.reset(new AsyncSendServer(config_, *pIos_, buffer_.capacity()));
254  setCallback(
255  [this](TimerManager& tm, SysTime const& now) {
256  mcSendTransport_->queue(
257  tcpcastAdTopic_, asyncServer_->advertisingMessage());
258  if (!waitForSlowReceivers_) cullSlow();
259  }
260  );
261 
262  schedule(SysTime::now(), *this);
263  }
264 
265  /**
266  * @brief check how many recipient sessions are still active
267  * @return numeric_limits<size_t>::max() if the sending hasn't started due to
268  * minRecvToStart has not been met yet
269  */
270  size_t sessionsRemainingActive() const {
271  if (stopped_) return 0;
272  if (asyncServer_ && !minRecvToStart_) {
273  return asyncServer_->readySessionCount();
274  }
275  return numeric_limits<size_t>::max();
276  }
277 private:
278  void cullSlow() {
279  auto seq = buffer_.readSeq();
280  if (seq == lastSeq_ && buffer_.isFull()) {
281  asyncServer_->killSlowestSession();
282  }
283  lastSeq_ = seq;
284  }
285 
286  void runAsync() HMBDC_RESTRICT {
287  MonoLockFreeBuffer::iterator begin, end;
288  buffer_.peek(begin, end, maxSendBatch_);
289  auto it = begin_;
290  pair<char const*, char const*> currentTopic;
291  currentTopic.first = nullptr;
292  size_t currentTopicLen = 0;
293  size_t toSendByteSize = 0;
294 
295  while (it != end) {
296  void* ptr = *it;
297  auto item = static_cast<TransportMessageHeader*>(ptr);
298 
299  if (hmbdc_unlikely(!rater_.check(item->wireSize()))) break;
300  if (hmbdc_unlikely(!item->flag
301  && toSendByteSize + item->wireSize() > mtu_)) break;
302  it++;
303  if (hmbdc_unlikely(!currentTopic.first)) {
304  currentTopic = item->topic();
305  currentTopicLen = item->topicLen;
306  toSendByteSize = item->wireSize();
307  toSend_.emplace_back(ptr, item->wireSize());
308  } else if (item->topicLen == currentTopicLen
309  && strncmp(currentTopic.first, item->topic().first, currentTopicLen)
310  == 0) {
311  toSendByteSize += item->wireSize();
312  toSend_.emplace_back(ptr, item->wireSize());
313  } else {
314  asyncServer_->queue(currentTopic
315  , move(toSend_)
316  , toSendByteSize
317  , it);
318  toSend_.clear();
319 
320  currentTopic = item->topic();
321  currentTopicLen = item->topicLen;
322  toSendByteSize = item->wireSize();
323  toSend_.emplace_back(ptr, item->wireSize());
324  }
325 
326  if (hmbdc_unlikely(item->flag == hasMemoryAttachment::flag)) {
327  auto& a = item->wrapped<hasMemoryAttachment>();
328  toSend_.emplace_back(a.attachment, a.len);
329  toSendByteSize += a.len;
330  }
331  rater_.commit();
332  }
333 
334  if (toSend_.size()) {
335  asyncServer_->queue(currentTopic
336  , move(toSend_)
337  , toSendByteSize
338  , it);
339  toSend_.clear();
340  }
341  begin_ = it;
342  //use it here which is the maximum
343  auto newStart = asyncServer_->run(begin, it);
344  if (begin != end) { //only need to do this when there r things read from buffer_
345  for (auto it = begin; it != newStart; ++it) {
346  void* ptr = *it;
347  auto item = static_cast<TransportMessageHeader*>(ptr);
348  if (hmbdc_unlikely(item->flag == hasMemoryAttachment::flag)) {
349  auto& a = item->wrapped<hasMemoryAttachment>();
350  if (a.afterConsumedCleanupFunc) a.afterConsumedCleanupFunc(&a);
351  }
352  }
353  buffer_.wasteAfterPeek(begin, newStart - begin, true);
354  }
355  }
356 
357  size_t mtu_;
358  size_t maxSendBatch_;
359  bool waitForSlowReceivers_;
360  MonoLockFreeBuffer::iterator begin_;
361  vector<const_buffer> toSend_;
362  // SendServer *server_;
363  // unique_ptr<SyncSendServer> syncServer_;
364  unique_ptr<AsyncSendServer> asyncServer_;
365  size_t lastSeq_;
366  bool stopped_;
367 };
368 } //send_detail
369 
372 }}}
Definition: Message.hpp:135
capture the transportation mechanism
Definition: SendTransportEngine.hpp:31
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
SendTransport(Config const &cfg, size_t maxMessageSize)
ctor
Definition: SendTransportEngine.hpp:39
Definition: Transport.hpp:22
Definition: SendTransportEngine.hpp:190
Definition: Message.hpp:34
Definition: Time.hpp:13
Definition: Traits.hpp:35
size_t sessionsRemainingActive() const
check how many recipient sessions are still active
Definition: SendTransportEngine.hpp:270
Definition: Message.hpp:55
Definition: Time.hpp:116
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Definition: Rater.hpp:11
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:158
Definition: Base.hpp:12
Definition: Timers.hpp:96