hmbdc
simplify-high-performance-messaging-programming
Context.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/Config.hpp"
5 #include "hmbdc/app/utils/StuckClientPurger.hpp"
6 #include "hmbdc/numeric/BitMath.hpp"
7 #include "hmbdc/os/Allocators.hpp"
8 
9 #include <memory>
10 
11 namespace hmbdc { namespace app {
12 
13 
14 /**
15 * @example hello-world.cpp
16 * @example hmbdc.cpp
17 * @example hmbdc-log.cpp
18 * @example ipc-market-data-propagate.cpp
19 */
20 
21 /**
22  * @namespace hmbdc::app::context_property
23  * contains the trait types that defines how a Context behave and capabilities
24  */
25 namespace context_property {
26  /**
27  * @class broadcast
28  * @brief Context template parameter inidcating each message is
29  * sent to all clients within the Context.
30  * This is the default property of a Context.
31  * @details each message is still subjected to Client's message type
32  * filtering. In the case of ipc Context
33  * it is also sent to all clients in the attached ipc Contexts.
34  * When this Context is specialized using this type, the context normally
35  * works with heterogeneous Clients and all Clients can talk to each
36  * other thru the Context. Load balance among Clients can be achieved by
37  * participating Clients coordinatedly select message to process
38  * In addtion to the direct mode Clients, a Client running pool is supported
39  * with the Context - see pool related functions in Context.
40  *
41  * Implicit usage in hello-world.cpp: @snippet hello-world.cpp broadcast as default
42  * Explicit usage in hmbdc.cpp @snippet hmbdc.cpp explicit using broadcast
43  * There is no hard coded limit on how many Clients can be added into a pool
44  * Also, there is no limit on when you can add a Client into a pool.
45  * @tparam max_parallel_consumer, normally, there isn't a need to specify it.
46  * max thread counts that processes messages
47  * that incudes pool threads plus the count of direct mode Clients that
48  * registers messages within the Context
49  * supported values: 4(default)
50  * 2,8,16,32,64 requires hmbdc licensed
51  */
52  template <uint16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
53  struct broadcast{
54  static_assert(max_parallel_consumer >= 4u
56  };
57 
58  /**
59  * @class partition
60  * @brief Context template parameter inidcating each message is
61  * sent to one and only one of the clients within the Context
62  * and its attached ipc Contexts if appllies.
63  * @details each message is still subjected to Client's message type
64  * filtering
65  * When this Context is specialized using this type, the context normally
66  * works with homogeneous Clients to achieve load balance thru threads. No
67  * coordination is needed between Clients.
68  * Only the direct mode Clients are supported, thread pool is NOT supported
69  * by this kind of Context - the pool related functions in Context are also disabled
70  *
71  * There is no limit on how many Clients can be started in the Context.
72  * Also, there is no limit on when you can start a Client in this Context.
73  *
74  * Example in server-cluster.cpp: @snippet server-cluster.cpp declare a partition context
75  */
76  struct partition{};
77 
78  /**
79  * @class ipc_creator
80  * @brief Context template parameter indicating the Context is ipc enabled and
81  * it can be attached (see ipc_attacher below) to an ipc transport (thru its name).
82  * @details In addition to the normal Context functions, the Context acts as
83  * the creator (owner) of the named ipc transport.
84  * Since it performs a critical function to purge crushed or
85  * stuck Clients to avoid buffer full for other well-behaving Clients, it is
86  * expected to be running (started) as long as ipc functions.
87  * Example in ipc-market-data-propagate.cpp @snippet ipc-market-data-propagate.cpp declare an ipc context
88  */
89  struct ipc_creator{};
90 
91  /**
92  * @class ipc_attacher
93  * @brief Context template parameter indicating the Context is ipc enabled and
94  * it can attach to an ipc transport thru a name.
95  * @details it is very important that the Context is constructed exactly
96  * the same size (see constructor) and type (partition vs broadcast) as the ipc
97  * transport creator specified (ipc_creator Context).
98  * All Contexts attaching to a single ipc transport collectively is subjected to the
99  * max_parallel_consumer limits just like a sinlge local (non-ipc) Context does.
100  * Example in ipc-market-data-propagate.cpp @snippet ipc-market-data-propagate.cpp declare an ipc context
101  */
102  struct ipc_attacher{};
103 }
104 }}
105 
106 #include "hmbdc/app/ContextDetail.hpp"
107 namespace hmbdc { namespace app {
108 
109 namespace context_detail {
110 using namespace std;
111 using namespace hmbdc::pattern;
112 
/**
 * @class ThreadCommBase<>
 * @brief covers the inter-thread and ipc communication facade
 * @details this type's interface is exposed thru Context and the type itself is
 * not directly used by users
 * @tparam MaxMessageSize the max message size, needed at compile time;
 * if the value can only be determined at runtime, set this to 0. Things can still work
 * but will lose some compile time checking advantages, see maxMessageSizeRuntime below
 * @tparam ContextProperties see types in context_property namespace
 */
123 template <size_t MaxMessageSize, typename... ContextProperties>
125  : private context_detail::context_property_aggregator<ContextProperties...> {
126  using cpa = context_property_aggregator<ContextProperties...>;
127  using Buffer = typename cpa::Buffer;
128  using Allocator = typename cpa::Allocator;
129 
130  enum {
131  MAX_MESSAGE_SIZE = MaxMessageSize,
132  BUFFER_VALUE_SIZE = MaxMessageSize + 8u, //8bytes for wrap
133  };
134 
135  size_t maxMessageSize() const {
136  if (MaxMessageSize == 0) return maxMessageSizeRuntime_;
137  return MaxMessageSize;
138  }
139 
140  /**
141  * @brief try send a batch of messages to the Context or attached ipc Contexts
142  * @details only the Clients that handles the Message will get it of course
143  * This function is threadsafe, which means you can call it anywhere in the code
144  *
145  * @param msgs messages
146  * @tparam Messages message types
147  */
148  template <typename M0, typename M1, typename ... Messages>
149  typename enable_if<!std::is_integral<M1>::value, void>::type
150  send(M0&& m0, M1&& m1, Messages&&... msgs) {
151  auto n = sizeof...(msgs) + 2;
152  auto it = buffer_.claim(n);
153  sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
154  buffer_.commit(it, n);
155  }
156 
157  /**
158  * @brief try to send a batch of message to the Context or attached ipc Contexts
159  * @details this call does not block and it is transactional - send all or none
160  * This function is threadsafe, which means you can call it anywhere in the code
161  *
162  * @param msgs messages
163  * @tparam Messages message types
164  *
165  * @return true if send successfully
166  */
167  template <typename M0, typename M1, typename ... Messages>
168  typename enable_if<!std::is_integral<M1>::value, bool>::type
169  trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
170  auto n = sizeof...(msgs) + 2;
171  auto it = buffer_.tryClaim(n);
172  if (it) {
173  sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
174  buffer_.commit(it, n);
175  return true;
176  }
177 
178  return false;
179  }
180 
181  /**
182  * @brief send a range of messages to the Context or attached ipc Contexts
183  * @details only the Clients that handles the Message will get it of course
184  * This function is threadsafe, which means you can call it anywhere in the code
185  *
186  * @param begin a forward iterator point at the start of the range
187  * @param n length of the range
188  */
189  template <typename ForwardIt>
190  void
191  send(ForwardIt begin, size_t n) {
192  if (likely(n)) {
193  auto bit = buffer_.claim(n);
194  auto it = bit;
195  for (auto i = 0ul; i < n; i++) {
196  using Message = typename iterator_traits<ForwardIt>::value_type;
197  new (*it++) MessageWrap<Message>(*begin++);
198  }
199  buffer_.commit(bit, n);
200  }
201  }
202 
203  /**
204  * @brief try send a range of messages to the Context or attached ipc Contexts
205  * @details this call does not block and it is transactional - send all or none
206  * This function is threadsafe, which means you can call it anywhere in the code
207  *
208  * @param begin a forward iterator point at the start of the range
209  * @param n length of the range
210  */
211  template <typename ForwardIt>
212  bool
213  trySend(ForwardIt begin, size_t n) {
214  if (likely(n)) {
215  auto bit = buffer_.tryClaim(n);
216  if (unlikely(!bit)) return false;
217  auto it = bit;
218  for (auto i = 0ul; i < n; i++) {
219  using Message = typename iterator_traits<ForwardIt>::value_type;
220  new (*it++) MessageWrap<Message>(*begin++);
221  }
222  buffer_.commit(bit, n);
223  }
224  return true;
225  }
226 
227  /**
228  * @brief send a message to the Context or attached ipc Contexts
229  * @details only the Clients that handles the Message will get it of course
230  * This function is threadsafe, which means you can call it anywhere in the code
231  *
232  * @param m message
233  * @tparam Message type
234  */
235  template <typename Message>
236  void send(Message&& m) {
237  using M = typename std::remove_reference<Message>::type;
238  static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
239  , "message too big");
240  if (unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
241  HMBDC_THROW(std::out_of_range, "message too big");
242  }
243  buffer_.put(MessageWrap<M>(std::forward<Message>(m)));
244  }
245 
246  /**
247  * @brief try to send a message to the Context or attached ipc Contexts
248  * @details this call does not block - return false when buffer is full
249  * This function is threadsafe, which means you can call it anywhere in the code
250  *
251  * @param m message
252  * @tparam Message type
253  * @return true if send successfully
254  */
255  template <typename Message>
256  bool trySend(Message&& m) {
257  using M = typename std::remove_reference<Message>::type;
258  static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
259  , "message too big");
260  if (unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
261  HMBDC_THROW(std::out_of_range, "message too big");
262  }
263  return buffer_.tryPut(MessageWrap<M>(std::forward<Message>(m)));
264  }
265 
266  /**
267  * @brief send a message to all Clients in the Context or attached ipc Contexts
268  * @details construct the Message in buffer directly
269  * This function is threadsafe, which means you can call it anywhere in the code
270  *
271  * @param args ctor args
272  * @tparam Message type
273  * @tparam typename ... Args args
274  */
275  template <typename Message, typename ... Args>
276  void sendInPlace(Args&&... args) {
277  static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
278  , "message too big");
279  if (unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
280  HMBDC_THROW(std::out_of_range, "message too big");
281  }
282  buffer_.template putInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
283  }
284 
285  /**
286  * @brief accessor - mostly used internally
287  * @return underlying buffer used in the Context
288  */
289  Buffer& buffer() {
290  return buffer_;
291  }
292 
293 
294 protected:
295  ThreadCommBase(uint32_t messageQueueSizePower2Num
296  , size_t maxMessageSizeRuntime = MAX_MESSAGE_SIZE
297  , char const* shmName = nullptr)
298  : allocator_(shmName
299  , Buffer::footprint(maxMessageSizeRuntime + 8u
300  , messageQueueSizePower2Num), O_RDWR | (cpa::create_ipc?O_CREAT:0)
301  )
302  , bufferptr_(allocator_.template allocate<Buffer>(1
303  , maxMessageSizeRuntime + 8u, messageQueueSizePower2Num
304  , allocator_)
305  )
306  , buffer_(*bufferptr_) {
307  if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
308  HMBDC_THROW(std::out_of_range
309  , "can only set maxMessageSizeRuntime when template value MaxMessageSize is 0");
310  }
311  maxMessageSizeRuntime_ = maxMessageSizeRuntime;
312  primeBuffer<cpa::create_ipc && cpa::has_pool>();
313  if (cpa::create_ipc || cpa::attach_ipc) {
314  sleep(2);
315  }
316  }
317 
318  ~ThreadCommBase() {
319  allocator_.unallocate(bufferptr_);
320  }
321 
322  static
323  void markDeadFrom(pattern::MonoLockFreeBuffer& buffer, uint16_t) {
324  // does not apply
325  }
326 
327  template <typename BroadCastBuf>
328  static
329  void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
330  for (uint16_t i = poolThreadCount;
331  i < BroadCastBuf::max_parallel_consumer;
332  ++i) {
333  buffer.markDead(i);
334  }
335  }
336  Allocator allocator_;
337  Buffer* HMBDC_RESTRICT bufferptr_;
338  Buffer& HMBDC_RESTRICT buffer_;
339 
340 private:
341  template <bool doIt>
342  typename enable_if<doIt, void>::type
343  primeBuffer() {
344  markDeadFrom(buffer_, 0);
345  }
346 
347  template <bool doIt>
348  typename enable_if<!doIt, void>::type
349  primeBuffer() {
350  }
351 
352  template <typename M, typename... Messages>
353  void sendRecursive(typename Buffer::iterator it
354  , M&& msg, Messages&&... msgs) {
355  using Message = typename std::remove_reference<M>::type;
356  static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
357  , "message too big");
358  if (unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
359  HMBDC_THROW(std::out_of_range, "message too big");
360  }
361  new (*it) MessageWrap<Message>(msg);
362  sendRecursive(++it, std::forward<M>(msgs)...);
363  }
364  void sendRecursive(typename Buffer::iterator) {}
365 
366  size_t maxMessageSizeRuntime_;
367 };
368 
369 } //context_detail
370 
/**
 * @example hmbdc.cpp
 * @example server-cluster.cpp
 * a partition Context rightfully doesn't contain a thread pool and all its Clients
 * are in direct mode. Pool related interfaces are turned off at compile time
 */
