1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/SendServer.hpp" 6 #include "hmbdc/app/tcpcast/Messages.hpp" 7 #include "hmbdc/app/mcast/SendTransportEngine.hpp" 8 #include "hmbdc//Traits.hpp" 9 #include "hmbdc/time/Time.hpp" 10 #include "hmbdc/time/Rater.hpp" 11 #include "hmbdc/numeric/BitMath.hpp" 13 #include <boost/circular_buffer.hpp> 16 #include <type_traits> 20 namespace hmbdc {
namespace app {
namespace tcpcast {
22 namespace send_detail {
26 using pattern::MonoLockFreeBuffer;
33 using ptr = std::shared_ptr<SendTransport>;
42 ,
size_t maxMessageSize
43 ,
size_t minRecvToStart)
45 , topic_(cfg.getExt<string>(
"topicRegex"))
47 , maxMessageSize_(maxMessageSize)
48 , minRecvToStart_(minRecvToStart)
50 , config_.getExt<uint16_t>(
"outBufferSizePower2")
51 ?config_.getExt<uint16_t>(
"outBufferSizePower2")
52 :
hmbdc::numeric::log2Upper(128ul * 1024ul / maxMessageSize)
55 , config_.getExt<size_t>(
"sendBytesPerSec")
56 , config_.getExt<size_t>(
"sendBytesBurst")
57 , config_.getExt<size_t>(
"sendBytesBurst") != 0ul
60 mcConfig_.put(
"loopback",
true);
62 mcConfig_.put(
"topicRegex", tcpcastAdTopic_.c_str());
64 mcConfig_.put(
"outBufferSizePower2", 3u);
73 template <
typename... Messages>
74 void queue(
Topic const& t, Messages&&... msgs) {
75 auto n =
sizeof...(msgs);
76 auto it = buffer_.claim(n);
77 queue(it, t, std::forward<Messages>(msgs)...);
78 buffer_.commit(it, n);
81 template <
typename... Messages>
82 bool tryQueue(
Topic const& t, Messages&&... msgs) {
83 auto n =
sizeof...(msgs);
84 auto it = buffer_.tryClaim(n);
86 queue(it, t, std::forward<Messages>(msgs)...);
87 buffer_.commit(it, n);
93 template <
typename Message,
typename ... Args>
94 void queueInPlace(
Topic const& t, Args&&... args) {
95 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
97 ,
"hasMemoryAttachment has to the first base for Message");
98 auto s = buffer_.claim();
99 char* addr =
static_cast<char*
>(*s);
101 h->flag = calculateFlag<Message>();
105 if (likely(
sizeof(Message) <= maxMessageSize_)) {
108 HMBDC_THROW(std::out_of_range
109 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
114 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len) {
115 auto s = buffer_.claim();
116 char* addr =
static_cast<char*
>(*s);
122 if (likely(len <= maxMessageSize_)) {
125 HMBDC_THROW(std::out_of_range
126 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
135 void initInThread() {
136 mcSendTransport_->initInThread();
137 Transport::initInThread();
140 bool match(
Topic const& t)
const {
141 return boost::regex_match(t.c_str(), topicRegex_);
146 boost::regex topicRegex_;
149 size_t maxMessageSize_;
150 size_t minRecvToStart_;
151 MonoLockFreeBuffer buffer_;
153 unique_ptr<mcast::SendTransport> mcSendTransport_;
159 template <
typename Message>
161 uint8_t calculateFlag() {
162 if (is_base_of<hasMemoryAttachment, Message>::value)
return hasMemoryAttachment::flag;
166 template<
typename M,
typename ... Messages>
167 void queue(MonoLockFreeBuffer::iterator it,
Topic const& t
168 , M&& m, Messages&&... msgs) {
169 using Message =
typename std::remove_reference<M>::type;
170 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
172 ,
"hasMemoryAttachment has to the first base for Message");
175 char* addr =
static_cast<char*
>(s);
177 h->flag = calculateFlag<Message>();
181 if (likely(
sizeof(Message) <= maxMessageSize_)) {
184 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
186 queue(++it, t, std::forward<Messages>(msgs)...);
189 void queue(MonoLockFreeBuffer::iterator it
198 ,
Client<SendTransportEngine> {
200 ,
size_t maxMessageSize
201 ,
size_t minRecvToStart)
204 , mtu_(cfg.
getExt<
size_t>(
"mtu") - 20 - 20)
205 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch"))
206 , waitForSlowReceivers_(cfg.
getExt<
bool>(
"waitForSlowReceivers"))
210 if (maxMessageSize_ + totalHead > mtu_) {
211 HMBDC_THROW(std::out_of_range,
"maxMessageSize needs <= " << mtu_ - totalHead);
213 toSend_.reserve(maxSendBatch_);
215 MonoLockFreeBuffer::iterator end;
216 buffer_.peek(begin_, end, 1u);
217 buffer_.wasteAfterPeek(begin_, 0,
true);
221 char const* hmbdcName()
const {
222 return this->hmbdcName_.c_str();
225 std::tuple<char const*, int> schedSpec()
const {
226 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
230 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
233 if (likely(!minRecvToStart_
234 || asyncServer_->readySessionCount() >= minRecvToStart_)) {
238 mcSendTransport_->runOnce();
240 if (unlikely(pIos_->stopped())) pIos_->reset();
241 boost::system::error_code ec;
243 if (unlikely(ec)) HMBDC_LOG_C(
"io_service error=", ec);
247 bool droppedCb()
override {
253 void messageDispatchingStartedCb(uint16_t threadSerialNumber)
override {
254 this->initInThread();
255 asyncServer_.reset(
new AsyncSendServer(config_, *pIos_, buffer_.capacity()));
258 mcSendTransport_->queue(
259 tcpcastAdTopic_, asyncServer_->advertisingMessage());
260 if (!waitForSlowReceivers_) cullSlow();
264 schedule(SysTime::now(), *
this);
267 size_t activeSessionCount()
const {
268 if (asyncServer_ && !minRecvToStart_) {
269 return asyncServer_->readySessionCount();
271 return numeric_limits<size_t>::max();
275 auto seq = buffer_.readSeq();
276 if (seq == lastSeq_ && buffer_.isFull()) {
277 asyncServer_->killSlowestSession();
282 void runAsync() HMBDC_RESTRICT {
283 MonoLockFreeBuffer::iterator begin, end;
284 buffer_.peek(begin, end, maxSendBatch_);
286 pair<char const*, char const*> currentTopic;
287 currentTopic.first =
nullptr;
288 size_t currentTopicLen = 0;
289 size_t toSendByteSize = 0;
295 if (unlikely(!rater_.check(item->wireSize())))
break;
296 if (unlikely(!item->flag
297 && toSendByteSize + item->wireSize() > mtu_))
break;
299 if (unlikely(!currentTopic.first)) {
300 currentTopic = item->topic();
301 currentTopicLen = item->topicLen;
302 toSendByteSize = item->wireSize();
303 toSend_.emplace_back(ptr, item->wireSize());
304 }
else if (item->topicLen == currentTopicLen
305 && strncmp(currentTopic.first, item->topic().first, currentTopicLen)
307 toSendByteSize += item->wireSize();
308 toSend_.emplace_back(ptr, item->wireSize());
310 asyncServer_->queue(currentTopic
316 currentTopic = item->topic();
317 currentTopicLen = item->topicLen;
318 toSendByteSize = item->wireSize();
319 toSend_.emplace_back(ptr, item->wireSize());
322 if (unlikely(item->flag == hasMemoryAttachment::flag)) {
324 toSend_.emplace_back(a.attachment, a.len);
325 toSendByteSize += a.len;
330 if (toSend_.size()) {
331 asyncServer_->queue(currentTopic
339 auto newStart = asyncServer_->run(begin, it);
341 for (
auto it = begin; it != newStart; ++it) {
344 if (unlikely(item->flag == hasMemoryAttachment::flag)) {
346 if (a.afterConsumedCleanupFunc) a.afterConsumedCleanupFunc(&a);
349 buffer_.wasteAfterPeek(begin, newStart - begin,
true);
354 size_t maxSendBatch_;
355 bool waitForSlowReceivers_;
356 MonoLockFreeBuffer::iterator begin_;
357 vector<const_buffer> toSend_;
360 unique_ptr<AsyncSendServer> asyncServer_;
Definition: Message.hpp:135
capture the transportation mechanism
Definition: SendTransportEngine.hpp:31
class to hold an hmbdc configuration
Definition: Config.hpp:43
Definition: SendTransportEngine.hpp:24
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:151
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
Definition: Transport.hpp:22
Definition: SendTransportEngine.hpp:193
Definition: Message.hpp:34
SendTransport(Config const &cfg, size_t maxMessageSize, size_t minRecvToStart)
ctor
Definition: SendTransportEngine.hpp:41
Definition: Traits.hpp:35
Definition: Message.hpp:55
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:17
Definition: SendServer.hpp:60
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
if a specific hmbdc network transport (for example tcpcast) supports message with memory attachment...
Definition: Message.hpp:158
Definition: Timers.hpp:96