1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/SendServer.hpp" 6 #include "hmbdc/app/tcpcast/Messages.hpp" 7 #include "hmbdc/app/mcast/SendTransportEngine.hpp" 8 #include "hmbdc//Traits.hpp" 9 #include "hmbdc/time/Time.hpp" 10 #include "hmbdc/time/Rater.hpp" 11 #include "hmbdc/numeric/BitMath.hpp" 13 #include <boost/circular_buffer.hpp> 16 #include <type_traits> 20 namespace hmbdc {
namespace app {
namespace tcpcast {
25 using namespace hmbdc;
34 using ptr = std::shared_ptr<SendTransport>;
43 ,
size_t maxMessageSize
44 ,
size_t minRecvToStart)
46 , topic_(cfg.getExt<string>(
"topicRegex"))
48 , maxMessageSize_(maxMessageSize)
49 , minRecvToStart_(minRecvToStart)
51 , config_.getExt<uint16_t>(
"outBufferSizePower2")
52 ?config_.getExt<uint16_t>(
"outBufferSizePower2")
53 :
hmbdc::numeric::log2Upper(128ul * 1024ul / maxMessageSize)
56 , config_.getExt<size_t>(
"sendBytesPerSec")
57 , config_.getExt<size_t>(
"sendBytesBurst")
58 , config_.getExt<size_t>(
"sendBytesBurst") != 0ul
61 mcConfig_.put(
"loopback",
true);
63 mcConfig_.put(
"topicRegex", tcpcastAdTopic_.c_str());
65 mcConfig_.put(
"outBufferSizePower2", 3u);
// Queue one or more messages under topic t.
// Claims sizeof...(msgs) slots from buffer_ in one call, fills them through the
// iterator-taking queue overload, then commits the whole claimed range.
// NOTE(review): whether buffer_.claim(n) blocks when the buffer is full is not
// visible here - confirm against the buffer implementation.
74 template <
typename... Messages>
75 void queue(
Topic const& t, Messages&&... msgs) {
76 auto n =
sizeof...(msgs);
77 auto it = buffer_.claim(n);
78 queue(it, t, std::forward<Messages>(msgs)...);
79 buffer_.commit(it, n);
82 template <
typename... Messages>
83 bool tryQueue(
Topic const& t, Messages&&... msgs) {
84 auto n =
sizeof...(msgs);
85 auto it = buffer_.tryClaim(n);
87 queue(it, t, std::forward<Messages>(msgs)...);
88 buffer_.commit(it, n);
94 template <
typename Message,
typename ... Args>
95 void queueInPlace(
Topic const& t, Args&&... args) {
96 static_assert(!is_base_of<hasFileAttachment, Message>::value
98 ,
"hasFileAttachment has to the first base for Message");
99 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
101 ,
"hasMemoryAttachment has to the first base for Message");
102 auto s = buffer_.claim();
103 char* addr =
static_cast<char*
>(*s);
105 h->flag = calculateFlag<Message>();
109 if (likely(
sizeof(Message) <= maxMessageSize_)) {
112 HMBDC_THROW(std::out_of_range
113 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
118 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len) {
119 auto s = buffer_.claim();
120 char* addr =
static_cast<char*
>(*s);
126 if (likely(len <= maxMessageSize_)) {
129 HMBDC_THROW(std::out_of_range
130 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
// Thread-side initialization: first initializes the owned mcast send
// transport (mcSendTransport_), then calls Transport::initInThread().
139 void initInThread() {
140 mcSendTransport_->initInThread();
141 Transport::initInThread();
// Returns true when the topic's string form matches topicRegex_.
// Uses boost::regex_match, so the entire topic string must match the
// expression (not just a substring).
144 bool match(
Topic const& t)
const {
145 return boost::regex_match(t.c_str(), topicRegex_);
150 boost::regex topicRegex_;
153 size_t maxMessageSize_;
154 size_t minRecvToStart_;
157 unique_ptr<mcast::SendTransport> mcSendTransport_;
163 template <
typename Message>
165 uint8_t calculateFlag() {
166 if (is_base_of<hasMemoryAttachment, Message>::value)
return hasMemoryAttachment::flag;
167 if (is_base_of<hasFileAttachment, Message>::value)
return hasFileAttachment::flag;
171 template<
typename M,
typename ... Messages>
173 , M&& m, Messages&&... msgs) {
174 using Message =
typename std::remove_reference<M>::type;
175 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
177 ,
"hasMemoryAttachment has to the first base for Message");
178 static_assert(!is_base_of<hasFileAttachment, Message>::value
180 ,
"hasFileAttachment has to the first base for Message");
183 char* addr =
static_cast<char*
>(s);
185 h->flag = calculateFlag<Message>();
189 if (likely(
sizeof(Message) <= maxMessageSize_)) {
192 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
194 queue(++it, t, std::forward<Messages>(msgs)...);
206 ,
Client<SendTransportEngine> {
208 ,
size_t maxMessageSize
209 ,
size_t minRecvToStart)
212 , mtu_(cfg.
getExt<
size_t>(
"mtu") - 20 - 20)
213 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch"))
214 , waitForSlowReceivers_(cfg.
getExt<
bool>(
"waitForSlowReceivers"))
218 if (maxMessageSize_ + totalHead > mtu_) {
219 HMBDC_THROW(std::out_of_range,
"maxMessageSize needs <= " << mtu_ - totalHead);
221 toSend_.reserve(maxSendBatch_);
224 buffer_.peek(begin_, end, 1u);
225 buffer_.wasteAfterPeek(begin_, 0,
true);
// Returns the engine's name as a C string taken from hmbdcName_.
// NOTE(review): the pointer comes from hmbdcName_.c_str(), so it is presumably
// only valid while hmbdcName_ is alive and unmodified - confirm its type.
229 char const* hmbdcName()
const {
230 return this->hmbdcName_.c_str();
// Returns the scheduling spec as a (policy, priority) tuple built from
// schedPolicy_.c_str() and schedPriority_.
// NOTE(review): the policy pointer refers into schedPolicy_; presumably it must
// not outlive this object - confirm.
233 std::tuple<char const*, int> schedSpec()
const {
234 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
238 void invokedCb(uint16_t threadSerialNumber) __restrict__
override {
241 if (likely(!minRecvToStart_
242 || asyncServer_->readySessionCount() >= minRecvToStart_)) {
246 mcSendTransport_->runOnce();
248 if (unlikely(pIos_->stopped())) pIos_->reset();
249 boost::system::error_code ec;
251 if (unlikely(ec)) HMBDC_LOG_C(
"io_service error=", ec);
255 bool droppedCb()
override {
261 void messageDispatchingStartedCb(uint16_t threadSerialNumber)
override {
262 this->initInThread();
263 asyncServer_.reset(
new AsyncSendServer(config_, *pIos_, buffer_.capacity()));
266 mcSendTransport_->queue(
267 tcpcastAdTopic_, asyncServer_->advertisingMessage());
268 if (!waitForSlowReceivers_) cullSlow();
272 schedule(SysTime::now(), *
this);
278 auto seq = buffer_.readSeq();
279 if (seq == lastSeq_ && buffer_.isFull()) {
280 asyncServer_->killSlowestSession();
285 void runAsync() __restrict__ {
287 buffer_.peek(begin, end, maxSendBatch_);
289 pair<char const*, char const*> currentTopic;
290 currentTopic.first =
nullptr;
291 size_t currentTopicLen = 0;
292 size_t toSendByteSize = 0;
298 if (unlikely(!rater_.check(item->wireSize())))
break;
299 if (unlikely(!item->flag
300 && toSendByteSize + item->wireSize() > mtu_))
break;
302 if (unlikely(!currentTopic.first)) {
303 currentTopic = item->topic();
304 currentTopicLen = item->topicLen;
305 toSendByteSize = item->wireSize();
306 toSend_.emplace_back(ptr, item->wireSize());
307 }
else if (item->topicLen == currentTopicLen
308 && strncmp(currentTopic.first, item->topic().first, currentTopicLen)
310 toSendByteSize += item->wireSize();
311 toSend_.emplace_back(ptr, item->wireSize());
313 asyncServer_->queue(currentTopic
319 currentTopic = item->topic();
320 currentTopicLen = item->topicLen;
321 toSendByteSize = item->wireSize();
322 toSend_.emplace_back(ptr, item->wireSize());
325 if (unlikely(item->flag == hasMemoryAttachment::flag)) {
327 toSend_.emplace_back(a.attachment, a.len);
328 toSendByteSize += a.len;
329 }
else if (unlikely(item->flag == hasFileAttachment::flag)) {
331 toSend_.emplace_back(a.attachment, a.len);
332 toSendByteSize += a.len;
337 if (toSend_.size()) {
338 asyncServer_->queue(currentTopic
346 auto newStart = asyncServer_->run(begin, it);
348 for (
auto it = begin; it != newStart; ++it) {
351 if (unlikely(item->flag == hasMemoryAttachment::flag)) {
354 }
else if (unlikely(item->flag == hasFileAttachment::flag)) {
359 buffer_.wasteAfterPeek(begin, newStart - begin,
true);
364 size_t maxSendBatch_;
365 bool waitForSlowReceivers_;
367 vector<const_buffer> toSend_;
370 unique_ptr<AsyncSendServer> asyncServer_;
Definition: Message.hpp:125
Definition: MonoLockFreeBuffer.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:35
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Timers.hpp:69
Definition: SendServer.hpp:58
SendTransport(Config const &cfg, size_t maxMessageSize, size_t minRecvToStart)
ctor
Definition: SendTransportEngine.hpp:42
Definition: Transport.hpp:27
Definition: SendTransportEngine.hpp:24
Definition: GuardedSingleton.hpp:9
Definition: Messages.hpp:23
Definition: Message.hpp:25
Definition: Client.hpp:14
Definition: Traits.hpp:35
Definition: SendTransportEngine.hpp:201
capture the transportation mechanism
Definition: SendTransportEngine.hpp:32
Definition: Message.hpp:46
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:143
Definition: Messages.hpp:35
Definition: Client.hpp:39
Definition: Client.hpp:11
Definition: Timers.hpp:100
Definition: LockFreeBufferMisc.hpp:73