1 #include "hmbdc/Copyright.hpp" 5 #define NETMAP_WITH_LIBS 6 #include <net/netmap_user.h> 8 #include "hmbdc/app/netmap/Messages.hpp" 9 #include "hmbdc/app/Client.hpp" 10 #include "hmbdc/app/LoggerT.hpp" 11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 12 #include "hmbdc/comm/Topic.hpp" 13 #include "hmbdc/comm/eth/Misc.h" 14 #include "hmbdc/time/Rater.hpp" 15 #include "hmbdc/numeric/BitMath.hpp" 17 #include "hmbdc/app/Config.hpp" 18 #include "hmbdc/Compile.hpp" 21 #include <boost/regex.hpp> 23 #include <type_traits> 26 #include <netinet/ether.h> 27 #include <linux/if_packet.h> 28 #include <sys/sysctl.h> 33 namespace hmbdc {
namespace app {
namespace netmap {
35 using namespace hmbdc;
46 :
Client<SendTransportEngine> {
47 using ptr = std::shared_ptr<SendTransportEngine>;
51 outBufferSizePower2() {
52 auto res = config_.getExt<uint16_t>(
"outBufferSizePower2");
56 res =hmbdc::numeric::log2Upper(16ul * 1024ul / (8ul + maxMessageSize_));
57 HMBDC_LOG_N(
"auto set --outBufferSizePower2=", res);
64 , topic_(cfg.
getExt<
string>(
"topicRegex"))
66 , maxMessageSize_(maxMessageSize)
69 , outBufferSizePower2())
73 , doChecksum_(config_.getExt<
bool>(
"doChecksum"))
74 , rater_(Duration::seconds(1u)
75 , config_.getExt<
size_t>(
"sendBytesPerSec")
76 , config_.getExt<
size_t>(
"sendBytesBurst")
77 , config_.getExt<
size_t>(
"sendBytesBurst") != 0ul
79 , maxSendBatch_(cfg.
getExt<
size_t>(
"maxSendBatch"))
80 , mtu_(config_.getExt<
size_t>(
"mtu")) {
82 cfg (hmbdcName_,
"hmbdcName")
83 (schedPolicy_,
"schedPolicy")
84 (schedPriority_,
"schedPriority")
87 memcpy(&srcEthAddr_, ether_aton(config_.getExt<std::string>(
"srcEthAddr").c_str())
88 ,
sizeof(srcEthAddr_));
89 memcpy(&dstEthAddr_, ether_aton(config_.getExt<std::string>(
"dstEthAddr").c_str())
90 ,
sizeof(dstEthAddr_));
94 bzero(&baseNmd,
sizeof(baseNmd));
95 baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
97 auto nmport = cfg.
getExt<std::string>(
"netmapPort");
98 nmd_ = nm_open(nmport.c_str(), &baseNmd
99 , cfg.
getExt<uint64_t>(
"nmOpenFlags"), NULL);
101 HMBDC_THROW(std::runtime_error,
"cannot open " << nmport);
105 memset(&req, 0,
sizeof(req));
106 bcopy(nmd_->req.nr_name, req.nr_name,
sizeof(req.nr_name));
107 req.nr_version = NETMAP_API;
108 req.nr_cmd = NETMAP_VNET_HDR_GET;
109 int err = ioctl(nmd_->fd, NIOCREGIF, &req);
111 HMBDC_THROW(std::runtime_error,
"Unable to get virtio-net header length");
113 virtHeader_ = req.nr_arg1;
115 initializePacket(&precalculatedPacketHead_
116 , config_.getExt<uint16_t>(
"ttl")
117 , config_.getExt<
string>(
"srcIp")
118 , config_.getExt<
string>(
"dstIp")
121 , config_.getExt<uint16_t>(
"srcPort")
122 , config_.getExt<uint16_t>(
"dstPort")
124 sleep(config_.getExt<
int>(
"nmResetWaitSec"));
126 if (unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
127 HMBDC_THROW(std::runtime_error,
"IO error");
129 for (
int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
130 struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
131 if (nm_ring_empty(txring))
133 txring->head = txring->cur = txring->tail;
143 bool droppedCb()
override {
149 void invokedCb(uint16_t threadSerialNumber) __restrict__
override {
150 if (unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
151 HMBDC_THROW(std::runtime_error,
"IO error");
154 for (
int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
155 struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
156 if (nm_ring_empty(txring))
163 void stoppedCb(std::exception
const& e)
override {
164 HMBDC_LOG_C(e.what());
167 char const* hmbdcName()
const {
168 return this->hmbdcName_.c_str();
171 std::tuple<char const*, int> schedSpec()
const {
172 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
176 bool match(
Topic const& t)
const {
177 return boost::regex_match(t.c_str(), topicRegex_);
180 template <
typename... Messages>
181 void queue(
Topic const& t, Messages&&... msgs) __restrict__ {
182 auto n =
sizeof...(msgs);
183 auto it = buffer_.claim(n);
184 queue(it, t, std::forward<Messages>(msgs)...);
185 buffer_.commit(it, n);
188 template <
typename... Messages>
189 bool tryQueue(
Topic const& t, Messages&&... msgs) __restrict__ {
190 auto n =
sizeof...(msgs);
191 auto it = buffer_.tryClaim(n);
193 queue(it, t, std::forward<Messages>(msgs)...);
194 buffer_.commit(it, n);
200 template <
typename M,
typename... Messages>
202 ,
Topic const& t, M&& msg, Messages&&... msgs) {
203 using Message =
typename std::remove_reference<M>::type;
204 if (unlikely(
sizeof(Message) > maxMessageSize_)) {
205 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
208 TransportMessageHeader::copyTo(s, t, std::forward<M>(msg));
209 queue(++it, t, std::forward<M>(msgs)...);
212 template <
typename Message,
typename ... Args>
213 void queueInPlace(
Topic const& t, Args&&... args) __restrict__ {
214 if (unlikely(
sizeof(Message) > maxMessageSize_)) {
215 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
217 auto s = buffer_.claim();
218 TransportMessageHeader::copyToInPlace<Message>(*s, t, std::forward<Args>(args)...);
222 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len) {
223 if (unlikely(len > maxMessageSize_)) {
224 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
226 auto s = buffer_.claim();
227 TransportMessageHeader::copyTo(*s, t, tag, bytes, len);
234 void sendPackets(
struct netmap_ring * __restrict__ ring) __restrict__ {
235 uint32_t cur = ring->cur;
236 if (unlikely(cur == ring->tail))
return;
238 if (unlikely(!(buffer_.peek(begin, end, maxSendBatch_)))) {
241 bool slotInited =
false;
243 auto batch = maxSendBatch_;
244 struct netmap_slot *slot = &ring->slot[cur];
245 uint32_t slotLen = 0;
246 char *p = NETMAP_BUF(ring, slot->buf_idx);
248 uint16_t slotLenMax = min(mtu_, (uint16_t)ring->nr_buf_size);
251 if (rater_.check(th->wireSize())) {
253 auto wireSize = (uint16_t)(
254 sizeof(ether_header) +
sizeof(ip) +
sizeof(udphdr) + virtHeader_
256 memcpy(p, ((
char*)&precalculatedPacketHead_) +
sizeof(
virt_header) - virtHeader_
261 auto wireSize = th->wireSize();
262 if (slotLen + wireSize <= slotLenMax) {
263 memcpy(p + slotLen, th, (
int)wireSize);
273 size_t wireSizeExcludingHead = slotLen
274 - (
sizeof(ether_header) +
sizeof(ip) +
sizeof(udphdr) + virtHeader_);
275 updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
276 cur = nm_ring_next(ring, cur);
278 if (cur == ring->tail)
break;
279 slot = &ring->slot[cur];
280 p = NETMAP_BUF(ring, slot->buf_idx);
282 batch = maxSendBatch_;
291 size_t wireSizeExcludingHead = slotLen
292 - (
sizeof(ether_header) +
sizeof(ip) +
sizeof(udphdr) + virtHeader_);
293 updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
294 cur = nm_ring_next(ring, cur);
297 ring->head = ring->cur = cur;
298 buffer_.wasteAfterPeek(begin, it - begin,
true);
301 void getMacAddresses() {
302 auto nmport = config_.getExt<std::string>(
"netmapPort");
304 if (strncmp(nmport.c_str(),
"vale", 4) == 0)
return;
306 if (nmport.find_first_of(
":") == std::string::npos) {
307 HMBDC_THROW(std::runtime_error
308 ,
"wrong netmapPort format (examples: netmap:eth0, netmap:eth0-0)");
310 auto iface = nmport.substr(nmport.find_first_of(
":"));
311 iface = iface.substr(1, iface.find_first_of(
"-^") - 1);
314 struct ifaddrs *ifaphead, *ifap;
315 int l =
sizeof(ifap->ifa_name);
317 if (getifaddrs(&ifaphead) != 0) {
318 HMBDC_THROW(std::runtime_error,
"getifaddrs failed for" << iface);
320 for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
321 struct sockaddr_ll *sll =
322 (
struct sockaddr_ll *)ifap->ifa_addr;
325 if (!sll || sll->sll_family != AF_PACKET)
327 if (strncmp(ifap->ifa_name, iface.c_str(), l) != 0)
329 mac = (uint8_t *)(sll->sll_addr);
331 char srcEthAddrStr[20];
332 sprintf(srcEthAddrStr,
"%02x:%02x:%02x:%02x:%02x:%02x",
333 mac[0], mac[1], mac[2],
334 mac[3], mac[4], mac[5]);
335 memcpy(&srcEthAddr_, ether_aton(srcEthAddrStr),
sizeof(srcEthAddr_));
338 freeifaddrs(ifaphead);
340 HMBDC_THROW(std::runtime_error,
"no local interface named " << iface);
345 void initializePacket(
struct pkt *
pkt,
int ttl, std::string srcIpStr, std::string dstIpStr
346 , ether_addr srcEthAddr, ether_addr dstEthAddr, uint16_t srcPort, uint16_t dstPort) {
347 struct ether_header *eh;
351 sscanf(srcIpStr.c_str(),
"%d.%d.%d.%d", &a, &b, &c, &d);
352 auto srcIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
353 sscanf(dstIpStr.c_str(),
"%d.%d.%d.%d", &a, &b, &c, &d);
354 auto dstIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
358 bcopy(&srcEthAddr, eh->ether_shost, 6);
359 bcopy(&dstEthAddr, eh->ether_dhost, 6);
361 eh->ether_type = htons(ETHERTYPE_IP);
363 udp = &pkt->ipv4.udp;
364 ip->ip_v = IPVERSION;
365 ip->ip_hl =
sizeof(*ip) >> 2;
367 ip->ip_tos = IPTOS_LOWDELAY;
370 ip->ip_off = htons(IP_DF);
372 ip->ip_p = IPPROTO_UDP;
373 ip->ip_dst.s_addr = htonl(dstIp);
374 ip->ip_src.s_addr = htonl(srcIp);
376 ip->ip_len =
sizeof(*ip) +
sizeof(udphdr);
377 udp->source = htons(srcPort);
378 udp->dest = htons(dstPort);
379 udp->len =
sizeof(udphdr);
382 bzero(&pkt->vh,
sizeof(pkt->vh));
386 void updatePacket(
struct pkt *packet,
size_t payloadWireSize,
bool doChecksum =
true) {
387 packet->ipv4.ip.ip_len += payloadWireSize;
388 packet->ipv4.ip.ip_len = ntohs(packet->ipv4.ip.ip_len);
390 packet->ipv4.ip.ip_sum = wrapsum(checksum(&packet->ipv4.ip,
sizeof(packet->ipv4.ip), 0));
393 packet->ipv4.udp.len += payloadWireSize;
394 packet->ipv4.udp.len = htons(packet->ipv4.udp.len);
396 auto udp = &packet->ipv4.udp;
397 packet->ipv4.udp.check = wrapsum(
398 checksum(udp,
sizeof(*udp),
399 checksum(packet->ipv4.body, payloadWireSize,
400 checksum(&packet->ipv4.ip.ip_src, 2 *
sizeof(packet->ipv4.ip.ip_src),
401 IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))));
411 boost::regex topicRegex_;
412 size_t maxMessageSize_;
415 struct nm_desc *nmd_;
420 ether_addr srcEthAddr_;
421 ether_addr dstEthAddr_;
422 pkt precalculatedPacketHead_;
425 size_t maxSendBatch_;
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:35
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
fascade class for sending network messages
Definition: Sender.hpp:10
power a netmap port sending functions
Definition: SendTransportEngine.hpp:45
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:143
Definition: NetContext.hpp:26
Definition: Client.hpp:39
Definition: Client.hpp:11
Definition: LockFreeBufferMisc.hpp:73