hmbdc - simplify high performance messaging programming
Context.hpp
#include "hmbdc/Copyright.hpp"
#pragma once


#include "hmbdc/app/StuckClientPurger.hpp"
#include "hmbdc/Config.hpp"
#include "hmbdc/numeric/BitMath.hpp"

#include <memory>
#include <vector>
#include <list>
namespace hmbdc { namespace app {


/**
* @example hello-world.cpp
* @example hmbdc.cpp
* @example hmbdc-log.cpp
* @example ipc-market-data-propagate.cpp
*/

/**
 * @namespace hmbdc::app::context_property
 * contains the trait types that define how a Context behaves and what capabilities it has
 */
namespace context_property {
    /**
     * @class broadcast
     * @brief Context template parameter indicating each message is
     * sent to all clients within the Context.
     * This is the default property of a Context.
     * @details each message is still subject to the Client's message type
     * filtering. In the case of an ipc Context,
     * it is also sent to all clients in the attached ipc Contexts.
     * When a Context is specialized using this type, the Context normally
     * works with heterogeneous Clients and all Clients can talk to each
     * other thru the Context. Load balancing among Clients can be achieved by
     * having the participating Clients coordinate which messages each one processes.
     * In addition to the direct mode Clients, a Client running pool is supported
     * by the Context - see the pool related functions in Context.
     *
     * Implicit usage in hello-world.cpp: @snippet hello-world.cpp broadcast as default
     * Explicit usage in hmbdc.cpp: @snippet hmbdc.cpp explicit using broadcast
     * There is no hard coded limit on how many Clients can be added into a pool,
     * nor on when a Client can be added into a pool.
     * @tparam max_parallel_consumer the max count of threads that process messages,
     * which includes pool threads plus the count of direct mode Clients that
     * register messages within the Context. Normally there is no need to specify it.
     * supported values: 4 (default);
     * 2, 8, 16, 32, 64 require an hmbdc license
     */
    template <uint16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
    struct broadcast{
        static_assert(max_parallel_consumer >= 4u
            , "unsupported max_parallel_consumer");
    };

    /**
     * @class partition
     * @brief Context template parameter indicating each message is
     * sent to one and only one of the clients within the Context
     * and its attached ipc Contexts, if applicable.
     * @details each message is still subject to the Client's message type
     * filtering.
     * When a Context is specialized using this type, the Context normally
     * works with homogeneous Clients to achieve load balancing thru threads. No
     * coordination is needed between Clients.
     * Only direct mode Clients are supported; a thread pool is NOT supported
     * by this kind of Context - the pool related functions in Context are disabled as well.
     *
     * Example in server-cluster.cpp: @snippet server-cluster.cpp declare a partition context
     */
    struct partition{};
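    // A load balancing sketch with partition; each message goes to exactly one
    // of the identical workers. 'Worker' is a placeholder Client type:
    //
    //   Context<64, context_property::partition> ctx;
    //   Worker w0, w1, w2;
    //   ctx.start(w0, 0x1ul, w1, 0x2ul, w2, 0x4ul); // each message hits one worker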

    /**
     * @class msgless_pool
     * @brief Context template parameter indicating the Context must contain a pool to run Clients,
     * and the Clients in the pool shall not receive messages - unlike with the default pool.
     * @details msgless_pool performs better when its Clients don't need to receive messages from the Context.
     * This is useful when the Clients are network transport engines. By default, a partition Context
     * doesn't come with a pool for semantic reasons, but this Context property enables a pool that
     * does not deliver messages.
     */
    struct msgless_pool{};

    /**
     * @class ipc_creator
     * @brief Context template parameter indicating the Context is ipc enabled and
     * creates (owns) a named ipc transport that other Contexts can attach to (see ipc_attacher below).
     * @details in addition to the normal Context functions, the Context acts as
     * the creator (owner) of the named ipc transport.
     * Since it performs the critical function of purging crashed or
     * stuck Clients to avoid the buffer filling up for the other well-behaving Clients, it is
     * expected to be running (started) for as long as ipc is in use.
     * The ipc transport uses persistent shared memory, and if the dtor of the Context is not called
     * due to a crash, there will be stale shared memory left in /dev/shm.
     * Example in ipc-market-data-propagate.cpp: @snippet ipc-market-data-propagate.cpp declare an ipc context
     */
    struct ipc_creator{};

    /**
     * @class ipc_attacher
     * @brief Context template parameter indicating the Context is ipc enabled and
     * it can attach to an ipc transport thru a name.
     * @details it is very important that the Context is constructed with exactly
     * the same size (see constructor) and type (partition vs broadcast) as the ipc
     * transport creator specified (the ipc_creator Context).
     * All Contexts attached to a single ipc transport are collectively subject to the
     * max_parallel_consumer limit just like a single local (non-ipc) Context is.
     * Example in ipc-market-data-propagate.cpp: @snippet ipc-market-data-propagate.cpp declare an ipc context
     */
    struct ipc_attacher{};
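    // A pairing sketch; the transport name and sizes are illustrative. The
    // creator process owns the shared memory transport:
    //
    //   Context<64, broadcast<>, ipc_creator> owner("md-bus", 20);
    //
    // an attaching process must construct with the identical size and type:
    //
    //   Context<64, broadcast<>, ipc_attacher> user("md-bus", 20);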
}
}}

#include "hmbdc/app/ContextDetail.hpp"
namespace hmbdc { namespace app {

namespace context_detail {
using namespace std;
using namespace hmbdc::pattern;

/**
 * @class ThreadCommBase<>
 * @brief covers the inter-thread and ipc communication facade
 * @details this type's interface is exposed thru Context and the type itself is
 * not directly used by users
 * @tparam MaxMessageSize the max message size, needed at compile time;
 * if the value can only be determined at runtime, set this to 0. Things can still work,
 * but will lose some compile time checking advantages, see maxMessageSizeRuntime below
 * @tparam ContextProperties see the types in the context_property namespace
 */
template <size_t MaxMessageSize, typename... ContextProperties>
struct ThreadCommBase
: private context_detail::context_property_aggregator<ContextProperties...> {
    using cpa = context_property_aggregator<ContextProperties...>;
    using Buffer = typename cpa::Buffer;
    using Allocator = typename cpa::Allocator;

    enum {
        MAX_MESSAGE_SIZE = MaxMessageSize,
        BUFFER_VALUE_SIZE = MaxMessageSize + 8u, //8 bytes for wrap
    };

    size_t maxMessageSize() const {
        if (MaxMessageSize == 0) return maxMessageSizeRuntime_;
        return MaxMessageSize;
    }

    /**
     * @brief send a batch of messages to the Context or attached ipc Contexts
     * @details only the Clients that handle a Message will get it, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param msgs messages
     * @tparam Messages message types
     */
    template <typename M0, typename M1, typename ... Messages>
    typename std::enable_if<!std::is_integral<M1>::value, void>::type
    send(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.claim(n);
        sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
        buffer_.commit(it, n);
    }

    /**
     * @brief try to send a batch of messages to the Context or attached ipc Contexts
     * @details this call does not block and it is transactional - send all or none.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param msgs messages
     * @tparam Messages message types
     *
     * @return true if sent successfully
     */
    template <typename M0, typename M1, typename ... Messages>
    typename std::enable_if<!std::is_integral<M1>::value, bool>::type
    trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.tryClaim(n);
        if (it) {
            sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
            buffer_.commit(it, n);
            return true;
        }

        return false;
    }
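    // A sketch of the batch forms; MsgA and MsgB are placeholder message types:
    //
    //   ctx.send(MsgA{}, MsgB{}, MsgA{});    // blocks until all 3 slots are claimed
    //   if (!ctx.trySend(MsgA{}, MsgB{})) {  // all-or-none, never blocks
    //       // buffer was full
    //   }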

    /**
     * @brief send a range of messages to the Context or attached ipc Contexts
     * @details only the Clients that handle a Message will get it, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
     */
    template <typename ForwardIt>
    void
    send(ForwardIt begin, size_t n) {
        if (hmbdc_likely(n)) {
            auto bit = buffer_.claim(n);
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename iterator_traits<ForwardIt>::value_type;
                new (*it++) MessageWrap<Message>(*begin++);
            }
            buffer_.commit(bit, n);
        }
    }

    /**
     * @brief try to send a range of messages to the Context or attached ipc Contexts
     * @details this call does not block and it is transactional - send all or none.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
     * @return true if sent successfully
     */
    template <typename ForwardIt>
    bool
    trySend(ForwardIt begin, size_t n) {
        if (hmbdc_likely(n)) {
            auto bit = buffer_.tryClaim(n);
            if (hmbdc_unlikely(!bit)) return false;
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename iterator_traits<ForwardIt>::value_type;
                new (*it++) MessageWrap<Message>(*begin++);
            }
            buffer_.commit(bit, n);
        }
        return true;
    }
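    // A sketch of the range forms; 'quotes' is a placeholder std::vector<Quote>:
    //
    //   ctx.send(quotes.begin(), quotes.size());               // blocking
    //   bool ok = ctx.trySend(quotes.begin(), quotes.size());  // non-blocking, all or none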

    /**
     * @brief send a message to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message will get it, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param m message
     * @tparam Message type
     */
    template <typename Message>
    void send(Message&& m) {
        using M = typename std::remove_reference<Message>::type;
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        buffer_.put(MessageWrap<M>(std::forward<Message>(m)));
    }

    /**
     * @brief try to send a message to the Context or attached ipc Contexts if it wouldn't block
     * @details this call does not block - it returns false when the buffer is full.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param m message
     * @tparam Message type
     * @return true if sent successfully
     */
    template <typename Message>
    bool trySend(Message&& m) {
        using M = typename std::remove_reference<Message>::type;
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        return buffer_.tryPut(MessageWrap<M>(std::forward<Message>(m)));
    }

    /**
     * @brief send a message to all Clients in the Context or attached ipc Contexts
     * @details constructs the Message in the buffer directly.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param args ctor args
     * @tparam Message type
     * @tparam typename ... Args args types
     */
    template <typename Message, typename ... Args>
    void sendInPlace(Args&&... args) {
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        buffer_.template putInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
    }


    /**
     * @brief try to send a message to all Clients in the Context or attached ipc Contexts if it wouldn't block
     * @details this call does not block - it returns false when the buffer is full;
     * the Message is constructed in the buffer directly if it returns true.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param args ctor args
     * @tparam Message type
     * @tparam typename ... Args args types
     * @return true if sent successfully
     */
    template <typename Message, typename ... Args>
    bool trySendInPlace(Args&&... args) {
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        return buffer_.template tryPutInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
    }
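    // A sketch of in-place construction, which skips the copy into the buffer;
    // 'Quote' is a placeholder message type constructible from (symbol, price):
    //
    //   ctx.sendInPlace<Quote>("IBM", 100.25);                        // blocking
    //   if (!ctx.trySendInPlace<Quote>("IBM", 100.25)) { /* full */ } // non-blocking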

    /**
     * @brief accessor - mostly used internally
     * @return underlying buffer used in the Context
     */
    Buffer& buffer() {
        return buffer_;
    }


protected:
    ThreadCommBase(uint32_t messageQueueSizePower2Num
        , size_t maxMessageSizeRuntime
        , char const* shmName)
    : allocator_(shmName
        , Buffer::footprint(maxMessageSizeRuntime + 8u
            , messageQueueSizePower2Num), O_RDWR | (cpa::create_ipc?O_CREAT:0)
    )
    , bufferptr_(allocator_.template allocate<Buffer>(SMP_CACHE_BYTES
        , maxMessageSizeRuntime + 8u, messageQueueSizePower2Num
        , allocator_)
    )
    , buffer_(*bufferptr_) {
        if (messageQueueSizePower2Num < 2) {
            HMBDC_THROW(std::out_of_range
                , "messageQueueSizePower2Num needs to be >= 2");
        }
        if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
            HMBDC_THROW(std::out_of_range
                , "can only set maxMessageSizeRuntime when the template value MaxMessageSize is 0");
        }
        maxMessageSizeRuntime_ = maxMessageSizeRuntime;
        primeBuffer<(cpa::create_ipc || (!cpa::create_ipc && !cpa::attach_ipc)) && cpa::has_pool>();
        if (cpa::create_ipc || cpa::attach_ipc) {
            sleep(2);
        }
    }

    ~ThreadCommBase() {
        allocator_.unallocate(bufferptr_);
    }

    static
    void markDeadFrom(pattern::MonoLockFreeBuffer& buffer, uint16_t) {
        // does not apply
    }

    template <typename BroadCastBuf>
    static
    void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
        for (uint16_t i = poolThreadCount;
            i < BroadCastBuf::max_parallel_consumer;
            ++i) {
            buffer.markDead(i);
        }
    }


    static
    void markDead(pattern::MonoLockFreeBuffer& buffer, std::list<uint16_t> slots) {
        // does not apply
    }

    template <typename BroadCastBuf>
    static
    void markDead(BroadCastBuf& buffer, std::list<uint16_t> slots) {
        for (auto s : slots) {
            buffer.markDead(s);
        }
    }

    Allocator allocator_;
    Buffer* HMBDC_RESTRICT bufferptr_;
    Buffer& HMBDC_RESTRICT buffer_;

private:
    template <bool doIt>
    typename std::enable_if<doIt, void>::type
    primeBuffer() {
        markDeadFrom(buffer_, 0);
    }

    template <bool doIt>
    typename std::enable_if<!doIt, void>::type
    primeBuffer() {
    }

    template <typename M, typename... Messages>
    void sendRecursive(typename Buffer::iterator it
        , M&& msg, Messages&&... msgs) {
        using Message = typename std::remove_reference<M>::type;
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        new (*it) MessageWrap<Message>(msg);
        sendRecursive(++it, std::forward<Messages>(msgs)...);
    }
    void sendRecursive(typename Buffer::iterator) {}

    size_t maxMessageSizeRuntime_;
};

} //context_detail

/**
 * @example hmbdc.cpp
 * @example server-cluster.cpp
 * a partition Context rightfully doesn't contain a thread pool, and all its Clients
 * are in direct mode. Pool related interfaces are turned off at compile time
 */

/**
 * @class Context<>
 * @brief A Context is like a media object that facilitates the communications
 * for the Clients that it is holding.
 * a Client can only be added to (or started within) a single Context once;
 * undefined behavior otherwise.
 * the communication model is determined by the context_property;
 * by default it is broadcast in nature within the local process, indicated
 * by broadcast<>
 *
 * @details a broadcast Context contains a thread Pool powered by a number of OS threads.
 * a Client running in such a Context can run in either pool mode or direct mode
 * (which means the Client has its own dedicated OS thread).
 * direct mode provides faster responses, and pool mode provides more flexibility.
 * It is recommended that the total number of threads (pool threads + direct threads)
 * not exceed the number of available cores.
 * @tparam MaxMessageSize the max message size if known
 * at compile time (compile time sized);
 * if the value can only be determined at runtime (run time sized), set this to 0.
 * Things can still work but will lose some compile time checking advantages,
 * see maxMessageSizeRuntime below
 * @tparam ContextProperties see the context_property namespace
 */
template <size_t MaxMessageSize = 0, typename... ContextProperties>
struct Context
: context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...> {
    using Base = context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...>;
    using Buffer = typename Base::Buffer;
    using cpa = typename Base::cpa;
    using Pool = typename std::conditional<cpa::pool_msgless
        , pattern::PoolMinus
        , pattern::PoolT<Buffer>>::type;
    /**
     * @brief ctor for constructing a local non-ipc Context
     * @details won't compile if called for an ipc Context
     * @param messageQueueSizePower2Num a value of 10 gives a message queue of size 1024 (messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support; only used when
     * a pool is supported in the Context (broadcast property)
     * @param maxMessageSizeRuntime if MaxMessageSize is set to 0, this value is used
     * @param maxThreadSerialNumber the max number of threads (direct mode clients plus pool threads)
     * the context can manage
     */
    Context(uint32_t messageQueueSizePower2Num = MaxMessageSize?20:2
        , size_t maxPoolClientCount = MaxMessageSize?128:0
        , size_t maxMessageSizeRuntime = MaxMessageSize
        , size_t maxThreadSerialNumber = 64)
    : Base(messageQueueSizePower2Num < 2?2:messageQueueSizePower2Num
        , maxMessageSizeRuntime, nullptr)
    , usedHmbdcCapacity_(0)
    , stopped_(false)
    , pool_(createPool<cpa>(maxPoolClientCount))
    , poolThreadCount_(0) {
        static_assert(!cpa::create_ipc && !cpa::attach_ipc
            , "no name specified for ipc Context");
    }
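    // A construction sketch for a run time sized Context (MaxMessageSize == 0);
    // the numbers are illustrative:
    //
    //   Context<> ctx(20     // 2^20 slots in the message queue
    //       , 128            // the pool supports up to 128 Clients
    //       , 1000);         // max message size only known at runtime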

    /**
     * @brief ctor for constructing a local ipc Context
     * @details won't compile if called for a local non-ipc Context
     *
     * @param ipcTransportName the id to identify an ipc transport that supports
     * a group of attached-together Contexts and their Clients
     * @param messageQueueSizePower2Num a value of 10 gives a message queue of
     * size 1024 (messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support;
     * only used when a pool is supported in the Context (broadcast property)
     * @param maxMessageSizeRuntime if MaxMessageSize is set to 0, this value is used
     * @param purgerCpuAffinityMask which cores to run the low profile (mostly sleeping)
     * thread in charge of purging crashed Clients. Used only for ipc_creator Contexts.
     * @param maxThreadSerialNumber the max number of threads (direct mode clients plus pool threads)
     * the context can manage
     */
    Context(char const* ipcTransportName
        , uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
        , size_t maxPoolClientCount = MaxMessageSize?128:0
        , size_t maxMessageSizeRuntime = MaxMessageSize
        , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful
        , size_t maxThreadSerialNumber = 64)
    : Base(messageQueueSizePower2Num, maxMessageSizeRuntime, ipcTransportName)
    , usedHmbdcCapacity_(0)
    , stopped_(false)
    , pool_(createPool<cpa>(maxPoolClientCount))
    , poolThreadCount_(0)
    , secondsBetweenPurge_(60)
    , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
        static_assert(cpa::create_ipc || cpa::attach_ipc
            , "ctor can only be used with an ipc enabled Context");
        static_assert(!(cpa::create_ipc && cpa::attach_ipc)
            , "Context cannot be both ipc_creator and ipc_attacher");
    }

    /**
     * @brief dtor
     * @details if this Context owns the ipc transport, notify all attached processes
     * that read from it that this transport is dead
     */
    ~Context() {
        if (cpa::create_ipc) {
            Base::markDeadFrom(this->buffer_, 0);
        }
        stop();
        join();
    }

    /**
     * @brief add a client to the Context's pool - the Client is run in pool mode
     * @details if the pool is already started, the client starts getting current Messages immediately
     * - it might miss older messages.
     * if the pool is not started yet, the Client does not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the Pool
     * @param poolThreadAffinityIn the pool is powered by a number of threads
     * (a thread in the pool is identified (by a number) in the mask starting from bit 0);
     * it is possible to have a Client use just some of the threads in the Pool
     * - defaults to using all.
     */
    template <typename Client>
    void addToPool(Client &client
        , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        if (std::is_base_of<single_thread_powered_client, Client>::value
            && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
            && poolThreadCount_ != 1) {
            HMBDC_THROW(std::out_of_range
                , "cannot add a single thread powered client to the non-single"
                "thread powered pool without specifying a single thread poolThreadAffinity"
            );
        }
        auto stub = new context_detail::PoolConsumerProxy<Client>(client);
        pool_->addConsumer(*stub, poolThreadAffinityIn);
    }

    /**
     * @brief add a bunch of clients to the Context's pool - the Clients are run in pool mode
     * @details if the pool is already started, the clients start getting current Messages immediately
     * - they might miss older messages.
     * if the pool is not started yet, the Clients do not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the Pool
     * @param poolThreadAffinityIn the pool is powered by a number of threads
     * (a thread in the pool is identified (by a number) in the mask starting from bit 0);
     * it is possible to have a Client use just some of the threads in the Pool
     * - defaults to using all.
     * @param args more client and poolThreadAffinityIn pairs can follow
     */
    template <typename Client, typename ... Args>
    void addToPool(Client &client
        , uint64_t poolThreadAffinityIn, Args&& ...args) {
        addToPool(client, poolThreadAffinityIn);
        addToPool(std::forward<Args>(args)...);
    }

    /**
     * @brief add a bunch of clients to the Context's pool - the Clients are run in pool mode
     * @details the implementation tells all:
     * if the pool is not started yet, the Clients do not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @tparam Client2 client2 type
     * @param client to be added into the Pool using the default poolThreadAffinity
     * @param client2 to be added into the Pool
     * @param args more client (and/or poolThreadAffinityIn) pairs can follow
     */
    template <typename Client, typename Client2, typename ... Args>
    typename std::enable_if<!std::is_integral<Client2>::value, void>::type
    addToPool(Client &client, Client2 &client2, Args&& ...args) {
        addToPool(client);
        addToPool(client2, std::forward<Args>(args)...);
    }
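    // A pool usage sketch; 'Worker' is a placeholder Client type:
    //
    //   Worker w0, w1;
    //   ctx.addToPool(w0);        // w0 may run on any pool thread
    //   ctx.addToPool(w1, 0x1ul); // w1 pinned to pool thread 0
    //   ctx.start(2, 0x3ul);      // start 2 pool threads on cores 0 and 1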

    /**
     * @brief return the number of clients added into the pool
     * @details the number could change since clients could be added from another thread
     * @return client count
     */
    size_t clientCountInPool() const {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        return pool_->consumerSize();
    }

    /**
     * @brief how many parallel consumers are started
     * @details the dynamic value could change after the call returns;
     * see the max_parallel_consumer Context property
     * @return how many parallel consumers are started
     */
    size_t parallelConsumerAlive() const {
        return this->buffer_.parallelConsumerAlive();
    }

    /**
     * @brief start the context by specifying what is in it (Pool and/or direct Clients)
     * and their paired-up cpu affinities.
     * @details all direct mode Clients, or Clients in a pool, started by a single start
     * statement have their dispatching start from the same event
     * (subject to the event filtering of each Client).
     * many compile time and runtime checks are done; for example:
     * it won't compile when starting a pool in a Context that does not support one;
     * an exception is thrown if the Context capacity is reached or a second pool is started, etc.
     *
     * Usage example:
     *
     * @code
     * // the following starts the pool powered by 3 threads that are affinitied to
     * // the lower 8 cores; client0 is affinitied to the 4th core and client1 to the 5th core
     * ctx.start(3, 0xfful, client0, 0x8ul, client1, 0x10ul);
     *
     * // the following starts 2 direct mode Clients (client2 and client3)
     * ctx.start(client2, 0x3ul, client3, 0xful);
     * @endcode
     *
     * @tparam typename ...Args types
     *
     * @param args paired-up args in the form of (pool-thread-count|client, cpuAffinity)*.
     * see the examples above.
     * If a cpuAffinity is 0, each thread's affinity rotates to one of the CPUs in the system.
     */
    template <typename ...Args>
    void
    start(Args&& ... args) {
        startWithContextProperty<cpa>(std::forward<Args>(args) ...);
    }

    /**
     * @brief stop the message dispatching - asynchronously
     * @details asynchronously means it is not guaranteed that message dispatching
     * stops immediately after this non-blocking call
     */
    void
    stop() {
        stopWithContextProperty<cpa>();
    }

    /**
     * @brief wait until all threads (including Pool threads if they apply) of the Context exit
     * @details blocking call
     */
    void
    join() {
        joinWithContextProperty<cpa>();
    }

    /**
     * @brief an ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...)
     * Clients from the ipc transport to keep the ipc transport healthy (avoiding buffer full).
     * It periodically looks for things to purge; this sets the period (default is 60 seconds).
     * @details if some Clients are known to
     * take long to process messages, increase it. If you need to remove slow Clients quickly,
     * reduce it.
     * Only effective for an ipc_creator Context.
     *
     * @param s seconds
     */
    void
    setSecondsBetweenPurge(uint32_t s) {
        secondsBetweenPurge_ = s;
    }

    /**
     * @brief normally not used unless you want to run your own message loop
     * @details call this function frequently to pump the hmbdc message loop in its pool
     *
     * @param threadSerialNumberInPool starting from 0, indicates which thread in the pool
     * is powering the loop
     */
    void
    runPoolThreadOnce(uint16_t threadSerialNumberInPool) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        pool_->runOnce(threadSerialNumberInPool);
    }

    /**
     * @brief normally not used unless you want to run your own message loop
     * @details call this function frequently to pump the hmbdc message loop for a direct mode Client
     *
     * @param threadSerialNumber indicates which thread is powering the loop
     * @param c the Client
     */
    template <typename Client>
    void runClientThreadOnce(uint16_t threadSerialNumber, Client& c) {
        c.messageDispatchingStarted(
            hmbdcNumbers_[threadSerialNumber]); //the lower level ensures this executes only once
        context_detail::runOnceImpl(
            hmbdcNumbers_[threadSerialNumber], stopped_, this->buffer_, c);
    }

private:
    template <typename cpa>
    typename std::enable_if<cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t maxPoolClientCount) {
        return Pool::create(this->buffer(), maxPoolClientCount);
    }

    template <typename cpa>
    typename std::enable_if<cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t maxPoolClientCount) {
        return Pool::create(maxPoolClientCount);
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t) {
        return typename Pool::ptr();
    }

    template <typename cpa>
    typename std::enable_if<cpa::has_pool, void>::type
    stopWithContextProperty() {
        if (pool_) pool_->stop();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool, void>::type
    stopWithContextProperty() {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }

    template <typename cpa>
    typename std::enable_if<cpa::has_pool, void>::type
    joinWithContextProperty() {
        if (pool_) pool_->join();
        for (auto& t : threads_) {
            t.join();
        }
        threads_.clear();
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool, void>::type
    joinWithContextProperty() {
        for (auto& t : threads_) {
            t.join();
        }
        threads_.clear();
    }

    template <typename cpa>
    void
    reserveSlots(std::list<uint16_t>&) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::broadcast_msg && !cpa::pool_msgless, void>::type
    reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
        auto available = this->buffer_.unusedConsumerIndexes();
        if (available.size() < poolThreadCount) {
            HMBDC_THROW(std::out_of_range
                , "Context remaining capacity = " << available.size()
                << ", consider increasing max_parallel_consumer");
        }
        for (uint16_t i = 0; i < poolThreadCount; ++i) {
            slots.push_back(available[i]);
            this->buffer_.reset(available[i]);
        }
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<!cpa::broadcast_msg || cpa::pool_msgless, void>::type
    reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename CcClient, typename ...Args>
    typename std::enable_if<cpa::broadcast_msg && !std::is_integral<CcClient>::value, void>::type
    reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
        const bool clientParticipateInMessaging =
            std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE != 0;
        if (clientParticipateInMessaging) {
            auto available = this->buffer_.unusedConsumerIndexes();
            if (!available.size()) {
                HMBDC_THROW(std::out_of_range
                    , "Context reached capacity, consider increasing max_parallel_consumer");
            }
            this->buffer_.reset(available[0]);
            slots.push_back(available[0]);
        }
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename CcClient, typename ...Args>
    typename std::enable_if<!cpa::broadcast_msg && !std::is_integral<CcClient>::value, void>::type
    reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::create_ipc || cpa::attach_ipc, void>::type
    startWithContextProperty(Args&& ... args) {
        auto& lock = this->allocator_.fileLock();
        std::lock_guard<decltype(lock)> g(lock);
        std::list<uint16_t> slots;
        try {
            reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
            auto sc = slots;
            startWithContextPropertyImpl<cpa>(sc, std::forward<Args>(args) ...);
        } catch (std::out_of_range const&) {
            Base::markDead(this->buffer_, slots);
            throw;
        }
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<!cpa::create_ipc && !cpa::attach_ipc, void>::type
    startWithContextProperty(Args&& ... args) {
        std::list<uint16_t> slots;
        try {
            reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
            auto sc = slots;
            startWithContextPropertyImpl<cpa>(sc, std::forward<Args>(args) ...);
        } catch (std::out_of_range const&) {
            Base::markDead(this->buffer_, slots);
            throw;
        }
    }

    template <typename cpa>
    typename std::enable_if<cpa::broadcast_msg && cpa::create_ipc, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots) {
        if (!purger_) {
            purger_.reset(
                new StuckClientPurger<Buffer>(secondsBetweenPurge_, this->buffer_));
            startWithContextPropertyImpl<cpa>(slots, *purger_, purgerCpuAffinityMask_);
        }
    }

    template <typename cpa>
    typename std::enable_if<!cpa::broadcast_msg || !cpa::create_ipc, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::has_pool, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots
        , uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        using namespace std;
        if (poolThreadCount_) {
            HMBDC_THROW(std::out_of_range, "Context pool already started");
        }
        std::vector<uint16_t> sc(slots.begin(), slots.end());
        if (!poolThreadsCpuAffinityMask) {
            auto cpuCount = std::thread::hardware_concurrency();
            poolThreadsCpuAffinityMask =
                ((1ul << poolThreadCount) - 1u) << (hmbdcNumbers_.size() % cpuCount);
        }

        pool_->startAt(poolThreadCount, poolThreadsCpuAffinityMask, sc);
        poolThreadCount_ = poolThreadCount; //record before the countdown below consumes it
        while(poolThreadCount--) {
            if (!cpa::pool_msgless) {
                hmbdcNumbers_.push_back(*slots.begin());
                slots.pop_front();
            }
        }
        startWithContextPropertyImpl<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename Client, typename ...Args>
    typename std::enable_if<!std::is_integral<Client>::value, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots
        , Client& c, uint64_t cpuAffinity
        , Args&& ... args) {
        auto clientParticipateInMessaging =
            std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
        uint16_t hmbdcNumber = 0xffffu;
        if (clientParticipateInMessaging && cpa::broadcast_msg) {
            hmbdcNumber = *slots.begin();
            slots.pop_front();
        }
        auto thrd = kickOffClientThread(
            c, cpuAffinity, hmbdcNumber, hmbdcNumbers_.size());
        threads_.push_back(std::move(thrd));
        hmbdcNumbers_.push_back(hmbdcNumber);
        startWithContextPropertyImpl<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename Client>
    auto kickOffClientThread(
        Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
        std::thread thrd([
            this
            , &c
            , mask
            , h=hmbdcNumber
            , threadSerialNumber
            ]() {
            auto hmbdcNumber = h;
            std::string name;
            char const* schedule;
            int priority;
            auto clientParticipateInMessaging =
                std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;


            if (c.hmbdcName()) {
                name = c.hmbdcName();
            } else {
                if (clientParticipateInMessaging) {
                    name = "hmbdc" + std::to_string(hmbdcNumber);
                } else {
                    name = "hmbdc-x";
                }
            }
            auto cpuAffinityMask = mask;
            std::tie(schedule, priority) = c.schedSpec();

            if (!schedule) schedule = "SCHED_OTHER";

            if (!mask) {
                auto cpuCount = std::thread::hardware_concurrency();
                cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
            }

            hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
                , schedule, priority);

            hmbdcNumber = clientParticipateInMessaging?hmbdcNumber:0xffffu;
            c.messageDispatchingStartedCb(hmbdcNumber);

            while(!stopped_ &&
                context_detail::runOnceImpl(hmbdcNumber, this->stopped_, this->buffer_, c)) {
            }
            if (this->stopped_) c.dropped();
            if (clientParticipateInMessaging) context_detail::unblock(this->buffer_, hmbdcNumber);
        }
        );

        return thrd;
    }

    Context(Context const&) = delete;
    Context& operator = (Context const&) = delete;
    uint16_t usedHmbdcCapacity_;
    std::vector<uint16_t> hmbdcNumbers_;

    bool stopped_;
    typename Pool::ptr pool_;
    using Threads = std::vector<std::thread>;
    Threads threads_;
    size_t poolThreadCount_;
    uint32_t secondsBetweenPurge_;
    uint64_t purgerCpuAffinityMask_;
    typename std::conditional<cpa::broadcast_msg && cpa::create_ipc
        , std::unique_ptr<StuckClientPurger<Buffer>>, uint32_t
        >::type purger_;
};

}}