hmbdc
simplify-high-performance-messaging-programming
BlockingContext.hpp
#include "hmbdc/Copyright.hpp"
#pragma once
#include "hmbdc/app/Message.hpp"
#include "hmbdc/time/Timers.hpp"
#include "hmbdc/pattern/BlockingBuffer.hpp"
#include "hmbdc/os/Thread.hpp"
#include "hmbdc/Traits.hpp"
#include "hmbdc/Exception.hpp"

#include <tuple>
#include <vector>
#include <thread>
#include <stdexcept>
// std facilities used directly in this header
#include <functional>
#include <memory>
#include <string>
#include <algorithm>
#include <cstdint>

namespace hmbdc { namespace app {
namespace blocking_context_detail {

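// tm_runner fires any expired timers when the Client is also a time::TimerManager;
// for all other Clients it is a no-op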
template <bool is_timer_manager>
struct tm_runner {
    template<typename C>
    void operator()(C&) {}
};

template <>
struct tm_runner<true> {
    void operator()(time::TimerManager& tm) {
        tm.checkTimers(time::SysTime::now());
    }
};

template <typename... MessageTuples>
struct context_property_aggregator {
    using Interests = std::tuple<>;
};

template <typename MessageTuple, typename... MessageTuples>
struct context_property_aggregator<MessageTuple, MessageTuples ...> {
    using Interests = typename merge_tuple_unique<MessageTuple
        , typename context_property_aggregator<MessageTuples ...>::Interests
    >::type;
};

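// bound how long a Client may block: a TimerManager Client must wake up no later
// than its next timer fire; every other Client simply waits up to maxBlockingTime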
template <typename T>
typename std::enable_if<std::is_base_of<time::TimerManager, T>::value, time::Duration>::type
waitDuration(T const& tm, time::Duration maxBlockingTime) {
    return std::min(tm.untilNextFire(), maxBlockingTime);
}

template <typename T>
typename std::enable_if<!std::is_base_of<time::TimerManager, T>::value, time::Duration>::type
waitDuration(T const&, time::Duration maxBlockingTime) {
    return maxBlockingTime;
}

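// a single dispatching pass for a Client started via start(Client&, ...): run timers,
// hand every currently buffered message to the Client, then block (bounded by
// waitDuration) until more arrive; returns false when the Client is to be dropped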
template <typename CcClient>
bool runOnceImpl(bool& HMBDC_RESTRICT stopped
    , pattern::BlockingBuffer* HMBDC_RESTRICT buf
    , CcClient& HMBDC_RESTRICT c, time::Duration maxBlockingTime) {
    pattern::BlockingBuffer::iterator begin, end;
    try {
        tm_runner<std::is_base_of<time::TimerManager
            , typename std::remove_reference<CcClient>::type>::value> tr;
        tr(c);

        const bool clientParticipateInMessaging =
            std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE != 0;
        if (clientParticipateInMessaging && buf) {
            uint64_t count = buf->peek(begin, end);
            auto b = begin;
            c.CcClient::handleRangeImpl(b, end, 0xffff);
            c.CcClient::invokedCb(0xffff);
            buf->wasteAfterPeek(count);
            if (!count) {
                buf->waitItem(waitDuration(c, maxBlockingTime));
            }
        } else {
            c.CcClient::invokedCb(0xffffu);
        }
    } catch (std::exception const& e) {
        if (!stopped) {
            c.stopped(e);
            return !c.dropped();
        }
    } catch (int code) {
        if (!stopped) {
            c.stopped(ExitCode(code));
            return !c.dropped();
        }
    } catch (...) {
        if (!stopped) {
            c.stopped(UnknownException());
            return !c.dropped();
        }
    }
    return true;
}

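// a single dispatching pass for a load-sharing Client: consume at most one message
// from the shared buffer so each message is handled by exactly one Client in the group;
// returns false when the Client is to be dropped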
template <typename CcClient>
bool runOnceLoadSharingImpl(bool& HMBDC_RESTRICT stopped
    , pattern::BlockingBuffer* HMBDC_RESTRICT buf
    , CcClient& HMBDC_RESTRICT c, time::Duration maxBlockingTime) {
    try {
        tm_runner<std::is_base_of<time::TimerManager
            , typename std::remove_reference<CcClient>::type>::value> tr;
        tr(c);

        const bool clientParticipateInMessaging =
            std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE != 0;
        if (clientParticipateInMessaging && buf) {
            uint8_t msgWrapped[buf->maxItemSize()] __attribute__((aligned (8)));
            void* tmp = msgWrapped;
            if (buf->tryTake(tmp, 0, maxBlockingTime)) {
                c.CcClient::handleMessage(*static_cast<MessageHead*>(tmp));
            }
        }
        c.CcClient::invokedCb(0xffffu);
    } catch (std::exception const& e) {
        if (!stopped) {
            c.stopped(e);
            return !c.dropped();
        }
    } catch (int code) {
        if (!stopped) {
            c.stopped(ExitCode(code));
            return !c.dropped();
        }
    } catch (...) {
        if (!stopped) {
            c.stopped(UnknownException());
            return !c.dropped();
        }
    }
    return true;
}
} //blocking_context_detail

/**
 * @class BlockingContext<>
 * @brief A BlockingContext is like a communication medium that facilitates message
 * passing for the Clients that it is holding. Each Client is powered by a single OS thread.
 * A Client needs to be started once and only once in a single BlockingContext
 * before any message sending happens - typically in the initialization stage in main();
 * otherwise the behavior is undefined.
 * @details a Client running in such a BlockingContext utilizes the OS's blocking mechanism
 * and takes less CPU time. The Clients' response time scales better when the number of
 * Clients greatly exceeds the available CPUs in the system and the effective message rate
 * for a Client tends to be low.
 * @tparam MessageTuples std tuples capturing the Messages that the Context is supposed to
 * deliver. Messages not listed here are silently dropped to ensure loose coupling
 * between senders and receivers
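 *
 * A minimal usage sketch - MyMessage and MyClient are hypothetical types, with MyClient
 * being a Client that lists MyMessage among its Interests:
 * @code
 * hmbdc::app::BlockingContext<std::tuple<MyMessage>> ctx;
 * MyClient client;
 * ctx.start(client);        //client now runs on its own OS thread
 * ctx.send(MyMessage{});    //delivered to every started Client interested in MyMessage
 * ctx.stop();               //ask dispatching to stop (asynchronous)
 * ctx.join();               //wait for all Client threads to exit
 * @endcode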
 */
template <typename... MessageTuples>
struct BlockingContext {
private:
    using cpa = blocking_context_detail::context_property_aggregator<MessageTuples...>;
    template <typename Message> struct can_handle;
public:
    /**
     * @class Interests
     * @brief a std tuple holding the message types this Context can dispatch
     * @details sending a message type not in this tuple is a silent no-op
     * (see the disabled send overloads below)
     *
     */
    using Interests = typename cpa::Interests;

    struct deliverAll {
        template <typename T>
        bool operator()(T&&) {return true;}
    };

    /**
     * @brief trivial ctor
     */
    BlockingContext()
    : stopped_(false){}
    BlockingContext(BlockingContext const&) = delete;
    BlockingContext& operator = (BlockingContext const&) = delete;

    /**
     * @brief start a client within the Context. The client is powered by a single OS thread.
     * @details it is ok if a Client blocks; as long as its own buffer is not full, it
     * doesn't affect the other Clients' ability to receive Messages
     *
     * @tparam Client actual Client type
     * @tparam DeliverPred a condition functor deciding if a message should be delivered to a
     * Client; it provides filtering on top of the Message type filtering. It can improve
     * performance by reducing how often the Client's thread needs to be woken up
     *
     * Usage example:
     * //the following Pred verifies that srcId matches for Response, and lets all other
     * //Message types through
     * @code
     * struct Pred {
     *     Pred(uint16_t srcId)
     *     : srcId(srcId){}
     *
     *     bool operator()(Response const& resp) {
     *         return resp.srcId == srcId;
     *     }
     *     template <typename M>
     *     bool operator()(M const&) {return true;}
     *
     *     uint16_t srcId;
     * };
     *
     * @endcode
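     *
     * A call sketch passing the Pred above - ctx, client and mySrcId are hypothetical, and
     * the middle arguments just restate the defaults (namespace qualification omitted):
     * @code
     * ctx.start(client, 1024
     *     , max_size_in_tuple<typename MyClient::Interests>::value
     *     , 0, time::Duration::seconds(1), Pred(mySrcId));
     * @endcode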
     *
     * @param c Client
     * @param capacity the maximum number of messages this Client can buffer
     * @param maxItemSize the max size of a message
     * @param cpuAffinity cpu affinity mask for this Client's thread
     * @param maxBlockingTime it is recommended to limit the duration a Client blocks when it has
     * no messages to handle, so it can respond to things like the Context being stopped, or
     * generate heartbeats if applicable.
     * @param pred see DeliverPred documentation
     */
    template <typename Client, typename DeliverPred = deliverAll>
    void start(Client& c
        , size_t capacity = 1024
        , size_t maxItemSize = max_size_in_tuple<typename Client::Interests>::value
        , uint64_t cpuAffinity = 0
        , time::Duration maxBlockingTime = time::Duration::seconds(1)
        , DeliverPred&& pred = DeliverPred()) {
        std::shared_ptr<Transport> t(new Transport(maxItemSize, capacity));
        setupConduit<MsgConduits, DeliverPred, typename Client::Interests>()(
            msgConduits_, t, std::forward<DeliverPred>(pred));
        auto thrd = kickOffClientThread(blocking_context_detail::runOnceImpl<Client>
            , c, t.use_count() == 1?nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
        threads_.push_back(move(thrd));
    }

    /**
     * @brief start a group of clients within the Context that collectively process
     * messages in a load-sharing manner. Each client is powered by a single OS thread
     * @details it is ok if a Client blocks; as long as its own buffer is not full, it
     * doesn't affect the other Clients' ability to receive Messages
     *
     * @tparam LoadSharingClientPtrIt iterator to a Client pointer
     * @tparam DeliverPred a condition functor deciding if a message should be delivered to these
     * Clients; it provides filtering on top of the Message type filtering. It can improve
     * performance by reducing how often the Clients' threads need to be woken up
     *
     * @param begin an iterator, **begin should produce a Client& for the first Client
     * @param end an end iterator, [begin, end) is the range for Clients
     * @param capacity the maximum number of messages these Clients can buffer
     * @param maxItemSize the max size of a message
     * @param cpuAffinity cpu affinity mask for these Clients' threads
     * @param maxBlockingTime it is recommended to limit the duration a Client blocks when it has
     * no messages to handle, so it can respond to things like the Context being stopped, or
     * generate heartbeats if applicable.
     * @param pred see DeliverPred documentation
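     *
     * A usage sketch - Worker is a hypothetical Client type and the three instances
     * share one message stream:
     * @code
     * std::vector<Worker*> workers = {&w0, &w1, &w2};
     * ctx.start(workers.begin(), workers.end());
     * @endcode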
     */
    template <typename LoadSharingClientPtrIt, typename DeliverPred = deliverAll>
    void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end
        , size_t capacity = 1024
        , size_t maxItemSize = max_size_in_tuple<
            typename std::remove_reference<decltype(**LoadSharingClientPtrIt())>::type::Interests
        >::value
        , uint64_t cpuAffinity = 0
        , time::Duration maxBlockingTime = time::Duration::seconds(1)
        , DeliverPred&& pred = DeliverPred()) { //=[](auto&&){return true;}
        if (begin == end) return;
        using Client = typename std::remove_reference<decltype(**begin)>::type;
        std::shared_ptr<Transport> t(new Transport(maxItemSize, capacity));
        setupConduit<MsgConduits, DeliverPred, typename Client::Interests>()(
            msgConduits_, t, std::forward<DeliverPred>(pred));
        while(begin != end) {
            auto thrd = kickOffClientThread(blocking_context_detail::runOnceLoadSharingImpl<Client>
                , **begin++, t.use_count() == 1?nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
            threads_.push_back(std::move(thrd));
        }
    }

    /**
     * @brief stop the message dispatching - asynchronously
     * @details asynchronous means message dispatching is not guaranteed to
     * stop immediately after this non-blocking call
     */
    void stop() {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }

    /**
     * @brief wait until all threads of the Context exit
     * @details blocking call
     */
    void join() {
        for (auto& t : threads_) {
            t.join();
        }
        threads_.clear();
    }
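    // a typical shutdown sequence from the owning thread (ctx is hypothetical):
    //   ctx.stop();   //request dispatching to stop, non-blocking
    //   ctx.join();   //block until every Client thread has exited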

    /**
     * @brief send a message to the BlockingContext to dispatch
     * @details only the Clients that handle the Message will get it.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @tparam Message type
     * @param m message
     */
    template <typename Message>
    typename std::enable_if<can_handle<Message>::value, void>::type
    send(Message&& m) {
        using M = typename std::remove_reference<Message>::type;
        auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
        static_assert(i < std::tuple_size<MsgConduits>::value, "unexpected message type");
        auto& entries = std::get<i>(msgConduits_);
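        // every matching Client buffer gets its own copy of m; only the last matching
        // entry may receive the forwarded (possibly moved-from) value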
        for (auto i = 0u; i < entries.size(); ++i) {
            auto& e = entries[i];
            if (e.diliverPred(m)) {
                if (i == entries.size() - 1) {
                    e.transport->buffer
                        .template putInPlace<MessageWrap<M>>(std::forward<Message>(m));
                } else {
                    e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
                }
            }
        }
    }
    template <typename Message>
    typename std::enable_if<!can_handle<Message>::value, void>::type
    send(Message&&){}

    /**
     * @brief send a message by directly constructing the message in the receiving message queues
     * @details since the message only exists after being delivered, the DeliverPred
     * is not called to decide if it should be delivered
     *
     * @param args ctor args
     * @tparam Message type
     * @tparam Args ctor arg types
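     *
     * A sketch - MyMessage and its ctor arguments are hypothetical:
     * @code
     * ctx.sendInPlace<MyMessage>(arg0, arg1); //constructed directly in each receiving buffer
     * @endcode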
     */
    template <typename Message, typename ... Args>
    typename std::enable_if<can_handle<Message>::value, void>::type
    sendInPlace(Args&&... args) {
        using M = typename std::remove_reference<Message>::type;
        auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
        auto& entries = std::get<i>(msgConduits_);
        for (auto i = 0u; i < entries.size(); ++i) {
            auto& e = entries[i];
            if (i == entries.size() - 1) {
                e.transport->buffer
                    .template putInPlace<MessageWrap<M>>(std::forward<Args>(args)...);
            } else {
                e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
            }
        }
    }
    template <typename Message, typename ... Args>
    typename std::enable_if<!can_handle<Message>::value, void>::type
    sendInPlace(Args&&...) {}

    /**
     * @brief best effort to send a message via the BlockingContext
     * @details this method is not recommended when more than one recipient accepts
     * this message since there is no guarantee each one will receive it once and only once.
     * this call does not block - it returns false when delivery doesn't reach all
     * intended recipients
     * This method is threadsafe, which means you can call it anywhere in the code
     *
     * @param m message
     * @param timeout return false if cannot deliver in the specified time
     * @return true if sent successfully to every intended receiver
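     *
     * A sketch - msg is a hypothetical message instance:
     * @code
     * if (!ctx.trySend(msg, hmbdc::time::Duration::seconds(1))) {
     *     //at least one intended receiver's buffer stayed full for the whole timeout
     * }
     * @endcode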
     */
    template <typename Message>
    typename std::enable_if<can_handle<Message>::value, bool>::type
    trySend(Message&& m, time::Duration timeout = time::Duration::seconds(0)) {
        using M = typename std::remove_reference<Message>::type;
        auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
        auto& entries = std::get<i>(msgConduits_);
        for (auto i = 0u; i < entries.size(); ++i) {
            auto& e = entries[i];
            if (e.diliverPred(m)) {
                if (i == entries.size() - 1) {
                    if (!e.transport->buffer
                        .tryPut(MessageWrap<M>(std::forward<Message>(m)), timeout)) {
                        return false;
                    }
                } else {
                    if (!e.transport->buffer.tryPut(MessageWrap<M>(m), timeout)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    template <typename Message>
    typename std::enable_if<!can_handle<Message>::value, bool>::type
    trySend(Message&&, time::Duration = time::Duration::seconds(0)) {
        return true;
    }

    /**
     * @brief best effort to send a message by directly constructing the message in the
     * receiving message queues
     * @details since the message only exists after being delivered, the DeliverPred
     * is not called to decide if it should be delivered
     * this method is not recommended when more than one recipient accepts
     * this message since it is hard to ensure each message is delivered once and only once.
     * this call does not block - it returns false when delivery doesn't reach ALL
     * intended recipients
     * This method is threadsafe, which means you can call it anywhere in the code
     *
     * @return true if sent successfully to every intended receiver
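     *
     * A sketch - MyMessage and its ctor arguments are hypothetical:
     * @code
     * if (!ctx.trySendInPlace<MyMessage>(arg0, arg1)) {
     *     //the message did not reach all intended receivers
     * }
     * @endcode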
     */
    template <typename Message, typename ... Args>
    typename std::enable_if<can_handle<Message>::value, bool>::type
    trySendInPlace(Args&&... args) {
        using M = typename std::remove_reference<Message>::type;
        auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
        auto& entries = std::get<i>(msgConduits_);
        for (auto i = 0u; i < entries.size(); ++i) {
            auto& e = entries[i];
            if (i == entries.size() - 1) {
                if (!e.transport->buffer.template
                    tryPutInPlace<MessageWrap<M>>(std::forward<Args>(args)...)) {
                    return false;
                }
            } else {
                if (!e.transport->buffer.template
                    tryPutInPlace<MessageWrap<M>>(args...)) {
                    return false;
                }
            }
        }
        return true;
    }

    template <typename Message, typename ... Args>
    typename std::enable_if<!can_handle<Message>::value, bool>::type
    trySendInPlace(Args&&...) {
        return true;
    }

    /**
     * @brief send a range of messages via the BlockingContext
     * - the diliverPred check is done only with the first message in the range
     * @details only the Clients that handle the Message will get them of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
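     *
     * A sketch - MyMessage is hypothetical:
     * @code
     * MyMessage batch[8];
     * ctx.send(batch, 8); //all 8 messages go to each interested Client's buffer
     * @endcode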
     */
    template <typename ForwardIt>
    typename std::enable_if<can_handle<decltype(*(ForwardIt()))>::value, void>::type
    send(ForwardIt begin, size_t n) {
        if (!n) return;
        using Message = decltype(*begin);
        using M = typename std::remove_reference<Message>::type;
        auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
        auto& entries = std::get<i>(msgConduits_);
        for (auto i = 0u; i < entries.size(); ++i) {
            auto& e = entries[i];
            if (e.diliverPred(*begin)) {
                while (!e.transport->buffer.template tryPutBatchInPlace<MessageWrap<M>>(begin, n)) {}
            }
        }
    }

    template <typename ForwardIt>
    typename std::enable_if<!can_handle<decltype(*(ForwardIt()))>::value, void>::type
    send(ForwardIt, size_t) {}

private:
    void start(){}

    struct Transport {
        Transport(size_t maxItemSize, size_t capacity)
        : buffer(maxItemSize + sizeof(MessageHead), capacity){}
        pattern::BlockingBuffer buffer;
    };

    template <typename Message>
    struct TransportEntry {
        std::shared_ptr<Transport> transport;
        std::function<bool (Message const&)> diliverPred;
    };
    template <typename Message>
    using TransportEntries = std::vector<TransportEntry<Message>>;

    template <typename Tuple> struct MCGen;
    template <typename ...Messages>
    struct MCGen<std::tuple<Messages ...>> {
        using type = std::tuple<TransportEntries<Messages> ...>;
    };

    using MsgConduits = typename MCGen<Interests>::type;

    MsgConduits msgConduits_;
    using Threads = std::vector<std::thread>;
    Threads threads_;
    bool stopped_;

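    // setupConduit walks a tuple of message types at compile time; for each type this
    // Context can also deliver, it records the given Transport (buffer) together with
    // its DeliverPred in msgConduits_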
    template <typename MsgConduits, typename DeliverPred, typename Tuple>
    struct setupConduit {
        void operator()(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&){}
    };

    struct createEntry {
        template <bool careMsg, size_t i, typename M, typename MsgConduits, typename DeliverPred>
        typename std::enable_if<careMsg, void>::type
        doOrNot(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred) {
            std::get<i>(msgConduits).emplace_back(TransportEntry<M>{t, pred});
        }

        template <bool careMsg, size_t i, typename M, typename MsgConduits, typename DeliverPred>
        typename std::enable_if<!careMsg, void>::type
        doOrNot(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&) {
        }
    };

    template <typename MsgConduits, typename DeliverPred, typename M, typename ...Messages>
    struct setupConduit<MsgConduits, DeliverPred, std::tuple<M, Messages...>> {
        void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred){
            auto constexpr i = index_in_tuple<M, Interests>::value;
            createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
                msgConduits, t, pred);
            setupConduit<MsgConduits, DeliverPred, std::tuple<Messages...>>()
                (msgConduits, t, std::forward<DeliverPred>(pred));
        }
    };

    template <typename MsgConduits, typename M, typename ...Messages>
    struct setupConduit<MsgConduits, void*, std::tuple<M, Messages...>> {
        void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t, void*){
            auto constexpr i = index_in_tuple<M, Interests>::value;
            createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
                msgConduits, t, [](M const&){return true;});
            setupConduit<MsgConduits, void*, std::tuple<Messages...>>()
                (msgConduits, t, nullptr);
        }
    };

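    // can_handle<Message>::value is nonzero exactly when Message (minus reference)
    // appears in this Context's Interests, i.e. msgConduits_ has a slot for it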
    template <typename Message>
    struct can_handle {
    private:
        using M = typename std::remove_reference<Message>::type;
    public:
        enum {
            index = index_in_tuple<TransportEntries<M>, MsgConduits>::value,
            value = index < std::tuple_size<MsgConduits>::value?1:0,
        };
    };

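    // spawns the OS thread powering one Client: names it, applies cpu affinity and
    // scheduling spec, then keeps calling runOnceFunc until the Context is stopped
    // or the Client drops out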
    template <typename Client, typename RunOnceFunc>
    auto kickOffClientThread(RunOnceFunc runOnceFunc, Client& c, pattern::BlockingBuffer* buffer
        , uint64_t mask, time::Duration maxBlockingTime) {
        std::thread thrd([
            this
            , &c
            , buffer
            , mask
            , maxBlockingTime
            , runOnceFunc]() {
                std::string name;
                char const* schedule;
                int priority;

                if (c.hmbdcName()) {
                    name = c.hmbdcName();
                } else {
                    name = "hmbdc-b";
                }
                auto cpuAffinityMask = mask;
                std::tie(schedule, priority) = c.schedSpec();

                if (!schedule) schedule = "SCHED_OTHER";

                os::configureCurrentThread(name.c_str(), cpuAffinityMask
                    , schedule, priority);
                c.messageDispatchingStartedCb(0xffff);

                while(!stopped_ && runOnceFunc(this->stopped_, buffer, c, maxBlockingTime)) {
                }
                if (this->stopped_) c.dropped();
            }
        );

        return move(thrd);
    }
};
}}