hmbdc
simplify-high-performance-messaging-programming
RecvSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/os/DownloadFile.hpp"
8 #include "hmbdc/os/DownloadMemory.hpp"
9 #include "hmbdc/comm/inet/Misc.hpp"
10 
11 #include "hmbdc/app/tcpcast/RecvSession.hpp"
12 
13 #include <boost/lexical_cast.hpp>
14 
15 #include <functional>
16 #include <memory>
17 #include <utility>
18 
19 #include <iostream>
20 
21 namespace hmbdc { namespace app { namespace tcpcast {
22 
23 namespace recvsession_detail {
24 using namespace std;
25 using namespace hmbdc::text;
26 
27 template <typename OutputBuffer, typename MsgArbitrator>
28 struct RecvSession {
29  using ptr = shared_ptr<RecvSession<OutputBuffer, MsgArbitrator>>;
30  using CleanupFunc = std::function<void()>;
31  RecvSession(Config const& config
32  , StringTrieSet const& subscriptions
33  , boost::regex const& topicRegex
34  , OutputBuffer& outputBuffer
35  , MsgArbitrator& arb)
36  : config_(config)
37  , subscriptions_(subscriptions)
38  , topicRegex_(topicRegex)
39  , writeFd_(config)
40  , arb_(arb)
41  , outputBuffer_(outputBuffer)
42  , initialized_(false)
43  , stopped_(false)
44  , bufSize_(config.getExt<size_t>("maxTcpReadBytes"))
45  , buf_((char*)memalign(SMP_CACHE_BYTES, bufSize_))
46  , bufCur_(buf_)
47  , filledLen_(0)
48  , currTransportHeadFlag_(0) {
49  config(allowRecvMemoryAttachment_, "allowRecvMemoryAttachment");
50  }
51 
52  ~RecvSession() {
53  free(buf_);
54  HMBDC_LOG_N("RecvSession retired: ", id());
55  }
56 
57  void start(in_addr_t ip, uint16_t port) {
58  sockaddr_in remoteAddr = {0};
59  remoteAddr.sin_family = AF_INET;
60  remoteAddr.sin_addr.s_addr = ip;
61  remoteAddr.sin_port = htons(port);
62  if (connect(writeFd_.fd, (sockaddr*)&remoteAddr, sizeof(remoteAddr)) < 0) {
63  if (errno != EINPROGRESS) {
64  HMBDC_THROW(runtime_error, "connect fail, errno=" << errno);
65  }
66  }
67 
68  auto forRead = dup(writeFd_.fd);
69  if (forRead == -1) {
70  HMBDC_THROW(std::runtime_error, "dup failed errno=" << errno);
71  }
72 
73  readFd_.fd = forRead;
74  utils::EpollTask::instance().add(EPOLLOUT|EPOLLET, writeFd_);
75  utils::EpollTask::instance().add(EPOLLIN|EPOLLET, readFd_);
76  }
77 
78  void stop() {
79  if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
80  new (*itPending_) MessageWrap<Flush>;
81  outputBuffer_.commit(itPending_);
82  currTransportHeadFlag_ = 0;
83  }
84  outputBuffer_.putSome(sessionDropped_);
85  }
86 
87  void sendSubscribe(Topic const& t) {
88  if (hmbdc_unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
89  char buf[70];
90  buf[0]='+';
91  auto l = int(t.copyTo(buf + 1));
92  buf[l + 1] = '\t';
93  if (hmbdc_unlikely(send(writeFd_.fd, buf, l + 2, MSG_NOSIGNAL) != l + 2)) {
94  HMBDC_LOG_C("error when sending subscribe to ", id());
95  stopped_ = true;
96  }
97  }
98  }
99 
100 
101  void sendUnsubscribe(Topic const& t) {
102  if (hmbdc_unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
103  char buf[70];
104  buf[0]='-';
105  auto l = int(t.copyTo(buf + 1));
106  buf[l + 1] = '\t';
107  if (hmbdc_unlikely(l + 2 != send(writeFd_.fd, buf, l + 2, MSG_NOSIGNAL))) {
108  HMBDC_LOG_C("error when sending unsubscribe to ", id(), " errno=", errno);
109  stopped_ = true;
110  }
111  }
112  }
113 
114  void heartbeat() {
115  if (!initialized_) return;
116  char const* hb = "+\t";
117  if (hmbdc_unlikely(2 != send(writeFd_.fd, hb, 2, MSG_NOSIGNAL))){
118  HMBDC_LOG_C("error when sending heartbeat to ", id(), " errno=", errno);
119  stopped_ = true;
120  }
121  }
122 
123  char const* id() const {
124  return id_.c_str();
125  }
126 
127  bool runOnce() {
128  if (hmbdc_unlikely(stopped_)) return false;
129  if (hmbdc_unlikely(!initialized_ && writeFd_.isFdReady())) {
130  try {
131  initializeConn();
132  } catch (std::exception const& e) {
133  HMBDC_LOG_W(e.what());
134  return false;
135  } catch (...) {
136  HMBDC_LOG_C("unknown exception");
137  return false;
138  }
139  }
140  return doRead();
141  }
142 
143 private:
144  void initializeConn() {
145  int flags = fcntl(writeFd_.fd, F_GETFL, 0);
146  flags &= ~O_NONBLOCK;
147  if (fcntl(writeFd_.fd, F_SETFL, flags) < 0) {
148  HMBDC_THROW(std::runtime_error, "fcntl failed errno=" << errno);
149  }
150 
151  auto sz = config_.getExt<int>("tcpRecvBufferBytes");
152  if (sz) {
153  if (setsockopt(readFd_.fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
154  HMBDC_LOG_C("failed to set send buffer size=", sz);
155  }
156  }
157 
158  auto addrPort = hmbdc::comm::inet::getPeerIpPort(writeFd_.fd);
159  id_ = addrPort.first + ":" + std::to_string(addrPort.second);
160  strncpy(sessionStarted_.payload.ip, id_.c_str()
161  , sizeof(sessionStarted_.payload.ip));
162  sessionStarted_.payload.ip[sizeof(sessionStarted_.payload.ip) - 1] = 0;
163  strncpy(sessionDropped_.payload.ip, id_.c_str()
164  , sizeof(sessionDropped_.payload.ip));
165  sessionDropped_.payload.ip[sizeof(sessionDropped_.payload.ip) - 1] = 0;
166  outputBuffer_.putSome(sessionStarted_);
167  HMBDC_LOG_N("RecvSession started: ", id());
168  sendSubscriptions();
169  initialized_ = true;
170  }
171 
172  void sendSubscriptions() {
173  ostringstream oss;
174  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
175  auto const& t = it->first;
176  if (boost::regex_match(t.c_str(), topicRegex_)) {
177  oss << '+' << t;
178  if (it->second) {
179  oss << '*';
180  }
181  oss << '\t';
182  }
183  }
184  auto s = oss.str() + "+\t"; //mark all sent with "+\t"
185  auto sz = size_t(send(writeFd_.fd, s.c_str(), s.size(), MSG_NOSIGNAL));
186  if (hmbdc_unlikely(sz != s.size())) {
187  HMBDC_LOG_C("error when sending subscriptions to ", id());
188  stopped_ = true;
189  }
190  }
191 
192  bool doRead() {
193  do {
194  if (hmbdc_unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag
195  && filledLen_)) {
196  auto s = memoryAttachment_.write(bufCur_, filledLen_);
197  if (memoryAttachment_.writeDone()) {
198  memoryAttachment_.close();
199  currTransportHeadFlag_ = 0;
200  outputBuffer_.commit(itPending_);
201  }
202  bufCur_ += s;
203  filledLen_ -= s;
204  }
205  while (currTransportHeadFlag_ == 0
206  && filledLen_ >= sizeof(TransportMessageHeader)) {
207  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
208  auto wireSize = h->wireSize();
209  if (hmbdc_likely(filledLen_ >= wireSize)) {
210  if (hmbdc_likely(subscriptions_.check(h->topic()))) {
211  auto a = arb_(h);
212  if (hmbdc_likely(a > 0)) {
213  if (hmbdc_unlikely(h->flag == hasMemoryAttachment::flag
214  && allowRecvMemoryAttachment_.find(h->typeTag()) != allowRecvMemoryAttachment_.end())) {
215  currTransportHeadFlag_ = h->flag;
216  itPending_ = outputBuffer_.claim();
217  char* b = static_cast<char*>(*itPending_);
218  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
219  memcpy(b, h->payload(), l);
220  auto& att =
221  static_cast<app::MessageWrap<hasMemoryAttachment>*>(*itPending_)->payload;
222  att.hasMemoryAttachment::attachment = memoryAttachment_.open(att.len);
223  att.hasMemoryAttachment::afterConsumedCleanupFunc = hasMemoryAttachment::free;
224  } else if (h->flag != hasMemoryAttachment::flag) {
225  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
226  outputBuffer_.put(h->payload(), l);
227  currTransportHeadFlag_ = 0;
228  } //else ignore
229  } else if (hmbdc_unlikely(a == 0)) {
230  //cannot decide now - keep bufCur_ filledLen_ unchanged
231  //try it later, use a timer since there might be no more bytes arriving
232  return true;
233  } //else move to next msg
234  }
235  bufCur_ += wireSize;
236  //memmove(buf_, buf_ + h->wireSize(), filledLen_ - h->wireSize());
237  filledLen_ -= wireSize;
238  } else {
239  break;
240  }
241  }
242  memmove(buf_, bufCur_, filledLen_);
243  bufCur_ = buf_;
244 
245  if (readFd_.isFdReady()) {
246  auto l = recv(readFd_.fd, buf_ + filledLen_, bufSize_ - filledLen_
247  , MSG_NOSIGNAL|MSG_DONTWAIT);
248  if (hmbdc_unlikely(l < 0)) {
249  if (hmbdc_unlikely(!readFd_.checkErr())) {
250  HMBDC_LOG_C("recv failed errno=", errno);
251  return false;
252  }
253  return true;
254  } else if (hmbdc_unlikely(l == 0)) {
255  HMBDC_LOG_W("peer dropped:", id());
256  return false;
257  }
258  filledLen_ += l;
259  } else {
260  break;
261  }
262  } while (filledLen_ >= sizeof(TransportMessageHeader));
263  return true;
264  }
265 
266  Config const& config_;
267  StringTrieSet const& subscriptions_;
268  boost::regex topicRegex_;
269  utils::EpollFd readFd_;
270  EpollFd writeFd_;
271  MsgArbitrator& arb_;
272  OutputBuffer& outputBuffer_;
273  string id_;
274  MessageWrap<SessionStarted> sessionStarted_;
275  MessageWrap<SessionDropped> sessionDropped_;
276  bool initialized_;
277  bool stopped_;
278  size_t const bufSize_;
279  char* buf_;
280  char* bufCur_;
281  size_t filledLen_;
282  uint8_t currTransportHeadFlag_;
283  typename OutputBuffer::iterator itPending_;
284 
285  std::unordered_set<uint16_t> allowRecvMemoryAttachment_;
286  os::DownloadMemory memoryAttachment_;
287 };
288 } //recvsession_detail
289 template <typename OutputBuffer, typename MsgArbitrator>
291 }}}
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: EpollTask.hpp:31
Definition: DownloadMemory.hpp:10
Definition: Transport.hpp:24
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:72
Definition: Base.hpp:12
Definition: LfbStream.hpp:11