hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 #undef NETMAP_WITH_LIBS
8 
9 #include "hmbdc/app/netmap/Messages.hpp"
10 #include "hmbdc/app/Base.hpp"
11 #include "hmbdc/comm/Topic.hpp"
12 #include "hmbdc/comm/eth/Misc.h"
13 #include "hmbdc/Compile.hpp"
14 #include "hmbdc/text/StringTrieSet.hpp"
15 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
16 #include "hmbdc//Traits.hpp"
17 
18 
19 #include <boost/lexical_cast.hpp>
20 #include <memory>
21 #include <type_traits>
22 
23 
24 #include <netinet/ether.h> /* ether_aton */
25 #include <linux/if_packet.h> /* sockaddr_ll */
26 #include <sys/sysctl.h> /* sysctl */
27 #include <ifaddrs.h> /* getifaddrs */
28 #include <poll.h>
29 
30 namespace hmbdc { namespace app { namespace netmap {
31 
32 struct NetContext;
33 /**
34  * @brief power a netmap port receiving functions
35  * @details this needs to be created using NetContext and start in an app::Context
36  */
37 struct RecvTransport {
38  using ptr = std::shared_ptr<RecvTransport>;
39  virtual ~RecvTransport(){}
40  /**
41  * @brief a take all arbitrator (no arbitration at all)
42  * @details it provides the default type for arbitration which does nothing
43  * it also provides a template on writing a user defined arbitrator
44  *
45  * @param h handle to retrieve what is inside of the message
46  * @return always returns 1 here
47  * In general: 1 keep the message; -1 to drop;
48  * 0 cannot decide, ask me later.
49  */
50  struct NoOpArb {
51  int operator()(TransportMessageHeader const* h) {
52  return 1; //always keep it
53  }
54  };
55 private:
56  friend struct hmbdc::app::netmap::NetContext; //only be used by NetContext
57  virtual void listenTo(comm::Topic const& t) = 0;
58  virtual void stopListenTo(comm::Topic const& t) = 0;
59 };
60 
61 /**
62  * @brief impl class,
63  * @details this needs to be created using NetContext and start in an app::Context
64  *
65  * @tparam OutBuffer type of buffer to hold resulting network messages
66  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, suited to arbitrate
67  * between different recv transports. By default, keeping all
68  */
69 template <typename OutBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
72 , Client<RecvTransportEngine<OutBuffer, MsgArbitrator>>
73 , MessageHandler<RecvTransportEngine<OutBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
74 private:
76  friend struct hmbdc::app::netmap::NetContext; //only be created by NetContext
77 
78 /**
79  * @brief ctor
80  *
81  * @param cfg specify the details of the mcast transport
82  * @param outputBuffer holding the results
83  * @param arb arbitrator instance to decide which messages to drop and keep; it supports either
84  * raw netmap (udp) packet (BEFORE topic filtering) or hmbdc message (AFTER topic filtering) level
85  * arbitration depending on which one of
86  * int operator()(void* bytes, size_t len) or
87  * int operator()(TransportMessageHeader const* header) presents in the arb
88  */
89  RecvTransportEngine(Config const& config, OutBuffer& outBuffer
90  , MsgArbitrator arb = NoOpArb())
91  : config_(config)
92  , data_(nullptr)
93  , buffer_(std::max(sizeof(MessageWrap<Subscribe>), sizeof(MessageWrap<Unsubscribe>))
94  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
95  , outBuffer_(outBuffer)
96  , maxMessageSize_(outBuffer.maxItemSize())
97  , doChecksum_(config_.getExt<bool>("doChecksum"))
98  , busyWait_(true)
99  , pollWaitTimeMillisec_(0)
100  , arb_(arb) {
101  config (hmbdcName_, "hmbdcName")
102  (schedPolicy_, "schedPolicy")
103  (schedPriority_, "schedPriority")
104  ;
105 
106  busyWait_ = config_.getExt<bool>("busyWait");
107  if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<int>("pollWaitTimeMillisec");
108 
109  struct nmreq baseNmd;
110  bzero(&baseNmd, sizeof(baseNmd));
111  baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
112  config_(baseNmd.nr_tx_slots, "nmTxSlots");
113  config_(baseNmd.nr_rx_slots, "nmRxSlots");
114  config_(baseNmd.nr_tx_rings, "nmTxRings");
115  config_(baseNmd.nr_rx_rings, "nmRxRings");
116  auto nmport = config_.getExt<std::string>("netmapPort");
117  uint32_t flags = config_.getExt<uint32_t>("nmOpenFlags");
118  nmd_ = nm_open(nmport.c_str(), &baseNmd, flags, NULL);
119  if (!nmd_) {
120  HMBDC_THROW(std::runtime_error, "cannot open " << nmport);
121  }
122 
123  struct nmreq req;
124  memset(&req, 0, sizeof(req));
125  bcopy(nmd_->req.nr_name, req.nr_name, sizeof(req.nr_name));
126  req.nr_version = NETMAP_API;
127  req.nr_cmd = NETMAP_VNET_HDR_GET;
128  int err = ioctl(nmd_->fd, NIOCREGIF, &req);
129  if (err) {
130  HMBDC_THROW(std::runtime_error, "Unable to get virtio-net header length");
131  }
132  virtHeader_ = req.nr_arg1;
133 
134  //setting up poll - might be useful or not
135  pfd_.fd = nmd_->fd;
136  pfd_.events = POLLIN;
137  sleep(config_.getExt<int>("nmResetWaitSec"));
138  //cleanup rings
139  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
140  HMBDC_THROW(std::runtime_error, "IO error");
141  }
142  for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
143  struct netmap_ring * rxring = NETMAP_TXRING(nmd_->nifp, i);
144  if (nm_ring_empty(rxring))
145  continue;
146  rxring->head = rxring->cur = rxring->tail;
147  }
148  }
149 
150  void listenTo(Topic const& t) override {
151  buffer_.put(MessageWrap<Subscribe>{t});
152  }
153 
154  void stopListenTo(Topic const& t) override {
155  buffer_.put(MessageWrap<Unsubscribe>{t});
156  }
157 
158 public:
159 
161  if (nmd_) nm_close(nmd_);
162  }
163 
164  /*virtual*/
165  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
167  auto n = buffer_.peek(begin, end);
168  auto it = begin;
169  while (it != end) {
170  MH::handleMessage(*static_cast<MessageHead*>(*it++));
171  }
172  buffer_.wasteAfterPeek(begin, n);
173 
174  syncNetmap();
175  for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
176  struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
177  if (nm_ring_empty(rxring))
178  continue;
179 
180  recvPackets(rxring);
181  }
182  }
183 
184  void handleMessageCb(Subscribe const& t) {
185  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
186  }
187 
188  void handleMessageCb(Unsubscribe const& t) {
189  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
190  }
191 
192  void stoppedCb(std::exception const& e) override {
193  HMBDC_LOG_C(e.what());
194  };
195 
196  char const* hmbdcName() const {
197  return this->hmbdcName_.c_str();
198  }
199 
200  std::tuple<char const*, int> schedSpec() const {
201  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
202  }
203 
204 private:
205  template <bool is_raw_arb>
206  typename std::enable_if<is_raw_arb, int>::type applyRawArb(size_t len) {
207  return arb_(data_, len);
208  }
209 
210  template <bool is_raw_arb>
211  typename std::enable_if<!is_raw_arb, int>::type applyRawArb(size_t) {
212  return 1;
213  }
214 
215  template <bool is_raw_arb>
216  typename std::enable_if<!is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
217  return arb_(h);
218  }
219 
220  template <bool is_raw_arb>
221  typename std::enable_if<is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
222  return 1;
223  }
224 
225  /**
226  * @brief sync using busy wait or poll depending on config
227  * @details it turns out busy wait performance is very poor when using vale
228  * poll works mostly, but it works well only when an enough timeout is given
229  * less than 10 milli wont work well
230  */
231  void syncNetmap() HMBDC_RESTRICT {
232  if (hmbdc_likely(busyWait_)) {
233  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
234  HMBDC_THROW(std::runtime_error, "IO error");
235  } else {
236  return;
237  }
238  } else {
239  auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
240  if (hmbdc_unlikely( res < 0)) {
241  HMBDC_THROW(std::runtime_error, "IO error errno=" << errno);
242  } else {
243  return;
244  }
245  }
246  }
247 
248  void recvPackets(struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
249  using namespace comm::eth;
250  auto cur = ring->cur;
251 
252  while(cur != ring->tail) {
253  struct netmap_slot *slot = &ring->slot[cur];
254  auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
255  auto* p = reinterpret_cast<pkt*>(buf + virtHeader_ - sizeof(virt_header));
256 
257  if (hmbdc_likely(p->ipv4.ip.ip_p == IPPROTO_UDP
258  && p->ipv4.ip.ip_off == htons(IP_DF))) {
259  if (!data_) data_ = (uint8_t*)p->ipv4.body;
260 
261  using traits =
262  function_traits<typename std::remove_reference<MsgArbitrator>::type>;
263  using arg0 = typename traits::template arg<0>::type;
264  bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
265  auto a = applyRawArb<is_raw_arb>(slot->len);
266  if (hmbdc_unlikely(a == 0)) {
267  //keep data_ unchanged
268  ring->head = ring->cur = cur;
269  return;
270  } else if (hmbdc_likely(a > 0)) {
271  while (data_ + sizeof(TransportMessageHeader) < buf + slot->len) {
272  auto header = reinterpret_cast<TransportMessageHeader*>(data_);
273  if (data_ + header->wireSize() <= buf + slot->len) {
274  if (subscriptions_.check(header->topic())) {
275  auto a = applyArb<is_raw_arb>(header);
276  if (hmbdc_likely(a > 0)) {
277  auto l = std::min(maxMessageSize_, (size_t)header->messagePayloadLen());
278  outBuffer_.put(header->payload(), l);
279  } else if (hmbdc_unlikely(a == 0)) {
280  ring->head = ring->cur = cur;
281  return;
282  } //else drop and move to next msg
283  }
284  data_ += header->wireSize();
285  } else {
286  break;
287  }
288  }
289  } //else drop the packet
290  }
291  data_ = nullptr;
292  cur = nm_ring_next(ring, cur);
293  }
294 
295  ring->head = ring->cur = cur;
296  }
297 
298  static
299  bool verifyChecksum(comm::eth::pkt* HMBDC_RESTRICT packet, size_t payloadWireSize) {
300  using namespace comm::eth;
301  {
302  auto tmp = packet->ipv4.ip.ip_sum;
303  packet->ipv4.ip.ip_sum = 0;
304  if (tmp != wrapsum(
305  checksum(&packet->ipv4.ip, sizeof(packet->ipv4.ip), 0))) {
306  return false;
307  }
308  packet->ipv4.ip.ip_sum = tmp;
309  }
310 
311  {
312  auto tmp = packet->ipv4.udp.check;
313  packet->ipv4.udp.check = 0;
314 #pragma GCC diagnostic push
315 #ifdef __clang__
316 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"
317 #endif
318  auto udp = &packet->ipv4.udp;
319  if (tmp != wrapsum(
320  checksum(udp, sizeof(*udp), /* udp header */
321  checksum(packet->ipv4.body, payloadWireSize, /* udp payload */
322  checksum(&packet->ipv4.ip.ip_src, 2 * sizeof(packet->ipv4.ip.ip_src), /* pseudo header */
323  IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
324  return false;
325  }
326  packet->ipv4.udp.check = tmp;
327  return true;
328 #pragma GCC diagnostic pop
329  }
330  }
331  Config config_;
332  std::string hmbdcName_;
333  std::string schedPolicy_;
334  int schedPriority_;
335  uint8_t* data_;
336 
338  OutBuffer& HMBDC_RESTRICT outBuffer_;
339  size_t maxMessageSize_;
340  text::StringTrieSet subscriptions_;
341 
342 
343  struct nm_desc *nmd_;
344  int virtHeader_; //v hdr len
345  bool doChecksum_;
346  struct pollfd pfd_;
347  bool busyWait_;
348  int pollWaitTimeMillisec_;
349  MsgArbitrator arb_;
350 };
351 
352 }}}
353 
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: RecvTransportEngine.hpp:192
Definition: Messages.hpp:111
Definition: MonoLockFreeBuffer.hpp:15
power a netmap port receiving functions
Definition: RecvTransportEngine.hpp:37
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: Misc.h:55
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:160
Definition: Messages.hpp:118
impl class,
Definition: RecvTransportEngine.hpp:70
RecvTransportEngine(Config const &config, OutBuffer &outBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:89
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:72
void syncNetmap() HMBDC_RESTRICT
sync using busy wait or poll depending on config
Definition: RecvTransportEngine.hpp:231
a singleton that holding netmap resources
Definition: NetContext.hpp:38
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:50
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:172
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: RecvTransportEngine.hpp:165
Definition: Base.hpp:12
Definition: MessageHandler.hpp:36
Definition: Traits.hpp:8
Definition: LockFreeBufferMisc.hpp:74