hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/udpcast/Transport.hpp"
4 #include "hmbdc/app/udpcast/Messages.hpp"
5 #include "hmbdc/app/Base.hpp"
6 #include "hmbdc/comm/inet/Endpoint.hpp"
7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/time/Rater.hpp"
10 #include "hmbdc/numeric/BitMath.hpp"
11 
12 #include <boost/regex.hpp>
13 
14 #include <memory>
15 #include <iostream>
16 
17 #include <sys/epoll.h>
18 namespace hmbdc { namespace app { namespace udpcast {
19 
20 namespace sendtransportengine_detail {
21 using namespace std;
22 using namespace hmbdc::time;
23 using namespace hmbdc::pattern;
25 
27 : Transport {
28  using ptr = std::shared_ptr<SendTransport>;
29  SendTransport(Config const&, size_t);
30  ~SendTransport();
31 
32  bool match(Topic const& t) const {
33  return boost::regex_match(t.c_str(), topicRegex_);
34  }
35 
36  template <typename... Messages>
37  void queue(Topic const& t, Messages&&... msgs) {
38  auto n = sizeof...(msgs);
39  auto it = buffer_.claim(n);
40  queue(it, t, std::forward<Messages>(msgs)...);
41  buffer_.commit(it, n);
42  }
43 
44  template <typename... Messages>
45  bool tryQueue(Topic const& t, Messages&&... msgs) {
46  auto n = sizeof...(msgs);
47  auto it = buffer_.tryClaim(n);
48  if (it) {
49  queue(it, t, std::forward<Messages>(msgs)...);
50  buffer_.commit(it, n);
51  return true;
52  }
53  return false;
54  }
55 
56  template <typename Message, typename ... Args>
57  void queueInPlace(Topic const& t, Args&&... args) {
58  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
59  auto s = buffer_.claim();
60  char* addr = static_cast<char*>(*s);
61  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
62  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
63  h->topicLen = tl;
64  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
65  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
66  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
67  } else {
68  HMBDC_THROW(std::out_of_range
69  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
70  }
71  buffer_.commit(s);
72  }
73 
74  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len);
75 
76  void runOnce(bool alwaysPoll = false) HMBDC_RESTRICT {
77  resumeSend();
78  if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
79  utils::EpollTask::instance().poll();
80  }
81  }
82 
83  template <typename M>
84  static TransportMessageHeader* encode(Topic const& t, void* buf, size_t bufLen, M const& m) {
85  using Message = typename std::remove_reference<M>::type;
86  char* addr = static_cast<char*>(buf);
87  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
88  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
89  h->topicLen = tl;
90  if (hmbdc_likely(sizeof(Message) <= bufLen)) {
91  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(m);
92  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
93  } else {
94  HMBDC_THROW(std::out_of_range, "bufLen too small to hold a message");
95  }
96  return h;
97  }
98 
99  void stop();
100 
101 private:
102  std::string topic_;
103  boost::regex topicRegex_;
104  size_t maxMessageSize_;
105  typename Buffer::iterator begin_, it_, end_;
106  Buffer buffer_;
107  Rater rater_;
108  size_t maxSendBatch_;
109 
110  iovec* toSendMsgs_;
111  size_t toSendMsgsHead_;
112  size_t toSendMsgsTail_;
113  mmsghdr* toSendPkts_;
114  size_t toSendPktsHead_;
115  size_t toSendPktsTail_;
116  std::vector<comm::inet::Endpoint> udpcastDests_;
117 
118  uint16_t
119  outBufferSizePower2();
120 
121  template<typename M, typename ... Messages>
122  void queue(typename Buffer::iterator it, Topic const& t
123  , M&& m, Messages&&... msgs) {
124  using Message = typename std::remove_reference<M>::type;
125  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
126  auto s = *it;
127  char* addr = static_cast<char*>(s);
128  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
129  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
130  h->topicLen = tl;
131  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
132  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
133  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
134  } else {
135  HMBDC_THROW(std::out_of_range
136  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
137  }
138  queue(++it, t, std::forward<Messages>(msgs)...);
139  }
140 
141  void queue(typename Buffer::iterator it
142  , Topic const& t) {}
143  void resumeSend();
144 };
145 
148 , Client<SendTransportEngine> {
149  using SendTransport::SendTransport;
150  using SendTransport::hmbdcName;
151  using SendTransport::schedSpec;
152 
153  /*virtual*/
154  void invokedCb(uint16_t) HMBDC_RESTRICT override {
155  runOnce();
156  }
157  /*virtual*/ bool droppedCb() override {
158  stop();
159  return true;
160  };
161 };
162 
163 } //sendtransportengine_detail
166 }}}
Definition: MonoLockFreeBuffer.hpp:15
Definition: Transport.hpp:39
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
Definition: BlockingBuffer.hpp:10
void invokedCb(uint16_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:154
Definition: Message.hpp:76
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:45
Definition: Rater.hpp:11
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:157
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:74