1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/SendServer.hpp" 6 #include "hmbdc/app/tcpcast/Messages.hpp" 7 #include "hmbdc/app/mcast/SendTransportEngine.hpp" 8 #include "hmbdc//Traits.hpp" 9 #include "hmbdc/time/Time.hpp" 10 #include "hmbdc/time/Rater.hpp" 11 #include "hmbdc/numeric/BitMath.hpp" 13 #include <boost/circular_buffer.hpp> 16 #include <type_traits> 20 namespace hmbdc {
namespace app {
namespace tcpcast {
22 namespace send_detail {
26 using pattern::MonoLockFreeBuffer;
33 using ptr = std::shared_ptr<SendTransport>;
40 ,
size_t maxMessageSize)
42 , topic_(cfg.getExt<string>(
"topicRegex"))
44 , maxMessageSize_(maxMessageSize)
45 , minRecvToStart_(cfg.getExt<size_t>(
"minRecvToStart"))
47 , config_.getExt<uint16_t>(
"outBufferSizePower2")
48 ?config_.getExt<uint16_t>(
"outBufferSizePower2")
49 :
hmbdc::numeric::log2Upper(128ul * 1024ul / maxMessageSize)
52 , config_.getExt<size_t>(
"sendBytesPerSec")
53 , config_.getExt<size_t>(
"sendBytesBurst")
54 , config_.getExt<size_t>(
"sendBytesBurst") != 0ul
57 mcConfig_.put(
"loopback",
true);
59 mcConfig_.put(
"topicRegex", tcpcastAdTopic_.c_str());
61 mcConfig_.put(
"outBufferSizePower2", 3u);
70 template <
typename... Messages>
71 void queue(
Topic const& t, Messages&&... msgs) {
72 auto n =
sizeof...(msgs);
73 auto it = buffer_.claim(n);
74 queue(it, t, std::forward<Messages>(msgs)...);
75 buffer_.commit(it, n);
78 template <
typename... Messages>
79 bool tryQueue(
Topic const& t, Messages&&... msgs) {
80 auto n =
sizeof...(msgs);
81 auto it = buffer_.tryClaim(n);
83 queue(it, t, std::forward<Messages>(msgs)...);
84 buffer_.commit(it, n);
90 template <
typename Message,
typename ... Args>
91 void queueInPlace(
Topic const& t, Args&&... args) {
92 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
94 ,
"hasMemoryAttachment has to the first base for Message");
95 auto s = buffer_.claim();
96 char* addr =
static_cast<char*
>(*s);
98 h->flag = calculateFlag<Message>();
102 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
105 HMBDC_THROW(std::out_of_range
106 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
111 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len) {
112 auto s = buffer_.claim();
113 char* addr =
static_cast<char*
>(*s);
119 if (hmbdc_likely(len <= maxMessageSize_)) {
122 HMBDC_THROW(std::out_of_range
123 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
132 void initInThread() {
133 mcSendTransport_->initInThread();
134 Transport::initInThread();
137 bool match(
Topic const& t)
const {
138 return boost::regex_match(t.c_str(), topicRegex_);
143 boost::regex topicRegex_;
146 size_t maxMessageSize_;
147 size_t minRecvToStart_;
148 MonoLockFreeBuffer buffer_;
150 unique_ptr<mcast::SendTransport> mcSendTransport_;
156 template <
typename Message>
158 uint8_t calculateFlag() {
159 if (is_base_of<hasMemoryAttachment, Message>::value)
return hasMemoryAttachment::flag;
163 template<
typename M,
typename ... Messages>
164 void queue(MonoLockFreeBuffer::iterator it,
Topic const& t
165 , M&& m, Messages&&... msgs) {
166 using Message =
typename std::remove_reference<M>::type;
167 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
169 ,
"hasMemoryAttachment has to the first base for Message");
172 char* addr =
static_cast<char*
>(s);
174 h->flag = calculateFlag<Message>();
178 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
181 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
183 queue(++it, t, std::forward<Messages>(msgs)...);
186 void queue(MonoLockFreeBuffer::iterator it
195 ,
Client<SendTransportEngine> {
197 ,
size_t maxMessageSize)
200 , mtu_(cfg.
getExt<
size_t>(
"mtu") - 20 - 20)
201 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch"))
202 , waitForSlowReceivers_(cfg.
getExt<
bool>(
"waitForSlowReceivers"))
207 if (maxMessageSize_ + totalHead > mtu_) {
208 HMBDC_THROW(std::out_of_range,
"maxMessageSize needs <= " << mtu_ - totalHead);
210 toSend_.reserve(maxSendBatch_);
212 MonoLockFreeBuffer::iterator end;
213 buffer_.peek(begin_, end, 1u);
214 buffer_.wasteAfterPeek(begin_, 0,
true);
218 char const* hmbdcName()
const {
219 return this->hmbdcName_.c_str();
222 std::tuple<char const*, int> schedSpec()
const {
223 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
227 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
230 if (hmbdc_likely(!minRecvToStart_
231 || asyncServer_->readySessionCount() >= minRecvToStart_)) {
235 mcSendTransport_->runOnce();
237 if (hmbdc_unlikely(pIos_->stopped())) pIos_->reset();
238 boost::system::error_code ec;
240 if (hmbdc_unlikely(ec)) HMBDC_LOG_C(
"io_service error=", ec);
244 bool droppedCb()
override {
251 void messageDispatchingStartedCb(uint16_t threadSerialNumber)
override {
252 this->initInThread();
253 asyncServer_.reset(
new AsyncSendServer(config_, *pIos_, buffer_.capacity()));
256 mcSendTransport_->queue(
257 tcpcastAdTopic_, asyncServer_->advertisingMessage());
258 if (!waitForSlowReceivers_) cullSlow();
262 schedule(SysTime::now(), *
this);
271 if (stopped_)
return 0;
272 if (asyncServer_ && !minRecvToStart_) {
273 return asyncServer_->readySessionCount();
275 return numeric_limits<size_t>::max();
279 auto seq = buffer_.readSeq();
280 if (seq == lastSeq_ && buffer_.isFull()) {
281 asyncServer_->killSlowestSession();
286 void runAsync() HMBDC_RESTRICT {
287 MonoLockFreeBuffer::iterator begin, end;
288 buffer_.peek(begin, end, maxSendBatch_);
290 pair<char const*, char const*> currentTopic;
291 currentTopic.first =
nullptr;
292 size_t currentTopicLen = 0;
293 size_t toSendByteSize = 0;
299 if (hmbdc_unlikely(!rater_.check(item->wireSize())))
break;
300 if (hmbdc_unlikely(!item->flag
301 && toSendByteSize + item->wireSize() > mtu_))
break;
303 if (hmbdc_unlikely(!currentTopic.first)) {
304 currentTopic = item->topic();
305 currentTopicLen = item->topicLen;
306 toSendByteSize = item->wireSize();
307 toSend_.emplace_back(ptr, item->wireSize());
308 }
else if (item->topicLen == currentTopicLen
309 && strncmp(currentTopic.first, item->topic().first, currentTopicLen)
311 toSendByteSize += item->wireSize();
312 toSend_.emplace_back(ptr, item->wireSize());
314 asyncServer_->queue(currentTopic
320 currentTopic = item->topic();
321 currentTopicLen = item->topicLen;
322 toSendByteSize = item->wireSize();
323 toSend_.emplace_back(ptr, item->wireSize());
326 if (hmbdc_unlikely(item->flag == hasMemoryAttachment::flag)) {
328 toSend_.emplace_back(a.attachment, a.len);
329 toSendByteSize += a.len;
334 if (toSend_.size()) {
335 asyncServer_->queue(currentTopic
343 auto newStart = asyncServer_->run(begin, it);
345 for (
auto it = begin; it != newStart; ++it) {
348 if (hmbdc_unlikely(item->flag == hasMemoryAttachment::flag)) {
350 if (a.afterConsumedCleanupFunc) a.afterConsumedCleanupFunc(&a);
353 buffer_.wasteAfterPeek(begin, newStart - begin,
true);
358 size_t maxSendBatch_;
359 bool waitForSlowReceivers_;
360 MonoLockFreeBuffer::iterator begin_;
361 vector<const_buffer> toSend_;
364 unique_ptr<AsyncSendServer> asyncServer_;
Definition: Message.hpp:135
capture the transportation mechanism
Definition: SendTransportEngine.hpp:31
class to hold an hmbdc configuration
Definition: Config.hpp:44
Definition: SendTransportEngine.hpp:24
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
SendTransport(Config const &cfg, size_t maxMessageSize)
ctor
Definition: SendTransportEngine.hpp:39
Definition: Transport.hpp:22
Definition: SendTransportEngine.hpp:190
Definition: Message.hpp:34
Definition: Traits.hpp:35
size_t sessionsRemainingActive() const
check how many recipient sessions are still active
Definition: SendTransportEngine.hpp:270
Definition: Message.hpp:55
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
Definition: SendServer.hpp:60
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:158
Definition: Timers.hpp:96