hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 #undef NETMAP_WITH_LIBS
8 
9 #include "hmbdc/app/netmap/Messages.hpp"
10 #include "hmbdc/app/Base.hpp"
11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
12 #include "hmbdc/comm/eth/Misc.h"
13 #include "hmbdc/time/Rater.hpp"
14 #include "hmbdc/numeric/BitMath.hpp"
15 
16 #include <boost/regex.hpp>
17 #include <memory>
18 #include <type_traits>
19 
20 
21 #include <netinet/ether.h> /* ether_aton */
22 #include <linux/if_packet.h> /* sockaddr_ll */
23 #include <sys/sysctl.h> /* sysctl */
24 #include <ifaddrs.h> /* getifaddrs */
25 
26 #include <poll.h>
27 
28 namespace hmbdc { namespace app { namespace netmap {
29 
30 struct NetContext;
31 struct Sender;
32 
33 namespace sendtransportengine_detail {
34 using namespace std;
35 using namespace hmbdc::time;
36 using namespace hmbdc::comm::eth;
37 
38 /**
39  * @brief power a netmap port sending functions
40  * @details this needs to be created using NetContext and start in an app::Context
41  *
42  */
44 : Client<SendTransportEngine> {
45  using ptr = std::shared_ptr<SendTransportEngine>;
46 
47 private:
48  uint16_t
49  outBufferSizePower2();
50 
51  friend struct hmbdc::app::netmap::NetContext; //only be created by NetContext
52  SendTransportEngine(Config const&, size_t);
53 public:
55  bool droppedCb() override;
56 
57  /*virtual*/
58  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
59  for (int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
60  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
61  if (nm_ring_empty(txring))
62  continue;
63 
64  sendPackets(txring);
65  }
66  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
67  HMBDC_THROW(std::runtime_error, "IO error");
68  }
69  }
70 
71  void stoppedCb(std::exception const& e) override;
72 
73  char const* hmbdcName() const {
74  return this->hmbdcName_.c_str();
75  }
76 
77  std::tuple<char const*, int> schedSpec() const {
78  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
79  }
80 
81 private:
82  bool match(Topic const& t) const {
83  return boost::regex_match(t.c_str(), topicRegex_);
84  }
85  friend struct hmbdc::app::netmap::Sender;
86  template <typename... Messages>
87  void queue(Topic const& t, Messages&&... msgs) HMBDC_RESTRICT {
88  auto n = sizeof...(msgs);
89  auto it = buffer_.claim(n);
90  queue(it, t, std::forward<Messages>(msgs)...);
91  buffer_.commit(it, n);
92  }
93 
94  template <typename... Messages>
95  bool tryQueue(Topic const& t, Messages&&... msgs) HMBDC_RESTRICT {
96  auto n = sizeof...(msgs);
97  auto it = buffer_.tryClaim(n);
98  if (it) {
99  queue(it, t, std::forward<Messages>(msgs)...);
100  buffer_.commit(it, n);
101  return true;
102  }
103  return false;
104  }
105 
106  template <typename M, typename... Messages>
108  , Topic const& t, M&& msg, Messages&&... msgs) {
109  using Message = typename std::remove_reference<M>::type;
110  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_)) {
111  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
112  }
113  auto s = *it;
114  TransportMessageHeader::copyTo(s, t, std::forward<M>(msg));
115  queue(++it, t, std::forward<M>(msgs)...);
116  }
117 
118  template <typename Message, typename ... Args>
119  void queueInPlace(Topic const& t, Args&&... args) HMBDC_RESTRICT {
120  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_)) {
121  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
122  }
123  auto s = buffer_.claim();
124  TransportMessageHeader::copyToInPlace<Message>(*s, t, std::forward<Args>(args)...);
125  buffer_.commit(s);
126  }
127 
128  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len);
130  , Topic const& t) {}
131 
132  void sendPackets(struct netmap_ring *);
133 
134  void getMacAddresses();
135  static
136  void initializePacket(struct pkt *, int, std::string, std::string, ether_addr, ether_addr, uint16_t, uint16_t);
137 
138  static
139  void updatePacket(struct pkt *, size_t, bool = true);
140 
141  Config const config_;
142  string hmbdcName_;
143  string schedPolicy_;
144  int schedPriority_;
145  std::string topic_;
146  boost::regex topicRegex_;
147  size_t maxMessageSize_;
148 
149 
150  struct nm_desc *nmd_;
152  int virtHeader_; //v hdr len
153 
154 
155  ether_addr srcEthAddr_;
156  ether_addr dstEthAddr_;
157  pkt precalculatedPacketHead_;
158  bool doChecksum_;
159  Rater rater_;
160  size_t maxSendBatch_;
161  uint16_t mtu_;
162 };
163 
164 } //sendtransportengine_detail
165 
167 
168 }}}
Definition: MonoLockFreeBuffer.hpp:15
class to hold an hmbdc configuration
Definition: Config.hpp:44
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:58
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Misc.h:9
Definition: TypedString.hpp:74
fascade class for sending network messages
Definition: Sender.hpp:10
Definition: Misc.h:55
power a netmap port sending functions
Definition: SendTransportEngine.hpp:43
Definition: Rater.hpp:10
a singleton that holding netmap resources
Definition: NetContext.hpp:38
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:74