hmbdc
simplify-high-performance-messaging-programming
RecvSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/os/DownloadFile.hpp"
8 #include "hmbdc/os/DownloadMemory.hpp"
9 #include "hmbdc/comm/inet/Misc.hpp"
10 
11 #include "hmbdc/app/tcpcast/RecvSession.hpp"
12 
13 #include <boost/asio.hpp>
14 #include <boost/lexical_cast.hpp>
15 
16 #include <functional>
17 #include <memory>
18 #include <utility>
19 
20 #include <iostream>
21 
22 namespace hmbdc { namespace app { namespace tcpcast {
23 
24 using namespace boost::asio;
25 using boost::asio::ip::tcp;
26 using namespace std;
27 
28 template <typename OutputBuffer, typename MsgArbitrator>
29 struct RecvSession
30 : enable_shared_from_this<RecvSession<OutputBuffer, MsgArbitrator>> {
31  using ptr = shared_ptr<RecvSession<OutputBuffer, MsgArbitrator>>;
32  using CleanupFunc = std::function<void()>;
33  RecvSession(Config const& config
34  , StringTrieSet const& subscriptions
35  , boost::regex const& topicRegex
36  , CleanupFunc cleanupFunc
37  , io_service& ios
38  , OutputBuffer& outputBuffer
39  , MsgArbitrator& arb)
40  : config_(config)
41  , subscriptions_(subscriptions)
42  , topicRegex_(topicRegex)
43  , cleanupFunc_(cleanupFunc)
44  , socket_(ios)
45  , arb_(arb)
46  , outputBuffer_(outputBuffer)
47  , bufSize_(config.getExt<size_t>("maxTcpReadBytes"))
48  , buf_((char*)memalign(SMP_CACHE_BYTES, bufSize_))
49  , bufCur_(buf_)
50  , filledLen_(0)
51  , doReadTimer_(ios)
52  , currTransportHeadFlag_(0)
53  , downloadDir_(config.getExt<string>("downloadDir"))
54  , allowRecvMemoryAttachment_(config.getExt<bool>("allowRecvMemoryAttachment")) {
55  }
56 
57  ~RecvSession() {
58  free(buf_);
59  HMBDC_LOG_N("RecvSession retired: ", id());
60  }
61 
62  void start(tcp::resolver::iterator endpointIt) {
63  auto self(this->shared_from_this());
64  async_connect(socket_, endpointIt,
65  [this, self](boost::system::error_code ec, tcp::resolver::iterator) {
66  if (!ec) {
67  id_ = boost::lexical_cast<string>(socket_.remote_endpoint());
68  strncpy(sessionStarted_.payload.ip, id_.c_str()
69  , sizeof(sessionStarted_.payload.ip));
70  sessionStarted_.payload.ip[sizeof(sessionStarted_.payload.ip) - 1] = 0;
71  strncpy(sessionDropped_.payload.ip, id_.c_str()
72  , sizeof(sessionDropped_.payload.ip));
73  sessionDropped_.payload.ip[sizeof(sessionDropped_.payload.ip) - 1] = 0;
74 
75  auto sz = config_.getExt<int>("tcpRecvBufferBytes");
76  if (sz) {
77  socket_base::receive_buffer_size option(sz);
78  boost::system::error_code ec;
79  socket_.set_option(option);
80  }
81 
82  socket_base::receive_buffer_size option;
83  socket_.get_option(option);
84  if (sz == 0 || sz >= option.value()) {
85  // HMBDC_LOG_N("RecvSession receive buffer size: ", option.value());
86  } else {
87  HMBDC_LOG_C("set tcpcast RecvSession receive buffer size unsuccessful, want "
88  , sz, " actual: ", option.value()
89  , " resulting higher receiver dropping possibility, check OS limits!");
90  }
91 
92  doRead(); //async
93  socket_.non_blocking(true);
94  sendSubscriptions(); //sync
95  outputBuffer_.put(sessionStarted_);
96  }
97  }
98  );
99  }
100 
101  void sendSubscribe(Topic const& t) {
102  if (unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
103  char buf[70];
104  buf[0]='+';
105  auto l = t.copyTo(buf + 1);
106  buf[l + 1] = '\t';
107  if (l + 2 != socket_.send(buffer(buf, l + 2))) {
108  HMBDC_LOG_C("error when sending subscribe to ", id());
109  }
110  }
111  }
112 
113 
114  void sendUnsubscribe(Topic const& t) {
115  if (unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
116  char buf[70];
117  buf[0]='-';
118  auto l = t.copyTo(buf + 1);
119  buf[l + 1] = '\t';
120  if (l + 2 != socket_.send(buffer(buf, l + 2))) {
121  HMBDC_LOG_C("error when sending unsubscribe to ", id());
122  }
123  }
124  }
125 
126  void heartbeat() {
127  if (!socket_.is_open()) return;
128  char const* hb = "+\t";
129  if (socket_.send(buffer(hb, 2)) != 2) {
130  HMBDC_LOG_C("error when sending heartbeat to ", id());
131  }
132  }
133 
134  char const* id() const {
135  return id_.c_str();
136  }
137 
138 private:
139  void sendSubscriptions() {
140  ostringstream oss;
141  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
142  auto const& t = it->first;
143  if (boost::regex_match(t.c_str(), topicRegex_)) {
144  oss << '+' << t;
145  if (it->second) {
146  oss << '*';
147  }
148  oss << '\t';
149  }
150  }
151  auto s = oss.str() + "+\t"; //mark all sent with "+\t"
152  auto sz = socket_.send(buffer(s.c_str(), s.size()));
153  if (sz != s.size()) {
154  HMBDC_LOG_C("error when sending subscriptions to ", id());
155  }
156 
157 
158  }
159  void doRead() {
160  if (unlikely(currTransportHeadFlag_ == hasFileAttachment::flag)) {
161  auto s = fileAttachment_.write(bufCur_, filledLen_);
162  if (fileAttachment_.writeDone()) {
163  fileAttachment_.close();
164  currTransportHeadFlag_ = 0;
165  outputBuffer_.commit(it_);
166  }
167  bufCur_ += s;
168  filledLen_ -= s;
169  } else if (unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag)) {
170  auto s = memoryAttachment_.write(bufCur_, filledLen_);
171  if (memoryAttachment_.writeDone()) {
172  memoryAttachment_.close();
173  currTransportHeadFlag_ = 0;
174  outputBuffer_.commit(it_);
175  }
176  bufCur_ += s;
177  filledLen_ -= s;
178  }
179  while (currTransportHeadFlag_ == 0
180  && filledLen_ >= sizeof(TransportMessageHeader)) {
181  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
182  auto wireSize = h->wireSize();
183  if (likely(filledLen_ >= wireSize)) {
184  if (likely(subscriptions_.check(h->topic()))) {
185  auto a = arb_(h);
186  if (likely(a > 0)) {
187  it_ = outputBuffer_.claim();
188  char* b = static_cast<char*>(*it_);
189  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
190  memcpy(b, h->payload(), l);
191  currTransportHeadFlag_ = h->flag;
192  if (!currTransportHeadFlag_) {
193  outputBuffer_.commit(it_);
194  } else if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
195  auto& att =
196  static_cast<app::MessageWrap<hasMemoryAttachment>*>(*it_)->payload;
197  if (allowRecvMemoryAttachment_) {
198  att.attachment = memoryAttachment_.open(att.len);
199  }
200  } else if (currTransportHeadFlag_ == hasFileAttachment::flag) {
201  auto& att =
202  static_cast<app::MessageWrap<hasFileAttachment>*>(*it_)->payload;
203  if (downloadDir_.size()) {
204  fileAttachment_.open(downloadDir_.c_str(), att.file, att.len);
205  }
206  if (!fileAttachment_) {
207  att.attachment = nullptr;
208  }
209  }
210  } else if (unlikely(a == 0)) {
211  //cannot decide now - keep bufCur_ filledLen_ unchanged
212  //try it later, use a timer since there might be no more bytes arriving
213  doReadTimer_.expires_from_now(boost::posix_time::microseconds(0));
214 
215  auto self(this->shared_from_this());
216  doReadTimer_.async_wait(
217  [this, self](boost::system::error_code ec) {
218  if (!ec) doRead();
219  }
220  );
221  return;
222  } //else move to next msg
223  }
224  bufCur_ += wireSize;
225  //memmove(buf_, buf_ + h->wireSize(), filledLen_ - h->wireSize());
226  filledLen_ -= wireSize;
227  } else {
228  break;
229  }
230  }
231  memmove(buf_, bufCur_, filledLen_);
232  bufCur_ = buf_;
233 
234  auto self(this->shared_from_this());
235  socket_.async_read_some(boost::asio::buffer(buf_ + filledLen_, bufSize_ - filledLen_),
236  [this, self](boost::system::error_code ec, std::size_t length) {
237  if (!ec) {
238  filledLen_ += length;
239  doRead();
240  } else {
241  HMBDC_LOG_C(id(), " receiving error = ", ec);
242  socket_.cancel();
243  cleanupFunc_();
244  outputBuffer_.put(sessionDropped_);
245  }
246  });
247  }
248 
249  Config const& config_;
250  StringTrieSet const& subscriptions_;
251  boost::regex topicRegex_;
252  std::function<void()> cleanupFunc_;
253  tcp::socket socket_;
254  MsgArbitrator& arb_;
255  OutputBuffer& outputBuffer_;
256  string id_;
257  MessageWrap<SessionStarted> sessionStarted_;
258  MessageWrap<SessionDropped> sessionDropped_;
259  size_t const bufSize_;
260  char* buf_;
261  char* bufCur_;
262  size_t filledLen_;
263  deadline_timer doReadTimer_;
264  uint8_t currTransportHeadFlag_;
265  typename OutputBuffer::iterator it_;
266  string downloadDir_;
267  os::DownloadFile fileAttachment_;
268  bool allowRecvMemoryAttachment_;
269  os::DownloadMemory memoryAttachment_;
270 };
271 
272 }}}
Definition: DownloadFile.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:35
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: StringTrieSet.hpp:113
Definition: TypedString.hpp:74
Definition: DownloadMemory.hpp:11
Definition: Message.hpp:46
Definition: RecvSession.hpp:29
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:143
Definition: Client.hpp:11