hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/udpcast/Transport.hpp"
4 #include "hmbdc/text/StringTrieSet.hpp"
5 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
6 #include "hmbdc//Traits.hpp"
7 
8 #include <boost/lexical_cast.hpp>
9 
10 #include <memory>
11 #include <type_traits>
12 
13 #include <sys/epoll.h>
14 
15 namespace hmbdc { namespace app { namespace udpcast {
16 
17 /**
18  * @brief interface to power a multicast transport receiving functions
19  */
21  using ptr = std::shared_ptr<RecvTransport>;
22  using Transport::Transport;
23  virtual ~RecvTransport(){}
24  /**
25  * @brief a take all arbitrator (no arbitration at all)
26  * @details it provides the default type for arbitration which does nothing
27  * it also provides a template on writing a user defined arbitrator
28  *
29  * @param h handle to retrieve what is inside of the message
30  * @return always returns 1 here
31  * In general: 1 keep the message; -1 to drop;
32  * 0 cannot decide, ask me later.
33  */
34  struct NoOpArb {
35  int operator()(TransportMessageHeader const* h) {
36  return 1; //always keep it
37  }
38  };
39 private:
40  friend struct NetContext; //only be used by NetContext
41  virtual void listenTo(Topic const& t) = 0;
42  virtual void stopListenTo(Topic const& t) = 0;
43 };
44 
45 namespace recvtransportengine_detail {
46 using namespace hmbdc::time;
47 using namespace std;
49 
50 /**
51  * @class RecvTransportImpl<>
52  * @brief impl class
53  *
54  * @tparam OutputBuffer type of buffer to hold resulting network messages
55  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, suited to arbitrate
56  * between different recv transport. By default, keeping all
57  */
58 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
61 , MessageHandler<RecvTransportImpl<OutputBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
62  using SELF = RecvTransportImpl;
64  friend struct NetContext; //only be created by NetContext
65 
66 /**
67  * @brief ctor
68  * @details io_service could be passed in by user, in this case NO more than two threads should
69  * power this io_service instance since that would violate the thread garantee of Client, which
70  * is no callbacks are called in parallel
71  *
72  * @param cfg specify the details of the udpcast transport
73  * @param outputBuffer holding the results
74  * @param arb arbitrator instance to decide which messages to drop and keep; it supports either
75  * raw udp packet (BEFORE topic filtering) or hmbdc udpcast message (AFTER topic filtering) level
76  * arbitration depending on which one of
77  * int operator()(void* bytes, size_t len) or
78  * int operator()(TransportMessageHeader const* header) presents in the arb
79  */
81  , OutputBuffer& outputBuffer
82  , MsgArbitrator arb = NoOpArb())
83  : RecvTransport(cfg)
84  , buffer_(std::max(sizeof(MessageWrap<Subscribe>), sizeof(MessageWrap<Unsubscribe>))
85  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
86  , outputBuffer_(outputBuffer)
87  , maxItemSize_(outputBuffer.maxItemSize())
88  , buf_(new char[mtu_])
89  , bufCur_(buf_)
90  , bytesRecved_(0)
91  , arb_(arb) {
92  uint32_t yes = 1;
93  if (setsockopt(fd, SOL_SOCKET,SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
94  HMBDC_LOG_C("failed to set reuse address errno=", errno);
95  }
96  auto sz = config_.getExt<int>("udpRecvBufferBytes");
97  if (sz) {
98  if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
99  HMBDC_LOG_C("failed to set send buffer size=", sz);
100  }
101  }
102 
103  auto udpcastListenPort = config_.getExt<uint16_t>("udpcastListenPort");
104  struct sockaddr_in udpcastListenAddrPort;
105  memset(&udpcastListenAddrPort, 0, sizeof(udpcastListenAddrPort));
106  udpcastListenAddrPort.sin_family = AF_INET;
107  auto ipStr = cfg.getExt<string>("udpcastListenAddr") == string("ifaceAddr")
108  ? comm::inet::getLocalIpMatchMask(cfg.getExt<string>("ifaceAddr")):cfg.getExt<string>("udpcastListenAddr");
109  udpcastListenAddrPort.sin_addr.s_addr = inet_addr(ipStr.c_str());
110  udpcastListenAddrPort.sin_port = htons(udpcastListenPort);
111  if (bind(fd, (struct sockaddr *)&udpcastListenAddrPort, sizeof(udpcastListenAddrPort)) < 0) {
112  HMBDC_THROW(runtime_error, "failed to bind unicast udpcast listen address "
113  << ipStr << ':' << cfg.getExt<short>("udpcastListenPort") << " errno=" << errno);
114  }
115 
116  if ((udpcastListenAddrPort.sin_addr.s_addr & 0x000000F0) == 0xE0) {//multicast address
117  /* use setsockopt() to request that the kernel join a multicast group */
118  struct ip_mreq mreq;
119  mreq.imr_multiaddr.s_addr = udpcastListenAddrPort.sin_addr.s_addr;
120  auto iface =
121  comm::inet::getLocalIpMatchMask(config_.getExt<string>("ifaceAddr"));
122  mreq.imr_interface.s_addr=inet_addr(iface.c_str());
123  if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
124  HMBDC_THROW(runtime_error, "failed to join " << ipStr << ':'
125  << cfg.getExt<short>("udpcastPort"));
126  }
127  }
128  }
129 
130  ~RecvTransportImpl() {
131  delete [] buf_;
132  }
133 
134 /**
135  * @brief start the show by schedule the mesage recv
136  */
137  void start() {
138  utils::EpollTask::instance().add(EPOLLIN|EPOLLET, *this);
139  }
140 
141  void runOnce(bool alwaysPoll = false) HMBDC_RESTRICT {
142  typename Buffer::iterator begin, end;
143  auto n = buffer_.peek(begin, end);
144  auto it = begin;
145  while (it != end) {
146  MH::handleMessage(*static_cast<MessageHead*>(*it++));
147  }
148  buffer_.wasteAfterPeek(begin, n);
149  if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
150  utils::EpollTask::instance().poll();
151  }
152  resumeRead();
153  }
154 
155 /**
156  * @brief only used by MH
157  */
158  void handleMessageCb(Subscribe const& t) {
159  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
160  }
161 
162 /**
163  * @brief only used by MH
164  */
165  void handleMessageCb(Unsubscribe const& t) {
166  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
167  }
168 
169 
170  void listenTo(Topic const& t) override {
171  buffer_.put(MessageWrap<Subscribe>{t});
172  }
173 
174  void stopListenTo(Topic const& t) override {
175  buffer_.put(MessageWrap<Unsubscribe>{t});
176  }
177  sockaddr_in udpcastListenRemoteAddr = {0};
178 private:
179  template <bool is_raw_arb>
180  typename enable_if<is_raw_arb, int>::type applyRawArb(void* pkt, size_t len) {
181  return arb_(pkt, len);
182  }
183 
184  template <bool is_raw_arb>
185  typename enable_if<!is_raw_arb, int>::type applyRawArb(void*, size_t) {
186  return 1;
187  }
188 
189  template <bool is_raw_arb>
190  typename enable_if<!is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
191  return arb_(h);
192  }
193 
194  template <bool is_raw_arb>
195  typename enable_if<is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
196  return 1;
197  }
198 
// Parses the bytes currently held in buf_, hands every subscribed and
// accepted message to outputBuffer_, then reads the next datagram from fd.
// Repeats until a read yields no bytes, the fd is not ready, or an
// arbitrator answers 0 ("ask me later"), in which case the parse position
// (bufCur_ / bytesRecved_) is left untouched so the next call retries.
199  void resumeRead() HMBDC_RESTRICT {
// NOTE(review): the right-hand side of this alias (listing line 201) is
// missing from this listing; it has to supply the `arg<N>` member used below.
200  using traits =
202  using arg0 = typename traits::template arg<0>::type;
// true when the arbitrator's first argument decays to void*, i.e. it is the
// raw-packet flavour; false selects the per-message (header) flavour
203  bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
204  do {
// packet level arbitration, only at the very start of a packet
205  if (bytesRecved_ && bufCur_ == buf_) {
206  auto a = applyRawArb<is_raw_arb>(buf_, bytesRecved_);
207  if (hmbdc_unlikely(a == 0)) {
208  //keep bufCur_ and bytesRecved_ unchanged
209  return;
210  } else if (hmbdc_unlikely(a < 0)) { //drop the whole packet
211  bytesRecved_ = 0;
212  } // else process it
213  }
// walk the messages packed back to back in the datagram
// NOTE(review): if wireSize() could ever be 0 this loop would not advance -
// confirm the header guarantees a non-zero size
214  while (bytesRecved_) {
215  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
216  auto wireSize = h->wireSize();
217 
218  if (hmbdc_likely(bytesRecved_ >= wireSize)) {
// topic filter: only subscribed topics go through message arbitration
219  if (subscriptions_.check(h->topic())) {
// NOTE(review): this condition cannot hold here, the enclosing branch
// already established bytesRecved_ >= wireSize
220  if (hmbdc_unlikely(wireSize > bytesRecved_)) {
221  break;
222  }
223  auto a = applyArb<is_raw_arb>(h);
224 
225  if (hmbdc_likely(a > 0)) {
// the payload is capped at the output buffer's item size
226  auto l = std::min<size_t>(maxItemSize_, h->messagePayloadLen());
227  outputBuffer_.put(h->payload(), l);
228  } else if (a == 0) {
229  //keep bufCur_ and bytesRecved_ unchanged
230  return;
231  } //else drop and move on
232  }
// advance past this message whether it was delivered, filtered or dropped
233  bytesRecved_ -= wireSize;
234  bufCur_ += wireSize;
235  } else {
// fewer bytes left than the header claims: stop parsing this packet
236  break;
237  }
238  }
// packet finished (any unparsed tail is discarded); rewind for the next read
239  bytesRecved_ = 0;
240  bufCur_ = buf_;
241  if (isFdReady()) {
242  socklen_t addrLen = sizeof(udpcastListenRemoteAddr);
// non-blocking read of one datagram; the sender address is stored in the
// public member udpcastListenRemoteAddr
243  auto l = recvfrom(fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT
244  , (struct sockaddr *)&udpcastListenRemoteAddr, &addrLen);
245  if (hmbdc_unlikely(l < 0)) {
// NOTE(review): the log text names recvmmsg although the call above is
// recvfrom; checkErr() presumably filters benign conditions - confirm
246  if (!checkErr()) {
247  HMBDC_LOG_C("recvmmsg failed errno=", errno);
248  }
249  return;
250  } else if (l == 0) {
251  //nothing to do now
252  return;
253  }
254  bytesRecved_ = l;
255  }
// loop again only when the read above produced bytes to parse
256  } while(bytesRecved_);
257  }
258 
259  Buffer buffer_;
260  OutputBuffer& outputBuffer_;
261  size_t maxItemSize_;
262  char* buf_;
263  char* bufCur_;
264  size_t bytesRecved_;
265  text::StringTrieSet subscriptions_;
266  MsgArbitrator arb_;
267 };
268 
269 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
271 : RecvTransportImpl<OutputBuffer, MsgArbitrator>
272 , Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
276 
277 /**
278  * @brief power the io_service and other things
279  *
280  */
281  /*virtual*/
282  void invokedCb(uint16_t) HMBDC_RESTRICT override {
284  }
285 
286 
287 /**
288  * @brief start the show by scheduling the message recv
289  */
290  /*virtual*/
291  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
293  };
294 
295 /**
296  * @brief should not happen ever unless an exception thrown
297  *
298  * @param e exception thown
299  */
300  /*virtual*/
301  void stoppedCb(std::exception const& e) override {
302  HMBDC_LOG_C(e.what());
303  };
304 
305 };
306 
307 } //recvtransportengine_detail
308 
309 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
311 
312 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
314 
315 }}}
Definition: MonoLockFreeBuffer.hpp:15
void invokedCb(uint16_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:282
Definition: Transport.hpp:39
class to hold an hmbdc configuration
Definition: Config.hpp:46
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:165
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:180
a singleton that holding udpcast resources
Definition: NetContext.hpp:38
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:20
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:34
Definition: Messages.hpp:65
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:158
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:80
impl class
Definition: RecvTransportEngine.hpp:59
Definition: StringTrieSetDetail.hpp:115
void start()
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:137
Definition: Message.hpp:76
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:301
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:45
Definition: Messages.hpp:73
Definition: Base.hpp:12
Definition: MessageHandler.hpp:39
Definition: Traits.hpp:8
Definition: LockFreeBufferMisc.hpp:74
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:291