hmbdc
simplify-high-performance-messaging-programming
RecvSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/os/DownloadFile.hpp"
8 #include "hmbdc/os/DownloadMemory.hpp"
9 #include "hmbdc/comm/inet/Misc.hpp"
10 
11 #include "hmbdc/app/tcpcast/RecvSession.hpp"
12 
13 #include <boost/lexical_cast.hpp>
14 
15 #include <functional>
16 #include <memory>
17 #include <utility>
18 
19 #include <iostream>
20 
21 namespace hmbdc { namespace app { namespace tcpcast {
22 
23 namespace recvsession_detail {
24 using namespace std;
25 using namespace hmbdc::text;
26 
27 template <typename OutputBuffer, typename MsgArbitrator>
28 struct RecvSession {
29  using ptr = shared_ptr<RecvSession<OutputBuffer, MsgArbitrator>>;
30  using CleanupFunc = std::function<void()>;
31  RecvSession(Config const& config
32  , StringTrieSet const& subscriptions
33  , boost::regex const& topicRegex
34  , OutputBuffer& outputBuffer
35  , uint64_t connKey
36  , MsgArbitrator& arb)
37  : config_(config)
38  , subscriptions_(subscriptions)
39  , topicRegex_(topicRegex)
40  , writeFd_(config)
41  , arb_(arb)
42  , outputBuffer_(outputBuffer)
43  , connKey_(connKey)
44  , initialized_(false)
45  , stopped_(false)
46  , bufSize_(config.getExt<size_t>("maxTcpReadBytes"))
47  , buf_((char*)memalign(SMP_CACHE_BYTES, bufSize_))
48  , bufCur_(buf_)
49  , filledLen_(0)
50  , currTransportHeadFlag_(0) {
51  config(allowRecvMemoryAttachment_, "allowRecvMemoryAttachment");
52  }
53 
54  ~RecvSession() {
55  free(buf_);
56  HMBDC_LOG_N("RecvSession retired: ", id());
57  }
58 
59  void start(in_addr_t ip, uint16_t port) {
60  sockaddr_in remoteAddr = {0};
61  remoteAddr.sin_family = AF_INET;
62  remoteAddr.sin_addr.s_addr = ip;
63  remoteAddr.sin_port = htons(port);
64  if (connect(writeFd_.fd, (sockaddr*)&remoteAddr, sizeof(remoteAddr)) < 0) {
65  if (errno != EINPROGRESS) {
66  HMBDC_THROW(runtime_error, "connect fail, errno=" << errno);
67  }
68  }
69 
70  auto forRead = dup(writeFd_.fd);
71  if (forRead == -1) {
72  HMBDC_THROW(std::runtime_error, "dup failed errno=" << errno);
73  }
74 
75  readFd_.fd = forRead;
76  utils::EpollTask::instance().add(EPOLLOUT|EPOLLET, writeFd_);
77  utils::EpollTask::instance().add(EPOLLIN|EPOLLET, readFd_);
78  }
79 
80  void stop() {
81  if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
82  new (*itPending_) MessageWrap<Flush>;
83  outputBuffer_.commit(itPending_);
84  currTransportHeadFlag_ = 0;
85  }
86  outputBuffer_.putSome(sessionDropped_);
87  }
88 
89  void sendSubscribe(Topic const& t) {
90  if (hmbdc_unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
91  char buf[70];
92  buf[0]='+';
93  auto l = int(t.copyTo(buf + 1));
94  buf[l + 1] = '\t';
95  if (hmbdc_unlikely(send(writeFd_.fd, buf, l + 2, MSG_NOSIGNAL) != l + 2)) {
96  HMBDC_LOG_C("error when sending subscribe to ", id());
97  stopped_ = true;
98  }
99  }
100  }
101 
102 
103  void sendUnsubscribe(Topic const& t) {
104  if (hmbdc_unlikely(boost::regex_match(t.c_str(), topicRegex_))) {
105  char buf[70];
106  buf[0]='-';
107  auto l = int(t.copyTo(buf + 1));
108  buf[l + 1] = '\t';
109  if (hmbdc_unlikely(l + 2 != send(writeFd_.fd, buf, l + 2, MSG_NOSIGNAL))) {
110  HMBDC_LOG_C("error when sending unsubscribe to ", id(), " errno=", errno);
111  stopped_ = true;
112  }
113  }
114  }
115 
116  void heartbeat() {
117  if (!initialized_) return;
118  char const* hb = "+\t";
119  if (hmbdc_unlikely(2 != send(writeFd_.fd, hb, 2, MSG_NOSIGNAL))){
120  HMBDC_LOG_C("error when sending heartbeat to ", id(), " errno=", errno);
121  stopped_ = true;
122  }
123  }
124 
125  char const* id() const {
126  return id_.c_str();
127  }
128 
129  bool runOnce() {
130  if (hmbdc_unlikely(stopped_)) return false;
131  if (hmbdc_unlikely(!initialized_ && writeFd_.isFdReady())) {
132  try {
133  initializeConn();
134  } catch (std::exception const& e) {
135  HMBDC_LOG_W(e.what());
136  return false;
137  } catch (...) {
138  HMBDC_LOG_C("unknown exception");
139  return false;
140  }
141  }
142  return doRead();
143  }
144 
145 private:
146  void initializeConn() {
147  int flags = fcntl(writeFd_.fd, F_GETFL, 0);
148  flags &= ~O_NONBLOCK;
149  if (fcntl(writeFd_.fd, F_SETFL, flags) < 0) {
150  HMBDC_THROW(std::runtime_error, "fcntl failed errno=" << errno);
151  }
152 
153  auto sz = config_.getExt<int>("tcpRecvBufferBytes");
154  if (sz) {
155  if (setsockopt(readFd_.fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
156  HMBDC_LOG_C("failed to set send buffer size=", sz);
157  }
158  }
159 
160  auto addrPort = hmbdc::comm::inet::getPeerIpPort(writeFd_.fd);
161  id_ = addrPort.first + ":" + std::to_string(addrPort.second);
162  strncpy(sessionStarted_.payload.ip, id_.c_str()
163  , sizeof(sessionStarted_.payload.ip));
164  sessionStarted_.payload.ip[sizeof(sessionStarted_.payload.ip) - 1] = 0;
165  strncpy(sessionDropped_.payload.ip, id_.c_str()
166  , sizeof(sessionDropped_.payload.ip));
167  sessionDropped_.payload.ip[sizeof(sessionDropped_.payload.ip) - 1] = 0;
168  outputBuffer_.putSome(sessionStarted_);
169  HMBDC_LOG_N("RecvSession started: ", id());
170  sendSubscriptions();
171  initialized_ = true;
172  }
173 
174  void sendSubscriptions() {
175  ostringstream oss;
176  oss << '@' << connKey_ << '\t';
177  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
178  auto const& t = it->first;
179  if (boost::regex_match(t.c_str(), topicRegex_)) {
180  oss << '+' << t;
181  if (it->second) {
182  oss << '*';
183  }
184  oss << '\t';
185  }
186  }
187  auto s = oss.str() + "+\t"; //mark all sent with "+\t"
188  auto sz = size_t(send(writeFd_.fd, s.c_str(), s.size(), MSG_NOSIGNAL));
189  if (hmbdc_unlikely(sz != s.size())) {
190  HMBDC_LOG_C("error when sending subscriptions to ", id());
191  stopped_ = true;
192  }
193  }
194 
195  bool doRead() {
196  do {
197  if (hmbdc_unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag
198  && filledLen_)) {
199  auto s = memoryAttachment_.write(bufCur_, filledLen_);
200  if (memoryAttachment_.writeDone()) {
201  memoryAttachment_.close();
202  currTransportHeadFlag_ = 0;
203  outputBuffer_.commit(itPending_);
204  }
205  bufCur_ += s;
206  filledLen_ -= s;
207  }
208  while (currTransportHeadFlag_ == 0
209  && filledLen_ >= sizeof(TransportMessageHeader)) {
210  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
211  auto wireSize = h->wireSize();
212  if (hmbdc_likely(filledLen_ >= wireSize)) {
213  if (hmbdc_likely(subscriptions_.check(h->topic()))) {
214  auto a = arb_(h);
215  if (hmbdc_likely(a > 0)) {
216  if (hmbdc_unlikely(h->flag == hasMemoryAttachment::flag
217  && allowRecvMemoryAttachment_.find(h->typeTag()) != allowRecvMemoryAttachment_.end())) {
218  currTransportHeadFlag_ = h->flag;
219  itPending_ = outputBuffer_.claim();
220  char* b = static_cast<char*>(*itPending_);
221  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
222  memcpy(b, h->payload(), l);
223  auto& att =
224  static_cast<app::MessageWrap<hasMemoryAttachment>*>(*itPending_)->payload;
225  att.hasMemoryAttachment::attachment = memoryAttachment_.open(att.len);
226  att.hasMemoryAttachment::afterConsumedCleanupFunc = hasMemoryAttachment::free;
227  } else if (h->flag != hasMemoryAttachment::flag) {
228  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
229  outputBuffer_.put(h->payload(), l);
230  currTransportHeadFlag_ = 0;
231  } //else ignore
232  } else if (hmbdc_unlikely(a == 0)) {
233  //cannot decide now - keep bufCur_ filledLen_ unchanged
234  //try it later, use a timer since there might be no more bytes arriving
235  return true;
236  } //else move to next msg
237  }
238  bufCur_ += wireSize;
239  //memmove(buf_, buf_ + h->wireSize(), filledLen_ - h->wireSize());
240  filledLen_ -= wireSize;
241  } else {
242  break;
243  }
244  }
245  memmove(buf_, bufCur_, filledLen_);
246  bufCur_ = buf_;
247 
248  if (readFd_.isFdReady()) {
249  auto l = recv(readFd_.fd, buf_ + filledLen_, bufSize_ - filledLen_
250  , MSG_NOSIGNAL|MSG_DONTWAIT);
251  if (hmbdc_unlikely(l < 0)) {
252  if (hmbdc_unlikely(!readFd_.checkErr())) {
253  HMBDC_LOG_C("recv failed errno=", errno);
254  return false;
255  }
256  return true;
257  } else if (hmbdc_unlikely(l == 0)) {
258  HMBDC_LOG_W("peer dropped:", id());
259  return false;
260  }
261  filledLen_ += l;
262  } else {
263  break;
264  }
265  } while (filledLen_ >= sizeof(TransportMessageHeader));
266  return true;
267  }
268 
269  Config const& config_;
270  StringTrieSet const& subscriptions_;
271  boost::regex topicRegex_;
272  utils::EpollFd readFd_;
273  EpollFd writeFd_;
274  MsgArbitrator& arb_;
275  OutputBuffer& outputBuffer_;
276  uint64_t connKey_;
277  string id_;
278  MessageWrap<SessionStarted> sessionStarted_;
279  MessageWrap<SessionDropped> sessionDropped_;
280  bool initialized_;
281  bool stopped_;
282  size_t const bufSize_;
283  char* buf_;
284  char* bufCur_;
285  size_t filledLen_;
286  uint8_t currTransportHeadFlag_;
287  typename OutputBuffer::iterator itPending_;
288 
289  std::unordered_set<uint16_t> allowRecvMemoryAttachment_;
290  os::DownloadMemory memoryAttachment_;
291 };
292 } //recvsession_detail
293 template <typename OutputBuffer, typename MsgArbitrator>
295 }}}
class to hold an hmbdc configuration
Definition: Config.hpp:46
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:180
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
Definition: EpollTask.hpp:31
Definition: DownloadMemory.hpp:10
Definition: Transport.hpp:24
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:76
Definition: Base.hpp:12
Definition: LfbStream.hpp:11