hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/mcast/Transport.hpp"
4 #include "hmbdc/app/LoggerT.hpp"
5 #include "hmbdc/text/StringTrieSet.hpp"
6 #include "hmbdc//Traits.hpp"
7 
8 #include <boost/bind.hpp>
9 #include <boost/lexical_cast.hpp>
10 
11 #include <memory>
12 #include <type_traits>
13 
14 namespace hmbdc { namespace app { namespace mcast {
15 
16 /**
17  * @brief interface to power a multicast transport receiving functions
18  */
20  using ptr = std::shared_ptr<RecvTransport>;
21  using Transport::Transport;
22  virtual ~RecvTransport(){}
23  /**
24  * @brief a take all arbitrator (no arbitration at all)
25  * @details it provides the default type for arbitration which does nothing
26  * it also provides a template on writing a user defined arbitrator
27  *
28  * @param h handle to retrieve what is inside of the message
29  * @return always returns 1 here
30  * In general: 1 keep the message; -1 to drop;
31  * 0 cannot decide, ask me later.
32  */
33  struct NoOpArb {
     // Header-level arbitration hook: the header argument h is never inspected,
     // every message is unconditionally accepted.
34  int operator()(TransportMessageHeader const* h) {
35  return 1; //always keep it
36  }
37  };
38 private:
39  friend struct NetContext; //only be used by NetContext
40  virtual void listenTo(Topic const& t) = 0;
41  virtual void stopListenTo(Topic const& t) = 0;
42 };
43 
44 namespace recvtransportengine_detail {
45 using namespace hmbdc::time;
46 using namespace boost::asio;
47 
48 /**
49  * @class RecvTransportImpl<>
50  * @brief impl class
51  *
52  * @tparam OutputBuffer type of buffer to hold resulting network messages
53  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, suited to arbitrate
54  * between different recv transport. By default, keeping all
55  */
56 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
59 , MessageHandler<RecvTransportImpl<OutputBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
60  using SELF = RecvTransportImpl;
62  friend struct NetContext; //only be created by NetContext
63 
64 /**
65  * @brief ctor
66  * @details io_service could be passed in by user, in this case NO more than two threads should
67  * power this io_service instance since that would violate the thread guarantee of Client, which
68  * is that no callbacks are called in parallel
69  *
70  * @param cfg specify the details of the mcast transport
71  * @param outputBuffer holding the results
72  * @param arb arbitrator instance to decide which messages to drop and keep; it supports either
73  * raw udp packet (BEFORE topic filtering) or hmbdc udpcast message (AFTER topic filtering) level
74  * arbitration depending on which one of
75  * int operator()(void* bytes, size_t len) or
76  * int operator()(TransportMessageHeader const* header) presents in the arb
77  */
79  , OutputBuffer& outputBuffer
80  , MsgArbitrator arb = NoOpArb())
81  : RecvTransport(cfg)
82  , buffer_(std::max(sizeof(MessageWrap<Subscribe>), sizeof(MessageWrap<Unsubscribe>))
83  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
84  , outputBuffer_(outputBuffer)
85  , maxItemSize_(outputBuffer.maxItemSize())
86  , endpoint_(ip::address::from_string(cfg.getExt<std::string>("mcastAddr"))
87  , cfg.getExt<short>("mcastPort"))
88  , pSocket_(nullptr)
89  , bufCur_(nullptr)
90  , bytesRecved_(0)
91  , arb_(arb) {
92  }
93 
95  delete pSocket_;
96  }
97 
98 /**
99  * @brief start the show by scheduling the message recv
100  */
101  void start() {
     // Posts one asynchronous receive on the socket: the datagram lands in buf_,
     // the sender's address in sender_endpoint_, and handleReceiveFrom is invoked
     // as the completion handler with the error code and byte count.
     // pSocket_ is nullptr after construction and is only created in initInThread(),
     // so this dereference requires initInThread() to have run first.
102  pSocket_->async_receive_from(
103  boost::asio::buffer(buf_, sizeof(buf_))
104  , sender_endpoint_
105  , boost::bind(&SELF::handleReceiveFrom, this,
106  boost::asio::placeholders::error,
107  boost::asio::placeholders::bytes_transferred)
108  );
109  }
110 
111 /**
112  * @brief power the io_service and other things
113  *
114  */
115  void runOnce() HMBDC_RESTRICT {
116  // if (pIos_->stopped()) pIos_->reset();
117  typename Buffer::iterator begin, end;
     // Step 1: drain the command buffer. listenTo()/stopListenTo() put Subscribe /
     // Unsubscribe commands into buffer_; each peeked item is dispatched through
     // MH::handleMessage and the n peeked items are then released.
     // NOTE(review): MH is declared on a line missing from this listing; presumably
     // it aliases the MessageHandler base that routes to handleMessageCb - confirm.
118  auto n = buffer_.peek(0, begin, end);
119  auto it = begin;
120  while (it != end) {
121  MH::handleMessage(*static_cast<MessageHead*>(*it++));
122  }
123  buffer_.wasteAfterPeek(0, n);
124  boost::system::error_code ec;
     // Step 2: a non-null bufCur_ means handleReceiveFrom returned early because the
     // arbitrator answered 0 ("ask me later"). Re-enter it with a default-constructed
     // (no error) code; the 0 byte count is ignored there because bytesRecved_ is only
     // overwritten when bufCur_ is null.
125  if (hmbdc_unlikely(bufCur_)) { //there is unfinished data in buf - arb_
126  handleReceiveFrom(ec, 0);
127  }
     // Step 3: run ready asio handlers without blocking; reset first if the
     // io_service reports itself stopped. Poll errors are logged, not thrown.
128  if (hmbdc_unlikely(pIos_->stopped())) pIos_->reset();
129  pIos_->poll(ec);
130  if (ec) HMBDC_LOG_C("io_service error=", ec);
131  }
132 
133 /**
134  * @brief only used by MH
135  */
136  void handleMessageCb(Subscribe const& t) {
     // Adds the topic, converted to std::string, to the subscription set that
     // handleReceiveFrom consults via subscriptions_.check().
137  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
138  }
139 
140 /**
141  * @brief only used by MH
142  */
143  void handleMessageCb(Unsubscribe const& t) {
     // Removes the topic, converted to std::string, from the subscription set.
144  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
145  }
146 
147 
148  void listenTo(Topic const& t) override {
     // Does not touch subscriptions_ directly: it queues a Subscribe command in
     // buffer_, which runOnce() later drains and applies.
149  buffer_.put(MessageWrap<Subscribe>{t});
150  }
151 
152  void stopListenTo(Topic const& t) override {
     // Queues an Unsubscribe command in buffer_; the subscription set is updated
     // when runOnce() drains the buffer.
153  buffer_.put(MessageWrap<Unsubscribe>{t});
154  }
155 
156  /**
157  * @brief expose so user can manipulate it
158  * @return reference to boost::asio::ip::udp::socket
159  */
160  boost::asio::ip::udp::socket& asioSocket() {
     // pSocket_ starts as nullptr and is allocated in initInThread(); calling this
     // before initInThread() dereferences a null pointer. initInThread() also deletes
     // and recreates the socket, which invalidates any reference obtained earlier.
161  return *pSocket_;
162  }
163 
164  void initInThread() {
     // (Re)creates and configures the receiving UDP socket:
     //  1. runs the base Transport::initInThread()
     //  2. replaces any existing socket with a fresh one on *pIos_, opened with
     //     reuse_address and bound to endpoint_ (config "mcastAddr"/"mcastPort")
     //  3. optionally requests the OS receive buffer size from config
     //     "udpRecvBufferBytes" (0 means leave the OS default) and logs a warning
     //     if the OS granted less than requested
     //  4. joins the multicast group "mcastAddr" on the local interface selected
     //     by config "ifaceAddr"
     // open/bind/get_option/join_group use the throwing asio overloads, so failures
     // there propagate as exceptions.
165  Transport::initInThread();
166 
167  delete pSocket_;
168  pSocket_ = new boost::asio::ip::udp::socket(*pIos_);
169  pSocket_->open(endpoint_.protocol());
170  pSocket_->set_option(ip::udp::socket::reuse_address(true));
171  pSocket_->bind(endpoint_);
172  // Join the multicast group.
173  auto multicastAddress =
174  ip::address::from_string(config_.getExt<std::string>("mcastAddr"));
175  auto listenAddress = ip::address::from_string(
176  comm::inet::getLocalIpMatchMask(config_.getExt<std::string>("ifaceAddr")));
177 
178 
179  auto sz = config_.getExt<int>("udpRecvBufferBytes");
180  if (sz) {
181  socket_base::receive_buffer_size option(sz);
182  boost::system::error_code ec;
     // Best effort: use the non-throwing overload so a rejected request does not
     // abort initialization; the read-back below reports any shortfall.
183  pSocket_->set_option(option, ec);
184  }
185 
186  socket_base::receive_buffer_size option;
187  pSocket_->get_option(option);
     // The request succeeded when the effective size is at least what was asked for
     // (the OS may grant more than requested); warn only when it is smaller.
188  if (sz == 0 || sz <= option.value()) {
189  } else {
190  HMBDC_LOG_C("set udp receive buffer size unsuccessful, want "
191  , sz, " actual: ", option.value()
192  , " resulting lower receiving rate, check OS limits!");
193  }
194  pSocket_->set_option(ip::multicast::join_group(
195  multicastAddress.to_v4(), listenAddress.to_v4())
196  );
197  }
198 
199 private:
200  template <bool is_raw_arb>
201  typename enable_if<is_raw_arb, int>::type applyRawArb() {
     // Overload enabled when is_raw_arb is true: the arbitrator is called with the
     // current cursor into the datagram and the number of bytes still unprocessed.
202  return arb_(bufCur_, bytesRecved_);
203  }
204 
205  template <bool is_raw_arb>
206  typename enable_if<!is_raw_arb, int>::type applyRawArb() {
     // Overload enabled when is_raw_arb is false: there is no packet-level
     // arbitration, so the packet is always let through (positive result).
207  return 1;
208  }
209 
210  template <bool is_raw_arb>
211  typename enable_if<!is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
     // Overload enabled when is_raw_arb is false: the arbitrator decides per
     // message, given the message header.
212  return arb_(h);
213  }
214 
215  template <bool is_raw_arb>
216  typename enable_if<is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
     // Overload enabled when is_raw_arb is true: arbitration already happened at
     // packet level in applyRawArb, so every message is kept; h is unused.
217  return 1;
218  }
219 
220  void handleReceiveFrom(const boost::system::error_code& error
221  , size_t bytesRecved) {
     // Completion handler for async_receive_from; also re-entered by runOnce() to
     // resume a datagram whose arbitration was deferred.
     // error       - asio result; on error the datagram is skipped and only logged
     // bytesRecved - size of the datagram in buf_; ignored when resuming
     // Every path re-arms the async receive except the two early returns taken when
     // the arbitrator answers 0; those leave bufCur_/bytesRecved_ set for runOnce().
222  if (hmbdc_likely(!error)) {
     // A null cursor means a fresh datagram; a non-null one means we are resuming.
223  if (!bufCur_) {
224  bufCur_ = buf_;
225  bytesRecved_ = bytesRecved;
226  }
     // NOTE(review): the right-hand side of this alias is missing from this listing;
     // presumably it inspects the arbitrator's call signature - confirm in the source.
227  using traits =
229  using arg0 = typename traits::template arg<0>::type;
     // The arbitrator works on raw packets when its first parameter decays to void*;
     // otherwise it works per message on the header.
230  bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
231  auto a = applyRawArb<is_raw_arb>();
232  if (hmbdc_unlikely(a == 0)) {
233  //keep bufCur_ and bytesRecved_ unchanged
234  return;
235  } else if (hmbdc_unlikely(a < 0)) { //drop the whole packet
236  bytesRecved_ = 0;
237  } // else process it
238 
     // Walk the messages packed back to back in the datagram, advancing by each
     // message's wireSize().
     // NOTE(review): the header is read without checking that a full header remains,
     // and a wireSize() of 0 would never advance the cursor - confirm the sender
     // guarantees rule both out.
239  while (bytesRecved_) {
240  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
241  auto wireSize = h->wireSize();
242 
243  if (hmbdc_likely(bytesRecved_ >= wireSize)) {
     // Only messages on subscribed topics are delivered; others are skipped.
244  if (subscriptions_.check(h->topic())) {
245  // auto tmp = *(MessageHead*)(h->payload()); HMBDC_LOG_DEBUG(tmp);
     // Copy length is capped at the output buffer's item size.
     // NOTE(review): l is derived from wireSize but the copy starts at payload();
     // whether that can read past the message depends on how wireSize() relates to
     // the payload length - confirm.
246  auto l = std::min<size_t>(maxItemSize_, wireSize);
247 
     // Cannot trigger here: l <= wireSize and wireSize <= bytesRecved_ was just checked.
248  if (hmbdc_unlikely(l > bytesRecved_)) {
249  break;
250  }
251  auto a = applyArb<is_raw_arb>(h);
252 
253  if (hmbdc_likely(a > 0)) {
     // Keep: claim an output slot, copy l bytes from the payload, publish it.
254  auto it = outputBuffer_.claim();
255  char* b = static_cast<char*>(*it);
256  memcpy(b, h->payload(), l);
257  outputBuffer_.commit(it);
258  } else if (a == 0) {
259  //keep bufCur_ and bytesRecved_ unchanged
260  return;
261  } //else drop and move on
262  }
263  bytesRecved_ -= wireSize;
264  bufCur_ += wireSize;
265  } else {
     // Fewer bytes remain than the header claims: abandon the rest of the datagram.
266  break;
267  }
268  }
269  } else {
270  HMBDC_LOG_C("asio error=", error);
271  }
     // Datagram finished (or skipped): clear the cursor and schedule the next receive.
272  bufCur_ = nullptr;
273  pSocket_->async_receive_from(
274  boost::asio::buffer(buf_, sizeof(buf_))
275  , sender_endpoint_
276  , boost::bind(&SELF::handleReceiveFrom, this,
277  boost::asio::placeholders::error,
278  boost::asio::placeholders::bytes_transferred)
279  );
280  }
281 
282  Buffer buffer_;
283  OutputBuffer& outputBuffer_;
284  size_t maxItemSize_;
285  boost::asio::ip::udp::endpoint endpoint_;
286  boost::asio::ip::udp::endpoint sender_endpoint_;
287  ip::udp::socket* pSocket_;
288  char buf_[4*1024];
289  char* bufCur_;
290  size_t bytesRecved_;
291  text::StringTrieSet subscriptions_;
292  MsgArbitrator arb_;
293 };
294 
295 
296 /**
297  * @class RecvTransportEngineImpl<>
298  * @brief impl class
299  * @details this needs to be created using NetContext and start in an app::Context
300  *
301  * @tparam OutputBuffer type of buffer to hold resulting network messages
302  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, used to arbitrate
303  * between different recv transports. By default, keep all
304  */
305 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
307 : RecvTransportImpl<OutputBuffer, MsgArbitrator>
309 , Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
311 
312 
313  char const* hmbdcName() const {
     // Returns a pointer into the hmbdcName_ member string; it is only valid while
     // that string is alive and unmodified.
314  return this->hmbdcName_.c_str();
315  }
316 
317  std::tuple<char const*, int> schedSpec() const {
     // Returns (scheduling policy name, scheduling priority). The name points into
     // the schedPolicy_ member string and shares its lifetime.
318  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
319  }
320 
321 /**
322  * @brief power the io_service and other things
323  *
324  */
325  /*virtual*/
326  void invokedCb(uint16_t) HMBDC_RESTRICT override {
328  }
329 
330 
331 /**
332  * @brief start the show by scheduling the message recv
333  */
334  /*virtual*/
335  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
336  this->initInThread();
338  };
339 
340 /**
341  * @brief should never happen unless an exception is thrown
342  *
343  * @param e exception thrown
344  */
345  /*virtual*/
346  void stoppedCb(std::exception const& e) override {
     // Only reports the exception text through HMBDC_LOG_C; nothing is rethrown
     // and no recovery is attempted here.
347  HMBDC_LOG_C(e.what());
348  };
349 
350 };
351 
352 } //recvtransportengine_detail
353 
354 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
356 
357 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
359 
360 }}}
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:346
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:78
class to hold an hmbdc configuration
Definition: Config.hpp:44
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:19
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
void invokedCb(uint16_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:326
impl class
Definition: RecvTransportEngine.hpp:57
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
a singleton that holds mcast resources
Definition: NetContext.hpp:38
Definition: Messages.hpp:72
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:335
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:136
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:143
boost::asio::ip::udp::socket & asioSocket()
expose so user can manipulate it
Definition: RecvTransportEngine.hpp:160
void start()
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:101
Definition: StringTrieSetDetail.hpp:115
Definition: Traits.hpp:8
Definition: Message.hpp:55
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
Definition: Rater.hpp:10
void runOnce() HMBDC_RESTRICT
power the io_service and other things
Definition: RecvTransportEngine.hpp:115
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
Definition: Transport.hpp:17
Definition: Messages.hpp:65
impl class
Definition: RecvTransportEngine.hpp:306
Definition: Base.hpp:12
Definition: MessageHandler.hpp:36
Definition: LockFreeBufferMisc.hpp:73