hmbdc
simplify-high-performance-messaging-programming
SendSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 
7 #include "hmbdc/text/StringTrieSet.hpp"
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
10 #include "hmbdc/comm/inet/Misc.hpp"
11 
12 #include <boost/lexical_cast.hpp>
13 #include <boost/circular_buffer.hpp>
14 
15 #include <memory>
16 #include <utility>
17 #include <iostream>
18 #include <fcntl.h>
19 #include <sys/epoll.h>
20 
21 namespace hmbdc { namespace app { namespace tcpcast {
22 
23 namespace sendserver_detail {
24 struct SendServer;
25 }
26 
27 using ToSend = std::vector<iovec>;
28 using ToSendQueue = boost::circular_buffer<std::tuple<std::pair<char const*, char const*> //topic
29  , ToSend
30  , size_t //total bytes above
32  >
33 >;
34 
35 namespace sendsession_detail {
36 using namespace std;
37 
38 struct SendSession {
39  using ptr = shared_ptr<SendSession>;
40  SendSession(int fd
41  , ToSendQueue & toSendQueue
42  , size_t maxSendBatch)
43  : readLen_(0)
44  , ready_(false)
45  , toSendQueue_(toSendQueue)
46  , toSendQueueIndex_(0)
47  , msghdr_{0}
48  , msghdrRelic_{0}
49  , msghdrRelicSize_(0) {
50  msghdrRelic_.msg_iov = new iovec[maxSendBatch * 2]; //double for attachment
51  auto addrPort = hmbdc::comm::inet::getPeerIpPort(fd);
52  id_ = addrPort.first + ":" + std::to_string(addrPort.second);
53  auto forRead = dup(fd);
54  if (forRead == -1) {
55  HMBDC_THROW(std::runtime_error, "dup failed errno=" << errno);
56  }
57 
58  readFd_.fd = forRead;
59  utils::EpollTask::instance().add(EPOLLIN|EPOLLET, readFd_);
60  writeFd_.fd = fd;
61  utils::EpollTask::instance().add(EPOLLOUT|EPOLLET, writeFd_);
62  HMBDC_LOG_N("SendSession started: ", id());
63  }
64 
65  ~SendSession() {
66  delete [] msghdrRelic_.msg_iov;
67  HMBDC_LOG_N("SendSession retired: ", id());
68  }
69 
70  bool runOnce() HMBDC_RESTRICT {
71  if (hmbdc_unlikely(!doRead())) return false;
72  if (hmbdc_unlikely(writeFd_.isFdReady() && msghdrRelicSize_)) {
73  auto l = sendmsg(writeFd_.fd, &msghdrRelic_, MSG_NOSIGNAL|MSG_DONTWAIT);
74  if (hmbdc_unlikely(l < 0)) {
75  if (!writeFd_.checkErr()) {
76  HMBDC_LOG_C("sendmsg failed errno=", errno);
77  return false;
78  }
79  return true;
80  }
81  msghdrRelicSize_ -= size_t(l);
82  if (hmbdc_unlikely(msghdrRelicSize_)) {
83  comm::inet::extractRelicTo(msghdrRelic_, msghdrRelic_, l);
84  return true;
85  } else {
86  toSendQueueIndex_++;
87  }
88  }
89  for (; !msghdrRelicSize_ && writeFd_.isFdReady()
90  && toSendQueueIndex_ != toSendQueue_.size();) {
91  if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
92  msghdr_.msg_iov = &(get<1>(toSendQueue_[toSendQueueIndex_])[0]);
93  msghdr_.msg_iovlen = get<1>(toSendQueue_[toSendQueueIndex_]).size();
94  auto l = sendmsg(writeFd_.fd, &msghdr_, MSG_NOSIGNAL|MSG_DONTWAIT);
95  if (hmbdc_unlikely(l < 0)) {
96  if (!writeFd_.checkErr()) {
97  HMBDC_LOG_C("sendmsg failed errno=", errno);
98  return false;
99  }
100  return true;
101  }
102  msghdrRelicSize_ = get<2>(toSendQueue_[toSendQueueIndex_]) - size_t(l);
103  if (hmbdc_unlikely(msghdrRelicSize_)) {
104  comm::inet::extractRelicTo(msghdrRelic_, msghdr_, l);
105  return true;
106  }
107  }
108  toSendQueueIndex_++;
109  }
110  return true;
111  }
112 
113  char const* id() const {
114  return id_.c_str();
115  }
116 
117  bool ready() const {
118  return ready_;
119  }
120 
121 private:
122  bool
123  doRead() HMBDC_RESTRICT {
124  while (readLen_) {
125  auto p = find(data_, data_ + readLen_, '\t');
126  if (p != data_ + readLen_) {
127  *p = '\000';
128  string t(data_ + 1);
129  if (t.size() == 0) {
130  ready_ = true;
131  }
132  else if (data_[0] == '+') {
133  clientSubscriptions_.add(t);
134  } else if (data_[0] == '-') {
135  clientSubscriptions_.erase(t);
136  } // else ignore
137  memmove(data_, p + 1, readLen_ - (p - data_ + 1));
138  readLen_ -= p - data_ + 1;
139  } else {
140  break; //no complete topic request
141  }
142  }
143  if (hmbdc_unlikely(readFd_.isFdReady())) {
144  auto l = recv(readFd_.fd, data_ + readLen_, sizeof(data_) - readLen_, MSG_NOSIGNAL|MSG_DONTWAIT);
145  if (hmbdc_unlikely(l < 0)) {
146  if (!readFd_.checkErr()) {
147  HMBDC_LOG_C("recv failed errno=", errno);
148  return false;
149  }
150  return true;
151  } else {
152  readLen_ += l;
153  }
154  };
155  return true;
156  }
157 
158  utils::EpollFd writeFd_;
159  utils::EpollFd readFd_;
160  char data_[1024];
161  size_t readLen_;
162  string id_;
163  bool ready_;
164 
166  ToSendQueue& toSendQueue_;
167  ToSendQueue::size_type toSendQueueIndex_;
168  msghdr msghdr_;
169  msghdr msghdrRelic_;
170  size_t msghdrRelicSize_;
171 
172  text::StringTrieSet clientSubscriptions_;
173 };
174 } //sendsession_detail
175 
177 
178 }}}
Definition: TypedString.hpp:74
Definition: EpollTask.hpp:31
Definition: StringTrieSetDetail.hpp:115
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:74