hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/SendServer.hpp"
6 #include "hmbdc/app/tcpcast/Messages.hpp"
7 #include "hmbdc/app/mcast/SendTransportEngine.hpp"
8 #include "hmbdc//Traits.hpp"
9 #include "hmbdc/time/Time.hpp"
10 #include "hmbdc/time/Rater.hpp"
11 #include "hmbdc/numeric/BitMath.hpp"
12 
13 #include <boost/circular_buffer.hpp>
14 #include <memory>
15 #include <tuple>
16 #include <type_traits>
17 
18 #include <iostream>
19 
20 namespace hmbdc { namespace app { namespace tcpcast {
21 
22 namespace send_detail {
23 using namespace hmbdc::time;
24 using namespace boost::asio;
25 using namespace std;
26 using pattern::MonoLockFreeBuffer;
27 /**
28  * @brief capture the transportation mechanism
29  *
30  */
32 : Transport {
33  using ptr = std::shared_ptr<SendTransport>;
34  /**
35  * @brief ctor
36  * @param cfg JSON specifying the transport - see example, perf-tcpcast.cpp
37  * @param maxMessageSize max message size in bytes to be sent
38  * @param minRecvToStart start sending once this many recipients are online, otherwise
39  * hold the messages in the buffer
40  */
41  SendTransport(Config const& cfg
42  , size_t maxMessageSize
43  , size_t minRecvToStart)
44  : Transport(cfg)
45  , topic_(cfg.getExt<string>("topicRegex"))
46  , topicRegex_(topic_)
47  , maxMessageSize_(maxMessageSize)
48  , minRecvToStart_(minRecvToStart)
49  , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(Topic) + sizeof(MessageHead)
50  , config_.getExt<uint16_t>("outBufferSizePower2")
51  ?config_.getExt<uint16_t>("outBufferSizePower2")
52  :hmbdc::numeric::log2Upper(128ul * 1024ul / maxMessageSize)
53  )
54  , rater_(Duration::seconds(1u)
55  , config_.getExt<size_t>("sendBytesPerSec")
56  , config_.getExt<size_t>("sendBytesBurst")
57  , config_.getExt<size_t>("sendBytesBurst") != 0ul //no rate control by default
58  )
59  , mcConfig_(cfg) {
60  mcConfig_.put("loopback", true); //always allow advertising to loopback, so other process
61  //on the same machine get them
62  mcConfig_.put("topicRegex", tcpcastAdTopic_.c_str());
63  // mcConfig_.put("ifaceAddr", cfg.getExt<string>("ifaceAddr"));
64  mcConfig_.put("outBufferSizePower2", 3u);
65  // mcConfig_.put("mcastPort", cfg.getExt<uint16_t>("mcastPort"));
66  // mcConfig_.put("mcastAddr", cfg.getExt<string>("mcastAddr"));
67  // mcConfig_.put("ttl", cfg.getExt<int>("ttl"));
68 
69  mcSendTransport_.reset(new mcast::SendTransport(
70  mcConfig_, sizeof(MessageWrap<TopicSource>)));
71  }
72 
73  template <typename... Messages>
74  void queue(Topic const& t, Messages&&... msgs) {
75  auto n = sizeof...(msgs);
76  auto it = buffer_.claim(n);
77  queue(it, t, std::forward<Messages>(msgs)...);
78  buffer_.commit(it, n);
79  }
80 
81  template <typename... Messages>
82  bool tryQueue(Topic const& t, Messages&&... msgs) {
83  auto n = sizeof...(msgs);
84  auto it = buffer_.tryClaim(n);
85  if (it) {
86  queue(it, t, std::forward<Messages>(msgs)...);
87  buffer_.commit(it, n);
88  return true;
89  }
90  return false;
91  }
92 
93  template <typename Message, typename ... Args>
94  void queueInPlace(Topic const& t, Args&&... args) {
95  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
97  , "hasMemoryAttachment has to the first base for Message");
98  auto s = buffer_.claim();
99  char* addr = static_cast<char*>(*s);
100  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
101  h->flag = calculateFlag<Message>();
102  h->messagePayloadLen = sizeof(MessageWrap<Message>);
103  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
104  h->topicLen = tl;
105  if (likely(sizeof(Message) <= maxMessageSize_)) {
106  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
107  } else {
108  HMBDC_THROW(std::out_of_range
109  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
110  }
111  buffer_.commit(s);
112  }
113 
114  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len) {
115  auto s = buffer_.claim();
116  char* addr = static_cast<char*>(*s);
117  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
118  h->flag = 0;
119  h->messagePayloadLen = sizeof(MessageWrap<JustBytes>) - 1 + len;
120  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
121  h->topicLen = tl;
122  if (likely(len <= maxMessageSize_)) {
123  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<JustBytes>(tag, bytes, len);
124  } else {
125  HMBDC_THROW(std::out_of_range
126  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
127  }
128  buffer_.commit(s);
129  }
130 
131  void stop() {
132  buffer_.reset();
133  };
134 
135  void initInThread() {
136  mcSendTransport_->initInThread();
137  Transport::initInThread();
138  }
139 
140  bool match(Topic const& t) const {
141  return boost::regex_match(t.c_str(), topicRegex_);
142  }
143 
144 private:
145  std::string topic_; // regex text read from config key "topicRegex"
146  boost::regex topicRegex_; // compiled from topic_; used by match()
147 
148 protected:
149  size_t maxMessageSize_; // ctor argument; upper bound enforced by the queue functions
150  size_t minRecvToStart_; // ctor argument; see the ctor's @param minRecvToStart
151  MonoLockFreeBuffer buffer_; // outgoing buffer the queue functions claim from and commit to
152 
153  unique_ptr<mcast::SendTransport> mcSendTransport_; // created in the ctor from mcConfig_
154  Rater rater_; // built in the ctor from "sendBytesPerSec" / "sendBytesBurst"
155  Config mcConfig_; // copy of cfg with loopback, topicRegex and outBufferSizePower2 overridden
156 
157 private:
158 
159  template <typename Message>
160  static
161  uint8_t calculateFlag() {
162  if (is_base_of<hasMemoryAttachment, Message>::value) return hasMemoryAttachment::flag;
163  return 0;
164  }
165 
166  template<typename M, typename ... Messages>
167  void queue(MonoLockFreeBuffer::iterator it, Topic const& t
168  , M&& m, Messages&&... msgs) {
169  using Message = typename std::remove_reference<M>::type;
170  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
172  , "hasMemoryAttachment has to the first base for Message");
173 
174  auto s = *it;
175  char* addr = static_cast<char*>(s);
176  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
177  h->flag = calculateFlag<Message>();
178  h->messagePayloadLen = sizeof(MessageWrap<Message>);
179  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
180  h->topicLen = tl;
181  if (likely(sizeof(Message) <= maxMessageSize_)) {
182  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
183  } else {
184  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
185  }
186  queue(++it, t, std::forward<Messages>(msgs)...);
187  }
188 
189  void queue(MonoLockFreeBuffer::iterator it
190  , Topic const& t) {}
191 };
192 
195 , TimerManager
198 , Client<SendTransportEngine> {
199  SendTransportEngine(Config const& cfg
200  , size_t maxMessageSize
201  , size_t minRecvToStart)
202  : SendTransport(cfg, maxMessageSize, minRecvToStart)
203  , ReoccuringTimer(Duration::seconds(cfg.getExt<size_t>("topicAdvertisePeriodSeconds")))
204  , mtu_(cfg.getExt<size_t>("mtu") - 20 - 20) //20 bytes ip header and 20 bytes tcp header
205  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch"))
206  , waitForSlowReceivers_(cfg.getExt<bool>("waitForSlowReceivers"))
207  , lastSeq_(0) {
208  auto totalHead = sizeof(app::MessageHead)
209  + sizeof(Topic) + sizeof(TransportMessageHeader);
210  if (maxMessageSize_ + totalHead > mtu_) {
211  HMBDC_THROW(std::out_of_range, "maxMessageSize needs <= " << mtu_ - totalHead);
212  }
213  toSend_.reserve(maxSendBatch_);
214  // inititialize begin_ to first item
215  MonoLockFreeBuffer::iterator end;
216  buffer_.peek(begin_, end, 1u);
217  buffer_.wasteAfterPeek(begin_, 0, true);
218  }
220 
221  char const* hmbdcName() const {
222  return this->hmbdcName_.c_str();
223  }
224 
225  std::tuple<char const*, int> schedSpec() const {
226  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
227  }
228 
229  /*virtual*/
230  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
231  //only START to send when enough sessions are ready to receive or
232  //buffer is full
233  if (likely(!minRecvToStart_
234  || asyncServer_->readySessionCount() >= minRecvToStart_)) {
235  minRecvToStart_ = 0; //now started no going back
236  runAsync();
237  }
238  mcSendTransport_->runOnce();
239 
240  if (unlikely(pIos_->stopped())) pIos_->reset();
241  boost::system::error_code ec;
242  pIos_->poll(ec);
243  if (unlikely(ec)) HMBDC_LOG_C("io_service error=", ec);
244  }
245 
246  /*virtual*/
247  bool droppedCb() override {
248  stop();
249  return true;
250  };
251 
252  /*virtual*/
253  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
254  this->initInThread();
255  asyncServer_.reset(new AsyncSendServer(config_, *pIos_, buffer_.capacity()));
256  setCallback(
257  [this](TimerManager& tm, SysTime const& now) {
258  mcSendTransport_->queue(
259  tcpcastAdTopic_, asyncServer_->advertisingMessage());
260  if (!waitForSlowReceivers_) cullSlow();
261  }
262  );
263 
264  schedule(SysTime::now(), *this);
265  }
266 
267  size_t activeSessionCount() const {
268  if (asyncServer_ && !minRecvToStart_) {
269  return asyncServer_->readySessionCount();
270  }
271  return numeric_limits<size_t>::max();
272  }
273 private:
274  void cullSlow() {
275  auto seq = buffer_.readSeq();
276  if (seq == lastSeq_ && buffer_.isFull()) {
277  asyncServer_->killSlowestSession();
278  }
279  lastSeq_ = seq;
280  }
281 
282  void runAsync() HMBDC_RESTRICT {
283  MonoLockFreeBuffer::iterator begin, end;
284  buffer_.peek(begin, end, maxSendBatch_);
285  auto it = begin_;
286  pair<char const*, char const*> currentTopic;
287  currentTopic.first = nullptr;
288  size_t currentTopicLen = 0;
289  size_t toSendByteSize = 0;
290 
291  while (it != end) {
292  void* ptr = *it;
293  auto item = static_cast<TransportMessageHeader*>(ptr);
294 
295  if (unlikely(!rater_.check(item->wireSize()))) break;
296  if (unlikely(!item->flag
297  && toSendByteSize + item->wireSize() > mtu_)) break;
298  it++;
299  if (unlikely(!currentTopic.first)) {
300  currentTopic = item->topic();
301  currentTopicLen = item->topicLen;
302  toSendByteSize = item->wireSize();
303  toSend_.emplace_back(ptr, item->wireSize());
304  } else if (item->topicLen == currentTopicLen
305  && strncmp(currentTopic.first, item->topic().first, currentTopicLen)
306  == 0) {
307  toSendByteSize += item->wireSize();
308  toSend_.emplace_back(ptr, item->wireSize());
309  } else {
310  asyncServer_->queue(currentTopic
311  , move(toSend_)
312  , toSendByteSize
313  , it);
314  toSend_.clear();
315 
316  currentTopic = item->topic();
317  currentTopicLen = item->topicLen;
318  toSendByteSize = item->wireSize();
319  toSend_.emplace_back(ptr, item->wireSize());
320  }
321 
322  if (unlikely(item->flag == hasMemoryAttachment::flag)) {
323  auto& a = item->wrapped<hasMemoryAttachment>();
324  toSend_.emplace_back(a.attachment, a.len);
325  toSendByteSize += a.len;
326  }
327  rater_.commit();
328  }
329 
330  if (toSend_.size()) {
331  asyncServer_->queue(currentTopic
332  , move(toSend_)
333  , toSendByteSize
334  , it);
335  toSend_.clear();
336  }
337  begin_ = it;
338  //use it here which is the maximum
339  auto newStart = asyncServer_->run(begin, it);
340  if (begin != end) { //only need to do this when there r things read from buffer_
341  for (auto it = begin; it != newStart; ++it) {
342  void* ptr = *it;
343  auto item = static_cast<TransportMessageHeader*>(ptr);
344  if (unlikely(item->flag == hasMemoryAttachment::flag)) {
345  auto& a = item->wrapped<hasMemoryAttachment>();
346  if (a.afterConsumedCleanupFunc) a.afterConsumedCleanupFunc(&a);
347  }
348  }
349  buffer_.wasteAfterPeek(begin, newStart - begin, true);
350  }
351  }
352 
353  size_t mtu_; // configured "mtu" minus 40 bytes of ip and tcp headers
354  size_t maxSendBatch_; // config "maxSendBatch"; item limit passed to buffer_.peek() in runAsync()
355  bool waitForSlowReceivers_; // config "waitForSlowReceivers"; when false the timer callback calls cullSlow()
356  MonoLockFreeBuffer::iterator begin_; // buffer position runAsync() resumes batching from
357  vector<const_buffer> toSend_; // staging area for the batch being assembled in runAsync()
358  // SendServer *server_;
359  // unique_ptr<SyncSendServer> syncServer_;
360  unique_ptr<AsyncSendServer> asyncServer_; // created in messageDispatchingStartedCb()
361  size_t lastSeq_; // buffer_.readSeq() value recorded by the previous cullSlow() call
362 };
363 } //send_detail
364 
367 }}}
Definition: Message.hpp:135
capture the transportation mechanism
Definition: SendTransportEngine.hpp:31
class to hold an hmbdc configuration
Definition: Config.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:151
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
Definition: Transport.hpp:22
Definition: SendTransportEngine.hpp:193
Definition: Message.hpp:34
Definition: Time.hpp:13
SendTransport(Config const &cfg, size_t maxMessageSize, size_t minRecvToStart)
ctor
Definition: SendTransportEngine.hpp:41
Definition: Traits.hpp:35
Definition: Message.hpp:55
Definition: Time.hpp:116
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Definition: Rater.hpp:11
if a specific hmbdc network transport (for example tcpcast) supports message with memory attachment...
Definition: Message.hpp:158
Definition: Base.hpp:12
Definition: Timers.hpp:96