hmbdc
simplify-high-performance-messaging-programming
SendServer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
8 #include "hmbdc/comm/inet/Misc.hpp"
9 
10 #include "hmbdc/app/tcpcast/SendSession.hpp"
11 
12 #include <unordered_set>
13 #include <memory>
14 #include <utility>
15 #include <iostream>
16 
17 #include <netinet/tcp.h>
18 
19 namespace hmbdc { namespace app { namespace tcpcast {
20 
21 namespace sendserver_detail {
22 using namespace hmbdc::pattern;
23 using namespace std;
24 
25 struct SendServer {
26  SendServer(Config const& cfg
27  , size_t toSendQueueMaxSize)
28  : config_(cfg)
29  , serverFd_(cfg)
30  , serverAddr_(serverFd_.localAddr)
31  , advertisingMessage_(
32  cfg.getExt<string>("topicRegex")
33  , serverFd_.localIp
34  , serverFd_.localPort
35  , config_.getExt<bool>("loopback")
36  )
37  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch")) {
38  if (listen(serverFd_.fd, 10) < 0) {
39  HMBDC_THROW(runtime_error, "failed to listen, errno=" << errno);
40  }
41  HMBDC_LOG_N("listen at ", advertisingMessage_);
42  utils::EpollTask::instance().add(EPOLLIN|EPOLLET, serverFd_);
43  toSendQueue_.set_capacity(toSendQueueMaxSize);
44  }
45 
46  TopicSource const& advertisingMessage() const {
47  return advertisingMessage_;
48  }
49 
50  void queue(pair<char const*, char const*> const& t
51  , ToSend && toSend
52  , size_t toSendByteSize
54  toSendQueue_.push_back(forward_as_tuple(t, forward<ToSend>(toSend), toSendByteSize, it));
55  }
56 
57  /**
58  * @brief run the server's async send function and decide which items in buffer
59  * can be release
60  * @details called all the time
61  *
62  * @param begin if nothing can be released in buffer return this
63  * @param end if all can be releases return this
64  *
65  * @return iterator in buffer poing to new start (not released)
66  */
68  , MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT {
69  doAccept();
70  if (sessions_.size() == 0) {
71  toSendQueue_.clear();
72  return end;
73  }
74  //nothing to retire by default
75  auto newStartIt = begin;
76 
77  auto minIndex = toSendQueue_.size();
78  for (auto it = sessions_.begin(); it != sessions_.end();) {
79  if (minIndex > (*it)->toSendQueueIndex_) {
80  minIndex = (*it)->toSendQueueIndex_;
81  }
82  if (hmbdc_unlikely(!(*it)->runOnce())) {
83  sessions_.erase(it++);
84  } else {
85  it++;
86  }
87  }
88  if (minIndex) {
89  newStartIt = get<3>(toSendQueue_[minIndex - 1]);
90  toSendQueue_.erase_begin(minIndex);
91  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
92  (*it)->toSendQueueIndex_ -= minIndex;
93  }
94  }
95 
96  return newStartIt;
97  }
98 
99  size_t readySessionCount() const {
100  size_t res = 0;
101  for (auto const& s : sessions_) {
102  if (s->ready()) res++;
103  }
104  return res;
105  }
106 
107  void killSlowestSession() {
108  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
109  if (0 == (*it)->toSendQueueIndex_) {
110  HMBDC_LOG_C((*it)->id(), " too slow, dropping");
111  sessions_.erase(it);
112  break;
113  }
114  }
115  }
116 
117 private:
118  void doAccept() HMBDC_RESTRICT {
119  if (hmbdc_unlikely(serverFd_.isFdReady())) {
120  auto addrlen = sizeof(serverAddr_);
121  auto conn = accept(serverFd_.fd, (struct sockaddr *)&serverAddr_, (socklen_t*)&addrlen);
122  if (conn == -1) {
123  if (!serverFd_.checkErr()) {
124  HMBDC_LOG_C("accept failure, errno=", errno);
125  }
126  return;
127  }
128  auto sz = config_.getExt<int>("tcpSendBufferBytes");
129  if (sz) {
130  if (setsockopt(conn, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)) < 0) {
131  HMBDC_LOG_C("failed to set send buffer size=", sz, " errno=", errno);
132  }
133  }
134  int flag = config_.getExt<bool>("nagling")?0:1;
135  if (setsockopt(conn, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)) < 0) {
136  HMBDC_LOG_C("failed to set TCP_NODELAY, errno=", errno);
137  }
138  try {
139  auto s = std::make_shared<SendSession>(conn, toSendQueue_, maxSendBatch_);
140  sessions_.insert(s);
141  } catch (std::exception const& e) {
142  HMBDC_LOG_C(e.what());
143  }
144  }
145  }
146 
147  Config const& config_;
148  using Sessions = unordered_set<SendSession::ptr>;
149  Sessions sessions_;
150  ToSendQueue toSendQueue_;
151  EpollFd serverFd_;
152  sockaddr_in& serverAddr_;
153  TopicSource advertisingMessage_;
154  size_t maxSendBatch_;
155 };
156 } //sendserver_detail
158 }}}
class to hold an hmbdc configuration
Definition: Config.hpp:44
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: BlockingBuffer.hpp:11
MonoLockFreeBuffer::iterator runOnce(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT
run the server&#39;s async send function and decide which items in buffer can be release ...
Definition: SendServer.hpp:67
Definition: Transport.hpp:24
Definition: Messages.hpp:78
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:74