hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/tcpcast/Transport.hpp"
4 #include "hmbdc/app/tcpcast/RecvSession.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/app/udpcast/RecvTransportEngine.hpp"
7 #include "hmbdc/app/Logger.hpp"
8 #include "hmbdc/text/StringTrieSet.hpp"
9 
10 #include <boost/bind.hpp>
11 #include <boost/lexical_cast.hpp>
12 #include <boost/unordered_map.hpp>
13 
14 #include <memory>
15 
16 namespace hmbdc { namespace app { namespace tcpcast {
17 
18 /**
19  * @brief interface to power a tcpcast transport receiving functions
20  */
22 : Transport {
23  using ptr = std::shared_ptr<RecvTransport>;
24  using Transport::Transport;
25  virtual ~RecvTransport(){}
26  /**
27  * @brief a take all arbitrator (no arbitration at all)
28  * @details it provides the default type for arbitration which does nothing
29  * it also provides a template on writing a user defined arbitrator
30  *
31  * @param h handle to retrieve what is inside of the message
32  * @return always returns 1 here
33  * In general: 1 keep the message; -1 to drop;
34  * 0 cannot decide, ask me later.
35  */
36  struct NoOpArb {
37  int operator()(TransportMessageHeader const* h) {
38  return 1; //always keep it
39  }
40  };
41 
42 private:
43  friend struct NetContext; //only be used by NetContext
44  virtual void listenTo(comm::Topic const& t) = 0;
45  virtual void stopListenTo(comm::Topic const& t) = 0;
46 };
47 
48 
49 namespace recvtransportengine_detail {
50 using namespace hmbdc::time;
51 using namespace std;
52 /**
53  * @brief impl class
54  * @details this needs to be created using NetContext and start in an app::Context
55  *
56  * @tparam OutputBuffer type of buffer to hold resulting network messages
57  */
58 template <typename OutputBuffer, typename MsgArbitrator>
63 , Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>>
64 , MessageHandler<RecvTransportEngine<OutputBuffer, MsgArbitrator>
65  , Subscribe, Unsubscribe, TopicSource> {
66  using SELF = RecvTransportEngine;
68  friend struct NetContext; //only be created by NetContext
69 
70 /**
71  * @brief ctor
72  * @details io_service could be passed in by user, in this case NO more than two threads should
73  * power this io_service instance since that would violate the thread garantee of Client, which
74  * is no callbacks are called in parallel
75  *
76  * @param cfg specify the details of the tcpcast transport
77  * @param outputBuffer holding the results
78  * @param arb arbitrator instance to decide which messages to drop and keep; it ONLY supports
79  * hmbdc message (AFTER topic filtering) level (NO packet level since it is tcp)
80  */
82  , OutputBuffer& outputBuffer
83  , MsgArbitrator arb = NoOpArb())
84  : RecvTransport(cfg)
85  , ReoccuringTimer(Duration::seconds(cfg.getExt<size_t>("heartbeatPeriodSeconds")))
86  , config_(cfg)
87  , mcConfig_(cfg)
88  , buffer_(std::max({sizeof(MessageWrap<Subscribe>)
89  , sizeof(MessageWrap<Unsubscribe>)
90  , sizeof(MessageWrap<TopicSource>)
91  }), config_.getExt<uint16_t>("cmdBufferSizePower2"))
92  , outputBuffer_(outputBuffer)
93  , maxItemSize_(outputBuffer.maxItemSize())
94  , arb_(arb)
95  , myIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
96  cfg.getExt<string>("ifaceAddr")).c_str()))
97  , mcastEmuProxyScanTimer_(
98  Duration::seconds(cfg.getExt<size_t>("mcastEmuProxyScanPeriodSeconds"))) {
99  if (outputBuffer.maxItemSize() < sizeof(SessionStarted)
100  || outputBuffer.maxItemSize() < sizeof(SessionDropped)) {
101  HMBDC_THROW(std::out_of_range
102  , "buffer is too small for notification: SessionStarted and SessionDropped");
103  }
104  mcConfig_.put("outBufferSizePower2", 3u);
105  mcReceiver_.reset(new udpcast::RecvTransportImpl<decltype(buffer_)>(
106  mcConfig_, buffer_, udpcast::RecvTransport::NoOpArb()));
107 
108  if (cfg.getExt<bool>("useTcpcastMcProxy")) {
109  config_(tcpcastMcEmuProxyAddrs_, "udpcastDests");
110  if (tcpcastMcEmuProxyAddrs_.size() == 0) {
111  HMBDC_THROW(std::out_of_range, "no TcpcastEmuProxy specified");
112  }
113  topicSinkMsgBufLen_ =
114  udpcast::SendTransport::encode(tcpcastAdTopic_, topicSinkMsgBuf_, sizeof(topicSinkMsgBuf_)
115  , TopicSink{cfg.getExt<std::string>("udpcastListenAddr")
116  , cfg.getExt<uint16_t>("udpcastListenPort")})->wireSize();
117  }
118  }
119 
120 /**
121  * @brief start the show by schedule the message recv
122  */
123  /*virtual*/
124  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
125  mcReceiver_->start();
126  mcReceiver_->listenTo(tcpcastAdTopic_);
127 
128  setCallback(
129  [this](TimerManager& tm, SysTime const& now) {
130  for (auto& s : recvSessions_) {
131  s.second->heartbeat();
132  }
133  }
134  );
135 
136  schedule(SysTime::now(), *this);
137  if (tcpcastMcEmuProxyAddrs_.size()) {
138  mcastEmuProxyScanTimer_.setCallback(
139  [this](TimerManager& tm, SysTime const& now) {
140  for (auto const& s : tcpcastMcEmuProxyAddrs_) {
141  sendto(mcReceiver_->fd, topicSinkMsgBuf_, topicSinkMsgBufLen_
142  , MSG_NOSIGNAL|MSG_DONTWAIT, (struct sockaddr *)&s.v, sizeof(s.v));
143  }
144  }
145  );
146  schedule(SysTime::now(), mcastEmuProxyScanTimer_);
147  }
148  };
149 
150  using Transport::hmbdcName;
151  using Transport::schedSpec;
152 
153 /**
154  * @brief should not happen ever unless an exception thrown
155  *
156  * @param e exception thown
157  */
158  /*virtual*/
159  void stoppedCb(std::exception const& e) override {
160  HMBDC_LOG_C(e.what());
161  };
162 
163 /**
164  * @brief power the io_service and other things
165  *
166  */
167  /*virtual*/
168  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
169  for (auto it = recvSessions_.begin(); it != recvSessions_.end();) {
170  if (hmbdc_unlikely(!(*it).second->runOnce())) {
171  (*it).second->stop();
172  recvSessions_.erase(it++);
173  } else {
174  it++;
175  }
176  }
178  auto n = buffer_.peek(begin, end);
179  auto it = begin;
180  while (it != end) {
181  MH::handleMessage(*static_cast<MessageHead*>(*it++));
182  }
183  buffer_.wasteAfterPeek(begin, n);
184 
185  mcReceiver_->runOnce(true);
186  }
187 
188 /**
189  * @brief only used by MH
190  */
191  void handleMessageCb(Subscribe const& t) {
192  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
193  for (auto& p : recvSessions_) {
194  p.second->sendSubscribe(t.topic);
195  }
196  }
197 
198 /**
199  * @brief only used by MH
200  */
201  void handleMessageCb(Unsubscribe const& t) {
202  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
203  for (auto& p : recvSessions_) {
204  p.second->sendUnsubscribe(t.topic);
205  }
206  }
207 
208 /**
209  * @brief only used by MH
210  */
211  void handleMessageCb(TopicSource const& t) {
212  auto ip = inet_addr(t.ip);
213  //unless using loopback address, don't EVER listen to myself (own process)
214  if (ip != 0x100007ful //127.0.0.1
215  && ip == myIp_
216  && (!t.loopback || t.pid == getpid())) {
217  return;
218  }
219  auto key = make_pair(ip, t.port);
220  if (recvSessions_.find(key) == recvSessions_.end()) {
221  boost::regex topicRegex(t.topicRegex);
222  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
223  if (boost::regex_match(it->first.c_str(), topicRegex)
224  || it->second) {
225  try {
226  auto sess = new Session(
227  config_
228  , subscriptions_
229  , topicRegex
230  , outputBuffer_
231  , t.connKey
232  , arb_
233  );
234  sess->start(ip, t.port);
235  recvSessions_[key] = typename Session::ptr(sess);
236  } catch (std::exception const& e) {
237  HMBDC_LOG_C(e.what());
238  }
239  break;
240  }
241  }
242  } //else ignore
243  }
244 
245 private:
247  void listenTo(Topic const& t) override {
248  buffer_.put(MessageWrap<Subscribe>{t});
249  }
250 
251  void stopListenTo(Topic const& t) override {
252  buffer_.put(MessageWrap<Unsubscribe>{t});
253  }
254  Config config_, mcConfig_;
256  OutputBuffer& outputBuffer_;
257  size_t maxItemSize_;
258  unique_ptr<udpcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
259 
260  text::StringTrieSet subscriptions_;
261  boost::unordered_map<pair<uint64_t, uint16_t>
262  , typename Session::ptr> recvSessions_;
263  MsgArbitrator arb_;
264  uint32_t myIp_;
265  std::vector<comm::inet::Endpoint> tcpcastMcEmuProxyAddrs_;
266  ReoccuringTimer mcastEmuProxyScanTimer_;
267  char topicSinkMsgBuf_[64];
268  size_t topicSinkMsgBufLen_;
269 };
270 
271 } //recvtransportengine_detail
272 template <typename OutputBuffer, typename MsgArbitrator>
274 }}}
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:81
Definition: MonoLockFreeBuffer.hpp:15
class to hold an hmbdc configuration
Definition: Config.hpp:46
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:180
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
Definition: Messages.hpp:73
Definition: Timers.hpp:65
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:34
Definition: Messages.hpp:66
void handleMessageCb(TopicSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:211
Definition: Transport.hpp:69
this message appears in the receiver&#39;s buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:145
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:201
Definition: Messages.hpp:154
Definition: Time.hpp:14
a singleton that holding tcpcast resources
Definition: NetContext.hpp:44
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:36
impl class
Definition: RecvTransportEngine.hpp:59
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:76
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:159
Definition: Time.hpp:125
Definition: Messages.hpp:82
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:168
Definition: Rater.hpp:10
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:21
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:45
impl class
Definition: RecvTransportEngine.hpp:59
Definition: Base.hpp:12
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:191
Definition: Timers.hpp:104
Definition: MessageHandler.hpp:39
this message appears in the receiver&#39;s buffer indicating a new source is connected ...
Definition: Messages.hpp:131
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the message recv
Definition: RecvTransportEngine.hpp:124
Definition: LockFreeBufferMisc.hpp:74