hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/SendServer.hpp"
6 #include "hmbdc/app/tcpcast/Messages.hpp"
7 #include "hmbdc/app/mcast/SendTransportEngine.hpp"
8 #include "hmbdc//Traits.hpp"
9 #include "hmbdc/time/Time.hpp"
10 #include "hmbdc/time/Rater.hpp"
11 #include "hmbdc/numeric/BitMath.hpp"
12 
13 #include <boost/circular_buffer.hpp>
14 #include <memory>
15 #include <tuple>
16 #include <type_traits>
17 
18 #include <iostream>
19 
20 namespace hmbdc { namespace app { namespace tcpcast {
21 
22 using namespace hmbdc::time;
23 using namespace boost::asio;
24 
25 using namespace hmbdc;
26 using namespace hmbdc::pattern;
27 
28 /**
29  * @brief capture the transportation mechanism
30  *
31  */
33 : Transport {
34  using ptr = std::shared_ptr<SendTransport>;
35  /**
36  * @brief ctor
37  * @param cfg jason specifing the transport - see example, perf-tcpcast.cpp
38  * @param maxMessageSize max messafe size in bytes to be sent
39  * @param minRecvToStart start send when there are that many recipient online, otherwise
40  * hold the message in buffer
41  */
42  SendTransport(Config const& cfg
43  , size_t maxMessageSize
44  , size_t minRecvToStart)
45  : Transport(cfg)
46  , topic_(cfg.getExt<string>("topicRegex"))
47  , topicRegex_(topic_)
48  , maxMessageSize_(maxMessageSize)
49  , minRecvToStart_(minRecvToStart)
50  , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(Topic)
51  , config_.getExt<uint16_t>("outBufferSizePower2")
52  ?config_.getExt<uint16_t>("outBufferSizePower2")
53  :hmbdc::numeric::log2Upper(128ul * 1024ul / maxMessageSize)
54  )
55  , rater_(Duration::seconds(1u)
56  , config_.getExt<size_t>("sendBytesPerSec")
57  , config_.getExt<size_t>("sendBytesBurst")
58  , config_.getExt<size_t>("sendBytesBurst") != 0ul //no rate control by default
59  )
60  , mcConfig_(cfg) {
61  mcConfig_.put("loopback", true); //always allow advertising to loopback, so other process
62  //on the same machine get them
63  mcConfig_.put("topicRegex", tcpcastAdTopic_.c_str());
64  // mcConfig_.put("ifaceAddr", cfg.getExt<string>("ifaceAddr"));
65  mcConfig_.put("outBufferSizePower2", 3u);
66  // mcConfig_.put("mcastPort", cfg.getExt<uint16_t>("mcastPort"));
67  // mcConfig_.put("mcastAddr", cfg.getExt<string>("mcastAddr"));
68  // mcConfig_.put("ttl", cfg.getExt<int>("ttl"));
69 
70  mcSendTransport_.reset(new mcast::SendTransport(
71  mcConfig_, sizeof(MessageWrap<TopicSource>)));
72  }
73 
74  template <typename... Messages>
75  void queue(Topic const& t, Messages&&... msgs) {
76  auto n = sizeof...(msgs);
77  auto it = buffer_.claim(n);
78  queue(it, t, std::forward<Messages>(msgs)...);
79  buffer_.commit(it, n);
80  }
81 
82  template <typename... Messages>
83  bool tryQueue(Topic const& t, Messages&&... msgs) {
84  auto n = sizeof...(msgs);
85  auto it = buffer_.tryClaim(n);
86  if (it) {
87  queue(it, t, std::forward<Messages>(msgs)...);
88  buffer_.commit(it, n);
89  return true;
90  }
91  return false;
92  }
93 
94  template <typename Message, typename ... Args>
95  void queueInPlace(Topic const& t, Args&&... args) {
96  static_assert(!is_base_of<hasFileAttachment, Message>::value
98  , "hasFileAttachment has to the first base for Message");
99  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
101  , "hasMemoryAttachment has to the first base for Message");
102  auto s = buffer_.claim();
103  char* addr = static_cast<char*>(*s);
104  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
105  h->flag = calculateFlag<Message>();
106  h->messagePayloadLen = sizeof(MessageWrap<Message>);
107  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
108  h->topicLen = tl;
109  if (likely(sizeof(Message) <= maxMessageSize_)) {
110  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
111  } else {
112  HMBDC_THROW(std::out_of_range
113  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
114  }
115  buffer_.commit(s);
116  }
117 
118  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len) {
119  auto s = buffer_.claim();
120  char* addr = static_cast<char*>(*s);
121  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
122  h->flag = 0;
123  h->messagePayloadLen = sizeof(MessageWrap<JustBytes>) - 1 + len;
124  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
125  h->topicLen = tl;
126  if (likely(len <= maxMessageSize_)) {
127  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<JustBytes>(tag, bytes, len);
128  } else {
129  HMBDC_THROW(std::out_of_range
130  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
131  }
132  buffer_.commit(s);
133  }
134 
135  void stop() {
136  buffer_.reset();
137  };
138 
139  void initInThread() {
140  mcSendTransport_->initInThread();
141  Transport::initInThread();
142  }
143 
144  bool match(Topic const& t) const {
145  return boost::regex_match(t.c_str(), topicRegex_);
146  }
147 
148 private:
149  std::string topic_;
150  boost::regex topicRegex_;
151 
152 protected:
153  size_t maxMessageSize_;
154  size_t minRecvToStart_;
156 
157  unique_ptr<mcast::SendTransport> mcSendTransport_;
158  Rater rater_;
159  Config mcConfig_;
160 
161 private:
162 
163  template <typename Message>
164  static
165  uint8_t calculateFlag() {
166  if (is_base_of<hasMemoryAttachment, Message>::value) return hasMemoryAttachment::flag;
167  if (is_base_of<hasFileAttachment, Message>::value) return hasFileAttachment::flag;
168  return 0;
169  }
170 
171  template<typename M, typename ... Messages>
173  , M&& m, Messages&&... msgs) {
174  using Message = typename std::remove_reference<M>::type;
175  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
177  , "hasMemoryAttachment has to the first base for Message");
178  static_assert(!is_base_of<hasFileAttachment, Message>::value
180  , "hasFileAttachment has to the first base for Message");
181 
182  auto s = *it;
183  char* addr = static_cast<char*>(s);
184  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
185  h->flag = calculateFlag<Message>();
186  h->messagePayloadLen = sizeof(MessageWrap<Message>);
187  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
188  h->topicLen = tl;
189  if (likely(sizeof(Message) <= maxMessageSize_)) {
190  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
191  } else {
192  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
193  }
194  queue(++it, t, std::forward<Messages>(msgs)...);
195  }
196 
198  , Topic const& t) {}
199 };
200 
203 , TimerManager
206 , Client<SendTransportEngine> {
207  SendTransportEngine(Config const& cfg
208  , size_t maxMessageSize
209  , size_t minRecvToStart)
210  : SendTransport(cfg, maxMessageSize, minRecvToStart)
211  , ReoccuringTimer(Duration::seconds(cfg.getExt<size_t>("topicAdvertisePeriodSeconds")))
212  , mtu_(cfg.getExt<size_t>("mtu") - 20 - 20) //20 bytes ip header and 20 bytes tcp header
213  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch"))
214  , waitForSlowReceivers_(cfg.getExt<bool>("waitForSlowReceivers"))
215  , lastSeq_(0) {
216  auto totalHead = sizeof(app::MessageHead)
217  + sizeof(Topic) + sizeof(TransportMessageHeader);
218  if (maxMessageSize_ + totalHead > mtu_) {
219  HMBDC_THROW(std::out_of_range, "maxMessageSize needs <= " << mtu_ - totalHead);
220  }
221  toSend_.reserve(maxSendBatch_);
222  // inititialize begin_ to first item
224  buffer_.peek(begin_, end, 1u);
225  buffer_.wasteAfterPeek(begin_, 0, true);
226  }
228 
229  char const* hmbdcName() const {
230  return this->hmbdcName_.c_str();
231  }
232 
233  std::tuple<char const*, int> schedSpec() const {
234  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
235  }
236 
237  /*virtual*/
238  void invokedCb(uint16_t threadSerialNumber) __restrict__ override {
239  //only START to send when enough sessions are ready to receive or
240  //buffer is full
241  if (likely(!minRecvToStart_
242  || asyncServer_->readySessionCount() >= minRecvToStart_)) {
243  minRecvToStart_ = 0; //now started no going back
244  runAsync();
245  }
246  mcSendTransport_->runOnce();
247 
248  if (unlikely(pIos_->stopped())) pIos_->reset();
249  boost::system::error_code ec;
250  pIos_->poll(ec);
251  if (unlikely(ec)) HMBDC_LOG_C("io_service error=", ec);
252  }
253 
254  /*virtual*/
255  bool droppedCb() override {
256  stop();
257  return true;
258  };
259 
260  /*virtual*/
261  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
262  this->initInThread();
263  asyncServer_.reset(new AsyncSendServer(config_, *pIos_, buffer_.capacity()));
264  setCallback(
265  [this](TimerManager& tm, SysTime const& now) {
266  mcSendTransport_->queue(
267  tcpcastAdTopic_, asyncServer_->advertisingMessage());
268  if (!waitForSlowReceivers_) cullSlow();
269  }
270  );
271 
272  schedule(SysTime::now(), *this);
273  }
274 
275 
276 private:
277  void cullSlow() {
278  auto seq = buffer_.readSeq();
279  if (seq == lastSeq_ && buffer_.isFull()) {
280  asyncServer_->killSlowestSession();
281  }
282  lastSeq_ = seq;
283  }
284 
285  void runAsync() __restrict__ {
286  MonoLockFreeBuffer::iterator begin, end;
287  buffer_.peek(begin, end, maxSendBatch_);
288  auto it = begin_;
289  pair<char const*, char const*> currentTopic;
290  currentTopic.first = nullptr;
291  size_t currentTopicLen = 0;
292  size_t toSendByteSize = 0;
293 
294  while (it != end) {
295  void* ptr = *it;
296  auto item = static_cast<TransportMessageHeader*>(ptr);
297 
298  if (unlikely(!rater_.check(item->wireSize()))) break;
299  if (unlikely(!item->flag
300  && toSendByteSize + item->wireSize() > mtu_)) break;
301  it++;
302  if (unlikely(!currentTopic.first)) {
303  currentTopic = item->topic();
304  currentTopicLen = item->topicLen;
305  toSendByteSize = item->wireSize();
306  toSend_.emplace_back(ptr, item->wireSize());
307  } else if (item->topicLen == currentTopicLen
308  && strncmp(currentTopic.first, item->topic().first, currentTopicLen)
309  == 0) {
310  toSendByteSize += item->wireSize();
311  toSend_.emplace_back(ptr, item->wireSize());
312  } else {
313  asyncServer_->queue(currentTopic
314  , move(toSend_)
315  , toSendByteSize
316  , it);
317  toSend_.clear();
318 
319  currentTopic = item->topic();
320  currentTopicLen = item->topicLen;
321  toSendByteSize = item->wireSize();
322  toSend_.emplace_back(ptr, item->wireSize());
323  }
324 
325  if (unlikely(item->flag == hasMemoryAttachment::flag)) {
326  auto& a = item->wrapped<hasMemoryAttachment>();
327  toSend_.emplace_back(a.attachment, a.len);
328  toSendByteSize += a.len;
329  } else if (unlikely(item->flag == hasFileAttachment::flag)) {
330  auto& a = item->wrapped<hasFileAttachment>();
331  toSend_.emplace_back(a.attachment, a.len);
332  toSendByteSize += a.len;
333  }
334  rater_.commit();
335  }
336 
337  if (toSend_.size()) {
338  asyncServer_->queue(currentTopic
339  , move(toSend_)
340  , toSendByteSize
341  , it);
342  toSend_.clear();
343  }
344  begin_ = it;
345  //use it here which is the maximum
346  auto newStart = asyncServer_->run(begin, it);
347  if (begin != end) { //only need to do this when there r things read from buffer_
348  for (auto it = begin; it != newStart; ++it) {
349  void* ptr = *it;
350  auto item = static_cast<TransportMessageHeader*>(ptr);
351  if (unlikely(item->flag == hasMemoryAttachment::flag)) {
352  auto& a = item->wrapped<hasMemoryAttachment>();
353  a.free();
354  } else if (unlikely(item->flag == hasFileAttachment::flag)) {
355  auto& a = item->wrapped<hasFileAttachment>();
356  a.unmap();
357  }
358  }
359  buffer_.wasteAfterPeek(begin, newStart - begin, true);
360  }
361  }
362 
363  size_t mtu_;
364  size_t maxSendBatch_;
365  bool waitForSlowReceivers_;
367  vector<const_buffer> toSend_;
368  // SendServer *server_;
369  // unique_ptr<SyncSendServer> syncServer_;
370  unique_ptr<AsyncSendServer> asyncServer_;
371  size_t lastSeq_;
372 };
373 
374 }}}
Definition: Message.hpp:125
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:35
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Timers.hpp:69
Definition: SendServer.hpp:58
SendTransport(Config const &cfg, size_t maxMessageSize, size_t minRecvToStart)
ctor
Definition: SendTransportEngine.hpp:42
Definition: Transport.hpp:27
Definition: SendTransportEngine.hpp:24
Definition: GuardedSingleton.hpp:9
Definition: Messages.hpp:23
Definition: Message.hpp:25
Definition: Time.hpp:15
Definition: Traits.hpp:35
Definition: SendTransportEngine.hpp:201
capture the transportation mechanism
Definition: SendTransportEngine.hpp:32
Definition: Message.hpp:46
Definition: Time.hpp:118
Definition: Rater.hpp:10
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:143
Definition: Messages.hpp:35
Definition: Client.hpp:39
Definition: Rater.hpp:13
Definition: Client.hpp:11
Definition: Timers.hpp:100
Definition: LockFreeBufferMisc.hpp:73