377 
/**
 * @class Context<>
 * @brief A Context is like a media object that facilitates the communications
 * for the Clients that it is holding.
 * a Client can only be added to (or started within) a single Context once,
 * undefined behavior otherwise.
 * the communication model is determined by the context_property
 * by default it is in the nature of broadcast fashion within local process, indicated
 * by broadcast<>
 *
 * @details a broadcast Context contains a thread Pool powered by a number of OS threads.
 * a Client running in such a Context can either run in the pool mode or a direct mode
 * (which means the Client has its own dedicated OS thread)
 * direct mode provides faster responses, and pool mode provides more flexibility.
 * It is recommended that the total number of threads (pool threads + direct threads)
 * not exceed the number of available cores.
 * @tparam MaxMessageSize What is the max message size if known
 * at compile time (compile time sized);
 * if the value can only be determined at runtime (run time sized), set this to 0.
 * Things can still work but will lose some compile time checking advantages,
 * see maxMessageSizeRuntime below
 * @tparam ContextProperties see context_property namespace
 */
401 template <size_t MaxMessageSize = 0, typename... ContextProperties>
402 struct Context
403 : context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...> {
404  using Base = context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...>;
405  using Buffer = typename Base::Buffer;
406  using cpa = typename Base::cpa;
    /**
     * @brief ctor for constructing a local non-ipc Context
     * @details won't compile if calling it for ipc Context
     * @param messageQueueSizePower2Num value of 10 gives a message queue of size 1024 (messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support, only used when
     * pool supported in the Context with broadcast property
     * @param maxMessageSizeRuntime if MaxMessageSize set to 0, this value is used
     */
416  Context(uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
417  , size_t maxPoolClientCount = MaxMessageSize?128:0
418  , size_t maxMessageSizeRuntime = MaxMessageSize)
419  : Base(messageQueueSizePower2Num, maxMessageSizeRuntime)
420  , startToBeContinued_(true)
421  , usedHmbdcCapacity_(0)
422  , currentThreadSerialNumber_(0)
423  , stopped_(false)
424  , pool_(createPool(this->buffer_, maxPoolClientCount))
425  , poolThreadCount_(0) {
426  static_assert(!cpa::create_ipc && !cpa::attach_ipc
427  , "no name specified for ipc Context");
428  }
429 
    /**
     * @brief ctor for constructing a local ipc Context
     * @details won't compile if calling it for local non-ipc Context
     *
     * @param ipcTransportName the id to identify an ipc transport that supports
     * a group of attached together Contexts and their Clients
     * @param messageQueueSizePower2Num value of 10 gives a message queue of size
     * 1024 (messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support,
     * only used when pool supported in the Context with broadcast property
     * @param maxMessageSizeRuntime if MaxMessageSize set to 0, this value is used
     * @param purgerCpuAffinityMask which cores to run the low profile (sleep mostly)
     * thread in charge of purging crashed Clients.
     * Used only for ipc_creator Contexts.
     */
445  Context(char const* ipcTransportName
446  , uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
447  , size_t maxPoolClientCount = MaxMessageSize?128:0
448  , size_t maxMessageSizeRuntime = MaxMessageSize
449  , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful)
450  : Base(messageQueueSizePower2Num, maxMessageSizeRuntime, ipcTransportName)
451  , startToBeContinued_(true)
452  , usedHmbdcCapacity_(0)
453  , currentThreadSerialNumber_(0)
454  , stopped_(false)
455  , pool_(createPool(this->buffer_, maxPoolClientCount))
456  , poolThreadCount_(0)
457  , secondsBetweenPurge_(60)
458  , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
459  static_assert(cpa::create_ipc || cpa::attach_ipc
460  , "ctor can only be used with ipc turned on Context");
461  static_assert(!(cpa::create_ipc && cpa::attach_ipc)
462  , "Context cannot be both ipc_creator and ipc_attacher");
463  }
464 
    /**
     * @brief dtor
     * @details if this Context owns the ipc transport, notify all attached processes
     * that read from it that this transport is dead
     */
