hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 
8 #include "hmbdc/app/netmap/Messages.hpp"
9 #include "hmbdc/app/Client.hpp"
10 #include "hmbdc/app/LoggerT.hpp" //log errors
11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
12 #include "hmbdc/comm/Topic.hpp"
13 #include "hmbdc/comm/eth/Misc.h"
14 #include "hmbdc/time/Rater.hpp"
15 #include "hmbdc/numeric/BitMath.hpp"
16 
17 #include "hmbdc/app/Config.hpp"
18 #include "hmbdc/Compile.hpp"
19 
20 
21 #include <boost/regex.hpp>
22 #include <memory>
23 #include <type_traits>
24 
25 
26 #include <netinet/ether.h> /* ether_aton */
27 #include <linux/if_packet.h> /* sockaddr_ll */
28 #include <sys/sysctl.h> /* sysctl */
29 #include <ifaddrs.h> /* getifaddrs */
30 
31 #include <poll.h>
32 
33 namespace hmbdc { namespace app { namespace netmap {
34 
35 using namespace hmbdc;
36 using namespace hmbdc::time;
37 using namespace hmbdc::comm;
38 using namespace hmbdc::comm::eth;
39 
40 /**
41  * @brief powers the sending side of a netmap port
42  * @details an instance must be created through NetContext and started in an app::Context
43  *
44  */
46 : Client<SendTransportEngine> {
47  using ptr = std::shared_ptr<SendTransportEngine>;
48 
49 private:
50  uint16_t
51  outBufferSizePower2() {
52  auto res = config_.getExt<uint16_t>("outBufferSizePower2");
53  if (res) {
54  return res;
55  }
56  res =hmbdc::numeric::log2Upper(16ul * 1024ul / (8ul + maxMessageSize_));
57  HMBDC_LOG_N("auto set --outBufferSizePower2=", res);
58  return res;
59  }
60 
61  friend class NetContext; //only be created by NetContext
62  SendTransportEngine(Config const& cfg, size_t maxMessageSize)
63  : config_(cfg)
64  , topic_(cfg.getExt<string>("topicRegex"))
65  , topicRegex_(topic_)
66  , maxMessageSize_(maxMessageSize)
67  , nmd_(nullptr)
68  , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(Topic)
69  , outBufferSizePower2())
70  , virtHeader_(0)
71  , srcEthAddr_{0}
72  , dstEthAddr_{0}
73  , doChecksum_(config_.getExt<bool>("doChecksum"))
74  , rater_(Duration::seconds(1u)
75  , config_.getExt<size_t>("sendBytesPerSec")
76  , config_.getExt<size_t>("sendBytesBurst")
77  , config_.getExt<size_t>("sendBytesBurst") != 0ul //no rate control by default
78  )
79  , maxSendBatch_(cfg.getExt<size_t>("maxSendBatch"))
80  , mtu_(config_.getExt<size_t>("mtu")) {
81  mtu_ -= (8u + 20u); // 8bytes udp header and 20bytes ip header
82  cfg (hmbdcName_, "hmbdcName")
83  (schedPolicy_, "schedPolicy")
84  (schedPriority_, "schedPriority")
85  ;
86 
87  memcpy(&srcEthAddr_, ether_aton(config_.getExt<std::string>("srcEthAddr").c_str())
88  , sizeof(srcEthAddr_));
89  memcpy(&dstEthAddr_, ether_aton(config_.getExt<std::string>("dstEthAddr").c_str())
90  , sizeof(dstEthAddr_));
91  getMacAddresses();
92 
93  struct nmreq baseNmd;
94  bzero(&baseNmd, sizeof(baseNmd));
95  baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
96 
97  auto nmport = cfg.getExt<std::string>("netmapPort");
98  nmd_ = nm_open(nmport.c_str(), &baseNmd
99  , cfg.getExt<uint64_t>("nmOpenFlags"), NULL);
100  if (!nmd_) {
101  HMBDC_THROW(std::runtime_error, "cannot open " << nmport);
102  }
103 
104  struct nmreq req;
105  memset(&req, 0, sizeof(req));
106  bcopy(nmd_->req.nr_name, req.nr_name, sizeof(req.nr_name));
107  req.nr_version = NETMAP_API;
108  req.nr_cmd = NETMAP_VNET_HDR_GET;
109  int err = ioctl(nmd_->fd, NIOCREGIF, &req);
110  if (err) {
111  HMBDC_THROW(std::runtime_error, "Unable to get virtio-net header length");
112  }
113  virtHeader_ = req.nr_arg1;
114 
115  initializePacket(&precalculatedPacketHead_
116  , config_.getExt<uint16_t>("ttl")
117  , config_.getExt<string>("srcIp")
118  , config_.getExt<string>("dstIp")
119  , srcEthAddr_
120  , dstEthAddr_
121  , config_.getExt<uint16_t>("srcPort")
122  , config_.getExt<uint16_t>("dstPort")
123  );
124  sleep(config_.getExt<int>("nmResetWaitSec"));
125  //cleanup rings
126  if (unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
127  HMBDC_THROW(std::runtime_error, "IO error");
128  }
129  for (int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
130  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
131  if (nm_ring_empty(txring))
132  continue;
133  txring->head = txring->cur = txring->tail;
134  }
135  }
136 
137 public:
138 
140  nm_close(nmd_);
141  }
142 
143  /*virtual*/ bool droppedCb() override {
144  buffer_.reset();
145  return true;
146  }
147 
148  /*virtual*/
149  void invokedCb(uint16_t threadSerialNumber) __restrict__ override {
150  if (unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
151  HMBDC_THROW(std::runtime_error, "IO error");
152  }
153 
154  for (int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
155  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
156  if (nm_ring_empty(txring))
157  continue;
158 
159  sendPackets(txring);
160  }
161  }
162 
163  void stoppedCb(std::exception const& e) override {
164  HMBDC_LOG_C(e.what());
165  };
166 
167  char const* hmbdcName() const {
168  return this->hmbdcName_.c_str();
169  }
170 
171  std::tuple<char const*, int> schedSpec() const {
172  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
173  }
174 
175 private:
176  bool match(Topic const& t) const {
177  return boost::regex_match(t.c_str(), topicRegex_);
178  }
179  friend class Sender;
180  template <typename... Messages>
181  void queue(Topic const& t, Messages&&... msgs) __restrict__ {
182  auto n = sizeof...(msgs);
183  auto it = buffer_.claim(n);
184  queue(it, t, std::forward<Messages>(msgs)...);
185  buffer_.commit(it, n);
186  }
187 
188  template <typename... Messages>
189  bool tryQueue(Topic const& t, Messages&&... msgs) __restrict__ {
190  auto n = sizeof...(msgs);
191  auto it = buffer_.tryClaim(n);
192  if (it) {
193  queue(it, t, std::forward<Messages>(msgs)...);
194  buffer_.commit(it, n);
195  return true;
196  }
197  return false;
198  }
199 
200  template <typename M, typename... Messages>
202  , Topic const& t, M&& msg, Messages&&... msgs) {
203  using Message = typename std::remove_reference<M>::type;
204  if (unlikely(sizeof(Message) > maxMessageSize_)) {
205  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
206  }
207  auto s = *it;
208  TransportMessageHeader::copyTo(s, t, std::forward<M>(msg));
209  queue(++it, t, std::forward<M>(msgs)...);
210  }
211 
212  template <typename Message, typename ... Args>
213  void queueInPlace(Topic const& t, Args&&... args) __restrict__ {
214  if (unlikely(sizeof(Message) > maxMessageSize_)) {
215  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
216  }
217  auto s = buffer_.claim();
218  TransportMessageHeader::copyToInPlace<Message>(*s, t, std::forward<Args>(args)...);
219  buffer_.commit(s);
220  }
221 
222  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len) {
223  if (unlikely(len > maxMessageSize_)) {
224  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
225  }
226  auto s = buffer_.claim();
227  TransportMessageHeader::copyTo(*s, t, tag, bytes, len);
228  buffer_.commit(s);
229  }
230 
232  , Topic const& t) {}
233 
234  void sendPackets(struct netmap_ring * __restrict__ ring) __restrict__ {
235  uint32_t cur = ring->cur;
236  if (unlikely(cur == ring->tail)) return;
237  MonoLockFreeBuffer::iterator begin, end;
238  if (unlikely(!(buffer_.peek(begin, end, maxSendBatch_)))) {
239  return;
240  }
241  bool slotInited = false;
242  auto it = begin;
243  auto batch = maxSendBatch_;
244  struct netmap_slot *slot = &ring->slot[cur];
245  uint32_t slotLen = 0;
246  char *p = NETMAP_BUF(ring, slot->buf_idx);
247  pkt* currentPktPtr = (pkt*)(p + virtHeader_ - sizeof(virt_header));
248  uint16_t slotLenMax = min(mtu_, (uint16_t)ring->nr_buf_size);
249  for (; it != end;) {
250  auto th = reinterpret_cast<TransportMessageHeader*>(*it);
251  if (rater_.check(th->wireSize())) {
252  if (!slotInited) {
253  auto wireSize = (uint16_t)(
254  sizeof(ether_header) + sizeof(ip) + sizeof(udphdr) + virtHeader_
255  );
256  memcpy(p, ((char*)&precalculatedPacketHead_) + sizeof(virt_header) - virtHeader_
257  , wireSize);
258  slotLen = wireSize;
259  slotInited = true;
260  }
261  auto wireSize = th->wireSize();
262  if (slotLen + wireSize <= slotLenMax) {
263  memcpy(p + slotLen, th, (int)wireSize);
264  slotLen += wireSize;
265  rater_.commit();
266  batch--;
267  ++it;
268  } else {
269  batch = 0; //this batch is done
270  }
271  if (!batch) {
272  slot->len = slotLen;
273  size_t wireSizeExcludingHead = slotLen
274  - (sizeof(ether_header) + sizeof(ip) + sizeof(udphdr) + virtHeader_);
275  updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
276  cur = nm_ring_next(ring, cur);
277  slotLen = 0;
278  if (cur == ring->tail) break;
279  slot = &ring->slot[cur];
280  p = NETMAP_BUF(ring, slot->buf_idx);
281  currentPktPtr = (pkt*)(p + virtHeader_ - sizeof(virt_header));
282  batch = maxSendBatch_;
283  slotInited = false;
284  }
285  } else {
286  break;
287  }
288  }
289  if (slotLen) {
290  slot->len = slotLen;
291  size_t wireSizeExcludingHead = slotLen
292  - (sizeof(ether_header) + sizeof(ip) + sizeof(udphdr) + virtHeader_);
293  updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
294  cur = nm_ring_next(ring, cur);
295  }
296 
297  ring->head = ring->cur = cur;
298  buffer_.wasteAfterPeek(begin, it - begin, true);
299  }
300 
301  void getMacAddresses() {
302  auto nmport = config_.getExt<std::string>("netmapPort");
303 
304  if (strncmp(nmport.c_str(), "vale", 4) == 0) return;
305 
306  if (nmport.find_first_of(":") == std::string::npos) {
307  HMBDC_THROW(std::runtime_error
308  , "wrong netmapPort format (examples: netmap:eth0, netmap:eth0-0)");
309  }
310  auto iface = nmport.substr(nmport.find_first_of(":"));
311  iface = iface.substr(1, iface.find_first_of("-^") - 1);
312 
313 
314  struct ifaddrs *ifaphead, *ifap;
315  int l = sizeof(ifap->ifa_name);
316 
317  if (getifaddrs(&ifaphead) != 0) {
318  HMBDC_THROW(std::runtime_error, "getifaddrs failed for" << iface);
319  }
320  for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
321  struct sockaddr_ll *sll =
322  (struct sockaddr_ll *)ifap->ifa_addr;
323  uint8_t *mac;
324 
325  if (!sll || sll->sll_family != AF_PACKET)
326  continue;
327  if (strncmp(ifap->ifa_name, iface.c_str(), l) != 0)
328  continue;
329  mac = (uint8_t *)(sll->sll_addr);
330 
331  char srcEthAddrStr[20];
332  sprintf(srcEthAddrStr, "%02x:%02x:%02x:%02x:%02x:%02x",
333  mac[0], mac[1], mac[2],
334  mac[3], mac[4], mac[5]);
335  memcpy(&srcEthAddr_, ether_aton(srcEthAddrStr), sizeof(srcEthAddr_)); //6 bytes
336  break;
337  }
338  freeifaddrs(ifaphead);
339  if (!ifap) {
340  HMBDC_THROW(std::runtime_error, "no local interface named " << iface);
341  }
342  }
343 
344  static
345  void initializePacket(struct pkt *pkt, int ttl, std::string srcIpStr, std::string dstIpStr
346  , ether_addr srcEthAddr, ether_addr dstEthAddr, uint16_t srcPort, uint16_t dstPort) {
347  struct ether_header *eh;
348  struct ip *ip;
349  struct udphdr *udp;
350  uint32_t a, b, c, d;
351  sscanf(srcIpStr.c_str(), "%d.%d.%d.%d", &a, &b, &c, &d);
352  auto srcIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
353  sscanf(dstIpStr.c_str(), "%d.%d.%d.%d", &a, &b, &c, &d);
354  auto dstIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
355 
356  /* prepare the headers */
357  eh = &pkt->eh;
358  bcopy(&srcEthAddr, eh->ether_shost, 6);
359  bcopy(&dstEthAddr, eh->ether_dhost, 6);
360 
361  eh->ether_type = htons(ETHERTYPE_IP);
362  ip = &pkt->ipv4.ip;
363  udp = &pkt->ipv4.udp;
364  ip->ip_v = IPVERSION;
365  ip->ip_hl = sizeof(*ip) >> 2;
366  ip->ip_id = 0;
367  ip->ip_tos = IPTOS_LOWDELAY;
368  ip->ip_len = 0; //zero so chksum can happen in ip_sum
369  ip->ip_id = 0;
370  ip->ip_off = htons(IP_DF); /* Don't fragment */
371  ip->ip_ttl = ttl;
372  ip->ip_p = IPPROTO_UDP;
373  ip->ip_dst.s_addr = htonl(dstIp);
374  ip->ip_src.s_addr = htonl(srcIp);
375  ip->ip_sum = 0;
376  ip->ip_len = sizeof(*ip) + sizeof(udphdr); //ip->ip_len is unknown, put known part
377  udp->source = htons(srcPort);
378  udp->dest = htons(dstPort);
379  udp->len = sizeof(udphdr); //put known part
380  udp->check = 0;
381 
382  bzero(&pkt->vh, sizeof(pkt->vh));
383  }
384 
385  static
386  void updatePacket(struct pkt *packet, size_t payloadWireSize, bool doChecksum = true) {
387  packet->ipv4.ip.ip_len += payloadWireSize; //already has sizeof(ip) + sizeof(udphdr);
388  packet->ipv4.ip.ip_len = ntohs(packet->ipv4.ip.ip_len);
389  if (doChecksum) {
390  packet->ipv4.ip.ip_sum = wrapsum(checksum(&packet->ipv4.ip, sizeof(packet->ipv4.ip), 0));
391  }
392 
393  packet->ipv4.udp.len += payloadWireSize;
394  packet->ipv4.udp.len = htons(packet->ipv4.udp.len);
395  if (doChecksum) {
396  auto udp = &packet->ipv4.udp;
397  packet->ipv4.udp.check = wrapsum(
398  checksum(udp, sizeof(*udp), /* udp header */
399  checksum(packet->ipv4.body, payloadWireSize, /* udp payload */
400  checksum(&packet->ipv4.ip.ip_src, 2 * sizeof(packet->ipv4.ip.ip_src), /* pseudo header */
401  IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))));
402  }
403  }
404 
405 
406  Config const config_; // copy of the configuration passed to the constructor
407  string hmbdcName_; // "hmbdcName" config entry, returned by hmbdcName()
408  string schedPolicy_; // "schedPolicy" config entry, returned by schedSpec()
409  int schedPriority_; // "schedPriority" config entry, returned by schedSpec()
410  std::string topic_; // "topicRegex" config entry as text
411  boost::regex topicRegex_; // regex built from topic_, used by match()
412  size_t maxMessageSize_; // constructor argument, upper bound checked by the queue functions
413 
414 
415  struct nm_desc *nmd_; // descriptor returned by nm_open()
417  int virtHeader_; // virtio-net header length obtained with NETMAP_VNET_HDR_GET
418 
419 
420  ether_addr srcEthAddr_; // from "srcEthAddr", may be replaced in getMacAddresses()
421  ether_addr dstEthAddr_; // from "dstEthAddr"
422  pkt precalculatedPacketHead_; // headers prepared by initializePacket(), copied to the start of each slot
423  bool doChecksum_; // "doChecksum" config entry, passed to updatePacket()
424  Rater rater_; // checked / committed per message in sendPackets()
425  size_t maxSendBatch_; // "maxSendBatch" config entry, max messages packed per slot and peeked per call
426  uint16_t mtu_; // "mtu" config entry minus 28 bytes (udp + ip header)
427 };
428 
429 }}}
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:35
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Misc.h:9
facade class for sending network messages
Definition: Sender.hpp:10
Definition: Misc.h:55
Definition: Misc.h:51
power a netmap port sending functions
Definition: SendTransportEngine.hpp:45
Definition: Misc.h:9
Definition: Rater.hpp:10
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:143
Definition: NetContext.hpp:26
Definition: Client.hpp:39
Definition: Rater.hpp:13
Definition: Client.hpp:11
Definition: LockFreeBufferMisc.hpp:73