1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 8 #include "hmbdc/comm/inet/Misc.hpp" 10 #include "hmbdc/app/tcpcast/SendSession.hpp" 12 #include <unordered_set> 17 #include <netinet/tcp.h> 19 namespace hmbdc {
namespace app {
namespace tcpcast {
21 namespace sendserver_detail {
// NOTE(review): this is a fragment of a constructor taken from an extracted
// source listing. The class header, the first parameter, some initializers
// and the closing braces are not visible here, so only the visible steps are
// described.
27 ,
size_t toSendQueueMaxSize)
// serverAddr_ is a reference member; it is bound to serverFd_.localAddr.
30 , serverAddr_(serverFd_.localAddr)
// The advertising message is constructed from config values; only the
// "topicRegex" and "loopback" arguments are visible in this listing.
31 , advertisingMessage_(
32 cfg.
getExt<
string>(
"topicRegex")
35 , config_.getExt<
bool>(
"loopback")
// Send batch limit, read from the "maxSendBatch" config value.
38 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch")) {
// Start listening with a backlog of 10; a failure is reported by
// HMBDC_THROW(runtime_error, ...) with errno in the message.
39 if (listen(serverFd_.fd, 10) < 0) {
40 HMBDC_THROW(runtime_error,
"failed to listen, errno=" << errno);
42 HMBDC_LOG_N(
"listen at ", advertisingMessage_);
// Register the listening socket with the epoll task for EPOLLIN, edge-triggered.
43 utils::EpollTask::instance().add(EPOLLIN|EPOLLET, serverFd_);
// Size the pending-send queue to the capacity requested by the caller.
44 toSendQueue_.set_capacity(toSendQueueMaxSize);
// NOTE(review): the enclosing function's signature is not visible in this
// listing. Each call draws a fresh value from distribution_ and stores it as
// the message's connKey before returning the message.
48 advertisingMessage_.connKey = distribution_(generator_);
49 return advertisingMessage_;
// Appends one entry to toSendQueue_.
// NOTE(review): part of the parameter list (the `toSend` and `it` parameters
// and any template header) is on lines missing from this listing.
52 void queue(pair<char const*, char const*>
const& t
54 ,
size_t toSendByteSize
// The queued tuple is (t, forwarded toSend, toSendByteSize, it); runOnce later
// reads element 3 of these tuples.
56 toSendQueue_.push_back(forward_as_tuple(t, forward<ToSend>(toSend), toSendByteSize, it));
// NOTE(review): body fragment of runOnce; the signature, the early-return
// branch body, several closing braces and the final return are on lines that
// are missing from this listing.
72 if (sessions_.size() == 0) {
77 auto newStartIt = begin;
// minIndex ends up as the smallest toSendQueueIndex_ over all sessions; it
// starts at the queue size so any session index at or below it can lower it.
79 auto minIndex = toSendQueue_.size();
// The loop has no increment expression: the only visible advance of `it` is
// inside the erase below.
80 for (
auto it = sessions_.begin(); it != sessions_.end();) {
81 if (minIndex > (*it)->toSendQueueIndex_) {
82 minIndex = (*it)->toSendQueueIndex_;
// A session whose runOnce() returns false is removed; the post-increment
// moves `it` forward before the erased element's iterator is invalidated.
84 if (hmbdc_unlikely(!(*it)->runOnce())) {
85 sessions_.erase(it++);
// NOTE(review): `minIndex - 1` underflows if minIndex is 0; presumably a
// guard sits on the hidden lines 86-90 — confirm against the full file.
91 newStartIt = get<3>(toSendQueue_[minIndex - 1]);
// Drop the first minIndex queue entries, then shift every session's index
// down by the same amount so it still refers to the same entry.
92 toSendQueue_.erase_begin(minIndex);
93 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
94 (*it)->toSendQueueIndex_ -= minIndex;
// Counts the sessions for which ready() is true.
// NOTE(review): the declaration of `res` and the return statement are on
// lines missing from this listing.
101 size_t readySessionCount()
const {
103 for (
auto const& s : sessions_) {
104 if (s->ready()) res++;
// Scans the sessions for one whose toSendQueueIndex_ is 0 and logs it as too slow.
// NOTE(review): what happens after the log (presumably removing the session
// and leaving the loop) is on lines missing from this listing — confirm.
109 void killSlowestSession() {
110 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
111 if (0 == (*it)->toSendQueueIndex_) {
112 HMBDC_LOG_C((*it)->id(),
" too slow, dropping");
// Accepts a pending connection on the listening socket when it is flagged
// ready, applies socket options from the config, and creates a SendSession.
// NOTE(review): the `try`, several branches / closing braces and the step
// that stores the new session are on lines missing from this listing.
120 void doAccept() HMBDC_RESTRICT {
121 if (hmbdc_unlikely(serverFd_.isFdReady())) {
// NOTE(review): addrlen is deduced as size_t but is passed to accept() through
// a (socklen_t*) cast; where the two types differ in width this relies on
// byte order and leaves part of addrlen untouched — consider declaring it
// as socklen_t.
122 auto addrlen =
sizeof(serverAddr_);
// NOTE(review): accept() writes the peer's address into serverAddr_, which is
// a reference to serverFd_.localAddr, so the stored local address is
// overwritten on every accept — confirm this is intended.
123 auto conn = accept(serverFd_.fd, (
struct sockaddr *)&serverAddr_, (socklen_t*)&addrlen);
// NOTE(review): the condition enclosing this check is on hidden line 124.
125 if (!serverFd_.checkErr()) {
126 HMBDC_LOG_C(
"accept failure, errno=", errno);
// Send buffer size comes from the "tcpSendBufferBytes" config value; a
// setsockopt failure is only logged, not treated as fatal here.
130 auto sz = config_.getExt<
int>(
"tcpSendBufferBytes");
132 if (setsockopt(conn, SOL_SOCKET, SO_SNDBUF, &sz,
sizeof(sz)) < 0) {
133 HMBDC_LOG_C(
"failed to set send buffer size=", sz,
" errno=", errno);
// TCP_NODELAY is set to 0 when "nagling" is true and to 1 otherwise.
136 int flag = config_.getExt<
bool>(
"nagling")?0:1;
137 if (setsockopt(conn, IPPROTO_TCP, TCP_NODELAY, (
char*) &flag,
sizeof(flag)) < 0) {
138 HMBDC_LOG_C(
"failed to set TCP_NODELAY, errno=", errno);
// The session is built with the accepted fd, the connKey currently held in
// advertisingMessage_, the shared send queue and the send batch limit.
141 auto k = advertisingMessage_.connKey;
142 auto s = std::make_shared<SendSession>(conn, k, toSendQueue_, maxSendBatch_);
// Exceptions derived from std::exception are logged via HMBDC_LOG_C.
144 }
catch (std::exception
const& e) {
145 HMBDC_LOG_C(e.what());
// NOTE(review): member declarations; some members used above (e.g. config_,
// serverFd_, sessions_, advertisingMessage_) are declared on lines missing
// from this listing.
// Sessions are kept in an unordered set of SendSession::ptr.
151 using Sessions = unordered_set<SendSession::ptr>;
// Queue of pending sends; it is also passed to each SendSession.
153 ToSendQueue toSendQueue_;
// Reference bound to serverFd_.localAddr in the constructor.
155 sockaddr_in& serverAddr_;
// Random source for connKey values; declared mutable.
157 mutable std::default_random_engine generator_;
158 mutable std::uniform_int_distribution<uint64_t> distribution_;
// Send batch limit handed to each SendSession.
160 size_t maxSendBatch_;
class to hold an hmbdc configuration
Definition: Config.hpp:46
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:180
Definition: TypedString.hpp:74
Definition: SendServer.hpp:25
Definition: BlockingBuffer.hpp:10
MonoLockFreeBuffer::iterator runOnce(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT
run the server's async send function and decide which items in buffer can be released ...
Definition: SendServer.hpp:69
Definition: Transport.hpp:24
Definition: Messages.hpp:82
Definition: LockFreeBufferMisc.hpp:74