hmbdc
simplify-high-performance-messaging-programming
ContextDetail.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/app/Client.hpp"
5 #include "hmbdc/app/Message.hpp"
6 #include "hmbdc/comm/Topic.hpp"
7 
8 #include "hmbdc/pattern/PoolT.hpp"
9 #include "hmbdc/pattern/PoolMinus.hpp"
10 #include "hmbdc/pattern/LockFreeBufferT.hpp"
11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
12 
13 #include "hmbdc/os/Thread.hpp"
14 #include "hmbdc/os/Allocators.hpp"
15 #include "hmbdc/time/Timers.hpp"
16 
17 #include <vector>
18 #include <type_traits>
19 #include <thread>
20 #include <utility>
21 #include <iostream>
22 
23 namespace hmbdc { namespace app { namespace context_detail {
24 using namespace std;
25 using namespace hmbdc::pattern;
26 
27 template <typename CcClient>
30  PoolConsumerProxy(CcClient& client)
31  : pattern::PoolConsumer(CcClient::REGISTERED_MESSAGE_SIZE != 0, dynamic_cast<time::TimerManager*>(&client))
32  , client_(client){}
33 
34  virtual void handleRangeImpl(BufIt& it,
35  BufIt const& end, uint16_t threadSerialNumber) override {
36  client_.CcClient::handleRangeImpl(it, end, threadSerialNumber);
37  }
38  virtual void messageDispatchingStartedCb(uint16_t threadId) override {
39  client_.CcClient::messageDispatchingStartedCb(threadId);
40  }
41  virtual void invokedCb(uint16_t threadId) override {
42  client_.CcClient::invokedCb(threadId);
43  }
44  virtual void stoppedCb(std::exception const&e) override {
45  client_.CcClient::stoppedCb(e);
46  }
47  virtual bool droppedCb() override {
48  if (client_.CcClient::droppedCb()) {
49  delete this;
50  return true;
51  } else {
52  return false;
53  }
54  }
55 private:
56  CcClient& HMBDC_RESTRICT client_;
57 };
58 
59 template <typename... ContextProperties>
63  enum {
64  broadcast_msg = 1,
65  has_pool = 1,
66  pool_msgless = 0,
67  create_ipc = 0,
68  attach_ipc = 0,
69  };
70 };
71 
72 template <uint16_t c, typename... ContextProperties>
73 struct context_property_aggregator<context_property::broadcast<c>
74  , ContextProperties...>
75 : context_property_aggregator<ContextProperties...> {
77  static_assert(context_property_aggregator<ContextProperties...>::broadcast_msg, "contradicting properties");
78  enum {
79  broadcast_msg = 1,
80  has_pool = 1,
81  };
82 };
83 
84 template <typename... ContextProperties>
85 struct context_property_aggregator<context_property::partition
86  , ContextProperties...>
87 : context_property_aggregator<ContextProperties...> {
89  enum {
90  broadcast_msg = 0,
91  has_pool = context_property_aggregator<ContextProperties...>::pool_msgless,
92  };
93 };
94 
95 template <typename... ContextProperties>
96 struct context_property_aggregator<context_property::msgless_pool
97  , ContextProperties...>
98 : context_property_aggregator<ContextProperties...> {
99  enum {
100  has_pool = 1,
101  pool_msgless = 1
102  };
103 };
104 
105 template <typename... ContextProperties>
106 struct context_property_aggregator<context_property::ipc_creator
107  , ContextProperties...>
108 : context_property_aggregator<ContextProperties...> {
110  enum {
111  create_ipc = 1,
112  };
113 };
114 
115 template <typename... ContextProperties>
116 struct context_property_aggregator<context_property::ipc_attacher
117  , ContextProperties...>
118 : context_property_aggregator<ContextProperties...> {
120  enum {
121  attach_ipc = 1,
122  };
123 };
124 
125 using namespace std;
126 using namespace hmbdc::time;
127 
128 template <bool is_timer_manager>
129 struct tm_runner {
130  template<typename C>
131  void operator()(C&) {}
132 };
133 
134 template <>
135 struct tm_runner<true> {
136  void operator()(TimerManager& tm) {
137  tm.checkTimers(SysTime::now());
138  }
139 };
140 
141 template <typename LFB, typename CcClient>
142 bool runOnceImpl(uint16_t hmbdcNumber, bool& HMBDC_RESTRICT stopped, LFB& HMBDC_RESTRICT lfb, CcClient& HMBDC_RESTRICT c) {
143  typename LFB::iterator begin, end;
144  try {
146  tr(c);
147 
148  const bool clientParticipateInMessaging =
149  std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE != 0;
150  if (clientParticipateInMessaging) {
151  uint64_t count = lfb.peek(hmbdcNumber, begin, end, c.maxBatchMessageCount());
152  c.CcClient::handleRangeImpl(begin, end, hmbdcNumber);
153  c.CcClient::invokedCb(hmbdcNumber);
154  lfb.wasteAfterPeek(hmbdcNumber, count);
155  } else {
156  c.CcClient::invokedCb(0xffffu - hmbdcNumber);
157  }
158  } catch (std::exception const& e) {
159  if (!stopped) {
160  c.stopped(e);
161  return !c.dropped();
162  }
163  } catch (int code) {
164  if (!stopped) {
165  c.stopped(hmbdc::ExitCode(code));
166  return !c.dropped();
167  }
168  } catch (...) {
169  if (!stopped) {
170  c.stopped(hmbdc::UnknownException());
171  return !c.dropped();
172  }
173  }
174  return true;
175 }
176 
177 template <typename CcClient>
178 bool runOnceImpl(uint16_t threadSerialNumber, bool& HMBDC_RESTRICT stopped, hmbdc::pattern::MonoLockFreeBuffer& HMBDC_RESTRICT lfb, CcClient& HMBDC_RESTRICT c) {
180  try {
182  tr(c);
183 
184  const bool clientParticipateInMessaging =
185  std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE;
186  if (clientParticipateInMessaging) {
187  uint64_t count = lfb.peek(begin, end, c.maxBatchMessageCount());
188  auto b = begin;
189  c.CcClient::handleRangeImpl(b, end, threadSerialNumber);
190  c.CcClient::invokedCb(threadSerialNumber);
191  lfb.wasteAfterPeek(begin, count);
192  } else {
193  c.CcClient::invokedCb(0xffffu - threadSerialNumber);
194  }
195  } catch (std::exception const& e) {
196  if (!stopped) {
197  c.stopped(e);
198  return !c.dropped();
199  }
200  } catch (int code) {
201  if (!stopped) {
202  c.stopped(hmbdc::ExitCode(code));
203  return !c.dropped();
204  }
205  } catch (...) {
206  if (!stopped) {
207  c.stopped(hmbdc::UnknownException());
208  return !c.dropped();
209  }
210  }
211  return true;
212 }
213 
214 inline
215 void unblock(MonoLockFreeBuffer& buffer, uint16_t){
216  buffer.reset();
217 }
218 
219 template <typename Buffer>
220 void unblock(Buffer& lfb, uint16_t threadSerialNumber) {
221  lfb.markDead(threadSerialNumber);
222 }
223 
224 }}}
Definition: MonoLockFreeBuffer.hpp:15
Definition: ContextDetail.hpp:129
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
the default vanilla allocate
Definition: Allocators.hpp:116
Unknown excpetion.
Definition: Exception.hpp:17
Definition: BlockingBuffer.hpp:11
Definition: ContextDetail.hpp:28
Definition: LockFreeBufferT.hpp:18
Definition: PoolConsumer.hpp:13
Definition: Rater.hpp:10
Exception that just has an exit code.
Definition: Exception.hpp:28
helping allocating object and its aggregated objects in a continouse shared memory ...
Definition: Allocators.hpp:88
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:74