hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/tcpcast/Transport.hpp"
4 #include "hmbdc/app/tcpcast/RecvSession.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/app/mcast/RecvTransportEngine.hpp"
7 #include "hmbdc/app/LoggerT.hpp"
8 #include "hmbdc/text/StringTrieSet.hpp"
9 
10 #include <boost/bind.hpp>
11 #include <boost/lexical_cast.hpp>
12 #include <boost/unordered_map.hpp>
13 
14 #include <memory>
15 
16 namespace hmbdc { namespace app { namespace tcpcast {
17 
18 using namespace hmbdc;
19 using namespace hmbdc::time;
20 
21 using namespace boost::asio;
22 
23 /**
24  * @brief interface to power a tcpcast transport receiving functions
25  */
27 : Transport {
28  using ptr = std::shared_ptr<RecvTransport>;
29  using Transport::Transport;
30  virtual ~RecvTransport(){}
31  /**
32  * @brief a take all arbitrator (no arbitration at all)
33  * @details it provides the default type for arbitration which does nothing
34  * it also provides a template on writing a user defined arbitrator
35  *
36  * @param h handle to retrieve what is inside of the message
37  * @return always returns 1 here
38  * In general: 1 keep the message; -1 to drop;
39  * 0 cannot decide, ask me later.
40  */
41  struct NoOpArb {
42  int operator()(TransportMessageHeader const* h) {
43  return 1; //always keep it
44  }
45  };
46 
47 private:
48  friend class NetContext; //only be used by NetContext
49  virtual void listenTo(Topic const& t) = 0;
50  virtual void stopListenTo(Topic const& t) = 0;
51 };
52 
53 /**
54  * @brief impl class
55  * @details this needs to be created using NetContext and start in an app::Context
56  *
57  * @tparam OutputBuffer type of buffer to hold resulting network messages
58  */
59 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
65 , Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>>
66 , MessageHandler<RecvTransportEngine<OutputBuffer, MsgArbitrator>
67  , Subscribe, Unsubscribe, TopicSource> {
68  using SELF = RecvTransportEngine;
70  friend class NetContext; //only be created by NetContext
71 
72 /**
73  * @brief ctor
74  * @details io_service could be passed in by user, in this case NO more than two threads should
75  * power this io_service instance since that would violate the thread guarantee of Client, which
76  * is no callbacks are called in parallel
77  *
78  * @param cfg specify the details of the tcpcast transport
79  * @param outputBuffer holding the results
80  * @param arb arbitrator instance to decide which messages to drop and keep; it ONLY supports
81  * hmbdc message (AFTER topic filtering) level (NO packet level since it is tcp)
82  */
84  , OutputBuffer& outputBuffer
85  , MsgArbitrator arb = NoOpArb())
86  : RecvTransport(cfg)
87  , ReoccuringTimer(Duration::seconds(cfg.getExt<size_t>("heartbeatPeriodSeconds")))
88  , config_(cfg)
89  , mcConfig_(cfg)
90  , buffer_(std::max({sizeof(Subscribe), sizeof(Unsubscribe), sizeof(TopicSource)})
91  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
92  , outputBuffer_(outputBuffer)
93  , maxItemSize_(outputBuffer.maxItemSize())
94  // , mcReceiver_(cfg, buffer_, &*pIos_)
95  , arb_(arb)
96  , myIp_(ip::address_v4::from_string(cfg.getExt<string>("ifaceAddr").c_str()).to_ulong()) {
97  if (outputBuffer.maxItemSize() < sizeof(SessionStarted)
98  || outputBuffer.maxItemSize() < sizeof(SessionDropped)) {
99  HMBDC_THROW(std::out_of_range
100  , "buffer is too small for notification: SessionStarted and SessionDropped");
101  }
102 
103  // mcConfig_.put("ifaceAddr", cfg.getExt<string>("ifaceAddr"));
104  mcConfig_.put("outBufferSizePower2", 3u);
105  // mcConfig_.put("mcastPort", cfg.getExt<uint16_t>("mcastPort"));
106  // mcConfig_.put("mcastAddr", cfg.getExt<string>("mcastAddr"));
107 
108  mcReceiver_.reset(new mcast::RecvTransportImpl<decltype(buffer_)>(
109  mcConfig_, buffer_, mcast::RecvTransport::NoOpArb()));
110  }
111 
112 /**
113  * @brief start the show by scheduling the message recv
114  */
115  /*virtual*/
116  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
117  this->initInThread();
118  mcReceiver_->initInThread();
119  mcReceiver_->start();
120  mcReceiver_->listenTo(tcpcastAdTopic_);
121 
122  setCallback(
123  [this](TimerManager& tm, SysTime const& now) {
124  for (auto& s : recvSessions_) {
125  s.second->heartbeat();
126  }
127  }
128  );
129 
130  schedule(SysTime::now(), *this);
131  };
132 
133  char const* hmbdcName() const {
134  return this->hmbdcName_.c_str();
135  }
136 
137  std::tuple<char const*, int> schedSpec() const {
138  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
139  }
140 
141 /**
142  * @brief should not happen ever unless an exception thrown
143  *
144  * @param e exception thrown
145  */
146  /*virtual*/
147  void stoppedCb(std::exception const& e) override {
148  HMBDC_LOG_C(e.what());
149  };
150 
151 /**
152  * @brief power the io_service and other things
153  *
154  */
155  /*virtual*/
156  void invokedCb(uint16_t threadSerialNumber) __restrict__ override {
157  MonoLockFreeBuffer::iterator begin, end;
158  auto n = buffer_.peek(begin, end);
159  auto it = begin;
160  while (it != end) {
161  MH::handleMessage(*static_cast<MessageHead*>(*it++));
162  }
163  buffer_.wasteAfterPeek(begin, n);
164  mcReceiver_->runOnce();
165  //the following is done above
166  // boost::system::error_code ec;
167  // *pIos_.poll(ec);
168  // if (ec) HMBDC_LOG_C("io_service error=", ec);
169  }
170 
171 /**
172  * @brief only used by MH
173  */
174  void handleMessageCb(Subscribe const& t) {
175  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
176  for (auto& p : recvSessions_) {
177  p.second->sendSubscribe(t.topic);
178  }
179  }
180 
181 /**
182  * @brief only used by MH
183  */
184  void handleMessageCb(Unsubscribe const& t) {
185  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
186  for (auto& p : recvSessions_) {
187  p.second->sendUnsubscribe(t.topic);
188  }
189  }
190 
191 /**
192  * @brief only used by MH
193  */
194  void handleMessageCb(TopicSource const& t) {
195  auto ip = ip::address_v4::from_string(t.ip).to_ulong();
196  //unless using loopback address, don't EVER listen to myself (own process)
197  if (ip != 2130706433ul //127.0.0.1
198  && ip == myIp_
199  && (!t.loopback || t.pid == getpid())) {
200  return;
201  }
202  auto key = make_pair(ip, t.port);
203  auto it = topicSourceDict_.find(key);
204 
205  if (likely(it != topicSourceDict_.end())) {
206  if (likely(it->second == t.timestamp)) {
207  return; //seen it before
208  } else {
209  it->second = t.timestamp;
210  }
211  } else {
212  topicSourceDict_[key] = t.timestamp;
213  }
214  if (recvSessions_.find(key) == recvSessions_.end()) {
215  boost::regex topicRegex(t.topicRegex);
216  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
217  if (boost::regex_match(it->first.c_str(), topicRegex)
218  || it->second) { //when wildcard is involved, assuming we need the connection
219  tcp::resolver resolver(*pIos_);
220  auto epIt = resolver.resolve(
221  tcp::resolver::query(t.ip, to_string(t.port)));
222  auto sess = new Session(
223  config_
224  , subscriptions_
225  , topicRegex
226  , [this, key]() {
227  recvSessions_.erase(key);
228  topicSourceDict_.erase(key);
229  }
230  , *pIos_
231  , outputBuffer_
232  , arb_
233  );
234  recvSessions_[key] = typename Session::ptr(sess);
235  sess->start(epIt);
236  break;
237  }
238  }
239  } //else ignore
240  }
241 
242 private:
244  void listenTo(Topic const& t) override {
245  buffer_.put(MessageWrap<Subscribe>{t});
246  }
247 
248  void stopListenTo(Topic const& t) override {
249  buffer_.put(MessageWrap<Unsubscribe>{t});
250  }
251  Config config_, mcConfig_;
253  OutputBuffer& outputBuffer_;
254  size_t maxItemSize_;
255  unique_ptr<mcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
256 
257  StringTrieSet subscriptions_;
258  boost::unordered_map<pair<uint64_t, uint16_t>
259  , typename Session::ptr> recvSessions_;
260  boost::unordered_map<pair<uint64_t, uint16_t>
261  , time::SysTime> topicSourceDict_;
262  MsgArbitrator arb_;
263  uint64_t myIp_;
264 };
265 
266 }}}
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:35
impl class
Definition: RecvTransportEngine.hpp:60
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
impl class
Definition: RecvTransportEngine.hpp:57
Definition: StringTrieSet.hpp:113
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the message recv
Definition: RecvTransportEngine.hpp:116
Definition: TypedString.hpp:74
Definition: Messages.hpp:122
Definition: Timers.hpp:69
Definition: Messages.hpp:115
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:184
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:38
Definition: Transport.hpp:27
Definition: Messages.hpp:160
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:147
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:174
Definition: Time.hpp:15
Definition: NetContext.hpp:28
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:83
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:41
Definition: Message.hpp:46
Definition: Time.hpp:118
Definition: Messages.hpp:129
void handleMessageCb(TopicSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:194
Definition: Rater.hpp:10
Definition: RecvSession.hpp:29
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:143
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:26
Definition: Client.hpp:39
Definition: Client.hpp:11
void invokedCb(uint16_t threadSerialNumber) __restrict__ override
power the io_service and other things
Definition: RecvTransportEngine.hpp:156
Definition: Timers.hpp:100
Definition: MessageHandler.hpp:38
Definition: Messages.hpp:151
Definition: LockFreeBufferMisc.hpp:73