hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/mcast/Transport.hpp"
5 #include "hmbdc/app/mcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/time/Rater.hpp"
8 #include "hmbdc/numeric/BitMath.hpp"
9 
10 
11 #include <boost/bind.hpp>
12 #include <memory>
13 
14 #include <iostream>
15 
16 namespace hmbdc { namespace app { namespace mcast {
17 
18 using namespace hmbdc::time;
19 using namespace boost::asio;
20 
21 using namespace hmbdc;
22 using namespace hmbdc::pattern;
23 
25 : Transport {
26  using ptr = std::shared_ptr<SendTransport>;
27  SendTransport(Config const& cfg
28  , size_t maxMessageSize)
29  : Transport(cfg)
30  , topic_(cfg.getExt<string>("topicRegex"))
31  , topicRegex_(topic_)
32  , maxMessageSize_(maxMessageSize)
33  , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(Topic)
34  , config_.getExt<uint16_t>("outBufferSizePower2")
35  ?config_.getExt<uint16_t>("outBufferSizePower2")
36  :hmbdc::numeric::log2Upper(8ul * 1024ul / maxMessageSize)
37  )
38  , endpoint_(ip::address::from_string(config_.getExt<string>("mcastAddr"))
39  , cfg.getExt<short>("mcastPort"))
40  , pSocket_(nullptr)
41  , rater_(Duration::seconds(1u)
42  , config_.getExt<size_t>("sendBytesPerSec")
43  , config_.getExt<size_t>("sendBytesBurst")
44  , config_.getExt<size_t>("sendBytesBurst") != 0ul) //no rate control by default
45  , mtu_(config_.getExt<size_t>("mtu"))
46  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch")) {
47  mtu_ -= (8u + 20u); // 8bytes udp header and 20bytes ip header
48  auto totalHead = sizeof(app::MessageHead) + sizeof(Topic) + sizeof(TransportMessageHeader);
49  if (maxMessageSize_ + totalHead > mtu_) {
50  HMBDC_THROW(std::out_of_range, "maxMessageSize needs <= " << mtu_ - totalHead);
51  }
52  toSend_.reserve(maxSendBatch_);
53  }
54 
55  ~SendTransport() {
56  delete pSocket_;
57  }
58 
59  bool match(Topic const& t) const {
60  return boost::regex_match(t.c_str(), topicRegex_);
61  }
62 
63  template <typename... Messages>
64  void queue(Topic const& t, Messages&&... msgs) {
65  auto n = sizeof...(msgs);
66  auto it = buffer_.claim(n);
67  queue(it, t, std::forward<Messages>(msgs)...);
68  buffer_.commit(it, n);
69  }
70 
71  template <typename... Messages>
72  bool tryQueue(Topic const& t, Messages&&... msgs) {
73  auto n = sizeof...(msgs);
74  auto it = buffer_.tryClaim(n);
75  if (it) {
76  queue(it, t, std::forward<Messages>(msgs)...);
77  buffer_.commit(it, n);
78  return true;
79  }
80  return false;
81  }
82 
83  template <typename Message, typename ... Args>
84  void queueInPlace(Topic const& t, Args&&... args) {
85  auto s = buffer_.claim();
86  char* addr = static_cast<char*>(*s);
87  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
88  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
89  h->topicLen = tl;
90  if (likely(sizeof(Message) <= maxMessageSize_)) {
91  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
92  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
93  } else {
94  HMBDC_THROW(std::out_of_range
95  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
96  }
97  buffer_.commit(s);
98  }
99 
100  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len) {
101  auto s = buffer_.claim();
102  char* addr = static_cast<char*>(*s);
103  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
104  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
105  h->topicLen = tl;
106  if (likely(len <= maxMessageSize_)) {
107  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<JustBytes>(tag, bytes, len);
108  h->messagePayloadLen() = sizeof(MessageWrap<JustBytes>) - sizeof(MessageWrap<JustBytes>::payload) + len;
109  } else {
110  HMBDC_THROW(std::out_of_range
111  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
112  }
113  buffer_.commit(s);
114 
115  }
116 
117  void runOnce() __restrict__ {
118  if (!toSend_.size()) {
119  if (unlikely(pIos_->stopped())) pIos_->reset();
120  boost::system::error_code error;
121  size_t bytesTransferred = 0;
122  resumeSend(error, bytesTransferred);
123  }
124  boost::system::error_code ec;
125  pIos_->poll(ec);
126  if (ec) HMBDC_LOG_C("io_service error=", ec);
127  }
128 
129  void stop() {
130  buffer_.reset(0);
131  }
132 
133  /**
134  * @brief expose so user can manipulate it
135  * @return reference to boost::asio::ip::udp::socket
136  */
137  boost::asio::ip::udp::socket& asioSocket() {
138  return *pSocket_;
139  }
140 
141  void initInThread() {
142  Transport::initInThread();
143  delete pSocket_;
144  pSocket_ = new boost::asio::ip::udp::socket(*pIos_, endpoint_.protocol());
145 
146  auto iface =
147  comm::inet::getLocalIpMatchMask(config_.getExt<string>("ifaceAddr"));
148  ip::multicast::outbound_interface outFrom(ip::address_v4::from_string(iface));
149  pSocket_->set_option(outFrom);
150 
151  ip::multicast::hops hops(config_.getExt<uint16_t>("ttl"));
152  pSocket_->set_option(hops);
153 
154  // uint8_t loopch = config_.getExt("loopback", false)?1:0;
155  // if (setsockopt(pSocket_->native_handle(), IPPROTO_IP, IP_MULTICAST_LOOP
156  // , (char *)&loopch, sizeof(loopch) < 0)) {
157  // HMBDC_LOG_C("cannot set IP_MULTICAST_LOOP to ", (int)loopch);
158  // }
159 
160  ip::multicast::enable_loopback loop(config_.getExt<bool>("loopback"));
161  pSocket_->set_option(loop);
162 
163  auto sz = config_.getExt<int32_t>("udpSendBufferBytes");
164  if (sz) {
165  socket_base::send_buffer_size option(sz);
166  boost::system::error_code ec;
167  pSocket_->set_option(option);
168  }
169 
170  socket_base::send_buffer_size option;
171  pSocket_->get_option(option);
172  if (sz == 0 || sz >= option.value()) {
173  } else {
174  HMBDC_LOG_C("set udp send buffer size unsuccessful, want "
175  , sz, " actual: ", option.value());
176  }
177  //we need to wait for mcast registration to settle, otherwise sent msg might drop
178  usleep(10000);
179  }
180 
181 private:
182  std::string topic_;
183  boost::regex topicRegex_;
184  size_t maxMessageSize_;
185  Buffer buffer_;
186  typename Buffer::iterator begin_;
187 
188  ip::udp::endpoint endpoint_;
189  ip::udp::socket* pSocket_;
190  Rater rater_;
191  size_t mtu_;
192  size_t maxSendBatch_;
193  vector<const_buffer> toSend_;
194 
195  template<typename M, typename ... Messages>
196  void queue(typename Buffer::iterator it, Topic const& t
197  , M&& m, Messages&&... msgs) {
198  using Message = typename std::remove_reference<M>::type;
199  auto s = *it;
200  char* addr = static_cast<char*>(s);
201  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
202  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
203  h->topicLen = tl;
204  if (likely(sizeof(Message) <= maxMessageSize_)) {
205  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
206  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
207  } else {
208  HMBDC_THROW(std::out_of_range
209  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
210  }
211  queue(++it, t, std::forward<Messages>(msgs)...);
212  }
213 
214  void queue(typename Buffer::iterator it
215  , Topic const& t) {}
216 
217  void resumeSend(const boost::system::error_code& error
218  , size_t bytesTransferred) {
219  if (unlikely(error)) {
220  HMBDC_LOG_C("asio error=", error);
221  }
222  if (likely(begin_))
223  buffer_.wasteAfterPeek(0, toSend_.size());
224  toSend_.clear();
225 
226  typename Buffer::iterator end;
227  buffer_.peek(0, begin_, end, maxSendBatch_);
228  auto it = begin_;
229  size_t batchBytes = 0;
230  while (it != end) {
231  void* ptr = *it;
232  auto item = static_cast<TransportMessageHeader*>(ptr);
233  if (unlikely(!rater_.check(item->wireSize()))) break;
234  batchBytes += item->wireSize();
235  if (unlikely(batchBytes > mtu_)) break;
236  it++;
237  toSend_.emplace_back(ptr, item->wireSize());
238  rater_.commit();
239  }
240 
241  if (unlikely(toSend_.size() == 1)) {
242  pSocket_->send_to(toSend_, endpoint_);
243  buffer_.wasteAfterPeek(0, 1);
244  toSend_.clear();
245  begin_.clear();
246  } else if (likely(toSend_.size())) {
247  pSocket_->async_send_to(
248  toSend_
249  , endpoint_
250  , boost::bind(&SendTransport::resumeSend
251  , this
252  , boost::asio::placeholders::error
253  , boost::asio::placeholders::bytes_transferred)
254  );
255  }
256  }
257 };
258 
262 , Client<SendTransportEngine> {
263  using SendTransport::SendTransport;
264 
265 
266  char const* hmbdcName() const {
267  return this->hmbdcName_.c_str();
268  }
269 
270  std::tuple<char const*, int> schedSpec() const {
271  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
272  }
273 
274  /*virtual*/
275  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
276  this->initInThread();
277  }
278 
279  /*virtual*/
280  void invokedCb(uint16_t threadSerialNumber) __restrict__ override {
281  runOnce();
282  }
283 
284  /*virtual*/ bool droppedCb() override {
285  stop();
286  return true;
287  };
288 };
289 
290 }}}
Definition: Message.hpp:125
class to hold an hmbdc configuration
Definition: Config.hpp:35
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: SendTransportEngine.hpp:259
Definition: SendTransportEngine.hpp:24
Definition: GuardedSingleton.hpp:9
boost::asio::ip::udp::socket & asioSocket()
expose so user can manipulate it
Definition: SendTransportEngine.hpp:137
Definition: Message.hpp:25
Definition: LockFreeBufferT.hpp:18
Definition: Message.hpp:46
Definition: Rater.hpp:10
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:143
Definition: Client.hpp:39
Definition: Transport.hpp:25
Definition: Rater.hpp:13
Definition: Client.hpp:11
Definition: LockFreeBufferMisc.hpp:73