471  if (cpa::create_ipc) {
472  this->markDeadFrom(this->buffer_, 0);
473  }
474  stop();
475  join();
476  }
477 
    /**
     * @brief add a client to Context's pool - the Client is run in pool mode
     * @details if pool is already started, the client is to get current Messages immediately
     * - might miss older messages.
     * if the pool is not started yet, the Client does not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the Pool
     * @param poolThreadAffinityIn pool is powered by a number of threads
     * (thread in the pool is identified (by a number) in the mask starting from bit 0)
     * it is possible to have a Client to use just some of the threads in the Pool
     * - default to use all.
     *
     */
493  template <typename Client>
494  void addToPool(Client &client
495  , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
496  static_assert(cpa::has_pool, "pool is not support in the Context type");
497  if (std::is_base_of<single_thread_powered_client, Client>::value
498  && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
499  && poolThreadCount_ != 1) {
500  HMBDC_THROW(std::out_of_range
501  , "cannot add a single thread powered client to the non-single thread powered pool"
502  );
503  }
504  pool_->addConsumer(client, poolThreadAffinityIn);
505 
506  }
507 
    /**
     * @brief add a bunch of clients to Context's pool - the Clients are run in pool mode
     * @details if pool is already started, the client is to get current Messages immediately
     * - might miss older messages.
     * if the pool is not started yet, the Client does not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the Pool
     * @param poolThreadAffinityIn pool is powered by a number of threads
     * (thread in the pool is identified (by a number) in the mask starting from bit 0)
     * it is possible to have a Client to use just some of the threads in the Pool
     * - default to use all.
     * @param args more client and poolThreadAffinityIn pairs can follow
     */
