hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 #undef NETMAP_WITH_LIBS
8 
9 #include "hmbdc/app/netmap/Messages.hpp"
10 #include "hmbdc/app/Base.hpp"
11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
12 #include "hmbdc/comm/eth/Misc.h"
13 #include "hmbdc/time/Rater.hpp"
14 #include "hmbdc/numeric/BitMath.hpp"
15 
16 #include <boost/regex.hpp>
17 #include <memory>
18 #include <type_traits>
19 
20 
21 #include <netinet/ether.h> /* ether_aton */
22 #include <linux/if_packet.h> /* sockaddr_ll */
23 #include <sys/sysctl.h> /* sysctl */
24 #include <ifaddrs.h> /* getifaddrs */
25 
26 #include <poll.h>
27 
28 namespace hmbdc { namespace app { namespace netmap {
29 
30 struct NetContext;
31 struct Sender;
32 
33 namespace sendtransportengine_detail {
34 using namespace std;
35 using namespace hmbdc::time;
36 using namespace hmbdc::comm::eth;
37 
38 /**
39  * @brief powers the sending functions of a netmap port
40  * @details this needs to be created using NetContext and started in an app::Context
41  *
42  */
44 : Client<SendTransportEngine> {
45  using ptr = std::shared_ptr<SendTransportEngine>;
46 
47 private:
48  uint16_t
49  outBufferSizePower2() {
50  auto res = config_.getExt<uint16_t>("outBufferSizePower2");
51  if (res) {
52  return res;
53  }
54  res =hmbdc::numeric::log2Upper(16ul * 1024ul / (8ul + maxMessageSize_));
55  HMBDC_LOG_N("auto set --outBufferSizePower2=", res);
56  return res;
57  }
58 
59  friend struct hmbdc::app::netmap::NetContext; //only be created by NetContext
60  SendTransportEngine(Config const& cfg, size_t maxMessageSize)
61  : config_(cfg)
62  , topic_(cfg.getExt<string>("topicRegex"))
63  , topicRegex_(topic_)
64  , maxMessageSize_(maxMessageSize)
65  , nmd_(nullptr)
66  , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(Topic) + sizeof(MessageHead)
67  , outBufferSizePower2())
68  , virtHeader_(0)
69  , srcEthAddr_{{0}}
70  , dstEthAddr_{{0}}
71  , doChecksum_(config_.getExt<bool>("doChecksum"))
72  , rater_(Duration::seconds(1u)
73  , config_.getExt<size_t>("sendBytesPerSec")
74  , config_.getExt<size_t>("sendBytesBurst")
75  , config_.getExt<size_t>("sendBytesBurst") != 0ul //no rate control by default
76  )
77  , maxSendBatch_(cfg.getExt<size_t>("maxSendBatch"))
78  , mtu_(config_.getExt<size_t>("mtu")) {
79  mtu_ -= (8u + 20u); // 8bytes udp header and 20bytes ip header
80  cfg (hmbdcName_, "hmbdcName")
81  (schedPolicy_, "schedPolicy")
82  (schedPriority_, "schedPriority")
83  ;
84 
85  memcpy(&srcEthAddr_, ether_aton(config_.getExt<std::string>("srcEthAddr").c_str())
86  , sizeof(srcEthAddr_));
87  memcpy(&dstEthAddr_, ether_aton(config_.getExt<std::string>("dstEthAddr").c_str())
88  , sizeof(dstEthAddr_));
89  getMacAddresses();
90 
91  struct nmreq baseNmd;
92  bzero(&baseNmd, sizeof(baseNmd));
93  baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
94 
95  auto nmport = cfg.getExt<std::string>("netmapPort");
96  nmd_ = nm_open(nmport.c_str(), &baseNmd
97  , cfg.getExt<uint64_t>("nmOpenFlags"), NULL);
98  if (!nmd_) {
99  HMBDC_THROW(std::runtime_error, "cannot open " << nmport);
100  }
101 
102  struct nmreq req;
103  memset(&req, 0, sizeof(req));
104  bcopy(nmd_->req.nr_name, req.nr_name, sizeof(req.nr_name));
105  req.nr_version = NETMAP_API;
106  req.nr_cmd = NETMAP_VNET_HDR_GET;
107  int err = ioctl(nmd_->fd, NIOCREGIF, &req);
108  if (err) {
109  HMBDC_THROW(std::runtime_error, "Unable to get virtio-net header length");
110  }
111  virtHeader_ = req.nr_arg1;
112 
113  initializePacket(&precalculatedPacketHead_
114  , config_.getExt<uint16_t>("ttl")
115  , config_.getExt<string>("srcIp")
116  , config_.getExt<string>("dstIp")
117  , srcEthAddr_
118  , dstEthAddr_
119  , config_.getExt<uint16_t>("srcPort")
120  , config_.getExt<uint16_t>("dstPort")
121  );
122  sleep(config_.getExt<int>("nmResetWaitSec"));
123  //cleanup rings
124  if (unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
125  HMBDC_THROW(std::runtime_error, "IO error");
126  }
127  for (int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
128  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
129  if (nm_ring_empty(txring))
130  continue;
131  txring->head = txring->cur = txring->tail;
132  }
133  }
134 
135 public:
136 
// NOTE(review): the line carrying this member's signature is absent from this
// listing; presumably this is the destructor body - confirm against the header.
// Closes the netmap descriptor held in nmd_ (assigned from nm_open() in the constructor).
138  nm_close(nmd_);
139  }
140 
141  /*virtual*/ bool droppedCb() override {
142  buffer_.reset();
143  return true;
144  }
145 
146  /*virtual*/
147  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
148  if (unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
149  HMBDC_THROW(std::runtime_error, "IO error");
150  }
151 
152  for (int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
153  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
154  if (nm_ring_empty(txring))
155  continue;
156 
157  sendPackets(txring);
158  }
159  }
160 
161  void stoppedCb(std::exception const& e) override {
162  HMBDC_LOG_C(e.what());
163  };
164 
165  char const* hmbdcName() const {
166  return this->hmbdcName_.c_str();
167  }
168 
169  std::tuple<char const*, int> schedSpec() const {
170  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
171  }
172 
173 private:
174  bool match(Topic const& t) const {
175  return boost::regex_match(t.c_str(), topicRegex_);
176  }
177  friend struct hmbdc::app::netmap::Sender;
178  template <typename... Messages>
179  void queue(Topic const& t, Messages&&... msgs) HMBDC_RESTRICT {
180  auto n = sizeof...(msgs);
181  auto it = buffer_.claim(n);
182  queue(it, t, std::forward<Messages>(msgs)...);
183  buffer_.commit(it, n);
184  }
185 
186  template <typename... Messages>
187  bool tryQueue(Topic const& t, Messages&&... msgs) HMBDC_RESTRICT {
188  auto n = sizeof...(msgs);
189  auto it = buffer_.tryClaim(n);
190  if (it) {
191  queue(it, t, std::forward<Messages>(msgs)...);
192  buffer_.commit(it, n);
193  return true;
194  }
195  return false;
196  }
197 
/**
 * @brief copy the first message into the slot at it, then recurse for the rest
 * @details NOTE(review): the line carrying the function name and the first
 * parameter (it) is absent from this listing.
 *
 * @throw std::out_of_range when sizeof the message type exceeds maxMessageSize_
 */
198  template <typename M, typename... Messages>
200  , Topic const& t, M&& msg, Messages&&... msgs) {
201  using Message = typename std::remove_reference<M>::type;
202  if (unlikely(sizeof(Message) > maxMessageSize_)) {
203  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
204  }
205  auto s = *it;
206  TransportMessageHeader::copyTo(s, t, std::forward<M>(msg));
 // NOTE(review): the remaining pack is forwarded with std::forward<M>, i.e. as the
 // type of the FIRST message; std::forward<Messages>(msgs)... looks like the intent.
 // As written, packs mixing types or value categories may fail to compile or be
 // forwarded with the wrong value category - confirm and fix.
207  queue(++it, t, std::forward<M>(msgs)...);
208  }
209 
210  template <typename Message, typename ... Args>
211  void queueInPlace(Topic const& t, Args&&... args) HMBDC_RESTRICT {
212  if (unlikely(sizeof(Message) > maxMessageSize_)) {
213  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
214  }
215  auto s = buffer_.claim();
216  TransportMessageHeader::copyToInPlace<Message>(*s, t, std::forward<Args>(args)...);
217  buffer_.commit(s);
218  }
219 
220  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len) {
221  if (unlikely(len > maxMessageSize_)) {
222  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
223  }
224  auto s = buffer_.claim();
225  TransportMessageHeader::copyTo(*s, t, tag, bytes, len);
226  buffer_.commit(s);
227  }
228 
// Empty-bodied overload taking no messages. NOTE(review): the line with the function
// name and first parameter is absent from this listing; presumably this ends the
// recursion of the message-copying queue overload - confirm.
230  , Topic const& t) {}
231 
/**
 * @brief pack queued messages into the slots of one netmap TX ring
 * @details each slot starts with a copy of the prebuilt ethernet/ip/udp header
 * (plus virtHeader_ bytes of virtio-net header), followed by as many messages
 * as fit within slotLenMax, up to maxSendBatch_ messages per slot. Every message
 * is first checked against rater_. Consumed messages are released from buffer_
 * at the end.
 * NOTE(review): the declaration of begin / end is on a line absent from this
 * listing.
 *
 * @param ring the TX ring to fill
 */
