1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/os/DownloadFile.hpp" 8 #include "hmbdc/os/DownloadMemory.hpp" 9 #include "hmbdc/comm/inet/Misc.hpp" 11 #include "hmbdc/app/tcpcast/RecvSession.hpp" 13 #include <boost/asio.hpp> 14 #include <boost/lexical_cast.hpp> 22 namespace hmbdc {
namespace app {
namespace tcpcast {
24 namespace recvsession_detail {
26 using boost::asio::ip::tcp;
// RecvSession class head. The struct/class keyword line (listing line 31) is
// not present in this excerpt; only the template header, the base class and
// two aliases are visible.
//   OutputBuffer  - type of the buffer this session writes into (held by
//                   reference as outputBuffer_).
//   MsgArbitrator - second template parameter; its use is not visible here.
30 template <
typename OutputBuffer,
typename MsgArbitrator>
// Derives from enable_shared_from_this so member functions can call
// shared_from_this() and hand a shared_ptr copy to asynchronous handlers.
32 : enable_shared_from_this<RecvSession<OutputBuffer, MsgArbitrator>> {
// Shared-pointer alias for this session type.
33 using ptr = shared_ptr<RecvSession<OutputBuffer, MsgArbitrator>>;
// Callback type taking no arguments; an instance is stored in cleanupFunc_.
34 using CleanupFunc = std::function<void()>;
// Constructor parameter list (tail) and member-initializer list. The start of
// the signature and several parameters/initializers are missing from this
// excerpt.
37 , boost::regex
const& topicRegex
38 , CleanupFunc cleanupFunc
40 , OutputBuffer& outputBuffer
43 , subscriptions_(subscriptions)
44 , topicRegex_(topicRegex)
45 , cleanupFunc_(cleanupFunc)
48 , outputBuffer_(outputBuffer)
// Receive buffer size is read from the "maxTcpReadBytes" config entry.
49 , bufSize_(config.
getExt<
size_t>(
"maxTcpReadBytes"))
// Receive buffer is allocated with memalign, aligned to SMP_CACHE_BYTES.
// NOTE(review): the matching release of buf_ is not visible in this excerpt - confirm.
50 , buf_((
char*)memalign(SMP_CACHE_BYTES, bufSize_))
// Flag value 0 is the state the read loop tests for before parsing headers.
54 , currTransportHeadFlag_(0) {
// Presumably fills allowRecvMemoryAttachment_ from the
// "allowRecvMemoryAttachment" config entry - confirm against Config.
55 config(allowRecvMemoryAttachment_,
"allowRecvMemoryAttachment");
// NOTE(review): listing lines 56-59 are missing; this log most likely belongs
// to the destructor rather than the constructor - confirm.
60 HMBDC_LOG_N(
"RecvSession retired: ",
id());
// Starts the session: asynchronously connects socket_ to the endpoints in
// endpointIt and finishes the setup in the completion handler.
// Several listing lines (68, 76, 78, 82-83, 87-88, 92-94, 96, 98, 100+) are
// missing from this excerpt, so the if/else structure around the statements
// below is not visible.
63 void start(tcp::resolver::iterator endpointIt) {
64 HMBDC_LOG_N(
"RecvSession started: ",
id());
// A shared_ptr copy is captured by the handler below, so the handler holds a
// reference to this session while the connect is pending.
65 auto self(this->shared_from_this());
66 async_connect(socket_, endpointIt,
67 [
this,
self](boost::system::error_code ec, tcp::resolver::iterator) {
// The session id is the textual form of the peer endpoint.
69 id_ = boost::lexical_cast<
string>(socket_.remote_endpoint());
// Copy the id into both notification messages; strncpy does not guarantee
// termination, so the last byte is forced to 0 each time.
70 strncpy(sessionStarted_.payload.ip, id_.c_str()
71 ,
sizeof(sessionStarted_.payload.ip));
72 sessionStarted_.payload.ip[
sizeof(sessionStarted_.payload.ip) - 1] = 0;
73 strncpy(sessionDropped_.payload.ip, id_.c_str()
74 ,
sizeof(sessionDropped_.payload.ip));
75 sessionDropped_.payload.ip[
sizeof(sessionDropped_.payload.ip) - 1] = 0;
// Requested socket receive buffer size, from the "tcpRecvBufferBytes" config entry.
77 auto sz = config_.getExt<
int>(
"tcpRecvBufferBytes");
79 socket_base::receive_buffer_size option(sz);
// NOTE(review): this local `ec` has the same name as the handler's `ec`
// parameter and is never passed to set_option, so the throwing overload of
// set_option is used - confirm whether set_option(option, ec) was intended.
80 boost::system::error_code ec;
81 socket_.set_option(option);
// Read the size back from the socket and compare it with the request.
84 socket_base::receive_buffer_size option;
85 socket_.get_option(option);
86 if (sz == 0 || sz >= option.value()) {
// NOTE(review): the lines between the condition above and this warning are
// missing, so which branch emits it cannot be determined from this excerpt.
89 HMBDC_LOG_C(
"set tcpcast RecvSession receive buffer size unsuccessful, want " 90 , sz,
" actual: ", option.value()
91 ,
" resulting higher receiver dropping possibility, check OS limits!");
95 socket_.non_blocking(
true);
// Publish the session-started notification into the output buffer.
97 outputBuffer_.putSome(sessionStarted_);
// Logged with the handler's error code; presumably the connect-failure branch.
99 HMBDC_LOG_C(
id(),
" connect error = ", ec);
// Sends a subscribe request for topic `t` to the peer, but only when the topic
// matches topicRegex_. The declaration and framing of `buf` (listing lines
// 109-110, 112) are missing from this excerpt.
107 void sendSubscribe(
Topic const& t) {
108 if (hmbdc_unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
// The topic text is copied starting at buf[1]; `l` is the value copyTo returns.
111 auto l = t.copyTo(buf + 1);
113 boost::system::error_code ec;
// l + 2 bytes are written; with the error_code overload a failure shows up as
// a short write, which is logged rather than thrown.
114 if (hmbdc_unlikely(l + 2 != boost::asio::write(socket_, buffer(buf, l + 2), ec))) {
115 HMBDC_LOG_C(
"error when sending subscribe to ",
id());
// Sends an unsubscribe request for topic `t` to the peer, but only when the
// topic matches topicRegex_. The declaration and framing of `buf` (listing
// lines 123-124, 126) are missing from this excerpt.
121 void sendUnsubscribe(
Topic const& t) {
122 if (hmbdc_unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
// The topic text is copied starting at buf[1]; `l` is the value copyTo returns.
125 auto l = t.copyTo(buf + 1);
127 boost::system::error_code ec;
// l + 2 bytes are written; with the error_code overload a failure shows up as
// a short write, which is logged rather than thrown.
128 if (hmbdc_unlikely(l + 2 != boost::asio::write(socket_, buffer(buf, l + 2), ec))) {
129 HMBDC_LOG_C(
"error when sending unsubscribe to ",
id());
// Body of the heartbeat sender (its signature line is missing from this
// excerpt). Does nothing when the socket is closed; otherwise writes the
// two-byte sequence "+\t" to the peer.
135 if (!socket_.is_open())
return;
136 char const* hb =
"+\t";
137 boost::system::error_code ec;
// Pass `ec` so the non-throwing overload of write is used, as the subscribe
// and unsubscribe writes in this file do. Without it write throws on failure,
// the short-write check below can never be true and the log is unreachable.
138 if (hmbdc_unlikely(2 != boost::asio::write(socket_, buffer(hb, 2), ec))) {
139 HMBDC_LOG_C(
"error when sending heartbeat to ",
id());
143 char const* id()
const {
// Sends the current subscriptions to the peer in a single write. Only topics
// matching topicRegex_ are considered. The construction and filling of `oss`
// (listing lines 149, 153-159) are missing from this excerpt.
148 void sendSubscriptions() {
150 for (
auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
// The topic is the first member of each subscriptions_ entry.
151 auto const& t = it->first;
152 if (boost::regex_match(t.c_str(), topicRegex_)) {
// "+\t" (the same two bytes the heartbeat sends) is appended to the
// accumulated text before it is written.
160 auto s = oss.str() +
"+\t";
161 boost::system::error_code ec;
162 auto sz = boost::asio::write(socket_, buffer(s.c_str(), s.size()), ec);
// A short write is logged rather than thrown.
163 if (hmbdc_unlikely(sz != s.size())) {
164 HMBDC_LOG_C(
"error when sending subscriptions to ",
id());
// Receive/parse path. The enclosing function header and many listing lines
// (declarations of `h`, `a`, `att`, most else-branches and closing braces) are
// missing from this excerpt; the comments below describe only visible statements.
//
// When a memory attachment is in progress, buffered bytes are handed to
// memoryAttachment_ first.
168 if (hmbdc_unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag)) {
169 auto s = memoryAttachment_.write(bufCur_, filledLen_);
// Once the attachment is complete: close it, clear the flag and commit the
// previously claimed output slot it_.
170 if (memoryAttachment_.writeDone()) {
171 memoryAttachment_.close();
172 currTransportHeadFlag_ = 0;
173 outputBuffer_.commit(it_);
// Header parsing loop; runs only while no attachment is pending (the rest of
// the loop condition is not visible).
178 while (currTransportHeadFlag_ == 0
181 auto wireSize = h->wireSize();
// Proceed only when at least wireSize bytes are buffered.
182 if (hmbdc_likely(filledLen_ >= wireSize)) {
183 if (hmbdc_likely(subscriptions_.check(h->topic()))) {
// NOTE(review): `a` is defined on a missing line (184) - presumably the
// MsgArbitrator verdict; confirm.
185 if (hmbdc_likely(a > 0)) {
186 it_ = outputBuffer_.claim();
187 char* b =
static_cast<char*
>(*it_);
// The copied length is capped at the output buffer's maximum item size.
188 auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
189 memcpy(b, h->payload(), l);
// The header flag is taken over only for type tags present in
// allowRecvMemoryAttachment_; the other assignment sets it to 0 (the else
// line between them is missing from the listing).
190 if (allowRecvMemoryAttachment_.find(h->typeTag()) != allowRecvMemoryAttachment_.end()) {
191 currTransportHeadFlag_ = h->flag;
193 currTransportHeadFlag_ = 0;
// With flag 0 the slot is committed immediately; with the attachment flag
// memoryAttachment_ is opened for att.len and the commit happens later.
195 if (!currTransportHeadFlag_) {
196 outputBuffer_.commit(it_);
197 }
else if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
200 att.attachment = memoryAttachment_.open(att.len);
202 }
else if (hmbdc_unlikely(a == 0)) {
// A zero-delay timer wait is scheduled; the handler body is not visible here.
205 doReadTimer_.expires_from_now(boost::posix_time::microseconds(0));
207 auto self(this->shared_from_this());
208 doReadTimer_.async_wait(
209 [
this,
self](boost::system::error_code ec) {
// Reduce the buffered byte count by this message's wire size.
218 filledLen_ -= wireSize;
// Move the unconsumed bytes to the front of buf_.
223 memmove(buf_, bufCur_, filledLen_);
226 auto self(this->shared_from_this());
// Read more bytes into the unused tail of buf_.
227 socket_.async_read_some(boost::asio::buffer(buf_ + filledLen_, bufSize_ - filledLen_),
228 [
this,
self](boost::system::error_code ec, std::size_t length) {
230 filledLen_ += length;
// Presumably the error branch of the read handler (the branch line is missing).
233 HMBDC_LOG_C(
id(),
" receiving error = ", ec);
// Publish the session-dropped notification into the output buffer.
236 outputBuffer_.putSome(sessionDropped_);
// Data members (several declarations are missing from this excerpt).
// Pattern the send* functions match topics against before sending.
243 boost::regex topicRegex_;
// Callback stored by the constructor; where it is invoked is not visible here.
244 std::function<void()> cleanupFunc_;
// Destination for session notifications and claimed/committed message slots.
247 OutputBuffer& outputBuffer_;
// Size of buf_ in bytes, read from the "maxTcpReadBytes" config entry.
251 size_t const bufSize_;
// Timer used for the zero-delay wait in the read path.
255 deadline_timer doReadTimer_;
// 0 while parsing headers; hasMemoryAttachment::flag while an attachment is pending.
256 uint8_t currTransportHeadFlag_;
// Output slot most recently claimed from outputBuffer_, committed once complete.
257 typename OutputBuffer::iterator it_;
// Type tags for which an incoming header's flag is taken over by the read path.
260 std::unordered_set<uint16_t> allowRecvMemoryAttachment_;
264 template <
typename OutputBuffer,
typename MsgArbitrator>
Definition: DownloadFile.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: DownloadMemory.hpp:10
Definition: RecvSession.hpp:31
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:55
Definition: LfbStream.hpp:11