1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/mcast/Transport.hpp" 5 #include "hmbdc/app/mcast/Messages.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/time/Rater.hpp" 8 #include "hmbdc/numeric/BitMath.hpp" 11 #include <boost/bind.hpp> 16 namespace hmbdc {
namespace app {
namespace mcast {
21 using namespace hmbdc;
26 using ptr = std::shared_ptr<SendTransport>;
28 ,
size_t maxMessageSize)
30 , topic_(cfg.
getExt<
string>(
"topicRegex"))
32 , maxMessageSize_(maxMessageSize)
34 , config_.getExt<uint16_t>(
"outBufferSizePower2")
35 ?config_.getExt<uint16_t>(
"outBufferSizePower2")
36 :hmbdc::numeric::log2Upper(8ul * 1024ul / maxMessageSize)
38 , endpoint_(ip::address::from_string(config_.getExt<
string>(
"mcastAddr"))
39 , cfg.
getExt<
short>(
"mcastPort"))
41 , rater_(Duration::seconds(1u)
42 , config_.getExt<
size_t>(
"sendBytesPerSec")
43 , config_.getExt<
size_t>(
"sendBytesBurst")
44 , config_.getExt<
size_t>(
"sendBytesBurst") != 0ul)
45 , mtu_(config_.getExt<
size_t>(
"mtu"))
46 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch")) {
49 if (maxMessageSize_ + totalHead > mtu_) {
50 HMBDC_THROW(std::out_of_range,
"maxMessageSize needs <= " << mtu_ - totalHead);
52 toSend_.reserve(maxSendBatch_);
59 bool match(
Topic const& t)
const {
60 return boost::regex_match(t.c_str(), topicRegex_);
63 template <
typename... Messages>
64 void queue(
Topic const& t, Messages&&... msgs) {
65 auto n =
sizeof...(msgs);
66 auto it = buffer_.claim(n);
67 queue(it, t, std::forward<Messages>(msgs)...);
68 buffer_.commit(it, n);
71 template <
typename... Messages>
72 bool tryQueue(
Topic const& t, Messages&&... msgs) {
73 auto n =
sizeof...(msgs);
74 auto it = buffer_.tryClaim(n);
76 queue(it, t, std::forward<Messages>(msgs)...);
77 buffer_.commit(it, n);
83 template <
typename Message,
typename ... Args>
84 void queueInPlace(
Topic const& t, Args&&... args) {
85 auto s = buffer_.claim();
86 char* addr =
static_cast<char*
>(*s);
90 if (likely(
sizeof(Message) <= maxMessageSize_)) {
94 HMBDC_THROW(std::out_of_range
95 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
100 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len) {
101 auto s = buffer_.claim();
102 char* addr =
static_cast<char*
>(*s);
106 if (likely(len <= maxMessageSize_)) {
110 HMBDC_THROW(std::out_of_range
111 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
117 void runOnce() __restrict__ {
118 if (!toSend_.size()) {
119 if (unlikely(pIos_->stopped())) pIos_->reset();
120 boost::system::error_code error;
121 size_t bytesTransferred = 0;
122 resumeSend(error, bytesTransferred);
124 boost::system::error_code ec;
126 if (ec) HMBDC_LOG_C(
"io_service error=", ec);
141 void initInThread() {
142 Transport::initInThread();
144 pSocket_ =
new boost::asio::ip::udp::socket(*pIos_, endpoint_.protocol());
147 comm::inet::getLocalIpMatchMask(config_.getExt<
string>(
"ifaceAddr"));
148 ip::multicast::outbound_interface outFrom(ip::address_v4::from_string(iface));
149 pSocket_->set_option(outFrom);
151 ip::multicast::hops hops(config_.getExt<uint16_t>(
"ttl"));
152 pSocket_->set_option(hops);
160 ip::multicast::enable_loopback loop(config_.getExt<
bool>(
"loopback"));
161 pSocket_->set_option(loop);
163 auto sz = config_.getExt<int32_t>(
"udpSendBufferBytes");
165 socket_base::send_buffer_size option(sz);
166 boost::system::error_code ec;
167 pSocket_->set_option(option);
170 socket_base::send_buffer_size option;
171 pSocket_->get_option(option);
172 if (sz == 0 || sz >= option.value()) {
174 HMBDC_LOG_C(
"set udp send buffer size unsuccessful, want " 175 , sz,
" actual: ", option.value());
183 boost::regex topicRegex_;
184 size_t maxMessageSize_;
188 ip::udp::endpoint endpoint_;
189 ip::udp::socket* pSocket_;
192 size_t maxSendBatch_;
193 vector<const_buffer> toSend_;
195 template<
typename M,
typename ... Messages>
197 , M&& m, Messages&&... msgs) {
198 using Message =
typename std::remove_reference<M>::type;
200 char* addr =
static_cast<char*
>(s);
204 if (likely(
sizeof(Message) <= maxMessageSize_)) {
208 HMBDC_THROW(std::out_of_range
209 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
211 queue(++it, t, std::forward<Messages>(msgs)...);
217 void resumeSend(
const boost::system::error_code& error
218 ,
size_t bytesTransferred) {
219 if (unlikely(error)) {
220 HMBDC_LOG_C(
"asio error=", error);
223 buffer_.wasteAfterPeek(0, toSend_.size());
227 buffer_.peek(0, begin_, end, maxSendBatch_);
229 size_t batchBytes = 0;
233 if (unlikely(!rater_.check(item->wireSize())))
break;
234 batchBytes += item->wireSize();
235 if (unlikely(batchBytes > mtu_))
break;
237 toSend_.emplace_back(ptr, item->wireSize());
241 if (unlikely(toSend_.size() == 1)) {
242 pSocket_->send_to(toSend_, endpoint_);
243 buffer_.wasteAfterPeek(0, 1);
246 }
else if (likely(toSend_.size())) {
247 pSocket_->async_send_to(
250 , boost::bind(&SendTransport::resumeSend
252 , boost::asio::placeholders::error
253 , boost::asio::placeholders::bytes_transferred)
262 ,
Client<SendTransportEngine> {
263 using SendTransport::SendTransport;
266 char const* hmbdcName()
const {
267 return this->hmbdcName_.c_str();
270 std::tuple<char const*, int> schedSpec()
const {
271 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
275 void messageDispatchingStartedCb(uint16_t threadSerialNumber)
override {
276 this->initInThread();
280 void invokedCb(uint16_t threadSerialNumber) __restrict__
override {
284 bool droppedCb()
override {
Definition: Message.hpp:125
class to hold an hmbdc configuration
Definition: Config.hpp:35
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: SendTransportEngine.hpp:259
Definition: SendTransportEngine.hpp:24
Definition: GuardedSingleton.hpp:9
boost::asio::ip::udp::socket & asioSocket()
expose so user can manipulate it
Definition: SendTransportEngine.hpp:137
Definition: Message.hpp:25
Definition: Client.hpp:14
Definition: LockFreeBufferT.hpp:18
Definition: Message.hpp:46
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:143
Definition: Client.hpp:39
Definition: Transport.hpp:25
Definition: Client.hpp:11
Definition: LockFreeBufferMisc.hpp:73