1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/mcast/Transport.hpp" 4 #include "hmbdc/text/StringTrieSet.hpp" 5 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 6 #include "hmbdc//Traits.hpp" 8 #include <boost/lexical_cast.hpp> 11 #include <type_traits> 13 #include <sys/epoll.h> 15 namespace hmbdc {
namespace app {
namespace mcast {
21 using ptr = std::shared_ptr<RecvTransport>;
22 using Transport::Transport;
41 virtual void listenTo(
Topic const& t) = 0;
42 virtual void stopListenTo(
Topic const& t) = 0;
45 namespace recvtransportengine_detail {
58 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
61 ,
MessageHandler<RecvTransportImpl<OutputBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
81 , OutputBuffer& outputBuffer
82 , MsgArbitrator arb =
NoOpArb())
85 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
86 , outputBuffer_(outputBuffer)
87 , maxItemSize_(outputBuffer.maxItemSize())
88 , buf_(new char[mtu_])
93 if (setsockopt(fd, SOL_SOCKET,SO_REUSEADDR, &yes,
sizeof(yes)) < 0) {
94 HMBDC_LOG_C(
"failed to set reuse address errno=", errno);
96 auto sz = config_.getExt<
int>(
"udpRecvBufferBytes");
98 if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz,
sizeof(sz)) < 0) {
99 HMBDC_LOG_C(
"failed to set send buffer size=", sz);
104 if (bind(fd, (
struct sockaddr *)&mcAddr,
sizeof(mcAddr)) < 0) {
105 HMBDC_THROW(runtime_error,
"failed to bind " 106 << config_.getExt<
string>(
"mcastAddr") <<
':' 107 << cfg.
getExt<
short>(
"mcastPort"));
111 mreq.imr_multiaddr.s_addr=inet_addr(config_.getExt<
string>(
"mcastAddr").c_str());
113 comm::inet::getLocalIpMatchMask(config_.getExt<
string>(
"ifaceAddr"));
114 mreq.imr_interface.s_addr=inet_addr(iface.c_str());
115 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof(mreq)) < 0) {
116 HMBDC_THROW(runtime_error,
"failed to join " << config_.getExt<
string>(
"mcastAddr") <<
':' 117 << cfg.
getExt<
short>(
"mcastPort"));
129 utils::EpollTask::instance().add(EPOLLIN|EPOLLET, *
this);
132 void runOnce(
bool alwaysPoll =
false) HMBDC_RESTRICT {
134 auto n = buffer_.peek(begin, end);
137 MH::handleMessage(*static_cast<MessageHead*>(*it++));
139 buffer_.wasteAfterPeek(begin, n);
140 if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
141 utils::EpollTask::instance().poll();
150 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
157 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
161 void listenTo(
Topic const& t)
override {
165 void stopListenTo(
Topic const& t)
override {
170 template <
bool is_raw_arb>
171 typename enable_if<is_raw_arb, int>::type applyRawArb(
void* pkt,
size_t len) {
172 return arb_(pkt, len);
175 template <
bool is_raw_arb>
176 typename enable_if<!is_raw_arb, int>::type applyRawArb(
void*,
size_t) {
180 template <
bool is_raw_arb>
185 template <
bool is_raw_arb>
190 void resumeRead() HMBDC_RESTRICT {
193 using arg0 =
typename traits::template arg<0>::type;
194 bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
196 if (bytesRecved_ && bufCur_ == buf_) {
197 auto a = applyRawArb<is_raw_arb>(buf_, bytesRecved_);
198 if (hmbdc_unlikely(a == 0)) {
201 }
else if (hmbdc_unlikely(a < 0)) {
205 while (bytesRecved_) {
207 auto wireSize = h->wireSize();
209 if (hmbdc_likely(bytesRecved_ >= wireSize)) {
210 if (subscriptions_.check(h->topic())) {
211 if (hmbdc_unlikely(wireSize > bytesRecved_)) {
214 auto a = applyArb<is_raw_arb>(h);
216 if (hmbdc_likely(a > 0)) {
217 auto l = std::min<size_t>(maxItemSize_, h->messagePayloadLen());
218 outputBuffer_.put(h->payload(), l);
224 bytesRecved_ -= wireSize;
233 auto l = recv(fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT);
234 if (hmbdc_unlikely(l < 0)) {
236 HMBDC_LOG_C(
"recvmmsg failed errno=", errno);
245 }
while(bytesRecved_);
318 OutputBuffer& outputBuffer_;
335 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
338 ,
Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
368 HMBDC_LOG_C(e.what());
375 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
378 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
void stoppedCb(std::exception const &e) override
should never happen unless an exception is thrown
Definition: RecvTransportEngine.hpp:367
Definition: MonoLockFreeBuffer.hpp:15
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:80
class to hold an hmbdc configuration
Definition: Config.hpp:44
interface to power a multicast transport's receiving functions
Definition: RecvTransportEngine.hpp:20
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
void invokedCb(uint16_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:348
impl class
Definition: RecvTransportEngine.hpp:59
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:34
a singleton that holds mcast resources
Definition: NetContext.hpp:38
Definition: Messages.hpp:72
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:357
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:149
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:156
void start()
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:128
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:72
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
Definition: Transport.hpp:43
Definition: Messages.hpp:65
Definition: RecvTransportEngine.hpp:336
Definition: MessageHandler.hpp:36
Definition: LockFreeBufferMisc.hpp:74