1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/mcast/Transport.hpp" 4 #include "hmbdc/app/LoggerT.hpp" 5 #include "hmbdc/text/StringTrieSet.hpp" 6 #include "hmbdc//Traits.hpp" 8 #include <boost/bind.hpp> 9 #include <boost/lexical_cast.hpp> 12 #include <type_traits> 14 namespace hmbdc {
namespace app {
namespace mcast {
20 using ptr = std::shared_ptr<RecvTransport>;
21 using Transport::Transport;
40 virtual void listenTo(
Topic const& t) = 0;
41 virtual void stopListenTo(
Topic const& t) = 0;
44 namespace recvtransportengine_detail {
56 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
59 ,
MessageHandler<RecvTransportImpl<OutputBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
79 , OutputBuffer& outputBuffer
80 , MsgArbitrator arb =
NoOpArb())
83 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
84 , outputBuffer_(outputBuffer)
85 , maxItemSize_(outputBuffer.maxItemSize())
86 , endpoint_(ip::address::from_string(cfg.getExt<
std::string>(
"mcastAddr"))
87 , cfg.getExt<short>(
"mcastPort"))
102 pSocket_->async_receive_from(
103 boost::asio::buffer(buf_,
sizeof(buf_))
105 , boost::bind(&SELF::handleReceiveFrom,
this,
106 boost::asio::placeholders::error,
107 boost::asio::placeholders::bytes_transferred)
118 auto n = buffer_.peek(0, begin, end);
121 MH::handleMessage(*static_cast<MessageHead*>(*it++));
123 buffer_.wasteAfterPeek(0, n);
124 boost::system::error_code ec;
125 if (hmbdc_unlikely(bufCur_)) {
126 handleReceiveFrom(ec, 0);
128 if (hmbdc_unlikely(pIos_->stopped())) pIos_->reset();
130 if (ec) HMBDC_LOG_C(
"io_service error=", ec);
137 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
144 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
148 void listenTo(
Topic const& t)
override {
152 void stopListenTo(
Topic const& t)
override {
164 void initInThread() {
165 Transport::initInThread();
168 pSocket_ =
new boost::asio::ip::udp::socket(*pIos_);
169 pSocket_->open(endpoint_.protocol());
170 pSocket_->set_option(ip::udp::socket::reuse_address(
true));
171 pSocket_->bind(endpoint_);
173 auto multicastAddress =
174 ip::address::from_string(config_.getExt<std::string>(
"mcastAddr"));
175 auto listenAddress = ip::address::from_string(
176 comm::inet::getLocalIpMatchMask(config_.getExt<std::string>(
"ifaceAddr")));
179 auto sz = config_.getExt<
int>(
"udpRecvBufferBytes");
181 socket_base::receive_buffer_size option(sz);
182 boost::system::error_code ec;
183 pSocket_->set_option(option);
186 socket_base::receive_buffer_size option;
187 pSocket_->get_option(option);
188 if (sz == 0 || sz >= option.value()) {
190 HMBDC_LOG_C(
"set udp receive buffer size unsuccessful, want " 191 , sz,
" actual: ", option.value()
192 ,
" resulting lower receiving rate, check OS limits!");
194 pSocket_->set_option(ip::multicast::join_group(
195 multicastAddress.to_v4(), listenAddress.to_v4())
200 template <
bool is_raw_arb>
201 typename enable_if<is_raw_arb, int>::type applyRawArb() {
202 return arb_(bufCur_, bytesRecved_);
205 template <
bool is_raw_arb>
206 typename enable_if<!is_raw_arb, int>::type applyRawArb() {
210 template <
bool is_raw_arb>
215 template <
bool is_raw_arb>
220 void handleReceiveFrom(
const boost::system::error_code& error
221 ,
size_t bytesRecved) {
222 if (hmbdc_likely(!error)) {
225 bytesRecved_ = bytesRecved;
229 using arg0 =
typename traits::template arg<0>::type;
230 bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
231 auto a = applyRawArb<is_raw_arb>();
232 if (hmbdc_unlikely(a == 0)) {
235 }
else if (hmbdc_unlikely(a < 0)) {
239 while (bytesRecved_) {
241 auto wireSize = h->wireSize();
243 if (hmbdc_likely(bytesRecved_ >= wireSize)) {
244 if (subscriptions_.check(h->topic())) {
246 auto l = std::min<size_t>(maxItemSize_, wireSize);
248 if (hmbdc_unlikely(l > bytesRecved_)) {
251 auto a = applyArb<is_raw_arb>(h);
253 if (hmbdc_likely(a > 0)) {
254 auto it = outputBuffer_.claim();
255 char* b =
static_cast<char*
>(*it);
256 memcpy(b, h->payload(), l);
257 outputBuffer_.commit(it);
263 bytesRecved_ -= wireSize;
270 HMBDC_LOG_C(
"asio error=", error);
273 pSocket_->async_receive_from(
274 boost::asio::buffer(buf_,
sizeof(buf_))
276 , boost::bind(&SELF::handleReceiveFrom,
this,
277 boost::asio::placeholders::error,
278 boost::asio::placeholders::bytes_transferred)
283 OutputBuffer& outputBuffer_;
285 boost::asio::ip::udp::endpoint endpoint_;
286 boost::asio::ip::udp::endpoint sender_endpoint_;
287 ip::udp::socket* pSocket_;
305 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
309 ,
Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
313 char const* hmbdcName()
const {
314 return this->hmbdcName_.c_str();
317 std::tuple<char const*, int> schedSpec()
const {
318 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
336 this->initInThread();
347 HMBDC_LOG_C(e.what());
354 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
357 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
void stoppedCb(std::exception const &e) override
should never happen unless an exception is thrown
Definition: RecvTransportEngine.hpp:346
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:78
class to hold an hmbdc configuration
Definition: Config.hpp:44
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:19
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
void invokedCb(uint16_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:326
impl class
Definition: RecvTransportEngine.hpp:57
a take-all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
a singleton that holds mcast resources
Definition: NetContext.hpp:38
Definition: Messages.hpp:72
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:335
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:136
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:143
boost::asio::ip::udp::socket & asioSocket()
expose so user can manipulate it
Definition: RecvTransportEngine.hpp:160
void start()
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:101
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:55
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
void runOnce() HMBDC_RESTRICT
power the io_service and other things
Definition: RecvTransportEngine.hpp:115
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object can participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
Definition: Transport.hpp:17
Definition: Messages.hpp:65
impl class
Definition: RecvTransportEngine.hpp:306
Definition: MessageHandler.hpp:36
Definition: LockFreeBufferMisc.hpp:73