1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/Compile.hpp" 4 #include "hmbdc/time/Time.hpp" 5 #include "hmbdc/Config.hpp" 7 #include <condition_variable> 10 namespace hmbdc {
namespace pattern {
12 namespace blocking_buffer_detail {
18 using value_type =
void *;
24 value_type getItem(
size_t seq) {
25 return store_ + seq % capacity_ * maxItemSize_;
28 size_t maxItemSize()
const {
return maxItemSize_;}
29 size_t capacity()
const {
return capacity_;}
30 void put(
void const* item,
size_t sizeHint = 0) {
32 unique_lock<mutex> lck(mutex_);
33 hasSlot_.wait(lck, [
this]{
return capacity_ > size_;});
34 memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
35 if (++size_ == 1) hasItem_.notify_one();
38 template <
typename T,
typename... Ts>
40 put(T&& item, Ts&&... items) {
42 unique_lock<mutex> lck(mutex_);
43 hasSlot_.wait(lck, [
this]{
return capacity_ > size_ +
sizeof...(Ts);});
44 fill(std::forward<T>(item), std::forward<Ts>(items)...);
45 size_ +=
sizeof...(items) + 1;
46 if (size_ ==
sizeof...(items) + 1) hasItem_.notify_one();
49 template <
typename It>
51 tryPutBatch(It begin,
size_t n) {
53 unique_lock<mutex> lck(mutex_);
54 if (hasSlot_.wait_for(
55 lck, chrono::seconds(0), [
this, n]{return capacity_ > size_ + n - 1;})) {
58 if (size_ == n) hasItem_.notify_all();
64 template <
typename Item,
typename It>
66 tryPutBatchInPlace(It begin,
size_t n) {
68 unique_lock<mutex> lck(mutex_);
69 if (hasSlot_.wait_for(
70 lck, chrono::seconds(0), [
this, n]{return capacity_ > size_ + n - 1;})) {
71 fillBatchInPlace<Item>(begin, n);
73 if (size_ == n) hasItem_.notify_all();
80 template <
typename T>
void putSome(T
const& item) {put(&item);}
81 template <
typename T,
typename ...Args>
82 void putInPlace(Args&&... args) {
84 unique_lock<mutex> lck(mutex_);
85 hasSlot_.wait(lck, [
this](){
return capacity_ > size_;});
86 new (getItem(seq_++)) T(std::forward<Args>(args)...);
87 if (++size_ == 1) hasItem_.notify_one();
89 template <
typename T,
typename ...Args>
90 bool tryPutInPlace(Args&&... args) {
92 unique_lock<mutex> lck(mutex_);
93 if (!hasSlot_.wait_for(lck, chrono::seconds(0), [
this](){return capacity_ > size_;}))
95 new (getItem(seq_++)) T(std::forward<Args>(args)...);
96 if (++size_ == 1) hasItem_.notify_one();
100 bool tryPut(
void const* item,
size_t sizeHint = 0
103 unique_lock<mutex> lck(mutex_);
104 if (!hasSlot_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds())
105 , [
this](){
return capacity_ > size_;}))
107 memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
108 if (++size_ == 1) hasItem_.notify_one();
112 template <
typename T>
113 bool tryPut(T
const& item
115 return tryPut(&item,
sizeof(item), timeout);
118 bool isFull()
const {
return size_ == capacity_;}
119 void take(
void *dest,
size_t sizeHint = 0) {
121 unique_lock<mutex> lck(mutex_);
122 hasItem_.wait(lck, [
this](){
return size_;});
123 memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
124 if (size_-- == capacity_) hasSlot_.notify_all();
127 template <
typename T>
129 take(&dest,
sizeof(T));
133 bool tryTake(
void *dest,
size_t sizeHint = 0,
time::Duration timeout = time::Duration::seconds(0)) {
135 unique_lock<mutex> lck(mutex_);
136 if (!hasItem_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds()), [
this](){
return size_;}))
return false;
137 memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
138 if (size_-- == capacity_) hasSlot_.notify_all();
142 template <
typename T>
143 bool tryTake(T& dest) {
144 return tryTake(&dest,
sizeof(T));
147 template <
typename itOut>
148 size_t take(itOut b, itOut e) {
150 unique_lock<mutex> lck(mutex_);
151 hasItem_.wait(lck, [
this](){
return size_;});
153 while (s && b != e) {
154 memcpy(&*b++, getItem(seq_ - s--), min(maxItemSize_,
sizeof(*b)));
156 auto ret = size_ - s;
158 if (size_ + ret == capacity_) {
159 hasSlot_.notify_all();
166 void wasteAfterPeek(
size_t len) {
167 std::unique_lock<std::mutex> lck(mutex_);
168 if (size_ == capacity_) hasSlot_.notify_all();
174 unique_lock<mutex> lck(mutex_);
175 hasItem_.wait_for(lck, std::chrono::nanoseconds(timeout.nanoseconds())
176 , [
this](){
return size_;});
179 size_t remainingSize()
const {
185 hasSlot_.notify_all();
190 template <
typename T,
typename... Ts>
192 fill(T&& item, Ts&&... items) {
193 new (getItem(seq_++)) T(std::forward<T>(item));
194 fill(std::forward<Ts>(items)...);
197 template <
typename It>
199 fillBatch(It begin,
size_t n) {
200 for (
auto it = begin; n; ++it, --n) {
201 using T = decltype(*begin);
202 new (getItem(seq_++)) T(*it);
206 template <
typename Item,
typename It>
208 fillBatchInPlace(It begin,
size_t n) {
209 for (
auto it = begin; n; ++it, --n) {
210 new (getItem(seq_++)) Item(*it);
220 std::condition_variable hasItem_;
221 std::condition_variable hasSlot_;
224 namespace blocking_buffer_detail {
227 : buf_(buf), seq_(seq){}
228 iterator() : buf_(
nullptr), seq_(0){}
229 void clear() {buf_ =
nullptr;}
231 iterator& operator ++() HMBDC_RESTRICT {
236 iterator operator ++(
int) HMBDC_RESTRICT {
242 iterator operator + (
size_t dis)
const HMBDC_RESTRICT {
248 iterator& operator += (
size_t dis) HMBDC_RESTRICT {
253 size_t operator - (
iterator const& other)
const HMBDC_RESTRICT {
254 return seq_ - other.seq_;
257 explicit operator bool()
const HMBDC_RESTRICT {
261 bool operator < (
iterator const& other)
const HMBDC_RESTRICT {
262 return seq_ < other.seq_;
265 bool operator == (
iterator const& other)
const HMBDC_RESTRICT {
return seq_ == other.seq_;}
266 bool operator != (
iterator const& other)
const HMBDC_RESTRICT {
return seq_ != other.seq_;}
267 void* operator*()
const HMBDC_RESTRICT {
return buf_->getItem(seq_);}
268 template <
typename T> T&
get()
const HMBDC_RESTRICT {
return *
static_cast<T*
>(**this);}
269 template <
typename T>
270 T* operator->() HMBDC_RESTRICT {
return static_cast<T*
>(buf_->getItem(seq_));}
281 std::unique_lock<std::mutex> lck(mutex_);
291 std::unique_lock<std::mutex> lck(mutex_);
293 return iterator(
this, seq_ - size_);
Definition: BlockingBuffer.hpp:16
Definition: TypedString.hpp:74
Definition: BlockingBuffer.hpp:225