1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 8 #include "hmbdc/comm/inet/Misc.hpp" 10 #include "hmbdc/app/tcpcast/SendSession.hpp" 12 #include <unordered_set> 17 #include <netinet/tcp.h> 19 namespace hmbdc {
namespace app {
namespace tcpcast {
21 namespace sendserver_detail {
27 ,
size_t toSendQueueMaxSize)
30 , serverAddr_(serverFd_.localAddr)
31 , advertisingMessage_(
32 cfg.
getExt<
string>(
"topicRegex")
35 , config_.getExt<
bool>(
"loopback")
37 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch")) {
38 if (listen(serverFd_.fd, 10) < 0) {
39 HMBDC_THROW(runtime_error,
"failed to listen, errno=" << errno);
41 HMBDC_LOG_N(
"listen at ", advertisingMessage_);
42 utils::EpollTask::instance().add(EPOLLIN|EPOLLET, serverFd_);
43 toSendQueue_.set_capacity(toSendQueueMaxSize);
47 return advertisingMessage_;
50 void queue(pair<char const*, char const*>
const& t
52 ,
size_t toSendByteSize
54 toSendQueue_.push_back(forward_as_tuple(t, forward<ToSend>(toSend), toSendByteSize, it));
70 if (sessions_.size() == 0) {
75 auto newStartIt = begin;
77 auto minIndex = toSendQueue_.size();
78 for (
auto it = sessions_.begin(); it != sessions_.end();) {
79 if (minIndex > (*it)->toSendQueueIndex_) {
80 minIndex = (*it)->toSendQueueIndex_;
82 if (hmbdc_unlikely(!(*it)->runOnce())) {
83 sessions_.erase(it++);
89 newStartIt = get<3>(toSendQueue_[minIndex - 1]);
90 toSendQueue_.erase_begin(minIndex);
91 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
92 (*it)->toSendQueueIndex_ -= minIndex;
99 size_t readySessionCount()
const {
101 for (
auto const& s : sessions_) {
102 if (s->ready()) res++;
107 void killSlowestSession() {
108 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
109 if (0 == (*it)->toSendQueueIndex_) {
110 HMBDC_LOG_C((*it)->id(),
" too slow, dropping");
118 void doAccept() HMBDC_RESTRICT {
119 if (hmbdc_unlikely(serverFd_.isFdReady())) {
120 auto addrlen =
sizeof(serverAddr_);
121 auto conn = accept(serverFd_.fd, (
struct sockaddr *)&serverAddr_, (socklen_t*)&addrlen);
123 if (!serverFd_.checkErr()) {
124 HMBDC_LOG_C(
"accept failure, errno=", errno);
128 auto sz = config_.getExt<
int>(
"tcpSendBufferBytes");
130 if (setsockopt(conn, SOL_SOCKET, SO_SNDBUF, &sz,
sizeof(sz)) < 0) {
131 HMBDC_LOG_C(
"failed to set send buffer size=", sz,
" errno=", errno);
134 int flag = config_.getExt<
bool>(
"nagling")?0:1;
135 if (setsockopt(conn, IPPROTO_TCP, TCP_NODELAY, (
char*) &flag,
sizeof(flag)) < 0) {
136 HMBDC_LOG_C(
"failed to set TCP_NODELAY, errno=", errno);
139 auto s = std::make_shared<SendSession>(conn, toSendQueue_, maxSendBatch_);
141 }
catch (std::exception
const& e) {
142 HMBDC_LOG_C(e.what());
148 using Sessions = unordered_set<SendSession::ptr>;
150 ToSendQueue toSendQueue_;
152 sockaddr_in& serverAddr_;
154 size_t maxSendBatch_;
class to hold an hmbdc configuration
Definition: Config.hpp:44
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: SendServer.hpp:25
Definition: BlockingBuffer.hpp:11
MonoLockFreeBuffer::iterator runOnce(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT
run the server's async send function and decide which items in buffer can be release ...
Definition: SendServer.hpp:67
Definition: Transport.hpp:24
Definition: Messages.hpp:78
Definition: LockFreeBufferMisc.hpp:74