1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 8 #include "hmbdc/comm/inet/Misc.hpp" 10 #include "hmbdc/app/tcpcast/SendSession.hpp" 12 #include <unordered_set> 15 #include <boost/asio.hpp> 19 namespace hmbdc {
namespace app {
namespace tcpcast {
31 ip::address::from_string(
32 hmbdc::comm::inet::getLocalIpMatchMask(cfg.
getExt<
string>(
"ifaceAddr"))
33 ), cfg.
getExt<
short>(
"tcpPort"))
36 , advertisingMessage_(
37 cfg.
getExt<
string>(
"topicRegex")
38 , acceptor_.local_endpoint().address().to_string()
39 , acceptor_.local_endpoint().port()
40 , config_.getExt<
bool>(
"loopback")
45 return advertisingMessage_;
48 virtual size_t readySessionCount()
const = 0;
53 tcp::acceptor acceptor_;
62 ,
size_t toSendQueueMaxSize)
64 toSendQueue_.set_capacity(toSendQueueMaxSize);
68 void queue(pair<char const*, char const*>
const& t
70 ,
size_t toSendByteSize
72 toSendQueue_.push_back(forward_as_tuple(t, forward<ToSend>(toSend), toSendByteSize, it));
87 if (sessions_.size() == 0) {
92 auto newStartIt = begin;
94 ToSendQueue::size_type minIndex = numeric_limits<ToSendQueue::size_type>::max();
95 for (
auto it = sessions_.begin(); it != sessions_.end();) {
96 if (minIndex > (*it)->toSendQueueIndex_) {
97 minIndex = (*it)->toSendQueueIndex_;
99 if (unlikely(it->use_count() == 1)) {
101 sessions_.erase(it++);
108 newStartIt = get<3>(toSendQueue_[minIndex - 1]);
109 toSendQueue_.erase_begin(minIndex);
110 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
111 (*it)->toSendQueueIndex_ -= minIndex;
118 size_t readySessionCount()
const override {
120 for (
auto const& s : sessions_) {
121 if (s->ready()) res++;
126 void killSlowestSession() {
127 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
128 if (0 == (*it)->toSendQueueIndex_) {
129 HMBDC_LOG_C((*it)->id(),
" too slow, dropping");
138 acceptor_.async_accept(socket_,
139 [
this](boost::system::error_code ec) {
141 auto sz = config_.getExt<
int>(
"tcpSendBufferBytes");
143 socket_base::send_buffer_size option(sz);
144 boost::system::error_code ec;
145 socket_.set_option(option);
148 socket_base::send_buffer_size option;
149 socket_.get_option(option);
150 if (sz == 0 || sz >= option.value()) {
153 HMBDC_LOG_C(
"set tcpcast SendSession send buffer size unsuccessful, want " 154 , sz,
" actual: ", option.value()
155 ,
" resulting higher receiver dropping possibility, check OS limits!");
158 socket_.set_option(ip::tcp::no_delay(!config_.getExt<
bool>(
"nagling")));
160 auto s = std::make_shared<AsyncSendSession>(std::move(socket_), toSendQueue_);
169 using Sessions = unordered_set<AsyncSendSession::ptr>;
171 ToSendQueue toSendQueue_;
class to hold an hmbdc configuration
Definition: Config.hpp:35
Definition: SendServer.hpp:25
Definition: TypedString.hpp:74
Definition: SendServer.hpp:58
Definition: GuardedSingleton.hpp:9
MonoLockFreeBuffer::iterator run(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) __restrict__
run the server's async send function and decide which items in buffer can be release ...
Definition: SendServer.hpp:85
Definition: Messages.hpp:129
T getExt(const path_type ¶m) const
get a value from the config
Definition: Config.hpp:143
Definition: Client.hpp:11
Definition: LockFreeBufferMisc.hpp:73