hmbdc
simplify-high-performance-messaging-programming
BlockingBuffer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
#include "hmbdc/os/Allocators.hpp"
#include "hmbdc/time/Time.hpp"
#include "hmbdc/Config.hpp"

#include <algorithm> // std::min
#include <condition_variable> // std::condition_variable
#include <cstring> // memcpy
#include <mutex> // std::mutex, std::unique_lock
9 
10 
11 namespace hmbdc { namespace pattern {
12 
13 namespace classic_buffer_detail {
14 struct iterator;
15 }
16 
19  using value_type = void *;
20  BlockingBuffer(size_t, size_t);
21  ~BlockingBuffer();
22  BlockingBuffer(BlockingBuffer const&) = delete;
23  BlockingBuffer& operator = (BlockingBuffer const&) = delete;
24 
25  value_type getItem(size_t seq) {
26  return store_ + seq % capacity_ * maxItemSize_;
27  }
28 
29  size_t maxItemSize() const {return maxItemSize_;}
30  size_t capacity() const {return capacity_;}
31  void put(void const* item, size_t sizeHint = 0) {
32  using namespace std;
33  unique_lock<mutex> lck(mutex_);
34  hasSlot_.wait(lck, [this]{return capacity_ > size_;});
35  memcpy(getItem(seq_++), item, min(sizeHint, maxItemSize_));
36  if (++size_ == 1) hasItem_.notify_one();
37  }
38 
39  template <typename T, typename... Ts>
40  void
41  put(T&& item, Ts&&... items) {
42  using namespace std;
43  unique_lock<mutex> lck(mutex_);
44  hasSlot_.wait(lck, [this]{return capacity_ > size_ + sizeof...(Ts);});
45  fill(std::forward<T>(item), std::forward<Ts>(items)...);
46  size_ += sizeof...(items) + 1;
47  if (size_ == sizeof...(items) + 1) hasItem_.notify_one();
48  }
49 
50  template <typename T> void putSome(T const& item) {put(&item);}
51  template <typename T, typename ...Args>
52  void putInPlace(Args&&... args) {
53  using namespace std;
54  unique_lock<mutex> lck(mutex_);
55  hasSlot_.wait(lck, [this](){return capacity_ > size_;});
56  new (getItem(seq_++)) T(std::forward<Args>(args)...);
57  if (++size_ == 1) hasItem_.notify_one();
58  }
59  template <typename T, typename ...Args>
60  bool tryPutInPlace(Args&&... args) {
61  using namespace std;
62  unique_lock<mutex> lck(mutex_);
63  if (!hasSlot_.wait_for(lck, chrono::seconds(0), [this](){return capacity_ > size_;}))
64  return false;
65  new (getItem(seq_++)) T(std::forward<Args>(args)...);
66  if (++size_ == 1) hasItem_.notify_one();
67  return true;
68  }
69 
70  bool tryPut(void const* item, size_t sizeHint = 0) {
71  using namespace std;
72  unique_lock<mutex> lck(mutex_);
73  if (!hasSlot_.wait_for(lck, chrono::seconds(0), [this](){return capacity_ > size_;}))
74  return false;
75  memcpy(getItem(seq_++), item, min(sizeHint, maxItemSize_));
76  if (++size_ == 1) hasItem_.notify_one();
77  return true;
78  }
79 
/**
 * @brief typed convenience wrapper over tryPut(void const*, size_t)
 * @param item object to copy; sizeof(item) is passed as the size hint
 * @return true if the item was stored, false if the buffer is full
 */
template <typename T>
bool tryPut(T const& item) {
    auto const* const bytes = &item;
    return tryPut(bytes, sizeof(item));
}
84 
85  bool isFull() const {return size_ == capacity_;}
86  void take(void *dest, size_t len = 0) {
87  using namespace std;
88  unique_lock<mutex> lck(mutex_);
89  hasItem_.wait(lck, [this](){return size_;});
90  memcpy(dest, getItem(seq_ - size_), min(len, maxItemSize_));
91  if (size_-- == capacity_) hasSlot_.notify_all();
92  }
93 
/**
 * @brief typed convenience wrapper over take(void*, size_t)
 * @param dest object overwritten with up to sizeof(T) bytes of the oldest item
 * @return dest, to allow chaining
 */
template <typename T>
T& take(T& dest) {
    auto* const out = &dest;
    take(out, sizeof(T));
    return dest;
}
99 
100  bool tryTake(void *dest, size_t sizeHint = 0) {
101  using namespace std;
102  unique_lock<mutex> lck(mutex_);
103  if (!hasItem_.wait_for(lck, chrono::seconds(0), [this](){return size_;})) return false;
104  memcpy(dest, getItem(seq_ - size_), min(sizeHint, maxItemSize_));
105  if (size_-- == capacity_) hasSlot_.notify_all();
106  return true;
107  }
108 
/**
 * @brief typed convenience wrapper over tryTake(void*, size_t)
 * @param dest object overwritten with up to sizeof(T) bytes of the oldest item
 * @return true if an item was consumed, false if the buffer is empty
 */
template <typename T>
bool tryTake(T& dest) {
    auto* const out = &dest;
    return tryTake(out, sizeof(T));
}
113 
114  template <typename itOut>
115  size_t take(itOut b, itOut e) {
116  using namespace std;
117  unique_lock<mutex> lck(mutex_);
118  hasItem_.wait(lck, [this](){return size_;});
119  auto s = size_;
120  while (s && b != e) {
121  memcpy(&*b++, getItem(seq_ - s--), min(maxItemSize_, sizeof(*b)));
122  }
123  auto ret = size_ - s;
124  size_ = s;
125  if (size_ + ret == capacity_) {
126  hasSlot_.notify_all();
127  }
128  return ret;
129  }
130 
131  iterator peek();
132  size_t peek(iterator& b, iterator& e);
133  void wasteAfterPeek(size_t len) {
134  std::unique_lock<std::mutex> lck(mutex_);
135  if (size_ == capacity_) hasSlot_.notify_all();
136  size_ -= len;
137  }
138 
139  void waitItem(time::Duration timeout) {
140  using namespace std;
141  unique_lock<mutex> lck(mutex_);
142  hasItem_.wait_for(lck, std::chrono::nanoseconds(timeout.nanoseconds())
143  , [this](){return size_;});
144  }
145 
146  size_t remainingSize() const {
147  return size_;
148  }
149  void reset() {
150  size_ = 0;
151  seq_ = 0;
152  hasSlot_.notify_all();
153  }
154 
155 private:
156  void fill(){}
157  template <typename T, typename... Ts>
158  void
159  fill(T&& item, Ts&&... items) {
160  memcpy(getItem(seq_++), &item, maxItemSize_);
161  fill(std::forward<Ts>(items)...);
162  }
163 
// bytes per slot; also the upper bound of every copy in or out
size_t maxItemSize_;
// number of slots; sequence numbers wrap modulo this value
size_t capacity_;
// base address of the slot storage
char* store_;
// number of items currently buffered
size_t size_;
// sequence number of the next slot to write
size_t seq_;
// guards size_, seq_ and the slot contents
std::mutex mutex_;
// signalled when the buffer goes from empty to non-empty
std::condition_variable hasItem_;
// signalled when a full buffer gets free slots again
std::condition_variable hasSlot_;
172 };
173 
174 namespace classic_buffer_detail {
175 struct iterator {
176  iterator(BlockingBuffer* HMBDC_RESTRICT buf, size_t seq)
177  : buf_(buf), seq_(seq){}
178  iterator() : buf_(nullptr), seq_(0){}
179  void clear() {buf_ = nullptr;}
180 
181  iterator& operator ++() HMBDC_RESTRICT {
182  ++seq_;
183  return *this;
184  }
185 
186  iterator operator ++(int) HMBDC_RESTRICT {
187  iterator tmp = *this;
188  ++*this;
189  return tmp;
190  }
191 
192  iterator operator + (size_t dis) const HMBDC_RESTRICT {
193  iterator tmp = *this;
194  tmp.seq_ += dis;
195  return tmp;
196  }
197 
198  iterator& operator += (size_t dis) HMBDC_RESTRICT {
199  seq_ += dis;
200  return *this;
201  }
202 
203  size_t operator - (iterator const& other) const HMBDC_RESTRICT {
204  return seq_ - other.seq_;
205  }
206 
207  explicit operator bool() const HMBDC_RESTRICT {
208  return buf_;
209  }
210 
211  bool operator < (iterator const& other) const HMBDC_RESTRICT {
212  return seq_ < other.seq_;
213  }
214 
215  bool operator == (iterator const& other) const HMBDC_RESTRICT {return seq_ == other.seq_;}
216  bool operator != (iterator const& other) const HMBDC_RESTRICT {return seq_ != other.seq_;}
217  void* operator*() const HMBDC_RESTRICT {return buf_->getItem(seq_);}
218  template <typename T> T& get() const HMBDC_RESTRICT {return *static_cast<T*>(**this);}
219  template <typename T>
220  T* operator->() HMBDC_RESTRICT {return static_cast<T*>(buf_->getItem(seq_));}
221 private:
222  BlockingBuffer* buf_;
223  size_t seq_;
224 };
225 } //classic_buffer_detail
226 
227 inline
228 size_t
229 BlockingBuffer::
230 peek(iterator& b, iterator& e) {
231  std::unique_lock<std::mutex> lck(mutex_);
232  e = iterator(this, seq_);
233  b = iterator(this, seq_ - size_);
234  return size_;
235 }
236 
237 inline
239 BlockingBuffer::
240 peek() {
241  std::unique_lock<std::mutex> lck(mutex_);
242  if (size_) {
243  return iterator(this, seq_ - size_);
244  }
245  return iterator();
246 }
247 }}
Definition: BlockingBuffer.hpp:17
Definition: TypedString.hpp:74
Definition: BlockingBuffer.hpp:175
Definition: Time.hpp:125
Definition: Base.hpp:12