hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 #undef NETMAP_WITH_LIBS
8 
9 #include "hmbdc/app/netmap/Messages.hpp"
10 #include "hmbdc/app/Base.hpp"
11 #include "hmbdc/comm/Topic.hpp"
12 #include "hmbdc/comm/eth/Misc.h"
13 #include "hmbdc/Compile.hpp"
14 #include "hmbdc/text/StringTrieSet.hpp"
15 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
16 #include "hmbdc//Traits.hpp"
17 
18 
19 #include <boost/lexical_cast.hpp>
20 #include <memory>
21 #include <type_traits>
22 
23 
24 #include <netinet/ether.h> /* ether_aton */
25 #include <linux/if_packet.h> /* sockaddr_ll */
26 #include <sys/sysctl.h> /* sysctl */
27 #include <ifaddrs.h> /* getifaddrs */
28 #include <poll.h>
29 
30 namespace hmbdc { namespace app { namespace netmap {
31 
32 struct NetContext;
33 /**
34  * @brief power a netmap port receiving functions
35  * @details this needs to be created using NetContext and start in an app::Context
36  */
37 struct RecvTransport {
38  using ptr = std::shared_ptr<RecvTransport>;
39  virtual ~RecvTransport(){}
40  /**
41  * @brief a take all arbitrator (no arbitration at all)
42  * @details it provides the default type for arbitration which does nothing
43  * it also provides a template on writing a user defined arbitrator
44  *
45  * @param h handle to retrieve what is inside of the message
46  * @return always returns 1 here
47  * In general: 1 keep the message; -1 to drop;
48  * 0 cannot decide, ask me later.
49  */
50  struct NoOpArb {
51  int operator()(TransportMessageHeader const* h) {
52  return 1; //always keep it
53  }
54  };
55 private:
56  friend struct hmbdc::app::netmap::NetContext; //only be used by NetContext
57  virtual void listenTo(comm::Topic const& t) = 0;
58  virtual void stopListenTo(comm::Topic const& t) = 0;
59 };
60 
61 /**
62  * @brief impl class,
63  * @details this needs to be created using NetContext and start in an app::Context
64  *
65  * @tparam OutputBuffer type of buffer to hold resulting network messages
66  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, suited to arbitrate
67  * between different recv transports. By default, keeping all
68  */
69 template <typename OutBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
72 , Client<RecvTransportEngine<OutBuffer, MsgArbitrator>>
73 , MessageHandler<RecvTransportEngine<OutBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
74 private:
76  friend struct hmbdc::app::netmap::NetContext; //only be created by NetContext
77 
78 /**
79  * @brief ctor
80  *
81  * @param cfg specify the details of the mcast transport
82  * @param outputBuffer holding the results
83  * @param arb arbitrator instance to decide which messages to drop and keep; it supports either
84  * raw netmap (udp) packet (BEFORE topic filtering) or hmbdc message (AFTER topic filtering) level
85  * arbitration depending on which one of
86  * int operator()(void* bytes, size_t len) or
87  * int operator()(TransportMessageHeader const* header) presents in the arb
88  */
89  RecvTransportEngine(Config const& config, OutBuffer& outBuffer
90  , MsgArbitrator arb = NoOpArb())
91  : config_(config)
92  , buffer_(std::max(sizeof(Subscribe), sizeof(Unsubscribe))
93  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
94  , outBuffer_(outBuffer)
95  , maxMessageSize_(outBuffer.maxItemSize())
96  , doChecksum_(config_.getExt<bool>("doChecksum"))
97  , busyWait_(true)
98  , pollWaitTimeMillisec_(0)
99  , arb_(arb)
100  , data_(nullptr) {
101  config (hmbdcName_, "hmbdcName")
102  (schedPolicy_, "schedPolicy")
103  (schedPriority_, "schedPriority")
104  ;
105 
106  struct nmreq baseNmd;
107  bzero(&baseNmd, sizeof(baseNmd));
108  baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
109  auto nmport = config_.getExt<std::string>("netmapPort");
110  busyWait_ = config_.getExt<bool>("busyWait");
111  if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<int>("pollWaitTimeMillisec");
112  nmd_ = nm_open(nmport.c_str(), &baseNmd, config_.getExt<int>("nmOpenFlags"), NULL);
113  if (!nmd_) {
114  HMBDC_THROW(std::runtime_error, "cannot open " << nmport);
115  }
116 
117  struct nmreq req;
118  memset(&req, 0, sizeof(req));
119  bcopy(nmd_->req.nr_name, req.nr_name, sizeof(req.nr_name));
120  req.nr_version = NETMAP_API;
121  req.nr_cmd = NETMAP_VNET_HDR_GET;
122  int err = ioctl(nmd_->fd, NIOCREGIF, &req);
123  if (err) {
124  HMBDC_THROW(std::runtime_error, "Unable to get virtio-net header length");
125  }
126  virtHeader_ = req.nr_arg1;
127 
128  //setting up poll - might be useful or not
129  pfd_.fd = nmd_->fd;
130  pfd_.events = POLLIN;
131  sleep(config_.getExt<int>("nmResetWaitSec"));
132  //cleanup rings
133  if (unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
134  HMBDC_THROW(std::runtime_error, "IO error");
135  }
136  for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
137  struct netmap_ring * rxring = NETMAP_TXRING(nmd_->nifp, i);
138  if (nm_ring_empty(rxring))
139  continue;
140  rxring->head = rxring->cur = rxring->tail;
141  }
142  }
143 
144  void listenTo(Topic const& t) override {
145  buffer_.put(MessageWrap<Subscribe>{t});
146  }
147 
148  void stopListenTo(Topic const& t) override {
149  buffer_.put(MessageWrap<Unsubscribe>{t});
150  }
151 
152 public:
153 
155  if (nmd_) nm_close(nmd_);
156  }
157 
158  /*virtual*/
159  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
161  auto n = buffer_.peek(begin, end);
162  auto it = begin;
163  while (it != end) {
164  MH::handleMessage(*static_cast<MessageHead*>(*it++));
165  }
166  buffer_.wasteAfterPeek(begin, n);
167 
168  syncNetmap();
169  for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
170  struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
171  if (nm_ring_empty(rxring))
172  continue;
173 
174  recvPackets(rxring);
175  }
176  }
177 
178  void handleMessageCb(Subscribe const& t) {
179  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
180  }
181 
182  void handleMessageCb(Unsubscribe const& t) {
183  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
184  }
185 
186  void stoppedCb(std::exception const& e) override {
187  HMBDC_LOG_C(e.what());
188  };
189 
190  char const* hmbdcName() const {
191  return this->hmbdcName_.c_str();
192  }
193 
194  std::tuple<char const*, int> schedSpec() const {
195  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
196  }
197 
198 private:
199  template <bool is_raw_arb>
200  typename std::enable_if<is_raw_arb, int>::type applyRawArb(size_t len) {
201  return arb_(data_, len);
202  }
203 
204  template <bool is_raw_arb>
205  typename std::enable_if<!is_raw_arb, int>::type applyRawArb(size_t) {
206  return 1;
207  }
208 
209  template <bool is_raw_arb>
210  typename std::enable_if<!is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
211  return arb_(h);
212  }
213 
214  template <bool is_raw_arb>
215  typename std::enable_if<is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
216  return 1;
217  }
218 
219  /**
220  * @brief sync using busy wait or poll depending on config
221  * @details it turns out busy wait performance is very poor when using vale
222  * poll works mostly, but it works well only when an enough timeout is given
223  * less than 10 milli wont work well
224  */
225  void syncNetmap() HMBDC_RESTRICT {
226  if (likely(busyWait_)) {
227  if (unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
228  HMBDC_THROW(std::runtime_error, "IO error");
229  } else {
230  return;
231  }
232  } else {
233  auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
234  if (unlikely( res < 0)) {
235  HMBDC_THROW(std::runtime_error, "IO error errno=" << errno);
236  } else {
237  return;
238  }
239  }
240  }
241 
242  void recvPackets(struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
243  using namespace comm::eth;
244  auto cur = ring->cur;
245 
246  while(cur != ring->tail) {
247  struct netmap_slot *slot = &ring->slot[cur];
248  auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
249  auto* p = reinterpret_cast<pkt*>(buf + virtHeader_ - sizeof(virt_header));
250 
251  if (p->ipv4.ip.ip_p == IPPROTO_UDP
252  && p->ipv4.ip.ip_off == htons(IP_DF)) {
253  if (!data_) data_ = (uint8_t*)p->ipv4.body;
254 
255  using traits =
256  pattern::function_traits<typename std::remove_reference<MsgArbitrator>::type>;
257  using arg0 = typename traits::template arg<0>::type;
258  bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
259  auto a = applyRawArb<is_raw_arb>(slot->len);
260  if (unlikely(a == 0)) {
261  //keep data_ unchanged
262  ring->head = ring->cur = cur;
263  return;
264  } else if (likely(a > 0)) {
265  while (data_ + sizeof(TransportMessageHeader) < buf + slot->len) {
266  auto header = reinterpret_cast<TransportMessageHeader*>(data_);
267  if (data_ + header->wireSize() <= buf + slot->len) {
268  if (subscriptions_.check(header->topic())) {
269  auto a = applyArb<is_raw_arb>(header);
270  if (likely(a > 0)) {
271  auto l = std::min((size_t)header->messagePayloadLen(), maxMessageSize_);
272  auto it = outBuffer_.claim();
273  char* b = static_cast<char*>(*it);
274  memcpy(b, header->payload(), l);
275  outBuffer_.commit(it);
276  } else if (unlikely(a == 0)) {
277  ring->head = ring->cur = cur;
278  return;
279  } //else drop and move to next msg
280  }
281  data_ += header->wireSize();
282  } else {
283  break;
284  }
285  }
286  } //else drop the packet
287  }
288  data_ = nullptr;
289  cur = nm_ring_next(ring, cur);
290  }
291 
292  ring->head = ring->cur = cur;
293  }
294 
295  static
296  bool verifyChecksum(comm::eth::pkt* HMBDC_RESTRICT packet, size_t payloadWireSize) {
297  using namespace comm::eth;
298  {
299  auto tmp = packet->ipv4.ip.ip_sum;
300  packet->ipv4.ip.ip_sum = 0;
301  if (tmp != wrapsum(
302  checksum(&packet->ipv4.ip, sizeof(packet->ipv4.ip), 0))) {
303  return false;
304  }
305  packet->ipv4.ip.ip_sum = tmp;
306  }
307 
308  {
309  auto tmp = packet->ipv4.udp.check;
310  packet->ipv4.udp.check = 0;
311 #pragma GCC diagnostic push
312 #ifdef __clang__
313 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"
314 #endif
315  auto udp = &packet->ipv4.udp;
316  if (tmp != wrapsum(
317  checksum(udp, sizeof(*udp), /* udp header */
318  checksum(packet->ipv4.body, payloadWireSize, /* udp payload */
319  checksum(&packet->ipv4.ip.ip_src, 2 * sizeof(packet->ipv4.ip.ip_src), /* pseudo header */
320  IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
321  return false;
322  }
323  packet->ipv4.udp.check = tmp;
324  return true;
325 #pragma GCC diagnostic pop
326  }
327  }
328  Config config_;
329  std::string hmbdcName_;
330  std::string schedPolicy_;
331  int schedPriority_;
332 
334  OutBuffer& HMBDC_RESTRICT outBuffer_;
335  size_t maxMessageSize_;
336  text::StringTrieSet subscriptions_;
337 
338 
339  struct nm_desc *nmd_;
340  int virtHeader_; //v hdr len
341  bool doChecksum_;
342  struct pollfd pfd_;
343  bool busyWait_;
344  int pollWaitTimeMillisec_;
345  MsgArbitrator arb_;
346  uint8_t* data_;
347 };
348 
349 }}}
350 
Definition: Messages.hpp:111
Definition: MonoLockFreeBuffer.hpp:14
powers the receiving functions of a netmap port
Definition: RecvTransportEngine.hpp:37
class to hold an hmbdc configuration
Definition: Config.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Misc.h:55
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:158
Definition: Messages.hpp:118
implementation class
Definition: RecvTransportEngine.hpp:70
Definition: StringTrieSetDetail.hpp:115
Definition: Traits.hpp:8
Definition: Message.hpp:55
a singleton that holds netmap resources
Definition: NetContext.hpp:37
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:50
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object can participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:170
Definition: Base.hpp:12
Definition: MessageHandler.hpp:36
Definition: LockFreeBufferMisc.hpp:73