1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/mcast/Transport.hpp" 4 #include "hmbdc/app/LoggerT.hpp" 5 #include "hmbdc/text/StringTrieSet.hpp" 6 #include "hmbdc//Traits.hpp" 8 #include <boost/bind.hpp> 9 #include <boost/lexical_cast.hpp> 12 #include <type_traits> 14 namespace hmbdc {
namespace app {
namespace mcast {
16 using namespace hmbdc;
25 using ptr = std::shared_ptr<RecvTransport>;
26 using Transport::Transport;
45 virtual void listenTo(
Topic const& t) = 0;
46 virtual void stopListenTo(
Topic const& t) = 0;
56 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
59 ,
MessageHandler<RecvTransportImpl<OutputBuffer, MsgArbitrator>, Subscribe, Unsubscribe> {
79 , OutputBuffer& outputBuffer
80 , MsgArbitrator arb =
NoOpArb())
83 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
84 , outputBuffer_(outputBuffer)
85 , maxItemSize_(outputBuffer.maxItemSize())
86 , endpoint_(ip::address::from_string(cfg.getExt<
std::string>(
"mcastAddr"))
87 , cfg.getExt<short>(
"mcastPort"))
102 pSocket_->async_receive_from(
103 boost::asio::buffer(buf_,
sizeof(buf_))
105 , boost::bind(&SELF::handleReceiveFrom,
this,
106 boost::asio::placeholders::error,
107 boost::asio::placeholders::bytes_transferred)
118 auto n = buffer_.peek(0, begin, end);
121 MH::handleMessage(*static_cast<MessageHead*>(*it++));
123 buffer_.wasteAfterPeek(0, n);
124 boost::system::error_code ec;
125 if (unlikely(bufCur_)) {
126 handleReceiveFrom(ec, 0);
128 if (unlikely(pIos_->stopped())) pIos_->reset();
130 if (ec) HMBDC_LOG_C(
"io_service error=", ec);
137 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
144 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
148 void listenTo(
Topic const& t)
override {
152 void stopListenTo(
Topic const& t)
override {
164 void initInThread() {
165 Transport::initInThread();
168 pSocket_ =
new boost::asio::ip::udp::socket(*pIos_);
169 pSocket_->open(endpoint_.protocol());
170 pSocket_->set_option(ip::udp::socket::reuse_address(
true));
171 pSocket_->bind(endpoint_);
173 auto multicastAddress =
174 ip::address::from_string(config_.getExt<std::string>(
"mcastAddr"));
175 auto listenAddress = ip::address::from_string(
176 comm::inet::getLocalIpMatchMask(config_.getExt<
string>(
"ifaceAddr")));
179 auto sz = config_.getExt<
int>(
"udpRecvBufferBytes");
181 socket_base::receive_buffer_size option(sz);
182 boost::system::error_code ec;
183 pSocket_->set_option(option);
186 socket_base::receive_buffer_size option;
187 pSocket_->get_option(option);
188 if (sz == 0 || sz >= option.value()) {
190 HMBDC_LOG_C(
"set udp receive buffer size unsuccessful, want " 191 , sz,
" actual: ", option.value()
192 ,
" resulting lower receiving rate, check OS limits!");
194 pSocket_->set_option(ip::multicast::join_group(
195 multicastAddress.to_v4(), listenAddress.to_v4())
200 template <
bool is_raw_arb>
201 typename enable_if<is_raw_arb, int>::type applyRawArb() {
202 return arb_(bufCur_, bytesRecved_);
205 template <
bool is_raw_arb>
206 typename enable_if<!is_raw_arb, int>::type applyRawArb() {
210 template <
bool is_raw_arb>
215 template <
bool is_raw_arb>
220 void handleReceiveFrom(
const boost::system::error_code& error
221 ,
size_t bytesRecved) {
222 if (likely(!error)) {
225 bytesRecved_ = bytesRecved;
229 using arg0 =
typename traits::template arg<0>::type;
230 bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
231 auto a = applyRawArb<is_raw_arb>();
232 if (unlikely(a == 0)) {
235 }
else if (unlikely(a < 0)) {
239 while (bytesRecved_) {
241 auto wireSize = h->wireSize();
243 if (likely(bytesRecved_ >= wireSize)) {
244 if (subscriptions_.check(h->topic())) {
246 auto l = std::min<size_t>(maxItemSize_, wireSize);
248 if (unlikely(l > bytesRecved_)) {
251 auto a = applyArb<is_raw_arb>(h);
254 auto it = outputBuffer_.claim();
255 char* b =
static_cast<char*
>(*it);
256 memcpy(b, h->payload(), l);
257 outputBuffer_.commit(it);
263 bytesRecved_ -= wireSize;
270 HMBDC_LOG_C(
"asio error=", error);
273 pSocket_->async_receive_from(
274 boost::asio::buffer(buf_,
sizeof(buf_))
276 , boost::bind(&SELF::handleReceiveFrom,
this,
277 boost::asio::placeholders::error,
278 boost::asio::placeholders::bytes_transferred)
283 OutputBuffer& outputBuffer_;
285 boost::asio::ip::udp::endpoint endpoint_;
286 boost::asio::ip::udp::endpoint sender_endpoint_;
287 ip::udp::socket* pSocket_;
304 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
308 ,
Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
312 char const* hmbdcName()
const {
313 return this->hmbdcName_.c_str();
316 std::tuple<char const*, int> schedSpec()
const {
317 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
335 this->initInThread();
346 HMBDC_LOG_C(e.what());
class to hold an hmbdc configuration
Definition: Config.hpp:35
void handleMessageCb(Unsubscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:143
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:345
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:24
boost::asio::ip::udp::socket & asioSocket()
expose so user can manipulate it
Definition: RecvTransportEngine.hpp:160
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
impl class
Definition: RecvTransportEngine.hpp:57
Definition: StringTrieSet.hpp:113
Definition: TypedString.hpp:74
void handleMessageCb(Subscribe const &t)
only used by MH
Definition: RecvTransportEngine.hpp:136
void invokedCb(uint16_t) __restrict__ override
power the io_service and other things
Definition: RecvTransportEngine.hpp:325
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:38
Definition: NetContext.hpp:33
Definition: Messages.hpp:76
impl class
Definition: RecvTransportEngine.hpp:305
void start()
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:101
Definition: Client.hpp:14
Definition: LockFreeBufferT.hpp:18
Definition: Message.hpp:46
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:78
void runOnce() __restrict__
power the io_service and other things
Definition: RecvTransportEngine.hpp:115
Definition: Client.hpp:39
Definition: Transport.hpp:25
Definition: Messages.hpp:69
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:334
Definition: Client.hpp:11
Definition: MessageHandler.hpp:38
Definition: LockFreeBufferMisc.hpp:73