1 #include "hmbdc/Copyright.hpp" 5 #define NETMAP_WITH_LIBS 6 #include <net/netmap_user.h> 7 #undef NETMAP_WITH_LIBS 9 #include "hmbdc/app/netmap/Messages.hpp" 10 #include "hmbdc/app/Base.hpp" 11 #include "hmbdc/comm/Topic.hpp" 12 #include "hmbdc/comm/eth/Misc.h" 13 #include "hmbdc/Compile.hpp" 14 #include "hmbdc/text/StringTrieSet.hpp" 15 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 16 #include "hmbdc//Traits.hpp" 19 #include <boost/lexical_cast.hpp> 21 #include <type_traits> 24 #include <netinet/ether.h> 25 #include <linux/if_packet.h> 26 #include <sys/sysctl.h> 30 namespace hmbdc {
namespace app {
namespace netmap {
38 using ptr = std::shared_ptr<RecvTransport>;
69 template <
typename OutBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
72 ,
Client<RecvTransportEngine<OutBuffer, MsgArbitrator>>
73 ,
MessageHandler<RecvTransportEngine<OutBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
90 , MsgArbitrator arb =
NoOpArb())
92 , buffer_(std::max(
sizeof(Subscribe),
sizeof(Unsubscribe))
93 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
94 , outBuffer_(outBuffer)
95 , maxMessageSize_(outBuffer.maxItemSize())
96 , doChecksum_(config_.getExt<
bool>(
"doChecksum"))
98 , pollWaitTimeMillisec_(0)
101 config (hmbdcName_,
"hmbdcName")
102 (schedPolicy_,
"schedPolicy")
103 (schedPriority_,
"schedPriority")
106 struct nmreq baseNmd;
107 bzero(&baseNmd,
sizeof(baseNmd));
108 baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
109 auto nmport = config_.getExt<std::string>(
"netmapPort");
110 busyWait_ = config_.getExt<
bool>(
"busyWait");
111 if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<
int>(
"pollWaitTimeMillisec");
112 nmd_ = nm_open(nmport.c_str(), &baseNmd, config_.getExt<
int>(
"nmOpenFlags"), NULL);
114 HMBDC_THROW(std::runtime_error,
"cannot open " << nmport);
118 memset(&req, 0,
sizeof(req));
119 bcopy(nmd_->req.nr_name, req.nr_name,
sizeof(req.nr_name));
120 req.nr_version = NETMAP_API;
121 req.nr_cmd = NETMAP_VNET_HDR_GET;
122 int err = ioctl(nmd_->fd, NIOCREGIF, &req);
124 HMBDC_THROW(std::runtime_error,
"Unable to get virtio-net header length");
126 virtHeader_ = req.nr_arg1;
130 pfd_.events = POLLIN;
131 sleep(config_.getExt<
int>(
"nmResetWaitSec"));
133 if (unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
134 HMBDC_THROW(std::runtime_error,
"IO error");
// Discard whatever is already queued on each receive ring: setting head and
// cur to tail hands every pending slot back to the kernel.
// Fix: the ring must be fetched with NETMAP_RXRING; the loop walks the RX ring
// indices, and NETMAP_TXRING would return a transmit ring instead (the receive
// loop in invokedCb uses NETMAP_RXRING for the same index range).
136 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
137 struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
// NOTE(review): listing line 139 (the statement guarded by this check) is not
// visible in this extract.
138 if (nm_ring_empty(rxring))
140 rxring->head = rxring->cur = rxring->tail;
144 void listenTo(
Topic const& t)
override {
148 void stopListenTo(
Topic const& t)
override {
155 if (nmd_) nm_close(nmd_);
159 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
161 auto n = buffer_.peek(begin, end);
164 MH::handleMessage(*static_cast<MessageHead*>(*it++));
166 buffer_.wasteAfterPeek(begin, n);
169 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
170 struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
171 if (nm_ring_empty(rxring))
178 void handleMessageCb(Subscribe
const& t) {
179 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
182 void handleMessageCb(Unsubscribe
const& t) {
183 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
186 void stoppedCb(std::exception
const& e)
override {
187 HMBDC_LOG_C(e.what());
190 char const* hmbdcName()
const {
191 return this->hmbdcName_.c_str();
194 std::tuple<char const*, int> schedSpec()
const {
195 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
199 template <
bool is_raw_arb>
200 typename std::enable_if<is_raw_arb, int>::type applyRawArb(
size_t len) {
201 return arb_(data_, len);
204 template <
bool is_raw_arb>
205 typename std::enable_if<!is_raw_arb, int>::type applyRawArb(
size_t) {
209 template <
bool is_raw_arb>
214 template <
bool is_raw_arb>
225 void syncNetmap() HMBDC_RESTRICT {
226 if (likely(busyWait_)) {
227 if (unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
228 HMBDC_THROW(std::runtime_error,
"IO error");
233 auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
234 if (unlikely( res < 0)) {
235 HMBDC_THROW(std::runtime_error,
"IO error errno=" << errno);
242 void recvPackets(
struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
243 using namespace comm::eth;
244 auto cur = ring->cur;
246 while(cur != ring->tail) {
247 struct netmap_slot *slot = &ring->slot[cur];
248 auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
249 auto* p =
reinterpret_cast<pkt*
>(buf + virtHeader_ -
sizeof(virt_header));
251 if (p->ipv4.ip.ip_p == IPPROTO_UDP
252 && p->ipv4.ip.ip_off == htons(IP_DF)) {
253 if (!data_) data_ = (uint8_t*)p->ipv4.body;
257 using arg0 =
typename traits::template arg<0>::type;
258 bool const is_raw_arb = std::is_same<
void*,
typename std::decay<arg0>::type>::value;
259 auto a = applyRawArb<is_raw_arb>(slot->len);
260 if (unlikely(a == 0)) {
262 ring->head = ring->cur = cur;
264 }
else if (likely(a > 0)) {
267 if (data_ + header->wireSize() <= buf + slot->len) {
268 if (subscriptions_.check(header->topic())) {
269 auto a = applyArb<is_raw_arb>(header);
271 auto l = std::min((
size_t)header->messagePayloadLen(), maxMessageSize_);
272 auto it = outBuffer_.claim();
273 char* b =
static_cast<char*
>(*it);
274 memcpy(b, header->payload(), l);
275 outBuffer_.commit(it);
276 }
else if (unlikely(a == 0)) {
277 ring->head = ring->cur = cur;
281 data_ += header->wireSize();
289 cur = nm_ring_next(ring, cur);
292 ring->head = ring->cur = cur;
296 bool verifyChecksum(
comm::eth::pkt* HMBDC_RESTRICT packet,
size_t payloadWireSize) {
297 using namespace comm::eth;
299 auto tmp = packet->ipv4.ip.ip_sum;
300 packet->ipv4.ip.ip_sum = 0;
302 checksum(&packet->ipv4.ip,
sizeof(packet->ipv4.ip), 0))) {
305 packet->ipv4.ip.ip_sum = tmp;
309 auto tmp = packet->ipv4.udp.check;
310 packet->ipv4.udp.check = 0;
311 #pragma GCC diagnostic push 313 #pragma GCC diagnostic ignored "-Waddress-of-packed-member" 315 auto udp = &packet->ipv4.udp;
317 checksum(udp,
sizeof(*udp),
318 checksum(packet->ipv4.body, payloadWireSize,
319 checksum(&packet->ipv4.ip.ip_src, 2 *
sizeof(packet->ipv4.ip.ip_src),
320 IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
323 packet->ipv4.udp.check = tmp;
325 #pragma GCC diagnostic pop 329 std::string hmbdcName_;
330 std::string schedPolicy_;
334 OutBuffer& HMBDC_RESTRICT outBuffer_;
335 size_t maxMessageSize_;
339 struct nm_desc *nmd_;
344 int pollWaitTimeMillisec_;
Definition: Messages.hpp:111
Definition: MonoLockFreeBuffer.hpp:14
powers the receiving functions of a netmap port
Definition: RecvTransportEngine.hpp:37
class to hold an hmbdc configuration
Definition: Config.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:158
Definition: Messages.hpp:118
impl class,
Definition: RecvTransportEngine.hpp:70
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:55
a singleton that holds netmap resources
Definition: NetContext.hpp:37
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:50
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:170
Definition: MessageHandler.hpp:36
Definition: LockFreeBufferMisc.hpp:73