1 #include "hmbdc/Copyright.hpp" 5 #define NETMAP_WITH_LIBS 6 #include <net/netmap_user.h> 9 #include "hmbdc/app/netmap/Messages.hpp" 10 #include "hmbdc/app/Client.hpp" 11 #include "hmbdc/app/LoggerT.hpp" 12 #include "hmbdc/app/Config.hpp" 13 #include "hmbdc/comm/Topic.hpp" 14 #include "hmbdc/comm/eth/Misc.h" 15 #include "hmbdc/Compile.hpp" 16 #include "hmbdc/text/StringTrieSet.hpp" 17 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 18 #include "hmbdc//Traits.hpp" 21 #include <boost/lexical_cast.hpp> 23 #include <type_traits> 26 #include <netinet/ether.h> 27 #include <linux/if_packet.h> 28 #include <sys/sysctl.h> 32 namespace hmbdc {
namespace app {
namespace netmap {
34 using namespace hmbdc;
44 using ptr = std::shared_ptr<RecvTransport>;
63 virtual void listenTo(
Topic const& t) = 0;
64 virtual void stopListenTo(
Topic const& t) = 0;
75 template <
typename OutBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
78 ,
Client<RecvTransportEngineImpl<OutBuffer, MsgArbitrator>>
79 ,
MessageHandler<RecvTransportEngineImpl<OutBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
96 , MsgArbitrator arb =
NoOpArb())
98 , buffer_(std::max(
sizeof(Subscribe),
sizeof(Unsubscribe))
99 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
100 , outBuffer_(outBuffer)
101 , maxMessageSize_(outBuffer.maxItemSize())
102 , doChecksum_(config_.getExt<
bool>(
"doChecksum"))
104 , pollWaitTimeMillisec_(0)
107 config (hmbdcName_,
"hmbdcName")
108 (schedPolicy_,
"schedPolicy")
109 (schedPriority_,
"schedPriority")
112 struct nmreq baseNmd;
113 bzero(&baseNmd,
sizeof(baseNmd));
114 baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
115 auto nmport = config_.getExt<std::string>(
"netmapPort");
116 busyWait_ = config_.getExt<
bool>(
"busyWait");
117 if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<
int>(
"pollWaitTimeMillisec");
118 nmd_ = nm_open(nmport.c_str(), &baseNmd, config_.getExt<
int>(
"nmOpenFlags"), NULL);
120 HMBDC_THROW(std::runtime_error,
"cannot open " << nmport);
124 memset(&req, 0,
sizeof(req));
125 bcopy(nmd_->req.nr_name, req.nr_name,
sizeof(req.nr_name));
126 req.nr_version = NETMAP_API;
127 req.nr_cmd = NETMAP_VNET_HDR_GET;
128 int err = ioctl(nmd_->fd, NIOCREGIF, &req);
130 HMBDC_THROW(std::runtime_error,
"Unable to get virtio-net header length");
132 virtHeader_ = req.nr_arg1;
136 pfd_.events = POLLIN;
137 sleep(config_.getExt<
int>(
"nmResetWaitSec"));
139 if (unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
140 HMBDC_THROW(std::runtime_error,
"IO error");
142 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
143 struct netmap_ring * rxring = NETMAP_TXRING(nmd_->nifp, i);
144 if (nm_ring_empty(rxring))
146 rxring->head = rxring->cur = rxring->tail;
150 void listenTo(
Topic const& t)
override {
154 void stopListenTo(
Topic const& t)
override {
161 if (nmd_) nm_close(nmd_);
165 void invokedCb(uint16_t threadSerialNumber) __restrict__
override {
167 auto n = buffer_.peek(begin, end);
170 MH::handleMessage(*static_cast<MessageHead*>(*it++));
172 buffer_.wasteAfterPeek(begin, n);
175 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
176 struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
177 if (nm_ring_empty(rxring))
184 void handleMessageCb(Subscribe
const& t) {
185 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
188 void handleMessageCb(Unsubscribe
const& t) {
189 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
192 void stoppedCb(std::exception
const& e)
override {
193 HMBDC_LOG_C(e.what());
196 char const* hmbdcName()
const {
197 return this->hmbdcName_.c_str();
200 std::tuple<char const*, int> schedSpec()
const {
201 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
205 template <
bool is_raw_arb>
206 typename enable_if<is_raw_arb, int>::type applyRawArb(
size_t len) {
207 return arb_(data_, len);
210 template <
bool is_raw_arb>
211 typename enable_if<!is_raw_arb, int>::type applyRawArb(
size_t) {
215 template <
bool is_raw_arb>
220 template <
bool is_raw_arb>
231 void syncNetmap() __restrict__ {
232 if (likely(busyWait_)) {
233 if (unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
234 HMBDC_THROW(std::runtime_error,
"IO error");
239 auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
240 if (unlikely( res < 0)) {
241 HMBDC_THROW(std::runtime_error,
"IO error errno=" << errno);
248 void recvPackets(
struct netmap_ring * __restrict__ ring) __restrict__ {
249 auto cur = ring->cur;
251 while(cur != ring->tail) {
252 struct netmap_slot *slot = &ring->slot[cur];
253 auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
254 auto* p =
reinterpret_cast<pkt*
>(buf + virtHeader_ -
sizeof(
virt_header));
256 if (p->ipv4.ip.ip_p == IPPROTO_UDP
257 && p->ipv4.ip.ip_off == htons(IP_DF)) {
258 if (!data_) data_ = (uint8_t*)p->ipv4.body;
262 using arg0 =
typename traits::template arg<0>::type;
263 bool const is_raw_arb = std::is_same<
void*,
typename std::decay<arg0>::type>::value;
264 auto a = applyRawArb<is_raw_arb>(slot->len);
265 if (unlikely(a == 0)) {
267 ring->head = ring->cur = cur;
269 }
else if (likely(a > 0)) {
272 if (data_ + header->wireSize() <= buf + slot->len) {
273 if (subscriptions_.check(header->topic())) {
274 auto a = applyArb<is_raw_arb>(header);
276 auto l = std::min((
size_t)header->messagePayloadLen(), maxMessageSize_);
277 auto it = outBuffer_.claim();
278 char* b =
static_cast<char*
>(*it);
279 memcpy(b, header->payload(), l);
280 outBuffer_.commit(it);
281 }
else if (unlikely(a == 0)) {
282 ring->head = ring->cur = cur;
286 data_ += header->wireSize();
294 cur = nm_ring_next(ring, cur);
297 ring->head = ring->cur = cur;
301 bool verifyChecksum(
pkt* __restrict__ packet,
size_t payloadWireSize) {
303 auto tmp = packet->ipv4.ip.ip_sum;
304 packet->ipv4.ip.ip_sum = 0;
306 checksum(&packet->ipv4.ip,
sizeof(packet->ipv4.ip), 0))) {
309 packet->ipv4.ip.ip_sum = tmp;
313 auto tmp = packet->ipv4.udp.check;
314 packet->ipv4.udp.check = 0;
316 auto udp = &packet->ipv4.udp;
318 checksum(udp,
sizeof(*udp),
319 checksum(packet->ipv4.body, payloadWireSize,
320 checksum(&packet->ipv4.ip.ip_src, 2 *
sizeof(packet->ipv4.ip.ip_src),
321 IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
324 packet->ipv4.udp.check = tmp;
334 OutBuffer& __restrict__ outBuffer_;
335 size_t maxMessageSize_;
339 struct nm_desc *nmd_;
344 int pollWaitTimeMillisec_;
Definition: Messages.hpp:119
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:35
Definition: Client.hpp:11
power a netmap port receiving functions
Definition: RecvTransportEngine.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: StringTrieSet.hpp:113
Definition: Messages.hpp:126
Definition: Message.hpp:46
Definition: NetContext.hpp:26
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:56
Definition: Client.hpp:39
impl class,
Definition: RecvTransportEngine.hpp:76
Definition: Client.hpp:11
Definition: MessageHandler.hpp:38
Definition: LfbStream.hpp:11
Definition: LockFreeBufferMisc.hpp:73