1 #include "hmbdc/Copyright.hpp" 5 #define NETMAP_WITH_LIBS 6 #include <net/netmap_user.h> 7 #undef NETMAP_WITH_LIBS 9 #include "hmbdc/app/netmap/Messages.hpp" 10 #include "hmbdc/app/Base.hpp" 11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 12 #include "hmbdc/comm/eth/Misc.h" 13 #include "hmbdc/time/Rater.hpp" 14 #include "hmbdc/numeric/BitMath.hpp" 16 #include <boost/regex.hpp> 18 #include <type_traits> 21 #include <netinet/ether.h> 22 #include <linux/if_packet.h> 23 #include <sys/sysctl.h> 28 namespace hmbdc {
namespace app {
namespace netmap {
33 namespace sendtransportengine_detail {
44 :
Client<SendTransportEngine> {
45 using ptr = std::shared_ptr<SendTransportEngine>;
49 outBufferSizePower2() {
50 auto res = config_.getExt<uint16_t>(
"outBufferSizePower2");
54 res =hmbdc::numeric::log2Upper(16ul * 1024ul / (8ul + maxMessageSize_));
55 HMBDC_LOG_N(
"auto set --outBufferSizePower2=", res);
62 , topic_(cfg.
getExt<
string>(
"topicRegex"))
64 , maxMessageSize_(maxMessageSize)
67 , outBufferSizePower2())
71 , doChecksum_(config_.getExt<
bool>(
"doChecksum"))
72 , rater_(Duration::seconds(1u)
73 , config_.getExt<
size_t>(
"sendBytesPerSec")
74 , config_.getExt<
size_t>(
"sendBytesBurst")
75 , config_.getExt<
size_t>(
"sendBytesBurst") != 0ul
77 , maxSendBatch_(cfg.
getExt<
size_t>(
"maxSendBatch"))
78 , mtu_(config_.getExt<
size_t>(
"mtu")) {
80 cfg (hmbdcName_,
"hmbdcName")
81 (schedPolicy_,
"schedPolicy")
82 (schedPriority_,
"schedPriority")
85 memcpy(&srcEthAddr_, ether_aton(config_.getExt<std::string>(
"srcEthAddr").c_str())
86 ,
sizeof(srcEthAddr_));
87 memcpy(&dstEthAddr_, ether_aton(config_.getExt<std::string>(
"dstEthAddr").c_str())
88 ,
sizeof(dstEthAddr_));
92 bzero(&baseNmd,
sizeof(baseNmd));
93 baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
95 auto nmport = cfg.
getExt<std::string>(
"netmapPort");
96 nmd_ = nm_open(nmport.c_str(), &baseNmd
97 , cfg.
getExt<uint64_t>(
"nmOpenFlags"), NULL);
99 HMBDC_THROW(std::runtime_error,
"cannot open " << nmport);
103 memset(&req, 0,
sizeof(req));
104 bcopy(nmd_->req.nr_name, req.nr_name,
sizeof(req.nr_name));
105 req.nr_version = NETMAP_API;
106 req.nr_cmd = NETMAP_VNET_HDR_GET;
107 int err = ioctl(nmd_->fd, NIOCREGIF, &req);
109 HMBDC_THROW(std::runtime_error,
"Unable to get virtio-net header length");
111 virtHeader_ = req.nr_arg1;
113 initializePacket(&precalculatedPacketHead_
114 , config_.getExt<uint16_t>(
"ttl")
115 , config_.getExt<
string>(
"srcIp")
116 , config_.getExt<
string>(
"dstIp")
119 , config_.getExt<uint16_t>(
"srcPort")
120 , config_.getExt<uint16_t>(
"dstPort")
122 sleep(config_.getExt<
int>(
"nmResetWaitSec"));
124 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
125 HMBDC_THROW(std::runtime_error,
"IO error");
127 for (
int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
128 struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
129 if (nm_ring_empty(txring))
131 txring->head = txring->cur = txring->tail;
141 bool droppedCb()
override {
147 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
148 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
149 HMBDC_THROW(std::runtime_error,
"IO error");
152 for (
int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
153 struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
154 if (nm_ring_empty(txring))
161 void stoppedCb(std::exception
const& e)
override {
162 HMBDC_LOG_C(e.what());
165 char const* hmbdcName()
const {
166 return this->hmbdcName_.c_str();
169 std::tuple<char const*, int> schedSpec()
const {
170 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
174 bool match(
Topic const& t)
const {
175 return boost::regex_match(t.c_str(), topicRegex_);
178 template <
typename... Messages>
179 void queue(
Topic const& t, Messages&&... msgs) HMBDC_RESTRICT {
180 auto n =
sizeof...(msgs);
181 auto it = buffer_.claim(n);
182 queue(it, t, std::forward<Messages>(msgs)...);
183 buffer_.commit(it, n);
186 template <
typename... Messages>
187 bool tryQueue(
Topic const& t, Messages&&... msgs) HMBDC_RESTRICT {
188 auto n =
sizeof...(msgs);
189 auto it = buffer_.tryClaim(n);
191 queue(it, t, std::forward<Messages>(msgs)...);
192 buffer_.commit(it, n);
198 template <
typename M,
typename... Messages>
200 ,
Topic const& t, M&& msg, Messages&&... msgs) {
201 using Message =
typename std::remove_reference<M>::type;
202 if (hmbdc_unlikely(
sizeof(Message) > maxMessageSize_)) {
203 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
206 TransportMessageHeader::copyTo(s, t, std::forward<M>(msg));
207 queue(++it, t, std::forward<M>(msgs)...);
210 template <
typename Message,
typename ... Args>
211 void queueInPlace(
Topic const& t, Args&&... args) HMBDC_RESTRICT {
212 if (hmbdc_unlikely(
sizeof(Message) > maxMessageSize_)) {
213 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
215 auto s = buffer_.claim();
216 TransportMessageHeader::copyToInPlace<Message>(*s, t, std::forward<Args>(args)...);
220 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len) {
221 if (hmbdc_unlikely(len > maxMessageSize_)) {
222 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
224 auto s = buffer_.claim();
225 TransportMessageHeader::copyTo(*s, t, tag, bytes, len);
232 void sendPackets(
struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
233 uint32_t cur = ring->cur;
234 if (hmbdc_unlikely(cur == ring->tail))
return;
236 if (hmbdc_unlikely(!(buffer_.peek(begin, end, maxSendBatch_)))) {
239 bool slotInited =
false;
241 auto batch = maxSendBatch_;
242 struct netmap_slot *slot = &ring->slot[cur];
243 uint32_t slotLen = 0;
244 char *p = NETMAP_BUF(ring, slot->buf_idx);
246 uint16_t slotLenMax = min(mtu_, (uint16_t)ring->nr_buf_size);
249 if (rater_.check(th->wireSize())) {
251 auto wireSize = (uint16_t)(
252 sizeof(ether_header) +
sizeof(ip) +
sizeof(udphdr) + virtHeader_
254 memcpy(p, ((
char*)&precalculatedPacketHead_) +
sizeof(
virt_header) - virtHeader_
259 auto wireSize = th->wireSize();
260 if (slotLen + wireSize <= slotLenMax) {
261 memcpy(p + slotLen, th, (
int)wireSize);
271 size_t wireSizeExcludingHead = slotLen
272 - (
sizeof(ether_header) +
sizeof(ip) +
sizeof(udphdr) + virtHeader_);
273 updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
274 cur = nm_ring_next(ring, cur);
276 if (cur == ring->tail)
break;
277 slot = &ring->slot[cur];
278 p = NETMAP_BUF(ring, slot->buf_idx);
280 batch = maxSendBatch_;
289 size_t wireSizeExcludingHead = slotLen
290 - (
sizeof(ether_header) +
sizeof(ip) +
sizeof(udphdr) + virtHeader_);
291 updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
292 cur = nm_ring_next(ring, cur);
295 ring->head = ring->cur = cur;
296 buffer_.wasteAfterPeek(begin, it - begin,
true);
299 void getMacAddresses() {
300 auto nmport = config_.getExt<std::string>(
"netmapPort");
302 if (strncmp(nmport.c_str(),
"vale", 4) == 0)
return;
304 if (nmport.find_first_of(
":") == std::string::npos) {
305 HMBDC_THROW(std::runtime_error
306 ,
"wrong netmapPort format (examples: netmap:eth0, netmap:eth0-0)");
308 auto iface = nmport.substr(nmport.find_first_of(
":"));
309 iface = iface.substr(1, iface.find_first_of(
"-^") - 1);
312 struct ifaddrs *ifaphead, *ifap;
313 int l =
sizeof(ifap->ifa_name);
315 if (getifaddrs(&ifaphead) != 0) {
316 HMBDC_THROW(std::runtime_error,
"getifaddrs failed for" << iface);
318 for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
319 struct sockaddr_ll *sll =
320 (
struct sockaddr_ll *)ifap->ifa_addr;
323 if (!sll || sll->sll_family != AF_PACKET)
325 if (strncmp(ifap->ifa_name, iface.c_str(), l) != 0)
327 mac = (uint8_t *)(sll->sll_addr);
329 char srcEthAddrStr[20];
330 sprintf(srcEthAddrStr,
"%02x:%02x:%02x:%02x:%02x:%02x",
331 mac[0], mac[1], mac[2],
332 mac[3], mac[4], mac[5]);
333 memcpy(&srcEthAddr_, ether_aton(srcEthAddrStr),
sizeof(srcEthAddr_));
336 freeifaddrs(ifaphead);
338 HMBDC_THROW(std::runtime_error,
"no local interface named " << iface);
343 void initializePacket(
struct pkt *
pkt,
int ttl, std::string srcIpStr, std::string dstIpStr
344 , ether_addr srcEthAddr, ether_addr dstEthAddr, uint16_t srcPort, uint16_t dstPort) {
345 struct ether_header *eh;
349 sscanf(srcIpStr.c_str(),
"%d.%d.%d.%d", &a, &b, &c, &d);
350 auto srcIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
351 sscanf(dstIpStr.c_str(),
"%d.%d.%d.%d", &a, &b, &c, &d);
352 auto dstIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
356 bcopy(&srcEthAddr, eh->ether_shost, 6);
357 bcopy(&dstEthAddr, eh->ether_dhost, 6);
359 eh->ether_type = htons(ETHERTYPE_IP);
361 #pragma GCC diagnostic push 363 #pragma GCC diagnostic ignored "-Waddress-of-packed-member" 366 udp = &pkt->ipv4.udp;
367 ip->ip_v = IPVERSION;
368 ip->ip_hl =
sizeof(*ip) >> 2;
370 ip->ip_tos = IPTOS_LOWDELAY;
373 ip->ip_off = htons(IP_DF);
375 ip->ip_p = IPPROTO_UDP;
376 ip->ip_dst.s_addr = htonl(dstIp);
377 ip->ip_src.s_addr = htonl(srcIp);
379 ip->ip_len =
sizeof(*ip) +
sizeof(udphdr);
380 udp->source = htons(srcPort);
381 udp->dest = htons(dstPort);
382 udp->len =
sizeof(udphdr);
385 bzero(&pkt->vh,
sizeof(pkt->vh));
389 void updatePacket(
struct pkt *packet,
size_t payloadWireSize,
bool doChecksum =
true) {
390 packet->ipv4.ip.ip_len += payloadWireSize;
391 packet->ipv4.ip.ip_len = ntohs(packet->ipv4.ip.ip_len);
393 packet->ipv4.ip.ip_sum = wrapsum(checksum(&packet->ipv4.ip,
sizeof(packet->ipv4.ip), 0));
396 packet->ipv4.udp.len += payloadWireSize;
397 packet->ipv4.udp.len = htons(packet->ipv4.udp.len);
399 auto udp = &packet->ipv4.udp;
400 packet->ipv4.udp.check = wrapsum(
401 checksum(udp,
sizeof(*udp),
402 checksum(packet->ipv4.body, payloadWireSize,
403 checksum(&packet->ipv4.ip.ip_src, 2 *
sizeof(packet->ipv4.ip.ip_src),
404 IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))));
408 #pragma GCC diagnostic pop 415 boost::regex topicRegex_;
416 size_t maxMessageSize_;
419 struct nm_desc *nmd_;
424 ether_addr srcEthAddr_;
425 ether_addr dstEthAddr_;
426 pkt precalculatedPacketHead_;
429 size_t maxSendBatch_;
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
fascade class for sending network messages
Definition: Sender.hpp:10
Definition: Message.hpp:34
power a netmap port sending functions
Definition: SendTransportEngine.hpp:43
a singleton that holding netmap resources
Definition: NetContext.hpp:38
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Definition: LockFreeBufferMisc.hpp:73