1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/tcpcast/Transport.hpp" 4 #include "hmbdc/app/tcpcast/RecvSession.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/app/mcast/RecvTransportEngine.hpp" 7 #include "hmbdc/app/LoggerT.hpp" 8 #include "hmbdc/text/StringTrieSet.hpp" 10 #include <boost/bind.hpp> 11 #include <boost/lexical_cast.hpp> 12 #include <boost/unordered_map.hpp> 16 namespace hmbdc {
namespace app {
namespace tcpcast {
18 using namespace hmbdc;
28 using ptr = std::shared_ptr<RecvTransport>;
29 using Transport::Transport;
49 virtual void listenTo(
Topic const& t) = 0;
50 virtual void stopListenTo(
Topic const& t) = 0;
59 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
65 ,
Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>>
67 , Subscribe, Unsubscribe, TopicSource> {
84 , OutputBuffer& outputBuffer
85 , MsgArbitrator arb =
NoOpArb())
91 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
92 , outputBuffer_(outputBuffer)
93 , maxItemSize_(outputBuffer.maxItemSize())
96 , myIp_(ip::address_v4::from_string(cfg.
getExt<
string>(
"ifaceAddr").c_str()).to_ulong()) {
99 HMBDC_THROW(std::out_of_range
100 ,
"buffer is too small for notification: SessionStarted and SessionDropped");
104 mcConfig_.put(
"outBufferSizePower2", 3u);
117 this->initInThread();
118 mcReceiver_->initInThread();
119 mcReceiver_->start();
120 mcReceiver_->listenTo(tcpcastAdTopic_);
124 for (
auto& s : recvSessions_) {
125 s.second->heartbeat();
130 schedule(SysTime::now(), *
this);
133 char const* hmbdcName()
const {
134 return this->hmbdcName_.c_str();
137 std::tuple<char const*, int> schedSpec()
const {
138 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
148 HMBDC_LOG_C(e.what());
156 void invokedCb(uint16_t threadSerialNumber) __restrict__
override {
158 auto n = buffer_.peek(begin, end);
161 MH::handleMessage(*static_cast<MessageHead*>(*it++));
163 buffer_.wasteAfterPeek(begin, n);
164 mcReceiver_->runOnce();
175 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
176 for (
auto& p : recvSessions_) {
177 p.second->sendSubscribe(t.topic);
185 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
186 for (
auto& p : recvSessions_) {
187 p.second->sendUnsubscribe(t.topic);
195 auto ip = ip::address_v4::from_string(t.ip).to_ulong();
197 if (ip != 2130706433ul
199 && (!t.loopback || t.pid == getpid())) {
202 auto key = make_pair(ip, t.port);
203 auto it = topicSourceDict_.find(key);
205 if (likely(it != topicSourceDict_.end())) {
206 if (likely(it->second == t.timestamp)) {
209 it->second = t.timestamp;
212 topicSourceDict_[key] = t.timestamp;
214 if (recvSessions_.find(key) == recvSessions_.end()) {
215 boost::regex topicRegex(t.topicRegex);
216 for (
auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
217 if (boost::regex_match(it->first.c_str(), topicRegex)
219 tcp::resolver resolver(*pIos_);
220 auto epIt = resolver.resolve(
221 tcp::resolver::query(t.ip, to_string(t.port)));
227 recvSessions_.erase(key);
228 topicSourceDict_.erase(key);
234 recvSessions_[key] =
typename Session::ptr(sess);
244 void listenTo(
Topic const& t)
override {
248 void stopListenTo(
Topic const& t)
override {
251 Config config_, mcConfig_;
253 OutputBuffer& outputBuffer_;
255 unique_ptr<mcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
258 boost::unordered_map<pair<uint64_t, uint16_t>
259 ,
typename Session::ptr> recvSessions_;
260 boost::unordered_map<pair<uint64_t, uint16_t>
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:35
impl class
Definition: RecvTransportEngine.hpp:60
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
impl class
Definition: RecvTransportEngine.hpp:57
Definition: StringTrieSet.hpp:113
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the message recv
Definition: RecvTransportEngine.hpp:116
Definition: TypedString.hpp:74
Definition: Messages.hpp:122
Definition: Timers.hpp:69
Definition: Messages.hpp:115
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:184
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:38
Definition: Transport.hpp:27
Definition: Messages.hpp:160
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:147
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:174
Definition: Client.hpp:14
Definition: NetContext.hpp:28
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:83
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:41
Definition: Message.hpp:46
Definition: Messages.hpp:129
void handleMessageCb(TopicSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:194
Definition: RecvSession.hpp:29
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:143
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:26
Definition: Client.hpp:39
Definition: Client.hpp:11
void invokedCb(uint16_t threadSerialNumber) __restrict__ override
power the io_service and other things
Definition: RecvTransportEngine.hpp:156
Definition: Timers.hpp:100
Definition: MessageHandler.hpp:38
Definition: Messages.hpp:151
Definition: LockFreeBufferMisc.hpp:73