1 #include "hmbdc/Copyright.hpp" 4 #include "hmbdc/Config.hpp" 5 #include "hmbdc/app/utils/StuckClientPurger.hpp" 6 #include "hmbdc/numeric/BitMath.hpp" 7 #include "hmbdc/os/Allocators.hpp" 11 namespace hmbdc {
namespace app {
13 namespace context_property {
template <uint16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
struct broadcast {  // struct name reconstructed; the declaration line is missing from this fragment
    static_assert(max_parallel_consumer >= 4u
        , "..."); // assertion message elided in this fragment
    // ...
};

// ... (other context properties elided)
90 #include "hmbdc/app/ContextDetail.hpp" 91 namespace hmbdc {
namespace app {
template <size_t MaxMessageSize, typename... ContextProperties>
struct ContextBase {  // placeholder name: the actual class name is elided in this fragment
    // typedefs such as Buffer, Allocator and the context-property aggregate 'cpa'
    // are derived from ContextProperties... (definitions elided in this fragment)
    enum {
        MAX_MESSAGE_SIZE = MaxMessageSize,
        BUFFER_VALUE_SIZE = MaxMessageSize + 8u,
    };

    size_t maxMessageSize() const {
        if (MaxMessageSize == 0) return maxMessageSizeRuntime_;
        return MaxMessageSize;
    }
    template <typename M0, typename M1, typename... Messages>
    typename enable_if<!is_integral<M1>::value, void>::type
    send(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.claim(n);
        sendRecursive(it, forward<M0>(m0), forward<M1>(m1), forward<Messages>(msgs)...);
        buffer_.commit(it, n);
    }
    template <typename M0, typename M1, typename... Messages>
    typename enable_if<!is_integral<M1>::value, bool>::type
    trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.tryClaim(n);
        if (unlikely(!it)) return false;  // line reconstructed: nothing is enqueued on failure
        sendRecursive(it, forward<M0>(m0), forward<M1>(m1), forward<Messages>(msgs)...);
        buffer_.commit(it, n);
        return true;
    }
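    // Usage sketch (illustrative, not from this header): with a Context instance ctx
    // and message objects m1, m2, m3 of registered hmbdc message types, a batch can be
    // published in one claim/commit:
    //
    //     ctx.send(m1, m2, m3);              // blocks until buffer space is claimed
    //     if (!ctx.trySend(m1, m2, m3)) {    // non-blocking variant
    //         // buffer full - all-or-nothing, nothing was enqueued
    //     }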
    template <typename ForwardIt>
    void send(ForwardIt begin, size_t n) {
        auto bit = buffer_.claim(n);
        for (auto i = 0ul; i < n; i++) {
            using Message = typename iterator_traits<ForwardIt>::value_type;
            // ... copy *begin++ into the claimed slot as MessageWrap<Message> (elided)
        }
        buffer_.commit(bit, n);
    }
    template <typename ForwardIt>
    bool trySend(ForwardIt begin, size_t n) {
        auto bit = buffer_.tryClaim(n);
        if (unlikely(!bit)) return false;
        for (auto i = 0ul; i < n; i++) {
            using Message = typename iterator_traits<ForwardIt>::value_type;
            // ... copy *begin++ into the claimed slot as MessageWrap<Message> (elided)
        }
        buffer_.commit(bit, n);
        return true;
    }
    template <typename Message>
    void send(Message&& m) {
        using M = typename remove_reference<Message>::type;
        static_assert(MAX_MESSAGE_SIZE == 0
            || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (unlikely(MAX_MESSAGE_SIZE == 0
            && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
            HMBDC_THROW(out_of_range, "message too big");
        }
        // ... enqueue MessageWrap<M>(forward<Message>(m)) into buffer_ (elided)
    }
    template <typename Message>
    bool trySend(Message&& m) {
        using M = typename remove_reference<Message>::type;
        static_assert(MAX_MESSAGE_SIZE == 0
            || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (unlikely(MAX_MESSAGE_SIZE == 0
            && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
            HMBDC_THROW(out_of_range, "message too big");
        }
        // ... try to enqueue MessageWrap<M>(forward<Message>(m)); return false if the buffer is full (elided)
    }
    template <typename Message, typename... Args>
    void sendInPlace(Args&&... args) {
        static_assert(MAX_MESSAGE_SIZE == 0
            || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE   // condition reconstructed by analogy with send()
            , "message too big");
        if (unlikely(MAX_MESSAGE_SIZE == 0
            && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(out_of_range, "message too big");
        }
        buffer_.template putInPlace<MessageWrap<Message>>(forward<Args>(args)...);
    }
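    // Usage sketch (illustrative; MyMessage and its ctor args are hypothetical):
    // sendInPlace() constructs the message directly in the claimed buffer slot,
    // avoiding a copy; the template argument is the message type and the call
    // arguments are forwarded to its constructor:
    //
    //     ctx.sendInPlace<MyMessage>(arg1, arg2);   // MyMessage(arg1, arg2) built in place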
    // ctor (class name elided in this fragment; some initializer details reconstructed)
    ContextBase(uint32_t messageQueueSizePower2Num
        , size_t maxMessageSizeRuntime = MAX_MESSAGE_SIZE
        , char const* shmName = nullptr)
    : allocator_(shmName
        , Buffer::footprint(maxMessageSizeRuntime + 8u, messageQueueSizePower2Num)
        , O_RDWR | (cpa::create_ipc ? O_CREAT : 0))
    , bufferptr_(allocator_.template allocate<Buffer>(1
        , maxMessageSizeRuntime + 8u, messageQueueSizePower2Num
        /* ... further args elided ... */))
    , buffer_(*bufferptr_) {
        if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
            HMBDC_THROW(out_of_range
                , "can only set maxMessageSizeRuntime when template value MaxMessageSize is 0");
        }
        if (maxMessageSizeRuntime == 0) {
            HMBDC_THROW(out_of_range, "please set maxMessageSizeRuntime");
        }
        maxMessageSizeRuntime_ = maxMessageSizeRuntime;
        primeBuffer<cpa::create_ipc && cpa::has_pool>();
        if (cpa::create_ipc || cpa::attach_ipc) {
            // ... ipc-specific setup (elided)
        }
    }

    ~ContextBase() {
        allocator_.unallocate(bufferptr_);
    }
    template <typename BroadCastBuf>
    void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
        for (uint16_t i = poolThreadCount;
            i < BroadCastBuf::max_parallel_consumer;
            ++i) {
            // ... mark consumer slot i dead so it no longer blocks the queue (elided)
        }
    }

    Allocator allocator_;
    Buffer* __restrict__ bufferptr_;
    Buffer& __restrict__ buffer_;

    template <bool doIt>
    typename enable_if<doIt, void>::type
    primeBuffer() {
        markDeadFrom(buffer_, 0);
    }

    template <bool doIt>
    typename enable_if<!doIt, void>::type
    primeBuffer() {}
    template <typename M, typename... Messages>
    void sendRecursive(typename Buffer::iterator it
        , M&& msg, Messages&&... msgs) {
        using Message = typename remove_reference<M>::type;
        static_assert(MAX_MESSAGE_SIZE == 0
            || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE   // condition reconstructed by analogy with send()
            , "message too big");
        if (unlikely(MAX_MESSAGE_SIZE == 0
            && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(out_of_range, "message too big");
        }
        // ... construct MessageWrap<Message>(forward<M>(msg)) at *it (elided)
        sendRecursive(++it, forward<Messages>(msgs)...);
    }
    void sendRecursive(typename Buffer::iterator) {}

    size_t maxMessageSizeRuntime_;
};
template <size_t MaxMessageSize = 0, typename... ContextProperties>
struct Context
: ContextBase<MaxMessageSize, ContextProperties...> {  // base spec reconstructed using the placeholder name above
    using Base = ContextBase<MaxMessageSize, ContextProperties...>;
    using Buffer = typename Base::Buffer;
    // ...

    Context(uint32_t messageQueueSizePower2Num = 20
        , size_t maxPoolClientCount = 128
        , size_t maxMessageSizeRuntime = MaxMessageSize)
    : Base(messageQueueSizePower2Num, maxMessageSizeRuntime)
    , startToBeContinued_(true)
    , usedHmbdcCapacity_(0)
    , currentThreadSerialNumber_(0)
    , pool_(createPool(this->buffer_, maxPoolClientCount))
    , poolThreadCount_(0) {
        static_assert(!cpa::create_ipc && !cpa::attach_ipc
            , "no name specified for ipc Context");
    }
    Context(char const* ipcTransportName
        , uint32_t messageQueueSizePower2Num = 20
        , size_t maxPoolClientCount = 128
        , size_t maxMessageSizeRuntime = MaxMessageSize
        , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful)
    : Base(messageQueueSizePower2Num, maxMessageSizeRuntime
        , ipcTransportName)
    , startToBeContinued_(true)
    , usedHmbdcCapacity_(0)
    , currentThreadSerialNumber_(0)
    , pool_(createPool(this->buffer_, maxPoolClientCount))
    , poolThreadCount_(0)
    , secondsBetweenPurge_(60)
    , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
        static_assert(cpa::create_ipc || cpa::attach_ipc
            , "ctor can only be used with ipc turned on Context");
        static_assert(!(cpa::create_ipc && cpa::attach_ipc)
            , "Context cannot be both ipc_creator and ipc_attacher");
    }
    ~Context() {
        if (cpa::create_ipc) {
            this->markDeadFrom(this->buffer_, 0);
        }
        // ... (remaining teardown elided)
    }
    template <typename Client>
    void addToPool(Client& client
        , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        if (is_base_of<single_thread_powered_client, Client>::value
            && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
            && poolThreadCount_ != 1) {
            HMBDC_THROW(out_of_range
                , "cannot add a single thread powered client to the non-single thread powered pool");
        }
        pool_->addConsumer(client, poolThreadAffinityIn);
    }
    template <typename Client, typename ... Args>
    void addToPool(Client& client
        , uint64_t poolThreadAffinityIn, Args&& ...args) {
        addToPool(client, poolThreadAffinityIn);
        addToPool(forward<Args>(args)...);
    }

    size_t clientCountInPool() const {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        return pool_->consumerSize();
    }

    size_t parallelConsumerAlive() const {
        return this->buffer_.parallelConsumerAlive();
    }
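    // Usage sketch (illustrative): pool Clients are powered by the Context's thread
    // pool rather than a dedicated thread; (client, pool-thread-affinity-mask) pairs
    // can be registered in one call before start():
    //
    //     ctx.addToPool(clientA);                          // any pool thread may run it
    //     ctx.addToPool(clientB, 0x1ul, clientC, 0x2ul);   // pin to pool thread 0 / 1
    //     assert(ctx.clientCountInPool() == 3);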
    template <typename ...Args>
    void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        startWithContextProperty<cpa>(poolThreadCount, poolThreadsCpuAffinityMask
            , forward<Args>(args) ...);
    }

    template <typename Client, typename ...Args>
    typename enable_if<!is_integral<Client>::value, void>::type
    start(Client& c, uint64_t cpuAffinity, Args&& ... args) {
        startWithContextProperty<cpa>(c, cpuAffinity, forward<Args>(args) ...);
    }

    void start() {   // no-argument overload (signature reconstructed)
        startWithContextProperty<cpa>();
    }

    void stop() {
        stopWithContextProperty<cpa>();
    }

    void join() {
        joinWithContextProperty<cpa>();
    }
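    // Usage sketch (illustrative): a typical lifecycle - start the pool and/or direct
    // Clients, let them dispatch, then shut down:
    //
    //     ctx.start(2, 0x03ul            // 2 pool threads on cores 0-1
    //         , directClient, 0x04ul);   // directClient gets its own thread on core 2
    //     // ... publish messages via ctx.send(...) ...
    //     ctx.stop();                    // asynchronous; dispatching winds down
    //     ctx.join();                    // wait for pool and client threads to exit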
    void setSecondsBetweenPurge(uint32_t s) {
        secondsBetweenPurge_ = s;
    }

    void runPoolThreadOnce(uint16_t threadSerialNumber) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        pool_->runOnce(threadSerialNumber);
    }
    template <typename Client>
    void runClientThreadOnce(uint16_t threadSerialNumber, Client& c) {
        c.messageDispatchingStarted(threadSerialNumber);
        detail::runOnceImpl(threadSerialNumber, stopped_
            , this->buffer_, c);   // trailing args reconstructed from the call in kickOffClientThread below
    }
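    // Usage sketch (illustrative): per the documentation these helpers are normally not
    // used unless the application runs its own message loop on threads it owns; each
    // call drives one round of dispatching:
    //
    //     ctx.runPoolThreadOnce(0);                       // one round of pool dispatching
    //     ctx.runClientThreadOnce(serial, myClient);      // one round for a direct Client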
    template <typename Buffer>
    shared_ptr<Pool>          // overload enabled when the Context has a pool (exact SFINAE elided)
    createPool(Buffer& buffer, size_t maxPoolClientCount) {
        return Pool::create(buffer, maxPoolClientCount);
    }

    template <typename Buffer>
    shared_ptr<Pool>          // overload for pool-less Contexts (exact SFINAE elided)
    createPool(Buffer&, size_t) {
        return shared_ptr<Pool>();
    }
    template <typename cpa>
    typename enable_if<cpa::has_pool, void>::type
    stopWithContextProperty() {
        if (pool_) pool_->stop();
        __sync_synchronize();
        // ... (set the stopped flag; elided)
    }

    template <typename cpa>
    typename enable_if<!cpa::has_pool, void>::type
    stopWithContextProperty() {
        __sync_synchronize();
        // ... (set the stopped flag; elided)
    }
    template <typename cpa>
    typename enable_if<cpa::has_pool, void>::type
    joinWithContextProperty() {
        if (pool_) pool_->join();
        for (auto& t : threads_) {
            t.join();   // body reconstructed
        }
    }

    template <typename cpa>
    typename enable_if<!cpa::has_pool, void>::type
    joinWithContextProperty() {
        for (auto& t : threads_) {
            t.join();   // body reconstructed
        }
    }
    template <typename cpa, typename ...Args>
    typename enable_if<!cpa::attach_ipc && !cpa::create_ipc, void>::type
    startWithContextProperty(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        poolThreadCount_ = poolThreadCount;
        if (!startToBeContinued_) {
            HMBDC_THROW(runtime_error
                , "Exception: conflicting with previously indicated start completed!");
        }
        pool_->start(poolThreadCount, poolThreadsCpuAffinityMask, false);
        usedHmbdcCapacity_ = poolThreadCount;
        currentThreadSerialNumber_ = usedHmbdcCapacity_;
        startLocalClients(forward<Args>(args) ...);
    }
    template <typename cpa, typename Client, typename ...Args>
    typename enable_if<!is_integral<Client>::value && !cpa::create_ipc && !cpa::attach_ipc, void>::type
    startWithContextProperty(Client& c, uint64_t cpuAffinity, Args&& ... args) {
        if (!startToBeContinued_) {
            HMBDC_THROW(runtime_error
                , "Exception: conflicting with previously indicated start completed!");
        }
        startLocalClients(c, cpuAffinity, forward<Args>(args) ...);
    }

    template <typename cpa>
    typename enable_if<!cpa::create_ipc && !cpa::attach_ipc, void>::type
    startWithContextProperty() {
        startLocalClients(false);
    }
    template <typename cpa>
    typename enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool, void>::type
    startWithContextProperty() {
        startBroadcastIpcClients();
    }
    template <typename cpa, typename ...Args>
    typename enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool, void>::type
    startWithContextProperty(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        if (poolThreadCount_) {
            HMBDC_THROW(out_of_range, "Context pool already started");
        }
        auto& lock = this->allocator_.fileLock();
        lock_guard<decltype(lock)> g(lock);
        auto slots = this->buffer_.unusedConsumerIndexes();
        if (slots.size() < sizeof...(args) / 2) {
            HMBDC_THROW(out_of_range
                , "Context instance supports Client count = " << slots.size());
        }
        poolThreadCount_ = poolThreadCount;
        pool_->startThruRecycling(poolThreadCount, poolThreadsCpuAffinityMask);
        currentThreadSerialNumber_ = poolThreadCount_;
        startBroadcastIpcClients(forward<Args>(args) ...);
    }
    template <typename cpa, typename Client, typename ...Args>
    typename enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool && !is_integral<Client>::value
        , void>::type
    startWithContextProperty(Client& c, uint64_t cpuAffinity
        , Args&& ... args) {
        auto& lock = this->allocator_.fileLock();
        lock_guard<decltype(lock)> g(lock);
        auto clientParticipateInMessaging =
            remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
        if (clientParticipateInMessaging) {
            auto slots = this->buffer_.unusedConsumerIndexes();
            if (slots.size() < 1u + sizeof...(args) / 2) {
                HMBDC_THROW(out_of_range
                    , "Context instance supports Client count = " << slots.size());
            }
        }
        startBroadcastIpcClients(c, cpuAffinity, forward<Args>(args) ...);
    }
    template <typename cpa, typename ...Args>
    typename enable_if<(cpa::create_ipc || cpa::attach_ipc) && !cpa::has_pool, void>::type
    startWithContextProperty(Args&& ... args) {
        startPartitionIpcClients(forward<Args>(args) ...);
    }
    void startLocalClients(bool startToBeContinuedFlag = cpa::can_start_anytime) {
        startToBeContinued_ = startToBeContinuedFlag;
        if (!startToBeContinued_) {
            this->markDeadFrom(this->buffer_, usedHmbdcCapacity_);
        }
    }
    template <typename Client, typename ...Args>
    void startLocalClients(Client& c, uint64_t mask, Args&&... args) {
        if (usedHmbdcCapacity_ >= Buffer::max_parallel_consumer
            && Client::REGISTERED_MESSAGE_SIZE) {
            HMBDC_THROW(out_of_range
                , "messaging participating client number > allowed thread number of "
                << Buffer::max_parallel_consumer);
        }
        auto thrd = kickOffClientThread(c, mask, usedHmbdcCapacity_, currentThreadSerialNumber_);
        threads_.push_back(move(thrd));

        auto clientParticipateInMessaging =
            remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
        if (clientParticipateInMessaging) usedHmbdcCapacity_++;
        currentThreadSerialNumber_++;
        startLocalClients(forward<Args>(args)...);
    }
    void startBroadcastIpcClients() {
        if (cpa::create_ipc && !purger_) {
            // ... construct the StuckClientPurger with secondsBetweenPurge_ (elided)
            startBroadcastIpcClients(*purger_, purgerCpuAffinityMask_);
        }
    }
    template <typename Client, typename ...Args>
    void startBroadcastIpcClients(Client& c, uint64_t mask, Args&&... args) {
        auto clientParticipateInMessaging =
            remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
        uint16_t hmbdcNumber = 0xffffu;
        if (clientParticipateInMessaging) {
            auto slots = this->buffer_.unusedConsumerIndexes();
            auto it = slots.begin();
            hmbdcNumber = *it;   // line reconstructed: take the first unused consumer index
            this->buffer_.reset(hmbdcNumber);
        }
        thread thrd = kickOffClientThread(
            c, mask, hmbdcNumber, currentThreadSerialNumber_);
        threads_.push_back(move(thrd));
        currentThreadSerialNumber_++;
        startBroadcastIpcClients(forward<Args>(args)...);
    }
    void startPartitionIpcClients() {}

    template <typename Client, typename ...Args>
    void startPartitionIpcClients(Client& c, uint64_t mask, Args&&... args) {
        thread thrd = kickOffClientThread(
            c, mask, currentThreadSerialNumber_, currentThreadSerialNumber_);
        threads_.push_back(move(thrd));
        currentThreadSerialNumber_++;
        startPartitionIpcClients(forward<Args>(args)...);
    }
    template <typename Client>
    thread kickOffClientThread(
        Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
        thread thrd([/* captures elided */]() {
            auto hmbdcNumber = h;        // 'h' comes from the elided capture of the slot number
            string name;
            char const* schedule;
            int priority;                // name/schedule/priority resolution partially elided
            auto clientParticipateInMessaging =
                remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
            if (clientParticipateInMessaging) {
                name = "hmbdc" + to_string(hmbdcNumber);
            }
            auto cpuAffinityMask = mask;
            if (!schedule) schedule = "SCHED_OTHER";
            // when no explicit affinity is given (condition elided), round-robin over cores:
            auto cpuCount = thread::hardware_concurrency();
            cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
            hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
                , schedule, priority);
            hmbdcNumber = clientParticipateInMessaging ? hmbdcNumber : 0xffffu;
            // dispatch loop (condition partially elided):
            while (/* ... */ runOnceImpl(hmbdcNumber, this->stopped_, this->buffer_, c)) {
            }
            if (this->stopped_) c.dropped();
            if (clientParticipateInMessaging) unblock(this->buffer_, hmbdcNumber);
        });
        return thrd;
    }
    bool startToBeContinued_;
    uint16_t usedHmbdcCapacity_;
    uint16_t currentThreadSerialNumber_;
    shared_ptr<Pool> pool_;
    using Threads = vector<thread>;
    Threads threads_;                 // line reconstructed; threads_ is used throughout
    size_t poolThreadCount_;
    uint32_t secondsBetweenPurge_;
    uint64_t purgerCpuAffinityMask_;
    typename conditional<cpa::has_pool
        , unique_ptr<utils::StuckClientPurger<Buffer>>, uint32_t
        >::type purger_;              // declaration tail reconstructed
};

} // namespace app
} // namespace hmbdc
Member index (from the Doxygen annotations on this page):

size_t parallelConsumerAlive() const
    how many parallel consumers are started (Context.hpp:529)
void runClientThreadOnce(uint16_t threadSerialNumber, Client& c)
    normally not used unless you want to run your own message loop (Context.hpp:638)
enable_if<!is_integral<M1>::value, void>::type send(M0&& m0, M1&& m1, Messages&&... msgs)
    send a batch of messages to the Context or attached ipc Contexts (Context.hpp:135)
void stop()
    stop the message dispatching - asynchronously (Context.hpp:588)
void join()
    wait until all threads (Pool threads too, if applicable) of the Context exit (Context.hpp:597)
void sendInPlace(Args&&... args)
    send a message to all Clients in the Context or attached ipc Contexts (Context.hpp:261)
Buffer& buffer()
    accessor - mostly used internally (Context.hpp:274)
void send(Message&& m)
    send a message to the Context or attached ipc Contexts (Context.hpp:221)
void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask, Args&&... args)
    start the context and specify its Pool and direct Clients (Context.hpp:553)
std::tuple<char const*, int> schedSpec() const
    an overridable method; returns the schedule policy and priority, override if necessary; priority is o... (Client.hpp:118)
char const* hmbdcName() const
    returns the name of the thread that runs this client, override if necessary (Client.hpp:108)
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
    called before any messages get dispatched - only once (Client.hpp:62)
void addToPool(Client& client, uint64_t poolThreadAffinityIn = 0xfffffffffffffffful)
    add a client to the Context's pool - the Client runs in pool mode (Context.hpp:477)
void send(ForwardIt begin, size_t n)
    send a range of messages to the Context or attached ipc Contexts (Context.hpp:176)
void runPoolThreadOnce(uint16_t threadSerialNumber)
    normally not used unless you want to run your own message loop (Context.hpp:625)
~Context()
    dtor (Context.hpp:455)
bool trySend(ForwardIt begin, size_t n)
    try to send a range of messages to the Context or attached ipc Contexts (Context.hpp:198)
enable_if<!is_integral<M1>::value, bool>::type trySend(M0&& m0, M1&& m1, Messages&&... msgs)
    try to send a batch of messages to the Context or attached ipc Contexts (Context.hpp:154)
enable_if<!is_integral<Client>::value, void>::type start(Client& c, uint64_t cpuAffinity, Args&&... args)
    start the context (without its Pool) and direct Clients (Context.hpp:574)
bool trySend(Message&& m)
    try to send a message to the Context or attached ipc Contexts (Context.hpp:241)
void addToPool(Client& client, uint64_t poolThreadAffinityIn, Args&&... args)
    add a bunch of clients to the Context's pool - the Clients run in pool mode (Context.hpp:507)
size_t clientCountInPool() const
    return the number of clients added into the pool (Context.hpp:518)
void setSecondsBetweenPurge(uint32_t s)
    an ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...) Clients from the ip... (Context.hpp:613)
Context(uint32_t messageQueueSizePower2Num = 20, size_t maxPoolClientCount = 128, size_t maxMessageSizeRuntime = MaxMessageSize)
    ctor for constructing a local, non-ipc Context (Context.hpp:398)
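A minimal end-to-end sketch, assuming a broadcast-style Context and a Client type whose
message-handling callbacks are defined elsewhere; MyClient, MyMessage and the exact Client
base signature are illustrative names, not taken from this header:

    #include "hmbdc/app/Context.hpp"

    using namespace hmbdc::app;

    int main() {
        MyClient directClient;              // derived from the Client base (definition not shown)
        MyClient pooledClient;

        Context<sizeof(MyMessage)> ctx;     // local, non-ipc Context with default queue size

        ctx.addToPool(pooledClient);        // runs on the Context's thread pool
        ctx.start(1, 0x01ul                 // 1 pool thread pinned to core 0
            , directClient, 0x02ul);        // directClient on its own thread, core 1

        ctx.send(MyMessage{});              // delivered to every participating Client
        ctx.sendInPlace<MyMessage>(/* ctor args */);

        ctx.stop();                         // asynchronous shutdown request
        ctx.join();                         // wait for all Context threads to exit
    }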