1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 7 #include "hmbdc/text/StringTrieSet.hpp" 8 #include "hmbdc/time/Time.hpp" 9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 10 #include "hmbdc/comm/inet/Misc.hpp" 12 #include <boost/lexical_cast.hpp> 13 #include <boost/circular_buffer.hpp> 19 #include <sys/epoll.h> 21 namespace hmbdc {
namespace app {
namespace tcpcast {
23 namespace sendserver_detail {
27 using ToSend = std::vector<iovec>;
28 using ToSendQueue = boost::circular_buffer<std::tuple<std::pair<char const*, char const*>
35 namespace sendsession_detail {
39 using ptr = shared_ptr<SendSession>;
// Tail of a constructor: remaining parameters, member initializers and body.
// NOTE(review): the head of this definition and several intermediate lines are
// not visible in this listing, so only the statements shown are described.
41 , ToSendQueue & toSendQueue
42 ,
size_t maxSendBatch)
45 , toSendQueue_(toSendQueue)
46 , toSendQueueIndex_(0)
49 , msghdrRelicSize_(0) {
// Heap-allocate the iovec array behind msghdrRelic_, sized maxSendBatch * 2.
50 msghdrRelic_.msg_iov =
new iovec[maxSendBatch * 2];
// Build the session id as "<first>:<second>" from the pair returned by
// getPeerIpPort(fd).
51 auto addrPort = hmbdc::comm::inet::getPeerIpPort(fd);
52 id_ = addrPort.first +
":" + std::to_string(addrPort.second);
// Duplicate the descriptor; the copy is named forRead.
53 auto forRead = dup(fd);
// NOTE(review): the condition guarding this throw is not visible here —
// presumably it checks the dup() result.
55 HMBDC_THROW(std::runtime_error,
"dup failed errno=" << errno);
// Register readFd_ for edge-triggered input readiness and writeFd_ for
// edge-triggered output readiness with EpollTask::instance().
59 utils::EpollTask::instance().add(EPOLLIN|EPOLLET, readFd_);
61 utils::EpollTask::instance().add(EPOLLOUT|EPOLLET, writeFd_);
62 HMBDC_LOG_N(
"SendSession started: ",
id());
// Releases the array held in msghdrRelic_.msg_iov with delete[] and logs that
// the session retired.
// NOTE(review): presumably the destructor body — its head is not visible here.
66 delete [] msghdrRelic_.msg_iov;
67 HMBDC_LOG_N(
"SendSession retired: ",
id());
// One iteration of the session: first handles inbound data via doRead(), then
// retries any partially sent message, then sends further entries from
// toSendQueue_ for as long as the write descriptor reports ready.
// Returns false when doRead() returns false.
// NOTE(review): several lines of this function (error-path tails, closing
// braces, final return) are missing from this listing; only the visible
// statements are described.
70 bool runOnce() HMBDC_RESTRICT {
71 if (hmbdc_unlikely(!doRead()))
return false;
// A previous sendmsg left msghdrRelicSize_ bytes unsent: retry them first.
72 if (hmbdc_unlikely(writeFd_.isFdReady() && msghdrRelicSize_)) {
// MSG_NOSIGNAL suppresses SIGPIPE; MSG_DONTWAIT makes the call non-blocking.
73 auto l = sendmsg(writeFd_.fd, &msghdrRelic_, MSG_NOSIGNAL|MSG_DONTWAIT);
74 if (hmbdc_unlikely(l < 0)) {
// Logged only when checkErr() returns false.
// NOTE(review): what follows the log call is not visible here.
75 if (!writeFd_.checkErr()) {
76 HMBDC_LOG_C(
"sendmsg failed errno=", errno);
// Account for the bytes just written.
81 msghdrRelicSize_ -= size_t(l);
82 if (hmbdc_unlikely(msghdrRelicSize_)) {
// Still incomplete: recompute the relic in place from itself and l.
// NOTE(review): presumably this skips the l bytes already sent — confirm
// against extractRelicTo.
83 comm::inet::extractRelicTo(msghdrRelic_, msghdrRelic_, l);
// Send queue entries while nothing is pending, the descriptor is ready and
// toSendQueueIndex_ has not reached the end of toSendQueue_.
89 for (; !msghdrRelicSize_ && writeFd_.isFdReady()
90 && toSendQueueIndex_ != toSendQueue_.size();) {
// Only entries whose first tuple element passes clientSubscriptions_.check()
// are sent.
91 if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
// Point msghdr_ at the entry's iovec container (second tuple element).
92 msghdr_.msg_iov = &(get<1>(toSendQueue_[toSendQueueIndex_])[0]);
93 msghdr_.msg_iovlen = get<1>(toSendQueue_[toSendQueueIndex_]).size();
94 auto l = sendmsg(writeFd_.fd, &msghdr_, MSG_NOSIGNAL|MSG_DONTWAIT);
95 if (hmbdc_unlikely(l < 0)) {
96 if (!writeFd_.checkErr()) {
97 HMBDC_LOG_C(
"sendmsg failed errno=", errno);
// Remaining = third tuple element minus the bytes sendmsg reported.
// NOTE(review): presumably get<2> is the entry's total byte count — confirm.
102 msghdrRelicSize_ = get<2>(toSendQueue_[toSendQueueIndex_]) -
size_t(l);
103 if (hmbdc_unlikely(msghdrRelicSize_)) {
// Partial send: derive msghdrRelic_ from msghdr_ and l for the next retry.
104 comm::inet::extractRelicTo(msghdrRelic_, msghdr_, l);
113 char const* id()
const {
// Processes buffered inbound bytes in data_ and, when the read descriptor is
// ready, receives more. A record ends at a tab character; a record starting
// with '+' adds t to clientSubscriptions_ and one starting with '-' erases it.
// NOTE(review): the return type line, the definition of t, the first branch of
// the if/else chain and the function's tail are missing from this listing.
123 doRead() HMBDC_RESTRICT {
// Look for a tab within the readLen_ bytes currently buffered.
125 auto p = find(data_, data_ + readLen_,
'\t');
126 if (p != data_ + readLen_) {
132 else if (data_[0] ==
'+') {
133 clientSubscriptions_.add(t);
134 }
else if (data_[0] ==
'-') {
135 clientSubscriptions_.erase(t);
// Drop the consumed record (through the tab) by shifting the remaining bytes
// to the front of data_ and shrinking readLen_ by the same amount.
137 memmove(data_, p + 1, readLen_ - (p - data_ + 1));
138 readLen_ -= p - data_ + 1;
143 if (hmbdc_unlikely(readFd_.isFdReady())) {
// Non-blocking receive into the unused tail of data_.
144 auto l = recv(readFd_.fd, data_ + readLen_,
sizeof(data_) - readLen_, MSG_NOSIGNAL|MSG_DONTWAIT);
145 if (hmbdc_unlikely(l < 0)) {
// Logged only when checkErr() returns false.
// NOTE(review): what follows the log call is not visible here.
146 if (!readFd_.checkErr()) {
147 HMBDC_LOG_C(
"recv failed errno=", errno);
// Reference to the queue supplied to the constructor; runOnce reads entries
// from it.
166 ToSendQueue& toSendQueue_;
// Index into toSendQueue_ of the entry runOnce examines next; starts at 0.
167 ToSendQueue::size_type toSendQueueIndex_;
// Count left over after a partial sendmsg; non-zero makes runOnce retry
// msghdrRelic_ before sending further queue entries.
170 size_t msghdrRelicSize_;
Definition: SendSession.hpp:38
Definition: TypedString.hpp:74
Definition: SendServer.hpp:25
Definition: EpollTask.hpp:31
Definition: StringTrieSetDetail.hpp:115
Definition: LockFreeBufferMisc.hpp:74