hmbdc
simplify-high-performance-messaging-programming
Context.hpp
#include "hmbdc/Copyright.hpp"
#pragma once


#include "hmbdc/app/StuckClientPurger.hpp"
#include "hmbdc/Config.hpp"
#include "hmbdc/numeric/BitMath.hpp"

#include <memory>
#include <vector>
#include <list>

namespace hmbdc { namespace app {


/**
* @example hello-world.cpp
* @example hmbdc.cpp
* @example hmbdc-log.cpp
* @example ipc-market-data-propagate.cpp
*/

/**
 * @namespace hmbdc::app::context_property
 * contains the trait types that define how a Context behaves and what capabilities it has
 */
namespace context_property {
    /**
     * @class broadcast
     * @brief Context template parameter indicating each message is
     * sent to all Clients within the Context.
     * This is the default property of a Context.
     * @details each message is still subject to the Client's message type
     * filtering. In the case of an ipc Context,
     * it is also sent to all Clients in the attached ipc Contexts.
     * When a Context is specialized using this type, the Context normally
     * works with heterogeneous Clients and all Clients can talk to each
     * other thru the Context. Load balancing among Clients can be achieved by
     * having the participating Clients coordinate to select which messages to process.
     * In addition to the direct mode Clients, a pool to run Clients is supported
     * by the Context - see the pool related functions in Context.
     *
     * Implicit usage in hello-world.cpp: @snippet hello-world.cpp broadcast as default
     * Explicit usage in hmbdc.cpp @snippet hmbdc.cpp explicit using broadcast
     * There is no hard coded limit on how many Clients can be added into a pool.
     * Also, there is no limit on when you can add a Client into a pool.
     * @tparam max_parallel_consumer the max count of message processing threads,
     * which includes the pool threads plus the direct mode Clients that
     * register for messages within the Context
     * supported values: 4 (default)
     * 2, 8, 16, 32, 64 require an hmbdc license
     */
    template <uint16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
    struct broadcast{
        static_assert(max_parallel_consumer >= 4u
            , "unsupported max_parallel_consumer value");
    };
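
    /* a minimal usage sketch, assuming a hypothetical max message size of 64 bytes;
     * broadcast<> is the default property, so the two declarations below are equivalent:
     * @code
     * hmbdc::app::Context<64> ctx1;                                        // broadcast<> implied
     * hmbdc::app::Context<64, hmbdc::app::context_property::broadcast<>> ctx2;
     * @endcode
     */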

    /**
     * @class partition
     * @brief Context template parameter indicating each message is
     * sent to one and only one of the Clients within the Context
     * and its attached ipc Contexts if applicable.
     * @details each message is still subject to the Client's message type
     * filtering.
     * When a Context is specialized using this type, the Context normally
     * works with homogeneous Clients to achieve load balancing across threads. No
     * coordination is needed between Clients.
     * Only direct mode Clients are supported; a thread pool is NOT supported
     * by this kind of Context - the pool related functions in Context are disabled.
     *
     * Example in server-cluster.cpp: @snippet server-cluster.cpp declare a partition context
     */
    struct partition{};

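    /* a minimal usage sketch: a partition Context running homogeneous Clients;
     * the 64-byte max message size is hypothetical:
     * @code
     * hmbdc::app::Context<64, hmbdc::app::context_property::partition> ctx;
     * @endcode
     */
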
    /**
     * @class msgless_pool
     * @brief Context template parameter indicating the Context must contain a pool to run Clients
     * and the Clients in the pool shall not receive messages - unlike the default pool.
     * @details msgless_pool performs better when its Clients don't need to receive messages from the Context.
     * This is useful when the Clients are network transport engines. By default, partition Contexts
     * don't come with a pool for semantic reasons, but this Context property enables a pool that
     * does not deliver messages.
     */
    struct msgless_pool{};

    /**
     * @class ipc_creator
     * @brief Context template parameter indicating the Context is ipc enabled and
     * it can be attached (see ipc_attacher below) to an ipc transport (thru its name).
     * @details In addition to the normal Context functions, the Context acts as
     * the creator (owner) of the named ipc transport.
     * Since it performs the critical function of purging crashed or
     * stuck Clients to avoid a full buffer for the other well-behaving Clients, it is
     * expected to be running (started) for as long as ipc is in use.
     * The ipc transport uses persistent shared memory; if the dtor of the Context is not called
     * due to a crash, there will be stale shared memory left in /dev/shm.
     * Example in ipc-market-data-propagate.cpp @snippet ipc-market-data-propagate.cpp declare an ipc context
     */
    struct ipc_creator{};

    /**
     * @class ipc_attacher
     * @brief Context template parameter indicating the Context is ipc enabled and
     * it can attach to an ipc transport thru a name.
     * @details it is very important that the Context is constructed with exactly
     * the same size (see constructor) and type (partition vs broadcast) as the
     * transport's creator (the ipc_creator Context) specified.
     * All Contexts attaching to a single ipc transport are collectively subject to the
     * max_parallel_consumer limit just like a single local (non-ipc) Context is.
     * Example in ipc-market-data-propagate.cpp @snippet ipc-market-data-propagate.cpp declare an ipc context
     */
    struct ipc_attacher{};
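
    /* a minimal usage sketch pairing an ipc_creator with an ipc_attacher; the
     * transport name and message size are hypothetical - both sides must match:
     * @code
     * // in the owner process - creates and owns the named ipc transport
     * hmbdc::app::Context<64, hmbdc::app::context_property::ipc_creator> owner("demo-ipc");
     * // in another process - attaches using the identical size and Context type
     * hmbdc::app::Context<64, hmbdc::app::context_property::ipc_attacher> member("demo-ipc");
     * @endcode
     */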
}
}}

#include "hmbdc/app/ContextDetail.hpp"
namespace hmbdc { namespace app {

namespace context_detail {
using namespace std;
using namespace hmbdc::pattern;

/**
 * @class ThreadCommBase<>
 * @brief covers the inter-thread and ipc communication facade
 * @details this type's interface is exposed thru Context and the type itself is
 * not directly used by users
 * @tparam MaxMessageSize the max message size, needed at compile time;
 * if the value can only be determined at runtime, set this to 0. Things can still work
 * but will lose some compile time checking advantages, see maxMessageSizeRuntime below
 * @tparam ContextProperties see types in the context_property namespace
 */
template <size_t MaxMessageSize, typename... ContextProperties>
struct ThreadCommBase
: private context_detail::context_property_aggregator<ContextProperties...> {
    using cpa = context_property_aggregator<ContextProperties...>;
    using Buffer = typename cpa::Buffer;
    using Allocator = typename cpa::Allocator;

    enum {
        MAX_MESSAGE_SIZE = MaxMessageSize,
        BUFFER_VALUE_SIZE = MaxMessageSize + 8u, // 8 bytes for wrap
    };

    size_t maxMessageSize() const {
        if (MaxMessageSize == 0) return maxMessageSizeRuntime_;
        return MaxMessageSize;
    }

    /**
     * @brief send a batch of messages to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message will get it, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param msgs messages
     * @tparam Messages message types
     */
    template <typename M0, typename M1, typename ... Messages>
    typename std::enable_if<!std::is_integral<M1>::value, void>::type
    send(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.claim(n);
        sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
        buffer_.commit(it, n);
    }

    /**
     * @brief try to send a batch of messages to the Context or attached ipc Contexts
     * @details this call does not block and it is transactional - send all or none.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param msgs messages
     * @tparam Messages message types
     *
     * @return true if sent successfully
     */
    template <typename M0, typename M1, typename ... Messages>
    typename std::enable_if<!std::is_integral<M1>::value, bool>::type
    trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.tryClaim(n);
        if (it) {
            sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
            buffer_.commit(it, n);
            return true;
        }

        return false;
    }
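
    /* usage sketch for the batch sends above, assuming ctx is a started Context and
     * MyMsg is a hypothetical trivially destructible message type:
     * @code
     * MyMsg m0, m1, m2;
     * ctx.send(m0, m1, m2);            // claims 3 slots, blocking until space is available
     * if (!ctx.trySend(m0, m1, m2)) {  // non-blocking, all-or-none
     *     // buffer full - handle the back pressure here
     * }
     * @endcode
     */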

    /**
     * @brief send a range of messages to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message will get it, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
     */
    template <typename ForwardIt>
    void
    send(ForwardIt begin, size_t n) {
        if (hmbdc_likely(n)) {
            auto bit = buffer_.claim(n);
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename iterator_traits<ForwardIt>::value_type;
                static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
                new (*it++) MessageWrap<Message>(*begin++);
            }
            buffer_.commit(bit, n);
        }
    }

    /**
     * @brief try to send a range of messages to the Context or attached ipc Contexts
     * @details this call does not block and it is transactional - send all or none.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
     * @return true if sent successfully
     */
    template <typename ForwardIt>
    bool
    trySend(ForwardIt begin, size_t n) {
        if (hmbdc_likely(n)) {
            auto bit = buffer_.tryClaim(n);
            if (hmbdc_unlikely(!bit)) return false;
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename iterator_traits<ForwardIt>::value_type;
                static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
                new (*it++) MessageWrap<Message>(*begin++);
            }
            buffer_.commit(bit, n);
        }
        return true;
    }
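
    /* usage sketch for the range sends above, assuming a hypothetical std::vector<MyMsg> batch:
     * @code
     * std::vector<MyMsg> batch(16);
     * ctx.send(batch.begin(), batch.size());               // blocking
     * bool ok = ctx.trySend(batch.begin(), batch.size());  // non-blocking, all or none
     * @endcode
     */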

    /**
     * @brief send a message to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message will get it, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param m message
     * @tparam Message message type
     */
    template <typename Message>
    void send(Message&& m) {
        using M = typename std::remove_reference<Message>::type;
        static_assert(std::is_trivially_destructible<M>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        buffer_.put(MessageWrap<M>(std::forward<Message>(m)));
    }

    /**
     * @brief try to send a message to the Context or attached ipc Contexts if it wouldn't block
     * @details this call does not block - it returns false when the buffer is full.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param m message
     * @tparam Message message type
     * @return true if sent successfully
     */
    template <typename Message>
    bool trySend(Message&& m) {
        using M = typename std::remove_reference<Message>::type;
        static_assert(std::is_trivially_destructible<M>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        return buffer_.tryPut(MessageWrap<M>(std::forward<Message>(m)));
    }
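
    /* usage sketch for the single message sends above, assuming MyMsg is hypothetical
     * and fits within the Context's max message size:
     * @code
     * ctx.send(MyMsg{});                  // copies the message into the buffer, blocking
     * while (!ctx.trySend(MyMsg{})) {}    // non-blocking variant - spin until space frees up
     * @endcode
     */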

    /**
     * @brief send a message to all Clients in the Context or attached ipc Contexts
     * @details constructs the Message in the buffer directly.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param args ctor args
     * @tparam Message message type
     * @tparam Args ctor arg types
     */
    template <typename Message, typename ... Args>
    void sendInPlace(Args&&... args) {
        static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        buffer_.template putInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
    }

    /**
     * @brief try to send a message to all Clients in the Context or attached ipc Contexts if it wouldn't block
     * @details this call does not block - it returns false when the buffer is full;
     * the Message is constructed in the buffer directly if it returns true.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param args ctor args
     * @tparam Message message type
     * @tparam Args ctor arg types
     * @return true if sent successfully
     */
    template <typename Message, typename ... Args>
    bool trySendInPlace(Args&&... args) {
        static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        return buffer_.template tryPutInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
    }
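
    /* usage sketch for the in-place sends above, assuming MyMsg has a hypothetical
     * MyMsg(int, double) ctor - the message is constructed directly in the buffer,
     * avoiding an extra copy:
     * @code
     * ctx.sendInPlace<MyMsg>(42, 1.5);                // blocking
     * bool ok = ctx.trySendInPlace<MyMsg>(42, 1.5);   // non-blocking
     * @endcode
     */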

    /**
     * @brief accessor - mostly used internally
     * @return the underlying buffer used in the Context
     */
    Buffer& buffer() {
        return buffer_;
    }

protected:
    ThreadCommBase(uint32_t messageQueueSizePower2Num
        , size_t maxMessageSizeRuntime
        , char const* shmName)
    : allocator_(shmName
        , Buffer::footprint(maxMessageSizeRuntime + 8u
            , messageQueueSizePower2Num), O_RDWR | (cpa::create_ipc?O_CREAT:0)
    )
    , bufferptr_(allocator_.template allocate<Buffer>(SMP_CACHE_BYTES
        , maxMessageSizeRuntime + 8u, messageQueueSizePower2Num
        , allocator_)
    )
    , buffer_(*bufferptr_) {
        if (messageQueueSizePower2Num < 2) {
            HMBDC_THROW(std::out_of_range
                , "messageQueueSizePower2Num needs to be >= 2");
        }
        if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
            HMBDC_THROW(std::out_of_range
                , "can only set maxMessageSizeRuntime when the template value MaxMessageSize is 0");
        }
        maxMessageSizeRuntime_ = maxMessageSizeRuntime;
        primeBuffer<(cpa::create_ipc || (!cpa::create_ipc && !cpa::attach_ipc)) && cpa::has_pool>();
        if (cpa::create_ipc || cpa::attach_ipc) {
            sleep(2);
        }
    }

    ~ThreadCommBase() {
        allocator_.unallocate(bufferptr_);
    }

    static
    void markDeadFrom(pattern::MonoLockFreeBuffer& buffer, uint16_t) {
        // does not apply
    }

    template <typename BroadCastBuf>
    static
    void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
        for (uint16_t i = poolThreadCount;
            i < BroadCastBuf::max_parallel_consumer;
            ++i) {
            buffer.markDead(i);
        }
    }


    static
    void markDead(pattern::MonoLockFreeBuffer& buffer, std::list<uint16_t> slots) {
        // does not apply
    }

    template <typename BroadCastBuf>
    static
    void markDead(BroadCastBuf& buffer, std::list<uint16_t> slots) {
        for (auto s : slots) {
            buffer.markDead(s);
        }
    }

    Allocator allocator_;
    Buffer* HMBDC_RESTRICT bufferptr_;
    Buffer& HMBDC_RESTRICT buffer_;

private:
    template <bool doIt>
    typename std::enable_if<doIt, void>::type
    primeBuffer() {
        markDeadFrom(buffer_, 0);
    }

    template <bool doIt>
    typename std::enable_if<!doIt, void>::type
    primeBuffer() {
    }

    template <typename M, typename... Messages>
    void sendRecursive(typename Buffer::iterator it
        , M&& msg, Messages&&... msgs) {
        using Message = typename std::remove_reference<M>::type;
        static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        new (*it) MessageWrap<Message>(msg);
        sendRecursive(++it, std::forward<Messages>(msgs)...);
    }
    void sendRecursive(typename Buffer::iterator) {}

    size_t maxMessageSizeRuntime_;
};

} //context_detail

/**
 * @example hmbdc.cpp
 * @example server-cluster.cpp
 * a partition Context rightfully doesn't contain a thread pool and all its Clients
 * are in direct mode. Pool related interfaces are turned off at compile time
 */

/**
 * @class Context<>
 * @brief A Context is like a medium that facilitates the communications
 * among the Clients that it is holding.
 * a Client can only be added to (or started within) a single Context once;
 * the behavior is undefined otherwise.
 * the communication model is determined by the context_property;
 * by default it is broadcast in nature within the local process, as indicated
 * by broadcast<>
 *
 * @details a broadcast Context contains a thread Pool powered by a number of OS threads.
 * a Client running in such a Context can either run in pool mode or in direct mode
 * (which means the Client has its own dedicated OS thread).
 * direct mode provides faster responses, and pool mode provides more flexibility.
 * It is recommended that the total number of threads (pool threads + direct threads)
 * not exceed the number of available cores.
 * @tparam MaxMessageSize the max message size if known
 * at compile time (compile time sized);
 * if the value can only be determined at runtime (run time sized), set this to 0.
 * Things can still work but will lose some compile time checking advantages,
 * see maxMessageSizeRuntime below
 * @tparam ContextProperties see the context_property namespace
 */
template <size_t MaxMessageSize = 0, typename... ContextProperties>
struct Context
: context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...> {
    using Base = context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...>;
    using Buffer = typename Base::Buffer;
    using cpa = typename Base::cpa;
    using Pool = typename std::conditional<cpa::pool_msgless
        , pattern::PoolMinus
        , pattern::PoolT<Buffer>>::type;
    /**
     * @brief ctor for constructing a local non-ipc Context
     * @details won't compile if called for an ipc Context
     * @param messageQueueSizePower2Num a value of 10 gives a message queue of size 1024
     * (in messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support;
     * only used when a pool is supported by the Context (broadcast property)
     * @param maxMessageSizeRuntime if MaxMessageSize is set to 0, this value is used
     * @param maxThreadSerialNumber the max number of threads (direct mode Clients plus pool threads)
     * the Context can manage
     */
    Context(uint32_t messageQueueSizePower2Num = MaxMessageSize?20:2
        , size_t maxPoolClientCount = MaxMessageSize?128:0
        , size_t maxMessageSizeRuntime = MaxMessageSize
        , size_t maxThreadSerialNumber = 64)
    : Base(messageQueueSizePower2Num < 2?2:messageQueueSizePower2Num
        , maxMessageSizeRuntime, nullptr)
    , usedHmbdcCapacity_(0)
    , stopped_(false)
    , pool_(createPool<cpa>(maxPoolClientCount))
    , poolThreadCount_(0) {
        static_assert(!cpa::create_ipc && !cpa::attach_ipc
            , "no name specified for ipc Context");
    }
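
    /* construction sketches - the message type is hypothetical:
     * @code
     * Context<sizeof(MyMsg)> ctx;                 // compile time sized, 2^20-slot queue by default
     * Context<0> rtCtx(20, 128, sizeof(MyMsg));   // run time sized, message size supplied at runtime
     * @endcode
     */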

    /**
     * @brief ctor for constructing a local ipc Context
     * @details won't compile if called for a local non-ipc Context
     *
     * @param ipcTransportName the id to identify an ipc transport that supports
     * a group of Contexts attached together and their Clients
     * @param messageQueueSizePower2Num a value of 10 gives a message queue of size
     * 1024 (in messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support;
     * only used when a pool is supported by the Context (broadcast property)
     * @param maxMessageSizeRuntime if MaxMessageSize is set to 0, this value is used
     * @param purgerCpuAffinityMask which cores to run the low profile (mostly sleeping)
     * thread in charge of purging crashed Clients. Used only for ipc_creator Contexts.
     * @param maxThreadSerialNumber the max number of threads (direct mode Clients plus pool threads)
     * the Context can manage
     */
    Context(char const* ipcTransportName
        , uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
        , size_t maxPoolClientCount = MaxMessageSize?128:0
        , size_t maxMessageSizeRuntime = MaxMessageSize
        , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful
        , size_t maxThreadSerialNumber = 64)
    : Base(messageQueueSizePower2Num, maxMessageSizeRuntime, ipcTransportName)
    , usedHmbdcCapacity_(0)
    , stopped_(false)
    , pool_(createPool<cpa>(maxPoolClientCount))
    , poolThreadCount_(0)
    , secondsBetweenPurge_(60)
    , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
        static_assert(cpa::create_ipc || cpa::attach_ipc
            , "this ctor can only be used with an ipc-enabled Context");
        static_assert(!(cpa::create_ipc && cpa::attach_ipc)
            , "a Context cannot be both ipc_creator and ipc_attacher");
    }

    /**
     * @brief dtor
     * @details if this Context owns an ipc transport, notify all attached processes
     * that read from it that this transport is dead
     */
    ~Context() {
        if (cpa::create_ipc) {
            Base::markDeadFrom(this->buffer_, 0);
        }
        stop();
        join();
    }

    /**
     * @brief add a Client to the Context's pool - the Client runs in pool mode
     * @details if the pool is already started, the Client starts getting current Messages immediately
     * - it might miss older messages.
     * if the pool is not started yet, the Client does not get messages or other callbacks until
     * the pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the pool
     * @param poolThreadAffinityIn the pool is powered by a number of threads
     * (a thread in the pool is identified by a bit number in the mask, starting from bit 0);
     * it is possible to have a Client use just some of the threads in the pool
     * - defaults to using all.
     *
     */
    template <typename Client>
    void addToPool(Client &client
        , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        if (std::is_base_of<single_thread_powered_client, Client>::value
            && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
            && poolThreadCount_ != 1) {
            HMBDC_THROW(std::out_of_range
                , "cannot add a single thread powered client to the non-single"
                "thread powered pool without specifying a single thread poolThreadAffinity"
            );
        }
        auto stub = new context_detail::PoolConsumerProxy<Client>(client);
        pool_->addConsumer(*stub, poolThreadAffinityIn);
    }

    /**
     * @brief add a batch of Clients to the Context's pool - the Clients run in pool mode
     * @details if the pool is already started, the Clients start getting current Messages immediately
     * - they might miss older messages.
     * if the pool is not started yet, the Clients do not get messages or other callbacks until
     * the pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the pool
     * @param poolThreadAffinityIn the pool is powered by a number of threads
     * (a thread in the pool is identified by a bit number in the mask, starting from bit 0);
     * it is possible to have a Client use just some of the threads in the pool
     * - defaults to using all.
     * @param args more (client, poolThreadAffinityIn) pairs can follow
     */
    template <typename Client, typename ... Args>
    void addToPool(Client &client
        , uint64_t poolThreadAffinityIn, Args&& ...args) {
        addToPool(client, poolThreadAffinityIn);
        addToPool(std::forward<Args>(args)...);
    }

    /**
     * @brief add a batch of Clients to the Context's pool - the Clients run in pool mode
     * @details the implementation tells it all:
     * if the pool is not started yet, the Clients do not get messages or other callbacks until
     * the pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code.
     * See the usage sketch after this overload.
     * @tparam Client client type
     * @tparam Client2 client2 type
     * @param client to be added into the pool using the default poolThreadAffinity
     * @param client2 to be added into the pool
     * @param args more clients and/or (client, poolThreadAffinityIn) pairs can follow
     */
    template <typename Client, typename Client2, typename ... Args>
    typename std::enable_if<!std::is_integral<Client2>::value, void>::type
    addToPool(Client &client, Client2 &client2, Args&& ...args) {
        addToPool(client);
        addToPool(client2, std::forward<Args>(args)...);
    }
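
    /* pool usage sketch, assuming client0..client2 are hypothetical Client instances:
     * @code
     * ctx.addToPool(client0, client1);  // both default to using all pool threads
     * ctx.addToPool(client2, 0x1ul);    // client2 only runs on pool thread 0
     * ctx.start(2, 0x3ul);              // power the pool with 2 threads pinned to cores 0-1
     * @endcode
     */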

    /**
     * @brief return the number of Clients added into the pool
     * @details the number could change since Clients could be added in another thread
     * @return client count
     */
    size_t clientCountInPool() const {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        return pool_->consumerSize();
    }

    /**
     * @brief how many parallel consumers are started
     * @details the dynamic value could change after the call returns;
     * see the max_parallel_consumer Context property
     * @return how many parallel consumers are started
     */
    size_t parallelConsumerAlive() const {
        return this->buffer_.parallelConsumerAlive();
    }
    /**
     * @brief start the Context by specifying what is in it (a Pool and/or direct Clients)
     * and their paired up cpu affinities.
     * @details all direct mode Clients, and all Clients in a pool, started by a single start
     * statement have their dispatching start from the same event
     * (subject to the event filtering of each Client).
     * many compile time and runtime checks are done, for example:
     * it won't compile if you start a pool in a Context that does not support one;
     * an exception is thrown if the Context capacity is reached or a second pool is started, etc.
     *
     * Usage example:
     *
     * @code
     * // the following starts the pool powered by 3 threads that are affinitied to
     * // the lower 8 cores; client0 affinitied to the 4th core and client1 affinitied to the 5th core
     * ctx.start(3, 0xfful, client0, 0x8ul, client1, 0x10ul);
     *
     * // the following starts 2 direct mode Clients (client2 and client3)
     * ctx.start(client2, 0x3ul, client3, 0xful);
     * @endcode
     *
     * @tparam Args types
     *
     * @param args paired up args in the form of (pool-thread-count|client, cpuAffinity)*.
     * see the examples above.
     * If a cpuAffinity is 0, each thread's affinity rotates to one of the CPUs in the system.
     */
    template <typename ...Args>
    void
    start(Args&& ... args) {
        startWithContextProperty<cpa>(std::forward<Args>(args) ...);
    }

    /**
     * @brief stop the message dispatching - asynchronously
     * @details asynchronous means message dispatching is NOT guaranteed to stop
     * immediately when this non-blocking call returns
     */
    void
    stop() {
        stopWithContextProperty<cpa>();
    }

    /**
     * @brief wait until all threads (including Pool threads, if applicable) of the Context exit
     * @details blocking call
     */
    void
    join() {
        joinWithContextProperty<cpa>();
    }

    /**
     * @brief an ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...)
     * Clients from the ipc transport to keep the ipc transport healthy (avoiding a full buffer).
     * It periodically looks for things to purge. This sets the period (default is 60 seconds).
     * @details if some Clients are known to
     * take long to process messages, increase it. If you need to remove slow Clients quickly,
     * reduce it.
     * Only effective for an ipc_creator Context.
     *
     * @param s seconds
     */
    void
    setSecondsBetweenPurge(uint32_t s) {
        secondsBetweenPurge_ = s;
    }
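
    /* tuning sketch: have an ipc_creator Context purge stuck Clients after ~10 seconds
     * instead of the 60 second default:
     * @code
     * ctx.setSecondsBetweenPurge(10u);
     * @endcode
     */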

    /**
     * @brief normally not used unless you want to run your own message loop
     * @details call this function frequently to pump the hmbdc message loop in its pool
     *
     * @param threadSerialNumberInPool starting from 0, indicates which thread in the pool
     * is powering the loop
     */
    void
    runPoolThreadOnce(uint16_t threadSerialNumberInPool) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        pool_->runOnce(threadSerialNumberInPool);
    }

    /**
     * @brief normally not used unless you want to run your own message loop
     * @details call this function frequently to pump the hmbdc message loop for a direct mode Client
     *
     * @param threadSerialNumber indicates which thread is powering the loop
     * @param c the Client
     */
    template <typename Client>
    void runClientThreadOnce(uint16_t threadSerialNumber, Client& c) {
        c.messageDispatchingStarted(
            hmbdcNumbers_[threadSerialNumber]); // the lower level ensures this executes only once
        context_detail::runOnceImpl(
            hmbdcNumbers_[threadSerialNumber], stopped_, this->buffer_, c);
    }

private:
    template <typename cpa>
    typename std::enable_if<cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t maxPoolClientCount) {
        return Pool::create(this->buffer(), maxPoolClientCount);
    }

    template <typename cpa>
    typename std::enable_if<cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t maxPoolClientCount) {
        return Pool::create(maxPoolClientCount);
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t) {
        return typename Pool::ptr();
    }

    template <typename cpa>
    typename std::enable_if<cpa::has_pool, void>::type
    stopWithContextProperty() {
        if (pool_) pool_->stop();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool, void>::type
    stopWithContextProperty() {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }

    template <typename cpa>
    typename std::enable_if<cpa::has_pool, void>::type
    joinWithContextProperty() {
        if (pool_) pool_->join();
        for (auto& t : threads_) {
            t.join();
        }
        threads_.clear();
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool, void>::type
    joinWithContextProperty() {
        for (auto& t : threads_) {
            t.join();
        }
        threads_.clear();
    }

    template <typename cpa>
    void
    reserveSlots(std::list<uint16_t>&) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::broadcast_msg && !cpa::pool_msgless, void>::type
    reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
        auto available = this->buffer_.unusedConsumerIndexes();
        if (available.size() < poolThreadCount) {
            HMBDC_THROW(std::out_of_range
                , "Context remaining capacity = " << available.size()
                << ", consider increasing max_parallel_consumer");
        }
        for (uint16_t i = 0; i < poolThreadCount; ++i) {
            slots.push_back(available[i]);
            this->buffer_.reset(available[i]);
        }
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<!cpa::broadcast_msg || cpa::pool_msgless, void>::type
    reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename CcClient, typename ...Args>
    typename std::enable_if<cpa::broadcast_msg && !std::is_integral<CcClient>::value, void>::type
    reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
        const bool clientParticipateInMessaging =
            std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE != 0;
        if (clientParticipateInMessaging) {
            auto available = this->buffer_.unusedConsumerIndexes();
            if (!available.size()) {
                HMBDC_THROW(std::out_of_range
                    , "Context reached capacity, consider increasing max_parallel_consumer");
            }
            this->buffer_.reset(available[0]);
            slots.push_back(available[0]);
        }
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename CcClient, typename ...Args>
    typename std::enable_if<!cpa::broadcast_msg && !std::is_integral<CcClient>::value, void>::type
    reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::create_ipc || cpa::attach_ipc, void>::type
    startWithContextProperty(Args&& ... args) {
        auto& lock = this->allocator_.fileLock();
        std::lock_guard<decltype(lock)> g(lock);
        std::list<uint16_t> slots;
        try {
            reserveSlots<cpa>(slots, args ...);
            auto sc = slots;
            startWithContextPropertyImpl<cpa>(sc, std::forward<Args>(args) ...);
        } catch (std::out_of_range const&) {
            Base::markDead(this->buffer_, slots);
            throw;
        }
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<!cpa::create_ipc && !cpa::attach_ipc, void>::type
    startWithContextProperty(Args&& ... args) {
        std::list<uint16_t> slots;
        try {
            reserveSlots<cpa>(slots, args ...);
            auto sc = slots;
            startWithContextPropertyImpl<cpa>(sc, std::forward<Args>(args) ...);
        } catch (std::out_of_range const&) {
            Base::markDead(this->buffer_, slots);
            throw;
        }
    }

    template <typename cpa>
    typename std::enable_if<cpa::broadcast_msg && cpa::create_ipc, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots) {
        if (!purger_) {
            purger_.reset(
                new StuckClientPurger<Buffer>(secondsBetweenPurge_, this->buffer_));
            startWithContextPropertyImpl<cpa>(slots, *purger_, purgerCpuAffinityMask_);
        }
    }

    template <typename cpa>
    typename std::enable_if<!cpa::broadcast_msg || !cpa::create_ipc, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::has_pool, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots
        , uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        using namespace std;
        if (poolThreadCount_) {
            HMBDC_THROW(std::out_of_range, "Context pool already started");
        }
        std::vector<uint16_t> sc(slots.begin(), slots.end());
        if (!poolThreadsCpuAffinityMask) {
            auto cpuCount = std::thread::hardware_concurrency();
            poolThreadsCpuAffinityMask =
                ((1ul << poolThreadCount) - 1u) << (hmbdcNumbers_.size() % cpuCount);
        }

        pool_->startAt(poolThreadCount, poolThreadsCpuAffinityMask, sc);
        poolThreadCount_ = poolThreadCount; // record the count before the loop below consumes it
        while(poolThreadCount--) {
            if (!cpa::pool_msgless) {
                hmbdcNumbers_.push_back(*slots.begin());
                slots.pop_front();
            }
        }
        startWithContextPropertyImpl<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename Client, typename ...Args>
    typename std::enable_if<!std::is_integral<Client>::value, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots
        , Client& c, uint64_t cpuAffinity
        , Args&& ... args) {
        auto clientParticipateInMessaging =
            std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
        uint16_t hmbdcNumber = 0xffffu;
        if (clientParticipateInMessaging && cpa::broadcast_msg) {
            hmbdcNumber = *slots.begin();
            slots.pop_front();
        }
        auto thrd = kickOffClientThread(
            c, cpuAffinity, hmbdcNumber, hmbdcNumbers_.size());
        threads_.push_back(std::move(thrd));
        hmbdcNumbers_.push_back(hmbdcNumber);
        startWithContextPropertyImpl<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename Client>
    auto kickOffClientThread(
        Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
        std::thread thrd([
            this
            , &c
            , mask
            , h=hmbdcNumber
            , threadSerialNumber
            ]() {
            auto hmbdcNumber = h;
            std::string name;
            char const* schedule;
            int priority;
            auto clientParticipateInMessaging =
                std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;

            if (c.hmbdcName()) {
                name = c.hmbdcName();
            } else {
                if (clientParticipateInMessaging) {
                    name = "hmbdc" + std::to_string(hmbdcNumber);
                } else {
                    name = "hmbdc-x";
                }
            }
            auto cpuAffinityMask = mask;
            std::tie(schedule, priority) = c.schedSpec();

            if (!schedule) schedule = "SCHED_OTHER";

            if (!mask) {
                auto cpuCount = std::thread::hardware_concurrency();
                cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
            }

            hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
                , schedule, priority);

            hmbdcNumber = clientParticipateInMessaging?hmbdcNumber:0xffffu;
            c.messageDispatchingStartedCb(hmbdcNumber);

            while(!stopped_ &&
                context_detail::runOnceImpl(hmbdcNumber, this->stopped_, this->buffer_, c)) {
            }
            if (this->stopped_) c.dropped();
            if (clientParticipateInMessaging) context_detail::unblock(this->buffer_, hmbdcNumber);
        }
        );

        return thrd;
    }

    Context(Context const&) = delete;
    Context& operator = (Context const&) = delete;
    uint16_t usedHmbdcCapacity_;
    std::vector<uint16_t> hmbdcNumbers_;

    bool stopped_;
    typename Pool::ptr pool_;
    using Threads = std::vector<std::thread>;
    Threads threads_;
    size_t poolThreadCount_;
    uint32_t secondsBetweenPurge_;
    uint64_t purgerCpuAffinityMask_;
    typename std::conditional<cpa::broadcast_msg && cpa::create_ipc
        , std::unique_ptr<StuckClientPurger<Buffer>>, uint32_t
        >::type purger_;
};

}}