1 #include "hmbdc/Copyright.hpp" 5 #define NETMAP_WITH_LIBS 6 #include <net/netmap_user.h> 7 #undef NETMAP_WITH_LIBS 9 #include "hmbdc/app/netmap/Messages.hpp" 10 #include "hmbdc/app/Base.hpp" 11 #include "hmbdc/comm/Topic.hpp" 12 #include "hmbdc/comm/eth/Misc.h" 13 #include "hmbdc/Compile.hpp" 14 #include "hmbdc/text/StringTrieSet.hpp" 15 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 16 #include "hmbdc//Traits.hpp" 19 #include <boost/lexical_cast.hpp> 21 #include <type_traits> 24 #include <netinet/ether.h> 25 #include <linux/if_packet.h> 26 #include <sys/sysctl.h> 30 namespace hmbdc {
namespace app {
namespace netmap {
38 using ptr = std::shared_ptr<RecvTransport>;
69 template <
typename OutBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
72 ,
Client<RecvTransportEngine<OutBuffer, MsgArbitrator>>
73 ,
MessageHandler<RecvTransportEngine<OutBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
90 , MsgArbitrator arb =
NoOpArb())
94 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
95 , outBuffer_(outBuffer)
96 , maxMessageSize_(outBuffer.maxItemSize())
97 , doChecksum_(config_.getExt<bool>(
"doChecksum"))
99 , pollWaitTimeMillisec_(0)
101 config (hmbdcName_,
"hmbdcName")
102 (schedPolicy_,
"schedPolicy")
103 (schedPriority_,
"schedPriority")
106 busyWait_ = config_.
getExt<
bool>(
"busyWait");
107 if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<
int>(
"pollWaitTimeMillisec");
109 struct nmreq baseNmd;
110 bzero(&baseNmd,
sizeof(baseNmd));
111 baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
112 config_(baseNmd.nr_tx_slots,
"nmTxSlots");
113 config_(baseNmd.nr_rx_slots,
"nmRxSlots");
114 config_(baseNmd.nr_tx_rings,
"nmTxRings");
115 config_(baseNmd.nr_rx_rings,
"nmRxRings");
116 auto nmport = config_.getExt<std::string>(
"netmapPort");
117 uint32_t flags = config_.getExt<uint32_t>(
"nmOpenFlags");
118 nmd_ = nm_open(nmport.c_str(), &baseNmd, flags, NULL);
120 HMBDC_THROW(std::runtime_error,
"cannot open " << nmport);
124 memset(&req, 0,
sizeof(req));
125 bcopy(nmd_->req.nr_name, req.nr_name,
sizeof(req.nr_name));
126 req.nr_version = NETMAP_API;
127 req.nr_cmd = NETMAP_VNET_HDR_GET;
128 int err = ioctl(nmd_->fd, NIOCREGIF, &req);
130 HMBDC_THROW(std::runtime_error,
"Unable to get virtio-net header length");
132 virtHeader_ = req.nr_arg1;
136 pfd_.events = POLLIN;
137 sleep(config_.getExt<
int>(
"nmResetWaitSec"));
139 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
140 HMBDC_THROW(std::runtime_error,
"IO error");
142 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
143 struct netmap_ring * rxring = NETMAP_TXRING(nmd_->nifp, i);
144 if (nm_ring_empty(rxring))
146 rxring->head = rxring->cur = rxring->tail;
150 void listenTo(
Topic const& t)
override {
154 void stopListenTo(
Topic const& t)
override {
161 if (nmd_) nm_close(nmd_);
165 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
167 auto n = buffer_.peek(begin, end);
170 MH::handleMessage(*static_cast<MessageHead*>(*it++));
172 buffer_.wasteAfterPeek(begin, n);
175 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
176 struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
177 if (nm_ring_empty(rxring))
184 void handleMessageCb(Subscribe
const& t) {
185 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
188 void handleMessageCb(Unsubscribe
const& t) {
189 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
193 HMBDC_LOG_C(e.what());
196 char const* hmbdcName()
const {
197 return this->hmbdcName_.c_str();
200 std::tuple<char const*, int> schedSpec()
const {
201 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
205 template <
bool is_raw_arb>
206 typename std::enable_if<is_raw_arb, int>::type applyRawArb(
size_t len) {
207 return arb_(data_, len);
210 template <
bool is_raw_arb>
211 typename std::enable_if<!is_raw_arb, int>::type applyRawArb(
size_t) {
215 template <
bool is_raw_arb>
220 template <
bool is_raw_arb>
232 if (hmbdc_likely(busyWait_)) {
233 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
234 HMBDC_THROW(std::runtime_error,
"IO error");
239 auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
240 if (hmbdc_unlikely( res < 0)) {
241 HMBDC_THROW(std::runtime_error,
"IO error errno=" << errno);
248 void recvPackets(
struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
249 using namespace comm::eth;
250 auto cur = ring->cur;
252 while(cur != ring->tail) {
253 struct netmap_slot *slot = &ring->slot[cur];
254 auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
255 auto* p =
reinterpret_cast<pkt*
>(buf + virtHeader_ -
sizeof(virt_header));
257 if (hmbdc_likely(p->ipv4.ip.ip_p == IPPROTO_UDP
258 && p->ipv4.ip.ip_off == htons(IP_DF))) {
259 if (!data_) data_ = (uint8_t*)p->ipv4.body;
263 using arg0 =
typename traits::template arg<0>::type;
264 bool const is_raw_arb = std::is_same<
void*,
typename std::decay<arg0>::type>::value;
265 auto a = applyRawArb<is_raw_arb>(slot->len);
266 if (hmbdc_unlikely(a == 0)) {
268 ring->head = ring->cur = cur;
270 }
else if (hmbdc_likely(a > 0)) {
273 if (data_ + header->wireSize() <= buf + slot->len) {
274 if (subscriptions_.check(header->topic())) {
275 auto a = applyArb<is_raw_arb>(header);
276 if (hmbdc_likely(a > 0)) {
277 auto l = std::min(maxMessageSize_, (
size_t)header->messagePayloadLen());
278 outBuffer_.put(header->payload(), l);
279 }
else if (hmbdc_unlikely(a == 0)) {
280 ring->head = ring->cur = cur;
284 data_ += header->wireSize();
292 cur = nm_ring_next(ring, cur);
295 ring->head = ring->cur = cur;
299 bool verifyChecksum(
comm::eth::pkt* HMBDC_RESTRICT packet,
size_t payloadWireSize) {
300 using namespace comm::eth;
302 auto tmp = packet->ipv4.ip.ip_sum;
303 packet->ipv4.ip.ip_sum = 0;
305 checksum(&packet->ipv4.ip,
sizeof(packet->ipv4.ip), 0))) {
308 packet->ipv4.ip.ip_sum = tmp;
312 auto tmp = packet->ipv4.udp.check;
313 packet->ipv4.udp.check = 0;
314 #pragma GCC diagnostic push 316 #pragma GCC diagnostic ignored "-Waddress-of-packed-member" 318 auto udp = &packet->ipv4.udp;
320 checksum(udp,
sizeof(*udp),
321 checksum(packet->ipv4.body, payloadWireSize,
322 checksum(&packet->ipv4.ip.ip_src, 2 *
sizeof(packet->ipv4.ip.ip_src),
323 IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
326 packet->ipv4.udp.check = tmp;
328 #pragma GCC diagnostic pop 332 std::string hmbdcName_;
333 std::string schedPolicy_;
338 OutBuffer& HMBDC_RESTRICT outBuffer_;
339 size_t maxMessageSize_;
343 struct nm_desc *nmd_;
348 int pollWaitTimeMillisec_;
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: RecvTransportEngine.hpp:192
Definition: Messages.hpp:111
Definition: MonoLockFreeBuffer.hpp:15
powers the receiving functions of a netmap port
Definition: RecvTransportEngine.hpp:37
class to hold an hmbdc configuration
Definition: Config.hpp:46
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:180
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:160
Definition: Messages.hpp:118
impl class,
Definition: RecvTransportEngine.hpp:70
RecvTransportEngine(Config const &config, OutBuffer &outBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:89
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:76
void syncNetmap() HMBDC_RESTRICT
sync using busy wait or poll depending on config
Definition: RecvTransportEngine.hpp:231
a singleton holding netmap resources
Definition: NetContext.hpp:38
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:50
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object can participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:45
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:172
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: RecvTransportEngine.hpp:165
Definition: MessageHandler.hpp:39
Definition: LockFreeBufferMisc.hpp:74