1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/mcast/Transport.hpp" 4 #include "hmbdc/app/mcast/Messages.hpp" 5 #include "hmbdc/app/Base.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/time/Rater.hpp" 8 #include "hmbdc/numeric/BitMath.hpp" 11 #include <boost/bind.hpp> 16 namespace hmbdc {
namespace app {
namespace mcast {
18 namespace sendtransportengine_detail {
26 using ptr = std::shared_ptr<SendTransport>;
28 ,
size_t maxMessageSize)
30 , topic_(cfg.
getExt<
string>(
"topicRegex"))
32 , maxMessageSize_(maxMessageSize)
34 , endpoint_(ip::address::from_string(config_.getExt<
string>(
"mcastAddr"))
35 , cfg.
getExt<
short>(
"mcastPort"))
37 , rater_(Duration::seconds(1u)
38 , config_.getExt<
size_t>(
"sendBytesPerSec")
39 , config_.getExt<
size_t>(
"sendBytesBurst")
40 , config_.getExt<
size_t>(
"sendBytesBurst") != 0ul)
41 , mtu_(config_.getExt<
size_t>(
"mtu"))
42 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch")) {
45 if (maxMessageSize_ + totalHead > mtu_) {
46 HMBDC_THROW(std::out_of_range,
"maxMessageSize needs <= " << mtu_ - totalHead);
48 toSend_.reserve(maxSendBatch_);
55 bool match(
Topic const& t)
const {
56 return boost::regex_match(t.c_str(), topicRegex_);
59 template <
typename... Messages>
60 void queue(
Topic const& t, Messages&&... msgs) {
61 auto n =
sizeof...(msgs);
62 auto it = buffer_.claim(n);
63 queue(it, t, std::forward<Messages>(msgs)...);
64 buffer_.commit(it, n);
67 template <
typename... Messages>
68 bool tryQueue(
Topic const& t, Messages&&... msgs) {
69 auto n =
sizeof...(msgs);
70 auto it = buffer_.tryClaim(n);
72 queue(it, t, std::forward<Messages>(msgs)...);
73 buffer_.commit(it, n);
79 template <
typename Message,
typename ... Args>
80 void queueInPlace(
Topic const& t, Args&&... args) {
81 auto s = buffer_.claim();
82 char* addr =
static_cast<char*
>(*s);
86 if (likely(
sizeof(Message) <= maxMessageSize_)) {
90 HMBDC_THROW(std::out_of_range
91 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
96 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len) {
97 auto s = buffer_.claim();
98 char* addr =
static_cast<char*
>(*s);
102 if (likely(len <= maxMessageSize_)) {
106 HMBDC_THROW(std::out_of_range
107 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
113 void runOnce() HMBDC_RESTRICT {
114 if (!toSend_.size()) {
115 if (unlikely(pIos_->stopped())) pIos_->reset();
116 boost::system::error_code error;
117 size_t bytesTransferred = 0;
118 resumeSend(error, bytesTransferred);
120 boost::system::error_code ec;
122 if (ec) HMBDC_LOG_C(
"io_service error=", ec);
137 void initInThread() {
138 Transport::initInThread();
140 pSocket_ =
new boost::asio::ip::udp::socket(*pIos_, endpoint_.protocol());
143 comm::inet::getLocalIpMatchMask(config_.getExt<
string>(
"ifaceAddr"));
144 ip::multicast::outbound_interface outFrom(ip::address_v4::from_string(iface));
145 pSocket_->set_option(outFrom);
147 ip::multicast::hops hops(config_.getExt<uint16_t>(
"ttl"));
148 pSocket_->set_option(hops);
156 ip::multicast::enable_loopback loop(config_.getExt<
bool>(
"loopback"));
157 pSocket_->set_option(loop);
159 auto sz = config_.getExt<int32_t>(
"udpSendBufferBytes");
161 socket_base::send_buffer_size option(sz);
162 boost::system::error_code ec;
163 pSocket_->set_option(option);
166 socket_base::send_buffer_size option;
167 pSocket_->get_option(option);
168 if (sz == 0 || sz >= option.value()) {
170 HMBDC_LOG_C(
"set udp send buffer size unsuccessful, want " 171 , sz,
" actual: ", option.value());
179 boost::regex topicRegex_;
180 size_t maxMessageSize_;
184 ip::udp::endpoint endpoint_;
185 ip::udp::socket* pSocket_;
188 size_t maxSendBatch_;
189 vector<const_buffer> toSend_;
193 outBufferSizePower2() {
194 auto res = config_.getExt<uint16_t>(
"outBufferSizePower2");
198 res =hmbdc::numeric::log2Upper(8ul * 1024ul / (8ul + maxMessageSize_));
199 HMBDC_LOG_N(
"auto set --outBufferSizePower2=", res);
203 template<
typename M,
typename ... Messages>
205 , M&& m, Messages&&... msgs) {
206 using Message =
typename std::remove_reference<M>::type;
208 char* addr =
static_cast<char*
>(s);
212 if (likely(
sizeof(Message) <= maxMessageSize_)) {
216 HMBDC_THROW(std::out_of_range
217 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
219 queue(++it, t, std::forward<Messages>(msgs)...);
225 void resumeSend(
const boost::system::error_code& error
226 ,
size_t bytesTransferred) {
227 if (unlikely(error)) {
228 HMBDC_LOG_C(
"asio error=", error);
231 buffer_.wasteAfterPeek(0, toSend_.size());
235 buffer_.peek(0, begin_, end, maxSendBatch_);
237 size_t batchBytes = 0;
241 if (unlikely(!rater_.check(item->wireSize())))
break;
242 batchBytes += item->wireSize();
243 if (unlikely(batchBytes > mtu_))
break;
245 toSend_.emplace_back(ptr, item->wireSize());
249 if (unlikely(toSend_.size() == 1)) {
250 pSocket_->send_to(toSend_, endpoint_);
251 buffer_.wasteAfterPeek(0, 1);
254 }
else if (likely(toSend_.size())) {
255 pSocket_->async_send_to(
258 , boost::bind(&SendTransport::resumeSend
260 , boost::asio::placeholders::error
261 , boost::asio::placeholders::bytes_transferred)
270 ,
Client<SendTransportEngine> {
271 using SendTransport::SendTransport;
274 char const* hmbdcName()
const {
275 return this->hmbdcName_.c_str();
278 std::tuple<char const*, int> schedSpec()
const {
279 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
283 void messageDispatchingStartedCb(uint16_t threadSerialNumber)
override {
284 this->initInThread();
288 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
292 bool droppedCb()
override {
Definition: Message.hpp:135
class to hold an hmbdc configuration
Definition: Config.hpp:43
Definition: SendTransportEngine.hpp:24
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:151
Definition: TypedString.hpp:74
Definition: GuardedSingleton.hpp:9
Definition: Message.hpp:34
boost::asio::ip::udp::socket & asioSocket()
expose so user can manipulate it
Definition: SendTransportEngine.hpp:133
Definition: Message.hpp:55
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Definition: Transport.hpp:17
Definition: SendTransportEngine.hpp:267
Definition: LockFreeBufferMisc.hpp:73