hmbdc
simplify-high-performance-messaging-programming
BlockingBuffer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/Compile.hpp"
4 #include "hmbdc/time/Time.hpp"
5 #include "hmbdc/Config.hpp"
6 #include <mutex> // std::mutex, std::unique_lock
7 #include <condition_variable> // std::condition_variable
8 
9 
10 namespace hmbdc { namespace pattern {
11 
12 namespace blocking_buffer_detail {
13 struct iterator;
14 }
15 
18  using value_type = void *;
19  BlockingBuffer(size_t, size_t);
20  ~BlockingBuffer();
21  BlockingBuffer(BlockingBuffer const&) = delete;
22  BlockingBuffer& operator = (BlockingBuffer const&) = delete;
23 
24  value_type getItem(size_t seq) {
25  return store_ + seq % capacity_ * maxItemSize_;
26  }
27 
28  size_t maxItemSize() const {return maxItemSize_;}
29  size_t capacity() const {return capacity_;}
30  void put(void const* item, size_t sizeHint = 0) {
31  using namespace std;
32  unique_lock<mutex> lck(mutex_);
33  hasSlot_.wait(lck, [this]{return capacity_ > size_;});
34  memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
35  if (++size_ == 1) hasItem_.notify_one();
36  }
37 
38  template <typename T, typename... Ts>
39  void
40  put(T&& item, Ts&&... items) {
41  using namespace std;
42  unique_lock<mutex> lck(mutex_);
43  hasSlot_.wait(lck, [this]{return capacity_ > size_ + sizeof...(Ts);});
44  fill(std::forward<T>(item), std::forward<Ts>(items)...);
45  size_ += sizeof...(items) + 1;
46  if (size_ == sizeof...(items) + 1) hasItem_.notify_one();
47  }
48 
49  template <typename It>
50  bool
51  tryPutBatch(It begin, size_t n) {
52  using namespace std;
53  unique_lock<mutex> lck(mutex_);
54  if (hasSlot_.wait_for(
55  lck, chrono::seconds(0), [this, n]{return capacity_ > size_ + n - 1;})) {
56  fillBatch(begin, n);
57  size_ += n;
58  if (size_ == n) hasItem_.notify_all();
59  return true;
60  }
61  return false;
62  }
63 
64  template <typename Item, typename It>
65  bool
66  tryPutBatchInPlace(It begin, size_t n) {
67  using namespace std;
68  unique_lock<mutex> lck(mutex_);
69  if (hasSlot_.wait_for(
70  lck, chrono::seconds(0), [this, n]{return capacity_ > size_ + n - 1;})) {
71  fillBatchInPlace<Item>(begin, n);
72  size_ += n;
73  if (size_ == n) hasItem_.notify_all();
74  return true;
75  }
76  return false;
77  }
78 
79 
80  template <typename T> void putSome(T const& item) {put(&item);}
81  template <typename T, typename ...Args>
82  void putInPlace(Args&&... args) {
83  using namespace std;
84  unique_lock<mutex> lck(mutex_);
85  hasSlot_.wait(lck, [this](){return capacity_ > size_;});
86  new (getItem(seq_++)) T(std::forward<Args>(args)...);
87  if (++size_ == 1) hasItem_.notify_one();
88  }
89  template <typename T, typename ...Args>
90  bool tryPutInPlace(Args&&... args) {
91  using namespace std;
92  unique_lock<mutex> lck(mutex_);
93  if (!hasSlot_.wait_for(lck, chrono::seconds(0), [this](){return capacity_ > size_;}))
94  return false;
95  new (getItem(seq_++)) T(std::forward<Args>(args)...);
96  if (++size_ == 1) hasItem_.notify_one();
97  return true;
98  }
99 
100  bool tryPut(void const* item, size_t sizeHint = 0
101  , time::Duration timeout = time::Duration::seconds(0)) {
102  using namespace std;
103  unique_lock<mutex> lck(mutex_);
104  if (!hasSlot_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds())
105  , [this](){return capacity_ > size_;}))
106  return false;
107  memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
108  if (++size_ == 1) hasItem_.notify_one();
109  return true;
110  }
111 
112  template <typename T>
113  bool tryPut(T const& item
114  , time::Duration timeout = time::Duration::seconds(0)) {
115  return tryPut(&item, sizeof(item), timeout);
116  }
117 
118  bool isFull() const {return size_ == capacity_;}
119  void take(void *dest, size_t sizeHint = 0) {
120  using namespace std;
121  unique_lock<mutex> lck(mutex_);
122  hasItem_.wait(lck, [this](){return size_;});
123  memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
124  if (size_-- == capacity_) hasSlot_.notify_all();
125  }
126 
127  template <typename T>
128  T& take(T& dest) {
129  take(&dest, sizeof(T));
130  return dest;
131  }
132 
133  bool tryTake(void *dest, size_t sizeHint = 0, time::Duration timeout = time::Duration::seconds(0)) {
134  using namespace std;
135  unique_lock<mutex> lck(mutex_);
136  if (!hasItem_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds()), [this](){return size_;})) return false;
137  memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
138  if (size_-- == capacity_) hasSlot_.notify_all();
139  return true;
140  }
141 
142  template <typename T>
143  bool tryTake(T& dest) {
144  return tryTake(&dest, sizeof(T));
145  }
146 
147  template <typename itOut>
148  size_t take(itOut b, itOut e) {
149  using namespace std;
150  unique_lock<mutex> lck(mutex_);
151  hasItem_.wait(lck, [this](){return size_;});
152  auto s = size_;
153  while (s && b != e) {
154  memcpy(&*b++, getItem(seq_ - s--), min(maxItemSize_, sizeof(*b)));
155  }
156  auto ret = size_ - s;
157  size_ = s;
158  if (size_ + ret == capacity_) {
159  hasSlot_.notify_all();
160  }
161  return ret;
162  }
163 
164  iterator peek();
165  size_t peek(iterator& b, iterator& e);
166  void wasteAfterPeek(size_t len) {
167  std::unique_lock<std::mutex> lck(mutex_);
168  if (size_ == capacity_) hasSlot_.notify_all();
169  size_ -= len;
170  }
171 
172  void waitItem(time::Duration timeout) {
173  using namespace std;
174  unique_lock<mutex> lck(mutex_);
175  hasItem_.wait_for(lck, std::chrono::nanoseconds(timeout.nanoseconds())
176  , [this](){return size_;});
177  }
178 
179  size_t remainingSize() const {
180  return size_;
181  }
182  void reset() {
183  size_ = 0;
184  seq_ = 0;
185  hasSlot_.notify_all();
186  }
187 
188 private:
189  void fill(){}
190  template <typename T, typename... Ts>
191  void
192  fill(T&& item, Ts&&... items) {
193  new (getItem(seq_++)) T(std::forward<T>(item));
194  fill(std::forward<Ts>(items)...);
195  }
196 
197  template <typename It>
198  void
199  fillBatch(It begin, size_t n) {
200  for (auto it = begin; n; ++it, --n) {
201  using T = decltype(*begin);
202  new (getItem(seq_++)) T(*it);
203  }
204  }
205 
206  template <typename Item, typename It>
207  void
208  fillBatchInPlace(It begin, size_t n) {
209  for (auto it = begin; n; ++it, --n) {
210  new (getItem(seq_++)) Item(*it);
211  }
212  }
213 
214  size_t maxItemSize_;
215  size_t capacity_;
216  char* store_;
217  size_t size_;
218  size_t seq_;
219  std::mutex mutex_;
220  std::condition_variable hasItem_;
221  std::condition_variable hasSlot_;
222 };
223 
224 namespace blocking_buffer_detail {
225 struct iterator {
226  iterator(BlockingBuffer* HMBDC_RESTRICT buf, size_t seq)
227  : buf_(buf), seq_(seq){}
228  iterator() : buf_(nullptr), seq_(0){}
229  void clear() {buf_ = nullptr;}
230 
231  iterator& operator ++() HMBDC_RESTRICT {
232  ++seq_;
233  return *this;
234  }
235 
236  iterator operator ++(int) HMBDC_RESTRICT {
237  iterator tmp = *this;
238  ++*this;
239  return tmp;
240  }
241 
242  iterator operator + (size_t dis) const HMBDC_RESTRICT {
243  iterator tmp = *this;
244  tmp.seq_ += dis;
245  return tmp;
246  }
247 
248  iterator& operator += (size_t dis) HMBDC_RESTRICT {
249  seq_ += dis;
250  return *this;
251  }
252 
253  size_t operator - (iterator const& other) const HMBDC_RESTRICT {
254  return seq_ - other.seq_;
255  }
256 
257  explicit operator bool() const HMBDC_RESTRICT {
258  return buf_;
259  }
260 
261  bool operator < (iterator const& other) const HMBDC_RESTRICT {
262  return seq_ < other.seq_;
263  }
264 
265  bool operator == (iterator const& other) const HMBDC_RESTRICT {return seq_ == other.seq_;}
266  bool operator != (iterator const& other) const HMBDC_RESTRICT {return seq_ != other.seq_;}
267  void* operator*() const HMBDC_RESTRICT {return buf_->getItem(seq_);}
268  template <typename T> T& get() const HMBDC_RESTRICT {return *static_cast<T*>(**this);}
269  template <typename T>
270  T* operator->() HMBDC_RESTRICT {return static_cast<T*>(buf_->getItem(seq_));}
271 private:
272  BlockingBuffer* buf_;
273  size_t seq_;
274 };
275 } //blocking_buffer_detail
276 
277 inline
278 size_t
279 BlockingBuffer::
280 peek(iterator& b, iterator& e) {
281  std::unique_lock<std::mutex> lck(mutex_);
282  e = iterator(this, seq_);
283  b = iterator(this, seq_ - size_);
284  return size_;
285 }
286 
287 inline
289 BlockingBuffer::
290 peek() {
291  std::unique_lock<std::mutex> lck(mutex_);
292  if (size_) {
293  return iterator(this, seq_ - size_);
294  }
295  return iterator();
296 }
297 }}
Definition: BlockingBuffer.hpp:16
Definition: TypedString.hpp:74
Definition: Time.hpp:125
Definition: BlockingBuffer.hpp:225
Definition: Base.hpp:12