Context.hpp
#include "hmbdc/Copyright.hpp"
#pragma once

#include "hmbdc/Config.hpp"
#include "hmbdc/app/utils/StuckClientPurger.hpp"
#include "hmbdc/numeric/BitMath.hpp"
#include "hmbdc/os/Allocators.hpp"

#include <memory>

namespace hmbdc { namespace app {

namespace context_property {
/**
 * @brief Context template parameter indicating each message is
 * sent to all Clients within the Context.
 * This is the default property of a Context.
 * @details each message is still subject to a Client's message type
 * filtering. In the case of an ipc Context,
 * it is also sent to all Clients in the attached ipc Contexts.
 * When a Context is specialized using this type, it normally
 * works with heterogeneous Clients and all Clients can talk to each
 * other thru the Context. Load balancing among Clients can be achieved by
 * having the participating Clients coordinate which messages each one processes.
 * In addition to the direct mode Clients, a Client running pool is supported
 * by the Context - see the pool related functions in Context.
 * see @example hello-world.cpp @example hmbdc.cpp for usage
 * There is no hard coded limit on how many Clients can be added into a pool,
 * nor on when a Client can be added into a pool.
 * @tparam max_parallel_consumer normally there is no need to specify it;
 * the max count of threads that process messages, which
 * includes the pool threads plus the direct mode Clients that
 * register messages within the Context.
 * supported values: 4 (default); 2, 8, 16, 32 and 64 require an hmbdc license
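 *
 * A minimal usage sketch (illustrative only, not from the original source;
 * MyMessage is an assumed user-defined hmbdc message type, client0 and
 * client1 are assumed Clients that handle it):
 * @code
 * using MyContext = hmbdc::app::Context<128>; // broadcast<> is the default property
 * MyContext ctx;
 * ctx.start(client0, 0x01ul, client1, 0x02ul);
 * ctx.send(MyMessage{}); // both client0 and client1 receive it
 * @endcode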
 */
template <uint16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
struct broadcast{
    static_assert(max_parallel_consumer >= 4u
        , "unsupported max_parallel_consumer"); // assert message assumed; one line elided in the source listing
};

/**
 * @brief Context template parameter indicating each message is
 * sent to one and only one of the Clients within the Context
 * and its attached ipc Contexts, if applicable.
 * @details each message is still subject to a Client's message type
 * filtering.
 * When a Context is specialized using this type, it normally
 * works with homogeneous Clients to achieve load balancing across threads. No
 * coordination is needed between Clients.
 * Only direct mode Clients are supported; a thread pool is NOT supported
 * by the Context - the pool related functions in Context are disabled at compile time.
 *
 * There is no limit on how many Clients can be started in the Context,
 * nor on when a Client can be started in this Context.
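 *
 * A minimal usage sketch (illustrative only, not from the original source;
 * Job is an assumed message type handled by the assumed homogeneous
 * Clients worker0 and worker1):
 * @code
 * using Workers = hmbdc::app::Context<128, hmbdc::app::context_property::partition>;
 * Workers ctx;
 * ctx.start(worker0, 0x01ul, worker1, 0x02ul);
 * ctx.send(Job{}); // exactly one of the workers receives this Job
 * @endcode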
 * see @example server-cluster.cpp for partition Context usage
 */
struct partition{};

/**
 * @brief Context template parameter indicating the Context is ipc enabled and
 * that it can be attached to (see ipc_attacher below) thru its name.
 *
 * @details In addition to the normal Context functions, the Context acts as
 * the creator (owner) of the named ipc transport.
 * Since it performs the critical function of purging crashed or
 * stuck Clients to keep the buffer from filling up on other well-behaving Clients, it is
 * expected to be running (started) for as long as ipc is in use.
 * see @example ipc-market-data-propagate.cpp
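 *
 * A minimal sketch (illustrative only, not from the original source; the
 * transport name "my-ipc" and the size 128 are assumptions):
 * @code
 * using IpcOwner = hmbdc::app::Context<128
 *     , hmbdc::app::context_property::broadcast<>
 *     , hmbdc::app::context_property::ipc_creator>;
 * IpcOwner owner("my-ipc"); // creates and owns the named ipc transport
 * owner.start();            // keep it running for as long as ipc is in use
 * @endcode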
 */
struct ipc_creator{};

/**
 * @brief Context template parameter indicating the Context is ipc enabled and
 * that it can attach to an ipc transport (created by an ipc_creator Context) thru a name.
 * @details it is very important that the Context is constructed with exactly
 * the same size (see constructor) and type (partition vs broadcast) as the ipc
 * transport's creator (the ipc_creator Context) specified.
 * All Contexts attaching to a single ipc transport are collectively subject to the
 * max_parallel_consumer limit, just like a single local (non-ipc) Context is.
 *
 * see @example ipc-market-data-propagate.cpp
 *
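 * A minimal sketch (illustrative only, not from the original source; reuses the
 * assumed "my-ipc" transport from the ipc_creator sketch above and an assumed
 * Client client0; note the size 128 matches the creator's):
 * @code
 * using IpcUser = hmbdc::app::Context<128
 *     , hmbdc::app::context_property::broadcast<>
 *     , hmbdc::app::context_property::ipc_attacher>;
 * IpcUser user("my-ipc");
 * user.start(client0, 0x01ul); // client0 sees messages from all attached processes
 * @endcode
 *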
 */
struct ipc_attacher{};
}
}}

#include "hmbdc/app/ContextDetail.hpp"
namespace hmbdc { namespace app {

using namespace std;
using namespace hmbdc::comm;
using namespace hmbdc::app::detail;


/**
 * @brief covers the inter-thread and ipc communication facade
 * @details this type's interface is exposed thru Context; the type itself is
 * not directly used by users
 * see @example perf-base-thru.cpp perf-base-lat.cpp for usage
 * @tparam MaxMessageSize the max message size, needed at compile time;
 * if the value can only be determined at runtime, set this to 0. Things can still work,
 * but you lose some compile time checking advantages, see maxMessageSizeRuntime below
 * @tparam ContextProperties see types in the context_property namespace
 */
template <size_t MaxMessageSize, typename... ContextProperties>
struct ThreadCommBase
: private context_property_aggregator<ContextProperties...> {
    using cpa = context_property_aggregator<ContextProperties...>;
    using Buffer = typename cpa::Buffer;
    using Allocator = typename cpa::Allocator;

    enum {
        MAX_MESSAGE_SIZE = MaxMessageSize,
        BUFFER_VALUE_SIZE = MaxMessageSize + 8u, // 8 bytes for wrap
    };

    size_t maxMessageSize() const {
        if (MaxMessageSize == 0) return maxMessageSizeRuntime_;
        return MaxMessageSize;
    }

    /**
     * @brief send a batch of messages to the Context or attached ipc Contexts
     * @details only the Clients that handle a Message will get it of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param msgs messages
     * @tparam Messages message types
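     *
     * A minimal sketch (illustrative only, not from the original source; MdUpdate
     * and Trade are assumed user-defined hmbdc message types):
     * @code
     * ctx.send(MdUpdate{}, Trade{}); // claimed and committed as a single batch
     * @endcode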
     */
    template <typename M0, typename M1, typename ... Messages>
    typename enable_if<!is_integral<M1>::value, void>::type
    send(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.claim(n);
        sendRecursive(it, forward<M0>(m0), forward<M1>(m1), forward<Messages>(msgs)...);
        buffer_.commit(it, n);
    }

    /**
     * @brief try to send a batch of messages to the Context or attached ipc Contexts
     * @details this call does not block and it is transactional - send all or none.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param msgs messages
     * @tparam Messages message types
     *
     * @return true if sent successfully
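     *
     * A minimal sketch (illustrative only, not from the original source; message
     * types as in the send() sketch above):
     * @code
     * if (!ctx.trySend(MdUpdate{}, Trade{})) {
     *     // buffer was full - neither message was sent
     * }
     * @endcode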
     */
    template <typename M0, typename M1, typename ... Messages>
    typename enable_if<!is_integral<M1>::value, bool>::type
    trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.tryClaim(n);
        if (it) {
            sendRecursive(it, forward<M0>(m0), forward<M1>(m1), forward<Messages>(msgs)...);
            buffer_.commit(it, n);
            return true;
        }

        return false;
    }

    /**
     * @brief send a range of messages to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message will get it of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
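     *
     * A minimal sketch (illustrative only, not from the original source; assumes
     * a user-defined hmbdc message type MyMessage):
     * @code
     * std::vector<MyMessage> batch(8);
     * ctx.send(batch.begin(), batch.size()); // one claim/commit for the whole range
     * @endcode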
     */
    template <typename ForwardIt>
    void
    send(ForwardIt begin, size_t n) {
        if (likely(n)) {
            auto bit = buffer_.claim(n);
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename iterator_traits<ForwardIt>::value_type;
                new (*it++) MessageWrap<Message>(*begin++);
            }
            buffer_.commit(bit, n);
        }
    }

    /**
     * @brief try to send a range of messages to the Context or attached ipc Contexts
     * @details this call does not block and it is transactional - send all or none.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
     * @return true if sent successfully
     */
    template <typename ForwardIt>
    bool
    trySend(ForwardIt begin, size_t n) {
        if (likely(n)) {
            auto bit = buffer_.tryClaim(n);
            if (unlikely(!bit)) return false;
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename iterator_traits<ForwardIt>::value_type;
                new (*it++) MessageWrap<Message>(*begin++);
            }
            buffer_.commit(bit, n);
        }
        return true;
    }

    /**
     * @brief send a message to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message will get it of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param m message
     * @tparam Message type
     */
    template <typename Message>
    void send(Message&& m) {
        using M = typename remove_reference<Message>::type;
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
            HMBDC_THROW(out_of_range, "message too big");
        }
        buffer_.put(MessageWrap<M>(forward<Message>(m)));
    }

    /**
     * @brief try to send a message to the Context or attached ipc Contexts
     * @details this call does not block - returns false when the buffer is full.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param m message
     * @tparam Message type
     * @return true if sent successfully
     */
    template <typename Message>
    bool trySend(Message&& m) {
        using M = typename remove_reference<Message>::type;
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
            HMBDC_THROW(out_of_range, "message too big");
        }
        return buffer_.tryPut(MessageWrap<M>(forward<Message>(m)));
    }

    /**
     * @brief send a message to all Clients in the Context or attached ipc Contexts
     * @details constructs the Message in the buffer directly.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param args ctor args
     * @tparam Message type
     * @tparam Args ctor arg types
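     *
     * A minimal sketch (illustrative only, not from the original source; assumes
     * MyMessage has a (int, double) ctor):
     * @code
     * ctx.sendInPlace<MyMessage>(42, 0.5); // constructed directly in the buffer, no copy
     * @endcode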
     */
    template <typename Message, typename ... Args>
    void sendInPlace(Args&&... args) {
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(out_of_range, "message too big");
        }
        buffer_.template putInPlace<MessageWrap<Message>>(forward<Args>(args)...);
    }

    /**
     * @brief accessor - mostly used internally
     * @return underlying buffer used in the Context
     */
    Buffer& buffer() {
        return buffer_;
    }


protected:
    ThreadCommBase(uint32_t messageQueueSizePower2Num
        , size_t maxMessageSizeRuntime = MAX_MESSAGE_SIZE
        , char const* shmName = nullptr)
    : allocator_(shmName
        , Buffer::footprint(maxMessageSizeRuntime + 8u
            , messageQueueSizePower2Num), O_RDWR | (cpa::create_ipc?O_CREAT:0)
    )
    , bufferptr_(allocator_.template allocate<Buffer>(1
        , maxMessageSizeRuntime + 8u, messageQueueSizePower2Num
        , allocator_)
    )
    , buffer_(*bufferptr_) {
        if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
            HMBDC_THROW(out_of_range
                , "can only set maxMessageSizeRuntime when template value MaxMessageSize is 0");
        }
        if (maxMessageSizeRuntime == 0) {
            HMBDC_THROW(out_of_range, "please set maxMessageSizeRuntime");
        }
        maxMessageSizeRuntime_ = maxMessageSizeRuntime;
        primeBuffer<cpa::create_ipc && cpa::has_pool>();
        if (cpa::create_ipc || cpa::attach_ipc) {
            sleep(2); // allow the ipc transport to settle
        }
    }

    ~ThreadCommBase() {
        allocator_.unallocate(bufferptr_);
    }

    static
    void markDeadFrom(hmbdc::pattern::MonoLockFreeBuffer& buffer, uint16_t) {
        // does not apply
    }

    template <typename BroadCastBuf>
    static
    void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
        for (uint16_t i = poolThreadCount;
            i < BroadCastBuf::max_parallel_consumer;
            ++i) {
            buffer.markDead(i);
        }
    }

    Allocator allocator_;
    Buffer* __restrict__ bufferptr_;
    Buffer& __restrict__ buffer_;

private:
    template <bool doIt>
    typename enable_if<doIt, void>::type
    primeBuffer() {
        markDeadFrom(buffer_, 0);
    }

    template <bool doIt>
    typename enable_if<!doIt, void>::type
    primeBuffer() {
    }

    template <typename M, typename... Messages>
    void sendRecursive(typename Buffer::iterator it
        , M&& msg, Messages&&... msgs) {
        using Message = typename remove_reference<M>::type;
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(out_of_range, "message too big");
        }
        new (*it) MessageWrap<Message>(msg);
        sendRecursive(++it, forward<Messages>(msgs)...);
    }
    void sendRecursive(typename Buffer::iterator) {}

    size_t maxMessageSizeRuntime_;
};


/**
 * @brief A Context is like a media object that facilitates communication
 * for the Clients that it holds.
 * a Client can only be added to (or started within) a single Context once,
 * undefined behavior otherwise.
 * the communication model is determined by the context_property;
 * by default it is broadcast in nature within the local process, indicated
 * by broadcast<>
 *
 * @details a broadcast Context contains a thread Pool powered by a number of OS threads.
 * a Client running in such a Context can either run in pool mode or in direct mode
 * (which means the Client has its own dedicated OS thread).
 * direct mode provides faster responses, and pool mode provides more flexibility.
 * It is recommended that the total number of threads (pool threads + direct threads)
 * not exceed the number of available cores.
 * see @example hmbdc.cpp for broadcast Context usage
 * a partition Context rightfully doesn't contain a thread pool, and all its Clients
 * are in direct mode. Pool related interfaces are turned off at compile time
 * @tparam MaxMessageSize the max message size, if known
 * at compile time (compile time sized);
 * if the value can only be determined at runtime (run time sized), set this to 0.
 * Things can still work but you lose some compile time checking advantages,
 * see maxMessageSizeRuntime below
 * @tparam ContextProperties see context_property namespace
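 *
 * A minimal lifecycle sketch (illustrative only, not from the original source;
 * MyClient and MyMessage are assumed user-defined types):
 * @code
 * hmbdc::app::Context<128> ctx; // broadcast Context, pool supported
 * MyClient client;
 * ctx.start(2, 0x03ul,          // pool: 2 threads on cores 0 and 1
 *     client, 0x04ul);          // direct mode Client on core 2
 * ctx.send(MyMessage{});
 * ctx.stop();
 * ctx.join();
 * @endcode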
 */
template <size_t MaxMessageSize = 0, typename... ContextProperties>
struct Context
: ThreadCommBase<MaxMessageSize, ContextProperties...> {
    using Base = ThreadCommBase<MaxMessageSize, ContextProperties...>;
    using Buffer = typename Base::Buffer;
    using cpa = typename Base::cpa;
    using Pool = PoolT<Buffer>;
    /**
     * @brief ctor for constructing a local non-ipc Context
     * @details won't compile if called for an ipc Context
     * @param messageQueueSizePower2Num a value of 10 gives a message queue of size 1024 (messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support; only used when
     * the pool is supported in the Context (broadcast property)
     * @param maxMessageSizeRuntime if MaxMessageSize is set to 0, this value is used
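     *
     * A minimal sketch (illustrative only, not from the original source) of a
     * run time sized Context whose max message size (1000 bytes here) is only
     * known at runtime:
     * @code
     * hmbdc::app::Context<0> ctx(20, 128, 1000);
     * @endcode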
     */
    Context(uint32_t messageQueueSizePower2Num = 20
        , size_t maxPoolClientCount = 128
        , size_t maxMessageSizeRuntime = MaxMessageSize)
    : ThreadCommBase<MaxMessageSize, ContextProperties...>(
        messageQueueSizePower2Num, maxMessageSizeRuntime)
    , startToBeContinued_(true)
    , usedHmbdcCapacity_(0)
    , currentThreadSerialNumber_(0)
    , stopped_(false)
    , pool_(createPool(this->buffer_, maxPoolClientCount))
    , poolThreadCount_(0) {
        static_assert(!cpa::create_ipc && !cpa::attach_ipc
            , "no name specified for ipc Context");
    }

    /**
     * @brief ctor for constructing a local ipc Context
     * @details won't compile if called for a local non-ipc Context
     *
     * @param ipcTransportName the id identifying an ipc transport that supports
     * a group of attached Contexts and their Clients
     * @param messageQueueSizePower2Num a value of 10 gives a message queue of size
     * 1024 (messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support;
     * only used when the pool is supported in the Context (broadcast property)
     * @param maxMessageSizeRuntime if MaxMessageSize is set to 0, this value is used
     * @param purgerCpuAffinityMask which cores to run the low profile (mostly sleeping)
     * thread in charge of purging crashed Clients.
     * Used only for ipc_creator Contexts. see @example ipc-market-data-propagate.cpp
     */
    Context(char const* ipcTransportName
        , uint32_t messageQueueSizePower2Num = 20
        , size_t maxPoolClientCount = 128
        , size_t maxMessageSizeRuntime = MaxMessageSize
        , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful)
    : ThreadCommBase<MaxMessageSize, ContextProperties...>(
        messageQueueSizePower2Num, maxMessageSizeRuntime
        , ipcTransportName)
    , startToBeContinued_(true)
    , usedHmbdcCapacity_(0)
    , currentThreadSerialNumber_(0)
    , stopped_(false)
    , pool_(createPool(this->buffer_, maxPoolClientCount))
    , poolThreadCount_(0)
    , secondsBetweenPurge_(60)
    , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
        static_assert(cpa::create_ipc || cpa::attach_ipc
            , "ctor can only be used with an ipc enabled Context");
        static_assert(!(cpa::create_ipc && cpa::attach_ipc)
            , "Context cannot be both ipc_creator and ipc_attacher");
    }

    /**
     * @brief dtor
     * @details if this Context owns the ipc transport, notify all attached processes
     * that read from it that this transport is dead
     */
    ~Context() {
        if (cpa::create_ipc) {
            this->markDeadFrom(this->buffer_, 0);
        }
    }

    /**
     * @brief add a Client to the Context's pool - the Client runs in pool mode
     * @details if the pool is already started, the Client starts getting current messages immediately
     * - it might miss older messages.
     * if the pool is not started yet, the Client does not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the Pool
     * @param poolThreadAffinityIn the pool is powered by a number of threads
     * (a thread in the pool is identified by a bit number in the mask starting from bit 0);
     * it is possible to have a Client use just some of the threads in the Pool
     * - defaults to using all.
     *
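     * A minimal sketch (illustrative only, not from the original source; client0
     * and client1 are assumed Clients):
     * @code
     * ctx.addToPool(client0);         // use all pool threads
     * ctx.addToPool(client1, 0x01ul); // pin client1 to pool thread 0
     * ctx.start(2, 0x03ul);           // 2 pool threads on cores 0 and 1, no direct Clients
     * @endcode
     *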
     */
    template <typename Client>
    void addToPool(Client &client
        , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        if (is_base_of<single_thread_powered_client, Client>::value
            && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
            && poolThreadCount_ != 1) {
            HMBDC_THROW(out_of_range
                , "cannot add a single thread powered client to the non-single thread powered pool"
            );
        }
        pool_->addConsumer(client, poolThreadAffinityIn);
    }

    /**
     * @brief add a batch of Clients to the Context's pool - the Clients run in pool mode
     * @details if the pool is already started, the Clients start getting current messages immediately
     * - they might miss older messages.
     * if the pool is not started yet, the Clients do not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the Pool
     * @param poolThreadAffinityIn the pool is powered by a number of threads
     * (a thread in the pool is identified by a bit number in the mask starting from bit 0);
     * it is possible to have a Client use just some of the threads in the Pool
     * - defaults to using all.
     * @param args more client and poolThreadAffinityIn pairs can follow
     */
    template <typename Client, typename ... Args>
    void addToPool(Client &client
        , uint64_t poolThreadAffinityIn, Args&& ...args) {
        addToPool(client, poolThreadAffinityIn);
        addToPool(forward<Args>(args)...);
    }

    /**
     * @brief return the number of Clients added into the pool
     * @details the number could change since Clients could be added in another thread
     * @return client count
     */
    size_t clientCountInPool() const {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        return pool_->consumerSize();
    }

    /**
     * @brief how many parallel consumers are alive
     * @details the dynamic value could change after the call returns;
     * see the max_parallel_consumer Context property
     * @return how many parallel consumers are alive
     */
    size_t parallelConsumerAlive() const {
        return this->buffer_.parallelConsumerAlive();
    }
    /**
     * @brief start the Context and specify its Pool and direct Clients
     * @details doesn't compile if the Context does not support a Pool.
     * Usage example: the following starts the pool powered by 3 threads that are affinitied to
     * the lower 8 cores; client0 affinitied to the 4th core and client1 affinitied to the 5th core.
     * the last true value indicates there will be more direct mode Clients started later,
     * and messages are kept for those coming later (this only works for a broadcast Context,
     * not applicable to a partition Context):
     * @code
     * start(3, 0xfful, client0, 0x8ul, client1, 0x10ul, true);
     * @endcode
     *
     * @tparam Args types
     *
     * @param poolThreadCount how many threads to power the Pool, 0 means no pool
     * @param poolThreadsCpuAffinityMask which cores the pool threads run on
     * @param args pairs of a direct mode Client and its cpuAffinity,
     * optionally followed by a bool as the last arg (default false); false indicates there are
     * no more direct mode Clients to start. If a cpuAffinity is 0, the Client's affinity rotates
     * to one of the cores in the system
     */
    template <typename ...Args>
    void
    start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        startWithContextProperty<cpa>(poolThreadCount, poolThreadsCpuAffinityMask
            , forward<Args>(args) ...);
    }

    /**
     * @brief start the Context (without its Pool) and direct Clients
     * @details the following starts client0 affinitied to the 4th core and client1 affinitied
     * to the 5th core, AND there are no more direct mode Clients to start (since no bool at the end):
     * @code
     * start(client0, 0x8ul, client1, 0x10ul);
     * @endcode
     *
     * @tparam Args types
     *
     * @param args pairs of a direct mode Client and its cpuAffinity,
     * optionally followed by a bool as the last arg (default false); false indicates there are
     * no more direct mode Clients to start. If a cpuAffinity is 0, the Client's affinity rotates
     * to one of the cores in the system
     */
    template <typename Client, typename ...Args>
    typename enable_if<!is_integral<Client>::value, void>::type
    start(Client& c, uint64_t cpuAffinity, Args&& ... args) {
        startWithContextProperty<cpa>(c, cpuAffinity, forward<Args>(args) ...);
    }

    void start() {
        startWithContextProperty<cpa>();
    }

    /**
     * @brief stop the message dispatching - asynchronously
     * @details asynchronous means message dispatching is not guaranteed
     * to stop immediately after this non-blocking call returns
     */
    void
    stop() {
        stopWithContextProperty<cpa>();
    }

    /**
     * @brief wait until all threads (Pool threads too, if applicable) of the Context exit
     * @details blocking call
     */
    void
    join() {
        joinWithContextProperty<cpa>();
    }

    /**
     * @brief an ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...)
     * Clients from the ipc transport to keep the ipc transport healthy (avoiding a full buffer).
     * It periodically looks for things to purge. This sets the period (default is 60 seconds).
     * @details If some Clients are known to
     * take long to process messages, increase it. If you need to remove slow Clients quickly,
     * reduce it.
     * Only effective for an ipc_creator Context.
     *
     * @param s seconds
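     *
     * A minimal sketch (illustrative only, not from the original source):
     * @code
     * owner.setSecondsBetweenPurge(120); // purge stuck Clients every 2 minutes
     * @endcode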
     */
    void
    setSecondsBetweenPurge(uint32_t s) {
        secondsBetweenPurge_ = s;
    }

    /**
     * @brief normally not used unless you want to run your own message loop
     * @details call this function frequently to pump the hmbdc message loop in its pool
     *
     * @param threadSerialNumber starting from 0, indicates which thread in the pool
     * is powering the loop
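     *
     * A minimal sketch (illustrative only, not from the original source; running
     * is an assumed application flag):
     * @code
     * // drive pool thread 0 from your own loop instead of ctx.start(...)
     * while (running) ctx.runPoolThreadOnce(0);
     * @endcode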
     */
    void
    runPoolThreadOnce(uint16_t threadSerialNumber) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        pool_->runOnce(threadSerialNumber);
    }

    /**
     * @brief normally not used unless you want to run your own message loop
     * @details call this function frequently to pump the hmbdc message loop for a direct mode Client
     *
     * @param threadSerialNumber indicates which thread is powering the loop
     * @param c the Client
     */
    template <typename Client>
    void runClientThreadOnce(uint16_t threadSerialNumber, Client& c) {
        c.messageDispatchingStarted(threadSerialNumber); // lower level ensures it executes only once
        detail::runOnceImpl(threadSerialNumber, stopped_
            , this->buffer_, c);
    }

private:
    template <typename Buffer>
    static
    shared_ptr<Pool>
    createPool(Buffer& buffer, size_t maxPoolClientCount) {
        return Pool::create(buffer, maxPoolClientCount);
    }

    static
    shared_ptr<Pool>
    createPool(MonoLockFreeBuffer&, size_t) {
        return shared_ptr<Pool>();
    }

    template <typename cpa>
    typename enable_if<cpa::has_pool, void>::type
    stopWithContextProperty() {
        if (pool_) pool_->stop();
        __sync_synchronize();
        stopped_ = true;
    }

    template <typename cpa>
    typename enable_if<!cpa::has_pool, void>::type
    stopWithContextProperty() {
        __sync_synchronize();
        stopped_ = true;
    }

    template <typename cpa>
    typename enable_if<cpa::has_pool, void>::type
    joinWithContextProperty() {
        if (pool_) pool_->join();
        for (auto& t : threads_) {
            t.join();
        }
    }

    template <typename cpa>
    typename enable_if<!cpa::has_pool, void>::type
    joinWithContextProperty() {
        for (auto& t : threads_) {
            t.join();
        }
    }

    template <typename cpa, typename ...Args>
    typename enable_if<!cpa::attach_ipc && !cpa::create_ipc, void>::type
    startWithContextProperty(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        poolThreadCount_ = poolThreadCount;
        if (!startToBeContinued_) {
            HMBDC_THROW(runtime_error
                , "Exception: conflicting with previously indicated start completed!");
        }
        if (pool_) {
            pool_->start(poolThreadCount, poolThreadsCpuAffinityMask, false);
        }
        usedHmbdcCapacity_ = poolThreadCount;
        currentThreadSerialNumber_ = usedHmbdcCapacity_;
        startLocalClients(forward<Args>(args) ...);
    }

    template <typename cpa, typename Client, typename ...Args>
    typename enable_if<!is_integral<Client>::value && !cpa::create_ipc && !cpa::attach_ipc, void>::type
    startWithContextProperty(Client& c, uint64_t cpuAffinity, Args&& ... args) {
        if (!startToBeContinued_) {
            HMBDC_THROW(runtime_error
                , "Exception: conflicting with previously indicated start completed!");
        }
        startLocalClients(c, cpuAffinity, forward<Args>(args) ...);
    }

    template <typename cpa>
    typename enable_if<!cpa::create_ipc && !cpa::attach_ipc, void>::type
    startWithContextProperty() {
        startLocalClients(false);
    }

    template <typename cpa>
    typename enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool, void>::type
    startWithContextProperty() {
        startBroadcastIpcClients();
    }

    template <typename cpa, typename ...Args>
    typename enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool, void>::type
    startWithContextProperty(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        if (poolThreadCount_) {
            HMBDC_THROW(out_of_range, "Context pool already started");
        }
        auto& lock = this->allocator_.fileLock();
        lock_guard<decltype(lock)> g(lock);
        auto slots = this->buffer_.unusedConsumerIndexes();
        if (slots.size() < sizeof...(args) / 2) {
            HMBDC_THROW(out_of_range
                , "Context instance supported Client count = " << slots.size());
        }
        poolThreadCount_ = poolThreadCount;
        pool_->startThruRecycling(poolThreadCount, poolThreadsCpuAffinityMask);
        currentThreadSerialNumber_ = poolThreadCount_;
        startBroadcastIpcClients(forward<Args>(args) ...);
    }

    template <typename cpa, typename Client, typename ...Args>
    typename enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool && !is_integral<Client>::value
        , void>::type
    startWithContextProperty(Client& c, uint64_t cpuAffinity
        , Args&& ... args) {
        auto& lock = this->allocator_.fileLock();
        lock_guard<decltype(lock)> g(lock);

        auto clientParticipateInMessaging =
            remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
        if (clientParticipateInMessaging) {
            auto slots = this->buffer_.unusedConsumerIndexes();
            if (slots.size() < 1u + sizeof...(args) / 2) {
                HMBDC_THROW(out_of_range
                    , "Context instance supported Client count = " << slots.size());
            }
        }
        startBroadcastIpcClients(c, cpuAffinity, forward<Args>(args) ...);
    }

    template <typename cpa, typename ...Args>
    typename enable_if<(cpa::create_ipc || cpa::attach_ipc) && !cpa::has_pool, void>::type
    startWithContextProperty(Args&& ... args) {
        startPartitionIpcClients(forward<Args>(args) ...);
    }

    void
    startLocalClients(bool startToBeContinuedFlag = cpa::can_start_anytime) {
        startToBeContinued_ = startToBeContinuedFlag;
        if (!startToBeContinued_) {
            this->markDeadFrom(this->buffer_, usedHmbdcCapacity_);
            this->send(Flush());
        }
    }

    template <typename Client, typename ...Args>
    void startLocalClients(Client& c, uint64_t mask, Args&&... args) {
        if (usedHmbdcCapacity_ >= Buffer::max_parallel_consumer
            && Client::REGISTERED_MESSAGE_SIZE) {
            HMBDC_THROW(out_of_range
                , "messaging participating client number > allowed thread number of "
                << Buffer::max_parallel_consumer);
        }

        thread thrd =
            kickOffClientThread(c, mask, usedHmbdcCapacity_, currentThreadSerialNumber_);
        threads_.push_back(move(thrd));

        auto clientParticipateInMessaging =
            remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
        if (clientParticipateInMessaging) usedHmbdcCapacity_++;
        currentThreadSerialNumber_++;
        startLocalClients(forward<Args>(args)...);
    }

    void startBroadcastIpcClients(){
        if (cpa::create_ipc && !purger_) {
            purger_.reset(
                new utils::StuckClientPurger<Buffer>(secondsBetweenPurge_, this->buffer_));
            startBroadcastIpcClients(*purger_, purgerCpuAffinityMask_);
        }
    }

    template <typename Client, typename ...Args>
    void startBroadcastIpcClients(Client& c, uint64_t mask, Args&&... args) {
        auto clientParticipateInMessaging =
            remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
        uint16_t hmbdcNumber = 0xffffu;
        if (clientParticipateInMessaging) {
            auto slots = this->buffer_.unusedConsumerIndexes();
            auto it = slots.begin();
            hmbdcNumber = *it;
            this->buffer_.reset(hmbdcNumber);
        }
        thread thrd = kickOffClientThread(
            c, mask, hmbdcNumber, currentThreadSerialNumber_);
        threads_.push_back(move(thrd));
        currentThreadSerialNumber_++;
        startBroadcastIpcClients(forward<Args>(args)...);
    }

    void startPartitionIpcClients(){}

    template <typename Client, typename ...Args>
    void startPartitionIpcClients(Client& c, uint64_t mask, Args&&... args) {
        thread thrd = kickOffClientThread(
            c, mask, currentThreadSerialNumber_, currentThreadSerialNumber_);
        threads_.push_back(move(thrd));
        currentThreadSerialNumber_++;
        startPartitionIpcClients(forward<Args>(args)...);
    }

    template <typename Client>
    thread kickOffClientThread(
        Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
        thread thrd([
            this
            , &c
            , mask
            , h=hmbdcNumber
            , threadSerialNumber
            ]() {
            auto hmbdcNumber = h;
            string name;
            char const* schedule;
            int priority;
            auto clientParticipateInMessaging =
                remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;

            if (c.hmbdcName()) {
                name = c.hmbdcName();
            } else {
                if (clientParticipateInMessaging) {
                    name = "hmbdc" + to_string(hmbdcNumber);
                } else {
                    name = "hmbdc-x";
                }
            }
            auto cpuAffinityMask = mask;
            tie(schedule, priority) = c.schedSpec();

            if (!schedule) schedule = "SCHED_OTHER";

            if (!mask) {
                auto cpuCount = thread::hardware_concurrency();
                cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
            }

            hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
                , schedule, priority);

            hmbdcNumber = clientParticipateInMessaging?hmbdcNumber:0xffffu;
            c.messageDispatchingStartedCb(hmbdcNumber);

            while(!stopped_ &&
                runOnceImpl(hmbdcNumber, this->stopped_, this->buffer_, c)) {
            }
            if (this->stopped_) c.dropped();
            if (clientParticipateInMessaging) unblock(this->buffer_, hmbdcNumber);
        }
        );

        return thrd;
    }

    Context(Context const&) = delete;
    Context& operator = (Context const&) = delete;
    bool startToBeContinued_;
    uint16_t usedHmbdcCapacity_;
    uint16_t currentThreadSerialNumber_;
    bool stopped_;
    shared_ptr<Pool> pool_;
    using Threads = vector<thread>;
    Threads threads_;
    size_t poolThreadCount_;
    uint32_t secondsBetweenPurge_;
    uint64_t purgerCpuAffinityMask_;
    typename conditional<cpa::has_pool
        , unique_ptr<utils::StuckClientPurger<Buffer>>, uint32_t
        >::type purger_;
};
}}