hmbdc
simplify-high-performance-messaging-programming
ContextDetail.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/app/Client.hpp"
5 #include "hmbdc/app/Message.hpp"
6 #include "hmbdc/comm/Topic.hpp"
7 
8 #include "hmbdc/pattern/PoolT.hpp"
9 #include "hmbdc/pattern/PoolMinus.hpp"
10 #include "hmbdc/pattern/LockFreeBufferT.hpp"
11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
12 
13 #include "hmbdc/os/Thread.hpp"
14 #include "hmbdc/os/Allocators.hpp"
15 #include "hmbdc/time/Timers.hpp"
16 
17 #include <vector>
18 #include <type_traits>
19 #include <thread>
20 #include <utility>
21 #include <iostream>
22 
23 namespace hmbdc { namespace app { namespace context_detail {
24 using namespace std;
25 using namespace hmbdc::pattern;
26 
// Adapter that lets a CcClient be driven through the pattern::PoolConsumer
// virtual interface: every override below forwards to the same-named
// CcClient member, called with explicit CcClient:: qualification.
// NOTE(review): listing lines 28-29 (presumably the struct header deriving
// from pattern::PoolConsumer) are absent from this extract - confirm against
// the original header.
27 template <typename CcClient>
// Binds the proxy to `client`. The base is told whether the client registered
// any message (REGISTERED_MESSAGE_SIZE != 0) and is handed the client as a
// TimerManager* (dynamic_cast of &client).
30  PoolConsumerProxy(CcClient& client)
31  : pattern::PoolConsumer(CcClient::REGISTERED_MESSAGE_SIZE != 0, dynamic_cast<time::TimerManager*>(&client))
32  , client_(client){}
33 
// Forwards a range of buffer items to the client's handleRangeImpl.
34  virtual void handleRangeImpl(BufIt it,
35  BufIt end, uint16_t threadSerialNumber) override {
36  client_.CcClient::handleRangeImpl(it, end, threadSerialNumber);
37  }
// Forwards to the client's messageDispatchingStartedCb.
38  virtual void messageDispatchingStartedCb(uint16_t threadId) override {
39  client_.CcClient::messageDispatchingStartedCb(threadId);
40  }
// Forwards to the client's invokedCb.
41  virtual void invokedCb(uint16_t threadId) override {
42  client_.CcClient::invokedCb(threadId);
43  }
// Forwards the stopping exception to the client's stoppedCb.
44  virtual void stoppedCb(std::exception const&e) override {
45  client_.CcClient::stoppedCb(e);
46  }
// Asks the client whether it accepts being dropped. If it does, the proxy
// destroys itself (delete this) and returns true; the caller must not touch
// the proxy afterwards. Otherwise returns false and the proxy stays alive.
47  virtual bool droppedCb() override {
48  if (client_.CcClient::droppedCb()) {
49  delete this;
50  return true;
51  } else {
52  return false;
53  }
54  }
55 private:
// The wrapped client; held by reference, so the proxy does not own it.
56  CcClient& HMBDC_RESTRICT client_;
57 };
58 
// Primary template: the flag values used when no context property overrides
// them (broadcast messaging with a pool, no IPC, no reverse dispatch).
// NOTE(review): listing lines 60-62 (presumably the struct header and any
// member aliases) are absent from this extract - confirm against the
// original header.
59 template <typename... ContextProperties>
63  enum {
64  broadcast_msg = 1,
65  has_pool = 1,
66  pool_msgless = 0,
67  create_ipc = 0,
68  attach_ipc = 0,
69  dispatch_reverse = 0,
70  };
71 };
72 
// Specialization consuming a leading context_property::broadcast<c>; the
// remaining properties are aggregated by the base class.
// NOTE(review): listing line 77 is absent from this extract - confirm
// against the original header.
73 template <uint16_t c, typename... ContextProperties>
74 struct context_property_aggregator<context_property::broadcast<c>
75  , ContextProperties...>
76 : context_property_aggregator<ContextProperties...> {
// Compile-time guard: fails if the remaining properties aggregate to
// broadcast_msg == 0.
78  static_assert(context_property_aggregator<ContextProperties...>::broadcast_msg, "contradicting properties");
// Hides the inherited enumerators with the broadcast values.
79  enum {
80  broadcast_msg = 1,
81  has_pool = 1,
82  };
83 };
84 
// Specialization consuming a leading context_property::partition; the
// remaining properties are aggregated by the base class.
// NOTE(review): listing line 89 is absent from this extract - confirm
// against the original header.
85 template <typename... ContextProperties>
86 struct context_property_aggregator<context_property::partition
87  , ContextProperties...>
88 : context_property_aggregator<ContextProperties...> {
// Turns broadcast off; a pool exists only when the remaining properties
// aggregate to pool_msgless.
90  enum {
91  broadcast_msg = 0,
92  has_pool = context_property_aggregator<ContextProperties...>::pool_msgless,
93  };
94 };
95 
96 template <typename... ContextProperties>
97 struct context_property_aggregator<context_property::msgless_pool
98  , ContextProperties...>
99 : context_property_aggregator<ContextProperties...> {
100  enum {
101  has_pool = 1,
102  pool_msgless = 1
103  };
104 };
105 
// Specialization consuming a leading context_property::ipc_creator; the
// remaining properties are aggregated by the base class.
// NOTE(review): listing line 110 is absent from this extract - confirm
// against the original header.
106 template <typename... ContextProperties>
107 struct context_property_aggregator<context_property::ipc_creator
108  , ContextProperties...>
109 : context_property_aggregator<ContextProperties...> {
// Hides the inherited create_ipc with 1; all other flags are inherited.
111  enum {
112  create_ipc = 1,
113  };
114 };
115 
// Specialization consuming a leading context_property::ipc_attacher; the
// remaining properties are aggregated by the base class.
// NOTE(review): listing line 120 is absent from this extract - confirm
// against the original header.
116 template <typename... ContextProperties>
117 struct context_property_aggregator<context_property::ipc_attacher
118  , ContextProperties...>
119 : context_property_aggregator<ContextProperties...> {
// Hides the inherited attach_ipc with 1; all other flags are inherited.
121  enum {
122  attach_ipc = 1,
123  };
124 };
125 
126 using namespace std;
127 using namespace hmbdc::time;
128 
/**
 * @brief timer-check functor; this primary template is the do-nothing form
 * @details the true case has its own specialization, so this body is what
 * is used when is_timer_manager is false: the call accepts any object by
 * reference and leaves it untouched
 * @tparam is_timer_manager compile-time switch choosing the implementation
 */
template <bool is_timer_manager>
struct tm_runner {
    template<typename C>
    void operator()(C&) {}
};
134 
135 template <>
136 struct tm_runner<true> {
137  void operator()(TimerManager& tm) {
138  tm.checkTimers(SysTime::now());
139  }
140 };
141 
// Runs one iteration of client `c` against buffer `lfb` for the consumer
// slot `hmbdcNumber`.
// Returns true unless an exception was caught while `stopped` is false and
// c.dropped() returned true; in that case returns false.
142 template <typename LFB, typename CcClient>
143 bool runOnceImpl(uint16_t hmbdcNumber, bool& HMBDC_RESTRICT stopped, LFB& HMBDC_RESTRICT lfb, CcClient& HMBDC_RESTRICT c) {
144  typename LFB::iterator begin, end;
145  try {
// NOTE(review): listing line 146 is absent from this extract; `tr` is
// presumably a tm_runner declared there - confirm against the original.
147  tr(c);
148 
// Clients that registered no message (REGISTERED_MESSAGE_SIZE == 0) skip
// the buffer entirely.
149  const bool clientParticipateInMessaging =
150  std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE != 0;
151  if (clientParticipateInMessaging) {
// Peek at most maxBatchMessageCount() items, dispatch them, notify the
// client, then release exactly the peeked count back to the buffer.
152  uint64_t count = lfb.peek(hmbdcNumber, begin, end, c.maxBatchMessageCount());
153  c.CcClient::handleRangeImpl(begin, end, hmbdcNumber);
154  c.CcClient::invokedCb(hmbdcNumber);
155  lfb.wasteAfterPeek(hmbdcNumber, count);
156  } else {
// Non-messaging clients are invoked with the id 0xffff - hmbdcNumber.
157  c.CcClient::invokedCb(0xffffu - hmbdcNumber);
158  }
// Each handler below reports the failure to the client only when `stopped`
// is false; when `stopped` is true the exception is swallowed and control
// falls through to the final `return true`.
159  } catch (std::exception const& e) {
160  if (!stopped) {
161  c.stopped(e);
162  return !c.dropped();
163  }
// A thrown int is wrapped in hmbdc::ExitCode before being reported.
164  } catch (int code) {
165  if (!stopped) {
166  c.stopped(hmbdc::ExitCode(code));
167  return !c.dropped();
168  }
// Anything else is reported as hmbdc::UnknownException.
169  } catch (...) {
170  if (!stopped) {
171  c.stopped(hmbdc::UnknownException());
172  return !c.dropped();
173  }
174  }
175  return true;
176 }
177 
// Overload of runOnceImpl for hmbdc::pattern::MonoLockFreeBuffer: runs one
// iteration of client `c` against `lfb`.
// Returns true unless an exception was caught while `stopped` is false and
// c.dropped() returned true; in that case returns false.
178 template <typename CcClient>
179 bool runOnceImpl(uint16_t threadSerialNumber, bool& HMBDC_RESTRICT stopped, hmbdc::pattern::MonoLockFreeBuffer& HMBDC_RESTRICT lfb, CcClient& HMBDC_RESTRICT c) {
// NOTE(review): listing lines 180 and 182 are absent from this extract;
// `begin`/`end` and `tr` are presumably declared there - confirm against the
// original.
181  try {
183  tr(c);
184 
// NOTE(review): relies on implicit integer-to-bool conversion; the generic
// runOnceImpl overload spells the same test as `!= 0`.
185  const bool clientParticipateInMessaging =
186  std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE;
187  if (clientParticipateInMessaging) {
// Peek at most maxBatchMessageCount() items, dispatch them, notify the
// client, then release the peeked count starting at `begin`.
188  uint64_t count = lfb.peek(begin, end, c.maxBatchMessageCount());
189  c.CcClient::handleRangeImpl(begin, end, threadSerialNumber);
190  c.CcClient::invokedCb(threadSerialNumber);
191  lfb.wasteAfterPeek(begin, count);
192  } else {
// Non-messaging clients are invoked with the id 0xffff - threadSerialNumber.
193  c.CcClient::invokedCb(0xffffu - threadSerialNumber);
194  }
// Each handler below reports the failure to the client only when `stopped`
// is false; when `stopped` is true the exception is swallowed and control
// falls through to the final `return true`.
195  } catch (std::exception const& e) {
196  if (!stopped) {
197  c.stopped(e);
198  return !c.dropped();
199  }
// A thrown int is wrapped in hmbdc::ExitCode before being reported.
200  } catch (int code) {
201  if (!stopped) {
202  c.stopped(hmbdc::ExitCode(code));
203  return !c.dropped();
204  }
// Anything else is reported as hmbdc::UnknownException.
205  } catch (...) {
206  if (!stopped) {
207  c.stopped(hmbdc::UnknownException());
208  return !c.dropped();
209  }
210  }
211  return true;
212 }
213 
214 inline
215 void unblock(MonoLockFreeBuffer& buffer, uint16_t){
216  buffer.reset();
217 }
218 
/**
 * @brief generic unblock: marks one consumer slot of the buffer as dead
 * @tparam Buffer any type providing markDead(uint16_t)
 * @param lfb the buffer to notify
 * @param threadSerialNumber the slot passed on to markDead
 */
template <typename Buffer>
void unblock(Buffer& lfb, uint16_t threadSerialNumber) {
    lfb.markDead(threadSerialNumber);
}
223 
224 }}}
Definition: MonoLockFreeBuffer.hpp:15
Definition: ContextDetail.hpp:130
Definition: TypedString.hpp:74
Definition: Timers.hpp:65
the default vanilla allocator
Definition: Allocators.hpp:116
Unknown exception.
Definition: Exception.hpp:17
Definition: BlockingBuffer.hpp:10
Definition: ContextDetail.hpp:28
Definition: LockFreeBufferT.hpp:18
Definition: PoolConsumer.hpp:13
Definition: Rater.hpp:10
Exception that just has an exit code.
Definition: Exception.hpp:28
helps allocate an object and its aggregated objects in a continuous shared memory ...
Definition: Allocators.hpp:88
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:74