hmbdc
simplify-high-performance-messaging-programming
SendSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 
7 #include "hmbdc/text/StringTrieSet.hpp"
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
10 #include "hmbdc/comm/inet/Misc.hpp"
11 
12 #include <boost/asio.hpp>
13 #include <boost/lexical_cast.hpp>
14 #include <boost/circular_buffer.hpp>
15 
16 #include <memory>
17 #include <utility>
18 #include <iostream>
19 
20 namespace hmbdc { namespace app { namespace tcpcast {
21 
22 using namespace boost::asio;
23 using boost::asio::ip::tcp;
24 using namespace std;
25 
26 using ToSend = vector<const_buffer>;
27 using ToSendQueue = boost::circular_buffer<tuple<pair<char const*, char const*> //topic
28  , ToSend
29  , size_t //total bytes above
31  >
32 >;
33 
34 struct SendSession
35 : std::enable_shared_from_this<SendSession> {
36  SendSession(tcp::socket socket)
37  : socket_(move(socket))
38  , filledLen_(0)
39  , id_(boost::lexical_cast<string>(socket_.remote_endpoint()))
40  , ready_(false) {
41  socket_.non_blocking(true);
42  }
43 
44  ~SendSession() {
45  HMBDC_LOG_N("SendSession retired: ", id());
46  }
47 
48  void start() {
49  HMBDC_LOG_N("SendSession started: ", id());
50  doRead();
51  }
52 
53  void stop() {
54  ready_ = false;
55  boost::system::error_code ec;
56  socket_.shutdown(tcp::socket::shutdown_both, ec);
57  socket_.close();
58  }
59 
60  char const* id() const {
61  return id_.c_str();
62  }
63 
64  bool ready() const {
65  return ready_;
66  }
67 
68 private:
69  void doRead() {
70  while (filledLen_) {
71  auto p = find(data_, data_ + filledLen_, '\t');
72  if (p != data_ + filledLen_) {
73  *p = '\000';
74  string t(data_ + 1);
75  if (t.size() == 0) {
76  ready_ = true;
77  }
78  else if (data_[0] == '+') {
79  clientSubscriptions_.add(t);
80  } else if (data_[0] == '-') {
81  clientSubscriptions_.erase(t);
82  } // else ignore
83  memmove(data_, p + 1, filledLen_ - (p - data_ + 1));
84  filledLen_ -= p - data_ + 1;
85  } else {
86  break; //no complete topic request
87  }
88  }
89 
90  auto self(shared_from_this());
91  socket_.async_read_some(boost::asio::buffer(data_ + filledLen_, sizeof(data_) - filledLen_),
92  [this, self](boost::system::error_code ec, std::size_t length) {
93  if (!ec) {
94  filledLen_ += length;
95  doRead();
96  }
97  });
98  }
99 
100 
101 protected:
102  tcp::socket socket_;
103  StringTrieSet clientSubscriptions_;
104 
105 private:
106  char data_[1024];
107  size_t filledLen_;
108  string id_;
109  bool ready_;
110 
111 };
112 
113 
115 : SendSession {
116  using ptr = shared_ptr<SyncSendSession>;
117  using SendSession::SendSession;
118  template <typename ToSend>
119  size_t send(std::pair<char const*, char const*> const& t
120  , ToSend const& toSend, size_t size) {
121  if (clientSubscriptions_.check(t)) {
122  boost::system::error_code ec;
123  auto res = write(socket_, toSend, ec);
124  if (unlikely(ec)) {
125  HMBDC_LOG_W(id(), ", sending error_code=", ec);
126  return 0;
127  }
128  return res;
129  }
130  return size; //pretend things normal
131  }
132 };
133 
135 : SendSession {
136  using ptr = shared_ptr<AsyncSendSession>;
137  AsyncSendSession(tcp::socket socket
138  , ToSendQueue & toSendQueue)
139  : SendSession(move(socket))
140  , toSendQueue_(toSendQueue)
141  , toSendQueueIndex_(0)
142  , asyncWritePending_(false) {
143  }
144 
145  void run() {
146  if (asyncWritePending_) return;
147 
148  for (; toSendQueueIndex_ != toSendQueue_.size();) {
149  if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
150  auto self(shared_from_this());
151  auto bytesToSend = get<2>(toSendQueue_[toSendQueueIndex_]);
152  async_write(socket_, get<1>(toSendQueue_[toSendQueueIndex_]),
153  [this, self, bytesToSend](const boost::system::error_code& ec,
154  std::size_t bytes_transferred) {
155  toSendQueueIndex_++;
156  if (unlikely(ec)) {
157  HMBDC_LOG_C(id(), ", async sending error_code=", ec);
158  } else if (unlikely(bytes_transferred != bytesToSend)) {
159  HMBDC_LOG_C(id(), ", async sending incomplete, wrote bytes="
160  , bytes_transferred);
161  } else {
162  asyncWritePending_ = false;
163  run();
164  }
165  }
166  );
167  asyncWritePending_ = true;
168  break;
169  } else {
170  toSendQueueIndex_++;
171  }
172  }
173  }
174 
175 private:
176  friend class AsyncSendServer;
177  ToSendQueue& toSendQueue_;
178  ToSendQueue::size_type toSendQueueIndex_;
179  bool asyncWritePending_;
180 };
181 
182 
183 }}}
Definition: SendSession.hpp:134
Definition: SendSession.hpp:114
Definition: StringTrieSet.hpp:113
Definition: TypedString.hpp:74
Definition: SendServer.hpp:58
Definition: SendSession.hpp:34
Definition: Client.hpp:11
Definition: LockFreeBufferMisc.hpp:73