hmbdc
simplify-high-performance-messaging-programming
SendServer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
8 #include "hmbdc/comm/inet/Misc.hpp"
9 
10 #include "hmbdc/app/tcpcast/SendSession.hpp"
11 
12 #include <unordered_set>
13 #include <memory>
14 #include <utility>
15 #include <iostream>
16 
17 #include <netinet/tcp.h>
18 
19 namespace hmbdc { namespace app { namespace tcpcast {
20 
21 namespace sendserver_detail {
22 using namespace hmbdc::pattern;
23 using namespace std;
24 
25 struct SendServer {
26  SendServer(Config const& cfg
27  , size_t toSendQueueMaxSize)
28  : config_(cfg)
29  , serverFd_(cfg)
30  , serverAddr_(serverFd_.localAddr)
31  , advertisingMessage_(
32  cfg.getExt<string>("topicRegex")
33  , serverFd_.localIp
34  , serverFd_.localPort
35  , config_.getExt<bool>("loopback")
36  )
37  , distribution_(1)
38  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch")) {
39  if (listen(serverFd_.fd, 10) < 0) {
40  HMBDC_THROW(runtime_error, "failed to listen, errno=" << errno);
41  }
42  HMBDC_LOG_N("listen at ", advertisingMessage_);
43  utils::EpollTask::instance().add(EPOLLIN|EPOLLET, serverFd_);
44  toSendQueue_.set_capacity(toSendQueueMaxSize);
45  }
46 
47  TopicSource const& advertisingMessage() const {
48  advertisingMessage_.connKey = distribution_(generator_);
49  return advertisingMessage_;
50  }
51 
52  void queue(pair<char const*, char const*> const& t
53  , ToSend && toSend
54  , size_t toSendByteSize
56  toSendQueue_.push_back(forward_as_tuple(t, forward<ToSend>(toSend), toSendByteSize, it));
57  }
58 
59  /**
60  * @brief run the server's async send function and decide which items in buffer
61  * can be release
62  * @details called all the time
63  *
64  * @param begin if nothing can be released in buffer return this
65  * @param end if all can be releases return this
66  *
67  * @return iterator in buffer poing to new start (not released)
68  */
70  , MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT {
71  doAccept();
72  if (sessions_.size() == 0) {
73  toSendQueue_.clear();
74  return end;
75  }
76  //nothing to retire by default
77  auto newStartIt = begin;
78 
79  auto minIndex = toSendQueue_.size();
80  for (auto it = sessions_.begin(); it != sessions_.end();) {
81  if (minIndex > (*it)->toSendQueueIndex_) {
82  minIndex = (*it)->toSendQueueIndex_;
83  }
84  if (hmbdc_unlikely(!(*it)->runOnce())) {
85  sessions_.erase(it++);
86  } else {
87  it++;
88  }
89  }
90  if (minIndex) {
91  newStartIt = get<3>(toSendQueue_[minIndex - 1]);
92  toSendQueue_.erase_begin(minIndex);
93  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
94  (*it)->toSendQueueIndex_ -= minIndex;
95  }
96  }
97 
98  return newStartIt;
99  }
100 
101  size_t readySessionCount() const {
102  size_t res = 0;
103  for (auto const& s : sessions_) {
104  if (s->ready()) res++;
105  }
106  return res;
107  }
108 
109  void killSlowestSession() {
110  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
111  if (0 == (*it)->toSendQueueIndex_) {
112  HMBDC_LOG_C((*it)->id(), " too slow, dropping");
113  sessions_.erase(it);
114  break;
115  }
116  }
117  }
118 
119 private:
120  void doAccept() HMBDC_RESTRICT {
121  if (hmbdc_unlikely(serverFd_.isFdReady())) {
122  auto addrlen = sizeof(serverAddr_);
123  auto conn = accept(serverFd_.fd, (struct sockaddr *)&serverAddr_, (socklen_t*)&addrlen);
124  if (conn == -1) {
125  if (!serverFd_.checkErr()) {
126  HMBDC_LOG_C("accept failure, errno=", errno);
127  }
128  return;
129  }
130  auto sz = config_.getExt<int>("tcpSendBufferBytes");
131  if (sz) {
132  if (setsockopt(conn, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)) < 0) {
133  HMBDC_LOG_C("failed to set send buffer size=", sz, " errno=", errno);
134  }
135  }
136  int flag = config_.getExt<bool>("nagling")?0:1;
137  if (setsockopt(conn, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)) < 0) {
138  HMBDC_LOG_C("failed to set TCP_NODELAY, errno=", errno);
139  }
140  try {
141  auto k = advertisingMessage_.connKey;
142  auto s = std::make_shared<SendSession>(conn, k, toSendQueue_, maxSendBatch_);
143  sessions_.insert(s);
144  } catch (std::exception const& e) {
145  HMBDC_LOG_C(e.what());
146  }
147  }
148  }
149 
150  Config const& config_;
151  using Sessions = unordered_set<SendSession::ptr>;
152  Sessions sessions_;
153  ToSendQueue toSendQueue_;
154  EpollFd serverFd_;
155  sockaddr_in& serverAddr_;
156  mutable TopicSource advertisingMessage_;
157  mutable std::default_random_engine generator_;
158  mutable std::uniform_int_distribution<uint64_t> distribution_;
159 
160  size_t maxSendBatch_;
161 };
162 } //sendserver_detail
164 }}}
class to hold an hmbdc configuration
Definition: Config.hpp:46
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:180
Definition: TypedString.hpp:74
Definition: BlockingBuffer.hpp:10
MonoLockFreeBuffer::iterator runOnce(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT
run the server&#39;s async send function and decide which items in buffer can be release ...
Definition: SendServer.hpp:69
Definition: Transport.hpp:24
Definition: Messages.hpp:82
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:74