1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/time/Timers.hpp" 4 #include "hmbdc/pattern/BlockingBuffer.hpp" 5 #include "hmbdc/Traits.hpp" 8 namespace hmbdc {
namespace app {
9 namespace blocking_context_detail {
11 template <
bool is_timer_manager>
14 void operator()(C&) {}
20 tm.checkTimers(time::SysTime::now());
24 template <
typename... MessageTuples>
29 template <
typename MessageTuple,
typename... MessageTuples>
38 typename std::enable_if<std::is_base_of<time::TimerManager, T>::value,
time::Duration>::type
40 return std::min(tm.untilNextFire(), maxBlockingTime);
44 typename std::enable_if<!std::is_base_of<time::TimerManager, T>::value,
time::Duration>::type
46 return maxBlockingTime;
49 template <
typename CcClient>
57 const bool clientParticipateInMessaging =
58 std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE != 0;
59 if (clientParticipateInMessaging) {
60 uint64_t count = buf.peek(begin, end);
62 c.CcClient::handleRangeImpl(b, end, 0xffff);
63 c.CcClient::invokedCb(0xffff);
64 buf.wasteAfterPeek(count);
66 buf.waitItem(waitDuration(c, maxBlockingTime));
69 c.CcClient::invokedCb(0xffffu);
71 }
catch (std::exception
const& e) {
105 template <
typename... MessageTuples>
116 using Interests =
typename cpa::Interests;
148 template <
typename Client>
150 ,
size_t capacity = 1024
152 , uint64_t cpuAffinity = 0
154 std::shared_ptr<Transport> t;
155 if (std::tuple_size<typename Client::Interests>::value) {
156 t.reset(
new Transport(maxItemSize, capacity));
159 auto thrd = kickOffClientThread(c, t->buffer, cpuAffinity, maxBlockingTime);
160 threads_.push_back(move(thrd));
201 template <
typename Client,
typename DeliverPred>
205 , uint64_t cpuAffinity
207 , DeliverPred&& pred) {
208 std::shared_ptr<Transport> t;
209 if (std::tuple_size<typename Client::Interests>::value) {
210 t.reset(
new Transport(maxItemSize, capacity));
212 msgConduits_, c, t, std::forward<DeliverPred>(pred));
214 auto thrd = kickOffClientThread(c, t->buffer, cpuAffinity, maxBlockingTime);
215 threads_.push_back(move(thrd));
224 __atomic_thread_fence(__ATOMIC_ACQUIRE);
233 for (
auto& t : threads_) {
247 template <
typename Message>
249 using M =
typename std::remove_reference<Message>::type;
251 static_assert(i < std::tuple_size<MsgConduits>::value,
"message type unspecified for the blocking BlockingContext");
252 auto& entries = std::get<i>(msgConduits_);
253 for (
auto& e : entries) {
254 if (e.diliverPred(std::forward<Message>(m))) {
255 e.transport->buffer.put(
MessageWrap<M>(std::forward<Message>(m)));
271 template <
typename Message>
273 using M =
typename std::remove_reference<Message>::type;
275 static_assert(i < std::tuple_size<MsgConduits>::value,
"unspecified for the blocking BlockingContext");
276 auto& entries = std::get<i>(msgConduits_);
277 for (
auto& e : entries) {
278 if (e.diliverPred(std::forward<Message>(m))) {
279 if (!e.transport->buffer.tryPut(
MessageWrap<M>(std::forward<Message>(m)))) {
296 template <
typename Message,
typename ... Args>
298 using M =
typename std::remove_reference<Message>::type;
299 return trySend(M(std::forward<Args>(args)...));
310 template <
typename ForwardIt>
312 send(ForwardIt begin,
size_t n) {
313 for (
auto i = 0u; i < n; ++i, begin++) {
322 Transport(
size_t maxItemSize,
size_t capacity)
323 : buffer(maxItemSize +
sizeof(
MessageHead), capacity){}
327 template <
typename Message>
329 std::shared_ptr<Transport> transport;
330 std::function<bool (Message const&)> diliverPred;
332 template <
typename Message>
333 using TransportEntries = std::vector<TransportEntry<Message>>;
335 template <
typename Tuple>
struct MCGen;
336 template <
typename ...Messages>
338 using type = std::tuple<TransportEntries<Messages> ...>;
343 MsgConduits msgConduits_;
344 using Threads = std::vector<std::thread>;
348 template <
typename MsgConduits,
typename Client,
typename DeliverPred,
typename Tuple>
350 void operator()(MsgConduits&,
Client&, std::shared_ptr<Transport>, DeliverPred&&){}
353 template <
typename MsgConduits,
typename Client,
typename DeliverPred,
typename M,
typename ...Messages>
355 void operator()(MsgConduits& msgConduits, Client& c, std::shared_ptr<Transport> t, DeliverPred&& pred){
357 static_assert(i != std::tuple_size<Interests>::value,
"");
358 std::get<i>(msgConduits).emplace_back(
TransportEntry<M>{t, std::forward<DeliverPred>(pred)});
360 setupConduitFor<MsgConduits, Client, DeliverPred, std::tuple<Messages...>>()
361 (msgConduits, c, t, std::forward<DeliverPred>(pred));
365 template <
typename MsgConduits,
typename Client,
typename M,
typename ...Messages>
367 void operator()(MsgConduits& msgConduits, Client& c, std::shared_ptr<Transport> t,
void*){
369 static_assert(i != std::tuple_size<Interests>::value,
"");
370 std::get<i>(msgConduits).emplace_back(
TransportEntry<M>{t, [](M
const&){
return true;}});
373 (msgConduits, c, t,
nullptr);
377 template <
typename Client>
385 , maxBlockingTime]() {
387 char const* schedule;
395 auto cpuAffinityMask = mask;
396 std::tie(schedule, priority) = c.
schedSpec();
398 if (!schedule) schedule =
"SCHED_OTHER";
400 os::configureCurrentThread(name.c_str(), cpuAffinityMask
401 , schedule, priority);
405 blocking_context_detail::runOnceImpl(this->stopped_, buffer, c, maxBlockingTime)) {
407 if (this->stopped_) c.dropped();
Definition: BlockingBuffer.hpp:17
Definition: Traits.hpp:79
void start(Client &c, size_t capacity, size_t maxItemSize, uint64_t cpuAffinity, time::Duration maxBlockingTime, DeliverPred &&pred)
start a client within the Context. The client is powered by a single OS thread
Definition: BlockingContext.hpp:202
void send(ForwardIt begin, size_t n)
send a range of messages via the BlockingContext
Definition: BlockingContext.hpp:312
Definition: BlockingContext.hpp:321
Definition: BlockingContext.hpp:349
Definition: TypedString.hpp:74
Definition: BlockingContext.hpp:25
Definition: Timers.hpp:65
Definition: BlockingContext.hpp:335
Definition: Traits.hpp:43
bool trySend(Message &&m)
best effort to send a message via the BlockingContext
Definition: BlockingContext.hpp:272
void start(Client &c, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename Client::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1))
start a client within the Context. The client is powered by a single OS thread
Definition: BlockingContext.hpp:149
Definition: BlockingContext.hpp:328
Unknown exception.
Definition: Exception.hpp:17
A BlockingContext is like a media object that facilitates the communications for the Clients that it ...
Definition: BlockingContext.hpp:106
std::tuple< char const *, int > schedSpec() const
an overrideable method. returns the schedule policy and priority, override if necessary priority is o...
Definition: Client.hpp:69
Definition: Message.hpp:38
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:59
bool trySendInPlace(Args &&...args)
it does the same thing as trySend
Definition: BlockingContext.hpp:297
Definition: BlockingBuffer.hpp:175
void join()
wait until all threads of the Context exit
Definition: BlockingContext.hpp:232
void stop()
stop the message dispatching - asynchronously
Definition: BlockingContext.hpp:223
Definition: Message.hpp:72
BlockingContext()
trivial ctor
Definition: BlockingContext.hpp:121
virtual void messageDispatchingStartedCb(uint16_t threadSerialNumber)
called before any messages are dispatched - only once
Definition: Client.hpp:93
Definition: Traits.hpp:95
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
Exception that just has an exit code.
Definition: Exception.hpp:28
void send(Message &&m)
send a message to the BlockingContext to dispatch
Definition: BlockingContext.hpp:248
a std tuple holding the message types it can dispatch
Definition: BlockingContext.hpp:12