232  void sendPackets(struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
233  uint32_t cur = ring->cur;
 // nothing to do when cur has already reached tail
234  if (unlikely(cur == ring->tail)) return;
 // ask buffer_ for up to maxSendBatch_ queued messages; leave when peek reports none
236  if (unlikely(!(buffer_.peek(begin, end, maxSendBatch_)))) {
237  return;
238  }
239  bool slotInited = false;
240  auto it = begin;
241  auto batch = maxSendBatch_;
242  struct netmap_slot *slot = &ring->slot[cur];
243  uint32_t slotLen = 0;
244  char *p = NETMAP_BUF(ring, slot->buf_idx);
 // pkt starts with a virt_header; only the last virtHeader_ bytes of it live in the slot
245  pkt* currentPktPtr = (pkt*)(p + virtHeader_ - sizeof(virt_header));
 // mtu_ already had the 28 bytes of udp + ip header subtracted in the constructor
246  uint16_t slotLenMax = min(mtu_, (uint16_t)ring->nr_buf_size);
247  for (; it != end;) {
248  auto th = reinterpret_cast<TransportMessageHeader*>(*it);
 // rate control: stop for this round when rater_ refuses the message
249  if (rater_.check(th->wireSize())) {
250  if (!slotInited) {
 // a fresh slot begins with the precalculated headers
251  auto wireSize = (uint16_t)(
252  sizeof(ether_header) + sizeof(ip) + sizeof(udphdr) + virtHeader_
253  );
254  memcpy(p, ((char*)&precalculatedPacketHead_) + sizeof(virt_header) - virtHeader_
255  , wireSize);
256  slotLen = wireSize;
257  slotInited = true;
258  }
259  auto wireSize = th->wireSize();
260  if (slotLen + wireSize <= slotLenMax) {
261  memcpy(p + slotLen, th, (int)wireSize);
262  slotLen += wireSize;
263  rater_.commit();
264  batch--;
265  ++it;
266  } else {
 // NOTE(review): a message that does not fit even in a fresh slot is never
 // consumed here; each pass closes a header-only slot until cur reaches tail - confirm.
267  batch = 0; //this batch is done
268  }
269  if (!batch) {
 // close the current slot: fix up lengths / checksums and move to the next slot
270  slot->len = slotLen;
271  size_t wireSizeExcludingHead = slotLen
272  - (sizeof(ether_header) + sizeof(ip) + sizeof(udphdr) + virtHeader_);
273  updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
274  cur = nm_ring_next(ring, cur);
275  slotLen = 0;
276  if (cur == ring->tail) break;
277  slot = &ring->slot[cur];
278  p = NETMAP_BUF(ring, slot->buf_idx);
279  currentPktPtr = (pkt*)(p + virtHeader_ - sizeof(virt_header));
280  batch = maxSendBatch_;
281  slotInited = false;
282  }
283  } else {
284  break;
285  }
286  }
 // close a partially filled slot left open by the loop
287  if (slotLen) {
288  slot->len = slotLen;
289  size_t wireSizeExcludingHead = slotLen
290  - (sizeof(ether_header) + sizeof(ip) + sizeof(udphdr) + virtHeader_);
291  updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
292  cur = nm_ring_next(ring, cur);
293  }
294 
 // publish the filled slots to netmap and release the consumed messages from buffer_
295  ring->head = ring->cur = cur;
296  buffer_.wasteAfterPeek(begin, it - begin, true);
297  }
298 
299  void getMacAddresses() {
300  auto nmport = config_.getExt<std::string>("netmapPort");
301 
302  if (strncmp(nmport.c_str(), "vale", 4) == 0) return;
303 
304  if (nmport.find_first_of(":") == std::string::npos) {
305  HMBDC_THROW(std::runtime_error
306  , "wrong netmapPort format (examples: netmap:eth0, netmap:eth0-0)");
307  }
308  auto iface = nmport.substr(nmport.find_first_of(":"));
309  iface = iface.substr(1, iface.find_first_of("-^") - 1);
310 
311 
312  struct ifaddrs *ifaphead, *ifap;
313  int l = sizeof(ifap->ifa_name);
314 
315  if (getifaddrs(&ifaphead) != 0) {
316  HMBDC_THROW(std::runtime_error, "getifaddrs failed for" << iface);
317  }
318  for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
319  struct sockaddr_ll *sll =
320  (struct sockaddr_ll *)ifap->ifa_addr;
321  uint8_t *mac;
322 
323  if (!sll || sll->sll_family != AF_PACKET)
324  continue;
325  if (strncmp(ifap->ifa_name, iface.c_str(), l) != 0)
326  continue;
327  mac = (uint8_t *)(sll->sll_addr);
328 
329  char srcEthAddrStr[20];
330  sprintf(srcEthAddrStr, "%02x:%02x:%02x:%02x:%02x:%02x",
331  mac[0], mac[1], mac[2],
332  mac[3], mac[4], mac[5]);
333  memcpy(&srcEthAddr_, ether_aton(srcEthAddrStr), sizeof(srcEthAddr_)); //6 bytes
334  break;
335  }
336  freeifaddrs(ifaphead);
337  if (!ifap) {
338  HMBDC_THROW(std::runtime_error, "no local interface named " << iface);
339  }
340  }
341 
342  static
343  void initializePacket(struct pkt *pkt, int ttl, std::string srcIpStr, std::string dstIpStr
344  , ether_addr srcEthAddr, ether_addr dstEthAddr, uint16_t srcPort, uint16_t dstPort) {
345  struct ether_header *eh;
346  struct ip *ip;
347  struct udphdr *udp;
348  uint32_t a, b, c, d;
349  sscanf(srcIpStr.c_str(), "%d.%d.%d.%d", &a, &b, &c, &d);
350  auto srcIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
351  sscanf(dstIpStr.c_str(), "%d.%d.%d.%d", &a, &b, &c, &d);
352  auto dstIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
353 
354  /* prepare the headers */
355  eh = &pkt->eh;
356  bcopy(&srcEthAddr, eh->ether_shost, 6);
357  bcopy(&dstEthAddr, eh->ether_dhost, 6);
358 
359  eh->ether_type = htons(ETHERTYPE_IP);
360 
361 #pragma GCC diagnostic push
362 #ifdef __clang__
363 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"
364 #endif
365  ip = &pkt->ipv4.ip;
366  udp = &pkt->ipv4.udp;
367  ip->ip_v = IPVERSION;
368  ip->ip_hl = sizeof(*ip) >> 2;
369  ip->ip_id = 0;
370  ip->ip_tos = IPTOS_LOWDELAY;
371  ip->ip_len = 0; //zero so chksum can happen in ip_sum
372  ip->ip_id = 0;
373  ip->ip_off = htons(IP_DF); /* Don't fragment */
374  ip->ip_ttl = ttl;
375  ip->ip_p = IPPROTO_UDP;
376  ip->ip_dst.s_addr = htonl(dstIp);
377  ip->ip_src.s_addr = htonl(srcIp);
378  ip->ip_sum = 0;
379  ip->ip_len = sizeof(*ip) + sizeof(udphdr); //ip->ip_len is unknown, put known part
380  udp->source = htons(srcPort);
381  udp->dest = htons(dstPort);
382  udp->len = sizeof(udphdr); //put known part
383  udp->check = 0;
384 
385  bzero(&pkt->vh, sizeof(pkt->vh));
386  }
387 
388  static
389  void updatePacket(struct pkt *packet, size_t payloadWireSize, bool doChecksum = true) {
390  packet->ipv4.ip.ip_len += payloadWireSize; //already has sizeof(ip) + sizeof(udphdr);
391  packet->ipv4.ip.ip_len = ntohs(packet->ipv4.ip.ip_len);
392  if (doChecksum) {
393  packet->ipv4.ip.ip_sum = wrapsum(checksum(&packet->ipv4.ip, sizeof(packet->ipv4.ip), 0));
394  }
395 
396  packet->ipv4.udp.len += payloadWireSize;
397  packet->ipv4.udp.len = htons(packet->ipv4.udp.len);
398  if (doChecksum) {
399  auto udp = &packet->ipv4.udp;
400  packet->ipv4.udp.check = wrapsum(
401  checksum(udp, sizeof(*udp), /* udp header */
402  checksum(packet->ipv4.body, payloadWireSize, /* udp payload */
403  checksum(&packet->ipv4.ip.ip_src, 2 * sizeof(packet->ipv4.ip.ip_src), /* pseudo header */
404  IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))));
405  }
406  }
407 
408 #pragma GCC diagnostic pop
409 
410  Config const config_;
411  string hmbdcName_;
412  string schedPolicy_;
413  int schedPriority_;
414  std::string topic_;
415  boost::regex topicRegex_;
416  size_t maxMessageSize_;
417 
418 
419  struct nm_desc *nmd_;
421  int virtHeader_; //v hdr len
422 
423 
424  ether_addr srcEthAddr_;
425  ether_addr dstEthAddr_;
426  pkt precalculatedPacketHead_;
427  bool doChecksum_;
428  Rater rater_;
429  size_t maxSendBatch_;
430  uint16_t mtu_;
431 };
432 
433 } //sendtransportengine_detail
434 
436 
437 }}}
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Misc.h:9
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:151
Definition: TypedString.hpp:74
facade class for sending network messages
Definition: Sender.hpp:10
Definition: Misc.h:55
Definition: Misc.h:51
Definition: Message.hpp:34
powers the sending functions of a netmap port
Definition: SendTransportEngine.hpp:43
Definition: Rater.hpp:10
a singleton holding netmap resources
Definition: NetContext.hpp:37
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object can participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:73