hmbdc
simplify-high-performance-messaging-programming
SendServer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
8 #include "hmbdc/comm/inet/Misc.hpp"
9 
10 #include "hmbdc/app/tcpcast/SendSession.hpp"
11 
12 #include <unordered_set>
13 #include <memory>
14 #include <utility>
15 #include <boost/asio.hpp>
16 
17 #include <iostream>
18 
19 namespace hmbdc { namespace app { namespace tcpcast {
20 
21 namespace sendserver_detail {
22 using namespace hmbdc::pattern;
23 using namespace boost::asio;
24 using boost::asio::ip::tcp;
25 using namespace std;
26 
27 struct SendServer {
28  SendServer(Config const& cfg
29  , io_service& ios)
30  : config_(cfg)
31  , acceptor_(ios
32  , tcp::endpoint(
33  ip::address::from_string(
34  hmbdc::comm::inet::getLocalIpMatchMask(cfg.getExt<string>("ifaceAddr"))
35  ), cfg.getExt<short>("tcpPort"))
36  )
37  , socket_(ios)
38  , advertisingMessage_(
39  cfg.getExt<string>("topicRegex")
40  , acceptor_.local_endpoint().address().to_string()
41  , acceptor_.local_endpoint().port()
42  , config_.getExt<bool>("loopback")
43  ) {
44  }
45 
46  TopicSource const& advertisingMessage() const {
47  return advertisingMessage_;
48  }
49 
50  virtual size_t readySessionCount() const = 0;
51  virtual ~SendServer() = default;
52 protected:
53  Config config_;
54 
55  tcp::acceptor acceptor_;
56  tcp::socket socket_;
57  TopicSource advertisingMessage_;
58 };
59 
61 : SendServer {
62  AsyncSendServer(Config const& cfg
63  , io_service& ios
64  , size_t toSendQueueMaxSize)
65  : SendServer(cfg, ios) {
66  toSendQueue_.set_capacity(toSendQueueMaxSize);
67  doAccept();
68  }
69 
70  void queue(pair<char const*, char const*> const& t
71  , ToSend && toSend
72  , size_t toSendByteSize
74  toSendQueue_.push_back(forward_as_tuple(t, forward<ToSend>(toSend), toSendByteSize, it));
75  }
76 
77  /**
78  * @brief run the server's async send function and decide which items in buffer
79  * can be release
80  * @details called all the time
81  *
82  * @param begin if nothing can be released in buffer return this
83  * @param end if all can be releases return this
84  *
85  * @return iterator in buffer poing to new start (not released)
86  */
88  , MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT {
89  if (sessions_.size() == 0) {
90  toSendQueue_.clear();
91  return end;
92  }
93  //nothing to retire by default
94  auto newStartIt = begin;
95 
96  auto minIndex = numeric_limits<ToSendQueue::size_type>::max();
97  for (auto it = sessions_.begin(); it != sessions_.end();) {
98  if (minIndex > (*it)->toSendQueueIndex_) {
99  minIndex = (*it)->toSendQueueIndex_;
100  }
101  if (unlikely(it->use_count() == 1)) {
102  (*it)->stop();
103  sessions_.erase(it++);
104  } else {
105  (*it)->run();
106  it++;
107  }
108  }
109  if (minIndex) {
110  newStartIt = get<3>(toSendQueue_[minIndex - 1]);
111  toSendQueue_.erase_begin(minIndex);
112  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
113  (*it)->toSendQueueIndex_ -= minIndex;
114  }
115  }
116 
117  return newStartIt;
118  }
119 
120  size_t readySessionCount() const override {
121  size_t res = 0;
122  for (auto const& s : sessions_) {
123  if (s->ready()) res++;
124  }
125  return res;
126  }
127 
128  void killSlowestSession() {
129  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
130  if (0 == (*it)->toSendQueueIndex_) {
131  HMBDC_LOG_C((*it)->id(), " too slow, dropping");
132  (*it)->stop();
133  break;
134  }
135  }
136  }
137 
138 private:
139  void doAccept() {
140  acceptor_.async_accept(socket_,
141  [this](boost::system::error_code ec) {
142  if (!ec) {
143  auto sz = config_.getExt<int>("tcpSendBufferBytes");
144  if (sz) {
145  socket_base::send_buffer_size option(sz);
146  boost::system::error_code ec;
147  socket_.set_option(option);
148  }
149 
150  socket_base::send_buffer_size option;
151  socket_.get_option(option);
152  if (sz == 0 || sz >= option.value()) {
153  // HMBDC_LOG_N("tcpcast SendSession send buffer size: ", option.value());
154  } else {
155  HMBDC_LOG_C("set tcpcast SendSession send buffer size unsuccessful, want "
156  , sz, " actual: ", option.value()
157  , " resulting higher receiver dropping possibility, check OS limits!");
158  }
159 
160  socket_.set_option(ip::tcp::no_delay(!config_.getExt<bool>("nagling")));
161 
162  auto s = std::make_shared<AsyncSendSession>(std::move(socket_), toSendQueue_);
163  sessions_.insert(s);
164  s->start();
165  }
166  doAccept();
167  }
168  );
169  }
170 
171  using Sessions = unordered_set<AsyncSendSession::ptr>;
172  Sessions sessions_;
173  ToSendQueue toSendQueue_;
174 };
175 } //sendserver_detail
177 }}}
class to hold an hmbdc configuration
Definition: Config.hpp:43
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:151
Definition: TypedString.hpp:74
Definition: GuardedSingleton.hpp:9
Definition: Messages.hpp:75
MonoLockFreeBuffer::iterator run(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT
run the server&#39;s async send function and decide which items in buffer can be release ...
Definition: SendServer.hpp:87
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:73