hmbdc
simplify-high-performance-messaging-programming
SendSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 
7 #include "hmbdc/text/StringTrieSet.hpp"
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
10 #include "hmbdc/comm/inet/Misc.hpp"
11 
12 #include <boost/lexical_cast.hpp>
13 #include <boost/circular_buffer.hpp>
14 
15 #include <random>
16 #include <memory>
17 #include <utility>
18 #include <iostream>
19 #include <fcntl.h>
20 #include <sys/epoll.h>
21 
22 namespace hmbdc { namespace app { namespace tcpcast {
23 
24 namespace sendserver_detail {
25 struct SendServer; // forward declaration only; not referenced by the code visible in this header (presumably defined by the tcpcast send server - TODO confirm)
26 }
27 
28 using ToSend = std::vector<iovec>;
29 using ToSendQueue = boost::circular_buffer<std::tuple<std::pair<char const*, char const*> //topic
30  , ToSend
31  , size_t //total bytes above
33  >
34 >;
35 
36 namespace sendsession_detail {
37 using namespace std;
38 
39 struct SendSession {
40  using ptr = shared_ptr<SendSession>;
41  SendSession(int fd
42  , uint64_t connKey
43  , ToSendQueue & toSendQueue
44  , size_t maxSendBatch)
45  : connKey_(connKey)
46  , readLen_(0)
47  , ready_(false)
48  , toSendQueue_(toSendQueue)
49  , toSendQueueIndex_(0)
50  , msghdr_{0}
51  , msghdrRelic_{0}
52  , msghdrRelicSize_(0) {
53  msghdrRelic_.msg_iov = new iovec[maxSendBatch * 2]; //double for attachment
54  auto addrPort = hmbdc::comm::inet::getPeerIpPort(fd);
55  id_ = addrPort.first + ":" + std::to_string(addrPort.second);
56  auto forRead = dup(fd);
57  if (forRead == -1) {
58  HMBDC_THROW(std::runtime_error, "dup failed errno=" << errno);
59  }
60 
61  readFd_.fd = forRead;
62  utils::EpollTask::instance().add(EPOLLIN|EPOLLET, readFd_);
63  writeFd_.fd = fd;
64  utils::EpollTask::instance().add(EPOLLOUT|EPOLLET, writeFd_);
65  HMBDC_LOG_N("SendSession started: ", id());
66  }
67 
68  ~SendSession() {
69  delete [] msghdrRelic_.msg_iov;
70  HMBDC_LOG_N("SendSession retired: ", id());
71  }
72 
73  bool runOnce() HMBDC_RESTRICT {
74  if (hmbdc_unlikely(!doRead())) return false;
75  if (hmbdc_unlikely(writeFd_.isFdReady() && msghdrRelicSize_)) {
76  auto l = sendmsg(writeFd_.fd, &msghdrRelic_, MSG_NOSIGNAL|MSG_DONTWAIT);
77  if (hmbdc_unlikely(l < 0)) {
78  if (!writeFd_.checkErr()) {
79  HMBDC_LOG_C("sendmsg failed errno=", errno);
80  return false;
81  }
82  return true;
83  }
84  msghdrRelicSize_ -= size_t(l);
85  if (hmbdc_unlikely(msghdrRelicSize_)) {
86  comm::inet::extractRelicTo(msghdrRelic_, msghdrRelic_, l);
87  return true;
88  } else {
89  toSendQueueIndex_++;
90  }
91  }
92  for (; !msghdrRelicSize_ && writeFd_.isFdReady()
93  && toSendQueueIndex_ != toSendQueue_.size();) {
94  if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
95  msghdr_.msg_iov = &(get<1>(toSendQueue_[toSendQueueIndex_])[0]);
96  msghdr_.msg_iovlen = get<1>(toSendQueue_[toSendQueueIndex_]).size();
97  auto l = sendmsg(writeFd_.fd, &msghdr_, MSG_NOSIGNAL|MSG_DONTWAIT);
98  if (hmbdc_unlikely(l < 0)) {
99  if (!writeFd_.checkErr()) {
100  HMBDC_LOG_C("sendmsg failed errno=", errno);
101  return false;
102  }
103  return true;
104  }
105  msghdrRelicSize_ = get<2>(toSendQueue_[toSendQueueIndex_]) - size_t(l);
106  if (hmbdc_unlikely(msghdrRelicSize_)) {
107  comm::inet::extractRelicTo(msghdrRelic_, msghdr_, l);
108  return true;
109  }
110  }
111  toSendQueueIndex_++;
112  }
113  return true;
114  }
115 
116  char const* id() const {
117  return id_.c_str();
118  }
119 
120  bool ready() const {
121  return ready_;
122  }
123 
124 private:
125  bool
126  doRead() HMBDC_RESTRICT {
127  while (readLen_) {
128  auto p = find(data_, data_ + readLen_, '\t');
129  if (p != data_ + readLen_) {
130  *p = '\000';
131  string t(data_ + 1);
132  if (hmbdc_unlikely(connKey_)) {
133  if (data_[0] != '@' || stoull(t) != connKey_) {
134  HMBDC_LOG_C("invalid connection attempted by ", id_);
135  return false;
136  }
137  connKey_ = 0;
138  } else if (t.size() == 0) {
139  ready_ = true;
140  }
141  else if (data_[0] == '+') {
142  clientSubscriptions_.add(t);
143  } else if (data_[0] == '-') {
144  clientSubscriptions_.erase(t);
145  } else {
146  HMBDC_LOG_C("voilating protocle by ", id_);
147  return false;
148  }
149  memmove(data_, p + 1, readLen_ - (p - data_ + 1));
150  readLen_ -= p - data_ + 1;
151  } else {
152  break; //no complete topic request
153  }
154  }
155  if (hmbdc_unlikely(readFd_.isFdReady())) {
156  auto l = recv(readFd_.fd, data_ + readLen_, sizeof(data_) - readLen_, MSG_NOSIGNAL|MSG_DONTWAIT);
157  if (hmbdc_unlikely(l < 0)) {
158  if (!readFd_.checkErr()) {
159  HMBDC_LOG_C("recv failed errno=", errno);
160  return false;
161  }
162  return true;
163  } else {
164  readLen_ += l;
165  }
166  };
167  return true;
168  }
169  uint64_t connKey_;
170  utils::EpollFd writeFd_;
171  utils::EpollFd readFd_;
172  char data_[1024];
173  size_t readLen_;
174  string id_;
175  bool ready_;
176 
178  ToSendQueue& toSendQueue_;
179  ToSendQueue::size_type toSendQueueIndex_;
180  msghdr msghdr_;
181  msghdr msghdrRelic_;
182  size_t msghdrRelicSize_;
183 
184  text::StringTrieSet clientSubscriptions_;
185 };
186 } //sendsession_detail
187 
189 
190 }}}
Definition: TypedString.hpp:74
Definition: EpollTask.hpp:31
Definition: StringTrieSetDetail.hpp:115
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:74