1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/tcpcast/Transport.hpp" 4 #include "hmbdc/app/tcpcast/RecvSession.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/app/mcast/RecvTransportEngine.hpp" 7 #include "hmbdc/app/LoggerT.hpp" 8 #include "hmbdc/text/StringTrieSet.hpp" 10 #include <boost/bind.hpp> 11 #include <boost/lexical_cast.hpp> 12 #include <boost/unordered_map.hpp> 16 namespace hmbdc {
namespace app {
namespace tcpcast {
23 using ptr = std::shared_ptr<RecvTransport>;
24 using Transport::Transport;
45 virtual void stopListenTo(
comm::Topic const& t) = 0;
49 namespace recvtransportengine_detail {
52 using boost::asio::ip::tcp;
60 template <
typename OutputBuffer,
typename MsgArbitrator>
66 ,
Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>>
68 , Subscribe, Unsubscribe, TopicSource> {
85 , OutputBuffer& outputBuffer
86 , MsgArbitrator arb =
NoOpArb())
94 }), config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
95 , outputBuffer_(outputBuffer)
96 , maxItemSize_(outputBuffer.maxItemSize())
99 , myIp_(ip::address_v4::from_string(
100 hmbdc::comm::inet::getLocalIpMatchMask(cfg.
getExt<
string>(
"ifaceAddr"))
102 , additionalTopicSourcesScanTimer_(
103 Duration::seconds(cfg.
getExt<
size_t>(
"additionalTopicSourcesScanPeriodSeconds"))) {
106 HMBDC_THROW(std::out_of_range
107 ,
"buffer is too small for notification: SessionStarted and SessionDropped");
111 mcConfig_.put(
"outBufferSizePower2", 3u);
118 cfg(additionalTopicSources_,
"additionalTopicSources");
139 this->initInThread();
140 mcReceiver_->initInThread();
141 mcReceiver_->start();
142 mcReceiver_->listenTo(tcpcastAdTopic_);
146 for (
auto& s : recvSessions_) {
147 s.second->heartbeat();
152 schedule(SysTime::now(), *
this);
153 if (additionalTopicSources_.size()) {
154 additionalTopicSourcesScanTimer_.setCallback(
156 for (
auto const& s : additionalTopicSources_) {
161 schedule(SysTime::now(), additionalTopicSourcesScanTimer_);
165 char const* hmbdcName()
const {
166 return this->hmbdcName_.c_str();
169 std::tuple<char const*, int> schedSpec()
const {
170 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
180 HMBDC_LOG_C(e.what());
188 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
190 auto n = buffer_.peek(begin, end);
193 MH::handleMessage(*static_cast<MessageHead*>(*it++));
195 buffer_.wasteAfterPeek(begin, n);
196 mcReceiver_->runOnce();
203 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
204 for (
auto& p : recvSessions_) {
205 p.second->sendSubscribe(t.topic);
213 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
214 for (
auto& p : recvSessions_) {
215 p.second->sendUnsubscribe(t.topic);
223 auto ip = ip::address_v4::from_string(t.ip).to_ulong();
225 if (ip != 2130706433ul
227 && (!t.loopback || t.pid == getpid())) {
230 auto key = make_pair(ip, t.port);
231 if (recvSessions_.find(key) == recvSessions_.end()) {
232 boost::regex topicRegex(t.topicRegex);
233 for (
auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
234 if (boost::regex_match(it->first.c_str(), topicRegex)
236 tcp::resolver resolver(*pIos_);
237 auto epIt = resolver.resolve(
238 tcp::resolver::query(t.ip, to_string(t.port)));
244 recvSessions_.erase(key);
250 recvSessions_[key] =
typename Session::ptr(sess);
260 void listenTo(
Topic const& t)
override {
264 void stopListenTo(
Topic const& t)
override {
267 Config config_, mcConfig_;
269 OutputBuffer& outputBuffer_;
271 unique_ptr<mcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
274 boost::unordered_map<pair<uint64_t, uint16_t>
275 ,
typename Session::ptr> recvSessions_;
278 vector<TopicSource> additionalTopicSources_;
283 template <
typename OutputBuffer,
typename MsgArbitrator>
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:84
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: Messages.hpp:70
Definition: Timers.hpp:65
Definition: Messages.hpp:63
impl class
Definition: RecvTransportEngine.hpp:57
void handleMessageCb(TopicSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:222
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
Definition: Transport.hpp:22
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:141
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:212
a singleton that holds tcpcast resources
Definition: NetContext.hpp:44
Definition: RecvSession.hpp:31
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:36
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:55
void stoppedCb(std::exception const &e) override
should never happen unless an exception is thrown
Definition: RecvTransportEngine.hpp:179
void tryTopicSource(TopicSource const &s)
in the environment that multicast cannot be enabled on either the sender or the receiver side...
Definition: RecvTransportEngine.hpp:130
Definition: Messages.hpp:77
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:188
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:21
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object can participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
impl class
Definition: RecvTransportEngine.hpp:61
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:202
Definition: Timers.hpp:96
Definition: MessageHandler.hpp:36
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:127
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:138
Definition: LockFreeBufferMisc.hpp:73