hmbdc
simplify-high-performance-messaging-programming
SeqArb.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #ifndef SMP_CACHE_BYTES
5 #define SMP_CACHE_BYTES 64
6 #endif
7 
8 #include "hmbdc/Exception.hpp"
9 #include "hmbdc/Compile.hpp"
10 #include <stdexcept>
11 #include <utility>
12 #include <inttypes.h>
13 #include <limits>
14 
15 
16 namespace hmbdc { namespace pattern {
17 
18 using namespace std;
19 namespace {
20 
/**
 * @brief issue a full memory barrier only when THREADSAFE is true
 * @details when THREADSAFE is false this function does nothing, so the same
 * calling code can be instantiated with or without the barrier
 * @tparam THREADSAFE compile time switch selecting whether the barrier is emitted
 */
template <bool THREADSAFE>
inline __attribute__ ((always_inline))
void
my__sync_synchronize() {
    if (!THREADSAFE) {
        return; // barrier not requested for this instantiation
    }
    __sync_synchronize();
}
27 
/**
 * @brief compare-and-swap reporting success as a bool
 * @details with THREADSAFE true this forwards to the GCC builtin
 * __sync_bool_compare_and_swap; with THREADSAFE false it performs a plain
 * (non-atomic) compare followed by a store
 * @tparam THREADSAFE selects the atomic builtin (true) or the plain code path (false)
 * @tparam T type of the value being swapped
 * @param var location to update
 * @param compVal value *var is expected to hold
 * @param newVal value stored into *var when it holds compVal
 * @return true if *var held compVal and was replaced by newVal, false otherwise
 */
template <bool THREADSAFE, typename T>
inline __attribute__ ((always_inline))
bool
my__sync_bool_compare_and_swap(volatile T* var, T compVal, T newVal) {
    if (THREADSAFE) {
        return __sync_bool_compare_and_swap(var, compVal, newVal);
    }
    // plain path: one read of *var, and a write only on match
    const bool matched = (*var == compVal);
    if (matched) {
        *var = newVal;
    }
    return matched;
}
39 
/**
 * @brief compare-and-swap returning the value observed before the operation
 * @details with THREADSAFE true this forwards to the GCC builtin
 * __sync_val_compare_and_swap; with THREADSAFE false it performs a plain
 * (non-atomic) compare followed by a store
 * @tparam THREADSAFE selects the atomic builtin (true) or the plain code path (false)
 * @tparam T type of the value being swapped
 * @param var location to update
 * @param compVal value *var is expected to hold
 * @param newVal value stored into *var when it holds compVal
 * @return the value read from *var before any store; the swap happened
 * if and only if the returned value equals compVal
 */
template <bool THREADSAFE, typename T>
inline __attribute__ ((always_inline))
T
my__sync_val_compare_and_swap(volatile T* var, T compVal, T newVal) {
    if (THREADSAFE) {
        return __sync_val_compare_and_swap(var, compVal, newVal);
    }
    // read the volatile exactly once so the value that is compared is the
    // very same value that is returned to the caller
    T const observed = *var;
    if (observed == compVal) {
        *var = newVal;
    }
    return observed;
}
51 }
52 
53 template <uint16_t PARTICIPANT_COUNT, typename Seq = uint64_t, bool THREADSAFE = true>
54 struct SeqArb {
55  explicit SeqArb(Seq startSeq = 0)
56  : missedInit_(~((1ul << PARTICIPANT_COUNT) - 1u))
57  , gapFuncLock_(true)
58  , seq_(startSeq) {
59  static_assert(PARTICIPANT_COUNT <= 64u, "too many participants");
60  }
61 
62  SeqArb(SeqArb const&) = delete;
63  SeqArb& operator = (SeqArb const&) = delete;
64 
65  template <typename HitFunc, typename GapFunc>
66  inline __attribute__ ((always_inline))
67  bool operator()(uint16_t participantIndex, Seq seq, HitFunc&& h, GapFunc&& g) __restrict__ {
68  j_[participantIndex].seq = seq;
69 
70  auto old = my__sync_val_compare_and_swap<THREADSAFE>(&seq_, seq, std::numeric_limits<Seq>::max());
71 
72  if (old == seq) {
73  h();
74  my__sync_synchronize<THREADSAFE>();
75  seq_ = seq + 1;
76  return true; //arbitrated (utilized or discarded)
77  } else if (old == std::numeric_limits<Seq>::max()) {
78  return false ;
79  } else if (seq < old) {
80  return true; //arbitrated (discard)
81  } else { //seq > old
82  auto low = jLow();
83  if (unlikely((low > old && seq_ == old &&
84  my__sync_bool_compare_and_swap<THREADSAFE>(&seq_, old, std::numeric_limits<Seq>::max())))) {
85  g(low - old);
86  my__sync_synchronize<THREADSAFE>();
87  seq_ = low;
88  }
89  return false; //cannot decide and ask me later
90  }
91  }
92 
93  template <typename SeqGen, typename HitFunc, typename GapFunc>
94  inline __attribute__ ((always_inline))
95  size_t operator()(uint16_t participantIndex, SeqGen&& seqGen
96  , size_t seqSize, HitFunc&& h, GapFunc&& g) __restrict__ {
97  auto seq = seqGen();
98  j_[participantIndex].seq = seq;
99 
100  auto old = my__sync_val_compare_and_swap<THREADSAFE>(
101  &seq_, seq, std::numeric_limits<Seq>::max());
102 
103  if (old == seq) {
104  h();
105  auto preSeq = seq;
106  auto s = 1ul;
107  for (; s < seqSize; ++s) {
108  seq = seqGen();
109  if (seq - 1 == preSeq) {
110  preSeq = seq;
111  h();
112  } else {
113  break;
114  }
115  }
116  j_[participantIndex].seq = seq;
117  my__sync_synchronize<THREADSAFE>();
118  seq_ = seq + s;
119  return s;
120  } else if (old == std::numeric_limits<Seq>::max()) {
121  return 0;
122  } else if (seq < old) {
123  return 1ul; //arbitrated (discard)
124  } else { //seq > old
125  auto low = jLow();
126  if (unlikely((low > old && seq_ == old &&
127  my__sync_bool_compare_and_swap<THREADSAFE>(
128  &seq_, old, std::numeric_limits<Seq>::max())))) {
129  g(low - old);
130  my__sync_synchronize<THREADSAFE>();
131  seq_ = low;
132  }
133  return 0; //cannot decide and ask me later
134  }
135  }
136 
137  volatile Seq const& expectingSeq() const {
138  return seq_;
139  }
140 
141  volatile Seq& expectingSeq() {
142  return seq_;
143  }
144 
145 private:
146  inline __attribute__ ((always_inline))
147  Seq jLow() const __restrict__ {
148  auto res = j_[0].seq;
149  for (auto i = 1u; i < PARTICIPANT_COUNT; ++i)
150  if (res > j_[i].seq) res = j_[i].seq;
151  return res;
152  }
153 
154  uint64_t const missedInit_;
155  bool gapFuncLock_;
156  volatile Seq seq_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
157  uint64_t missed_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
158  struct J {
159  J() : seq(0u){}
160  Seq seq;
161  } __attribute__((__aligned__(SMP_CACHE_BYTES)));
162  J j_[PARTICIPANT_COUNT];
163 };
164 
165 template <uint16_t PARTICIPANT_COUNT, typename Seq = uint64_t>
167  SingleThreadSeqArb(Seq startSeq = 0)
168  : arb_(startSeq){}
169 
170  template <typename GapFunc>
171  int operator()(uint16_t participantIndex, Seq seq, GapFunc&& gapFunc) {
172  int res = -1;
173  if (!arb_(participantIndex, seq
174  , [&res]() {
175  res = 1;
176  }
177  , forward<GapFunc>(gapFunc)
178  )
179  ) {
180  res = 0;
181  }
182  return res;
183  }
184 
185  volatile Seq& expectingSeq() {
186  return arb_.expectingSeq();
187  }
188 
189  volatile Seq const& expectingSeq() const {
190  return arb_.expectingSeq();
191  }
192 
193 private:
195 };
196 
197 }}
Definition: TypedString.hpp:74
Definition: SeqArb.hpp:166
Definition: SeqArb.hpp:54
Definition: Client.hpp:11