hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/tcpcast/Transport.hpp"
4 #include "hmbdc/app/tcpcast/RecvSession.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/app/mcast/RecvTransportEngine.hpp"
7 #include "hmbdc/app/LoggerT.hpp"
8 #include "hmbdc/text/StringTrieSet.hpp"
9 
10 #include <boost/bind.hpp>
11 #include <boost/lexical_cast.hpp>
12 #include <boost/unordered_map.hpp>
13 
14 #include <memory>
15 
16 namespace hmbdc { namespace app { namespace tcpcast {
17 
18 /**
19  * @brief interface to power a tcpcast transport receiving functions
20  */
22 : Transport {
23  using ptr = std::shared_ptr<RecvTransport>;
24  using Transport::Transport;
25  virtual ~RecvTransport(){}
26  /**
27  * @brief a take all arbitrator (no arbitration at all)
28  * @details it provides the default type for arbitration which does nothing
29  * it also provides a template on writing a user defined arbitrator
30  *
31  * @param h handle to retrieve what is inside of the message
32  * @return always returns 1 here
33  * In general: 1 keep the message; -1 to drop;
34  * 0 cannot decide, ask me later.
35  */
36  struct NoOpArb {
37  int operator()(TransportMessageHeader const* h) {
38  return 1; //always keep it
39  }
40  };
41 
42 private:
43  friend struct NetContext; //only be used by NetContext
44  virtual void listenTo(comm::Topic const& t) = 0;
45  virtual void stopListenTo(comm::Topic const& t) = 0;
46 };
47 
48 
49 namespace recvtransportengine_detail {
50 using namespace hmbdc::time;
51 using namespace boost::asio;
52 using boost::asio::ip::tcp;
53 using namespace std;
54 /**
55  * @brief impl class
56  * @details this needs to be created using NetContext and start in an app::Context
57  *
58  * @tparam OutputBuffer type of buffer to hold resulting network messages
59  */
60 template <typename OutputBuffer, typename MsgArbitrator>
66 , Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>>
67 , MessageHandler<RecvTransportEngine<OutputBuffer, MsgArbitrator>
68  , Subscribe, Unsubscribe, TopicSource> {
69  using SELF = RecvTransportEngine;
71  friend struct NetContext; //only be created by NetContext
72 
73 /**
74  * @brief ctor
75  * @details io_service could be passed in by user, in this case NO more than two threads should
76  * power this io_service instance since that would violate the thread garantee of Client, which
77  * is no callbacks are called in parallel
78  *
79  * @param cfg specify the details of the tcpcast transport
80  * @param outputBuffer holding the results
81  * @param arb arbitrator instance to decide which messages to drop and keep; it ONLY supports
82  * hmbdc message (AFTER topic filtering) level (NO packet level since it is tcp)
83  */
85  , OutputBuffer& outputBuffer
86  , MsgArbitrator arb = NoOpArb())
87  : RecvTransport(cfg)
88  , ReoccuringTimer(Duration::seconds(cfg.getExt<size_t>("heartbeatPeriodSeconds")))
89  , config_(cfg)
90  , mcConfig_(cfg)
91  , buffer_(std::max({sizeof(Subscribe), sizeof(Unsubscribe), sizeof(TopicSource)})
92  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
93  , outputBuffer_(outputBuffer)
94  , maxItemSize_(outputBuffer.maxItemSize())
95  // , mcReceiver_(cfg, buffer_, &*pIos_)
96  , arb_(arb)
97  , myIp_(ip::address_v4::from_string(
98  hmbdc::comm::inet::getLocalIpMatchMask(cfg.getExt<string>("ifaceAddr"))
99  ).to_ulong()) {
100  if (outputBuffer.maxItemSize() < sizeof(SessionStarted)
101  || outputBuffer.maxItemSize() < sizeof(SessionDropped)) {
102  HMBDC_THROW(std::out_of_range
103  , "buffer is too small for notification: SessionStarted and SessionDropped");
104  }
105 
106  // mcConfig_.put("ifaceAddr", cfg.getExt<string>("ifaceAddr"));
107  mcConfig_.put("outBufferSizePower2", 3u);
108  // mcConfig_.put("mcastPort", cfg.getExt<uint16_t>("mcastPort"));
109  // mcConfig_.put("mcastAddr", cfg.getExt<string>("mcastAddr"));
110 
111  mcReceiver_.reset(new mcast::RecvTransportImpl<decltype(buffer_)>(
112  mcConfig_, buffer_, mcast::RecvTransport::NoOpArb()));
113  }
114 
115 /**
116  * @brief start the show by schedule the message recv
117  */
118  /*virtual*/
119  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
120  this->initInThread();
121  mcReceiver_->initInThread();
122  mcReceiver_->start();
123  mcReceiver_->listenTo(tcpcastAdTopic_);
124 
125  setCallback(
126  [this](TimerManager& tm, SysTime const& now) {
127  for (auto& s : recvSessions_) {
128  s.second->heartbeat();
129  }
130  }
131  );
132 
133  schedule(SysTime::now(), *this);
134  };
135 
136  char const* hmbdcName() const {
137  return this->hmbdcName_.c_str();
138  }
139 
140  std::tuple<char const*, int> schedSpec() const {
141  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
142  }
143 
144 /**
145  * @brief should not happen ever unless an exception thrown
146  *
147  * @param e exception thown
148  */
149  /*virtual*/
150  void stoppedCb(std::exception const& e) override {
151  HMBDC_LOG_C(e.what());
152  };
153 
154 /**
155  * @brief power the io_service and other things
156  *
157  */
158  /*virtual*/
159  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
161  auto n = buffer_.peek(begin, end);
162  auto it = begin;
163  while (it != end) {
164  MH::handleMessage(*static_cast<MessageHead*>(*it++));
165  }
166  buffer_.wasteAfterPeek(begin, n);
167  mcReceiver_->runOnce();
168  //the following is done above
169  // boost::system::error_code ec;
170  // *pIos_.poll(ec);
171  // if (ec) HMBDC_LOG_C("io_service error=", ec);
172  }
173 
174 /**
175  * @brief only used by MH
176  */
177  void handleMessageCb(Subscribe const& t) {
178  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
179  for (auto& p : recvSessions_) {
180  p.second->sendSubscribe(t.topic);
181  }
182  }
183 
184 /**
185  * @brief only used by MH
186  */
187  void handleMessageCb(Unsubscribe const& t) {
188  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
189  for (auto& p : recvSessions_) {
190  p.second->sendUnsubscribe(t.topic);
191  }
192  }
193 
194 /**
195  * @brief only used by MH
196  */
197  void handleMessageCb(TopicSource const& t) {
198  auto ip = ip::address_v4::from_string(t.ip).to_ulong();
199  //unless using loopback address, don't EVER listen to myself (own process)
200  if (ip != 2130706433ul //127.0.0.1
201  && ip == myIp_
202  && (!t.loopback || t.pid == getpid())) {
203  return;
204  }
205  auto key = make_pair(ip, t.port);
206  auto it = topicSourceDict_.find(key);
207 
208  if (likely(it != topicSourceDict_.end())) {
209  if (likely(it->second == t.timestamp)) {
210  return; //seen it before
211  } else {
212  it->second = t.timestamp;
213  }
214  } else {
215  topicSourceDict_[key] = t.timestamp;
216  }
217  if (recvSessions_.find(key) == recvSessions_.end()) {
218  boost::regex topicRegex(t.topicRegex);
219  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
220  if (boost::regex_match(it->first.c_str(), topicRegex)
221  || it->second) { //when wildcard is involved, assuming we need the connection
222  tcp::resolver resolver(*pIos_);
223  auto epIt = resolver.resolve(
224  tcp::resolver::query(t.ip, to_string(t.port)));
225  auto sess = new Session(
226  config_
227  , subscriptions_
228  , topicRegex
229  , [this, key]() {
230  recvSessions_.erase(key);
231  topicSourceDict_.erase(key);
232  }
233  , *pIos_
234  , outputBuffer_
235  , arb_
236  );
237  recvSessions_[key] = typename Session::ptr(sess);
238  sess->start(epIt);
239  break;
240  }
241  }
242  } //else ignore
243  }
244 
245 private:
247  void listenTo(Topic const& t) override {
248  buffer_.put(MessageWrap<Subscribe>{t});
249  }
250 
251  void stopListenTo(Topic const& t) override {
252  buffer_.put(MessageWrap<Unsubscribe>{t});
253  }
254  Config config_, mcConfig_;
256  OutputBuffer& outputBuffer_;
257  size_t maxItemSize_;
258  unique_ptr<mcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
259 
260  text::StringTrieSet subscriptions_;
261  boost::unordered_map<pair<uint64_t, uint16_t>
262  , typename Session::ptr> recvSessions_;
263  boost::unordered_map<pair<uint64_t, uint16_t>
264  , time::SysTime> topicSourceDict_;
265  MsgArbitrator arb_;
266  uint64_t myIp_;
267 };
268 
269 } //recvtransportengine_detail
270 template <typename OutputBuffer, typename MsgArbitrator>
272 }}}
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:84
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:151
Definition: TypedString.hpp:74
Definition: Messages.hpp:68
Definition: Timers.hpp:65
Definition: Messages.hpp:61
impl class
Definition: RecvTransportEngine.hpp:57
void handleMessageCb(TopicSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:197
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
Definition: Transport.hpp:22
this message appears in the receiver&#39;s buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:116
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:187
Definition: Time.hpp:13
a singleton that holding tcpcast resources
Definition: NetContext.hpp:43
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:36
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:55
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:150
Definition: Time.hpp:116
Definition: Messages.hpp:75
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:159
Definition: Rater.hpp:10
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:21
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
impl class
Definition: RecvTransportEngine.hpp:61
Definition: Base.hpp:12
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:177
Definition: Timers.hpp:96
Definition: MessageHandler.hpp:36
this message appears in the receiver&#39;s buffer indicating a new source is connected ...
Definition: Messages.hpp:102
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the message recv
Definition: RecvTransportEngine.hpp:119
Definition: LockFreeBufferMisc.hpp:73