1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Message.hpp" 4 #include "hmbdc/time/Timers.hpp" 5 #include "hmbdc/pattern/BlockingBuffer.hpp" 6 #include "hmbdc/os/Thread.hpp" 7 #include "hmbdc/Traits.hpp" 8 #include "hmbdc/Exception.hpp" 15 namespace hmbdc {
namespace app {
16 namespace blocking_context_detail {
18 template <
bool is_timer_manager>
21 void operator()(C&) {}
27 tm.checkTimers(time::SysTime::now());
31 template <
typename... MessageTuples>
36 template <
typename MessageTuple,
typename... MessageTuples>
45 typename std::enable_if<std::is_base_of<time::TimerManager, T>::value,
time::Duration>::type
47 return std::min(tm.untilNextFire(), maxBlockingTime);
51 typename std::enable_if<!std::is_base_of<time::TimerManager, T>::value,
time::Duration>::type
53 return maxBlockingTime;
56 template <
typename CcClient>
57 bool runOnceImpl(
bool& HMBDC_RESTRICT stopped
65 const bool clientParticipateInMessaging =
66 std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE != 0;
67 if (clientParticipateInMessaging && buf) {
68 uint64_t count = buf->peek(begin, end);
70 c.CcClient::handleRangeImpl(b, end, 0xffff);
71 c.CcClient::invokedCb(0xffff);
72 buf->wasteAfterPeek(count);
74 buf->waitItem(waitDuration(c, maxBlockingTime));
77 c.CcClient::invokedCb(0xffffu);
79 }
catch (std::exception
const& e) {
99 template <
typename CcClient>
100 bool runOnceLoadSharingImpl(
bool& HMBDC_RESTRICT stopped
108 const bool clientParticipateInMessaging =
109 std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE != 0;
110 if (clientParticipateInMessaging && buf) {
111 uint8_t msgWrapped[buf->maxItemSize()] __attribute__((aligned (8)));
112 void* tmp = msgWrapped;
113 if (buf->tryTake(tmp, 0, maxBlockingTime)) {
114 c.CcClient::handleMessage(*static_cast<MessageHead*>(tmp));
117 c.CcClient::invokedCb(0xffffu);
118 }
catch (std::exception
const& e) {
153 template <
typename... MessageTuples>
// Forward declaration of the can_handle trait, parameterized on a message
// type. Its nested ::value is used with std::enable_if elsewhere in this
// listing to choose between the working and the no-op send overloads.
157 template <
typename Message>
struct can_handle;
165 using Interests =
typename cpa::Interests;
// Call operator that accepts an argument of any type, ignores it, and
// unconditionally returns true.
168 template <
typename T>
169 bool operator()(T&&) {
return true;}
218 template <
typename Client,
typename DeliverPred = deliverAll>
220 ,
size_t capacity = 1024
222 , uint64_t cpuAffinity = 0
224 , DeliverPred&& pred = DeliverPred()) {
225 std::shared_ptr<Transport> t(
new Transport(maxItemSize, capacity));
227 msgConduits_, t, std::forward<DeliverPred>(pred));
228 auto thrd = kickOffClientThread(blocking_context_detail::runOnceImpl<Client>
229 , c, t.use_count() == 1?
nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
230 threads_.push_back(move(thrd));
254 template <
typename LoadSharingClientPtrIt,
typename DeliverPred = deliverAll>
255 void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end
256 ,
size_t capacity = 1024
258 typename std::remove_reference<decltype(**LoadSharingClientPtrIt())>::type::Interests
260 , uint64_t cpuAffinity = 0
262 , DeliverPred&& pred = DeliverPred()) {
263 if (begin == end)
return;
264 using Client =
typename std::remove_reference<decltype(**begin)>::type;
265 std::shared_ptr<Transport> t(
new Transport(maxItemSize, capacity));
267 msgConduits_, t, std::forward<DeliverPred>(pred));
268 while(begin != end) {
269 auto thrd = kickOffClientThread(blocking_context_detail::runOnceLoadSharingImpl<Client>
270 , **begin++, t.use_count() == 1?
nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
271 threads_.push_back(std::move(thrd));
281 __atomic_thread_fence(__ATOMIC_ACQUIRE);
290 for (
auto& t : threads_) {
304 template <
typename Message>
305 typename std::enable_if<can_handle<Message>::value,
void>::type
307 using M =
typename std::remove_reference<Message>::type;
309 static_assert(i < std::tuple_size<MsgConduits>::value,
"unexpected message type");
310 auto& entries = std::get<i>(msgConduits_);
311 for (
auto i = 0u; i < entries.size(); ++i) {
312 auto& e = entries[i];
313 if (e.diliverPred(m)) {
314 if (i == entries.size() - 1) {
316 .template putInPlace<MessageWrap<M>>(std::forward<Message>(m));
318 e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
323 template <
typename Message>
324 typename std::enable_if<!can_handle<Message>::value,
void>::type
336 template <
typename Message,
typename ... Args>
337 typename std::enable_if<can_handle<Message>::value,
void>::type
339 using M =
typename std::remove_reference<Message>::type;
341 auto& entries = std::get<i>(msgConduits_);
342 for (
auto i = 0u; i < entries.size(); ++i) {
343 auto& e = entries[i];
344 if (i == entries.size() - 1) {
346 .template putInPlace<MessageWrap<M>>(std::forward<Args>(args)...);
348 e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
// No-op overload of sendInPlace. It is enabled only when
// can_handle<Message>::value is false; the forwarded arguments are accepted
// and discarded, and nothing is constructed or sent.
352 template <
typename Message,
typename ... Args>
353 typename std::enable_if<!can_handle<Message>::value,
void>::type
354 sendInPlace(Args&&...) {}
368 template <
typename Message>
369 typename std::enable_if<can_handle<Message>::value,
bool>::type
371 using M =
typename std::remove_reference<Message>::type;
373 auto& entries = std::get<i>(msgConduits_);
374 for (
auto i = 0u; i < entries.size(); ++i) {
375 auto& e = entries[i];
376 if (e.diliverPred(m)) {
377 if (i == entries.size() - 1) {
378 if (!e.transport->buffer
392 template <
typename Message>
393 typename std::enable_if<!can_handle<Message>::value,
bool>::type
412 template <
typename Message,
typename ... Args>
413 typename std::enable_if<can_handle<Message>::value,
bool>::type
415 using M =
typename std::remove_reference<Message>::type;
417 auto& entries = std::get<i>(msgConduits_);
418 for (
auto i = 0u; i < entries.size(); ++i) {
419 auto& e = entries[i];
420 if (i == entries.size() - 1) {
421 if (!e.transport->buffer.template
426 if (!e.transport->buffer.template
435 template <
typename Message,
typename ... Args>
436 typename std::enable_if<!can_handle<Message>::value,
bool>::type
437 trySendInPlace(Args&&... args) {
450 template <
typename ForwardIt>
451 typename std::enable_if<can_handle<decltype(*(ForwardIt()))>::value,
void>::type
452 send(ForwardIt begin,
size_t n) {
454 using Message = decltype(*begin);
455 using M =
typename std::remove_reference<Message>::type;
457 auto& entries = std::get<i>(msgConduits_);
458 for (
auto i = 0u; i < entries.size(); ++i) {
459 auto& e = entries[i];
460 if (e.diliverPred(*begin)) {
461 while (!e.transport->buffer.template tryPutBatchInPlace<
MessageWrap<M>>(begin, n)) {}
// No-op overload of the range send. It is enabled only when
// can_handle<decltype(*(ForwardIt()))>::value is false; `begin` and `n` are
// ignored and the body is empty.
466 template <
typename ForwardIt>
467 typename std::enable_if<!can_handle<decltype(*(ForwardIt()))>::value,
void>::type
468 send(ForwardIt begin,
size_t n) {}
// Constructs a Transport by initializing its `buffer` member with
// (maxItemSize + sizeof(MessageHead)) as the first argument and `capacity`
// as the second.
// NOTE(review): the extra sizeof(MessageHead) presumably reserves room for a
// message header in every buffered item - confirm against the buffer type.
474 Transport(
size_t maxItemSize,
size_t capacity)
475 : buffer(maxItemSize +
sizeof(
MessageHead), capacity){}
479 template <
typename Message>
481 std::shared_ptr<Transport> transport;
482 std::function<bool (Message const&)> diliverPred;
// Alias template: the list of TransportEntry<Message> objects kept for one
// message type, stored as a std::vector.
484 template <
typename Message>
485 using TransportEntries = std::vector<TransportEntry<Message>>;
// Declaration of the MCGen class template, parameterized on a tuple type.
// Only the declaration appears here; no body is given at this point.
487 template <
typename Tuple>
struct MCGen;
488 template <
typename ...Messages>
490 using type = std::tuple<TransportEntries<Messages> ...>;
// Per-message-type transport entries. The send functions in this listing
// select one element with std::get<i>(msgConduits_) and iterate over it.
495 MsgConduits msgConduits_;
// Container type for threads: a std::vector of std::thread.
496 using Threads = std::vector<std::thread>;
500 template <
typename MsgConduits,
typename DeliverPred,
typename Tuple>
502 void operator()(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&){}
506 template <
bool careMsg,
size_t i,
typename M,
typename MsgConduits,
typename DeliverPred>
507 typename std::enable_if<careMsg, void>::type
508 doOrNot(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred) {
512 template <
bool careMsg,
size_t i,
typename M,
typename MsgConduits,
typename DeliverPred>
513 typename std::enable_if<!careMsg, void>::type
514 doOrNot(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&) {
518 template <
typename MsgConduits,
typename DeliverPred,
typename M,
typename ...Messages>
520 void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred){
522 createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
523 msgConduits, t, pred);
524 setupConduit<MsgConduits, DeliverPred, std::tuple<Messages...>>()
525 (msgConduits,t, std::forward<DeliverPred>(pred));
529 template <
typename MsgConduits,
typename M,
typename ...Messages>
531 void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t,
void*){
533 createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
534 msgConduits, t, [](M
const&){
return true;});
535 setupConduit<MsgConduits,
void*, std::tuple<Messages...>>()
536 (msgConduits, t,
nullptr);
540 template <
typename Message>
543 using M =
typename std::remove_reference<Message>::type;
547 value = index < std::tuple_size<MsgConduits>::value?1:0,
551 template <
typename Client,
typename RunOnceFunc>
562 char const* schedule;
570 auto cpuAffinityMask = mask;
571 std::tie(schedule, priority) = c.
schedSpec();
573 if (!schedule) schedule =
"SCHED_OTHER";
575 os::configureCurrentThread(name.c_str(), cpuAffinityMask
576 , schedule, priority);
579 while(!stopped_ && runOnceFunc(this->stopped_, buffer, c, maxBlockingTime)) {
581 if (this->stopped_) c.dropped();
Definition: BlockingBuffer.hpp:16
Definition: Traits.hpp:79
std::enable_if< can_handle< Message >::value, bool >::type trySend(Message &&m, time::Duration timeout=time::Duration::seconds(0))
best effort to send a message via the BlockingContext
Definition: BlockingContext.hpp:370
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:57
std::enable_if< can_handle< decltype(*(ForwardIt()))>::value, void >::type send(ForwardIt begin, size_t n)
send a range of messages via the BlockingContext
Definition: BlockingContext.hpp:452
Definition: BlockingContext.hpp:473
void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename std::remove_reference< decltype(**LoadSharingClientPtrIt())>::type::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a group of clients within the Context that collectively process messages in a load sharing m...
Definition: BlockingContext.hpp:255
std::enable_if< can_handle< Message >::value, void >::type send(Message &&m)
send a message to the BlockingContext to dispatch
Definition: BlockingContext.hpp:306
Definition: TypedString.hpp:74
Definition: BlockingContext.hpp:32
std::tuple< char const *, int > schedSpec() const
an overrideable method. returns the schedule policy and priority, override if necessary priority is o...
Definition: Client.hpp:67
Definition: Timers.hpp:65
Definition: BlockingContext.hpp:487
Definition: Traits.hpp:43
std::enable_if< can_handle< Message >::value, bool >::type trySendInPlace(Args &&... args)
best effort to send a message by directly constructing the message in receiving message queue ...
Definition: BlockingContext.hpp:414
Definition: BlockingContext.hpp:480
Unknown exception.
Definition: Exception.hpp:17
Definition: BlockingContext.hpp:157
Definition: BlockingContext.hpp:167
A BlockingContext is like a media object that facilitates the communications for the Clients that it ...
Definition: BlockingContext.hpp:154
Definition: Message.hpp:42
Definition: BlockingContext.hpp:505
void start(Client &c, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename Client::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a client within the Context. The client is powered by a single OS thread.
Definition: BlockingContext.hpp:219
std::enable_if< can_handle< Message >::value, void >::type sendInPlace(Args &&... args)
send a message by directly constructing the message in receiving message queue
Definition: BlockingContext.hpp:338
void join()
wait until all threads of the Context exit
Definition: BlockingContext.hpp:289
void stop()
stop the message dispatching - asynchronously
Definition: BlockingContext.hpp:280
Definition: Message.hpp:76
BlockingContext()
trivial ctor
Definition: BlockingContext.hpp:175
Definition: BlockingContext.hpp:501
virtual void messageDispatchingStartedCb(uint16_t threadSerialNumber)
called before any messages got dispatched - only once
Definition: Client.hpp:91
Definition: Traits.hpp:95
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:45
Exception that just has an exit code.
Definition: Exception.hpp:28
Definition: BlockingBuffer.hpp:225
a std tuple holding messages types it can dispatch
Definition: BlockingContext.hpp:157
Definition: BlockingContext.hpp:19