1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/LoggerT.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 7 #include "hmbdc/text/StringTrieSet.hpp" 8 #include "hmbdc/time/Time.hpp" 9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 10 #include "hmbdc/comm/inet/Misc.hpp" 12 #include <boost/asio.hpp> 13 #include <boost/lexical_cast.hpp> 14 #include <boost/circular_buffer.hpp> 20 namespace hmbdc {
namespace app {
namespace tcpcast {
23 using boost::asio::ip::tcp;
26 using ToSend = vector<const_buffer>;
27 using ToSendQueue = boost::circular_buffer<tuple<pair<char const*, char const*>
35 : std::enable_shared_from_this<SendSession> {
37 : socket_(move(socket))
39 , id_(boost::lexical_cast<string>(socket_.remote_endpoint()))
41 socket_.non_blocking(
true);
45 HMBDC_LOG_N(
"SendSession retired: ",
id());
49 HMBDC_LOG_N(
"SendSession started: ",
id());
55 boost::system::error_code ec;
56 socket_.shutdown(tcp::socket::shutdown_both, ec);
60 char const* id()
const {
71 auto p = find(data_, data_ + filledLen_,
'\t');
72 if (p != data_ + filledLen_) {
78 else if (data_[0] ==
'+') {
79 clientSubscriptions_.add(t);
80 }
else if (data_[0] ==
'-') {
81 clientSubscriptions_.erase(t);
83 memmove(data_, p + 1, filledLen_ - (p - data_ + 1));
84 filledLen_ -= p - data_ + 1;
90 auto self(shared_from_this());
91 socket_.async_read_some(boost::asio::buffer(data_ + filledLen_,
sizeof(data_) - filledLen_),
92 [
this,
self](boost::system::error_code ec, std::size_t length) {
116 using ptr = shared_ptr<SyncSendSession>;
117 using SendSession::SendSession;
118 template <
typename ToSend>
119 size_t send(std::pair<char const*, char const*>
const& t
120 , ToSend
const& toSend,
size_t size) {
121 if (clientSubscriptions_.check(t)) {
122 boost::system::error_code ec;
123 auto res = write(socket_, toSend, ec);
125 HMBDC_LOG_W(
id(),
", sending error_code=", ec);
136 using ptr = shared_ptr<AsyncSendSession>;
138 , ToSendQueue & toSendQueue)
140 , toSendQueue_(toSendQueue)
141 , toSendQueueIndex_(0)
142 , asyncWritePending_(
false) {
146 if (asyncWritePending_)
return;
148 for (; toSendQueueIndex_ != toSendQueue_.size();) {
149 if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
150 auto self(shared_from_this());
151 auto bytesToSend = get<2>(toSendQueue_[toSendQueueIndex_]);
152 async_write(socket_, get<1>(toSendQueue_[toSendQueueIndex_]),
153 [
this,
self, bytesToSend](
const boost::system::error_code& ec,
154 std::size_t bytes_transferred) {
157 HMBDC_LOG_C(
id(),
", async sending error_code=", ec);
158 }
else if (unlikely(bytes_transferred != bytesToSend)) {
159 HMBDC_LOG_C(
id(),
", async sending incomplete, wrote bytes=" 160 , bytes_transferred);
162 asyncWritePending_ =
false;
167 asyncWritePending_ =
true;
177 ToSendQueue& toSendQueue_;
178 ToSendQueue::size_type toSendQueueIndex_;
179 bool asyncWritePending_;
Definition: SendSession.hpp:134
Definition: SendSession.hpp:114
Definition: StringTrieSet.hpp:113
Definition: TypedString.hpp:74
Definition: SendServer.hpp:58
Definition: SendSession.hpp:34
Definition: Client.hpp:11
Definition: LockFreeBufferMisc.hpp:73