hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/tcpcast/Transport.hpp"
4 #include "hmbdc/app/tcpcast/RecvSession.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/app/mcast/RecvTransportEngine.hpp"
7 #include "hmbdc/app/LoggerT.hpp"
8 #include "hmbdc/text/StringTrieSet.hpp"
9 
10 #include <boost/bind.hpp>
11 #include <boost/lexical_cast.hpp>
12 #include <boost/unordered_map.hpp>
13 
14 #include <memory>
15 
16 namespace hmbdc { namespace app { namespace tcpcast {
17 
18 /**
19  * @brief interface to power a tcpcast transport receiving functions
20  */
22 : Transport {
23  using ptr = std::shared_ptr<RecvTransport>;
24  using Transport::Transport;
25  virtual ~RecvTransport(){}
26  /**
27  * @brief a take all arbitrator (no arbitration at all)
28  * @details it provides the default type for arbitration which does nothing
29  * it also provides a template on writing a user defined arbitrator
30  *
31  * @param h handle to retrieve what is inside of the message
32  * @return always returns 1 here
33  * In general: 1 keep the message; -1 to drop;
34  * 0 cannot decide, ask me later.
35  */
36  struct NoOpArb {
37  int operator()(TransportMessageHeader const* h) {
38  return 1; //always keep it
39  }
40  };
41 
42 private:
43  friend struct NetContext; //only be used by NetContext
44  virtual void listenTo(comm::Topic const& t) = 0;
45  virtual void stopListenTo(comm::Topic const& t) = 0;
46 };
47 
48 
49 namespace recvtransportengine_detail {
50 using namespace hmbdc::time;
51 using namespace boost::asio;
52 using boost::asio::ip::tcp;
53 using namespace std;
54 /**
55  * @brief impl class
56  * @details this needs to be created using NetContext and start in an app::Context
57  *
58  * @tparam OutputBuffer type of buffer to hold resulting network messages
59  */
60 template <typename OutputBuffer, typename MsgArbitrator>
66 , Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>>
67 , MessageHandler<RecvTransportEngine<OutputBuffer, MsgArbitrator>
68  , Subscribe, Unsubscribe, TopicSource> {
69  using SELF = RecvTransportEngine;
71  friend struct NetContext; //only be created by NetContext
72 
73 /**
74  * @brief ctor
75  * @details io_service could be passed in by user, in this case NO more than two threads should
76  * power this io_service instance since that would violate the thread garantee of Client, which
77  * is no callbacks are called in parallel
78  *
79  * @param cfg specify the details of the tcpcast transport
80  * @param outputBuffer holding the results
81  * @param arb arbitrator instance to decide which messages to drop and keep; it ONLY supports
82  * hmbdc message (AFTER topic filtering) level (NO packet level since it is tcp)
83  */
85  , OutputBuffer& outputBuffer
86  , MsgArbitrator arb = NoOpArb())
87  : RecvTransport(cfg)
88  , ReoccuringTimer(Duration::seconds(cfg.getExt<size_t>("heartbeatPeriodSeconds")))
89  , config_(cfg)
90  , mcConfig_(cfg)
91  , buffer_(std::max({sizeof(MessageWrap<Subscribe>)
92  , sizeof(MessageWrap<Unsubscribe>)
93  , sizeof(MessageWrap<TopicSource>)
94  }), config_.getExt<uint16_t>("cmdBufferSizePower2"))
95  , outputBuffer_(outputBuffer)
96  , maxItemSize_(outputBuffer.maxItemSize())
97  // , mcReceiver_(cfg, buffer_, &*pIos_)
98  , arb_(arb)
99  , myIp_(ip::address_v4::from_string(
100  hmbdc::comm::inet::getLocalIpMatchMask(cfg.getExt<string>("ifaceAddr"))
101  ).to_ulong())
102  , additionalTopicSourcesScanTimer_(
103  Duration::seconds(cfg.getExt<size_t>("additionalTopicSourcesScanPeriodSeconds"))) {
104  if (outputBuffer.maxItemSize() < sizeof(SessionStarted)
105  || outputBuffer.maxItemSize() < sizeof(SessionDropped)) {
106  HMBDC_THROW(std::out_of_range
107  , "buffer is too small for notification: SessionStarted and SessionDropped");
108  }
109 
110  // mcConfig_.put("ifaceAddr", cfg.getExt<string>("ifaceAddr"));
111  mcConfig_.put("outBufferSizePower2", 3u);
112  // mcConfig_.put("mcastPort", cfg.getExt<uint16_t>("mcastPort"));
113  // mcConfig_.put("mcastAddr", cfg.getExt<string>("mcastAddr"));
114 
115  mcReceiver_.reset(new mcast::RecvTransportImpl<decltype(buffer_)>(
116  mcConfig_, buffer_, mcast::RecvTransport::NoOpArb()));
117 
118  cfg(additionalTopicSources_, "additionalTopicSources");
119  }
120 
121 /**
122  * @brief in the environment that multicast cannot be enabled on either the sender or the receiver side,
123  * the sender info cannot be automatically detected by the receiver.
124  * User can use this method to explicitedly tell the receiver about the sender information
125  * @details not like additionalTopicSources config which will periodically retry,
126  * this call has no effect if the sender is not up and fully ready
127  *
128  * @param s describing the sender
129  */
130  void tryTopicSource(TopicSource const& s) {
131  buffer_.put(MessageWrap<TopicSource>(s));
132  }
133 
134 /**
135  * @brief start the show by schedule the message recv
136  */
137  /*virtual*/
138  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
139  this->initInThread();
140  mcReceiver_->initInThread();
141  mcReceiver_->start();
142  mcReceiver_->listenTo(tcpcastAdTopic_);
143 
144  setCallback(
145  [this](TimerManager& tm, SysTime const& now) {
146  for (auto& s : recvSessions_) {
147  s.second->heartbeat();
148  }
149  }
150  );
151 
152  schedule(SysTime::now(), *this);
153  if (additionalTopicSources_.size()) {
154  additionalTopicSourcesScanTimer_.setCallback(
155  [this](TimerManager& tm, SysTime const& now) {
156  for (auto const& s : additionalTopicSources_) {
157  handleMessageCb(s);
158  }
159  }
160  );
161  schedule(SysTime::now(), additionalTopicSourcesScanTimer_);
162  }
163  };
164 
165  char const* hmbdcName() const {
166  return this->hmbdcName_.c_str();
167  }
168 
169  std::tuple<char const*, int> schedSpec() const {
170  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
171  }
172 
173 /**
174  * @brief should not happen ever unless an exception thrown
175  *
176  * @param e exception thown
177  */
178  /*virtual*/
179  void stoppedCb(std::exception const& e) override {
180  HMBDC_LOG_C(e.what());
181  };
182 
183 /**
184  * @brief power the io_service and other things
185  *
186  */
187  /*virtual*/
188  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
190  auto n = buffer_.peek(begin, end);
191  auto it = begin;
192  while (it != end) {
193  MH::handleMessage(*static_cast<MessageHead*>(*it++));
194  }
195  buffer_.wasteAfterPeek(begin, n);
196  mcReceiver_->runOnce();
197  }
198 
199 /**
200  * @brief only used by MH
201  */
202  void handleMessageCb(Subscribe const& t) {
203  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
204  for (auto& p : recvSessions_) {
205  p.second->sendSubscribe(t.topic);
206  }
207  }
208 
209 /**
210  * @brief only used by MH
211  */
212  void handleMessageCb(Unsubscribe const& t) {
213  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
214  for (auto& p : recvSessions_) {
215  p.second->sendUnsubscribe(t.topic);
216  }
217  }
218 
219 /**
220  * @brief only used by MH
221  */
222  void handleMessageCb(TopicSource const& t) {
223  auto ip = ip::address_v4::from_string(t.ip).to_ulong();
224  //unless using loopback address, don't EVER listen to myself (own process)
225  if (ip != 2130706433ul //127.0.0.1
226  && ip == myIp_
227  && (!t.loopback || t.pid == getpid())) {
228  return;
229  }
230  auto key = make_pair(ip, t.port);
231  if (recvSessions_.find(key) == recvSessions_.end()) {
232  boost::regex topicRegex(t.topicRegex);
233  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
234  if (boost::regex_match(it->first.c_str(), topicRegex)
235  || it->second) { //when wildcard is involved, assuming we need the connection
236  tcp::resolver resolver(*pIos_);
237  auto epIt = resolver.resolve(
238  tcp::resolver::query(t.ip, to_string(t.port)));
239  auto sess = new Session(
240  config_
241  , subscriptions_
242  , topicRegex
243  , [this, key]() {
244  recvSessions_.erase(key);
245  }
246  , *pIos_
247  , outputBuffer_
248  , arb_
249  );
250  recvSessions_[key] = typename Session::ptr(sess);
251  sess->start(epIt);
252  break;
253  }
254  }
255  } //else ignore
256  }
257 
258 private:
260  void listenTo(Topic const& t) override {
261  buffer_.put(MessageWrap<Subscribe>{t});
262  }
263 
264  void stopListenTo(Topic const& t) override {
265  buffer_.put(MessageWrap<Unsubscribe>{t});
266  }
267  Config config_, mcConfig_;
269  OutputBuffer& outputBuffer_;
270  size_t maxItemSize_;
271  unique_ptr<mcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
272 
273  text::StringTrieSet subscriptions_;
274  boost::unordered_map<pair<uint64_t, uint16_t>
275  , typename Session::ptr> recvSessions_;
276  MsgArbitrator arb_;
277  uint64_t myIp_;
278  vector<TopicSource> additionalTopicSources_;
279  ReoccuringTimer additionalTopicSourcesScanTimer_;
280 };
281 
282 } //recvtransportengine_detail
283 template <typename OutputBuffer, typename MsgArbitrator>
285 }}}
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:84
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: Messages.hpp:70
Definition: Timers.hpp:65
Definition: Messages.hpp:63
impl class
Definition: RecvTransportEngine.hpp:57
void handleMessageCb(TopicSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:222
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
Definition: Transport.hpp:22
this message appears in the receiver&#39;s buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:141
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:212
Definition: Time.hpp:13
a singleton that holding tcpcast resources
Definition: NetContext.hpp:44
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:36
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:55
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:179
Definition: Time.hpp:116
void tryTopicSource(TopicSource const &s)
in the environment that multicast cannot be enabled on either the sender or the receiver side...
Definition: RecvTransportEngine.hpp:130
Definition: Messages.hpp:77
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:188
Definition: Rater.hpp:10
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:21
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
impl class
Definition: RecvTransportEngine.hpp:61
Definition: Base.hpp:12
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:202
Definition: Timers.hpp:96
Definition: MessageHandler.hpp:36
this message appears in the receiver&#39;s buffer indicating a new source is connected ...
Definition: Messages.hpp:127
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the message recv
Definition: RecvTransportEngine.hpp:138
Definition: LockFreeBufferMisc.hpp:73