Context.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/Config.hpp"
5 #include "hmbdc/app/utils/StuckClientPurger.hpp"
6 #include "hmbdc/numeric/BitMath.hpp"
7 #include "hmbdc/os/Allocators.hpp"
8 
9 #include <memory>
10 
11 namespace hmbdc { namespace app {
12 
13 
14 /**
15 * @example hello-world.cpp
16 * @example hmbdc.cpp
17 * @example hmbdc-log.cpp
18 * @example ipc-market-data-propagate.cpp
19 */
20 
21 /**
22  * @namespace hmbdc::app::context_property
23  * contains the trait types that define how a Context behaves and what capabilities it has
24  */
25 namespace context_property {
26  /**
27  * @class broadcast
28  * @brief Context template parameter indicating each message is
29  * sent to all clients within the Context.
30  * This is the default property of a Context.
31  * @details each message is still subject to each Client's message type
32  * filtering. In the case of an ipc Context
33  * it is also sent to all clients in the attached ipc Contexts.
34  * When a Context is specialized using this type, it normally
35  * works with heterogeneous Clients and all Clients can talk to each
36  * other thru the Context. Load balancing among Clients can be achieved by
37  * having the participating Clients coordinate on which messages to process.
38  * In addition to the direct mode Clients, a Client running pool is supported
39  * by the Context - see the pool related functions in Context.
40  *
41  * Implicit usage in hello-world.cpp: @snippet hello-world.cpp broadcast as default
42  * Explicit usage in hmbdc.cpp: @snippet hmbdc.cpp explicit using broadcast
43  * There is no hard coded limit on how many Clients can be added into a pool.
44  * Also, there is no limit on when you can add a Client into a pool.
45  * @tparam max_parallel_consumer the max count of threads that process messages;
46  * it includes the pool threads plus the count of direct mode Clients that
47  * register for messages within the Context.
48  * Normally, there isn't a need to specify it.
49  * supported values: 4 (default);
50  * 2, 8, 16, 32, 64 require an hmbdc license
51  */
52  template <uint16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
53  struct broadcast{
54  static_assert(max_parallel_consumer >= 4u
55  , "unsupported max_parallel_consumer");
56  };
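// Editorial usage sketch (not in the original source): declaring a local
// broadcast Context; the max message size of 64 is an assumed value.
//
//   using MyContext = hmbdc::app::Context<64
//       , hmbdc::app::context_property::broadcast<>>; // capacity defaults to 4
//   MyContext ctx; // every Client started in ctx sees every message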
57 
58  /**
59  * @class partition
60  * @brief Context template parameter indicating each message is
61  * sent to one and only one of the clients within the Context
62  * and its attached ipc Contexts if applicable.
63  * @details each message is still subject to each Client's message type
64  * filtering
65  * When a Context is specialized using this type, it normally
66  * works with homogeneous Clients to achieve load balancing across threads. No
67  * coordination is needed between Clients.
68  * Only direct mode Clients are supported; a thread pool is NOT supported
69  * by this kind of Context - the pool related functions in Context are disabled
70  *
71  * There is no limit on how many Clients can be started in the Context.
72  * Also, there is no limit on when you can start a Client in this Context.
73  *
74  * Example in server-cluster.cpp: @snippet server-cluster.cpp declare a partition context
75  */
76  struct partition{};
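// Editorial usage sketch (not in the original source): a partition Context
// delivers each message to exactly one Client; 64 is an assumed max message size.
//
//   using WorkerContext = hmbdc::app::Context<64
//       , hmbdc::app::context_property::partition>;
//   WorkerContext ctx; // start N identical worker Clients to share the load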
77 
78  /**
79  * @class ipc_creator
80  * @brief Context template parameter indicating the Context is ipc enabled and
81  * it can be attached (see ipc_attacher below) to an ipc transport (thru its name).
82  * @details In addition to the normal Context functions, the Context acts as
83  * the creator (owner) of the named ipc transport.
84  * Since it performs the critical function of purging crashed or
85  * stuck Clients to avoid a full buffer for the other well-behaving Clients, it is
86  * expected to be running (started) for as long as the ipc transport functions.
87  * The ipc transport uses persistent shared memory; if the dtor of the Context is not called
88  * due to a crash, there will be stale shared memory left in /dev/shm.
89  * Example in ipc-market-data-propagate.cpp: @snippet ipc-market-data-propagate.cpp declare an ipc context
90  */
91  struct ipc_creator{};
92 
93  /**
94  * @class ipc_attacher
95  * @brief Context template parameter indicating the Context is ipc enabled and
96  * it can attach to an ipc transport thru a name.
97  * @details it is very important that the Context is constructed with exactly
98  * the same size (see constructor) and type (partition vs broadcast) as the ipc
99  * transport creator (the ipc_creator Context) specified.
100  * All Contexts attaching to a single ipc transport are collectively subject to the
101  * max_parallel_consumer limit just like a single local (non-ipc) Context is.
102  * Example in ipc-market-data-propagate.cpp: @snippet ipc-market-data-propagate.cpp declare an ipc context
103  */
104  struct ipc_attacher{};
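// Editorial usage sketch (not in the original source): an ipc_creator /
// ipc_attacher pair; "md-ipc" is an assumed transport name, and both sides
// must use the same sizes and the same partition/broadcast property.
//
//   // in the owning process
//   hmbdc::app::Context<64, hmbdc::app::context_property::ipc_creator> owner("md-ipc");
//   // in any attaching process
//   hmbdc::app::Context<64, hmbdc::app::context_property::ipc_attacher> user("md-ipc");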
105 }
106 }}
107 
108 #include "hmbdc/app/ContextDetail.hpp"
109 namespace hmbdc { namespace app {
110 
111 namespace context_detail {
112 using namespace std;
113 using namespace hmbdc::pattern;
114 
115 /**
116  * @class ThreadCommBase<>
117  * @brief covers the inter-thread and ipc communication facade
118  * @details this type's interface is exposed thru Context and the type itself is
119  * not directly used by users
120  * @tparam MaxMessageSize the max message size, needed at compile time;
121  * if the value can only be determined at runtime, set this to 0. Things can still work
122  * but will lose some compile time checking advantages, see maxMessageSizeRuntime below
123  * @tparam ContextProperties see types in context_property namespace
124  */
125 template <size_t MaxMessageSize, typename... ContextProperties>
126 struct ThreadCommBase
127  : private context_detail::context_property_aggregator<ContextProperties...> {
128  using cpa = context_property_aggregator<ContextProperties...>;
129  using Buffer = typename cpa::Buffer;
130  using Allocator = typename cpa::Allocator;
131 
132  enum {
133  MAX_MESSAGE_SIZE = MaxMessageSize,
134  BUFFER_VALUE_SIZE = MaxMessageSize + 8u, //8bytes for wrap
135  };
136 
137  size_t maxMessageSize() const {
138  if (MaxMessageSize == 0) return maxMessageSizeRuntime_;
139  return MaxMessageSize;
140  }
141 
142  /**
143  * @brief send a batch of messages to the Context or attached ipc Contexts
144  * @details only the Clients that handle the Message will get it, of course
145  * This function is threadsafe, which means you can call it anywhere in the code
146  *
147  * @param msgs messages
148  * @tparam Messages message types
149  */
150  template <typename M0, typename M1, typename ... Messages>
151  typename enable_if<!std::is_integral<M1>::value, void>::type
152  send(M0&& m0, M1&& m1, Messages&&... msgs) {
153  auto n = sizeof...(msgs) + 2;
154  auto it = buffer_.claim(n);
155  sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
156  buffer_.commit(it, n);
157  }
158 
159  /**
160  * @brief try to send a batch of messages to the Context or attached ipc Contexts
161  * @details this call does not block and it is transactional - send all or none
162  * This function is threadsafe, which means you can call it anywhere in the code
163  *
164  * @param msgs messages
165  * @tparam Messages message types
166  *
167  * @return true if sent successfully
168  */
169  template <typename M0, typename M1, typename ... Messages>
170  typename enable_if<!std::is_integral<M1>::value, bool>::type
171  trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
172  auto n = sizeof...(msgs) + 2;
173  auto it = buffer_.tryClaim(n);
174  if (it) {
175  sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
176  buffer_.commit(it, n);
177  return true;
178  }
179 
180  return false;
181  }
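// Editorial usage sketch (not in the original source): batch sends are
// transactional (all or none); ctx, MsgA and MsgB are hypothetical.
//
//   ctx.send(MsgA{}, MsgB{});            // blocks until both slots are claimed
//   if (!ctx.trySend(MsgA{}, MsgB{})) {  // non-blocking alternative
//       // buffer full - drop, retry or back off here
//   }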
182 
183  /**
184  * @brief send a range of messages to the Context or attached ipc Contexts
185  * @details only the Clients that handle the Message will get it, of course
186  * This function is threadsafe, which means you can call it anywhere in the code
187  *
188  * @param begin a forward iterator pointing at the start of the range
189  * @param n length of the range
190  */
191  template <typename ForwardIt>
192  void
193  send(ForwardIt begin, size_t n) {
194  if (hmbdc_likely(n)) {
195  auto bit = buffer_.claim(n);
196  auto it = bit;
197  for (auto i = 0ul; i < n; i++) {
198  using Message = typename iterator_traits<ForwardIt>::value_type;
199  new (*it++) MessageWrap<Message>(*begin++);
200  }
201  buffer_.commit(bit, n);
202  }
203  }
204 
205  /**
206  * @brief try to send a range of messages to the Context or attached ipc Contexts
207  * @details this call does not block and it is transactional - send all or none
208  * This function is threadsafe, which means you can call it anywhere in the code
209  *
210  * @param begin a forward iterator pointing at the start of the range
211  * @param n length of the range
212  */
213  template <typename ForwardIt>
214  bool
215  trySend(ForwardIt begin, size_t n) {
216  if (hmbdc_likely(n)) {
217  auto bit = buffer_.tryClaim(n);
218  if (hmbdc_unlikely(!bit)) return false;
219  auto it = bit;
220  for (auto i = 0ul; i < n; i++) {
221  using Message = typename iterator_traits<ForwardIt>::value_type;
222  new (*it++) MessageWrap<Message>(*begin++);
223  }
224  buffer_.commit(bit, n);
225  }
226  return true;
227  }
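// Editorial usage sketch (not in the original source): sending a contiguous
// range of identical-type messages; MsgA is a hypothetical message type.
//
//   std::vector<MsgA> batch(8);
//   ctx.send(batch.begin(), batch.size());               // blocking form
//   bool ok = ctx.trySend(batch.begin(), batch.size());  // all-or-none form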
228 
229  /**
230  * @brief send a message to the Context or attached ipc Contexts
231  * @details only the Clients that handle the Message will get it, of course
232  * This function is threadsafe, which means you can call it anywhere in the code
233  *
234  * @param m message
235  * @tparam Message type
236  */
237  template <typename Message>
238  void send(Message&& m) {
239  using M = typename std::remove_reference<Message>::type;
240  static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
241  , "message too big");
242  if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
243  HMBDC_THROW(std::out_of_range, "message too big");
244  }
245  buffer_.put(MessageWrap<M>(std::forward<Message>(m)));
246  }
247 
248  /**
249  * @brief try to send a message to the Context or attached ipc Contexts
250  * @details this call does not block - return false when buffer is full
251  * This function is threadsafe, which means you can call it anywhere in the code
252  *
253  * @param m message
254  * @tparam Message type
255  * @return true if sent successfully
256  */
257  template <typename Message>
258  bool trySend(Message&& m) {
259  using M = typename std::remove_reference<Message>::type;
260  static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
261  , "message too big");
262  if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
263  HMBDC_THROW(std::out_of_range, "message too big");
264  }
265  return buffer_.tryPut(MessageWrap<M>(std::forward<Message>(m)));
266  }
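// Editorial usage sketch (not in the original source): single-message send;
// MsgA is a hypothetical message type small enough for the buffer.
//
//   MsgA m;
//   ctx.send(m);                               // blocks while the buffer is full
//   if (!ctx.trySend(m)) { /* handle full buffer */ }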
267 
268  /**
269  * @brief send a message to all Clients in the Context or attached ipc Contexts
270  * @details constructs the Message in the buffer directly
271  * This function is threadsafe, which means you can call it anywhere in the code
272  *
273  * @param args ctor args
274  * @tparam Message type
275  * @tparam Args types of the ctor args
276  */
277  template <typename Message, typename ... Args>
278  void sendInPlace(Args&&... args) {
279  static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
280  , "message too big");
281  if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
282  HMBDC_THROW(std::out_of_range, "message too big");
283  }
284  buffer_.template putInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
285  }
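// Editorial usage sketch (not in the original source): constructing the
// message directly in the buffer to avoid a copy; MsgA and its ctor
// signature are hypothetical.
//
//   ctx.sendInPlace<MsgA>(42u, 3.14); // args forwarded to MsgA's ctor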
286 
287  /**
288  * @brief accessor - mostly used internally
289  * @return underlying buffer used in the Context
290  */
291  Buffer& buffer() {
292  return buffer_;
293  }
294 
295 
296 protected:
297  ThreadCommBase(uint32_t messageQueueSizePower2Num
298  , size_t maxMessageSizeRuntime
299  , char const* shmName)
300  : allocator_(shmName
301  , Buffer::footprint(maxMessageSizeRuntime + 8u
302  , messageQueueSizePower2Num), O_RDWR | (cpa::create_ipc?O_CREAT:0)
303  )
304  , bufferptr_(allocator_.template allocate<Buffer>(1
305  , maxMessageSizeRuntime + 8u, messageQueueSizePower2Num
306  , allocator_)
307  )
308  , buffer_(*bufferptr_) {
309  if (messageQueueSizePower2Num < 2) {
310  HMBDC_THROW(std::out_of_range
311  , "messageQueueSizePower2Num needs to be >= 2");
312  }
313  if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
314  HMBDC_THROW(std::out_of_range
315  , "can only set maxMessageSizeRuntime when template value MaxMessageSize is 0");
316  }
317  maxMessageSizeRuntime_ = maxMessageSizeRuntime;
318  primeBuffer<cpa::create_ipc && cpa::has_pool>();
319  if (cpa::create_ipc || cpa::attach_ipc) {
320  sleep(2);
321  }
322  }
323 
324  ~ThreadCommBase() {
325  allocator_.unallocate(bufferptr_);
326  }
327 
328  static
329  void markDeadFrom(pattern::MonoLockFreeBuffer& buffer, uint16_t) {
330  // does not apply
331  }
332 
333  template <typename BroadCastBuf>
334  static
335  void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
336  for (uint16_t i = poolThreadCount;
337  i < BroadCastBuf::max_parallel_consumer;
338  ++i) {
339  buffer.markDead(i);
340  }
341  }
342  Allocator allocator_;
343  Buffer* HMBDC_RESTRICT bufferptr_;
344  Buffer& HMBDC_RESTRICT buffer_;
345 
346 private:
347  template <bool doIt>
348  typename enable_if<doIt, void>::type
349  primeBuffer() {
350  markDeadFrom(buffer_, 0);
351  }
352 
353  template <bool doIt>
354  typename enable_if<!doIt, void>::type
355  primeBuffer() {
356  }
357 
358  template <typename M, typename... Messages>
359  void sendRecursive(typename Buffer::iterator it
360  , M&& msg, Messages&&... msgs) {
361  using Message = typename std::remove_reference<M>::type;
362  static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
363  , "message too big");
364  if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
365  HMBDC_THROW(std::out_of_range, "message too big");
366  }
367  new (*it) MessageWrap<Message>(msg);
368  sendRecursive(++it, std::forward<Messages>(msgs)...);
369  }
370  void sendRecursive(typename Buffer::iterator) {}
371 
372  size_t maxMessageSizeRuntime_;
373 };
374 
375 } //context_detail
376 
377 /**
378  * @example hmbdc.cpp
379  * @example server-cluster.cpp
380  * a partition Context rightfully doesn't contain a thread pool and all its Clients
381  * are in direct mode. Pool related interfaces are turned off at compile time
382  */
383 
384 /**
385  * @class Context<>
386  * @brief A Context is like a media object that facilitates the communications
387  * for the Clients that it is holding.
388  * a Client can only be added to (or started within) a single Context once;
389  * undefined behavior otherwise.
390  * the communication model is determined by the context_property;
391  * by default it is broadcast in nature within the local process, as indicated
392  * by broadcast<>
393  *
394  * @details a broadcast Context contains a thread Pool powered by a number of OS threads.
395  * a Client running in such a Context can run in either the pool mode or a direct mode
396  * (which means the Client has its own dedicated OS thread);
397  * direct mode provides faster responses, and pool mode provides more flexibility.
398  * It is recommended that the total number of threads (pool threads + direct threads)
399  * not exceed the number of available cores.
400  * @tparam MaxMessageSize the max message size if known
401  * at compile time (compile time sized);
402  * if the value can only be determined at runtime (run time sized), set this to 0.
403  * Things can still work but will lose some compile time checking advantages,
404  * see maxMessageSizeRuntime below
405  * @tparam ContextProperties see context_property namespace
406  */
407 template <size_t MaxMessageSize = 0, typename... ContextProperties>
408 struct Context
409 : context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...> {
410  using Base = context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...>;
411  using Buffer = typename Base::Buffer;
412  using cpa = typename Base::cpa;
413  using Pool = pattern::PoolT<Buffer>;
414  /**
415  * @brief ctor for construct local non-ipc Context
416  * @details won't compile if calling it for ipc Context
417  * @param messageQueueSizePower2Num a value of 10 gives a message queue of size 1024 (messages, not bytes)
418  * @param maxPoolClientCount up to how many Clients the pool is supposed to support; only used when
419  * the pool is supported in the Context (broadcast property)
420  * @param maxMessageSizeRuntime if MaxMessageSize is set to 0, this value is used
421  */
422  Context(uint32_t messageQueueSizePower2Num = MaxMessageSize?20:2
423  , size_t maxPoolClientCount = MaxMessageSize?128:0
424  , size_t maxMessageSizeRuntime = MaxMessageSize)
425  : Base(messageQueueSizePower2Num < 2?2:messageQueueSizePower2Num
426  , maxMessageSizeRuntime, nullptr)
427  , startToBeContinued_(true)
428  , usedHmbdcCapacity_(0)
429  , currentThreadSerialNumber_(0)
430  , stopped_(false)
431  , pool_(createPool(this->buffer_, maxPoolClientCount))
432  , poolThreadCount_(0) {
433  static_assert(!cpa::create_ipc && !cpa::attach_ipc
434  , "no name specified for ipc Context");
435  }
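// Editorial usage sketch (not in the original source): a compile-time sized
// local Context with a 2^16-entry queue and room for 8 pool Clients; the
// sizes are assumed values.
//
//   hmbdc::app::Context<64> ctx(16u, 8u); // broadcast<> by default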
436 
437  /**
438  * @brief ctor for construct local ipc Context
439  * @details won't compile if calling it for local non-ipc Context
440  *
441  * @param ipcTransportName the id identifying an ipc transport that supports
442  * a group of attached-together Contexts and their Clients
443  * @param messageQueueSizePower2Num a value of 10 gives a message queue of size
444  * 1024 (messages, not bytes)
445  * @param maxPoolClientCount up to how many Clients the pool is supposed to support;
446  * only used when the pool is supported in the Context (broadcast property)
447  * @param maxMessageSizeRuntime if MaxMessageSize is set to 0, this value is used
448  * @param purgerCpuAffinityMask which cores to run the low profile (sleep mostly)
449  * thread in charge of purging crashed Clients.
450  * Used only for ipc_creator Contexts.
451  */
452  Context(char const* ipcTransportName
453  , uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
454  , size_t maxPoolClientCount = MaxMessageSize?128:0
455  , size_t maxMessageSizeRuntime = MaxMessageSize
456  , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful)
457  : Base(messageQueueSizePower2Num, maxMessageSizeRuntime, ipcTransportName)
458  , startToBeContinued_(true)
459  , usedHmbdcCapacity_(0)
460  , currentThreadSerialNumber_(0)
461  , stopped_(false)
462  , pool_(createPool(this->buffer_, maxPoolClientCount))
463  , poolThreadCount_(0)
464  , secondsBetweenPurge_(60)
465  , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
466  static_assert(cpa::create_ipc || cpa::attach_ipc
467  , "ctor can only be used with ipc turned on Context");
468  static_assert(!(cpa::create_ipc && cpa::attach_ipc)
469  , "Context cannot be both ipc_creator and ipc_attacher");
470  }
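// Editorial usage sketch (not in the original source): a run-time sized ipc
// Context (MaxMessageSize == 0) passing the max message size at construction;
// the name, sizes and config source are assumed.
//
//   size_t maxSz = /* read from config at startup */ 128u;
//   hmbdc::app::Context<0, hmbdc::app::context_property::ipc_creator>
//       owner("md-ipc", 20u, 128u, maxSz);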
471 
472  /**
473  * @brief dtor
474  * @details if this Context owns ipc transport, notify all attached processes
475  * that read from it that this transport is dead
476  */
477  ~Context() {
478  if (cpa::create_ipc) {
479  this->markDeadFrom(this->buffer_, 0);
480  }
481  stop();
482  join();
483  }
484 
485  /**
486  * @brief add a client to Context's pool - the Client is run in pool mode
487  * @details if the pool is already started, the client gets current Messages immediately
488  * - it might miss older messages.
489  * if the pool is not started yet, the Client does not get messages or other callbacks until
490  * the Pool starts.
491  * This function is threadsafe, which means you can call it anywhere in the code
492  * @tparam Client client type
493  * @param client to be added into the Pool
494  * @param poolThreadAffinityIn the pool is powered by a number of threads
495  * (each thread in the pool is identified by a bit in the mask, starting from bit 0);
496  * it is possible to have a Client use just some of the threads in the Pool
497  * - the default is to use all.
498  *
499  */
500  template <typename Client>
501  void addToPool(Client &client
502  , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
503  static_assert(cpa::has_pool, "pool is not supported in the Context type");
504  if (std::is_base_of<single_thread_powered_client, Client>::value
505  && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
506  && poolThreadCount_ != 1) {
507  HMBDC_THROW(std::out_of_range
508  , "cannot add a single thread powered client to the non-single"
509  "thread powered pool without specifying a single thread poolThreadAffinity"
510  );
511  }
512  pool_->addConsumer(client, poolThreadAffinityIn);
513 
514  }
515 
516  /**
517  * @brief add a bunch of clients to Context's pool - the Clients are run in pool mode
518  * @details if the pool is already started, the client gets current Messages immediately
519  * - it might miss older messages.
520  * if the pool is not started yet, the Client does not get messages or other callbacks until
521  * the Pool starts.
522  * This function is threadsafe, which means you can call it anywhere in the code
523  * @tparam Client client type
524  * @param client to be added into the Pool
525  * @param poolThreadAffinityIn the pool is powered by a number of threads
526  * (each thread in the pool is identified by a bit in the mask, starting from bit 0);
527  * it is possible to have a Client use just some of the threads in the Pool
528  * - the default is to use all.
529  * @param args more client and poolThreadAffinityIn pairs can follow
530  */
531  template <typename Client, typename ... Args>
532  void addToPool(Client &client
533  , uint64_t poolThreadAffinityIn, Args&& ...args) {
534  addToPool(client, poolThreadAffinityIn);
535  addToPool(std::forward<Args>(args)...);
536  }
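// Editorial usage sketch (not in the original source): adding Clients to the
// pool, then powering the pool; c0 and c1 are hypothetical Clients.
//
//   ctx.addToPool(c0);           // may run on any pool thread
//   ctx.addToPool(c1, 0x1ul);    // restricted to pool thread 0
//   ctx.start(2, 0x3ul);         // 2 pool threads affinitied to cores 0 and 1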
537 
538  /**
539  * @brief return the number of clients added into the pool
540  * @details the number could change since the clients could be added in another thread
541  * @return client count
542  */
543  size_t clientCountInPool() const {
544  static_assert(cpa::has_pool, "pool is not supported in the Context type");
545  return pool_->consumerSize();
546  }
547 
548  /**
549  * @brief how many parallel consumers are started
550  * @details the dynamic value could change after the call returns
551  * see max_parallel_consumer Context property
552  * @return how many parallel consumers are started
553  */
554  size_t parallelConsumerAlive() const {
555  return this->buffer_.parallelConsumerAlive();
556  }
557  /**
558  * @brief start the context and specify its Pool and direct Clients
559  * @details doesn't compile if the Context does not support a Pool.
560  * Usage example:
561  *
562  * @code
563  * // the following starts the pool powered by 3 threads that are affinitied to
564  * // the lower 8 cores; client0 affinitied to 4th core and client1 affinitied to 5th core
565  * // the last true value indicates there will be more direct mode clients to start
566  * // and messages are kept for those coming later (this only works for broadcast Context
567  * // not applicable to partition Context).
568  * start(3, 0xfful, client0, 0x8ul, client1, 0x10ul, true);
569  * @endcode
570  *
571  * @tparam Args argument types
572  *
573  * @param poolThreadCount how many threads to power the Pool, 0 means no pool
574  * @param poolThreadsCpuAffinityMask which cores, the pool threads to run on
575  * @param args pairs of direct mode Client and its cpuAffinity;
576  * optionally followed by a bool as the last arg (default false), false to indicate there is
577  * no more direct mode Client to start. (the bool is not applicable for IPC Context types)
578  * If a cpuAffinity is 0, the Client's affinity rotates
579  * to one of the cores in the system.
580  *
581  * see in example hmbdc.cpp @snippet hmbdc.cpp start pool then client
582  */
583  template <typename ...Args>
584  void
585  start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
586  , Args&& ... args) {
587  startWithContextProperty<cpa>(poolThreadCount, poolThreadsCpuAffinityMask
588  , std::forward<Args>(args) ...);
589  }
590 
591  /**
592  * @brief start the context (without its Pool) and direct Clients
593  * @details for example
594  *
595  * @code
596  * // the following starts client0 affinitied to 4th core and client1 affinitied to 5th core
597  * // AND there is no more direct mode clients to start (since no bool at the end)
598  * start(client0, 0x8ul, client1, 0x10ul);
599  * @endcode
600  *
601  * @tparam Args argument types
602  *
603  * @param args pairs of direct mode Client and its cpuAffinity;
604  * optionally followed by a bool as the last arg (default false), false to indicate there is
605  * no more direct mode Client to start. (the bool is not applicable for IPC Context types)
606  * If a cpuAffinity is 0, the Client's affinity rotates
607  * to one of the cores in the system
608  *
609  * see in example server-cluster.cpp @snippet server-cluster.cpp start multiple direct Clients
610  */
611  template <typename Client, typename ...Args>
612  typename std::enable_if<!std::is_integral<Client>::value, void>::type
613  start(Client& c, uint64_t cpuAffinity, Args&& ... args) {
614  startWithContextProperty<cpa>(c, cpuAffinity, std::forward<Args>(args) ...);
615  }
616 
617  /**
618  * @brief tell hmbdc that there is no more direct mode Client to start
619  */
620  void start() {
621  startWithContextProperty<cpa>();
622  }
623 
624  /**
625  * @brief stop the message dispatching - asynchronously
626  * @details asynchronous means it is not guaranteed that message dispatching
627  * stops immediately after this non-blocking call
628  */
629  void
630  stop() {
631  stopWithContextProperty<cpa>();
632  }
633 
634  /**
635  * @brief wait until all threads (Pool threads too if apply) of the Context exit
636  * @details blocking call
637  */
638  void
639  join() {
640  joinWithContextProperty<cpa>();
641  }
642 
643  /**
644  * @brief ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...)
645  * Clients from the ipc transport to keep the ipc transport healthy (avoiding a full buffer).
646  * It periodically looks for things to purge. This is to set the period (default is 60 seconds).
647  * @details If some Clients are known to
648  * take long to process messages, increase it. If you need to remove slow Clients quickly,
649  * reduce it.
650  * Only effective for ipc_creator Context.
651  *
652  * @param s seconds
653  */
654  void
655  setSecondsBetweenPurge(uint32_t s) {
656  secondsBetweenPurge_ = s;
657  }
658 
659  /**
660  * @brief normally not used until you want to run your own message loop
661  * @details call this function frequently to pump hmbdc message loop in its pool
662  *
663  * @param threadSerialNumber starting from 0, indicates which thread in the pool
664  * is powering the loop
665  */
666  void
667  runPoolThreadOnce(uint16_t threadSerialNumber) {
668  static_assert(cpa::has_pool, "pool is not supported in the Context type");
669  pool_->runOnce(threadSerialNumber);
670  }
671 
672  /**
673  * @brief normally not used until you want to run your own message loop
674  * @details call this function frequently to pump hmbdc message loop for a direct mode Client
675  *
676  * @param threadSerialNumber indicate which thread is powering the loop
677  * @param c the Client
678  */
679  template <typename Client>
680  void runClientThreadOnce(uint16_t threadSerialNumber, Client& c) {
681  c.messageDispatchingStarted(threadSerialNumber); //lower level ensures executing only once
682  context_detail::runOnceImpl(threadSerialNumber, stopped_
683  , this->buffer_, c);
684  }
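// Editorial usage sketch (not in the original source): pumping a direct mode
// Client from a user-owned loop instead of calling start(); myClient and
// keepRunning are hypothetical.
//
//   uint16_t threadSerialNumber = 0;
//   while (keepRunning) {
//       ctx.runClientThreadOnce(threadSerialNumber, myClient);
//   }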
685 
686 private:
687  template <typename Buffer>
688  static
689  typename Pool::ptr
690  createPool(Buffer& buffer, size_t maxPoolClientCount) {
691  return Pool::create(buffer, maxPoolClientCount);
692  }
693 
694  static
695  typename Pool::ptr
696  createPool(pattern::MonoLockFreeBuffer&, size_t) {
697  return typename Pool::ptr();
698  }
699 
700  template <typename cpa>
701  typename std::enable_if<cpa::has_pool, void>::type
702  stopWithContextProperty() {
703  if (pool_) pool_->stop();
704  __sync_synchronize();
705  stopped_ = true;
706  }
707 
708  template <typename cpa>
709  typename std::enable_if<!cpa::has_pool, void>::type
710  stopWithContextProperty() {
711  __sync_synchronize();
712  stopped_ = true;
713  }
714 
715  template <typename cpa>
716  typename std::enable_if<cpa::has_pool, void>::type
717  joinWithContextProperty() {
718  if (pool_) pool_->join();
719  for (auto& t : threads_) {
720  t.join();
721  }
722  threads_.clear();
723  }
724 
725  template <typename cpa>
726  typename std::enable_if<!cpa::has_pool, void>::type
727  joinWithContextProperty() {
728  for (auto& t : threads_) {
729  t.join();
730  }
731  threads_.clear();
732  }
733 
734  template <typename cpa, typename ...Args>
735  typename std::enable_if<!cpa::attach_ipc && !cpa::create_ipc, void>::type
736  startWithContextProperty(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
737  , Args&& ... args) {
738  static_assert(cpa::has_pool, "pool is not supported in the Context type");
739  poolThreadCount_ = poolThreadCount;
740  if (!startToBeContinued_) {
741  HMBDC_THROW(std::runtime_error
742  , "Exception: conflicting with previously indicated start completed!");
743  }
744  if (pool_) {
745  pool_->start(poolThreadCount, poolThreadsCpuAffinityMask, false);
746  }
747  usedHmbdcCapacity_ = poolThreadCount;
748  currentThreadSerialNumber_ = usedHmbdcCapacity_;
749  startLocalClients(std::forward<Args>(args) ...);
750  }
751 
752  template <typename cpa, typename Client, typename ...Args>
753  typename std::enable_if<!std::is_integral<Client>::value && !cpa::create_ipc && !cpa::attach_ipc && !cpa::has_pool, void>::type
754  startWithContextProperty(Client& c, uint64_t cpuAffinity, Args&& ... args) {
755  if (!startToBeContinued_) {
756  HMBDC_THROW(std::runtime_error
757  , "Exception: conflicting with previously indicated start completed!");
758  }
759  startLocalClients(c, cpuAffinity, std::forward<Args>(args) ...);
760  }
761 
762  template <typename cpa, typename Client, typename ...Args>
763  typename std::enable_if<!std::is_integral<Client>::value && !cpa::create_ipc && !cpa::attach_ipc && cpa::has_pool, void>::type
764  startWithContextProperty(Client& c, uint64_t cpuAffinity, Args&& ... args) {
765  if (!startToBeContinued_) {
766  HMBDC_THROW(std::runtime_error
767  , "Exception: conflicting with previously indicated start completed!");
768  }
769  startLocalClients(c, cpuAffinity, std::forward<Args>(args) ...);
770  }
771 
772  template <typename cpa>
773  typename std::enable_if<!cpa::create_ipc && !cpa::attach_ipc, void>::type
774  startWithContextProperty() {
775  startLocalClients(false);
776  }
777 
778  template <typename cpa>
779  typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool, void>::type
780  startWithContextProperty() {
781  startBroadcastIpcClients();
782  }
783 
784  template <typename cpa, typename ...Args>
785  typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool, void>::type
786  startWithContextProperty(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
787  , Args&& ... args) {
788  using namespace std;
789  if (poolThreadCount_) {
790  HMBDC_THROW(std::out_of_range, "Context pool already started");
791  }
792  auto& lock = this->allocator_.fileLock();
793  std::lock_guard<decltype(lock)> g(lock);
794  auto slots = this->buffer_.unusedConsumerIndexes();
795  if (slots.size() < sizeof...(args) / 2) {
796  HMBDC_THROW(std::out_of_range
797  , "Context instance support Client count = " << slots.size());
798  }
799  poolThreadCount_ = poolThreadCount;
800  pool_->startThruRecycling(poolThreadCount, poolThreadsCpuAffinityMask);
801  currentThreadSerialNumber_ = poolThreadCount_;
802  startBroadcastIpcClients(std::forward<Args>(args) ...);
803  }
804 
805  template <typename cpa, typename Client, typename ...Args>
806  typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool && !std::is_integral<Client>::value
807  , void>::type
808  startWithContextProperty(Client& c, uint64_t cpuAffinity
809  , Args&& ... args) {
810  auto& lock = this->allocator_.fileLock();
811  std::lock_guard<decltype(lock)> g(lock);
812 
813  auto clientParticipateInMessaging =
814  std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
815  if (clientParticipateInMessaging) {
816  auto slots = this->buffer_.unusedConsumerIndexes();
817  if (slots.size() < 1u + sizeof...(args) / 2) {
818  HMBDC_THROW(std::out_of_range
819  , "Context instance support Client count = " << slots.size());
820  }
821  }
822  startBroadcastIpcClients(c, cpuAffinity, std::forward<Args>(args) ...);
823  }
824 
825  template <typename cpa, typename ...Args>
826  typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && !cpa::has_pool, void>::type
827  startWithContextProperty(Args&& ... args) {
828  startPartitionIpcClients(std::forward<Args>(args) ...);
829  }
830 
831  void
832  startLocalClients(bool startToBeContinuedFlag = cpa::can_start_anytime) {
833  startToBeContinued_ = startToBeContinuedFlag;
834  if (!startToBeContinued_) {
835  this->markDeadFrom(this->buffer_, usedHmbdcCapacity_);
836  }
837  }
838 
839  template <typename CcClient, typename ...Args>
840  void startLocalClients(CcClient& c, uint64_t mask, Args&&... args) {
841  if (usedHmbdcCapacity_ >= Buffer::max_parallel_consumer
842  && CcClient::REGISTERED_MESSAGE_SIZE) {
843  HMBDC_THROW(std::out_of_range
844  , "messaging participating client number > allowed thread number of "
845  << Buffer::max_parallel_consumer);
846  }
847 
848  auto thrd =
849  kickOffClientThread(c, mask, usedHmbdcCapacity_, currentThreadSerialNumber_);
850  threads_.push_back(move(thrd));
851 
852  auto clientParticipateInMessaging =
853  std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE;
854  if (clientParticipateInMessaging) usedHmbdcCapacity_++;
855  currentThreadSerialNumber_++;
856  startLocalClients(std::forward<Args>(args)...);
857  }
858 
859  void startBroadcastIpcClients(){
860  if (cpa::create_ipc && !purger_) {
861  purger_.reset(
862  new utils::StuckClientPurger<Buffer>(secondsBetweenPurge_, this->buffer_));
863  startBroadcastIpcClients(*purger_, purgerCpuAffinityMask_);
864  }
865  }
866 
867  template <typename Client, typename ...Args>
868  void startBroadcastIpcClients(Client& c, uint64_t mask, Args&&... args) {
869  auto clientParticipateInMessaging =
870  std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
871  uint16_t hmbdcNumber = 0xffffu;
872  if (clientParticipateInMessaging) {
873  auto slots = this->buffer_.unusedConsumerIndexes();
874  auto it = slots.begin();
875  hmbdcNumber = *it;
876  this->buffer_.reset(hmbdcNumber);
877  }
878  auto thrd = kickOffClientThread(
879  c, mask, hmbdcNumber, currentThreadSerialNumber_);
880  threads_.push_back(move(thrd));
881  currentThreadSerialNumber_++;
882  startBroadcastIpcClients(std::forward<Args>(args)...);
883  }
884 
885  void startPartitionIpcClients(){}
886 
887  template <typename Client, typename ...Args>
888  void startPartitionIpcClients(Client& c, uint64_t mask, Args&&... args) {
889  auto thrd = kickOffClientThread(
890  c, mask, currentThreadSerialNumber_, currentThreadSerialNumber_);
891  threads_.push_back(move(thrd));
892  currentThreadSerialNumber_++;
893  startPartitionIpcClients(std::forward<Args>(args)...);
894  }
895 
896  template <typename Client>
897  auto kickOffClientThread(
898  Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
899  std::thread thrd([
900  this
901  , &c
902  , mask
903  , h=hmbdcNumber
904  , threadSerialNumber
905  ]() {
906  auto hmbdcNumber = h;
907  std::string name;
908  char const* schedule;
909  int priority;
910  auto clientParticipateInMessaging =
911  std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
912 
913 
914  if (c.hmbdcName()) {
915  name = c.hmbdcName();
916  } else {
917  if (clientParticipateInMessaging) {
918  name = "hmbdc" + std::to_string(hmbdcNumber);
919  } else {
920  name = "hmbdc-x";
921  }
922  }
923  auto cpuAffinityMask = mask;
924  std::tie(schedule, priority) = c.schedSpec();
925 
926  if (!schedule) schedule = "SCHED_OTHER";
927 
928  if (!mask) {
929  auto cpuCount = std::thread::hardware_concurrency();
930  cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
931  }
932 
933  hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
934  , schedule, priority);
935 
936  hmbdcNumber = clientParticipateInMessaging?hmbdcNumber:0xffffu;
937  c.messageDispatchingStartedCb(hmbdcNumber);
938 
939  while(!stopped_ &&
940  context_detail::runOnceImpl(hmbdcNumber, this->stopped_, this->buffer_, c)) {
941  }
942  if (this->stopped_) c.dropped();
943  if (clientParticipateInMessaging) context_detail::unblock(this->buffer_, hmbdcNumber);
944  }
945  );
946 
947  return move(thrd);
948  }
949 
950  Context(Context const&) = delete;
951  Context& operator = (Context const&) = delete;
952  bool startToBeContinued_;
953  uint16_t usedHmbdcCapacity_;
954  uint16_t currentThreadSerialNumber_;
955  bool stopped_;
956  typename Pool::ptr pool_;
957  using Threads = std::vector<std::thread>;
958  Threads threads_;
959  size_t poolThreadCount_;
960  uint32_t secondsBetweenPurge_;
961  uint64_t purgerCpuAffinityMask_;
962  typename std::conditional<cpa::has_pool
963  , std::unique_ptr<utils::StuckClientPurger<Buffer>>, uint32_t
964  >::type purger_;
965 };
966 
967 }}
968 