523  template <typename Client, typename ... Args>
524  void addToPool(Client &client
525  , uint64_t poolThreadAffinityIn, Args&& ...args) {
526  addToPool(client, poolThreadAffinityIn);
527  addToPool(std::forward<Args>(args)...);
528  }
529 
    /**
     * @brief return the number of clients added into pool
     * @details the number could change since the clients could be added in another thread
     * @return client count
     */
535  size_t clientCountInPool() const {
536  static_assert(cpa::has_pool, "pool is not support in the Context type");
537  return pool_->consumerSize();
538  }
539 
    /**
     * @brief how many parallel consumers are started
     * @details the dynamic value could change after the call returns
     * see max_parallel_consumer Context property
     * @return how many parallel consumers are started
     */
546  size_t parallelConsumerAlive() const {
547  return this->buffer_.parallelConsumerAlive();
548  }
    /**
     * @brief start the context and specify its Pool and direct Clients
     * @details doesn't compile if the Context does not support a Pool.
     * Usage example:
     *
     * @code
     * // the following starts the pool powered by 3 threads that are affinitied to
     * // the lower 8 cores; client0 affinitied to 4th core and client1 affinitied to 5th core
     * // the last true value indicates there will be more direct mode clients to start
     * // and messages are kept for those coming later (this only works for broadcast Context
     * // not applicable to partition Context).
     * start(3, 0xfful, client0, 0x8ul, client1, 0x10ul, true);
     * @endcode
     *
     * @tparam typename ...Args types
     *
     * @param poolThreadCount how many threads to power the Pool, 0 means no pool
     * @param poolThreadsCpuAffinityMask which cores, the pool threads to run on
     * @param args pairs of direct mode Client and its cpuAffinity;
     * optionally followed by a bool as the last arg (default false), false to indicate there is
     * no more direct mode Client to start. (the bool is not applicable for IPC Context types)
     * If a cpuAffinity is 0, the Client's affinity rotates
     * to one of the cores in the system.
     *
     * see in example hmbdc.cpp @snippet hmbdc.cpp start pool then client
     */
