hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/mcast/Transport.hpp"
4 #include "hmbdc/text/StringTrieSet.hpp"
5 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
6 #include "hmbdc//Traits.hpp"
7 
8 #include <boost/lexical_cast.hpp>
9 
10 #include <memory>
11 #include <type_traits>
12 
13 #include <sys/epoll.h>
14 
15 namespace hmbdc { namespace app { namespace mcast {
16 
17 /**
18  * @brief interface to power a multicast transport receiving functions
19  */
21  using ptr = std::shared_ptr<RecvTransport>;
22  using Transport::Transport;
23  virtual ~RecvTransport(){}
24  /**
25  * @brief a take all arbitrator (no arbitration at all)
26  * @details it provides the default type for arbitration which does nothing
27  * it also provides a template on writing a user defined arbitrator
28  *
29  * @param h handle to retrieve what is inside of the message
30  * @return always returns 1 here
31  * In general: 1 keep the message; -1 to drop;
32  * 0 cannot decide, ask me later.
33  */
34  struct NoOpArb {
35  int operator()(TransportMessageHeader const* h) {
36  return 1; //always keep it
37  }
38  };
39 private:
40  friend struct NetContext; //only be used by NetContext
41  virtual void listenTo(Topic const& t) = 0;
42  virtual void stopListenTo(Topic const& t) = 0;
43 };
44 
45 namespace recvtransportengine_detail {
46 using namespace hmbdc::time;
47 using namespace std;
49 
50 /**
51  * @class RecvTransportImpl<>
52  * @brief impl class
53  *
54  * @tparam OutputBuffer type of buffer to hold resulting network messages
55  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, suited to arbitrate
56  * between different recv transport. By default, keeping all
57  */
58 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
61 , MessageHandler<RecvTransportImpl<OutputBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
62  using SELF = RecvTransportImpl;
64  friend struct NetContext; //only be created by NetContext
65 
66 /**
67  * @brief ctor
68  * @details io_service could be passed in by user, in this case NO more than two threads should
69  * power this io_service instance since that would violate the thread guarantee of Client, which
70  * is no callbacks are called in parallel
71  *
72  * @param cfg specify the details of the mcast transport
73  * @param outputBuffer holding the results
74  * @param arb arbitrator instance to decide which messages to drop and keep; it supports either
75  * raw udp packet (BEFORE topic filtering) or hmbdc udpcast message (AFTER topic filtering) level
76  * arbitration depending on which one of
77  * int operator()(void* bytes, size_t len) or
78  * int operator()(TransportMessageHeader const* header) presents in the arb
79  */
81  , OutputBuffer& outputBuffer
82  , MsgArbitrator arb = NoOpArb())
83  : RecvTransport(cfg)
84  , buffer_(std::max(sizeof(MessageWrap<Subscribe>), sizeof(MessageWrap<Unsubscribe>))
85  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
86  , outputBuffer_(outputBuffer)
87  , maxItemSize_(outputBuffer.maxItemSize())
88  , buf_(new char[mtu_])
89  , bufCur_(buf_)
90  , bytesRecved_(0)
91  , arb_(arb) {
92  uint32_t yes = 1;
93  if (setsockopt(fd, SOL_SOCKET,SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
94  HMBDC_LOG_C("failed to set reuse address errno=", errno);
95  }
96  auto sz = config_.getExt<int>("udpRecvBufferBytes");
97  if (sz) {
98  if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
99  HMBDC_LOG_C("failed to set send buffer size=", sz);
100  }
101  }
102 
103  /* bind to receive address */
104  if (bind(fd, (struct sockaddr *)&mcAddr, sizeof(mcAddr)) < 0) {
105  HMBDC_THROW(runtime_error, "failed to bind "
106  << config_.getExt<string>("mcastAddr") << ':'
107  << cfg.getExt<short>("mcastPort"));
108  }
109  /* use setsockopt() to request that the kernel join a multicast group */
110  struct ip_mreq mreq;
111  mreq.imr_multiaddr.s_addr=inet_addr(config_.getExt<string>("mcastAddr").c_str());
112  auto iface =
113  comm::inet::getLocalIpMatchMask(config_.getExt<string>("ifaceAddr"));
114  mreq.imr_interface.s_addr=inet_addr(iface.c_str());
115  if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
116  HMBDC_THROW(runtime_error, "failed to join " << config_.getExt<string>("mcastAddr") << ':'
117  << cfg.getExt<short>("mcastPort"));
118  }
119  }
120 
121  ~RecvTransportImpl() {
122  delete [] buf_;
123  }
124 
125 /**
126  * @brief start the show by schedule the mesage recv
127  */
128  void start() {
129  utils::EpollTask::instance().add(EPOLLIN|EPOLLET, *this);
130  }
131 
132  void runOnce(bool alwaysPoll = false) HMBDC_RESTRICT {
133  typename Buffer::iterator begin, end;
134  auto n = buffer_.peek(begin, end);
135  auto it = begin;
136  while (it != end) {
137  MH::handleMessage(*static_cast<MessageHead*>(*it++));
138  }
139  buffer_.wasteAfterPeek(begin, n);
140  if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
141  utils::EpollTask::instance().poll();
142  }
143  resumeRead();
144  }
145 
146 /**
147  * @brief only used by MH
148  */
149  void handleMessageCb(Subscribe const& t) {
150  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
151  }
152 
153 /**
154  * @brief only used by MH
155  */
156  void handleMessageCb(Unsubscribe const& t) {
157  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
158  }
159 
160 
161  void listenTo(Topic const& t) override {
162  buffer_.put(MessageWrap<Subscribe>{t});
163  }
164 
165  void stopListenTo(Topic const& t) override {
166  buffer_.put(MessageWrap<Unsubscribe>{t});
167  }
168 
169 private:
170  template <bool is_raw_arb>
171  typename enable_if<is_raw_arb, int>::type applyRawArb(void* pkt, size_t len) {
172  return arb_(pkt, len);
173  }
174 
175  template <bool is_raw_arb>
176  typename enable_if<!is_raw_arb, int>::type applyRawArb(void*, size_t) {
177  return 1;
178  }
179 
180  template <bool is_raw_arb>
181  typename enable_if<!is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
182  return arb_(h);
183  }
184 
185  template <bool is_raw_arb>
186  typename enable_if<is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
187  return 1;
188  }
189 
190  void resumeRead() HMBDC_RESTRICT {
191  using traits =
193  using arg0 = typename traits::template arg<0>::type;
194  bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
195  do {
196  if (bytesRecved_ && bufCur_ == buf_) {
197  auto a = applyRawArb<is_raw_arb>(buf_, bytesRecved_);
198  if (hmbdc_unlikely(a == 0)) {
199  //keep bufCur_ and bytesRecved_ unchanged
200  return;
201  } else if (hmbdc_unlikely(a < 0)) { //drop the whole packet
202  bytesRecved_ = 0;
203  } // else process it
204  }
205  while (bytesRecved_) {
206  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
207  auto wireSize = h->wireSize();
208 
209  if (hmbdc_likely(bytesRecved_ >= wireSize)) {
210  if (subscriptions_.check(h->topic())) {
211  if (hmbdc_unlikely(wireSize > bytesRecved_)) {
212  break;
213  }
214  auto a = applyArb<is_raw_arb>(h);
215 
216  if (hmbdc_likely(a > 0)) {
217  auto l = std::min<size_t>(maxItemSize_, h->messagePayloadLen());
218  outputBuffer_.put(h->payload(), l);
219  } else if (a == 0) {
220  //keep bufCur_ and bytesRecved_ unchanged
221  return;
222  } //else drop and move on
223  }
224  bytesRecved_ -= wireSize;
225  bufCur_ += wireSize;
226  } else {
227  break;
228  }
229  }
230  bytesRecved_ = 0;
231  bufCur_ = buf_;
232  if (isFdReady()) {
233  auto l = recv(fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT);
234  if (hmbdc_unlikely(l < 0)) {
235  if (!checkErr()) {
236  HMBDC_LOG_C("recvmmsg failed errno=", errno);
237  }
238  return;
239  } else if (l == 0) {
240  //nothing to do now
241  return;
242  }
243  bytesRecved_ = l;
244  }
245  } while(bytesRecved_);
246  }
247 
248  // void resumeRead() {
249  // if (sdReady_ && pktsBufTail_ < pktsBufSize_) {
250  // auto l = recvmmsg(fd, pktsBuf_ + pktsBufTail_
251  // , pktsBufSize_ - pktsBufTail_, MSG_NOSIGNAL|MSG_DONTWAIT, NULL);
252  // if (hmbdc_unlikely(l < 0)) {
253  // sdReady_ = false;
254  // if (errno != EAGAIN) {
255  // HMBDC_LOG_C("recvmmsg failed errno=", errno);
256  // if (!utils::EpollTask::instance().delFd(fd)) {
257  // HMBDC_LOG_C("failed to del fd from epoll errno=", errno);
258  // } else {
259  // close(fd);
260  // fd = -1;
261  // }
262  // }
263  // return;
264  // } else if (l == 0) {
265  // //nothing to do now
266  // return;
267  // }
268  // pktsBufTail_ += l;
269  // while (pktsBufHead_ != pktsBufTail_) {
270  // auto pkt = pktsBuf3_[pktsBufHead_];
271  // size_t pktSize = pktsBuf_[pktsBufHead_].msg_len;
272  // using traits =
273  // function_traits<typename std::remove_reference<MsgArbitrator>::type>;
274  // using arg0 = typename traits::template arg<0>::type;
275  // bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
276  // auto a = applyRawArb<is_raw_arb>(pkt, pktSize);
277  // if (hmbdc_unlikely(a == 0)) {
278  // return;
279  // } else if (hmbdc_unlikely(a < 0)) { //drop the whole packet
280  // pktsBufHead_++;
281  // continue;
282  // } // else process it
283 
284  // while (inPktOffset_ < pktSize) {
285  // char* p = pkt + inPktOffset_;
286  // auto h = reinterpret_cast<TransportMessageHeader*>(p);
287  // auto wireSize = h->wireSize();
288  // if (hmbdc_likely(pktSize - inPktOffset_ >= wireSize)) {
289  // if (subscriptions_.check(h->topic())) {
290  // // auto tmp = *(MessageHead*)(h->payload()); HMBDC_LOG_DEBUG(tmp);
291  // auto a = applyArb<is_raw_arb>(h);
292  // if (hmbdc_likely(a > 0)) {
293  // auto it = outputBuffer_.claim();
294  // char* b = static_cast<char*>(*it);
295  // auto l = std::min<size_t>(maxItemSize_, h->messagePayloadLen());
296  // memcpy(b, h->payload(), l);
297  // outputBuffer_.commit(it);
298  // } else if (a == 0) {
299  // //keep pktsBufHead_, inPktOffset_ unchanged will be back fine
300  // return;
301  // } //else drop and move on
302  // }
303  // inPktOffset_ += wireSize;
304  // } else {
305  // break;
306  // }
307  // }
308  // pktsBufHead_++;
309  // inPktOffset_ = 0;
310  // }
311  // if (pktsBufHead_ == pktsBufTail_) {
312  // pktsBufHead_ = pktsBufTail_ = 0;
313  // }
314  // }
315  // }
316 
317  Buffer buffer_;
318  OutputBuffer& outputBuffer_;
319  size_t maxItemSize_;
320  char* buf_;
321  char* bufCur_;
322  size_t bytesRecved_;
323  // size_t maxPktSize_;
324  // size_t pktsBufSize_;
325  // mmsghdr pktsBuf_[10];
326  // size_t pktsBufHead_;
327  // size_t pktsBufTail_;
328  // iovec pktsBuf2_[10];
329  // char pktsBuf3_[10][2048];
330  // size_t inPktOffset_;
331  text::StringTrieSet subscriptions_;
332  MsgArbitrator arb_;
333 };
334 
335 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
337 : RecvTransportImpl<OutputBuffer, MsgArbitrator>
338 , Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
342 
343 /**
344  * @brief power the io_service and other things
345  *
346  */
347  /*virtual*/
348  void invokedCb(uint16_t) HMBDC_RESTRICT override {
350  }
351 
352 
353 /**
354  * @brief start the show by scheduling the message recv
355  */
356  /*virtual*/
357  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
359  };
360 
361 /**
362  * @brief should not happen ever unless an exception thrown
363  *
364  * @param e exception thown
365  */
366  /*virtual*/
367  void stoppedCb(std::exception const& e) override {
368  HMBDC_LOG_C(e.what());
369  };
370 
371 };
372 
373 } //recvtransportengine_detail
374 
375 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
377 
378 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
380 
381 }}}
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:367
Definition: MonoLockFreeBuffer.hpp:15
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:80
class to hold an hmbdc configuration
Definition: Config.hpp:44
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:20
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
void invokedCb(uint16_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:348
impl class
Definition: RecvTransportEngine.hpp:59
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:34
a singleton that holding mcast resources
Definition: NetContext.hpp:38
Definition: Messages.hpp:72
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:357
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:149
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:156
void start()
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:128
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:72
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Definition: Transport.hpp:43
Definition: Messages.hpp:65
Definition: Base.hpp:12
Definition: MessageHandler.hpp:36
Definition: Traits.hpp:8
Definition: LockFreeBufferMisc.hpp:74