1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 8 #include "hmbdc/comm/inet/Misc.hpp" 10 #include "hmbdc/app/tcpcast/SendSession.hpp" 12 #include <unordered_set> 15 #include <boost/asio.hpp> 19 namespace hmbdc {
namespace app {
namespace tcpcast {
21 namespace sendserver_detail {
24 using boost::asio::ip::tcp;
33 ip::address::from_string(
34 hmbdc::comm::inet::getLocalIpMatchMask(cfg.
getExt<
string>(
"ifaceAddr"))
35 ), cfg.
getExt<
short>(
"tcpPort"))
38 , advertisingMessage_(
39 cfg.
getExt<
string>(
"topicRegex")
40 , acceptor_.local_endpoint().address().to_string()
41 , acceptor_.local_endpoint().port()
42 , config_.getExt<
bool>(
"loopback")
47 return advertisingMessage_;
50 virtual size_t readySessionCount()
const = 0;
55 tcp::acceptor acceptor_;
64 ,
size_t toSendQueueMaxSize)
66 toSendQueue_.set_capacity(toSendQueueMaxSize);
70 void queue(pair<char const*, char const*>
const& t
72 ,
size_t toSendByteSize
74 toSendQueue_.push_back(forward_as_tuple(t, forward<ToSend>(toSend), toSendByteSize, it));
89 if (sessions_.size() == 0) {
94 auto newStartIt = begin;
96 auto minIndex = numeric_limits<ToSendQueue::size_type>::max();
97 for (
auto it = sessions_.begin(); it != sessions_.end();) {
98 if (minIndex > (*it)->toSendQueueIndex_) {
99 minIndex = (*it)->toSendQueueIndex_;
101 if (hmbdc_unlikely(it->use_count() == 1)) {
103 sessions_.erase(it++);
110 newStartIt = get<3>(toSendQueue_[minIndex - 1]);
111 toSendQueue_.erase_begin(minIndex);
112 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
113 (*it)->toSendQueueIndex_ -= minIndex;
120 size_t readySessionCount()
const override {
122 for (
auto const& s : sessions_) {
123 if (s->ready()) res++;
128 void killSlowestSession() {
129 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
130 if (0 == (*it)->toSendQueueIndex_) {
131 HMBDC_LOG_C((*it)->id(),
" too slow, dropping");
140 acceptor_.async_accept(socket_,
141 [
this](boost::system::error_code ec) {
143 auto sz = config_.getExt<
int>(
"tcpSendBufferBytes");
145 socket_base::send_buffer_size option(sz);
146 boost::system::error_code ec;
147 socket_.set_option(option);
150 socket_base::send_buffer_size option;
151 socket_.get_option(option);
152 if (sz == 0 || sz >= option.value()) {
155 HMBDC_LOG_C(
"set tcpcast SendSession send buffer size unsuccessful, want " 156 , sz,
" actual: ", option.value()
157 ,
" resulting higher receiver dropping possibility, check OS limits!");
160 socket_.set_option(ip::tcp::no_delay(!config_.getExt<
bool>(
"nagling")));
162 auto s = std::make_shared<AsyncSendSession>(std::move(socket_), toSendQueue_);
171 using Sessions = unordered_set<AsyncSendSession::ptr>;
173 ToSendQueue toSendQueue_;
class to hold an hmbdc configuration
Definition: Config.hpp:44
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: SendServer.hpp:27
Definition: GuardedSingleton.hpp:9
Definition: Messages.hpp:77
MonoLockFreeBuffer::iterator run(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT
run the server's async send function and decide which items in the buffer can be released ...
Definition: SendServer.hpp:87
Definition: SendServer.hpp:60
Definition: LockFreeBufferMisc.hpp:73