1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 7 #include "hmbdc/text/StringTrieSet.hpp" 8 #include "hmbdc/time/Time.hpp" 9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 10 #include "hmbdc/comm/inet/Misc.hpp" 12 #include <boost/asio.hpp> 13 #include <boost/lexical_cast.hpp> 14 #include <boost/circular_buffer.hpp> 20 namespace hmbdc {
namespace app {
namespace tcpcast {
22 namespace sendserver_detail {
23 struct AsyncSendServer;
26 using ToSend = std::vector<boost::asio::const_buffer>;
27 using ToSendQueue = boost::circular_buffer<std::tuple<std::pair<char const*, char const*>
34 namespace sendsession_detail {
36 using boost::asio::ip::tcp;
42 : std::enable_shared_from_this<SendSession> {
44 : socket_(move(socket))
46 , id_(boost::lexical_cast<string>(socket_.remote_endpoint()))
48 socket_.non_blocking(
true);
52 HMBDC_LOG_N(
"SendSession retired: ",
id());
56 HMBDC_LOG_N(
"SendSession started: ",
id());
62 boost::system::error_code ec;
63 socket_.shutdown(tcp::socket::shutdown_both, ec);
67 char const* id()
const {
78 auto p = find(data_, data_ + filledLen_,
'\t');
79 if (p != data_ + filledLen_) {
85 else if (data_[0] ==
'+') {
86 clientSubscriptions_.add(t);
87 }
else if (data_[0] ==
'-') {
88 clientSubscriptions_.erase(t);
90 memmove(data_, p + 1, filledLen_ - (p - data_ + 1));
91 filledLen_ -= p - data_ + 1;
97 auto self(shared_from_this());
98 socket_.async_read_some(boost::asio::buffer(data_ + filledLen_,
sizeof(data_) - filledLen_),
99 [
this,
self](boost::system::error_code ec, std::size_t length) {
101 filledLen_ += length;
123 using ptr = shared_ptr<SyncSendSession>;
124 using SendSession::SendSession;
125 template <
typename ToSend>
126 size_t send(std::pair<char const*, char const*>
const& t
127 , ToSend
const& toSend,
size_t size) {
128 if (clientSubscriptions_.check(t)) {
129 boost::system::error_code ec;
130 auto res = write(socket_, toSend, ec);
131 if (hmbdc_unlikely(ec)) {
132 HMBDC_LOG_W(
id(),
", sending error_code=", ec);
143 using ptr = shared_ptr<AsyncSendSession>;
145 , ToSendQueue & toSendQueue)
147 , toSendQueue_(toSendQueue)
148 , toSendQueueIndex_(0)
149 , asyncWritePending_(
false) {
153 if (asyncWritePending_)
return;
155 for (; toSendQueueIndex_ != toSendQueue_.size();) {
156 if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
157 auto self(shared_from_this());
158 auto bytesToSend = get<2>(toSendQueue_[toSendQueueIndex_]);
159 async_write(socket_, get<1>(toSendQueue_[toSendQueueIndex_])
160 , transfer_exactly(bytesToSend)
161 , [
this,
self](
const boost::system::error_code& ec,
162 std::size_t bytes_transferred) {
164 if (hmbdc_unlikely(ec)) {
165 HMBDC_LOG_C(
id(),
", async sending error_code=", ec);
167 asyncWritePending_ =
false;
172 asyncWritePending_ =
true;
182 ToSendQueue& toSendQueue_;
183 ToSendQueue::size_type toSendQueueIndex_;
184 bool asyncWritePending_;
Definition: SendSession.hpp:41
Definition: SendSession.hpp:121
Definition: TypedString.hpp:74
Definition: StringTrieSetDetail.hpp:115
Definition: SendServer.hpp:60
Definition: SendSession.hpp:141
Definition: LockFreeBufferMisc.hpp:73