hmbdc
simplify-high-performance-messaging-programming
RecvSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/os/DownloadFile.hpp"
8 #include "hmbdc/os/DownloadMemory.hpp"
9 #include "hmbdc/comm/inet/Misc.hpp"
10 
11 #include "hmbdc/app/tcpcast/RecvSession.hpp"
12 
13 #include <boost/asio.hpp>
14 #include <boost/lexical_cast.hpp>
15 
16 #include <functional>
17 #include <memory>
18 #include <utility>
19 
20 #include <iostream>
21 
22 namespace hmbdc { namespace app { namespace tcpcast {
23 
24 namespace recvsession_detail {
25 using namespace boost::asio;
26 using boost::asio::ip::tcp;
27 using namespace std;
28 using namespace hmbdc::text;
29 
30 template <typename OutputBuffer, typename MsgArbitrator>
31 struct RecvSession
32 : enable_shared_from_this<RecvSession<OutputBuffer, MsgArbitrator>> {
33  using ptr = shared_ptr<RecvSession<OutputBuffer, MsgArbitrator>>;
34  using CleanupFunc = std::function<void()>;
35  RecvSession(Config const& config
36  , StringTrieSet const& subscriptions
37  , boost::regex const& topicRegex
38  , CleanupFunc cleanupFunc
39  , io_service& ios
40  , OutputBuffer& outputBuffer
41  , MsgArbitrator& arb)
42  : config_(config)
43  , subscriptions_(subscriptions)
44  , topicRegex_(topicRegex)
45  , cleanupFunc_(cleanupFunc)
46  , socket_(ios)
47  , arb_(arb)
48  , outputBuffer_(outputBuffer)
49  , bufSize_(config.getExt<size_t>("maxTcpReadBytes"))
50  , buf_((char*)memalign(SMP_CACHE_BYTES, bufSize_))
51  , bufCur_(buf_)
52  , filledLen_(0)
53  , doReadTimer_(ios)
54  , currTransportHeadFlag_(0) {
55  config(allowRecvMemoryAttachment_, "allowRecvMemoryAttachment");
56  }
57 
58  ~RecvSession() {
59  free(buf_);
60  HMBDC_LOG_N("RecvSession retired: ", id());
61  }
62 
63  void start(tcp::resolver::iterator endpointIt) {
64  auto self(this->shared_from_this());
65  async_connect(socket_, endpointIt,
66  [this, self](boost::system::error_code ec, tcp::resolver::iterator) {
67  if (!ec) {
68  id_ = boost::lexical_cast<string>(socket_.remote_endpoint());
69  strncpy(sessionStarted_.payload.ip, id_.c_str()
70  , sizeof(sessionStarted_.payload.ip));
71  sessionStarted_.payload.ip[sizeof(sessionStarted_.payload.ip) - 1] = 0;
72  strncpy(sessionDropped_.payload.ip, id_.c_str()
73  , sizeof(sessionDropped_.payload.ip));
74  sessionDropped_.payload.ip[sizeof(sessionDropped_.payload.ip) - 1] = 0;
75 
76  auto sz = config_.getExt<int>("tcpRecvBufferBytes");
77  if (sz) {
78  socket_base::receive_buffer_size option(sz);
79  boost::system::error_code ec;
80  socket_.set_option(option);
81  }
82 
83  socket_base::receive_buffer_size option;
84  socket_.get_option(option);
85  if (sz == 0 || sz >= option.value()) {
86  // HMBDC_LOG_N("RecvSession receive buffer size: ", option.value());
87  } else {
88  HMBDC_LOG_C("set tcpcast RecvSession receive buffer size unsuccessful, want "
89  , sz, " actual: ", option.value()
90  , " resulting higher receiver dropping possibility, check OS limits!");
91  }
92 
93  doRead(); //async
94  socket_.non_blocking(true);
95  sendSubscriptions(); //sync
96  outputBuffer_.put(sessionStarted_);
97  }
98  }
99  );
100  }
101 
102  void sendSubscribe(Topic const& t) {
103  if (unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
104  char buf[70];
105  buf[0]='+';
106  auto l = t.copyTo(buf + 1);
107  buf[l + 1] = '\t';
108  if (l + 2 != socket_.send(buffer(buf, l + 2))) {
109  HMBDC_LOG_C("error when sending subscribe to ", id());
110  }
111  }
112  }
113 
114 
115  void sendUnsubscribe(Topic const& t) {
116  if (unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
117  char buf[70];
118  buf[0]='-';
119  auto l = t.copyTo(buf + 1);
120  buf[l + 1] = '\t';
121  if (l + 2 != socket_.send(buffer(buf, l + 2))) {
122  HMBDC_LOG_C("error when sending unsubscribe to ", id());
123  }
124  }
125  }
126 
127  void heartbeat() {
128  if (!socket_.is_open()) return;
129  char const* hb = "+\t";
130  if (socket_.send(buffer(hb, 2)) != 2) {
131  HMBDC_LOG_C("error when sending heartbeat to ", id());
132  }
133  }
134 
135  char const* id() const {
136  return id_.c_str();
137  }
138 
139 private:
140  void sendSubscriptions() {
141  ostringstream oss;
142  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
143  auto const& t = it->first;
144  if (boost::regex_match(t.c_str(), topicRegex_)) {
145  oss << '+' << t;
146  if (it->second) {
147  oss << '*';
148  }
149  oss << '\t';
150  }
151  }
152  auto s = oss.str() + "+\t"; //mark all sent with "+\t"
153  auto sz = socket_.send(buffer(s.c_str(), s.size()));
154  if (sz != s.size()) {
155  HMBDC_LOG_C("error when sending subscriptions to ", id());
156  }
157 
158 
159  }
160  void doRead() {
161  if (unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag)) {
162  auto s = memoryAttachment_.write(bufCur_, filledLen_);
163  if (memoryAttachment_.writeDone()) {
164  memoryAttachment_.close();
165  currTransportHeadFlag_ = 0;
166  outputBuffer_.commit(it_);
167  }
168  bufCur_ += s;
169  filledLen_ -= s;
170  }
171  while (currTransportHeadFlag_ == 0
172  && filledLen_ >= sizeof(TransportMessageHeader)) {
173  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
174  auto wireSize = h->wireSize();
175  if (likely(filledLen_ >= wireSize)) {
176  if (likely(subscriptions_.check(h->topic()))) {
177  auto a = arb_(h);
178  if (likely(a > 0)) {
179  it_ = outputBuffer_.claim();
180  char* b = static_cast<char*>(*it_);
181  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
182  memcpy(b, h->payload(), l);
183  if (allowRecvMemoryAttachment_.find(h->typeTag()) != allowRecvMemoryAttachment_.end()) {
184  currTransportHeadFlag_ = h->flag;
185  } else {
186  currTransportHeadFlag_ = 0;
187  }
188  if (!currTransportHeadFlag_) {
189  outputBuffer_.commit(it_);
190  } else if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
191  auto& att =
192  static_cast<app::MessageWrap<hasMemoryAttachment>*>(*it_)->payload;
193  att.attachment = memoryAttachment_.open(att.len);
194  }
195  } else if (unlikely(a == 0)) {
196  //cannot decide now - keep bufCur_ filledLen_ unchanged
197  //try it later, use a timer since there might be no more bytes arriving
198  doReadTimer_.expires_from_now(boost::posix_time::microseconds(0));
199 
200  auto self(this->shared_from_this());
201  doReadTimer_.async_wait(
202  [this, self](boost::system::error_code ec) {
203  if (!ec) doRead();
204  }
205  );
206  return;
207  } //else move to next msg
208  }
209  bufCur_ += wireSize;
210  //memmove(buf_, buf_ + h->wireSize(), filledLen_ - h->wireSize());
211  filledLen_ -= wireSize;
212  } else {
213  break;
214  }
215  }
216  memmove(buf_, bufCur_, filledLen_);
217  bufCur_ = buf_;
218 
219  auto self(this->shared_from_this());
220  socket_.async_read_some(boost::asio::buffer(buf_ + filledLen_, bufSize_ - filledLen_),
221  [this, self](boost::system::error_code ec, std::size_t length) {
222  if (!ec) {
223  filledLen_ += length;
224  doRead();
225  } else {
226  HMBDC_LOG_C(id(), " receiving error = ", ec);
227  socket_.cancel();
228  cleanupFunc_();
229  outputBuffer_.put(sessionDropped_);
230  }
231  });
232  }
233 
234  Config const& config_;
235  StringTrieSet const& subscriptions_;
236  boost::regex topicRegex_;
237  std::function<void()> cleanupFunc_;
238  tcp::socket socket_;
239  MsgArbitrator& arb_;
240  OutputBuffer& outputBuffer_;
241  string id_;
242  MessageWrap<SessionStarted> sessionStarted_;
243  MessageWrap<SessionDropped> sessionDropped_;
244  size_t const bufSize_;
245  char* buf_;
246  char* bufCur_;
247  size_t filledLen_;
248  deadline_timer doReadTimer_;
249  uint8_t currTransportHeadFlag_;
250  typename OutputBuffer::iterator it_;
251  os::DownloadFile fileAttachment_;
252 
253  std::unordered_set<uint16_t> allowRecvMemoryAttachment_;
254  os::DownloadMemory memoryAttachment_;
255 };
256 } //recvsession_detail
257 template <typename OutputBuffer, typename MsgArbitrator>
259 }}}
Definition: DownloadFile.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:151
Definition: TypedString.hpp:74
Definition: DownloadMemory.hpp:10
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:55
Definition: Base.hpp:12
Definition: LfbStream.hpp:11