1 #include "hmbdc/Copyright.hpp" 4 #include "hmbdc/Config.hpp" 5 #include "hmbdc/app/utils/StuckClientPurger.hpp" 6 #include "hmbdc/numeric/BitMath.hpp" 7 #include "hmbdc/os/Allocators.hpp" 11 namespace hmbdc {
namespace app {
25 namespace context_property {
52 template <u
int16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
54 static_assert(max_parallel_consumer >= 4u
106 #include "hmbdc/app/ContextDetail.hpp" 107 namespace hmbdc {
namespace app {
109 namespace context_detail {
123 template <
size_t MaxMessageSize,
typename... ContextProperties>
131 MAX_MESSAGE_SIZE = MaxMessageSize,
132 BUFFER_VALUE_SIZE = MaxMessageSize + 8u,
135 size_t maxMessageSize()
const {
136 if (MaxMessageSize == 0)
return maxMessageSizeRuntime_;
137 return MaxMessageSize;
148 template <
typename M0,
typename M1,
typename ... Messages>
149 typename enable_if<!std::is_integral<M1>::value,
void>::type
150 send(M0&& m0, M1&& m1, Messages&&... msgs) {
151 auto n =
sizeof...(msgs) + 2;
152 auto it = buffer_.claim(n);
153 sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
154 buffer_.commit(it, n);
167 template <
typename M0,
typename M1,
typename ... Messages>
168 typename enable_if<!std::is_integral<M1>::value,
bool>::type
169 trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
170 auto n =
sizeof...(msgs) + 2;
171 auto it = buffer_.tryClaim(n);
173 sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
174 buffer_.commit(it, n);
189 template <
typename ForwardIt>
191 send(ForwardIt begin,
size_t n) {
193 auto bit = buffer_.claim(n);
195 for (
auto i = 0ul; i < n; i++) {
196 using Message =
typename iterator_traits<ForwardIt>::value_type;
199 buffer_.commit(bit, n);
211 template <
typename ForwardIt>
215 auto bit = buffer_.tryClaim(n);
216 if (unlikely(!bit))
return false;
218 for (
auto i = 0ul; i < n; i++) {
219 using Message =
typename iterator_traits<ForwardIt>::value_type;
222 buffer_.commit(bit, n);
235 template <
typename Message>
237 using M =
typename std::remove_reference<Message>::type;
238 static_assert(MAX_MESSAGE_SIZE == 0 ||
sizeof(
MessageWrap<M>) <= BUFFER_VALUE_SIZE
239 ,
"message too big");
240 if (unlikely(MAX_MESSAGE_SIZE == 0 &&
sizeof(
MessageWrap<M>) > buffer_.maxItemSize())) {
241 HMBDC_THROW(std::out_of_range,
"message too big");
255 template <
typename Message>
257 using M =
typename std::remove_reference<Message>::type;
258 static_assert(MAX_MESSAGE_SIZE == 0 ||
sizeof(
MessageWrap<M>) <= BUFFER_VALUE_SIZE
259 ,
"message too big");
260 if (unlikely(MAX_MESSAGE_SIZE == 0 &&
sizeof(
MessageWrap<M>) > buffer_.maxItemSize())) {
261 HMBDC_THROW(std::out_of_range,
"message too big");
275 template <
typename Message,
typename ... Args>
278 ,
"message too big");
280 HMBDC_THROW(std::out_of_range,
"message too big");
282 buffer_.template putInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
296 ,
size_t maxMessageSizeRuntime = MAX_MESSAGE_SIZE
297 ,
char const* shmName =
nullptr)
299 , Buffer::footprint(maxMessageSizeRuntime + 8u
300 , messageQueueSizePower2Num), O_RDWR | (cpa::create_ipc?O_CREAT:0)
302 , bufferptr_(allocator_.template allocate<Buffer>(1
303 , maxMessageSizeRuntime + 8u, messageQueueSizePower2Num
306 , buffer_(*bufferptr_) {
307 if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
308 HMBDC_THROW(std::out_of_range
309 ,
"can only set maxMessageSizeRuntime when template value MaxMessageSize is 0");
311 maxMessageSizeRuntime_ = maxMessageSizeRuntime;
312 primeBuffer<cpa::create_ipc && cpa::has_pool>();
313 if (cpa::create_ipc || cpa::attach_ipc) {
319 allocator_.unallocate(bufferptr_);
327 template <
typename BroadCastBuf>
329 void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
330 for (uint16_t i = poolThreadCount;
331 i < BroadCastBuf::max_parallel_consumer;
336 Allocator allocator_;
337 Buffer* HMBDC_RESTRICT bufferptr_;
338 Buffer& HMBDC_RESTRICT buffer_;
342 typename enable_if<doIt, void>::type
344 markDeadFrom(buffer_, 0);
348 typename enable_if<!doIt, void>::type
352 template <
typename M,
typename... Messages>
353 void sendRecursive(
typename Buffer::iterator it
354 , M&& msg, Messages&&... msgs) {
355 using Message =
typename std::remove_reference<M>::type;
357 ,
"message too big");
359 HMBDC_THROW(std::out_of_range,
"message too big");
362 sendRecursive(++it, std::forward<M>(msgs)...);
// Terminating overload for the variadic sendRecursive: it is selected when the
// message pack is empty, ignores the iterator it receives and does nothing.
364 void sendRecursive(
typename Buffer::iterator) {}
366 size_t maxMessageSizeRuntime_;
401 template <
size_t MaxMessageSize = 0,
typename... ContextProperties>
405 using Buffer =
typename Base::Buffer;
416 Context(uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
417 ,
size_t maxPoolClientCount = MaxMessageSize?128:0
418 ,
size_t maxMessageSizeRuntime = MaxMessageSize)
419 :
Base(messageQueueSizePower2Num, maxMessageSizeRuntime)
420 , startToBeContinued_(true)
421 , usedHmbdcCapacity_(0)
422 , currentThreadSerialNumber_(0)
424 , pool_(createPool(this->buffer_, maxPoolClientCount))
425 , poolThreadCount_(0) {
426 static_assert(!cpa::create_ipc && !cpa::attach_ipc
427 ,
"no name specified for ipc Context");
446 , uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
447 ,
size_t maxPoolClientCount = MaxMessageSize?128:0
448 ,
size_t maxMessageSizeRuntime = MaxMessageSize
449 , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful)
450 :
Base(messageQueueSizePower2Num, maxMessageSizeRuntime, ipcTransportName)
451 , startToBeContinued_(true)
452 , usedHmbdcCapacity_(0)
453 , currentThreadSerialNumber_(0)
455 , pool_(createPool(this->buffer_, maxPoolClientCount))
456 , poolThreadCount_(0)
457 , secondsBetweenPurge_(60)
458 , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
459 static_assert(cpa::create_ipc || cpa::attach_ipc
460 ,
"ctor can only be used with ipc turned on Context");
461 static_assert(!(cpa::create_ipc && cpa::attach_ipc)
462 ,
"Context cannot be both ipc_creator and ipc_attacher");
471 if (cpa::create_ipc) {
472 this->markDeadFrom(this->buffer_, 0);
493 template <
typename Client>
495 , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
496 static_assert(cpa::has_pool,
"pool is not support in the Context type");
497 if (std::is_base_of<single_thread_powered_client, Client>::value
498 && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
499 && poolThreadCount_ != 1) {
500 HMBDC_THROW(std::out_of_range
501 ,
"cannot add a single thread powered client to the non-single thread powered pool" 504 pool_->addConsumer(client, poolThreadAffinityIn);
523 template <
typename Client,
typename ... Args>
525 , uint64_t poolThreadAffinityIn, Args&& ...args) {
526 addToPool(client, poolThreadAffinityIn);
527 addToPool(std::forward<Args>(args)...);
536 static_assert(cpa::has_pool,
"pool is not support in the Context type");
537 return pool_->consumerSize();
547 return this->buffer_.parallelConsumerAlive();
575 template <
typename ...Args>
577 start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
579 startWithContextProperty<cpa>(poolThreadCount, poolThreadsCpuAffinityMask
580 , std::forward<Args>(args) ...);
603 template <
typename Client,
typename ...Args>
604 typename std::enable_if<!std::is_integral<Client>::value,
void>::type
606 startWithContextProperty<cpa>(c, cpuAffinity, std::forward<Args>(args) ...);
613 startWithContextProperty<cpa>();
623 stopWithContextProperty<cpa>();
632 joinWithContextProperty<cpa>();
648 secondsBetweenPurge_ = s;
660 static_assert(cpa::has_pool,
"pool is not support in the Context type");
661 pool_->runOnce(threadSerialNumber);
671 template <
typename Client>
673 c.messageDispatchingStarted(threadSerialNumber);
674 context_detail::runOnceImpl(threadSerialNumber, stopped_
679 template <
typename Buffer>
682 createPool(Buffer& buffer,
size_t maxPoolClientCount) {
683 return Pool::create(buffer, maxPoolClientCount);
689 return typename Pool::ptr();
692 template <
typename cpa>
693 typename std::enable_if<cpa::has_pool, void>::type
694 stopWithContextProperty() {
695 if (pool_) pool_->stop();
696 __sync_synchronize();
700 template <
typename cpa>
701 typename std::enable_if<!cpa::has_pool, void>::type
702 stopWithContextProperty() {
703 __sync_synchronize();
707 template <
typename cpa>
708 typename std::enable_if<cpa::has_pool, void>::type
709 joinWithContextProperty() {
710 if (pool_) pool_->join();
711 for (
auto& t : threads_) {
717 template <
typename cpa>
718 typename std::enable_if<!cpa::has_pool, void>::type
719 joinWithContextProperty() {
720 for (
auto& t : threads_) {
727 template <
typename cpa,
typename ...Args>
728 typename std::enable_if<!cpa::attach_ipc && !cpa::create_ipc, void>::type
729 startWithContextProperty(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
731 static_assert(cpa::has_pool,
"pool is not support in the Context type");
732 poolThreadCount_ = poolThreadCount;
733 if (!startToBeContinued_) {
734 HMBDC_THROW(std::runtime_error
735 ,
"Exception: conflicting with previously indicated start completed!");
738 pool_->start(poolThreadCount, poolThreadsCpuAffinityMask,
false);
740 usedHmbdcCapacity_ = poolThreadCount;
741 currentThreadSerialNumber_ = usedHmbdcCapacity_;
742 startLocalClients(std::forward<Args>(args) ...);
745 template <
typename cpa,
typename Client,
typename ...Args>
746 typename std::enable_if<!std::is_integral<Client>::value && !cpa::create_ipc && !cpa::attach_ipc,
void>::type
747 startWithContextProperty(Client& c, uint64_t cpuAffinity, Args&& ... args) {
748 if (!startToBeContinued_) {
749 HMBDC_THROW(std::runtime_error
750 ,
"Exception: conflicting with previously indicated start completed!");
752 startLocalClients(c, cpuAffinity, std::forward<Args>(args) ...);
755 template <
typename cpa>
756 typename std::enable_if<!cpa::create_ipc && !cpa::attach_ipc, void>::type
757 startWithContextProperty() {
758 startLocalClients(
false);
761 template <
typename cpa>
762 typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool, void>::type
763 startWithContextProperty() {
764 startBroadcastIpcClients();
767 template <
typename cpa,
typename ...Args>
768 typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool, void>::type
769 startWithContextProperty(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
772 if (poolThreadCount_) {
773 HMBDC_THROW(std::out_of_range,
"Context pool already started");
775 auto& lock = this->allocator_.fileLock();
776 std::lock_guard<decltype(lock)> g(lock);
777 auto slots = this->buffer_.unusedConsumerIndexes();
778 if (slots.size() <
sizeof...(args) / 2) {
779 HMBDC_THROW(std::out_of_range
780 ,
"Context instance support Client count = " << slots.size());
782 poolThreadCount_ = poolThreadCount;
783 pool_->startThruRecycling(poolThreadCount, poolThreadsCpuAffinityMask);
784 currentThreadSerialNumber_ = poolThreadCount_;
785 startBroadcastIpcClients(std::forward<Args>(args) ...);
788 template <
typename cpa,
typename Client,
typename ...Args>
789 typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && cpa::has_pool && !std::is_integral<Client>::value
791 startWithContextProperty(Client& c, uint64_t cpuAffinity
793 auto& lock = this->allocator_.fileLock();
794 std::lock_guard<decltype(lock)> g(lock);
796 auto clientParticipateInMessaging =
797 std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
798 if (clientParticipateInMessaging) {
799 auto slots = this->buffer_.unusedConsumerIndexes();
800 if (slots.size() < 1u +
sizeof...(args) / 2) {
801 HMBDC_THROW(std::out_of_range
802 ,
"Context instance support Client count = " << slots.size());
805 startBroadcastIpcClients(c, cpuAffinity, std::forward<Args>(args) ...);
808 template <
typename cpa,
typename ...Args>
809 typename std::enable_if<(cpa::create_ipc || cpa::attach_ipc) && !cpa::has_pool, void>::type
810 startWithContextProperty(Args&& ... args) {
811 startPartitionIpcClients(std::forward<Args>(args) ...);
815 startLocalClients(
bool startToBeContinuedFlag = cpa::can_start_anytime) {
816 startToBeContinued_ = startToBeContinuedFlag;
817 if (!startToBeContinued_) {
818 this->markDeadFrom(this->buffer_, usedHmbdcCapacity_);
822 template <
typename CcClient,
typename ...Args>
823 void startLocalClients(CcClient& c, uint64_t mask, Args&&... args) {
824 if (usedHmbdcCapacity_ >= Buffer::max_parallel_consumer
825 && CcClient::REGISTERED_MESSAGE_SIZE) {
826 HMBDC_THROW(std::out_of_range
827 ,
"messaging participating client number > allowed thread number of " 828 << Buffer::max_parallel_consumer);
832 kickOffClientThread(c, mask, usedHmbdcCapacity_, currentThreadSerialNumber_);
833 threads_.push_back(move(thrd));
835 auto clientParticipateInMessaging =
836 std::remove_reference<CcClient>::type::REGISTERED_MESSAGE_SIZE;
837 if (clientParticipateInMessaging) usedHmbdcCapacity_++;
838 currentThreadSerialNumber_++;
839 startLocalClients(std::forward<Args>(args)...);
842 void startBroadcastIpcClients(){
843 if (cpa::create_ipc && !purger_) {
846 startBroadcastIpcClients(*purger_, purgerCpuAffinityMask_);
850 template <
typename Client,
typename ...Args>
851 void startBroadcastIpcClients(Client& c, uint64_t mask, Args&&... args) {
852 auto clientParticipateInMessaging =
853 std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
854 uint16_t hmbdcNumber = 0xffffu;
855 if (clientParticipateInMessaging) {
856 auto slots = this->buffer_.unusedConsumerIndexes();
857 auto it = slots.begin();
859 this->buffer_.reset(hmbdcNumber);
861 auto thrd = kickOffClientThread(
862 c, mask, hmbdcNumber, currentThreadSerialNumber_);
863 threads_.push_back(move(thrd));
864 currentThreadSerialNumber_++;
865 startBroadcastIpcClients(std::forward<Args>(args)...);
// Terminating overload for the variadic startPartitionIpcClients: it is selected
// once no (Client, mask) arguments remain, so there is nothing left to start.
868 void startPartitionIpcClients(){}
870 template <
typename Client,
typename ...Args>
871 void startPartitionIpcClients(Client& c, uint64_t mask, Args&&... args) {
872 auto thrd = kickOffClientThread(
873 c, mask, currentThreadSerialNumber_, currentThreadSerialNumber_);
874 threads_.push_back(move(thrd));
875 currentThreadSerialNumber_++;
876 startPartitionIpcClients(std::forward<Args>(args)...);
879 template <
typename Client>
880 auto kickOffClientThread(
881 Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
889 auto hmbdcNumber = h;
891 char const* schedule;
893 auto clientParticipateInMessaging =
894 std::remove_reference<Client>::type::REGISTERED_MESSAGE_SIZE;
900 if (clientParticipateInMessaging) {
901 name =
"hmbdc" + std::to_string(hmbdcNumber);
906 auto cpuAffinityMask = mask;
907 std::tie(schedule, priority) = c.
schedSpec();
909 if (!schedule) schedule =
"SCHED_OTHER";
912 auto cpuCount = std::thread::hardware_concurrency();
913 cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
916 hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
917 , schedule, priority);
919 hmbdcNumber = clientParticipateInMessaging?hmbdcNumber:0xffffu;
923 context_detail::runOnceImpl(hmbdcNumber, this->stopped_, this->buffer_, c)) {
925 if (this->stopped_) c.dropped();
926 if (clientParticipateInMessaging) context_detail::unblock(this->buffer_, hmbdcNumber);
935 bool startToBeContinued_;
936 uint16_t usedHmbdcCapacity_;
937 uint16_t currentThreadSerialNumber_;
939 typename Pool::ptr pool_;
940 using Threads = std::vector<std::thread>;
942 size_t poolThreadCount_;
943 uint32_t secondsBetweenPurge_;
944 uint64_t purgerCpuAffinityMask_;
945 typename std::conditional<cpa::has_pool
946 , std::unique_ptr<utils::StuckClientPurger<Buffer>>, uint32_t
size_t parallelConsumerAlive() const
how many parallel consumers are started
Definition: Context.hpp:546
void runClientThreadOnce(uint16_t threadSerialNumber, Client &c)
normally not used until you want to run your own message loop
Definition: Context.hpp:672
Definition: MonoLockFreeBuffer.hpp:14
Definition: ContextDetail.hpp:29
Context(uint32_t messageQueueSizePower2Num=MaxMessageSize?20:0, size_t maxPoolClientCount=MaxMessageSize?128:0, size_t maxMessageSizeRuntime=MaxMessageSize)
ctor to construct a local non-ipc Context
Definition: Context.hpp:416
Definition: StuckClientPurger.hpp:11
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:622
covers the inter-thread and ipc communication facade
Definition: Context.hpp:124
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:631
Context template parameter indicating each message is sent to one and only one of the clients within ...
Definition: Context.hpp:76
Definition: TypedString.hpp:74
enable_if<!std::is_integral< M1 >::value, bool >::type trySend(M0 &&m0, M1 &&m1, Messages &&...msgs)
try to send a batch of messages to the Context or attached ipc Contexts
Definition: Context.hpp:169
std::enable_if<!std::is_integral< Client >::value, void >::type start(Client &c, uint64_t cpuAffinity, Args &&...args)
start the context (without its Pool) and direct Clients
Definition: Context.hpp:605
the default vanilla allocator
Definition: Allocators.hpp:116
void send(Message &&m)
send a message to the Context or attached ipc Contexts
Definition: Context.hpp:236
Definition: GuardedSingleton.hpp:9
void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask, Args &&...args)
start the context and specify its Pool and direct Clients
Definition: Context.hpp:577
void sendInPlace(Args &&...args)
send a message to all Clients in the Context or attached ipc Contexts
Definition: Context.hpp:276
bool trySend(ForwardIt begin, size_t n)
try to send a range of messages to the Context or attached ipc Contexts
Definition: Context.hpp:213
std::tuple< char const *, int > schedSpec() const
an overridable method. returns the schedule policy and priority, override if necessary priority is o...
Definition: Client.hpp:126
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:116
void send(ForwardIt begin, size_t n)
send a range of messages to the Context or attached ipc Contexts
Definition: Context.hpp:191
enable_if<!std::is_integral< M1 >::value, void >::type send(M0 &&m0, M1 &&m1, Messages &&...msgs)
send a batch of messages to the Context or attached ipc Contexts
Definition: Context.hpp:150
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
called before any messages got dispatched - only once
Definition: Client.hpp:70
Context template parameter indicating each message is sent to all clients within the Context...
Definition: Context.hpp:53
void addToPool(Client &client, uint64_t poolThreadAffinityIn=0xfffffffffffffffful)
add a client to Context's pool - the Client is run in pool mode
Definition: Context.hpp:494
Definition: LockFreeBufferT.hpp:18
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:402
void runPoolThreadOnce(uint16_t threadSerialNumber)
normally not used until you want to run your own message loop
Definition: Context.hpp:659
~Context()
dtor
Definition: Context.hpp:470
Definition: Message.hpp:55
void start()
tell hmbdc that there is no more direct mode Client to start
Definition: Context.hpp:612
Context(char const *ipcTransportName, uint32_t messageQueueSizePower2Num=MaxMessageSize?20:0, size_t maxPoolClientCount=MaxMessageSize?128:0, size_t maxMessageSizeRuntime=MaxMessageSize, uint64_t purgerCpuAffinityMask=0xfffffffffffffffful)
ctor to construct a local ipc Context
Definition: Context.hpp:445
Definition: BitMath.hpp:9
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:289
Context template parameter indicating the Context is ipc enabled and it can be attached (see ipc_atta...
Definition: Context.hpp:89
void addToPool(Client &client, uint64_t poolThreadAffinityIn, Args &&...args)
add a bunch of clients to Context's pool - the Clients are run in pool mode
Definition: Context.hpp:524
size_t clientCountInPool() const
return the number of clients added into pool
Definition: Context.hpp:535
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
void setSecondsBetweenPurge(uint32_t s)
ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...) Clients from the ip...
Definition: Context.hpp:647
bool trySend(Message &&m)
try to send a message to the Context or attached ipc Contexts
Definition: Context.hpp:256
Context template parameter indicating the Context is ipc enabled and it can attach to an ipc transpor...
Definition: Context.hpp:102