1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/tcpcast/Transport.hpp" 4 #include "hmbdc/app/tcpcast/RecvSession.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/app/udpcast/RecvTransportEngine.hpp" 7 #include "hmbdc/app/Logger.hpp" 8 #include "hmbdc/text/StringTrieSet.hpp" 10 #include <boost/bind.hpp> 11 #include <boost/lexical_cast.hpp> 12 #include <boost/unordered_map.hpp> 16 namespace hmbdc {
namespace app {
namespace tcpcast {
23 using ptr = std::shared_ptr<RecvTransport>;
24 using Transport::Transport;
45 virtual void stopListenTo(
comm::Topic const& t) = 0;
49 namespace recvtransportengine_detail {
58 template <
typename OutputBuffer,
typename MsgArbitrator>
63 ,
Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>>
65 , Subscribe, Unsubscribe, TopicSource> {
82 , OutputBuffer& outputBuffer
83 , MsgArbitrator arb =
NoOpArb())
91 }), config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
92 , outputBuffer_(outputBuffer)
93 , maxItemSize_(outputBuffer.maxItemSize())
95 , myIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
96 cfg.
getExt<
string>(
"ifaceAddr")).c_str()))
97 , mcastEmuProxyScanTimer_(
98 Duration::seconds(cfg.
getExt<
size_t>(
"mcastEmuProxyScanPeriodSeconds"))) {
101 HMBDC_THROW(std::out_of_range
102 ,
"buffer is too small for notification: SessionStarted and SessionDropped");
104 mcConfig_.put(
"outBufferSizePower2", 3u);
108 if (cfg.
getExt<
bool>(
"useTcpcastMcProxy")) {
109 config_(tcpcastMcEmuProxyAddrs_,
"udpcastDests");
110 if (tcpcastMcEmuProxyAddrs_.size() == 0) {
111 HMBDC_THROW(std::out_of_range,
"no TcpcastEmuProxy specified");
113 topicSinkMsgBufLen_ =
114 udpcast::SendTransport::encode(tcpcastAdTopic_, topicSinkMsgBuf_,
sizeof(topicSinkMsgBuf_)
116 , cfg.
getExt<uint16_t>(
"udpcastListenPort")})->wireSize();
125 mcReceiver_->start();
126 mcReceiver_->listenTo(tcpcastAdTopic_);
130 for (
auto& s : recvSessions_) {
131 s.second->heartbeat();
136 schedule(SysTime::now(), *
this);
137 if (tcpcastMcEmuProxyAddrs_.size()) {
138 mcastEmuProxyScanTimer_.setCallback(
140 for (
auto const& s : tcpcastMcEmuProxyAddrs_) {
141 sendto(mcReceiver_->fd, topicSinkMsgBuf_, topicSinkMsgBufLen_
142 , MSG_NOSIGNAL|MSG_DONTWAIT, (
struct sockaddr *)&s.v,
sizeof(s.v));
146 schedule(SysTime::now(), mcastEmuProxyScanTimer_);
150 using Transport::hmbdcName;
151 using Transport::schedSpec;
160 HMBDC_LOG_C(e.what());
168 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
169 for (
auto it = recvSessions_.begin(); it != recvSessions_.end();) {
170 if (hmbdc_unlikely(!(*it).second->runOnce())) {
171 (*it).second->stop();
172 recvSessions_.erase(it++);
178 auto n = buffer_.peek(begin, end);
181 MH::handleMessage(*static_cast<MessageHead*>(*it++));
183 buffer_.wasteAfterPeek(begin, n);
185 mcReceiver_->runOnce(
true);
192 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
193 for (
auto& p : recvSessions_) {
194 p.second->sendSubscribe(t.topic);
202 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
203 for (
auto& p : recvSessions_) {
204 p.second->sendUnsubscribe(t.topic);
212 auto ip = inet_addr(t.ip);
214 if (ip != 0x100007ful
216 && (!t.loopback || t.pid == getpid())) {
219 auto key = make_pair(ip, t.port);
220 if (recvSessions_.find(key) == recvSessions_.end()) {
221 boost::regex topicRegex(t.topicRegex);
222 for (
auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
223 if (boost::regex_match(it->first.c_str(), topicRegex)
234 sess->start(ip, t.port);
235 recvSessions_[key] =
typename Session::ptr(sess);
236 }
catch (std::exception
const& e) {
237 HMBDC_LOG_C(e.what());
247 void listenTo(
Topic const& t)
override {
251 void stopListenTo(
Topic const& t)
override {
254 Config config_, mcConfig_;
256 OutputBuffer& outputBuffer_;
258 unique_ptr<udpcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
261 boost::unordered_map<pair<uint64_t, uint16_t>
262 ,
typename Session::ptr> recvSessions_;
265 std::vector<comm::inet::Endpoint> tcpcastMcEmuProxyAddrs_;
267 char topicSinkMsgBuf_[64];
268 size_t topicSinkMsgBufLen_;
272 template <
typename OutputBuffer,
typename MsgArbitrator>
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:81
Definition: MonoLockFreeBuffer.hpp:15
class to hold an hmbdc configuration
Definition: Config.hpp:46
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:180
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
Definition: Messages.hpp:73
Definition: Timers.hpp:65
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:34
Definition: Messages.hpp:66
void handleMessageCb(TopicSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:211
Definition: Transport.hpp:69
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:145
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:201
Definition: Messages.hpp:154
a singleton that holds tcpcast resources
Definition: NetContext.hpp:44
Definition: RecvSession.hpp:28
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:36
impl class
Definition: RecvTransportEngine.hpp:59
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:76
void stoppedCb(std::exception const &e) override
should never happen unless an exception is thrown
Definition: RecvTransportEngine.hpp:159
Definition: Messages.hpp:82
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:168
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:21
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:45
impl class
Definition: RecvTransportEngine.hpp:59
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:191
Definition: Timers.hpp:104
Definition: MessageHandler.hpp:39
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:131
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:124
Definition: LockFreeBufferMisc.hpp:74