hmbdc
simplify-high-performance-messaging-programming
SendServer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
8 #include "hmbdc/comm/inet/Misc.hpp"
9 
10 #include "hmbdc/app/tcpcast/SendSession.hpp"
11 
12 #include <unordered_set>
13 #include <memory>
14 #include <utility>
15 #include <boost/asio.hpp>
16 
17 #include <iostream>
18 
19 namespace hmbdc { namespace app { namespace tcpcast {
20 
21 using namespace hmbdc::pattern;
22 using namespace boost::asio;
23 using namespace std;
24 
25 struct SendServer {
26  SendServer(Config const& cfg
27  , io_service& ios)
28  : config_(cfg)
29  , acceptor_(ios
30  , tcp::endpoint(
31  ip::address::from_string(
32  hmbdc::comm::inet::getLocalIpMatchMask(cfg.getExt<string>("ifaceAddr"))
33  ), cfg.getExt<short>("tcpPort"))
34  )
35  , socket_(ios)
36  , advertisingMessage_(
37  cfg.getExt<string>("topicRegex")
38  , acceptor_.local_endpoint().address().to_string()
39  , acceptor_.local_endpoint().port()
40  , config_.getExt<bool>("loopback")
41  ) {
42  }
43 
44  TopicSource const& advertisingMessage() const {
45  return advertisingMessage_;
46  }
47 
48  virtual size_t readySessionCount() const = 0;
49 
50 protected:
51  Config config_;
52 
53  tcp::acceptor acceptor_;
54  tcp::socket socket_;
55  TopicSource advertisingMessage_;
56 };
57 
59 : SendServer {
60  AsyncSendServer(Config const& cfg
61  , io_service& ios
62  , size_t toSendQueueMaxSize)
63  : SendServer(cfg, ios) {
64  toSendQueue_.set_capacity(toSendQueueMaxSize);
65  doAccept();
66  }
67 
68  void queue(pair<char const*, char const*> const& t
69  , ToSend && toSend
70  , size_t toSendByteSize
72  toSendQueue_.push_back(forward_as_tuple(t, forward<ToSend>(toSend), toSendByteSize, it));
73  }
74 
75  /**
76  * @brief run the server's async send function and decide which items in buffer
77  * can be release
78  * @details called all the time
79  *
80  * @param begin if nothing can be released in buffer return this
81  * @param end if all can be releases return this
82  *
83  * @return iterator in buffer poing to new start (not released)
84  */
86  , MonoLockFreeBuffer::iterator end) __restrict__ {
87  if (sessions_.size() == 0) {
88  toSendQueue_.clear();
89  return end;
90  }
91  //nothing to retire by default
92  auto newStartIt = begin;
93 
94  ToSendQueue::size_type minIndex = numeric_limits<ToSendQueue::size_type>::max();
95  for (auto it = sessions_.begin(); it != sessions_.end();) {
96  if (minIndex > (*it)->toSendQueueIndex_) {
97  minIndex = (*it)->toSendQueueIndex_;
98  }
99  if (unlikely(it->use_count() == 1)) {
100  (*it)->stop();
101  sessions_.erase(it++);
102  } else {
103  (*it)->run();
104  it++;
105  }
106  }
107  if (minIndex) {
108  newStartIt = get<3>(toSendQueue_[minIndex - 1]);
109  toSendQueue_.erase_begin(minIndex);
110  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
111  (*it)->toSendQueueIndex_ -= minIndex;
112  }
113  }
114 
115  return newStartIt;
116  }
117 
118  size_t readySessionCount() const override {
119  size_t res = 0;
120  for (auto const& s : sessions_) {
121  if (s->ready()) res++;
122  }
123  return res;
124  }
125 
126  void killSlowestSession() {
127  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
128  if (0 == (*it)->toSendQueueIndex_) {
129  HMBDC_LOG_C((*it)->id(), " too slow, dropping");
130  (*it)->stop();
131  break;
132  }
133  }
134  }
135 
136 private:
137  void doAccept() {
138  acceptor_.async_accept(socket_,
139  [this](boost::system::error_code ec) {
140  if (!ec) {
141  auto sz = config_.getExt<int>("tcpSendBufferBytes");
142  if (sz) {
143  socket_base::send_buffer_size option(sz);
144  boost::system::error_code ec;
145  socket_.set_option(option);
146  }
147 
148  socket_base::send_buffer_size option;
149  socket_.get_option(option);
150  if (sz == 0 || sz >= option.value()) {
151  // HMBDC_LOG_N("tcpcast SendSession send buffer size: ", option.value());
152  } else {
153  HMBDC_LOG_C("set tcpcast SendSession send buffer size unsuccessful, want "
154  , sz, " actual: ", option.value()
155  , " resulting higher receiver dropping possibility, check OS limits!");
156  }
157 
158  socket_.set_option(ip::tcp::no_delay(!config_.getExt<bool>("nagling")));
159 
160  auto s = std::make_shared<AsyncSendSession>(std::move(socket_), toSendQueue_);
161  sessions_.insert(s);
162  s->start();
163  }
164  doAccept();
165  }
166  );
167  }
168 
// sessions created in doAccept(); held through shared pointers so that run()
// can tell from use_count() when it owns the last reference to a session
169  using Sessions = unordered_set<AsyncSendSession::ptr>;
170  Sessions sessions_;
// items queued by queue(); a reference to this queue is passed to every
// AsyncSendSession constructed in doAccept()
171  ToSendQueue toSendQueue_;
172 };
173 
174 }}}
class to hold an hmbdc configuration
Definition: Config.hpp:35
Definition: SendServer.hpp:25
Definition: TypedString.hpp:74
Definition: SendServer.hpp:58
Definition: GuardedSingleton.hpp:9
MonoLockFreeBuffer::iterator run(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) __restrict__
run the server's async send function and decide which items in buffer can be released ...
Definition: SendServer.hpp:85
Definition: Messages.hpp:129
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:143
Definition: Client.hpp:11
Definition: LockFreeBufferMisc.hpp:73