hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 #undef NETMAP_WITH_LIBS
8 
9 #include "hmbdc/app/netmap/Messages.hpp"
10 #include "hmbdc/app/Base.hpp"
11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
12 #include "hmbdc/comm/eth/Misc.h"
13 #include "hmbdc/time/Rater.hpp"
14 #include "hmbdc/numeric/BitMath.hpp"
15 
16 #include <boost/regex.hpp>
17 #include <memory>
18 #include <type_traits>
19 
20 
21 #include <netinet/ether.h> /* ether_aton */
22 #include <linux/if_packet.h> /* sockaddr_ll */
23 #include <sys/sysctl.h> /* sysctl */
24 #include <ifaddrs.h> /* getifaddrs */
25 
26 #include <poll.h>
27 
28 namespace hmbdc { namespace app { namespace netmap {
29 
30 struct NetContext;
31 struct Sender;
32 
33 namespace sendtransportengine_detail {
34 using namespace std;
35 using namespace hmbdc::time;
36 using namespace hmbdc::comm::eth;
37 
38 /**
39  * @brief powers the sending functions of a netmap port
40  * @details an instance must be created through NetContext and started in an app::Context
41  *
42  */
44 : Client<SendTransportEngine> {
45  using ptr = std::shared_ptr<SendTransportEngine>;
46 
47 private:
48  uint16_t
49  outBufferSizePower2();
50 
51  friend struct hmbdc::app::netmap::NetContext; //only be created by NetContext
52  SendTransportEngine(Config const&, size_t);
53 public:
55  bool droppedCb() override;
56 
57  /*virtual*/
58  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
59  for (int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
60  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
61  if (nm_ring_empty(txring))
62  continue;
63 
64  sendPackets(txring);
65  }
66  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
67  HMBDC_THROW(std::runtime_error, "IO error");
68  }
69  }
70 
71  void stoppedCb(std::exception const& e) override;
72 
73  char const* hmbdcName() const {
74  return this->hmbdcName_.c_str();
75  }
76 
77  std::tuple<char const*, int> schedSpec() const {
78  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
79  }
80 
81 private:
82  bool match(Topic const& t) const {
83  return boost::regex_match(t.c_str(), topicRegex_);
84  }
85  friend struct hmbdc::app::netmap::Sender;
86  template <typename... Messages>
87  void queue(Topic const& t, Messages&&... msgs) HMBDC_RESTRICT {
88  auto n = sizeof...(msgs);
89  auto it = buffer_.claim(n);
90  queue(it, t, std::forward<Messages>(msgs)...);
91  buffer_.commit(it, n);
92  }
93 
94  template <typename... Messages>
95  bool tryQueue(Topic const& t, Messages&&... msgs) HMBDC_RESTRICT {
96  auto n = sizeof...(msgs);
97  auto it = buffer_.tryClaim(n);
98  if (it) {
99  queue(it, t, std::forward<Messages>(msgs)...);
100  buffer_.commit(it, n);
101  return true;
102  }
103  return false;
104  }
105 
106  template <typename M, typename... Messages>
108  , Topic const& t, M&& msg, Messages&&... msgs) {
109  using Message = typename std::remove_reference<M>::type;
110  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
111  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_)) {
112  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
113  }
114  auto s = *it;
115  TransportMessageHeader::copyTo(s, t, std::forward<M>(msg));
116  queue(++it, t, std::forward<M>(msgs)...);
117  }
118 
119  template <typename Message, typename ... Args>
120  void queueInPlace(Topic const& t, Args&&... args) HMBDC_RESTRICT {
121  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
122  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_)) {
123  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
124  }
125  auto s = buffer_.claim();
126  TransportMessageHeader::copyToInPlace<Message>(*s, t, std::forward<Args>(args)...);
127  buffer_.commit(s);
128  }
129 
130  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len);
132  , Topic const& t) {}
133 
134  void sendPackets(struct netmap_ring *);
135 
136  void getMacAddresses();
137  static
138  void initializePacket(struct pkt *, int, std::string, std::string, ether_addr, ether_addr, uint16_t, uint16_t);
139 
140  static
141  void updatePacket(struct pkt *, size_t, bool = true);
142 
143  Config const config_;
144  string hmbdcName_;
145  string schedPolicy_;
146  int schedPriority_;
147  std::string topic_;
148  boost::regex topicRegex_;
149  size_t maxMessageSize_;
150 
151 
152  struct nm_desc *nmd_;
154  int virtHeader_; //v hdr len
155 
156 
157  ether_addr srcEthAddr_;
158  ether_addr dstEthAddr_;
159  pkt precalculatedPacketHead_;
160  bool doChecksum_;
161  Rater rater_;
162  size_t maxSendBatch_;
163  uint16_t mtu_;
164 };
165 
166 } //sendtransportengine_detail
167 
169 
170 }}}
Definition: MonoLockFreeBuffer.hpp:15
class to hold an hmbdc configuration
Definition: Config.hpp:46
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:58
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Misc.h:9
Definition: TypedString.hpp:74
facade class for sending network messages
Definition: Sender.hpp:10
Definition: Misc.h:55
powers the sending functions of a netmap port
Definition: SendTransportEngine.hpp:43
Definition: Rater.hpp:10
a singleton that holds netmap resources
Definition: NetContext.hpp:38
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:45
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:74