575  template <typename ...Args>
576  void
577  start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
578  , Args&& ... args) {
579  startWithContextProperty<cpa>(poolThreadCount, poolThreadsCpuAffinityMask
580  , std::forward<Args>(args) ...);
581  }
582 
    /**
     * @brief start the context (without its Pool) and direct Clients
     * @details for example
     *
     * @code
     * // the following starts client0 affinitied to 4th core and client1 affinitied to 5th core
     * // AND there is no more direct mode clients to start (since no bool at the end)
     * start(client0, 0x8ul, client1, 0x10ul);
     * @endcode
     *
     * @tparam typename ...Args types
     *
     * @param args pairs of direct mode Client and its cpuAffinity;
     * optionally followed by a bool as the last arg (default false), false to indicate there is
     * no more direct mode Client to start. (the bool is not applicable for IPC Context types)
     * If a cpuAffinity is 0, the Client's affinity rotates
     * to one of the cores in the system
     *
     * see in example server-cluster.cpp @snippet server-cluster.cpp start multiple direct Clients
     */
603  template <typename Client, typename ...Args>
604  typename std::enable_if<!std::is_integral<Client>::value, void>::type
605  start(Client& c, uint64_t cpuAffinity, Args&& ... args) {
606  startWithContextProperty<cpa>(c, cpuAffinity, std::forward<Args>(args) ...);
607  }
608 
609  /**
610  * @brief tell hmbdc that there is no more direct mode Client to start
611  */
612  void start() {
613  startWithContextProperty<cpa>();
614  }
615 
    /**
     * @brief stop the message dispatching - asynchronously
     * @details asynchronously means it is not guaranteed that message dispatching
     * stops immediately after this non-blocking call
     */
621  void
622  stop() {
623  stopWithContextProperty<cpa>();
624  }
625 
626  /**
627  * @brief wait until all threads (Pool threads too if apply) of the Context exit
628  * @details blocking call
629  */
630  void
631  join() {
632  joinWithContextProperty<cpa>();
633  }
634 
    /**
     * @brief ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...)
     * Clients from the ipc transport to keep the ipc transport healthy (avoiding buffer full).
     * It periodically looks for things to purge. This is to set the period (default is 60 seconds).
     * @details If some Clients are known to
     * take long to process messages, increase it. If you need to remove slow Clients quickly
     * reduce it.
     * Only effective for ipc_creator Context.
     *
     * @param s seconds
     */
