hmbdc
simplify-high-performance-messaging-programming
BlockingContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/time/Timers.hpp"
4 #include "hmbdc/pattern/BlockingBuffer.hpp"
5 #include "hmbdc/Traits.hpp"
6 #include <tuple>
7 
8 namespace hmbdc { namespace app {
9 namespace blocking_context_detail {
10 
11 template <bool is_timer_manager>
12 struct tm_runner {
13  template<typename C>
14  void operator()(C&) {}
15 };
16 
17 template <>
18 struct tm_runner<true> {
19  void operator()(time::TimerManager& tm) {
20  tm.checkTimers(time::SysTime::now());
21  }
22 };
23 
24 template <typename... MessageTuples>
26  using Interests = std::tuple<>;
27 };
28 
29 template <typename MessageTuple, typename... MessageTuples>
30 struct context_property_aggregator<MessageTuple, MessageTuples ...> {
31  using Interests = typename merge_tuple_unique<MessageTuple
32  , typename context_property_aggregator<MessageTuples ...>::Interests
33  >::type;
34 };
35 
36 
37 template <typename T>
38 typename std::enable_if<std::is_base_of<time::TimerManager, T>::value, time::Duration>::type
39 waitDuration(T const& tm, time::Duration maxBlockingTime) {
40  return std::min(tm.untilNextFire(), maxBlockingTime);
41 }
42 
43 template <typename T>
44 typename std::enable_if<!std::is_base_of<time::TimerManager, T>::value, time::Duration>::type
45 waitDuration(T const&, time::Duration maxBlockingTime) {
46  return maxBlockingTime;
47 }
48 
49 template <typename CcClient>
50 bool runOnceImpl(bool& HMBDC_RESTRICT stopped, pattern::BlockingBuffer& HMBDC_RESTRICT buf
51  , CcClient& HMBDC_RESTRICT c, time::Duration maxBlockingTime) {
53  try {
55  tr(c);
56 
57  const bool clientParticipateInMessaging =
58  std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE != 0;
59  if (clientParticipateInMessaging) {
60  uint64_t count = buf.peek(begin, end);
61  auto b = begin;
62  c.CcClient::handleRangeImpl(b, end, 0xffff);
63  c.CcClient::invokedCb(0xffff);
64  buf.wasteAfterPeek(count);
65  if (!count) {
66  buf.waitItem(waitDuration(c, maxBlockingTime));
67  }
68  } else {
69  c.CcClient::invokedCb(0xffffu);
70  }
71  } catch (std::exception const& e) {
72  if (!stopped) {
73  c.stopped(e);
74  return !c.dropped();
75  }
76  } catch (int code) {
77  if (!stopped) {
78  c.stopped(ExitCode(code));
79  return !c.dropped();
80  }
81  } catch (...) {
82  if (!stopped) {
83  c.stopped(UnknownException());
84  return !c.dropped();
85  }
86  }
87  return true;
88 }
89 } //blocking_context_detail
90 
91 /**
92  * @class BlockingContext<>
93  * @brief A BlockingContext is like a media object that facilitates the communications
94  * for the Clients that it is holding. Each Client is powered by a single OS thread.
95  * a Client needs to be started once and only once to a single BlockingContext
96  * before any messages sending happens - typically in the initialization stage in main(),
97  * undefined behavior otherwise.
98  * @details a Client running in such a BlockingContext utilizing OS's blocking mechanism
99  * and takes less CPU time. The Client's responding time scales better when the number of
100  * Clients greatly exceeds the availlable CPUs in the system and the effective message rate
101  * for a Client tends to be low.
102  * @tparam MessageTuples std tuple capturing the Messages that the Context is supposed to
103  * deliver
104  */
105 template <typename... MessageTuples>
107 private:
109 public:
110  /**
111  * @class Interests
112  * @brief a std tuple holding messages types it can dispatch
113  * @details will not compile if using the Context to deliver a message not in this tuple
114  *
115  */
116  using Interests = typename cpa::Interests;
117 
118  /**
119  * @brief trivial ctor
120  */
122  : stopped_(false){}
123  BlockingContext(BlockingContext const&) = delete;
124  BlockingContext& operator = (BlockingContext const&) = delete;
125 
126  /**
127  * @brief start a client within the Context. The client is powered by a single OS thread
128  * @details it is ok if a Client is blocking, if its own buffer is not full, it
129  * doesn't affect other Client's capabilities of receiving Messages
130  *
131  * Usage example:
132  *
133  * @code
134  *
135  * // the following starts 1 Client with buffer length of 2048 messages and pinning it on 3rd CPU
136  * ctx.start(client2, 2048, 0x4ul);
137  * @endcode
138  *
139  * @tparam Client actual Client type
140  * @param c Client
141  * @param capacity the maximum messages this Client can buffer
142  * @param maxItemSize the max size of a message
143  * @param cpuAffinity cpu affinity mask for this Client's thread
144  * @param maxBlockingTime it is recommended to limit the duration a Client blocks due to
145  * no messages to handle, so it can respond to things like Context is stopped, or generate
146  * heartbeats if applicable.
147  */
148  template <typename Client>
149  void start(Client& c
150  , size_t capacity = 1024
152  , uint64_t cpuAffinity = 0
153  , time::Duration maxBlockingTime = time::Duration::seconds(1)) {
154  std::shared_ptr<Transport> t;
155  if (std::tuple_size<typename Client::Interests>::value) {
156  t.reset(new Transport(maxItemSize, capacity));
158  }
159  auto thrd = kickOffClientThread(c, t->buffer, cpuAffinity, maxBlockingTime);
160  threads_.push_back(move(thrd));
161  }
162 
163  /**
164  * @brief start a client within the Context. The client is powered by a single OS thread
165  * @details it is ok if a Client is blocking, if its own buffer is not full, it
166  * doesn't affect other Client's capabilities of receiving Messages
167  *
168  * @tparam Client actual Client type
169  * @tparam DeliverPred a condition functor deciding if a message should be delivered to a
170  * Client, which provides filtering before the Message type filtering. It improves performance
171  * in the way it could potentially reduce the unblocking times of a Client.
172  *
173  * Usage example:
174  * //the following Pred verifies srcId matches for Response, and let all other Messages types
175  * //deliver
176  * @code
177  * struct Pred {
178  * Pred(uint16_t srcId)
179  * : srcId(srcId){}
180  *
181  * bool operator()(Response const& resp) {
182  * return resp.srcId == srcId;
183  * }
184  * template <typename M>
185  * bool operator()(M const&) {return true;}
186  *
187  * uint16_t srcId;
188  * };
189  *
190  * @endcode
191  *
192  * @param c Client
193  * @param capacity the maximum messages this Client can buffer
194  * @param maxItemSize the max size of a message
195  * @param cpuAffinity cpu affinity mask for this Client's thread
196  * @param maxBlockingTime it is recommended to limit the duration a Client blocks due to
197  * no messages to handle, so it can respond to things like Context is stopped, or generate
198  * heartbeats if applicable.
199  * @param pred
200  */
201  template <typename Client, typename DeliverPred>
202  void start(Client& c
203  , size_t capacity
204  , size_t maxItemSize
205  , uint64_t cpuAffinity
206  , time::Duration maxBlockingTime
207  , DeliverPred&& pred) { //=[](auto&&){return true;}
208  std::shared_ptr<Transport> t;
209  if (std::tuple_size<typename Client::Interests>::value) {
210  t.reset(new Transport(maxItemSize, capacity));
212  msgConduits_, c, t, std::forward<DeliverPred>(pred));
213  }
214  auto thrd = kickOffClientThread(c, t->buffer, cpuAffinity, maxBlockingTime);
215  threads_.push_back(move(thrd));
216  }
217 
218  /**
219  * @brief stop the message dispatching - asynchronously
220  * @details asynchronously means not garanteed message dispatching
221  * stops immidiately after this non-blocking call
222  */
223  void stop() {
224  __atomic_thread_fence(__ATOMIC_ACQUIRE);
225  stopped_ = true;
226  }
227 
228  /**
229  * @brief wait until all threads of the Context exit
230  * @details blocking call
231  */
232  void join() {
233  for (auto& t : threads_) {
234  t.join();
235  }
236  threads_.clear();
237  }
238 
239  /**
240  * @brief send a message to the BlockingContext to dispatch
241  * @details only the Clients that handles the Message will get it
242  * This function is threadsafe, which means you can call it anywhere in the code
243  *
244  * @tparam Message type
245  * @param m message
246  */
247  template <typename Message>
248  void send(Message&& m) {
249  using M = typename std::remove_reference<Message>::type;
250  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
251  static_assert(i < std::tuple_size<MsgConduits>::value, "message type unspecified for the blocking BlockingContext");
252  auto& entries = std::get<i>(msgConduits_);
253  for (auto& e : entries) {
254  if (e.diliverPred(std::forward<Message>(m))) {
255  e.transport->buffer.put(MessageWrap<M>(std::forward<Message>(m)));
256  }
257  }
258  }
259 
260  /**
261  * @brief best effort to send a message to via the BlockingContext
262  * @details this method is not recommended if more than one recipients are accepting
263  * this message since there is no garantee each one will receive it once and only once.
264  * this call does not block - return false when deliver doesn't reach all
265  * intended recipients
266  * This method is threadsafe, which means you can call it anywhere in the code
267  *
268  * @param m message
269  * @return true if send successfully to one or no one is interested
270  */
271  template <typename Message>
272  bool trySend(Message&& m) {
273  using M = typename std::remove_reference<Message>::type;
274  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
275  static_assert(i < std::tuple_size<MsgConduits>::value, "unspecified for the blocking BlockingContext");
276  auto& entries = std::get<i>(msgConduits_);
277  for (auto& e : entries) {
278  if (e.diliverPred(std::forward<Message>(m))) {
279  if (!e.transport->buffer.tryPut(MessageWrap<M>(std::forward<Message>(m)))) {
280  return false;
281  }
282  }
283  }
284  return true;
285  }
286 
287  /**
288  * @brief it does the same thing as trySend
289  * @details it is provided for interface compliance
290  *
291  * @param args ctor args
292  * @tparam Message type
293  * @tparam typename ... Args args
294  * @return true if send successfully
295  */
296  template <typename Message, typename ... Args>
297  bool trySendInPlace(Args&&... args) {
298  using M = typename std::remove_reference<Message>::type;
299  return trySend(M(std::forward<Args>(args)...));
300  }
301 
302  /**
303  * @brief send a range of messages via the BlockingContext
304  * @details only the Clients that handles the Message will get it of course
305  * This function is threadsafe, which means you can call it anywhere in the code
306  *
307  * @param begin a forward iterator point at the start of the range
308  * @param n length of the range
309  */
310  template <typename ForwardIt>
311  void
312  send(ForwardIt begin, size_t n) {
313  for (auto i = 0u; i < n; ++i, begin++) {
314  send(*begin);
315  }
316  }
317 
318 private:
319  void start(){}
320 
321  struct Transport {
322  Transport(size_t maxItemSize, size_t capacity)
323  : buffer(maxItemSize + sizeof(MessageHead), capacity){}
325  };
326 
327  template <typename Message>
328  struct TransportEntry {
329  std::shared_ptr<Transport> transport;
330  std::function<bool (Message const&)> diliverPred;
331  };
332  template <typename Message>
333  using TransportEntries = std::vector<TransportEntry<Message>>;
334 
335  template <typename Tuple> struct MCGen;
336  template <typename ...Messages>
337  struct MCGen<std::tuple<Messages ...>> {
338  using type = std::tuple<TransportEntries<Messages> ...>;
339  };
340 
341  using MsgConduits = typename MCGen<Interests>::type;
342 
343  MsgConduits msgConduits_;
344  using Threads = std::vector<std::thread>;
345  Threads threads_;
346  bool stopped_;
347 
348  template <typename MsgConduits, typename Client, typename DeliverPred, typename Tuple>
350  void operator()(MsgConduits&, Client&, std::shared_ptr<Transport>, DeliverPred&&){}
351  };
352 
353  template <typename MsgConduits, typename Client, typename DeliverPred, typename M, typename ...Messages>
354  struct setupConduitFor<MsgConduits, Client, DeliverPred, std::tuple<M, Messages...>> {
355  void operator()(MsgConduits& msgConduits, Client& c, std::shared_ptr<Transport> t, DeliverPred&& pred){
356  auto constexpr i = index_in_tuple<M, Interests>::value;
357  static_assert(i != std::tuple_size<Interests>::value, "");
358  std::get<i>(msgConduits).emplace_back(TransportEntry<M>{t, std::forward<DeliverPred>(pred)});
359 
360  setupConduitFor<MsgConduits, Client, DeliverPred, std::tuple<Messages...>>()
361  (msgConduits, c, t, std::forward<DeliverPred>(pred));
362  }
363  };
364 
365  template <typename MsgConduits, typename Client, typename M, typename ...Messages>
366  struct setupConduitFor<MsgConduits, Client, void*, std::tuple<M, Messages...>> {
367  void operator()(MsgConduits& msgConduits, Client& c, std::shared_ptr<Transport> t, void*){
368  auto constexpr i = index_in_tuple<M, Interests>::value;
369  static_assert(i != std::tuple_size<Interests>::value, "");
370  std::get<i>(msgConduits).emplace_back(TransportEntry<M>{t, [](M const&){return true;}});
371 
372  setupConduitFor<MsgConduits, Client, void*, std::tuple<Messages...>>()
373  (msgConduits, c, t, nullptr);
374  }
375  };
376 
377  template <typename Client>
378  auto kickOffClientThread(Client& c, pattern::BlockingBuffer& buffer
379  , uint64_t mask, time::Duration maxBlockingTime) {
380  std::thread thrd([
381  this
382  , &c
383  , &buffer
384  , mask
385  , maxBlockingTime]() {
386  std::string name;
387  char const* schedule;
388  int priority;
389 
390  if (c.hmbdcName()) {
391  name = c.hmbdcName();
392  } else {
393  name = "hmbdc-b";
394  }
395  auto cpuAffinityMask = mask;
396  std::tie(schedule, priority) = c.schedSpec();
397 
398  if (!schedule) schedule = "SCHED_OTHER";
399 
400  os::configureCurrentThread(name.c_str(), cpuAffinityMask
401  , schedule, priority);
402  c.messageDispatchingStartedCb(0xffff);
403 
404  while(!stopped_ &&
405  blocking_context_detail::runOnceImpl(this->stopped_, buffer, c, maxBlockingTime)) {
406  }
407  if (this->stopped_) c.dropped();
408  }
409  );
410 
411  return move(thrd);
412  }
413 };
414 }}
Definition: BlockingBuffer.hpp:17
Definition: Traits.hpp:79
void start(Client &c, size_t capacity, size_t maxItemSize, uint64_t cpuAffinity, time::Duration maxBlockingTime, DeliverPred &&pred)
start a client within the Context. The client is powered by a single OS thread
Definition: BlockingContext.hpp:202
void send(ForwardIt begin, size_t n)
send a range of messages via the BlockingContext
Definition: BlockingContext.hpp:312
Definition: BlockingContext.hpp:321
Definition: BlockingContext.hpp:349
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
Definition: BlockingContext.hpp:335
Definition: Traits.hpp:43
bool trySend(Message &&m)
best effort to send a message to via the BlockingContext
Definition: BlockingContext.hpp:272
void start(Client &c, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename Client::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1))
start a client within the Context. The client is powered by a single OS thread
Definition: BlockingContext.hpp:149
Definition: BlockingContext.hpp:328
Unknown excpetion.
Definition: Exception.hpp:17
A BlockingContext is like a media object that facilitates the communications for the Clients that it ...
Definition: BlockingContext.hpp:106
std::tuple< char const *, int > schedSpec() const
an overrideable method. returns the schedule policy and priority, override if necessary priority is o...
Definition: Client.hpp:69
Definition: Message.hpp:38
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:59
bool trySendInPlace(Args &&...args)
it does the same thing as trySend
Definition: BlockingContext.hpp:297
Definition: BlockingBuffer.hpp:175
void join()
wait until all threads of the Context exit
Definition: BlockingContext.hpp:232
void stop()
stop the message dispatching - asynchronously
Definition: BlockingContext.hpp:223
Definition: Message.hpp:72
Definition: Time.hpp:125
BlockingContext()
trivial ctor
Definition: BlockingContext.hpp:121
virtual void messageDispatchingStartedCb(uint16_t threadSerialNumber)
called before any messages got dispatched - only once
Definition: Client.hpp:93
Definition: Traits.hpp:95
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Exception that just has an exit code.
Definition: Exception.hpp:28
void send(Message &&m)
send a message to the BlockingContext to dispatch
Definition: BlockingContext.hpp:248
a std tuple holding messages types it can dispatch
Definition: Base.hpp:12
Definition: BlockingContext.hpp:12