hmbdc
simplify-high-performance-messaging-programming
SeqArb.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #ifndef SMP_CACHE_BYTES
5 #define SMP_CACHE_BYTES 64
6 #endif
7 
8 #include "hmbdc/Exception.hpp"
9 #include "hmbdc/Compile.hpp"
10 #include <stdexcept>
11 #include <utility>
12 #include <inttypes.h>
13 #include <limits>
14 
15 
16 namespace hmbdc { namespace pattern {
17 
18 namespace seqarb_detail {
19 using namespace std;
20 namespace {
21 
/**
 * @brief issue a full memory barrier, but only in the thread-safe flavour
 * @details when THREADSAFE is false the call does nothing, so single-threaded
 * instantiations pay no fence cost
 * @tparam THREADSAFE compile-time switch selecting whether the barrier is emitted
 */
template <bool THREADSAFE>
inline __attribute__ ((always_inline))
void
my__sync_synchronize() {
    if (!THREADSAFE) {
        return;
    }
    __sync_synchronize();
}
28 
/**
 * @brief compare-and-swap that reports whether the swap took place
 * @details with THREADSAFE == true this delegates to the atomic
 * __sync_bool_compare_and_swap builtin; with THREADSAFE == false it performs the
 * same compare / store using plain (non-atomic) accesses
 * @tparam THREADSAFE selects the atomic builtin (true) or the plain fallback (false)
 * @tparam T type of the value being swapped
 * @param var location to update
 * @param compVal value *var is expected to hold
 * @param newVal value stored into *var when it holds compVal
 * @return true if *var held compVal and was replaced by newVal, false otherwise
 */
template <bool THREADSAFE, typename T>
inline __attribute__ ((always_inline))
bool
my__sync_bool_compare_and_swap(volatile T* var, T compVal, T newVal) {
    if (!THREADSAFE) {
        // plain fallback: no other thread is expected to touch *var
        bool const matched = (*var == compVal);
        if (matched) {
            *var = newVal;
        }
        return matched;
    }
    return __sync_bool_compare_and_swap(var, compVal, newVal);
}
40 
/**
 * @brief compare-and-swap that returns the value observed before the operation
 * @details with THREADSAFE == true this delegates to the atomic
 * __sync_val_compare_and_swap builtin; with THREADSAFE == false it performs the
 * same compare / store using plain (non-atomic) accesses
 * @tparam THREADSAFE selects the atomic builtin (true) or the plain fallback (false)
 * @tparam T type of the value being swapped
 * @param var location to update
 * @param compVal value *var is expected to hold
 * @param newVal value stored into *var when it holds compVal
 * @return the content of *var read before any store; it equals compVal exactly
 * when the swap happened
 */
template <bool THREADSAFE, typename T>
inline __attribute__ ((always_inline))
T
my__sync_val_compare_and_swap(volatile T* var, T compVal, T newVal) {
    if (!THREADSAFE) {
        // plain fallback: snapshot first, then compare and conditionally store
        T const observed = *var;
        if (*var == compVal) {
            *var = newVal;
        }
        return observed;
    }
    return __sync_val_compare_and_swap(var, compVal, newVal);
}
52 }
53 
54 template <uint16_t PARTICIPANT_COUNT, typename Seq = uint64_t, bool THREADSAFE = true>
55 struct SeqArb {
56  explicit SeqArb(Seq startSeq = 0)
57  : missedInit_(~((1ul << PARTICIPANT_COUNT) - 1u))
58  , gapFuncLock_(true)
59  , seq_(startSeq) {
60  static_assert(PARTICIPANT_COUNT <= 64u, "too many participants");
61  }
62 
63  SeqArb(SeqArb const&) = delete;
64  SeqArb& operator = (SeqArb const&) = delete;
65 
66  template <typename HitFunc, typename GapFunc>
67  inline __attribute__ ((always_inline))
68  bool operator()(uint16_t participantIndex, Seq seq, HitFunc&& h, GapFunc&& g) HMBDC_RESTRICT {
69  j_[participantIndex].seq = seq;
70 
71  auto old = my__sync_val_compare_and_swap<THREADSAFE>(&seq_, seq, std::numeric_limits<Seq>::max());
72 
73  if (old == seq) {
74  h();
75  my__sync_synchronize<THREADSAFE>();
76  seq_ = seq + 1;
77  return true; //arbitrated (utilized or discarded)
78  } else if (old == std::numeric_limits<Seq>::max()) {
79  return false ;
80  } else if (seq < old) {
81  return true; //arbitrated (discard)
82  } else { //seq > old
83  auto low = jLow();
84  if (hmbdc_unlikely((low > old && seq_ == old &&
85  my__sync_bool_compare_and_swap<THREADSAFE>(&seq_, old, std::numeric_limits<Seq>::max())))) {
86  g(low - old);
87  my__sync_synchronize<THREADSAFE>();
88  seq_ = low;
89  }
90  return false; //cannot decide and ask me later
91  }
92  }
93 
94  template <typename SeqGen, typename HitFunc, typename GapFunc>
95  inline __attribute__ ((always_inline))
96  size_t operator()(uint16_t participantIndex, SeqGen&& seqGen
97  , size_t seqSize, HitFunc&& h, GapFunc&& g) HMBDC_RESTRICT {
98  auto seq = seqGen();
99  j_[participantIndex].seq = seq;
100 
101  auto old = my__sync_val_compare_and_swap<THREADSAFE>(
102  &seq_, seq, std::numeric_limits<Seq>::max());
103 
104  if (old == seq) {
105  h();
106  auto preSeq = seq;
107  auto s = 1ul;
108  for (; s < seqSize; ++s) {
109  seq = seqGen();
110  if (seq - 1 == preSeq) {
111  preSeq = seq;
112  h();
113  } else {
114  break;
115  }
116  }
117  j_[participantIndex].seq = seq;
118  my__sync_synchronize<THREADSAFE>();
119  seq_ = seq + s;
120  return s;
121  } else if (old == std::numeric_limits<Seq>::max()) {
122  return 0;
123  } else if (seq < old) {
124  return 1ul; //arbitrated (discard)
125  } else { //seq > old
126  auto low = jLow();
127  if (hmbdc_unlikely((low > old && seq_ == old &&
128  my__sync_bool_compare_and_swap<THREADSAFE>(
129  &seq_, old, std::numeric_limits<Seq>::max())))) {
130  g(low - old);
131  my__sync_synchronize<THREADSAFE>();
132  seq_ = low;
133  }
134  return 0; //cannot decide and ask me later
135  }
136  }
137 
138  volatile Seq const& expectingSeq() const {
139  return seq_;
140  }
141 
142  volatile Seq& expectingSeq() {
143  return seq_;
144  }
145 
146 private:
147  inline __attribute__ ((always_inline))
148  Seq jLow() const HMBDC_RESTRICT {
149  auto res = j_[0].seq;
150  for (auto i = 1u; i < PARTICIPANT_COUNT; ++i)
151  if (res > j_[i].seq) res = j_[i].seq;
152  return res;
153  }
154 
155  uint64_t const missedInit_;
156  bool gapFuncLock_;
157  volatile Seq seq_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
158  uint64_t missed_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
159  struct J {
160  J() : seq(0u){}
161  Seq seq;
162  } __attribute__((__aligned__(SMP_CACHE_BYTES)));
163  J j_[PARTICIPANT_COUNT];
164 };
165 
166 template <uint16_t PARTICIPANT_COUNT, typename Seq = uint64_t>
168  SingleThreadSeqArb(Seq startSeq = 0)
169  : arb_(startSeq){}
170 
171  template <typename GapFunc>
172  int operator()(uint16_t participantIndex, Seq seq, GapFunc&& gapFunc) {
173  int res = -1;
174  if (!arb_(participantIndex, seq
175  , [&res]() {
176  res = 1;
177  }
178  , forward<GapFunc>(gapFunc)
179  )
180  ) {
181  res = 0;
182  }
183  return res;
184  }
185 
186  volatile Seq& expectingSeq() {
187  return arb_.expectingSeq();
188  }
189 
190  volatile Seq const& expectingSeq() const {
191  return arb_.expectingSeq();
192  }
193 
194 private:
196 };
197 
198 } //seqarb_detail
199 
200 template <uint16_t PARTICIPANT_COUNT, typename Seq = uint64_t, bool THREADSAFE = true>
202 
203 template <uint16_t PARTICIPANT_COUNT, typename Seq = uint64_t>
205 
206 }}
Definition: TypedString.hpp:74
Definition: SeqArb.hpp:55
Definition: Base.hpp:12