646  void
648  secondsBetweenPurge_ = s;
649  }
650 
651  /**
652  * @brief normally not used until you want to run your own message loop
653  * @details call this function frequently to pump hmbdc message loop in its pool
654  *
655  * @param threadSerialNumber starting from 0, indicate which thread in the pool
656  * is powering the loop
657  */
658  void
659  runPoolThreadOnce(uint16_t threadSerialNumber) {
660  static_assert(cpa::has_pool, "pool is not support in the Context type");
661  pool_->runOnce(threadSerialNumber);
662  }
663 
664  /**
665  * @brief normally not used until you want to run your own message loop
666  * @details call this function frequently to pump hmbdc message loop for a direct mode Client
667  *
668  * @param threadSerialNumber indicate which thread is powering the loop
669  * @param c the Client
670  */
671  template <typename Client>
672  void runClientThreadOnce(uint16_t threadSerialNumber, Client& c) {
673  c.messageDispatchingStarted(threadSerialNumber); //lower level ensures executing only once
674  context_detail::runOnceImpl(threadSerialNumber, stopped_
675  , this->buffer_, c);
676  }
677 
678 private:
679  template <typename Buffer>
680  static
681  typename Pool::ptr
682  createPool(Buffer& buffer, size_t maxPoolClientCount) {
683  return Pool::create(buffer, maxPoolClientCount);
684  }
685 
686  static
687  typename Pool::ptr
688  createPool(pattern::MonoLockFreeBuffer&, size_t) {
689  return typename Pool::ptr();
690  }
691 
692  template <typename cpa>
693  typename std::enable_if<cpa::has_pool, void>::type
694  stopWithContextProperty() {
695  if (pool_) pool_->stop();
696  __sync_synchronize();
697  stopped_ = true;
698  }
699 
700  template <typename cpa>
701  typename std::enable_if<!cpa::has_pool, void>::type
702  stopWithContextProperty() {
703  __sync_synchronize();
704  stopped_ = true;
705  }
706 
707  template <typename cpa>
708  typename std::enable_if<cpa::has_pool, void>::type
709  joinWithContextProperty() {
710  if (pool_) pool_->join();
711  for (auto& t : threads_) {
712  t.join();
713  }
714  threads_.clear();
715  }
716 
717  template <typename cpa>
718  typename std::enable_if<!cpa::has_pool, void>::type
719  joinWithContextProperty() {
720  for (auto& t : threads_) {
721  t.join();
722  }
723  threads_.clear();
724  }
725 
726 
727  template <typename cpa, typename ...Args>
728  typename std::enable_if<!cpa::attach_ipc && !cpa::create_ipc, void>::type
729  startWithContextProperty(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
730  , Args&& ... args) {
731  static_assert(cpa::has_pool, "pool is not support in the Context type");
732  poolThreadCount_ = poolThreadCount;
733  if (!startToBeContinued_) {
734  HMBDC_THROW(std::runtime_error
735  , "Exception: conflicting with previously indicated start completed!");
736  }
737  if (pool_) {
738  pool_->start(poolThreadCount, poolThreadsCpuAffinityMask, false);
739  }
740  usedHmbdcCapacity_ = poolThreadCount;
741  currentThreadSerialNumber_ = usedHmbdcCapacity_;
742  startLocalClients(std::forward<Args>(args) ...);
743  }
744 
745  template <typename cpa, typename Client, typename ...Args>
746  typename std::enable_if<!std::is_integral<Client>::value && !cpa::create_ipc && !cpa::attach_ipc, void>::type
747  startWithContextProperty(Client& c, uint64_t cpuAffinity, Args&& ... args) {
748  if (!startToBeContinued_) {
749  HMBDC_THROW(std::runtime_error
750  , "Exception: conflicting with previously indicated start completed!");
751  }
752  startLocalClients(c, cpuAffinity, std::forward<Args>(args) ...);
753  }
754 
755  template <typename cpa>
756  typename std::enable_if<!cpa::create_ipc && !cpa::attach_ipc, void>::type
757  startWithContextProperty() {
758  startLocalClients(false);
759  }
760 
761  template <typename cpa>
762  typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool, void>::type
763  startWithContextProperty() {
764  startBroadcastIpcClients();
765  }
766 
767  template <typename cpa, typename ...Args>
768  typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool, void>::type
769  startWithContextProperty(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
770  , Args&& ... args) {
771  using namespace std;
772  if (poolThreadCount_) {
773  HMBDC_THROW(std::out_of_range, "Context pool already started");
774  }
775  auto& lock = this->allocator_.fileLock();
776  std::lock_guard<decltype(lock)> g(lock);
777  auto slots = this->buffer_.unusedConsumerIndexes();
778  if (slots.size() < sizeof...(args) / 2) {
779  HMBDC_THROW(std::out_of_range
780  , "Context instance support Client count = " << slots.size());
781  }
782  poolThreadCount_ = poolThreadCount;
783  pool_->startThruRecycling(poolThreadCount, poolThreadsCpuAffinityMask);
784  currentThreadSerialNumber_ = poolThreadCount_;
785  startBroadcastIpcClients(std::forward<Args>(args) ...);
786  }
787 
788  template <typename cpa, typename Client, typename ...Args>
789  typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool && !std::is_integral<Client>::value
790  , void>::type
791  startWithContextProperty(Client& c, uint64_t cpuAffinity
792  , Args&& ... args) {
793  auto& lock = this->allocator_.fileLock();
794  std::lock_guard<decltype(lock)> g(lock);
795 
796  auto clientParticipateInMessaging =
797  std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
798  if (clientParticipateInMessaging) {
799  auto slots = this->buffer_.unusedConsumerIndexes();
800  if (slots.size() < 1u + sizeof...(args) / 2) {
801  HMBDC_THROW(std::out_of_range
802  , "Context instance support Client count = " << slots.size());
803  }
804  }
805  startBroadcastIpcClients(c, cpuAffinity, std::forward<Args>(args) ...);
806  }
807 
808  template <typename cpa, typename ...Args>
809  typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && !cpa::has_pool, void>::type
810  startWithContextProperty(Args&& ... args) {
811  startPartitionIpcClients(std::forward<Args>(args) ...);
812  }
813 
814  void
815  startLocalClients(bool startToBeContinuedFlag = cpa::can_start_anytime) {
816  startToBeContinued_ = startToBeContinuedFlag;
817  if (!startToBeContinued_) {
818  this->markDeadFrom(this->buffer_, usedHmbdcCapacity_);
819  }
820  }
821 
822  template <typename CcClient, typename ...Args>
823  void startLocalClients(CcClient& c, uint64_t mask, Args&&... args) {
824  if (usedHmbdcCapacity_ >= Buffer::max_parallel_consumer
825  && CcClient::REGISTERED_MESSAGE_SIZE) {
826  HMBDC_THROW(std::out_of_range
827  , "messaging participating client number > allowed thread number of "
828  << Buffer::max_parallel_consumer);
829  }
830 
831  auto thrd =
832  kickOffClientThread(c, mask, usedHmbdcCapacity_, currentThreadSerialNumber_);
833  threads_.push_back(move(thrd));
834 
835  auto clientParticipateInMessaging =
836  std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE;
837  if (clientParticipateInMessaging) usedHmbdcCapacity_++;
838  currentThreadSerialNumber_++;
839  startLocalClients(std::forward<Args>(args)...);
840  }
841 
842  void startBroadcastIpcClients(){
843  if (cpa::create_ipc && !purger_) {
844  purger_.reset(
845  new utils::StuckClientPurger<Buffer>(secondsBetweenPurge_, this->buffer_));
846  startBroadcastIpcClients(*purger_, purgerCpuAffinityMask_);
847  }
848  }
849 
850  template <typename Client, typename ...Args>
851  void startBroadcastIpcClients(Client& c, uint64_t mask, Args&&... args) {
852  auto clientParticipateInMessaging =
853  std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
854  uint16_t hmbdcNumber = 0xffffu;
855  if (clientParticipateInMessaging) {
856  auto slots = this->buffer_.unusedConsumerIndexes();
857  auto it = slots.begin();
858  hmbdcNumber = *it;
859  this->buffer_.reset(hmbdcNumber);
860  }
861  auto thrd = kickOffClientThread(
862  c, mask, hmbdcNumber, currentThreadSerialNumber_);
863  threads_.push_back(move(thrd));
864  currentThreadSerialNumber_++;
865  startBroadcastIpcClients(std::forward<Args>(args)...);
866  }
867 
/**
 * @brief end of the recursion that starts partition ipc Clients - nothing
 * is left to start
 */
