1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/os/DownloadFile.hpp" 8 #include "hmbdc/os/DownloadMemory.hpp" 9 #include "hmbdc/comm/inet/Misc.hpp" 11 #include "hmbdc/app/tcpcast/RecvSession.hpp" 13 #include <boost/asio.hpp> 14 #include <boost/lexical_cast.hpp> 22 namespace hmbdc {
namespace app {
namespace tcpcast {
24 namespace recvsession_detail {
26 using boost::asio::ip::tcp;
30 template <
typename OutputBuffer,
typename MsgArbitrator>
32 : enable_shared_from_this<RecvSession<OutputBuffer, MsgArbitrator>> {
33 using ptr = shared_ptr<RecvSession<OutputBuffer, MsgArbitrator>>;
34 using CleanupFunc = std::function<void()>;
37 , boost::regex
const& topicRegex
38 , CleanupFunc cleanupFunc
40 , OutputBuffer& outputBuffer
43 , subscriptions_(subscriptions)
44 , topicRegex_(topicRegex)
45 , cleanupFunc_(cleanupFunc)
48 , outputBuffer_(outputBuffer)
49 , bufSize_(config.
getExt<
size_t>(
"maxTcpReadBytes"))
50 , buf_((
char*)memalign(SMP_CACHE_BYTES, bufSize_))
54 , currTransportHeadFlag_(0) {
55 config(allowRecvMemoryAttachment_,
"allowRecvMemoryAttachment");
60 HMBDC_LOG_N(
"RecvSession retired: ",
id());
63 void start(tcp::resolver::iterator endpointIt) {
64 auto self(this->shared_from_this());
65 async_connect(socket_, endpointIt,
66 [
this,
self](boost::system::error_code ec, tcp::resolver::iterator) {
68 id_ = boost::lexical_cast<
string>(socket_.remote_endpoint());
69 strncpy(sessionStarted_.payload.ip, id_.c_str()
70 ,
sizeof(sessionStarted_.payload.ip));
71 sessionStarted_.payload.ip[
sizeof(sessionStarted_.payload.ip) - 1] = 0;
72 strncpy(sessionDropped_.payload.ip, id_.c_str()
73 ,
sizeof(sessionDropped_.payload.ip));
74 sessionDropped_.payload.ip[
sizeof(sessionDropped_.payload.ip) - 1] = 0;
76 auto sz = config_.getExt<
int>(
"tcpRecvBufferBytes");
78 socket_base::receive_buffer_size option(sz);
79 boost::system::error_code ec;
80 socket_.set_option(option);
83 socket_base::receive_buffer_size option;
84 socket_.get_option(option);
85 if (sz == 0 || sz >= option.value()) {
88 HMBDC_LOG_C(
"set tcpcast RecvSession receive buffer size unsuccessful, want " 89 , sz,
" actual: ", option.value()
90 ,
" resulting higher receiver dropping possibility, check OS limits!");
94 socket_.non_blocking(
true);
96 outputBuffer_.put(sessionStarted_);
102 void sendSubscribe(
Topic const& t) {
103 if (unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
106 auto l = t.copyTo(buf + 1);
108 if (l + 2 != socket_.send(buffer(buf, l + 2))) {
109 HMBDC_LOG_C(
"error when sending subscribe to ",
id());
115 void sendUnsubscribe(
Topic const& t) {
116 if (unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
119 auto l = t.copyTo(buf + 1);
121 if (l + 2 != socket_.send(buffer(buf, l + 2))) {
122 HMBDC_LOG_C(
"error when sending unsubscribe to ",
id());
128 if (!socket_.is_open())
return;
129 char const* hb =
"+\t";
130 if (socket_.send(buffer(hb, 2)) != 2) {
131 HMBDC_LOG_C(
"error when sending heartbeat to ",
id());
135 char const* id()
const {
140 void sendSubscriptions() {
142 for (
auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
143 auto const& t = it->first;
144 if (boost::regex_match(t.c_str(), topicRegex_)) {
152 auto s = oss.str() +
"+\t";
153 auto sz = socket_.send(buffer(s.c_str(), s.size()));
154 if (sz != s.size()) {
155 HMBDC_LOG_C(
"error when sending subscriptions to ",
id());
161 if (unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag)) {
162 auto s = memoryAttachment_.write(bufCur_, filledLen_);
163 if (memoryAttachment_.writeDone()) {
164 memoryAttachment_.close();
165 currTransportHeadFlag_ = 0;
166 outputBuffer_.commit(it_);
171 while (currTransportHeadFlag_ == 0
174 auto wireSize = h->wireSize();
175 if (likely(filledLen_ >= wireSize)) {
176 if (likely(subscriptions_.check(h->topic()))) {
179 it_ = outputBuffer_.claim();
180 char* b =
static_cast<char*
>(*it_);
181 auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
182 memcpy(b, h->payload(), l);
183 if (allowRecvMemoryAttachment_.find(h->typeTag()) != allowRecvMemoryAttachment_.end()) {
184 currTransportHeadFlag_ = h->flag;
186 currTransportHeadFlag_ = 0;
188 if (!currTransportHeadFlag_) {
189 outputBuffer_.commit(it_);
190 }
else if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
193 att.attachment = memoryAttachment_.open(att.len);
195 }
else if (unlikely(a == 0)) {
198 doReadTimer_.expires_from_now(boost::posix_time::microseconds(0));
200 auto self(this->shared_from_this());
201 doReadTimer_.async_wait(
202 [
this,
self](boost::system::error_code ec) {
211 filledLen_ -= wireSize;
216 memmove(buf_, bufCur_, filledLen_);
219 auto self(this->shared_from_this());
220 socket_.async_read_some(boost::asio::buffer(buf_ + filledLen_, bufSize_ - filledLen_),
221 [
this,
self](boost::system::error_code ec, std::size_t length) {
223 filledLen_ += length;
226 HMBDC_LOG_C(
id(),
" receiving error = ", ec);
229 outputBuffer_.put(sessionDropped_);
236 boost::regex topicRegex_;
237 std::function<void()> cleanupFunc_;
240 OutputBuffer& outputBuffer_;
244 size_t const bufSize_;
248 deadline_timer doReadTimer_;
249 uint8_t currTransportHeadFlag_;
250 typename OutputBuffer::iterator it_;
253 std::unordered_set<uint16_t> allowRecvMemoryAttachment_;
257 template <
typename OutputBuffer,
typename MsgArbitrator>
Definition: DownloadFile.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:151
Definition: TypedString.hpp:74
Definition: DownloadMemory.hpp:10
Definition: RecvSession.hpp:31
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:55
Definition: LfbStream.hpp:11