hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/SendServer.hpp"
6 #include "hmbdc/app/tcpcast/Messages.hpp"
7 #include "hmbdc/app/udpcast/SendTransportEngine.hpp"
8 #include "hmbdc//Traits.hpp"
9 #include "hmbdc/time/Time.hpp"
10 #include "hmbdc/time/Rater.hpp"
11 #include "hmbdc/numeric/BitMath.hpp"
12 
13 #include <boost/circular_buffer.hpp>
14 #include <memory>
15 #include <tuple>
16 #include <type_traits>
17 
18 namespace hmbdc { namespace app { namespace tcpcast {
19 
20 namespace send_detail {
21 using namespace hmbdc::time;
22 using namespace std;
23 using pattern::MonoLockFreeBuffer;
24 /**
25  * @brief capture the transportation mechanism
26  *
27  */
29 : Transport {
30  using ptr = std::shared_ptr<SendTransport>;
31  /**
32  * @brief ctor
33  * @param cfg JSON specifying the transport - see example, perf-tcpcast.cpp
34  * @param maxMessageSize max message size in bytes to be sent
35  */
36  SendTransport(Config const&, size_t);
37  template <typename... Messages>
38  void queue(Topic const& t, Messages&&... msgs) {
39  auto n = sizeof...(msgs);
40  auto it = buffer_.claim(n);
41  queue(it, t, std::forward<Messages>(msgs)...);
42  buffer_.commit(it, n);
43  }
44 
45  template <typename... Messages>
46  bool tryQueue(Topic const& t, Messages&&... msgs) {
47  auto n = sizeof...(msgs);
48  auto it = buffer_.tryClaim(n);
49  if (it) {
50  queue(it, t, std::forward<Messages>(msgs)...);
51  buffer_.commit(it, n);
52  return true;
53  }
54  return false;
55  }
56 
57  template <typename Message, typename ... Args>
58  void queueInPlace(Topic const& t, Args&&... args) {
59  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
60  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
62  , "hasMemoryAttachment has to the first base for Message");
63  auto s = buffer_.claim();
64  char* addr = static_cast<char*>(*s);
65  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
66  h->flag = calculateFlag<Message>();
67  h->messagePayloadLen = sizeof(MessageWrap<Message>);
68  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
69  h->topicLen = tl;
70  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
71  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
72  } else {
73  HMBDC_THROW(std::out_of_range
74  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
75  }
76  buffer_.commit(s);
77  }
78 
79  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len);
80 
81  void stop();
82 
83  bool match(Topic const& t) const {
84  return boost::regex_match(t.c_str(), topicRegex_);
85  }
86 
87 private:
88  std::string topic_;
89  boost::regex topicRegex_;
90 
91 protected:
92  size_t maxMessageSize_;
93  size_t minRecvToStart_;
94  MonoLockFreeBuffer buffer_;
95 
96  unique_ptr<udpcast::SendTransport> mcSendTransport_;
97  Rater rater_;
98  Config mcConfig_;
99 
100 private:
101 
102  template <typename Message>
103  static
104  uint8_t calculateFlag() {
105  if (is_base_of<hasMemoryAttachment, Message>::value) return hasMemoryAttachment::flag;
106  return 0;
107  }
108 
109  template<typename M, typename ... Messages>
110  void queue(MonoLockFreeBuffer::iterator it, Topic const& t
111  , M&& m, Messages&&... msgs) {
112  using Message = typename std::remove_reference<M>::type;
113  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
114  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
116  , "hasMemoryAttachment has to the first base for Message");
117 
118  auto s = *it;
119  char* addr = static_cast<char*>(s);
120  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
121  h->flag = calculateFlag<Message>();
122  h->messagePayloadLen = sizeof(MessageWrap<Message>);
123  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
124  h->topicLen = tl;
125  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
126  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
127  } else {
128  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
129  }
130  queue(++it, t, std::forward<Messages>(msgs)...);
131  }
132 
133  void queue(MonoLockFreeBuffer::iterator it
134  , Topic const& t) {}
135 };
136 
139 , TimerManager
141 , Client<SendTransportEngine> {
142  SendTransportEngine(Config const&, size_t);
143  using SendTransport::hmbdcName;
144  using SendTransport::schedSpec;
145 
146  /*virtual*/
147  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
148  runOnce();
149  mcSendTransport_->runOnce(true);
150  }
151 
152  /*virtual*/
153  bool droppedCb() override {
154  stop();
155  stopped_ = true;
156  return true;
157  };
158 
159  /**
160  * @brief check how many recipient sessions are still active
161  * @return numeric_limits<size_t>::max() if the sending hasn't started due to
162  * minRecvToStart has not been met yet
163  */
164  size_t sessionsRemainingActive() const {
165  if (stopped_) return 0;
166  if (server_ && !minRecvToStart_) {
167  return server_->readySessionCount();
168  }
169  return numeric_limits<size_t>::max();
170  }
171 private:
172  void cullSlow() {
173  auto seq = buffer_.readSeq();
174  if (seq == lastSeq_ && buffer_.isFull()) {
175  server_->killSlowestSession();
176  }
177  lastSeq_ = seq;
178  }
179 
180  void runOnce();
181 
182  size_t mtu_;
183  size_t maxSendBatch_;
184  bool waitForSlowReceivers_;
186  ToSend toSend_;
187  // SendServer *server_;
188  // unique_ptr<SyncSendServer> syncServer_;
189  unique_ptr<SendServer> server_;
190  size_t lastSeq_;
191  bool stopped_;
192 };
193 } //send_detail
194 
197 }}}
Definition: MonoLockFreeBuffer.hpp:15
capture the transportation mechanism
Definition: SendTransportEngine.hpp:28
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:153
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
Definition: Transport.hpp:69
Definition: Traits.hpp:35
Definition: SendTransportEngine.hpp:137
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:147
size_t sessionsRemainingActive() const
check how many recipient sessions are still active
Definition: SendTransportEngine.hpp:164
Definition: Message.hpp:76
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:45
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: Timers.hpp:104
Definition: LockFreeBufferMisc.hpp:74