1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/os/DownloadFile.hpp" 8 #include "hmbdc/os/DownloadMemory.hpp" 9 #include "hmbdc/comm/inet/Misc.hpp" 11 #include "hmbdc/app/tcpcast/RecvSession.hpp" 13 #include <boost/asio.hpp> 14 #include <boost/lexical_cast.hpp> 22 namespace hmbdc {
namespace app {
namespace tcpcast {
25 using boost::asio::ip::tcp;
// Head of the RecvSession class template. It is parameterised on the output
// buffer type and a message arbitrator type, and derives from
// enable_shared_from_this so member functions can obtain a shared_ptr to the
// session for capture in asynchronous handlers.
// NOTE(review): the line that introduces the type name (original line 29) is
// absent from this listing.
28 template <
typename OutputBuffer,
typename MsgArbitrator>
30 : enable_shared_from_this<RecvSession<OutputBuffer, MsgArbitrator>> {
// Shared-pointer alias for this session type.
31 using ptr = shared_ptr<RecvSession<OutputBuffer, MsgArbitrator>>;
// Callable taking no arguments and returning nothing; the constructor stores
// one in cleanupFunc_.
32 using CleanupFunc = std::function<void()>;
// Constructor fragment: tail of the parameter list followed by the member
// initialiser list.
// NOTE(review): the constructor's opening and several parameters/initialisers
// (original lines 33-34, 37, 39-40, 44-45, 49-51) are absent from this listing.
35 , boost::regex
const& topicRegex
36 , CleanupFunc cleanupFunc
38 , OutputBuffer& outputBuffer
41 , subscriptions_(subscriptions)
42 , topicRegex_(topicRegex)
43 , cleanupFunc_(cleanupFunc)
46 , outputBuffer_(outputBuffer)
// Receive buffer size comes from the "maxTcpReadBytes" config entry.
47 , bufSize_(config.
getExt<
size_t>(
"maxTcpReadBytes"))
// Raw receive buffer of bufSize_ bytes, aligned to SMP_CACHE_BYTES.
// NOTE(review): the matching release of this allocation is not visible in
// this listing - confirm it is freed when the session is destroyed.
48 , buf_((
char*)memalign(SMP_CACHE_BYTES, bufSize_))
// 0 is the value the processing code treats as "no attachment in progress".
52 , currTransportHeadFlag_(0)
// Directory passed to fileAttachment_.open(); an empty string disables it.
53 , downloadDir_(config.
getExt<
string>(
"downloadDir"))
// Gates whether memoryAttachment_.open() is called for incoming messages.
54 , allowRecvMemoryAttachment_(config.
getExt<
bool>(
"allowRecvMemoryAttachment")) {
// Logs that this session, identified by id(), has been retired.
// NOTE(review): the enclosing function header is absent from this listing;
// presumably this is the destructor body - confirm.
59 HMBDC_LOG_N(
"RecvSession retired: ",
id());
// Starts an asynchronous connect of socket_ to the endpoint(s) referenced by
// endpointIt. The completion handler captures a shared_ptr to this session
// (self), so the session stays alive until the handler has run.
// NOTE(review): several original lines are absent from this listing (for
// example the check of the handler's ec and the closing braces), so the exact
// branch structure below cannot be read from here.
62 void start(tcp::resolver::iterator endpointIt) {
63 auto self(this->shared_from_this());
64 async_connect(socket_, endpointIt,
65 [
this,
self](boost::system::error_code ec, tcp::resolver::iterator) {
// The session id is the textual form of the remote endpoint.
67 id_ = boost::lexical_cast<
string>(socket_.remote_endpoint());
// Copy the id into both notification messages. strncpy does not guarantee
// NUL termination when the source is too long, so the last byte is set to 0
// explicitly after each copy.
68 strncpy(sessionStarted_.payload.ip, id_.c_str()
69 ,
sizeof(sessionStarted_.payload.ip));
70 sessionStarted_.payload.ip[
sizeof(sessionStarted_.payload.ip) - 1] = 0;
71 strncpy(sessionDropped_.payload.ip, id_.c_str()
72 ,
sizeof(sessionDropped_.payload.ip));
73 sessionDropped_.payload.ip[
sizeof(sessionDropped_.payload.ip) - 1] = 0;
// Requested kernel receive buffer size, from config.
75 auto sz = config_.getExt<
int>(
"tcpRecvBufferBytes");
// Ask the socket to use the requested receive buffer size.
// NOTE(review): this local ec shadows the handler's ec parameter and is not
// passed to set_option in the visible line - confirm whether the
// error_code overload was intended.
77 socket_base::receive_buffer_size option(sz);
78 boost::system::error_code ec;
79 socket_.set_option(option);
// Read back the size the socket actually reports and compare it to the request.
82 socket_base::receive_buffer_size option;
83 socket_.get_option(option);
// NOTE(review): the body of this branch is absent from this listing.
84 if (sz == 0 || sz >= option.value()) {
// Reports the wanted versus actual receive buffer size.
87 HMBDC_LOG_C(
"set tcpcast RecvSession receive buffer size unsuccessful, want " 88 , sz,
" actual: ", option.value()
89 ,
" resulting higher receiver dropping possibility, check OS limits!");
// Switch the socket to non-blocking mode; the send helpers in this class use
// synchronous socket_.send() calls and check the returned byte count.
93 socket_.non_blocking(
true);
// Publish the session-started notification into the output buffer.
95 outputBuffer_.put(sessionStarted_);
// Sends a subscribe request for topic t to the peer, but only when t matches
// topicRegex_. The topic text is copied into a local buffer starting at
// offset 1 and l + 2 bytes are sent; a mismatch between the requested and the
// sent byte count is logged.
// NOTE(review): the declaration of buf and the lines that fill the bytes
// around the topic text are absent from this listing; presumably a leading
// command byte and a trailing delimiter - confirm.
101 void sendSubscribe(
Topic const& t) {
102 if (unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
// l is the value returned by copyTo for the topic text placed at buf + 1.
105 auto l = t.copyTo(buf + 1);
107 if (l + 2 != socket_.send(buffer(buf, l + 2))) {
108 HMBDC_LOG_C(
"error when sending subscribe to ",
id());
// Sends an unsubscribe request for topic t to the peer, but only when t
// matches topicRegex_. The visible lines mirror sendSubscribe: the topic text
// is copied to buf + 1, l + 2 bytes are sent, and a short send is logged.
// NOTE(review): the declaration of buf and the lines that fill the bytes
// around the topic text are absent from this listing - confirm their content.
114 void sendUnsubscribe(
Topic const& t) {
115 if (unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
// l is the value returned by copyTo for the topic text placed at buf + 1.
118 auto l = t.copyTo(buf + 1);
120 if (l + 2 != socket_.send(buffer(buf, l + 2))) {
121 HMBDC_LOG_C(
"error when sending unsubscribe to ",
id());
// Heartbeat send: does nothing when the socket is not open; otherwise sends
// the two-byte sequence "+\t" and logs when fewer than two bytes were sent.
// NOTE(review): the enclosing function header is absent from this listing.
127 if (!socket_.is_open())
return;
128 char const* hb =
"+\t";
129 if (socket_.send(buffer(hb, 2)) != 2) {
130 HMBDC_LOG_C(
"error when sending heartbeat to ",
id());
// Returns a C string identifying this session; it is used in the log
// messages of this class.
// NOTE(review): the body is absent from this listing; presumably it returns
// id_.c_str() - confirm.
134 char const* id()
const {
// Walks every entry in subscriptions_, selects the topics that match
// topicRegex_, then sends one string to the peer and logs when the sent byte
// count differs from the string's size.
// NOTE(review): the lines that declare oss and append each matching topic to
// it are absent from this listing.
139 void sendSubscriptions() {
141 for (
auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
// The entry's first member is the topic text.
142 auto const& t = it->first;
143 if (boost::regex_match(t.c_str(), topicRegex_)) {
// The accumulated text is terminated with the same "+\t" sequence that the
// heartbeat sends on its own.
151 auto s = oss.str() +
"+\t";
152 auto sz = socket_.send(buffer(s.c_str(), s.size()));
153 if (sz != s.size()) {
154 HMBDC_LOG_C(
"error when sending subscriptions to ",
id());
// Processes the bytes currently buffered (bufCur_ / filledLen_).
// NOTE(review): the enclosing function header and many interior lines are
// absent from this listing, so the comments below describe only the visible
// statements.
//
// First, if an attachment transfer is in progress, the buffered bytes are
// handed to the matching attachment writer. When the writer reports
// writeDone(), it is closed, the flag is reset to 0 and the previously
// claimed output slot it_ is committed.
160 if (unlikely(currTransportHeadFlag_ == hasFileAttachment::flag)) {
161 auto s = fileAttachment_.write(bufCur_, filledLen_);
162 if (fileAttachment_.writeDone()) {
163 fileAttachment_.close();
164 currTransportHeadFlag_ = 0;
165 outputBuffer_.commit(it_);
169 }
else if (unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag)) {
170 auto s = memoryAttachment_.write(bufCur_, filledLen_);
171 if (memoryAttachment_.writeDone()) {
172 memoryAttachment_.close();
173 currTransportHeadFlag_ = 0;
174 outputBuffer_.commit(it_);
// Message loop: runs while no attachment is pending.
// NOTE(review): the rest of the loop condition and the declaration of h are
// absent from this listing.
179 while (currTransportHeadFlag_ == 0
182 auto wireSize = h->wireSize();
// Proceed only when at least wireSize bytes are buffered.
183 if (likely(filledLen_ >= wireSize)) {
// Only topics present in subscriptions_ are delivered.
184 if (likely(subscriptions_.check(h->topic()))) {
// Claim an output slot and copy the payload into it. The copy length is
// capped at outputBuffer_.maxItemSize(), so a larger payload is truncated.
187 it_ = outputBuffer_.claim();
188 char* b =
static_cast<char*
>(*it_);
189 auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
190 memcpy(b, h->payload(), l);
// Record the header flag. A flag of 0 means the slot can be committed right
// away; otherwise the commit is deferred until the attachment is written.
191 currTransportHeadFlag_ = h->flag;
192 if (!currTransportHeadFlag_) {
193 outputBuffer_.commit(it_);
194 }
else if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
// NOTE(review): the declaration of att is absent from this listing.
197 if (allowRecvMemoryAttachment_) {
198 att.attachment = memoryAttachment_.open(att.len);
200 }
else if (currTransportHeadFlag_ == hasFileAttachment::flag) {
// File attachments are opened only when a download directory is configured.
203 if (downloadDir_.size()) {
204 fileAttachment_.open(downloadDir_.c_str(), att.file, att.len);
// When fileAttachment_ tests false, the attachment pointer is cleared.
206 if (!fileAttachment_) {
207 att.attachment =
nullptr;
// NOTE(review): the definition of a is absent from this listing.
210 }
else if (unlikely(a == 0)) {
// Arm doReadTimer_ with a zero delay and wait on it asynchronously; self keeps
// the session alive until the handler has run.
// NOTE(review): the handler body is absent from this listing.
213 doReadTimer_.expires_from_now(boost::posix_time::microseconds(0));
215 auto self(this->shared_from_this());
216 doReadTimer_.async_wait(
217 [
this,
self](boost::system::error_code ec) {
// Account for the wireSize bytes of the message just handled.
226 filledLen_ -= wireSize;
// Move the remaining filledLen_ bytes to the front of buf_.
231 memmove(buf_, bufCur_, filledLen_);
// Issues an asynchronous read into the unused tail of buf_ (from offset
// filledLen_ up to bufSize_). The handler captures self so the session stays
// alive until the read completes.
// NOTE(review): the enclosing function header and the conditions that select
// between the success and error paths are absent from this listing.
234 auto self(this->shared_from_this());
235 socket_.async_read_some(boost::asio::buffer(buf_ + filledLen_, bufSize_ - filledLen_),
236 [
this,
self](boost::system::error_code ec, std::size_t length) {
// Add the newly received byte count to the buffered total.
238 filledLen_ += length;
// Log the receive error together with the session id.
241 HMBDC_LOG_C(
id(),
" receiving error = ", ec);
// Publish the session-dropped notification into the output buffer.
244 outputBuffer_.put(sessionDropped_);
// Data members (partial: several declarations are absent from this listing).
// Regex a topic must match before subscribe/unsubscribe traffic is sent for it.
251 boost::regex topicRegex_;
// Callable supplied at construction.
// NOTE(review): its call site is not visible in this listing.
252 std::function<void()> cleanupFunc_;
// Destination for received messages and for the session started/dropped
// notifications; held by reference, so it must outlive the session.
255 OutputBuffer& outputBuffer_;
// Size in bytes of the receive buffer buf_.
259 size_t const bufSize_;
// Timer armed with a zero delay in the processing code.
263 deadline_timer doReadTimer_;
// Flag of the transport header currently being handled; 0 means no attachment
// transfer is pending.
264 uint8_t currTransportHeadFlag_;
// Output-buffer slot claimed for the message currently being assembled.
265 typename OutputBuffer::iterator it_;
// Whether memory attachments are opened for incoming messages.
268 bool allowRecvMemoryAttachment_;
Definition: DownloadFile.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:35
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: StringTrieSet.hpp:113
Definition: TypedString.hpp:74
Definition: DownloadMemory.hpp:11
Definition: Message.hpp:46
Definition: RecvSession.hpp:29
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:143
Definition: Client.hpp:11