1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/udpcast/Transport.hpp" 4 #include "hmbdc/text/StringTrieSet.hpp" 5 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 6 #include "hmbdc//Traits.hpp" 8 #include <boost/lexical_cast.hpp> 11 #include <type_traits> 13 #include <sys/epoll.h> 15 namespace hmbdc {
namespace app {
namespace udpcast {
21 using ptr = std::shared_ptr<RecvTransport>;
22 using Transport::Transport;
41 virtual void listenTo(
Topic const& t) = 0;
42 virtual void stopListenTo(
Topic const& t) = 0;
45 namespace recvtransportengine_detail {
58 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
61 ,
MessageHandler<RecvTransportImpl<OutputBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
81 , OutputBuffer& outputBuffer
82 , MsgArbitrator arb =
NoOpArb())
85 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
86 , outputBuffer_(outputBuffer)
87 , maxItemSize_(outputBuffer.maxItemSize())
88 , buf_(new char[mtu_])
93 if (setsockopt(fd, SOL_SOCKET,SO_REUSEADDR, &yes,
sizeof(yes)) < 0) {
94 HMBDC_LOG_C(
"failed to set reuse address errno=", errno);
96 auto sz = config_.getExt<
int>(
"udpRecvBufferBytes");
98 if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz,
sizeof(sz)) < 0) {
99 HMBDC_LOG_C(
"failed to set send buffer size=", sz);
103 auto udpcastListenPort = config_.getExt<uint16_t>(
"udpcastListenPort");
104 struct sockaddr_in udpcastListenAddrPort;
105 memset(&udpcastListenAddrPort, 0,
sizeof(udpcastListenAddrPort));
106 udpcastListenAddrPort.sin_family = AF_INET;
107 auto ipStr = cfg.
getExt<
string>(
"udpcastListenAddr") ==
string(
"ifaceAddr")
108 ? comm::inet::getLocalIpMatchMask(cfg.
getExt<
string>(
"ifaceAddr")):cfg.
getExt<
string>(
"udpcastListenAddr");
109 udpcastListenAddrPort.sin_addr.s_addr = inet_addr(ipStr.c_str());
110 udpcastListenAddrPort.sin_port = htons(udpcastListenPort);
111 if (bind(fd, (
struct sockaddr *)&udpcastListenAddrPort,
sizeof(udpcastListenAddrPort)) < 0) {
112 HMBDC_THROW(runtime_error,
"failed to bind unicast udpcast listen address " 113 << ipStr <<
':' << cfg.
getExt<
short>(
"udpcastListenPort") <<
" errno=" << errno);
116 if ((udpcastListenAddrPort.sin_addr.s_addr & 0x000000F0) == 0xE0) {
119 mreq.imr_multiaddr.s_addr = udpcastListenAddrPort.sin_addr.s_addr;
121 comm::inet::getLocalIpMatchMask(config_.getExt<
string>(
"ifaceAddr"));
122 mreq.imr_interface.s_addr=inet_addr(iface.c_str());
123 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof(mreq)) < 0) {
124 HMBDC_THROW(runtime_error,
"failed to join " << ipStr <<
':' 125 << cfg.
getExt<
short>(
"udpcastPort"));
138 utils::EpollTask::instance().add(EPOLLIN|EPOLLET, *
this);
141 void runOnce(
bool alwaysPoll =
false) HMBDC_RESTRICT {
143 auto n = buffer_.peek(begin, end);
146 MH::handleMessage(*static_cast<MessageHead*>(*it++));
148 buffer_.wasteAfterPeek(begin, n);
149 if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
150 utils::EpollTask::instance().poll();
159 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
166 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
170 void listenTo(
Topic const& t)
override {
174 void stopListenTo(
Topic const& t)
override {
177 sockaddr_in udpcastListenRemoteAddr = {0};
179 template <
bool is_raw_arb>
180 typename enable_if<is_raw_arb, int>::type applyRawArb(
void* pkt,
size_t len) {
181 return arb_(pkt, len);
184 template <
bool is_raw_arb>
185 typename enable_if<!is_raw_arb, int>::type applyRawArb(
void*,
size_t) {
189 template <
bool is_raw_arb>
194 template <
bool is_raw_arb>
199 void resumeRead() HMBDC_RESTRICT {
202 using arg0 =
typename traits::template arg<0>::type;
203 bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
205 if (bytesRecved_ && bufCur_ == buf_) {
206 auto a = applyRawArb<is_raw_arb>(buf_, bytesRecved_);
207 if (hmbdc_unlikely(a == 0)) {
210 }
else if (hmbdc_unlikely(a < 0)) {
214 while (bytesRecved_) {
216 auto wireSize = h->wireSize();
218 if (hmbdc_likely(bytesRecved_ >= wireSize)) {
219 if (subscriptions_.check(h->topic())) {
220 if (hmbdc_unlikely(wireSize > bytesRecved_)) {
223 auto a = applyArb<is_raw_arb>(h);
225 if (hmbdc_likely(a > 0)) {
226 auto l = std::min<size_t>(maxItemSize_, h->messagePayloadLen());
227 outputBuffer_.put(h->payload(), l);
233 bytesRecved_ -= wireSize;
242 socklen_t addrLen =
sizeof(udpcastListenRemoteAddr);
243 auto l = recvfrom(fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT
244 , (
struct sockaddr *)&udpcastListenRemoteAddr, &addrLen);
245 if (hmbdc_unlikely(l < 0)) {
247 HMBDC_LOG_C(
"recvmmsg failed errno=", errno);
256 }
while(bytesRecved_);
260 OutputBuffer& outputBuffer_;
269 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
272 ,
Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
302 HMBDC_LOG_C(e.what());
309 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
312 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
Definition: MonoLockFreeBuffer.hpp:15
void invokedCb(uint16_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:282
Definition: Transport.hpp:39
class to hold an hmbdc configuration
Definition: Config.hpp:46
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:165
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:180
a singleton that holding udpcast resources
Definition: NetContext.hpp:38
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:20
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:34
Definition: Messages.hpp:65
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:158
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:80
Definition: RecvTransportEngine.hpp:270
impl class
Definition: RecvTransportEngine.hpp:59
Definition: StringTrieSetDetail.hpp:115
void start()
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:137
Definition: Message.hpp:76
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:301
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:45
Definition: Messages.hpp:73
Definition: MessageHandler.hpp:39
Definition: LockFreeBufferMisc.hpp:74
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:291