hmbdc
simplify-high-performance-messaging-programming
RecvSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/os/DownloadFile.hpp"
8 #include "hmbdc/os/DownloadMemory.hpp"
9 #include "hmbdc/comm/inet/Misc.hpp"
10 
11 #include "hmbdc/app/tcpcast/RecvSession.hpp"
12 
13 #include <boost/asio.hpp>
14 #include <boost/lexical_cast.hpp>
15 
16 #include <functional>
17 #include <memory>
18 #include <utility>
19 
20 #include <iostream>
21 
22 namespace hmbdc { namespace app { namespace tcpcast {
23 
24 namespace recvsession_detail {
25 using namespace boost::asio;
26 using boost::asio::ip::tcp;
27 using namespace std;
28 using namespace hmbdc::text;
29 
30 template <typename OutputBuffer, typename MsgArbitrator>
31 struct RecvSession
32 : enable_shared_from_this<RecvSession<OutputBuffer, MsgArbitrator>> {
33  using ptr = shared_ptr<RecvSession<OutputBuffer, MsgArbitrator>>;
34  using CleanupFunc = std::function<void()>;
35  RecvSession(Config const& config
36  , StringTrieSet const& subscriptions
37  , boost::regex const& topicRegex
38  , CleanupFunc cleanupFunc
39  , io_service& ios
40  , OutputBuffer& outputBuffer
41  , MsgArbitrator& arb)
42  : config_(config)
43  , subscriptions_(subscriptions)
44  , topicRegex_(topicRegex)
45  , cleanupFunc_(cleanupFunc)
46  , socket_(ios)
47  , arb_(arb)
48  , outputBuffer_(outputBuffer)
49  , bufSize_(config.getExt<size_t>("maxTcpReadBytes"))
50  , buf_((char*)memalign(SMP_CACHE_BYTES, bufSize_))
51  , bufCur_(buf_)
52  , filledLen_(0)
53  , doReadTimer_(ios)
54  , currTransportHeadFlag_(0) {
55  config(allowRecvMemoryAttachment_, "allowRecvMemoryAttachment");
56  }
57 
58  ~RecvSession() {
59  free(buf_);
60  HMBDC_LOG_N("RecvSession retired: ", id());
61  }
62 
63  void start(tcp::resolver::iterator endpointIt) {
64  HMBDC_LOG_N("RecvSession started: ", id());
65  auto self(this->shared_from_this());
66  async_connect(socket_, endpointIt,
67  [this, self](boost::system::error_code ec, tcp::resolver::iterator) {
68  if (!ec) {
69  id_ = boost::lexical_cast<string>(socket_.remote_endpoint());
70  strncpy(sessionStarted_.payload.ip, id_.c_str()
71  , sizeof(sessionStarted_.payload.ip));
72  sessionStarted_.payload.ip[sizeof(sessionStarted_.payload.ip) - 1] = 0;
73  strncpy(sessionDropped_.payload.ip, id_.c_str()
74  , sizeof(sessionDropped_.payload.ip));
75  sessionDropped_.payload.ip[sizeof(sessionDropped_.payload.ip) - 1] = 0;
76 
77  auto sz = config_.getExt<int>("tcpRecvBufferBytes");
78  if (sz) {
79  socket_base::receive_buffer_size option(sz);
80  boost::system::error_code ec;
81  socket_.set_option(option);
82  }
83 
84  socket_base::receive_buffer_size option;
85  socket_.get_option(option);
86  if (sz == 0 || sz >= option.value()) {
87  // HMBDC_LOG_N("RecvSession receive buffer size: ", option.value());
88  } else {
89  HMBDC_LOG_C("set tcpcast RecvSession receive buffer size unsuccessful, want "
90  , sz, " actual: ", option.value()
91  , " resulting higher receiver dropping possibility, check OS limits!");
92  }
93 
94  doRead(); //async
95  socket_.non_blocking(true);
96  sendSubscriptions(); //sync
97  outputBuffer_.putSome(sessionStarted_);
98  } else {
99  HMBDC_LOG_C(id(), " connect error = ", ec);
100  socket_.cancel();
101  cleanupFunc_();
102  }
103  }
104  );
105  }
106 
107  void sendSubscribe(Topic const& t) {
108  if (hmbdc_unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
109  char buf[70];
110  buf[0]='+';
111  auto l = t.copyTo(buf + 1);
112  buf[l + 1] = '\t';
113  boost::system::error_code ec;
114  if (hmbdc_unlikely(l + 2 != boost::asio::write(socket_, buffer(buf, l + 2), ec))) {
115  HMBDC_LOG_C("error when sending subscribe to ", id());
116  }
117  }
118  }
119 
120 
121  void sendUnsubscribe(Topic const& t) {
122  if (hmbdc_unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
123  char buf[70];
124  buf[0]='-';
125  auto l = t.copyTo(buf + 1);
126  buf[l + 1] = '\t';
127  boost::system::error_code ec;
128  if (hmbdc_unlikely(l + 2 != boost::asio::write(socket_, buffer(buf, l + 2), ec))) {
129  HMBDC_LOG_C("error when sending unsubscribe to ", id());
130  }
131  }
132  }
133 
134  void heartbeat() {
135  if (!socket_.is_open()) return;
136  char const* hb = "+\t";
137  boost::system::error_code ec;
138  if (hmbdc_unlikely(2 != boost::asio::write(socket_, buffer(hb, 2)))) {
139  HMBDC_LOG_C("error when sending heartbeat to ", id());
140  }
141  }
142 
143  char const* id() const {
144  return id_.c_str();
145  }
146 
147 private:
148  void sendSubscriptions() {
149  ostringstream oss;
150  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
151  auto const& t = it->first;
152  if (boost::regex_match(t.c_str(), topicRegex_)) {
153  oss << '+' << t;
154  if (it->second) {
155  oss << '*';
156  }
157  oss << '\t';
158  }
159  }
160  auto s = oss.str() + "+\t"; //mark all sent with "+\t"
161  boost::system::error_code ec;
162  auto sz = boost::asio::write(socket_, buffer(s.c_str(), s.size()), ec);
163  if (hmbdc_unlikely(sz != s.size())) {
164  HMBDC_LOG_C("error when sending subscriptions to ", id());
165  }
166  }
167  void doRead() {
168  if (hmbdc_unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag)) {
169  auto s = memoryAttachment_.write(bufCur_, filledLen_);
170  if (memoryAttachment_.writeDone()) {
171  memoryAttachment_.close();
172  currTransportHeadFlag_ = 0;
173  outputBuffer_.commit(it_);
174  }
175  bufCur_ += s;
176  filledLen_ -= s;
177  }
178  while (currTransportHeadFlag_ == 0
179  && filledLen_ >= sizeof(TransportMessageHeader)) {
180  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
181  auto wireSize = h->wireSize();
182  if (hmbdc_likely(filledLen_ >= wireSize)) {
183  if (hmbdc_likely(subscriptions_.check(h->topic()))) {
184  auto a = arb_(h);
185  if (hmbdc_likely(a > 0)) {
186  it_ = outputBuffer_.claim();
187  char* b = static_cast<char*>(*it_);
188  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
189  memcpy(b, h->payload(), l);
190  if (allowRecvMemoryAttachment_.find(h->typeTag()) != allowRecvMemoryAttachment_.end()) {
191  currTransportHeadFlag_ = h->flag;
192  } else {
193  currTransportHeadFlag_ = 0;
194  }
195  if (!currTransportHeadFlag_) {
196  outputBuffer_.commit(it_);
197  } else if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
198  auto& att =
199  static_cast<app::MessageWrap<hasMemoryAttachment>*>(*it_)->payload;
200  att.attachment = memoryAttachment_.open(att.len);
201  }
202  } else if (hmbdc_unlikely(a == 0)) {
203  //cannot decide now - keep bufCur_ filledLen_ unchanged
204  //try it later, use a timer since there might be no more bytes arriving
205  doReadTimer_.expires_from_now(boost::posix_time::microseconds(0));
206 
207  auto self(this->shared_from_this());
208  doReadTimer_.async_wait(
209  [this, self](boost::system::error_code ec) {
210  if (!ec) doRead();
211  }
212  );
213  return;
214  } //else move to next msg
215  }
216  bufCur_ += wireSize;
217  //memmove(buf_, buf_ + h->wireSize(), filledLen_ - h->wireSize());
218  filledLen_ -= wireSize;
219  } else {
220  break;
221  }
222  }
223  memmove(buf_, bufCur_, filledLen_);
224  bufCur_ = buf_;
225 
226  auto self(this->shared_from_this());
227  socket_.async_read_some(boost::asio::buffer(buf_ + filledLen_, bufSize_ - filledLen_),
228  [this, self](boost::system::error_code ec, std::size_t length) {
229  if (!ec) {
230  filledLen_ += length;
231  doRead();
232  } else {
233  HMBDC_LOG_C(id(), " receiving error = ", ec);
234  socket_.cancel();
235  cleanupFunc_();
236  outputBuffer_.putSome(sessionDropped_);
237  }
238  });
239  }
240 
241  Config const& config_;
242  StringTrieSet const& subscriptions_;
243  boost::regex topicRegex_;
244  std::function<void()> cleanupFunc_;
245  tcp::socket socket_;
246  MsgArbitrator& arb_;
247  OutputBuffer& outputBuffer_;
248  string id_;
249  MessageWrap<SessionStarted> sessionStarted_;
250  MessageWrap<SessionDropped> sessionDropped_;
251  size_t const bufSize_;
252  char* buf_;
253  char* bufCur_;
254  size_t filledLen_;
255  deadline_timer doReadTimer_;
256  uint8_t currTransportHeadFlag_;
257  typename OutputBuffer::iterator it_;
258  os::DownloadFile fileAttachment_;
259 
260  std::unordered_set<uint16_t> allowRecvMemoryAttachment_;
261  os::DownloadMemory memoryAttachment_;
262 };
263 } //recvsession_detail
264 template <typename OutputBuffer, typename MsgArbitrator>
266 }}}
Definition: DownloadFile.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: DownloadMemory.hpp:10
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:55
Definition: Base.hpp:12
Definition: LfbStream.hpp:11