hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/tcpcast/Transport.hpp"
4 #include "hmbdc/app/tcpcast/RecvSession.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/app/mcast/RecvTransportEngine.hpp"
7 #include "hmbdc/app/Logger.hpp"
8 #include "hmbdc/text/StringTrieSet.hpp"
9 
10 #include <boost/bind.hpp>
11 #include <boost/lexical_cast.hpp>
12 #include <boost/unordered_map.hpp>
13 
14 #include <memory>
15 
16 namespace hmbdc { namespace app { namespace tcpcast {
17 
18 /**
19  * @brief interface to power a tcpcast transport receiving functions
20  */
22 : Transport {
23  using ptr = std::shared_ptr<RecvTransport>;
24  using Transport::Transport;
25  virtual ~RecvTransport(){}
26  /**
27  * @brief a take all arbitrator (no arbitration at all)
28  * @details it provides the default type for arbitration which does nothing
29  * it also provides a template on writing a user defined arbitrator
30  *
31  * @param h handle to retrieve what is inside of the message
32  * @return always returns 1 here
33  * In general: 1 keep the message; -1 to drop;
34  * 0 cannot decide, ask me later.
35  */
36  struct NoOpArb {
37  int operator()(TransportMessageHeader const* h) {
38  return 1; //always keep it
39  }
40  };
41 
42 private:
43  friend struct NetContext; //only be used by NetContext
44  virtual void listenTo(comm::Topic const& t) = 0;
45  virtual void stopListenTo(comm::Topic const& t) = 0;
46 };
47 
48 
49 namespace recvtransportengine_detail {
50 using namespace hmbdc::time;
51 using namespace std;
52 /**
53  * @brief impl class
54  * @details this needs to be created using NetContext and start in an app::Context
55  *
56  * @tparam OutputBuffer type of buffer to hold resulting network messages
57  */
58 template <typename OutputBuffer, typename MsgArbitrator>
63 , Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>>
64 , MessageHandler<RecvTransportEngine<OutputBuffer, MsgArbitrator>
65  , Subscribe, Unsubscribe, TopicSource> {
66  using SELF = RecvTransportEngine;
68  friend struct NetContext; //only be created by NetContext
69 
70 /**
71  * @brief ctor
72  * @details io_service could be passed in by user; in this case NO more than one thread should
73  * power this io_service instance, since more threads would violate the thread guarantee of Client,
74  * which is that no callbacks are called in parallel
75  *
76  * @param cfg specify the details of the tcpcast transport
77  * @param outputBuffer holding the results
78  * @param arb arbitrator instance to decide which messages to drop and keep; it ONLY supports
79  * hmbdc message (AFTER topic filtering) level (NO packet level since it is tcp)
80  */
82  , OutputBuffer& outputBuffer
83  , MsgArbitrator arb = NoOpArb())
84  : RecvTransport(cfg)
85  , ReoccuringTimer(Duration::seconds(cfg.getExt<size_t>("heartbeatPeriodSeconds")))
86  , config_(cfg)
87  , mcConfig_(cfg)
88  , buffer_(std::max({sizeof(MessageWrap<Subscribe>)
89  , sizeof(MessageWrap<Unsubscribe>)
90  , sizeof(MessageWrap<TopicSource>)
91  }), config_.getExt<uint16_t>("cmdBufferSizePower2"))
92  , outputBuffer_(outputBuffer)
93  , maxItemSize_(outputBuffer.maxItemSize())
94  // , mcReceiver_(cfg, buffer_, &*pIos_)
95  , arb_(arb)
96  , myIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
97  cfg.getExt<string>("ifaceAddr")).c_str()))
98  , additionalTopicSourcesScanTimer_(
99  Duration::seconds(cfg.getExt<size_t>("additionalTopicSourcesScanPeriodSeconds"))) {
100  if (outputBuffer.maxItemSize() < sizeof(SessionStarted)
101  || outputBuffer.maxItemSize() < sizeof(SessionDropped)) {
102  HMBDC_THROW(std::out_of_range
103  , "buffer is too small for notification: SessionStarted and SessionDropped");
104  }
105 
106  // mcConfig_.put("ifaceAddr", cfg.getExt<string>("ifaceAddr"));
107  mcConfig_.put("outBufferSizePower2", 3u);
108  // mcConfig_.put("mcastPort", cfg.getExt<uint16_t>("mcastPort"));
109  // mcConfig_.put("mcastAddr", cfg.getExt<string>("mcastAddr"));
110 
111  mcReceiver_.reset(new mcast::RecvTransportImpl<decltype(buffer_)>(
112  mcConfig_, buffer_, mcast::RecvTransport::NoOpArb()));
113 
114  cfg(additionalTopicSources_, "additionalTopicSources");
115  }
116 
117 /**
118  * @brief in the environment that multicast cannot be enabled on either the sender or the receiver side,
119  * the sender info cannot be automatically detected by the receiver.
120  * User can use this method to explicitly tell the receiver about the sender information
121  * @details unlike the additionalTopicSources config, which will periodically retry,
122  * this call has no effect if the sender is not up and fully ready
123  *
124  * @param s describing the sender
125  */
126  void tryTopicSource(TopicSource const& s) {
127  buffer_.put(MessageWrap<TopicSource>(s));
128  }
129 
130 /**
131  * @brief start the show by scheduling the message recv
132  */
133  /*virtual*/
134  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
135  mcReceiver_->start();
136  mcReceiver_->listenTo(tcpcastAdTopic_);
137 
138  setCallback(
139  [this](TimerManager& tm, SysTime const& now) {
140  for (auto& s : recvSessions_) {
141  s.second->heartbeat();
142  }
143  }
144  );
145 
146  schedule(SysTime::now(), *this);
147  if (additionalTopicSources_.size()) {
148  additionalTopicSourcesScanTimer_.setCallback(
149  [this](TimerManager& tm, SysTime const& now) {
150  for (auto const& s : additionalTopicSources_) {
151  handleMessageCb(s);
152  }
153  }
154  );
155  schedule(SysTime::now(), additionalTopicSourcesScanTimer_);
156  }
157  };
158 
159  using Transport::hmbdcName;
160  using Transport::schedSpec;
161 
162 /**
163  * @brief should never happen unless an exception is thrown
164  *
165  * @param e exception thrown
166  */
167  /*virtual*/
168  void stoppedCb(std::exception const& e) override {
169  HMBDC_LOG_C(e.what());
170  };
171 
172 /**
173  * @brief power the io_service and other things
174  *
175  */
176  /*virtual*/
// One engine iteration: service every receive session, dispatch the commands
// queued on buffer_, then give the multicast receiver one run.
// threadSerialNumber is not used in the visible body.
177  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
// Pass 1: run each session once; a session whose runOnce() returns false is
// stopped and removed. The iterator is advanced inside the loop body
// (post-increment on erase) so erasing does not break the traversal.
178  for (auto it = recvSessions_.begin(); it != recvSessions_.end();) {
179  if (hmbdc_unlikely(!(*it).second->runOnce())) {
180  (*it).second->stop();
181  recvSessions_.erase(it++);
182  } else {
183  it++;
184  }
185  }
// Pass 2: peek at the queued commands (Subscribe / Unsubscribe / TopicSource
// are the types buffer_ is sized for) and hand each one to the message handler.
// NOTE(review): the declaration of `begin` / `end` is not visible in this
// excerpt (listing line 186 is absent) - confirm against the real header.
187  auto n = buffer_.peek(begin, end);
188  auto it = begin;
189  while (it != end) {
190  MH::handleMessage(*static_cast<MessageHead*>(*it++));
191  }
// Release the n peeked entries back to the buffer.
192  buffer_.wasteAfterPeek(begin, n);
// Finally let the multicast receiver do one round of work.
193  mcReceiver_->runOnce(true);
194  }
195 
196 /**
197  * @brief only used by MH
198  */
199  void handleMessageCb(Subscribe const& t) {
200  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
201  for (auto& p : recvSessions_) {
202  p.second->sendSubscribe(t.topic);
203  }
204  }
205 
206 /**
207  * @brief only used by MH
208  */
209  void handleMessageCb(Unsubscribe const& t) {
210  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
211  for (auto& p : recvSessions_) {
212  p.second->sendUnsubscribe(t.topic);
213  }
214  }
215 
216 /**
217  * @brief only used by MH
218  */
219  void handleMessageCb(TopicSource const& t) {
220  auto ip = inet_addr(t.ip);
221  //unless using loopback address, don't EVER listen to myself (own process)
222  if (ip != 0x100007ful //127.0.0.1
223  && ip == myIp_
224  && (!t.loopback || t.pid == getpid())) {
225  return;
226  }
227  auto key = make_pair(ip, t.port);
228  if (recvSessions_.find(key) == recvSessions_.end()) {
229  boost::regex topicRegex(t.topicRegex);
230  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
231  if (boost::regex_match(it->first.c_str(), topicRegex)
232  || it->second) {
233  try {
234  auto sess = new Session(
235  config_
236  , subscriptions_
237  , topicRegex
238  , outputBuffer_
239  , arb_
240  );
241  sess->start(ip, t.port);
242  recvSessions_[key] = typename Session::ptr(sess);
243  } catch (std::exception const& e) {
244  HMBDC_LOG_C(e.what());
245  }
246  break;
247  }
248  }
249  } //else ignore
250  }
251 
252 private:
254  void listenTo(Topic const& t) override {
255  buffer_.put(MessageWrap<Subscribe>{t});
256  }
257 
258  void stopListenTo(Topic const& t) override {
259  buffer_.put(MessageWrap<Unsubscribe>{t});
260  }
261  Config config_, mcConfig_;
263  OutputBuffer& outputBuffer_;
264  size_t maxItemSize_;
265  unique_ptr<mcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
266 
267  text::StringTrieSet subscriptions_;
268  boost::unordered_map<pair<uint64_t, uint16_t>
269  , typename Session::ptr> recvSessions_;
270  MsgArbitrator arb_;
271  uint32_t myIp_;
272  vector<TopicSource> additionalTopicSources_;
273  ReoccuringTimer additionalTopicSourcesScanTimer_;
274 };
275 
276 } //recvtransportengine_detail
277 template <typename OutputBuffer, typename MsgArbitrator>
279 }}}
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:81
Definition: MonoLockFreeBuffer.hpp:15
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: Messages.hpp:71
Definition: Timers.hpp:65
Definition: Messages.hpp:64
impl class
Definition: RecvTransportEngine.hpp:59
void handleMessageCb(TopicSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:219
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:34
Definition: Transport.hpp:64
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:140
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:209
Definition: Time.hpp:14
a singleton that holding tcpcast resources
Definition: NetContext.hpp:44
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:36
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:72
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:168
Definition: Time.hpp:125
void tryTopicSource(TopicSource const &s)
in the environment that multicast cannot be enabled on either the sender or the receiver side...
Definition: RecvTransportEngine.hpp:126
Definition: Messages.hpp:78
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:177
Definition: Rater.hpp:10
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:21
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
impl class
Definition: RecvTransportEngine.hpp:59
Definition: Base.hpp:12
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:199
Definition: Timers.hpp:104
Definition: MessageHandler.hpp:36
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:126
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the message recv
Definition: RecvTransportEngine.hpp:134
Definition: LockFreeBufferMisc.hpp:74