hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/SendServer.hpp"
6 #include "hmbdc/app/tcpcast/Messages.hpp"
7 #include "hmbdc/app/mcast/SendTransportEngine.hpp"
8 #include "hmbdc//Traits.hpp"
9 #include "hmbdc/time/Time.hpp"
10 #include "hmbdc/time/Rater.hpp"
11 #include "hmbdc/numeric/BitMath.hpp"
12 
13 #include <boost/circular_buffer.hpp>
14 #include <memory>
15 #include <tuple>
16 #include <type_traits>
17 
18 namespace hmbdc { namespace app { namespace tcpcast {
19 
20 namespace send_detail {
21 using namespace hmbdc::time;
22 using namespace std;
23 using pattern::MonoLockFreeBuffer;
24 /**
25  * @brief capture the transportation mechanism
26  *
27  */
29 : Transport {
30  using ptr = std::shared_ptr<SendTransport>;
31  /**
32  * @brief ctor
33  * @param cfg jason specifing the transport - see example, perf-tcpcast.cpp
34  * @param maxMessageSize max messafe size in bytes to be sent
35  */
36  SendTransport(Config const&, size_t);
37  template <typename... Messages>
38  void queue(Topic const& t, Messages&&... msgs) {
39  auto n = sizeof...(msgs);
40  auto it = buffer_.claim(n);
41  queue(it, t, std::forward<Messages>(msgs)...);
42  buffer_.commit(it, n);
43  }
44 
45  template <typename... Messages>
46  bool tryQueue(Topic const& t, Messages&&... msgs) {
47  auto n = sizeof...(msgs);
48  auto it = buffer_.tryClaim(n);
49  if (it) {
50  queue(it, t, std::forward<Messages>(msgs)...);
51  buffer_.commit(it, n);
52  return true;
53  }
54  return false;
55  }
56 
57  template <typename Message, typename ... Args>
58  void queueInPlace(Topic const& t, Args&&... args) {
59  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
61  , "hasMemoryAttachment has to the first base for Message");
62  auto s = buffer_.claim();
63  char* addr = static_cast<char*>(*s);
64  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
65  h->flag = calculateFlag<Message>();
66  h->messagePayloadLen = sizeof(MessageWrap<Message>);
67  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
68  h->topicLen = tl;
69  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
70  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
71  } else {
72  HMBDC_THROW(std::out_of_range
73  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
74  }
75  buffer_.commit(s);
76  }
77 
78  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len);
79 
80  void stop();
81 
82  bool match(Topic const& t) const {
83  return boost::regex_match(t.c_str(), topicRegex_);
84  }
85 
86 private:
87  std::string topic_;
88  boost::regex topicRegex_;
89 
90 protected:
91  size_t maxMessageSize_;
92  size_t minRecvToStart_;
93  MonoLockFreeBuffer buffer_;
94 
95  unique_ptr<mcast::SendTransport> mcSendTransport_;
96  Rater rater_;
97  Config mcConfig_;
98 
99 private:
100 
101  template <typename Message>
102  static
103  uint8_t calculateFlag() {
104  if (is_base_of<hasMemoryAttachment, Message>::value) return hasMemoryAttachment::flag;
105  return 0;
106  }
107 
108  template<typename M, typename ... Messages>
109  void queue(MonoLockFreeBuffer::iterator it, Topic const& t
110  , M&& m, Messages&&... msgs) {
111  using Message = typename std::remove_reference<M>::type;
112  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
114  , "hasMemoryAttachment has to the first base for Message");
115 
116  auto s = *it;
117  char* addr = static_cast<char*>(s);
118  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
119  h->flag = calculateFlag<Message>();
120  h->messagePayloadLen = sizeof(MessageWrap<Message>);
121  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
122  h->topicLen = tl;
123  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
124  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
125  } else {
126  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
127  }
128  queue(++it, t, std::forward<Messages>(msgs)...);
129  }
130 
131  void queue(MonoLockFreeBuffer::iterator it
132  , Topic const& t) {}
133 };
134 
137 , TimerManager
139 , Client<SendTransportEngine> {
140  SendTransportEngine(Config const&, size_t);
142  using SendTransport::hmbdcName;
143  using SendTransport::schedSpec;
144 
145  /*virtual*/
146  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
147  runOnce();
148  mcSendTransport_->runOnce(true);
149  }
150 
151  /*virtual*/
152  bool droppedCb() override {
153  stop();
154  stopped_ = true;
155  return true;
156  };
157 
158  // /*virtual*/
159  // void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
160  // server_.reset(new AsyncSendServer(config_, *pIos_, buffer_.capacity()));
161  // setCallback(
162  // [this](TimerManager& tm, SysTime const& now) {
163  // mcSendTransport_->queue(
164  // tcpcastAdTopic_, server_->advertisingMessage());
165  // if (!waitForSlowReceivers_) cullSlow();
166  // }
167  // );
168 
169  // schedule(SysTime::now(), *this);
170  // }
171 
172  /**
173  * @brief check how many recipient sessions are still active
174  * @return numeric_limits<size_t>::max() if the sending hasn't started due to
175  * minRecvToStart has not been met yet
176  */
177  size_t sessionsRemainingActive() const {
178  if (stopped_) return 0;
179  if (server_ && !minRecvToStart_) {
180  return server_->readySessionCount();
181  }
182  return numeric_limits<size_t>::max();
183  }
184 private:
185  void cullSlow() {
186  auto seq = buffer_.readSeq();
187  if (seq == lastSeq_ && buffer_.isFull()) {
188  server_->killSlowestSession();
189  }
190  lastSeq_ = seq;
191  }
192 
193  void runOnce();
194 
195  size_t mtu_;
196  size_t maxSendBatch_;
197  bool waitForSlowReceivers_;
198  MonoLockFreeBuffer::iterator begin_;
199  ToSend toSend_;
200  // SendServer *server_;
201  // unique_ptr<SyncSendServer> syncServer_;
202  unique_ptr<SendServer> server_;
203  size_t lastSeq_;
204  bool stopped_;
205 };
206 } //send_detail
207 
210 }}}
capture the transportation mechanism
Definition: SendTransportEngine.hpp:28
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:152
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
Definition: Transport.hpp:64
Definition: Traits.hpp:35
Definition: SendTransportEngine.hpp:135
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:146
size_t sessionsRemainingActive() const
check how many recipient sessions are still active
Definition: SendTransportEngine.hpp:177
SendTransport(Config const &, size_t)
ctor
Definition: Message.hpp:72
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: Timers.hpp:104