void startPartitionIpcClients() {
}
869 
870  template <typename Client, typename ...Args>
871  void startPartitionIpcClients(Client& c, uint64_t mask, Args&&... args) {
872  auto thrd = kickOffClientThread(
873  c, mask, currentThreadSerialNumber_, currentThreadSerialNumber_);
874  threads_.push_back(move(thrd));
875  currentThreadSerialNumber_++;
876  startPartitionIpcClients(std::forward<Args>(args)...);
877  }
878 
879  template <typename Client>
880  auto kickOffClientThread(
881  Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
882  std::thread thrd([
883  this
884  , &c
885  , mask
886  , h=hmbdcNumber
887  , threadSerialNumber
888  ]() {
889  auto hmbdcNumber = h;
890  std::string name;
891  char const* schedule;
892  int priority;
893  auto clientParticipateInMessaging =
894  std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
895 
896 
897  if (c.hmbdcName()) {
898  name = c.hmbdcName();
899  } else {
900  if (clientParticipateInMessaging) {
901  name = "hmbdc" + std::to_string(hmbdcNumber);
902  } else {
903  name = "hmbdc-x";
904  }
905  }
906  auto cpuAffinityMask = mask;
907  std::tie(schedule, priority) = c.schedSpec();
908 
909  if (!schedule) schedule = "SCHED_OTHER";
910 
911  if (!mask) {
912  auto cpuCount = std::thread::hardware_concurrency();
913  cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
914  }
915 
916  hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
917  , schedule, priority);
918 
919  hmbdcNumber = clientParticipateInMessaging?hmbdcNumber:0xffffu;
920  c.messageDispatchingStartedCb(hmbdcNumber);
921 
922  while(!stopped_ &&
923  context_detail::runOnceImpl(hmbdcNumber, this->stopped_, this->buffer_, c)) {
924  }
925  if (this->stopped_) c.dropped();
926  if (clientParticipateInMessaging) context_detail::unblock(this->buffer_, hmbdcNumber);
927  }
928  );
929 
930  return move(thrd);
931  }
932 
// A Context cannot be copied.
933  Context(Context const&) = delete;
934  Context& operator = (Context const&) = delete;
// Set by startLocalClients(bool): true when more direct mode Clients may
// still be started; when false the unused buffer indexes are marked dead.
935  bool startToBeContinued_;
// Count of local direct mode Clients started so far that take part in
// messaging; also the buffer index given to the next such Client.
936  uint16_t usedHmbdcCapacity_;
// Serial number given to the next thread started; incremented per started
// direct mode Client and set to the pool thread count when the pool starts.
937  uint16_t currentThreadSerialNumber_;
// Polled by every Client thread's loop; the loop exits once it is true.
938  bool stopped_;
// The Client running pool, started via startThruRecycling().
939  typename Pool::ptr pool_;
940  using Threads = std::vector<std::thread>;
// Threads of the direct mode Clients, in the order they were started.
941  Threads threads_;
// Non-zero once the pool has been started; a second start is rejected.
942  size_t poolThreadCount_;
// Passed to the StuckClientPurger constructor when the purger is created.
943  uint32_t secondsBetweenPurge_;
// cpu affinity mask used for the purger's thread.
944  uint64_t purgerCpuAffinityMask_;
// Owning pointer to the purger when the Context has a pool; a plain
// uint32_t stand-in otherwise.
945  typename std::conditional<cpa::has_pool
946  , std::unique_ptr<utils::StuckClientPurger<Buffer>>, uint32_t
947  >::type purger_;
948 };
949 
950 }}
951 
size_t parallelConsumerAlive() const
how many parallel consumers are started
Definition: Context.hpp:546
void runClientThreadOnce(uint16_t threadSerialNumber, Client &c)
normally not used until you want to run your own message loop
Definition: Context.hpp:672
Definition: MonoLockFreeBuffer.hpp:14
Context(uint32_t messageQueueSizePower2Num=MaxMessageSize?20:0, size_t maxPoolClientCount=MaxMessageSize?128:0, size_t maxMessageSizeRuntime=MaxMessageSize)
ctor for construct local non-ipc Context
Definition: Context.hpp:416
Definition: StuckClientPurger.hpp:11
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:622
covers the inter-thread and ipc communication facade
Definition: Context.hpp:124
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:631
Context template parameter indicating each message is sent to one and only one of the clients within ...
Definition: Context.hpp:76
Definition: TypedString.hpp:74
enable_if<!std::is_integral< M1 >::value, bool >::type trySend(M0 &&m0, M1 &&m1, Messages &&...msgs)
try to send a batch of messages to the Context or attached ipc Contexts
Definition: Context.hpp:169
std::enable_if<!std::is_integral< Client >::value, void >::type start(Client &c, uint64_t cpuAffinity, Args &&...args)
start the context (without its Pool) and direct Clients
Definition: Context.hpp:605
the default vanilla allocate
Definition: Allocators.hpp:116
void send(Message &&m)
send a message to the Context or attached ipc Contexts
Definition: Context.hpp:236
Definition: GuardedSingleton.hpp:9
void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask, Args &&...args)
start the context and specify its Pool and direct Clients
Definition: Context.hpp:577
void sendInPlace(Args &&...args)
send a message to all Clients in the Context or attached ipc Contexts
Definition: Context.hpp:276
bool trySend(ForwardIt begin, size_t n)
try send a range of messages to the Context or attached ipc Contexts
Definition: Context.hpp:213
std::tuple< char const *, int > schedSpec() const
an overrideable method. returns the schedule policy and priority, override if necessary priority is o...
Definition: Client.hpp:126
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:116
void send(ForwardIt begin, size_t n)
send a range of messages to the Context or attached ipc Contexts
Definition: Context.hpp:191
enable_if<!std::is_integral< M1 >::value, void >::type send(M0 &&m0, M1 &&m1, Messages &&...msgs)
send a batch of messages to the Context or attached ipc Contexts
Definition: Context.hpp:150
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
called before any messages got dispatched - only once
Definition: Client.hpp:70
Context template parameter indicating each message is sent to all clients within the Context...
Definition: Context.hpp:53
void addToPool(Client &client, uint64_t poolThreadAffinityIn=0xfffffffffffffffful)
add a client to Context's pool - the Client is run in pool mode
Definition: Context.hpp:494
Definition: LockFreeBufferT.hpp:18
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:402
void runPoolThreadOnce(uint16_t threadSerialNumber)
normally not used until you want to run your own message loop
Definition: Context.hpp:659
~Context()
dtor
Definition: Context.hpp:470
Definition: Message.hpp:55
void start()
tell hmbdc that there is no more direct mode Client to start
Definition: Context.hpp:612
Context(char const *ipcTransportName, uint32_t messageQueueSizePower2Num=MaxMessageSize?20:0, size_t maxPoolClientCount=MaxMessageSize?128:0, size_t maxMessageSizeRuntime=MaxMessageSize, uint64_t purgerCpuAffinityMask=0xfffffffffffffffful)
ctor for construct local ipc Context
Definition: Context.hpp:445
Definition: BitMath.hpp:9
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:289
Context template parameter indicating the Context is ipc enabled and it can be attached (see ipc_atta...
Definition: Context.hpp:89
void addToPool(Client &client, uint64_t poolThreadAffinityIn, Args &&...args)
add a bunch of clients to Context's pool - the Clients are run in pool mode
Definition: Context.hpp:524
size_t clientCountInPool() const
return the number of clients added into pool
Definition: Context.hpp:535
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
void setSecondsBetweenPurge(uint32_t s)
ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...) Clients from the ip...
Definition: Context.hpp:647
bool trySend(Message &&m)
try to send a message to the Context or attached ipc Contexts
Definition: Context.hpp:256
Context template parameter indicating the Context is ipc enabled and it can attach to an ipc transpor...
Definition: Context.hpp:102
Definition: Base.hpp:12
Definition: PoolT.hpp:10