hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
#include "hmbdc/Copyright.hpp"
#pragma once


#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>


#include "hmbdc/app/netmap/Messages.hpp"
#include "hmbdc/app/Client.hpp"
#include "hmbdc/app/LoggerT.hpp" //log errors
#include "hmbdc/app/Config.hpp"
#include "hmbdc/comm/Topic.hpp"
#include "hmbdc/comm/eth/Misc.h"
#include "hmbdc/Compile.hpp"
#include "hmbdc/text/StringTrieSet.hpp"
#include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
#include "hmbdc/Traits.hpp"


#include <boost/lexical_cast.hpp>
#include <memory>
#include <type_traits>


#include <netinet/ether.h> /* ether_aton */
#include <linux/if_packet.h> /* sockaddr_ll */
#include <sys/sysctl.h> /* sysctl */
#include <ifaddrs.h> /* getifaddrs */
#include <poll.h>

namespace hmbdc { namespace app { namespace netmap {

using namespace hmbdc;
using namespace hmbdc::app;
using namespace hmbdc::comm;
using namespace hmbdc::comm::eth;
using namespace hmbdc::text;
/**
 * @brief powers the receiving functions of a netmap port
 * @details this needs to be created through NetContext and started in an app::Context
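 *
 * a minimal usage sketch, assuming a NetContext factory and an app::Context to run
 * the engine in (the exact method names below are assumptions, not taken from this
 * header):
 * @code
 * auto& net = NetContext::instance();                     //hypothetical accessor
 * auto recv = net.createRecvTransportEngine(cfg, buffer); //hypothetical factory call
 * appCtx.start(*recv);                                    //run it in an app::Context
 * @endcode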
 */
struct RecvTransport {
    using ptr = std::shared_ptr<RecvTransport>;
    virtual ~RecvTransport(){}
    /**
     * @brief a take-all arbitrator (no arbitration at all)
     * @details it provides the default type for arbitration, which does nothing;
     * it also serves as a template for writing a user defined arbitrator
     *
     * @param h handle used to retrieve what is inside the message
     * @return always returns 1 here.
     * In general: 1 means keep the message; -1 means drop it;
     * 0 means cannot decide yet, ask again later.
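     *
     * a sketch of a user defined arbitrator following the same call protocol
     * (the struct name and the drop policy below are made up for illustration):
     * @code
     * struct DropEveryOtherArb {
     *     int operator()(TransportMessageHeader const* h) {
     *         return (count_++ % 2) ? -1 : 1; //keep one message, drop the next
     *     }
     *     size_t count_ = 0;
     * };
     * @endcode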
     */
    struct NoOpArb {
        int operator()(TransportMessageHeader const* h) {
            return 1; //always keep it
        }
    };
private:
    friend class NetContext; //only to be used by NetContext
    virtual void listenTo(Topic const& t) = 0;
    virtual void stopListenTo(Topic const& t) = 0;
};

/**
 * @brief impl class
 * @details this needs to be created through NetContext and started in an app::Context
 *
 * @tparam OutBuffer type of buffer to hold resulting network messages
 * @tparam MsgArbitrator arbitrator deciding which messages to drop or keep, suited for
 * arbitrating between different recv transports; by default it keeps all messages
 */
template <typename OutBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
struct RecvTransportEngineImpl
: RecvTransport
, Client<RecvTransportEngineImpl<OutBuffer, MsgArbitrator>>
, MessageHandler<RecvTransportEngineImpl<OutBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
private:
    //alias used when dispatching the buffered Subscribe/Unsubscribe commands below
    using MH = MessageHandler<RecvTransportEngineImpl<OutBuffer, MsgArbitrator>, Subscribe, Unsubscribe>;
    friend class NetContext; //only to be created by NetContext

    /**
     * @brief ctor
     *
     * @param config specifies the details of the netmap transport
     * @param outBuffer buffer holding the results
     * @param arb arbitrator instance deciding which messages to drop or keep; it supports either
     * raw netmap (udp) packet level (BEFORE topic filtering) or hmbdc message level (AFTER topic
     * filtering) arbitration, depending on which one of
     * int operator()(void* bytes, size_t len) or
     * int operator()(TransportMessageHeader const* header) is present in the arb
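     *
     * a sketch of a raw packet level arbitrator (the struct name and the length
     * threshold are made up for illustration); it is selected at compile time
     * because its first parameter type is void*:
     * @code
     * struct DropShortPacketArb {
     *     int operator()(void* bytes, size_t len) {
     *         return len >= 64 ? 1 : -1; //drop any packet shorter than 64 bytes
     *     }
     * };
     * @endcode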
     */
    RecvTransportEngineImpl(Config const& config, OutBuffer& outBuffer
        , MsgArbitrator arb = NoOpArb())
    : config_(config)
    , buffer_(std::max(sizeof(Subscribe), sizeof(Unsubscribe))
        , config_.getExt<uint16_t>("cmdBufferSizePower2"))
    , outBuffer_(outBuffer)
    , maxMessageSize_(outBuffer.maxItemSize())
    , doChecksum_(config_.getExt<bool>("doChecksum"))
    , busyWait_(true)
    , pollWaitTimeMillisec_(0)
    , arb_(arb)
    , data_(nullptr) {
        config (hmbdcName_, "hmbdcName")
            (schedPolicy_, "schedPolicy")
            (schedPriority_, "schedPriority")
        ;

        struct nmreq baseNmd;
        bzero(&baseNmd, sizeof(baseNmd));
        baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
        auto nmport = config_.getExt<std::string>("netmapPort");
        busyWait_ = config_.getExt<bool>("busyWait");
        if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<int>("pollWaitTimeMillisec");
        nmd_ = nm_open(nmport.c_str(), &baseNmd, config_.getExt<int>("nmOpenFlags"), NULL);
        if (!nmd_) {
            HMBDC_THROW(std::runtime_error, "cannot open " << nmport);
        }

        struct nmreq req;
        memset(&req, 0, sizeof(req));
        bcopy(nmd_->req.nr_name, req.nr_name, sizeof(req.nr_name));
        req.nr_version = NETMAP_API;
        req.nr_cmd = NETMAP_VNET_HDR_GET;
        int err = ioctl(nmd_->fd, NIOCREGIF, &req);
        if (err) {
            HMBDC_THROW(std::runtime_error, "Unable to get virtio-net header length");
        }
        virtHeader_ = req.nr_arg1;

        //setting up poll - might be useful or not
        pfd_.fd = nmd_->fd;
        pfd_.events = POLLIN;
        sleep(config_.getExt<int>("nmResetWaitSec"));
        //cleanup rings
        if (unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
            HMBDC_THROW(std::runtime_error, "IO error");
        }
        for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
            struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
            if (nm_ring_empty(rxring))
                continue;
            rxring->head = rxring->cur = rxring->tail;
        }
    }

    void listenTo(Topic const& t) override {
        buffer_.put(MessageWrap<Subscribe>{t});
    }

    void stopListenTo(Topic const& t) override {
        buffer_.put(MessageWrap<Unsubscribe>{t});
    }

public:

    ~RecvTransportEngineImpl() {
        if (nmd_) nm_close(nmd_);
    }

    /*virtual*/
    void invokedCb(uint16_t threadSerialNumber) __restrict__ override {
        //first drain any pending Subscribe/Unsubscribe commands buffered by listenTo/stopListenTo
        MonoLockFreeBuffer::iterator begin, end;
        auto n = buffer_.peek(begin, end);
        auto it = begin;
        while (it != end) {
            MH::handleMessage(*static_cast<MessageHead*>(*it++));
        }
        buffer_.wasteAfterPeek(begin, n);

        //then sync with netmap and scan all rx rings for newly arrived packets
        syncNetmap();
        for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
            struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
            if (nm_ring_empty(rxring))
                continue;

            recvPackets(rxring);
        }
    }

    void handleMessageCb(Subscribe const& t) {
        subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
    }

    void handleMessageCb(Unsubscribe const& t) {
        subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
    }

    void stoppedCb(std::exception const& e) override {
        HMBDC_LOG_C(e.what());
    }

    char const* hmbdcName() const {
        return this->hmbdcName_.c_str();
    }

    std::tuple<char const*, int> schedSpec() const {
        return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
    }

private:
    //compile time selection via enable_if: when the arbitrator takes raw packet
    //bytes (void*, size_t), applyRawArb forwards to it; otherwise it is a no-op returning 1
    template <bool is_raw_arb>
    typename enable_if<is_raw_arb, int>::type applyRawArb(size_t len) {
        return arb_(data_, len);
    }

    template <bool is_raw_arb>
    typename enable_if<!is_raw_arb, int>::type applyRawArb(size_t) {
        return 1;
    }

    //symmetrically, applyArb forwards to the arbitrator only when it takes a
    //TransportMessageHeader const* (hmbdc message level, after topic filtering)
    template <bool is_raw_arb>
    typename enable_if<!is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
        return arb_(h);
    }

    template <bool is_raw_arb>
    typename enable_if<is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
        return 1;
    }

    /**
     * @brief sync using busy wait or poll depending on config
     * @details it turns out busy wait performance is very poor when using vale;
     * poll mostly works, but only when a long enough timeout is given:
     * less than 10 milliseconds does not work well
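     *
     * for example, the two config values read in the ctor could be set along these
     * lines (shown in a generic key/value form; the concrete Config syntax is not
     * specified in this header):
     * @code
     * busyWait = false
     * pollWaitTimeMillisec = 10
     * @endcode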
     */
    void syncNetmap() __restrict__ {
        if (likely(busyWait_)) {
            if (unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
                HMBDC_THROW(std::runtime_error, "IO error");
            } else {
                return;
            }
        } else {
            auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
            if (unlikely(res < 0)) {
                HMBDC_THROW(std::runtime_error, "IO error errno=" << errno);
            } else {
                return;
            }
        }
    }

    void recvPackets(struct netmap_ring * __restrict__ ring) __restrict__ {
        auto cur = ring->cur;

        while (cur != ring->tail) {
            struct netmap_slot *slot = &ring->slot[cur];
            auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
            auto* p = reinterpret_cast<pkt*>(buf + virtHeader_ - sizeof(virt_header));

            if (p->ipv4.ip.ip_p == IPPROTO_UDP
                && p->ipv4.ip.ip_off == htons(IP_DF)) {
                if (!data_) data_ = (uint8_t*)p->ipv4.body;

                //detect at compile time whether the arbitrator works on raw packet bytes
                using traits =
                    pattern::function_traits<typename std::remove_reference<MsgArbitrator>::type>;
                using arg0 = typename traits::template arg<0>::type;
                bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
                auto a = applyRawArb<is_raw_arb>(slot->len);
                if (unlikely(a == 0)) {
                    //keep data_ unchanged
                    ring->head = ring->cur = cur;
                    return;
                } else if (likely(a > 0)) {
                    //walk the hmbdc messages packed in this packet
                    while (data_ + sizeof(TransportMessageHeader) < buf + slot->len) {
                        auto header = reinterpret_cast<TransportMessageHeader*>(data_);
                        if (data_ + header->wireSize() <= buf + slot->len) {
                            if (subscriptions_.check(header->topic())) {
                                auto a = applyArb<is_raw_arb>(header);
                                if (likely(a > 0)) {
                                    auto l = std::min((size_t)header->messagePayloadLen(), maxMessageSize_);
                                    auto it = outBuffer_.claim();
                                    char* b = static_cast<char*>(*it);
                                    memcpy(b, header->payload(), l);
                                    outBuffer_.commit(it);
                                } else if (unlikely(a == 0)) {
                                    ring->head = ring->cur = cur;
                                    return;
                                } //else drop and move to next msg
                            }
                            data_ += header->wireSize();
                        } else {
                            break;
                        }
                    }
                } //else drop the packet
            }
            data_ = nullptr;
            cur = nm_ring_next(ring, cur);
        }

        ring->head = ring->cur = cur;
    }

    static
    bool verifyChecksum(pkt* __restrict__ packet, size_t payloadWireSize) {
        {
            auto tmp = packet->ipv4.ip.ip_sum;
            packet->ipv4.ip.ip_sum = 0;
            if (tmp != wrapsum(
                checksum(&packet->ipv4.ip, sizeof(packet->ipv4.ip), 0))) {
                return false;
            }
            packet->ipv4.ip.ip_sum = tmp;
        }

        {
            auto tmp = packet->ipv4.udp.check;
            packet->ipv4.udp.check = 0;

            auto udp = &packet->ipv4.udp;
            if (tmp != wrapsum(
                checksum(udp, sizeof(*udp), /* udp header */
                    checksum(packet->ipv4.body, payloadWireSize, /* udp payload */
                        checksum(&packet->ipv4.ip.ip_src, 2 * sizeof(packet->ipv4.ip.ip_src), /* pseudo header */
                            IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
                return false;
            }
            packet->ipv4.udp.check = tmp;
            return true;
        }
    }
    Config config_;
    string hmbdcName_;
    string schedPolicy_;
    int schedPriority_;

    MonoLockFreeBuffer buffer_;
    OutBuffer& __restrict__ outBuffer_;
    size_t maxMessageSize_;
    StringTrieSet subscriptions_;


    struct nm_desc *nmd_;
    int virtHeader_; //v hdr len
    bool doChecksum_;
    struct pollfd pfd_;
    bool busyWait_;
    int pollWaitTimeMillisec_;
    MsgArbitrator arb_;
    uint8_t* data_;
};

}}}
