1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/tcpcast/Transport.hpp" 4 #include "hmbdc/app/tcpcast/RecvSession.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/app/mcast/RecvTransportEngine.hpp" 7 #include "hmbdc/app/LoggerT.hpp" 8 #include "hmbdc/text/StringTrieSet.hpp" 10 #include <boost/bind.hpp> 11 #include <boost/lexical_cast.hpp> 12 #include <boost/unordered_map.hpp> 16 namespace hmbdc {
namespace app {
namespace tcpcast {
23 using ptr = std::shared_ptr<RecvTransport>;
24 using Transport::Transport;
45 virtual void stopListenTo(
comm::Topic const& t) = 0;
49 namespace recvtransportengine_detail {
52 using boost::asio::ip::tcp;
60 template <
typename OutputBuffer,
typename MsgArbitrator>
66 ,
Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>>
68 , Subscribe, Unsubscribe, TopicSource> {
85 , OutputBuffer& outputBuffer
86 , MsgArbitrator arb =
NoOpArb())
92 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
93 , outputBuffer_(outputBuffer)
94 , maxItemSize_(outputBuffer.maxItemSize())
97 , myIp_(ip::address_v4::from_string(
98 hmbdc::comm::inet::getLocalIpMatchMask(cfg.
getExt<
string>(
"ifaceAddr"))
102 HMBDC_THROW(std::out_of_range
103 ,
"buffer is too small for notification: SessionStarted and SessionDropped");
107 mcConfig_.put(
"outBufferSizePower2", 3u);
120 this->initInThread();
121 mcReceiver_->initInThread();
122 mcReceiver_->start();
123 mcReceiver_->listenTo(tcpcastAdTopic_);
127 for (
auto& s : recvSessions_) {
128 s.second->heartbeat();
133 schedule(SysTime::now(), *
this);
136 char const* hmbdcName()
const {
137 return this->hmbdcName_.c_str();
140 std::tuple<char const*, int> schedSpec()
const {
141 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
151 HMBDC_LOG_C(e.what());
159 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
161 auto n = buffer_.peek(begin, end);
164 MH::handleMessage(*static_cast<MessageHead*>(*it++));
166 buffer_.wasteAfterPeek(begin, n);
167 mcReceiver_->runOnce();
178 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
179 for (
auto& p : recvSessions_) {
180 p.second->sendSubscribe(t.topic);
188 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
189 for (
auto& p : recvSessions_) {
190 p.second->sendUnsubscribe(t.topic);
198 auto ip = ip::address_v4::from_string(t.ip).to_ulong();
200 if (ip != 2130706433ul
202 && (!t.loopback || t.pid == getpid())) {
205 auto key = make_pair(ip, t.port);
206 auto it = topicSourceDict_.find(key);
208 if (likely(it != topicSourceDict_.end())) {
209 if (likely(it->second == t.timestamp)) {
212 it->second = t.timestamp;
215 topicSourceDict_[key] = t.timestamp;
217 if (recvSessions_.find(key) == recvSessions_.end()) {
218 boost::regex topicRegex(t.topicRegex);
219 for (
auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
220 if (boost::regex_match(it->first.c_str(), topicRegex)
222 tcp::resolver resolver(*pIos_);
223 auto epIt = resolver.resolve(
224 tcp::resolver::query(t.ip, to_string(t.port)));
230 recvSessions_.erase(key);
231 topicSourceDict_.erase(key);
237 recvSessions_[key] =
typename Session::ptr(sess);
247 void listenTo(
Topic const& t)
override {
251 void stopListenTo(
Topic const& t)
override {
254 Config config_, mcConfig_;
256 OutputBuffer& outputBuffer_;
258 unique_ptr<mcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
261 boost::unordered_map<pair<uint64_t, uint16_t>
262 ,
typename Session::ptr> recvSessions_;
263 boost::unordered_map<pair<uint64_t, uint16_t>
270 template <
typename OutputBuffer,
typename MsgArbitrator>
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:84
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:151
Definition: TypedString.hpp:74
Definition: Messages.hpp:68
Definition: Timers.hpp:65
Definition: Messages.hpp:61
impl class
Definition: RecvTransportEngine.hpp:57
void handleMessageCb(TopicSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:197
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
Definition: Transport.hpp:22
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:116
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:187
a singleton that holds tcpcast resources
Definition: NetContext.hpp:43
Definition: RecvSession.hpp:31
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:36
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:55
void stoppedCb(std::exception const &e) override
should never happen unless an exception is thrown
Definition: RecvTransportEngine.hpp:150
Definition: Messages.hpp:75
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:159
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:21
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
impl class
Definition: RecvTransportEngine.hpp:61
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:177
Definition: Timers.hpp:96
Definition: MessageHandler.hpp:36
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:102
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:119
Definition: LockFreeBufferMisc.hpp:73