hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/mcast/Transport.hpp"
4 #include "hmbdc/app/mcast/Messages.hpp"
5 #include "hmbdc/app/Base.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/time/Rater.hpp"
8 #include "hmbdc/numeric/BitMath.hpp"
9 
10 
11 #include <boost/bind.hpp>
12 #include <memory>
13 
14 #include <iostream>
15 
16 namespace hmbdc { namespace app { namespace mcast {
17 
18 namespace sendtransportengine_detail {
19 using namespace std;
20 using namespace hmbdc::time;
21 using namespace boost::asio;
22 using namespace hmbdc::pattern;
23 
25 : Transport {
26  using ptr = std::shared_ptr<SendTransport>;
27  SendTransport(Config const& cfg
28  , size_t maxMessageSize)
29  : Transport(cfg)
30  , topic_(cfg.getExt<string>("topicRegex"))
31  , topicRegex_(topic_)
32  , maxMessageSize_(maxMessageSize)
33  , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(Topic) + sizeof(MessageHead), outBufferSizePower2())
34  , endpoint_(ip::address::from_string(config_.getExt<string>("mcastAddr"))
35  , cfg.getExt<short>("mcastPort"))
36  , pSocket_(nullptr)
37  , rater_(Duration::seconds(1u)
38  , config_.getExt<size_t>("sendBytesPerSec")
39  , config_.getExt<size_t>("sendBytesBurst")
40  , config_.getExt<size_t>("sendBytesBurst") != 0ul) //no rate control by default
41  , mtu_(config_.getExt<size_t>("mtu"))
42  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch")) {
43  mtu_ -= (8u + 20u); // 8bytes udp header and 20bytes ip header
44  auto totalHead = sizeof(app::MessageHead) + sizeof(Topic) + sizeof(TransportMessageHeader);
45  if (maxMessageSize_ + totalHead > mtu_) {
46  HMBDC_THROW(std::out_of_range, "maxMessageSize needs <= " << mtu_ - totalHead);
47  }
48  toSend_.reserve(maxSendBatch_);
49  }
50 
51  ~SendTransport() {
52  delete pSocket_;
53  }
54 
55  bool match(Topic const& t) const {
56  return boost::regex_match(t.c_str(), topicRegex_);
57  }
58 
59  template <typename... Messages>
60  void queue(Topic const& t, Messages&&... msgs) {
61  auto n = sizeof...(msgs);
62  auto it = buffer_.claim(n);
63  queue(it, t, std::forward<Messages>(msgs)...);
64  buffer_.commit(it, n);
65  }
66 
67  template <typename... Messages>
68  bool tryQueue(Topic const& t, Messages&&... msgs) {
69  auto n = sizeof...(msgs);
70  auto it = buffer_.tryClaim(n);
71  if (it) {
72  queue(it, t, std::forward<Messages>(msgs)...);
73  buffer_.commit(it, n);
74  return true;
75  }
76  return false;
77  }
78 
79  template <typename Message, typename ... Args>
80  void queueInPlace(Topic const& t, Args&&... args) {
81  auto s = buffer_.claim();
82  char* addr = static_cast<char*>(*s);
83  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
84  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
85  h->topicLen = tl;
86  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
87  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
88  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
89  } else {
90  HMBDC_THROW(std::out_of_range
91  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
92  }
93  buffer_.commit(s);
94  }
95 
96  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len) {
97  auto s = buffer_.claim();
98  char* addr = static_cast<char*>(*s);
99  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
100  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
101  h->topicLen = tl;
102  if (hmbdc_likely(len <= maxMessageSize_)) {
103  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<JustBytes>(tag, bytes, len);
104  h->messagePayloadLen() = sizeof(MessageWrap<JustBytes>) - sizeof(MessageWrap<JustBytes>::payload) + len;
105  } else {
106  HMBDC_THROW(std::out_of_range
107  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
108  }
109  buffer_.commit(s);
110 
111  }
112 
113  void runOnce() HMBDC_RESTRICT {
114  if (!toSend_.size()) {
115  if (hmbdc_unlikely(pIos_->stopped())) pIos_->reset();
116  boost::system::error_code error;
117  size_t bytesTransferred = 0;
118  resumeSend(error, bytesTransferred);
119  }
120  boost::system::error_code ec;
121  pIos_->poll(ec);
122  if (ec) HMBDC_LOG_C("io_service error=", ec);
123  }
124 
125  void stop() {
126  buffer_.reset(0);
127  }
128 
129  /**
130  * @brief expose so user can manipulate it
131  * @return reference to boost::asio::ip::udp::socket
132  */
133  boost::asio::ip::udp::socket& asioSocket() {
134  return *pSocket_;
135  }
136 
137  void initInThread() {
138  Transport::initInThread();
139  delete pSocket_;
140  pSocket_ = new boost::asio::ip::udp::socket(*pIos_, endpoint_.protocol());
141 
142  auto iface =
143  comm::inet::getLocalIpMatchMask(config_.getExt<string>("ifaceAddr"));
144  ip::multicast::outbound_interface outFrom(ip::address_v4::from_string(iface));
145  pSocket_->set_option(outFrom);
146 
147  ip::multicast::hops hops(config_.getExt<uint16_t>("ttl"));
148  pSocket_->set_option(hops);
149 
150  // uint8_t loopch = config_.getExt("loopback", false)?1:0;
151  // if (setsockopt(pSocket_->native_handle(), IPPROTO_IP, IP_MULTICAST_LOOP
152  // , (char *)&loopch, sizeof(loopch) < 0)) {
153  // HMBDC_LOG_C("cannot set IP_MULTICAST_LOOP to ", (int)loopch);
154  // }
155 
156  ip::multicast::enable_loopback loop(config_.getExt<bool>("loopback"));
157  pSocket_->set_option(loop);
158 
159  auto sz = config_.getExt<int32_t>("udpSendBufferBytes");
160  if (sz) {
161  socket_base::send_buffer_size option(sz);
162  boost::system::error_code ec;
163  pSocket_->set_option(option);
164  }
165 
166  socket_base::send_buffer_size option;
167  pSocket_->get_option(option);
168  if (sz == 0 || sz >= option.value()) {
169  } else {
170  HMBDC_LOG_C("set udp send buffer size unsuccessful, want "
171  , sz, " actual: ", option.value());
172  }
173  //we need to wait for mcast registration to settle, otherwise sent msg might drop
174  usleep(10000);
175  }
176 
177 private:
178  std::string topic_;
179  boost::regex topicRegex_;
180  size_t maxMessageSize_;
181  Buffer buffer_;
182  typename Buffer::iterator begin_;
183 
184  ip::udp::endpoint endpoint_;
185  ip::udp::socket* pSocket_;
186  Rater rater_;
187  size_t mtu_;
188  size_t maxSendBatch_;
189  vector<const_buffer> toSend_;
190 
191 
192  uint16_t
193  outBufferSizePower2() {
194  auto res = config_.getExt<uint16_t>("outBufferSizePower2");
195  if (res) {
196  return res;
197  }
198  res =hmbdc::numeric::log2Upper(8ul * 1024ul / (8ul + maxMessageSize_));
199  HMBDC_LOG_N("auto set --outBufferSizePower2=", res);
200  return res;
201  }
202 
203  template<typename M, typename ... Messages>
204  void queue(typename Buffer::iterator it, Topic const& t
205  , M&& m, Messages&&... msgs) {
206  using Message = typename std::remove_reference<M>::type;
207  auto s = *it;
208  char* addr = static_cast<char*>(s);
209  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
210  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
211  h->topicLen = tl;
212  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
213  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
214  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
215  } else {
216  HMBDC_THROW(std::out_of_range
217  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
218  }
219  queue(++it, t, std::forward<Messages>(msgs)...);
220  }
221 
222  void queue(typename Buffer::iterator it
223  , Topic const& t) {}
224 
225  void resumeSend(const boost::system::error_code& error
226  , size_t bytesTransferred) {
227  if (hmbdc_unlikely(error)) {
228  HMBDC_LOG_C("asio error=", error);
229  }
230  if (hmbdc_likely(begin_))
231  buffer_.wasteAfterPeek(0, toSend_.size());
232  toSend_.clear();
233 
234  typename Buffer::iterator end;
235  buffer_.peek(0, begin_, end, maxSendBatch_);
236  auto it = begin_;
237  size_t batchBytes = 0;
238  while (it != end) {
239  void* ptr = *it;
240  auto item = static_cast<TransportMessageHeader*>(ptr);
241  if (hmbdc_unlikely(!rater_.check(item->wireSize()))) break;
242  batchBytes += item->wireSize();
243  if (hmbdc_unlikely(batchBytes > mtu_)) break;
244  it++;
245  toSend_.emplace_back(ptr, item->wireSize());
246  rater_.commit();
247  }
248 
249  if (hmbdc_unlikely(toSend_.size() == 1)) {
250  pSocket_->send_to(toSend_, endpoint_);
251  buffer_.wasteAfterPeek(0, 1);
252  toSend_.clear();
253  begin_.clear();
254  } else if (hmbdc_likely(toSend_.size())) {
255  pSocket_->async_send_to(
256  toSend_
257  , endpoint_
258  , boost::bind(&SendTransport::resumeSend
259  , this
260  , boost::asio::placeholders::error
261  , boost::asio::placeholders::bytes_transferred)
262  );
263  }
264  }
265 };
266 
270 , Client<SendTransportEngine> {
271  using SendTransport::SendTransport;
272 
273 
274  char const* hmbdcName() const {
275  return this->hmbdcName_.c_str();
276  }
277 
278  std::tuple<char const*, int> schedSpec() const {
279  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
280  }
281 
282  /*virtual*/
283  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
284  this->initInThread();
285  }
286 
287  /*virtual*/
288  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
289  runOnce();
290  }
291 
292  /*virtual*/ bool droppedCb() override {
293  stop();
294  return true;
295  };
296 };
297 
298 } //sendtransportengine_detail
301 }}}
Definition: Message.hpp:135
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: GuardedSingleton.hpp:9
Definition: Message.hpp:34
boost::asio::ip::udp::socket & asioSocket()
expose so user can manipulate it
Definition: SendTransportEngine.hpp:133
Definition: Message.hpp:55
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Definition: Transport.hpp:17
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:73