hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/mcast/Transport.hpp"
4 #include "hmbdc/app/LoggerT.hpp"
5 #include "hmbdc/text/StringTrieSet.hpp"
6 #include "hmbdc//Traits.hpp"
7 
8 #include <boost/bind.hpp>
9 #include <boost/lexical_cast.hpp>
10 
11 #include <memory>
12 #include <type_traits>
13 
14 namespace hmbdc { namespace app { namespace mcast {
15 
16 using namespace hmbdc;
17 using namespace hmbdc::time;
18 
19 using namespace boost::asio;
20 
21 /**
22  * @brief interface to power a multicast transport receiving functions
23  */
25  using ptr = std::shared_ptr<RecvTransport>;
26  using Transport::Transport;
27  virtual ~RecvTransport(){}
28  /**
29  * @brief a take all arbitrator (no arbitration at all)
30  * @details it provides the default type for arbitration which does nothing
31  * it also provides a template on writing a user defined arbitrator
32  *
33  * @param h handle to retrieve what is inside of the message
34  * @return always returns 1 here
35  * In general: 1 keep the message; -1 to drop;
36  * 0 cannot decide, ask me later.
37  */
38  struct NoOpArb {
39  int operator()(TransportMessageHeader const* h) {
40  return 1; //always keep it
41  }
42  };
43 private:
44  friend class NetContext; //only be used by NetContext
45  virtual void listenTo(Topic const& t) = 0;
46  virtual void stopListenTo(Topic const& t) = 0;
47 };
48 
49 /**
50  * @brief impl class
51  *
52  * @tparam OutputBuffer type of buffer to hold resulting network messages
53  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, suited to arbitrate
54  * between different recv transport. By default, keeping all
55  */
56 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
59 , MessageHandler<RecvTransportImpl<OutputBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
60  using SELF = RecvTransportImpl;
62  friend class NetContext; //only be created by NetContext
63 
64 /**
65  * @brief ctor
66  * @details io_service could be passed in by user, in this case NO more than two threads should
67  * power this io_service instance since that would violate the thread guarantee of Client, which
68  * is no callbacks are called in parallel
69  *
70  * @param cfg specify the details of the mcast transport
71  * @param outputBuffer holding the results
72  * @param arb arbitrator instance to decide which messages to drop and keep; it supports either
73  * raw udp packet (BEFORE topic filtering) or hmbdc udpcast message (AFTER topic filtering) level
74  * arbitration depending on which one of
75  * int operator()(void* bytes, size_t len) or
76  * int operator()(TransportMessageHeader const* header) presents in the arb
77  */
79  , OutputBuffer& outputBuffer
80  , MsgArbitrator arb = NoOpArb())
81  : RecvTransport(cfg)
82  , buffer_(std::max(sizeof(Subscribe), sizeof(Unsubscribe))
83  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
84  , outputBuffer_(outputBuffer)
85  , maxItemSize_(outputBuffer.maxItemSize())
86  , endpoint_(ip::address::from_string(cfg.getExt<std::string>("mcastAddr"))
87  , cfg.getExt<short>("mcastPort"))
88  , pSocket_(nullptr)
89  , bufCur_(nullptr)
90  , bytesRecved_(0)
91  , arb_(arb) {
92  }
93 
95  delete pSocket_;
96  }
97 
98 /**
99  * @brief start the show by schedule the mesage recv
100  */
101  void start() {
102  pSocket_->async_receive_from(
103  boost::asio::buffer(buf_, sizeof(buf_))
104  , sender_endpoint_
105  , boost::bind(&SELF::handleReceiveFrom, this,
106  boost::asio::placeholders::error,
107  boost::asio::placeholders::bytes_transferred)
108  );
109  }
110 
111 /**
112  * @brief power the io_service and other things
113  *
114  */
115  void runOnce() __restrict__ {
116  // if (pIos_->stopped()) pIos_->reset();
117  typename Buffer::iterator begin, end;
118  auto n = buffer_.peek(0, begin, end);
119  auto it = begin;
120  while (it != end) {
121  MH::handleMessage(*static_cast<MessageHead*>(*it++));
122  }
123  buffer_.wasteAfterPeek(0, n);
124  boost::system::error_code ec;
125  if (unlikely(bufCur_)) { //there is unfinished data in buf - arb_
126  handleReceiveFrom(ec, 0);
127  }
128  if (unlikely(pIos_->stopped())) pIos_->reset();
129  pIos_->poll(ec);
130  if (ec) HMBDC_LOG_C("io_service error=", ec);
131  }
132 
133 /**
134  * @brief only used by MH
135  */
136  void handleMessageCb(Subscribe const& t) {
137  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
138  }
139 
140 /**
141  * @brief only used by MH
142  */
143  void handleMessageCb(Unsubscribe const& t) {
144  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
145  }
146 
147 
148  void listenTo(Topic const& t) override {
149  buffer_.put(MessageWrap<Subscribe>{t});
150  }
151 
152  void stopListenTo(Topic const& t) override {
153  buffer_.put(MessageWrap<Unsubscribe>{t});
154  }
155 
156  /**
157  * @brief expose so user can manipulate it
158  * @return reference to boost::asio::ip::udp::socket
159  */
160  boost::asio::ip::udp::socket& asioSocket() {
161  return *pSocket_;
162  }
163 
164  void initInThread() {
165  Transport::initInThread();
166 
167  delete pSocket_;
168  pSocket_ = new boost::asio::ip::udp::socket(*pIos_);
169  pSocket_->open(endpoint_.protocol());
170  pSocket_->set_option(ip::udp::socket::reuse_address(true));
171  pSocket_->bind(endpoint_);
172  // Join the multicast group.
173  auto multicastAddress =
174  ip::address::from_string(config_.getExt<std::string>("mcastAddr"));
175  auto listenAddress = ip::address::from_string(
176  comm::inet::getLocalIpMatchMask(config_.getExt<string>("ifaceAddr")));
177 
178 
179  auto sz = config_.getExt<int>("udpRecvBufferBytes");
180  if (sz) {
181  socket_base::receive_buffer_size option(sz);
182  boost::system::error_code ec;
183  pSocket_->set_option(option);
184  }
185 
186  socket_base::receive_buffer_size option;
187  pSocket_->get_option(option);
188  if (sz == 0 || sz >= option.value()) {
189  } else {
190  HMBDC_LOG_C("set udp receive buffer size unsuccessful, want "
191  , sz, " actual: ", option.value()
192  , " resulting lower receiving rate, check OS limits!");
193  }
194  pSocket_->set_option(ip::multicast::join_group(
195  multicastAddress.to_v4(), listenAddress.to_v4())
196  );
197  }
198 
199 private:
200  template <bool is_raw_arb>
201  typename enable_if<is_raw_arb, int>::type applyRawArb() {
202  return arb_(bufCur_, bytesRecved_);
203  }
204 
205  template <bool is_raw_arb>
206  typename enable_if<!is_raw_arb, int>::type applyRawArb() {
207  return 1;
208  }
209 
210  template <bool is_raw_arb>
211  typename enable_if<!is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
212  return arb_(h);
213  }
214 
215  template <bool is_raw_arb>
216  typename enable_if<is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
217  return 1;
218  }
219 
220  void handleReceiveFrom(const boost::system::error_code& error
221  , size_t bytesRecved) {
222  if (likely(!error)) {
223  if (!bufCur_) {
224  bufCur_ = buf_;
225  bytesRecved_ = bytesRecved;
226  }
227  using traits =
229  using arg0 = typename traits::template arg<0>::type;
230  bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
231  auto a = applyRawArb<is_raw_arb>();
232  if (unlikely(a == 0)) {
233  //keep bufCur_ and bytesRecved_ unchanged
234  return;
235  } else if (unlikely(a < 0)) { //drop the whole packet
236  bytesRecved_ = 0;
237  } // else process it
238 
239  while (bytesRecved_) {
240  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
241  auto wireSize = h->wireSize();
242 
243  if (likely(bytesRecved_ >= wireSize)) {
244  if (subscriptions_.check(h->topic())) {
245  // auto tmp = *(MessageHead*)(h->payload()); HMBDC_LOG_DEBUG(tmp);
246  auto l = std::min<size_t>(maxItemSize_, wireSize);
247 
248  if (unlikely(l > bytesRecved_)) {
249  break;
250  }
251  auto a = applyArb<is_raw_arb>(h);
252 
253  if (likely(a > 0)) {
254  auto it = outputBuffer_.claim();
255  char* b = static_cast<char*>(*it);
256  memcpy(b, h->payload(), l);
257  outputBuffer_.commit(it);
258  } else if (a == 0) {
259  //keep bufCur_ and bytesRecved_ unchanged
260  return;
261  } //else drop and move on
262  }
263  bytesRecved_ -= wireSize;
264  bufCur_ += wireSize;
265  } else {
266  break;
267  }
268  }
269  } else {
270  HMBDC_LOG_C("asio error=", error);
271  }
272  bufCur_ = nullptr;
273  pSocket_->async_receive_from(
274  boost::asio::buffer(buf_, sizeof(buf_))
275  , sender_endpoint_
276  , boost::bind(&SELF::handleReceiveFrom, this,
277  boost::asio::placeholders::error,
278  boost::asio::placeholders::bytes_transferred)
279  );
280  }
281 
282  Buffer buffer_;
283  OutputBuffer& outputBuffer_;
284  size_t maxItemSize_;
285  boost::asio::ip::udp::endpoint endpoint_;
286  boost::asio::ip::udp::endpoint sender_endpoint_;
287  ip::udp::socket* pSocket_;
288  char buf_[4*1024];
289  char* bufCur_;
290  size_t bytesRecved_;
291  StringTrieSet subscriptions_;
292  MsgArbitrator arb_;
293 };
294 
295 
296 /**
297  * @brief impl class
298  * @details this needs to be created using NetContext and start in an app::Context
299  *
300  * @tparam OutputBuffer type of buffer to hold resulting network messages
301  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, used to arbitrate
302  * between different recv transports. By default, keep all
303  */
304 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
306 : RecvTransportImpl<OutputBuffer, MsgArbitrator>
308 , Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
310 
311 
312  char const* hmbdcName() const {
313  return this->hmbdcName_.c_str();
314  }
315 
316  std::tuple<char const*, int> schedSpec() const {
317  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
318  }
319 
320 /**
321  * @brief power the io_service and other things
322  *
323  */
324  /*virtual*/
325  void invokedCb(uint16_t) __restrict__ override {
327  }
328 
329 
330 /**
331  * @brief start the show by scheduling the message recv
332  */
333  /*virtual*/
334  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
335  this->initInThread();
337  };
338 
339 /**
340  * @brief should not happen ever unless an exception thrown
341  *
342  * @param e exception thown
343  */
344  /*virtual*/
345  void stoppedCb(std::exception const& e) override {
346  HMBDC_LOG_C(e.what());
347  };
348 
349 };
350 
351 }}}
class to hold an hmbdc configuration
Definition: Config.hpp:35
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:143
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:345
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:24
boost::asio::ip::udp::socket & asioSocket()
expose so user can manipulate it
Definition: RecvTransportEngine.hpp:160
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
impl class
Definition: RecvTransportEngine.hpp:57
Definition: StringTrieSet.hpp:113
Definition: TypedString.hpp:74
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:136
void invokedCb(uint16_t) __restrict__ override
power the io_service and other things
Definition: RecvTransportEngine.hpp:325
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:38
Definition: NetContext.hpp:33
Definition: Messages.hpp:76
impl class
Definition: RecvTransportEngine.hpp:305
void start()
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:101
Definition: LockFreeBufferT.hpp:18
Definition: Traits.hpp:8
Definition: Message.hpp:46
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:78
void runOnce() __restrict__
power the io_service and other things
Definition: RecvTransportEngine.hpp:115
Definition: Rater.hpp:10
Definition: Client.hpp:39
Definition: Transport.hpp:25
Definition: Messages.hpp:69
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:334
Definition: Client.hpp:11
Definition: MessageHandler.hpp:38
Definition: LockFreeBufferMisc.hpp:73