hmbdc
simplify-high-performance-messaging-programming
PoolConsumer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/pattern/LockFreeBufferMisc.hpp"
5 #include "hmbdc/time/Timers.hpp"
6 #include "hmbdc/Exception.hpp"
7 #include "hmbdc/Config.hpp"
8 
9 #include <stdexcept>
10 
11 namespace hmbdc { namespace pattern {
12 
13 struct PoolConsumer {
14  friend struct PoolImpl;
15 
16  template <typename Buffer>
17  friend struct PoolTImpl;
18  explicit PoolConsumer(bool interestedInMessages = true);
19  virtual ~PoolConsumer();
20  void stopped(std::exception const&) noexcept;
21  bool dropped() noexcept;
22  void messageDispatchingStarted(uint16_t threadId) {
23  if (unlikely(!messageDispatchingStarted_)) {
24  messageDispatchingStartedCb(threadId);
25  messageDispatchingStarted_ = true;
26  }
27  }
28  void invoked(uint16_t threadId);
29 
30 protected:
32 
33 private:
34  void reset();
35  bool handleRange(BufIt const& begin,
36  BufIt const& end, uint16_t threadId) noexcept;
37  bool handleInvokeOnly(uint16_t threadId) noexcept;
38  virtual void messageDispatchingStartedCb(uint16_t threadId) {}
39  virtual void handleRangeImpl(BufIt& begin,
40  BufIt const& end, uint16_t threadId){};
41  virtual void invokedCb(uint16_t threadId) {}
42  virtual void stoppedCb(std::exception const&) {}
43  virtual bool droppedCb() { return true; }
44 
46  uint64_t poolThreadAffinity;
47  uint16_t droppedCount;
48  uint64_t skippedPoolThreadMask_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
49  HMBDC_SEQ_TYPE nextSeq_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
50  bool interestedInMessages;
51  bool messageDispatchingStarted_;
52 };
53 
54 }} // end namespace hmbdc::pattern
Definition: Timers.hpp:65
Definition: PoolConsumer.hpp:13
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:73