hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/mcast/Transport.hpp"
4 #include "hmbdc/app/mcast/Messages.hpp"
5 #include "hmbdc/app/Base.hpp"
6 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
7 #include "hmbdc/time/Time.hpp"
8 #include "hmbdc/time/Rater.hpp"
9 #include "hmbdc/numeric/BitMath.hpp"
10 
11 #include <boost/regex.hpp>
12 
13 #include <memory>
14 #include <iostream>
15 
16 #include <sys/epoll.h>
17 namespace hmbdc { namespace app { namespace mcast {
18 
19 namespace sendtransportengine_detail {
20 using namespace std;
21 using namespace hmbdc::time;
22 using namespace hmbdc::pattern;
24 
26 : Transport {
27  using ptr = std::shared_ptr<SendTransport>;
28  SendTransport(Config const&, size_t);
29  ~SendTransport();
30 
31  bool match(Topic const& t) const {
32  return boost::regex_match(t.c_str(), topicRegex_);
33  }
34 
35  template <typename... Messages>
36  void queue(Topic const& t, Messages&&... msgs) {
37  auto n = sizeof...(msgs);
38  auto it = buffer_.claim(n);
39  queue(it, t, std::forward<Messages>(msgs)...);
40  buffer_.commit(it, n);
41  }
42 
43  template <typename... Messages>
44  bool tryQueue(Topic const& t, Messages&&... msgs) {
45  auto n = sizeof...(msgs);
46  auto it = buffer_.tryClaim(n);
47  if (it) {
48  queue(it, t, std::forward<Messages>(msgs)...);
49  buffer_.commit(it, n);
50  return true;
51  }
52  return false;
53  }
54 
55  template <typename Message, typename ... Args>
56  void queueInPlace(Topic const& t, Args&&... args) {
57  auto s = buffer_.claim();
58  char* addr = static_cast<char*>(*s);
59  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
60  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
61  h->topicLen = tl;
62  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
63  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
64  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
65  } else {
66  HMBDC_THROW(std::out_of_range
67  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
68  }
69  buffer_.commit(s);
70  }
71 
72  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len);
73 
74  void runOnce(bool alwaysPoll = false) HMBDC_RESTRICT {
75  resumeSend();
76  if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
77  utils::EpollTask::instance().poll();
78  }
79  }
80 
81  void stop();
82 
83 private:
84  std::string topic_;
85  boost::regex topicRegex_;
86  size_t maxMessageSize_;
87  typename Buffer::iterator begin_, it_, end_;
88  Buffer buffer_;
89  Rater rater_;
90  size_t maxSendBatch_;
91 
92  iovec* toSendMsgs_;
93  size_t toSendMsgsHead_;
94  size_t toSendMsgsTail_;
95  mmsghdr* toSendPkts_;
96  size_t toSendPktsHead_;
97  size_t toSendPktsTail_;
98 
99  uint16_t
100  outBufferSizePower2();
101 
102  template<typename M, typename ... Messages>
103  void queue(typename Buffer::iterator it, Topic const& t
104  , M&& m, Messages&&... msgs) {
105  using Message = typename std::remove_reference<M>::type;
106  auto s = *it;
107  char* addr = static_cast<char*>(s);
108  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
109  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
110  h->topicLen = tl;
111  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
112  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
113  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
114  } else {
115  HMBDC_THROW(std::out_of_range
116  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
117  }
118  queue(++it, t, std::forward<Messages>(msgs)...);
119  }
120 
121  void queue(typename Buffer::iterator it
122  , Topic const& t) {}
123  void resumeSend();
124 };
125 
128 , Client<SendTransportEngine> {
129  using SendTransport::SendTransport;
130  using SendTransport::hmbdcName;
131  using SendTransport::schedSpec;
132 
133  /*virtual*/
134  void invokedCb(uint16_t) HMBDC_RESTRICT override {
135  runOnce();
136  }
137  /*virtual*/ bool droppedCb() override {
138  stop();
139  return true;
140  };
141 };
142 
143 } //sendtransportengine_detail
146 }}}
Definition: MonoLockFreeBuffer.hpp:15
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
Definition: BlockingBuffer.hpp:11
void invokedCb(uint16_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:134
Definition: Message.hpp:72
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Definition: Transport.hpp:43
Definition: Rater.hpp:11
Definition: Base.hpp:12
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:137
Definition: LockFreeBufferMisc.hpp:74