hmbdc
simplify-high-performance-messaging-programming
SendSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 
7 #include "hmbdc/text/StringTrieSet.hpp"
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
10 #include "hmbdc/comm/inet/Misc.hpp"
11 
12 #include <boost/asio.hpp>
13 #include <boost/lexical_cast.hpp>
14 #include <boost/circular_buffer.hpp>
15 
16 #include <memory>
17 #include <utility>
18 #include <iostream>
19 
20 namespace hmbdc { namespace app { namespace tcpcast {
21 
22 namespace sendserver_detail {
23 struct AsyncSendServer;
24 }
25 
26 using ToSend = std::vector<boost::asio::const_buffer>;
27 using ToSendQueue = boost::circular_buffer<std::tuple<std::pair<char const*, char const*> //topic
28  , ToSend
29  , size_t //total bytes above
31  >
32 >;
33 
34 namespace sendsession_detail {
35 using namespace boost::asio;
36 using boost::asio::ip::tcp;
37 using namespace std;
38 
39 
40 
41 struct SendSession
42 : std::enable_shared_from_this<SendSession> {
43  SendSession(tcp::socket socket)
44  : socket_(move(socket))
45  , filledLen_(0)
46  , id_(boost::lexical_cast<string>(socket_.remote_endpoint()))
47  , ready_(false) {
48  socket_.non_blocking(true);
49  }
50 
51  ~SendSession() {
52  HMBDC_LOG_N("SendSession retired: ", id());
53  }
54 
55  void start() {
56  HMBDC_LOG_N("SendSession started: ", id());
57  doRead();
58  }
59 
60  void stop() {
61  ready_ = false;
62  boost::system::error_code ec;
63  socket_.shutdown(tcp::socket::shutdown_both, ec);
64  socket_.close();
65  }
66 
67  char const* id() const {
68  return id_.c_str();
69  }
70 
71  bool ready() const {
72  return ready_;
73  }
74 
75 private:
76  void doRead() {
77  while (filledLen_) {
78  auto p = find(data_, data_ + filledLen_, '\t');
79  if (p != data_ + filledLen_) {
80  *p = '\000';
81  string t(data_ + 1);
82  if (t.size() == 0) {
83  ready_ = true;
84  }
85  else if (data_[0] == '+') {
86  clientSubscriptions_.add(t);
87  } else if (data_[0] == '-') {
88  clientSubscriptions_.erase(t);
89  } // else ignore
90  memmove(data_, p + 1, filledLen_ - (p - data_ + 1));
91  filledLen_ -= p - data_ + 1;
92  } else {
93  break; //no complete topic request
94  }
95  }
96 
97  auto self(shared_from_this());
98  socket_.async_read_some(boost::asio::buffer(data_ + filledLen_, sizeof(data_) - filledLen_),
99  [this, self](boost::system::error_code ec, std::size_t length) {
100  if (!ec) {
101  filledLen_ += length;
102  doRead();
103  }
104  });
105  }
106 
107 
108 protected:
109  tcp::socket socket_;
110  text::StringTrieSet clientSubscriptions_;
111 
112 private:
113  char data_[1024];
114  size_t filledLen_;
115  string id_;
116  bool ready_;
117 
118 };
119 
120 
122 : SendSession {
123  using ptr = shared_ptr<SyncSendSession>;
124  using SendSession::SendSession;
125  template <typename ToSend>
126  size_t send(std::pair<char const*, char const*> const& t
127  , ToSend const& toSend, size_t size) {
128  if (clientSubscriptions_.check(t)) {
129  boost::system::error_code ec;
130  auto res = write(socket_, toSend, ec);
131  if (hmbdc_unlikely(ec)) {
132  HMBDC_LOG_W(id(), ", sending error_code=", ec);
133  return 0;
134  }
135  return res;
136  }
137  return size; //pretend things normal
138  }
139 };
140 
142 : SendSession {
143  using ptr = shared_ptr<AsyncSendSession>;
144  AsyncSendSession(tcp::socket socket
145  , ToSendQueue & toSendQueue)
146  : SendSession(move(socket))
147  , toSendQueue_(toSendQueue)
148  , toSendQueueIndex_(0)
149  , asyncWritePending_(false) {
150  }
151 
152  void run() {
153  if (asyncWritePending_) return;
154 
155  for (; toSendQueueIndex_ != toSendQueue_.size();) {
156  if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
157  auto self(shared_from_this());
158  auto bytesToSend = get<2>(toSendQueue_[toSendQueueIndex_]);
159  async_write(socket_, get<1>(toSendQueue_[toSendQueueIndex_])
160  , transfer_exactly(bytesToSend)
161  , [this, self](const boost::system::error_code& ec,
162  std::size_t bytes_transferred) {
163  toSendQueueIndex_++;
164  if (hmbdc_unlikely(ec)) {
165  HMBDC_LOG_C(id(), ", async sending error_code=", ec);
166  } else {
167  asyncWritePending_ = false;
168  run();
169  }
170  }
171  );
172  asyncWritePending_ = true;
173  break;
174  } else {
175  toSendQueueIndex_++;
176  }
177  }
178  }
179 
180 private:
182  ToSendQueue& toSendQueue_;
183  ToSendQueue::size_type toSendQueueIndex_;
184  bool asyncWritePending_;
185 };
186 } //sendsession_detail
187 
189 
190 }}}
Definition: TypedString.hpp:74
Definition: StringTrieSetDetail.hpp:115
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:73