1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 7 #include "hmbdc/text/StringTrieSet.hpp" 8 #include "hmbdc/time/Time.hpp" 9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 10 #include "hmbdc/comm/inet/Misc.hpp" 12 #include <boost/lexical_cast.hpp> 13 #include <boost/circular_buffer.hpp> 20 #include <sys/epoll.h> 22 namespace hmbdc {
namespace app {
namespace tcpcast {
24 namespace sendserver_detail {
28 using ToSend = std::vector<iovec>;
29 using ToSendQueue = boost::circular_buffer<std::tuple<std::pair<char const*, char const*>
36 namespace sendsession_detail {
40 using ptr = shared_ptr<SendSession>;
43 , ToSendQueue & toSendQueue
44 ,
size_t maxSendBatch)
48 , toSendQueue_(toSendQueue)
49 , toSendQueueIndex_(0)
52 , msghdrRelicSize_(0) {
// Constructor body (the head of the signature is not visible in this listing).
// The session binds to the caller's send queue by reference, starts reading it
// at index 0, and begins with no partially-sent data pending.
//
// Scratch iovec array that holds the unsent remainder of a partial sendmsg();
// it is sized maxSendBatch * 2 entries and released with delete[] further down.
// NOTE(review): this is a raw new[]; if the HMBDC_THROW below fires, the
// constructor exits by exception and the array is never freed — confirm and
// consider an owning container.
53 msghdrRelic_.msg_iov =
new iovec[maxSendBatch * 2];
// Build the human-readable session id as "<peer ip>:<peer port>".
54 auto addrPort = hmbdc::comm::inet::getPeerIpPort(fd);
55 id_ = addrPort.first +
":" + std::to_string(addrPort.second);
// Duplicate the socket descriptor; presumably so the read side and the write
// side can be registered with epoll separately — the lines that assign
// readFd_/writeFd_ are not visible here, TODO confirm.
56 auto forRead = dup(fd);
58 HMBDC_THROW(std::runtime_error,
"dup failed errno=" << errno);
// Register both directions edge-triggered (EPOLLET): readiness is reported on
// state changes only, which is why the I/O paths use non-blocking calls.
62 utils::EpollTask::instance().add(EPOLLIN|EPOLLET, readFd_);
64 utils::EpollTask::instance().add(EPOLLOUT|EPOLLET, writeFd_);
65 HMBDC_LOG_N(
"SendSession started: ",
id());
// Presumably the destructor body (its header line is not visible in this
// listing): frees the iovec array allocated with new[] in the constructor and
// logs that the session has ended.
69 delete [] msghdrRelic_.msg_iov;
70 HMBDC_LOG_N(
"SendSession retired: ",
id());
// Performs one non-blocking service pass for this session:
//   1. process inbound data via doRead(); a false result makes runOnce()
//      return false immediately,
//   2. if a previous send was partial, try to flush the saved remainder,
//   3. send further queued messages while the socket stays writable.
// Several lines of the original (error exits, index advance, final return) are
// missing from this listing, so only the visible steps are described.
73 bool runOnce() HMBDC_RESTRICT {
74 if (hmbdc_unlikely(!doRead()))
return false;
// A nonzero msghdrRelicSize_ means bytes from an earlier sendmsg() are still
// unsent; retry them first, and only when the write fd is reported ready.
75 if (hmbdc_unlikely(writeFd_.isFdReady() && msghdrRelicSize_)) {
// MSG_DONTWAIT keeps the call non-blocking; MSG_NOSIGNAL suppresses SIGPIPE
// when the peer has gone away.
76 auto l = sendmsg(writeFd_.fd, &msghdrRelic_, MSG_NOSIGNAL|MSG_DONTWAIT);
77 if (hmbdc_unlikely(l < 0)) {
// checkErr() returning false is treated as a real failure and logged;
// presumably true covers the would-block case — TODO confirm in EpollTask.
78 if (!writeFd_.checkErr()) {
79 HMBDC_LOG_C(
"sendmsg failed errno=", errno);
// Account for the l bytes that went out.
84 msghdrRelicSize_ -= size_t(l);
85 if (hmbdc_unlikely(msghdrRelicSize_)) {
// Still not fully sent: rewrite the relic in place (source and destination
// are the same msghdr) so that it presumably skips the first l bytes.
86 comm::inet::extractRelicTo(msghdrRelic_, msghdrRelic_, l);
// Main send loop: runs only while nothing is left over from a partial send,
// the socket is writable, and this session's index has not reached the end of
// the queue.
92 for (; !msghdrRelicSize_ && writeFd_.isFdReady()
93 && toSendQueueIndex_ != toSendQueue_.size();) {
// Tuple element 0 is checked against this client's subscriptions (presumably
// the message topic); entries that do not match are not sent.
94 if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
// Tuple element 1 is the iovec list for the message; point msghdr_ at it
// without copying.
95 msghdr_.msg_iov = &(get<1>(toSendQueue_[toSendQueueIndex_])[0]);
96 msghdr_.msg_iovlen = get<1>(toSendQueue_[toSendQueueIndex_]).size();
97 auto l = sendmsg(writeFd_.fd, &msghdr_, MSG_NOSIGNAL|MSG_DONTWAIT);
98 if (hmbdc_unlikely(l < 0)) {
99 if (!writeFd_.checkErr()) {
100 HMBDC_LOG_C(
"sendmsg failed errno=", errno);
// Tuple element 2 minus the bytes sent gives what is still outstanding
// (element 2 is presumably the total byte length of the message).
105 msghdrRelicSize_ = get<2>(toSendQueue_[toSendQueueIndex_]) -
size_t(l);
106 if (hmbdc_unlikely(msghdrRelicSize_)) {
// Partial send: save the unsent tail into msghdrRelic_ so that the next
// runOnce() can resume it; a nonzero relic size also stops this loop.
107 comm::inet::extractRelicTo(msghdrRelic_, msghdr_, l);
// Returns a C string identifying this session; it is what the
// "SendSession started/retired" log lines print. The body is not visible in
// this listing — presumably it exposes id_ ("<ip>:<port>"), TODO confirm.
116 char const* id()
const {
// Parses and applies one tab-terminated command from the receive buffer, then
// tries to pull more bytes from the socket without blocking.
// The result is tested as a boolean by runOnce() (the return type line is not
// visible here); presumably false means the session should be dropped.
// Command forms visible in the code, selected by the first byte data_[0]:
//   '@' — connection key, compared with connKey_
//   '+' — add a subscription
//   '-' — remove a subscription
// NOTE(review): the definition of `t` is on lines missing from this listing;
// presumably it is the text between data_[0] and the tab — TODO confirm.
126 doRead() HMBDC_RESTRICT {
// Look for the command terminator within the bytes buffered so far.
128 auto p = find(data_, data_ + readLen_,
'\t');
129 if (p != data_ + readLen_) {
// While connKey_ is nonzero the only acceptable command is '@' carrying a
// matching key.
// NOTE(review): std::stoull throws on non-numeric input and t comes from the
// peer; whether this is caught is not visible here.
132 if (hmbdc_unlikely(connKey_)) {
133 if (data_[0] !=
'@' || stoull(t) != connKey_) {
134 HMBDC_LOG_C(
"invalid connection attempted by ", id_);
138 }
else if (t.size() == 0) {
141 else if (data_[0] ==
'+') {
142 clientSubscriptions_.add(t);
143 }
else if (data_[0] ==
'-') {
144 clientSubscriptions_.erase(t);
// NOTE(review): the message below contains typos ("voilating protocle"); it is
// runtime log text and is deliberately left unchanged in this comment-only edit.
146 HMBDC_LOG_C(
"voilating protocle by ", id_);
// Drop the consumed command (through the tab) by shifting the remaining bytes
// to the front of the buffer; memmove is used because the ranges overlap.
149 memmove(data_, p + 1, readLen_ - (p - data_ + 1));
150 readLen_ -= p - data_ + 1;
// Append newly arrived bytes after the data already buffered; MSG_DONTWAIT
// keeps recv() non-blocking.
155 if (hmbdc_unlikely(readFd_.isFdReady())) {
156 auto l = recv(readFd_.fd, data_ + readLen_,
sizeof(data_) - readLen_, MSG_NOSIGNAL|MSG_DONTWAIT);
157 if (hmbdc_unlikely(l < 0)) {
// checkErr() returning false is logged as a real receive failure.
158 if (!readFd_.checkErr()) {
159 HMBDC_LOG_C(
"recv failed errno=", errno);
// Reference to the queue of outgoing messages supplied to the constructor;
// the queue object itself lives outside this session.
178 ToSendQueue& toSendQueue_;
// Position of the next queue entry this session will consider; starts at 0 and
// is compared against toSendQueue_.size() in runOnce().
179 ToSendQueue::size_type toSendQueueIndex_;
// Number of bytes still unsent from a partial sendmsg(); zero when nothing is
// pending.
182 size_t msghdrRelicSize_;
Definition: SendSession.hpp:39
Definition: TypedString.hpp:74
Definition: SendServer.hpp:25
Definition: EpollTask.hpp:31
Definition: StringTrieSetDetail.hpp:115
Definition: LockFreeBufferMisc.hpp:74