1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/tcpcast/Transport.hpp" 4 #include "hmbdc/app/tcpcast/RecvSession.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/app/mcast/RecvTransportEngine.hpp" 7 #include "hmbdc/app/Logger.hpp" 8 #include "hmbdc/text/StringTrieSet.hpp" 10 #include <boost/bind.hpp> 11 #include <boost/lexical_cast.hpp> 12 #include <boost/unordered_map.hpp> 16 namespace hmbdc {
namespace app {
namespace tcpcast {
23 using ptr = std::shared_ptr<RecvTransport>;
24 using Transport::Transport;
45 virtual void stopListenTo(
comm::Topic const& t) = 0;
49 namespace recvtransportengine_detail {
58 template <
typename OutputBuffer,
typename MsgArbitrator>
63 ,
Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>>
65 , Subscribe, Unsubscribe, TopicSource> {
82 , OutputBuffer& outputBuffer
83 , MsgArbitrator arb =
NoOpArb())
91 }), config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
92 , outputBuffer_(outputBuffer)
93 , maxItemSize_(outputBuffer.maxItemSize())
96 , myIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
97 cfg.
getExt<
string>(
"ifaceAddr")).c_str()))
98 , additionalTopicSourcesScanTimer_(
99 Duration::seconds(cfg.
getExt<
size_t>(
"additionalTopicSourcesScanPeriodSeconds"))) {
102 HMBDC_THROW(std::out_of_range
103 ,
"buffer is too small for notification: SessionStarted and SessionDropped");
107 mcConfig_.put(
"outBufferSizePower2", 3u);
114 cfg(additionalTopicSources_,
"additionalTopicSources");
135 mcReceiver_->start();
136 mcReceiver_->listenTo(tcpcastAdTopic_);
140 for (
auto& s : recvSessions_) {
141 s.second->heartbeat();
146 schedule(SysTime::now(), *
this);
147 if (additionalTopicSources_.size()) {
148 additionalTopicSourcesScanTimer_.setCallback(
150 for (
auto const& s : additionalTopicSources_) {
155 schedule(SysTime::now(), additionalTopicSourcesScanTimer_);
159 using Transport::hmbdcName;
160 using Transport::schedSpec;
169 HMBDC_LOG_C(e.what());
177 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
178 for (
auto it = recvSessions_.begin(); it != recvSessions_.end();) {
179 if (hmbdc_unlikely(!(*it).second->runOnce())) {
180 (*it).second->stop();
181 recvSessions_.erase(it++);
187 auto n = buffer_.peek(begin, end);
190 MH::handleMessage(*static_cast<MessageHead*>(*it++));
192 buffer_.wasteAfterPeek(begin, n);
193 mcReceiver_->runOnce(
true);
200 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
201 for (
auto& p : recvSessions_) {
202 p.second->sendSubscribe(t.topic);
210 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
211 for (
auto& p : recvSessions_) {
212 p.second->sendUnsubscribe(t.topic);
220 auto ip = inet_addr(t.ip);
222 if (ip != 0x100007ful
224 && (!t.loopback || t.pid == getpid())) {
227 auto key = make_pair(ip, t.port);
228 if (recvSessions_.find(key) == recvSessions_.end()) {
229 boost::regex topicRegex(t.topicRegex);
230 for (
auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
231 if (boost::regex_match(it->first.c_str(), topicRegex)
241 sess->start(ip, t.port);
242 recvSessions_[key] =
typename Session::ptr(sess);
243 }
catch (std::exception
const& e) {
244 HMBDC_LOG_C(e.what());
254 void listenTo(
Topic const& t)
override {
258 void stopListenTo(
Topic const& t)
override {
261 Config config_, mcConfig_;
263 OutputBuffer& outputBuffer_;
265 unique_ptr<mcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
268 boost::unordered_map<pair<uint64_t, uint16_t>
269 ,
typename Session::ptr> recvSessions_;
272 vector<TopicSource> additionalTopicSources_;
277 template <
typename OutputBuffer,
typename MsgArbitrator>
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:81
Definition: MonoLockFreeBuffer.hpp:15
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: Messages.hpp:71
Definition: Timers.hpp:65
Definition: Messages.hpp:64
impl class
Definition: RecvTransportEngine.hpp:59
void handleMessageCb(TopicSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:219
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:34
Definition: Transport.hpp:64
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:140
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:209
a singleton that holding tcpcast resources
Definition: NetContext.hpp:44
Definition: RecvSession.hpp:28
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:36
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:72
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:168
void tryTopicSource(TopicSource const &s)
in the environment that multicast cannot be enabled on either the sender or the receiver side...
Definition: RecvTransportEngine.hpp:126
Definition: Messages.hpp:78
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:177
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:21
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
impl class
Definition: RecvTransportEngine.hpp:59
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:199
Definition: Timers.hpp:104
Definition: MessageHandler.hpp:36
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:126
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the message recv
Definition: RecvTransportEngine.hpp:134
Definition: LockFreeBufferMisc.hpp:74