hmbdc
Simplify High Performance Messaging Programming
perf-base-thru.cpp

Covers the inter-thread and IPC communication facade. This type's interface is exposed through Context and the type itself is not directly used by users; see perf-base-lat.cpp for usage.

Template Parameters
MaxMessageSize: the max message size, which needs to be known at compile time. If the value can only be determined at runtime, set this to 0; things still work, but some compile-time checking advantages are lost (see maxMessageSizeRuntime below).
ContextProperties: see the types in the context_property namespace.
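For a concrete picture of how these two template parameters are filled in, here is a minimal sketch; the alias names are illustrative only, and the one-line notes on partition and broadcast reflect their general hmbdc meaning (the real aliases this example uses appear further down in the file):

//a context sized for 24-byte messages known at compile time; partition delivers each
//message to exactly one receiving Client
using ExampleThreadCtx = hmbdc::app::Context<24
, hmbdc::app::context_property::partition
>;
//when the max message size is only known at runtime, use 0 (and lose some compile-time checks);
//broadcast delivers each message to every interested Client
using ExampleRuntimeSizedCtx = hmbdc::app::Context<0
, hmbdc::app::context_property::broadcast<>
>;

The complete example source follows.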
#define HMBDC_LOG_CONTEXT hmbdc::app::Context<24, hmbdc::app::context_property::partition>
#include "hmbdc/app/LoggerT.hpp"
#include "hmbdc/app/Client.hpp"
#include "hmbdc/app/Context.hpp"
#include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
#include "hmbdc/time/Time.hpp"
#include "hmbdc/os/Signals.hpp"
#include "hmbdc/numeric/StatHistogram.hpp"
#include <boost/program_options.hpp>
#include <boost/range/combine.hpp>
#include <iostream>
#include <memory>
#include <functional>
#include <vector>
#include <thread>
#include <unistd.h>
#include <sys/mman.h>
#include <signal.h>
using namespace hmbdc::app;
using namespace hmbdc::time;
using namespace hmbdc::numeric;
using namespace std;
using MyLogger = LoggerT<HMBDC_LOG_CONTEXT>;
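//the test message; hasTag<1001> assigns the tag id hmbdc uses to dispatch it to interested Clients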
struct Message
: hasTag<1001> {
Message() : seq(0)
{}
size_t seq;
friend ostream& operator << (ostream& os, Message const& r) {
os << "Message " << ' ' << r.seq;
return os;
}
} __attribute__((__packed__));
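//sender Client: runs on its own core, each invokedCb call sends a batch of 20 Messages,
//and a 1-second ReoccuringTimer reports the achieved messages-per-second rate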
template <typename Context>
struct SenderClient
: Client<SenderClient<Context>>
, TimerManager
, ReoccuringTimer {
using ptr = shared_ptr<SenderClient>;
SenderClient(Context& ctx, uint16_t id)
: ReoccuringTimer(Duration::seconds(1u))
, ctx_(ctx)
, id_(id)
, periodicMessageCount_(0) {
rawMessageSize_ = sizeof(MessageWrap<Message>);
setCallback(
[this](TimerManager& tm, SysTime const& now) {
report();
}
);
schedule(SysTime::now(), *this);
}
char const* hmbdcName() const {
return "perf-tx";
}
/*virtual*/
void invokedCb(uint16_t) __restrict__ override {
Message m[20];
ctx_.send(m[0]
, m[1]
, m[2]
, m[3]
, m[4]
, m[5]
, m[6]
, m[7]
, m[8]
, m[9]
, m[10]
, m[11]
, m[12]
, m[13]
, m[14]
, m[15]
, m[16]
, m[17]
, m[18]
, m[19]
);
periodicMessageCount_+=20;
}
void stoppedCb(exception const& e){
cout << (e.what()) << endl;
};
void report() {
HMBDC_LOG_n("sender ", id_, ":msgSize=", sizeof(Message), " rawMessageSize="
, rawMessageSize_, " mps=", periodicMessageCount_);
periodicMessageCount_ = 0u;
}
Context& ctx_;
uint16_t id_;
size_t periodicMessageCount_;
size_t rawMessageSize_;
};
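//receiver Client: subscribes to Message (the second Client template argument) and counts
//how many messages arrive per second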
template <typename Context>
struct ReceiverClient
: Client<ReceiverClient<Context>, Message>
, TimerManager
, ReoccuringTimer {
using ptr = shared_ptr<ReceiverClient>;
ReceiverClient(Context& ctx, uint16_t id)
: ReoccuringTimer(Duration::seconds(1u))
, ctx_(ctx)
, id_(id)
, periodicMessageCount_(0)
{
setCallback(
[this](TimerManager& tm, SysTime const& now) {
report();
}
);
schedule(SysTime::now(), *this);
}
char const* hmbdcName() const {
return "perf-rx";
}
void handleMessageCb(Message const& r) {
periodicMessageCount_++;
}
void stoppedCb(exception const& e){
cout << (e.what()) << endl;
};
void report() {
HMBDC_LOG_n("receiver ", id_, ": mps=", periodicMessageCount_
);
periodicMessageCount_ = 0u;
}
Context& ctx_;
uint16_t id_;
size_t periodicMessageCount_;
};
namespace {
bool part;
uint16_t startCore;
int senderCount;
int receiverCount;
uint16_t bufferSizePower2;
char ipcRole;
}
using MyLogCtx = HMBDC_LOG_CONTEXT;
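//inter-thread throughput test: pin each receiver and sender Client to its own core starting
//at startCore, then run until ctrl-c stops both the main Context and the logger Context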
template <typename Ctx>
void run() {
uint64_t cpuMask = 1ul << startCore;
MyLogCtx logCtx(10u); //a second Context so the logger can work
Ctx ctx(bufferSizePower2);
SingletonGuardian<MyLogger> logGuard(cout, logCtx);
//let the logger spread over all cores - low priority
logCtx.start(MyLogger::instance(), (1ul << thread::hardware_concurrency()) - 1ul);
vector<typename ReceiverClient<Ctx>::ptr> receivers;
while (receiverCount-- > 0) {
auto r = new ReceiverClient<Ctx>(ctx, receiverCount);
receivers.emplace_back(r);
ctx.start(*r, cpuMask, true);
cpuMask <<= 1ul;
}
vector<typename SenderClient<Ctx>::ptr> senders;
while (senderCount-- > 0) {
auto s = new SenderClient<Ctx>(ctx, senderCount);
senders.emplace_back(s);
ctx.start(*s, cpuMask, true);
cpuMask <<= 1ul;
}
ctx.start();
hmbdc::os::HandleSignals::onTermIntDo( //stop everything on ctrl-c / SIGTERM
[&ctx, &logCtx]() {
ctx.stop();
logCtx.stop();
}
);
ctx.join();
logCtx.join();
}
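//ipc throughput test: processes share the Context named "hmbdcperf"; the creator process owns it,
//while a sender or receiver process runs one Client pinned at startCore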
template <typename Ctx>
void runIpc(bool sender, bool receiver) {
MyLogCtx logCtx(10u); //a second Context so the logger can work
uint64_t cpuMask = 1ul << startCore;
Ctx ctx("hmbdcperf", bufferSizePower2);
//it is impossible for the receiver to read nothing for 2 sec in this app
//the default value is 60 sec
ctx.setSecondsBetweenPurge(2);
SingletonGuardian<MyLogger> logGuard(cout, logCtx);
//let the logger spread over all cores - low priority
if (sender||receiver) {
logCtx.start(MyLogger::instance()
, (1ul << thread::hardware_concurrency()) - 1ul);
}
typename SenderClient<Ctx>::ptr s;
typename ReceiverClient<Ctx>::ptr r;
if (sender == receiver) {//creator
ctx.start();
} else if (sender) {
s.reset(new SenderClient<Ctx>(ctx, 0));
ctx.start(*s, cpuMask);
} else {
r.reset(new ReceiverClient<Ctx>(ctx, 0));
ctx.start(*r, cpuMask);
}
ctx.start();
hmbdc::os::HandleSignals::onTermIntDo( //stop everything on ctrl-c / SIGTERM
[&ctx, &logCtx]() {
ctx.stop();
logCtx.stop();
}
);
cout << "ctrl-c to stop" << endl;
ctx.join();
logCtx.join();
}
using PartionContext = hmbdc::app::Context<24
, hmbdc::app::context_property::partition
>;
using BroadcastContext = hmbdc::app::Context<24
, hmbdc::app::context_property::broadcast<>
>;
using IpcCreatorBroadcastContext = hmbdc::app::Context<24
, hmbdc::app::context_property::broadcast<>
, hmbdc::app::context_property::ipc_creator
>;
using IpcAttacherBroadcastContext = hmbdc::app::Context<24
, hmbdc::app::context_property::broadcast<>
, hmbdc::app::context_property::ipc_attacher
>;
using IpcReceiverBroadcastContext = IpcAttacherBroadcastContext;
using IpcSenderBroadcastContext = IpcAttacherBroadcastContext;
using IpcCreatorPartitionContext = hmbdc::app::Context<24
, hmbdc::app::context_property::partition
, hmbdc::app::context_property::ipc_creator
>;
using IpcAttacherPartitionContext = hmbdc::app::Context<24
, hmbdc::app::context_property::partition
, hmbdc::app::context_property::ipc_attacher
>;
using IpcReceiverPartitionContext = IpcAttacherPartitionContext;
using IpcSenderPartitionContext = IpcAttacherPartitionContext;
int main(int argc, char** argv) {
using namespace std;
namespace po = boost::program_options;
po::options_description desc("Allowed options");
auto helpStr =
"This program can be used to test hmbdc-free thread and ipc messaging throughput performance among a group of cores."
"The source code is in example dir: perf-base-thru.cpp"
"senders cores send messages as fast as possible and receiver cores receive and count them.\n"
"Example:\nrunning \n$ ./perf-base-thru \n"
"would start the test using default setting."
;
desc.add_options()
("help", helpStr)
("partition", po::value<bool>(&::part)->default_value(false), "test using partion context (vs the default broadcast context)")
// ("sendBatch", po::value<size_t>(&sendBatch)->default_value(20), "the sender sending messages in batch of")
("bufferSizePower2,b", po::value<uint16_t>(&bufferSizePower2)->default_value(15u), "2^bufferSizePower2 is the message buffer size")
("startCore,C", po::value<uint16_t>(&startCore)->default_value(0u), "specify starting core number")
("senderCount,s", po::value<int>(&senderCount)->default_value(1u), "how many senders, 1 sender takes up a core ...")
("receiverCount,r", po::value<int>(&receiverCount)->default_value(1u), "how many receivers, 1 receiver take up a core")
("ipc", po::value<char>(&ipcRole)->default_value('n'),
"testing ipc, start one ipc creator (c), ipc sender (s) or ipc receiver (r). "
"only --partition -b and -C are effective and they need to be the same when starting "
"the creator (c) sender (s) and receiver (r). Only one creator is allowed in a test.");
po::positional_options_description p;
po::variables_map vm;
po::store(po::command_line_parser(argc, argv).
options(desc).positional(p).run(), vm);
po::notify(vm);
if (vm.count("help")) {
cout << desc << "\n";
return 0;
}
switch (ipcRole) {
case 'n': {
if (part) run<PartionContext>();
else run<BroadcastContext>();
} break;
case 'c': {
if (part) runIpc<IpcCreatorPartitionContext>(false, false);
else runIpc<IpcCreatorBroadcastContext>(false, false);
} break;
case 's': {
if (part) runIpc<IpcSenderPartitionContext>(true, false);
else runIpc<IpcSenderBroadcastContext>(true, false);
} break;
case 'r': {
if (part) runIpc<IpcReceiverPartitionContext>(false, true);
else runIpc<IpcReceiverBroadcastContext>(false, true);
} break;
}
return 0;
}
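To try the IPC path described by the --ipc option, the option help above implies running one creator process and then attaching sender and receiver processes with matching --partition, -b and -C settings, for example ./perf-base-thru --ipc c in one terminal and ./perf-base-thru --ipc s plus ./perf-base-thru --ipc r in others (the binary name comes from the help text; core placement via -C depends on your machine).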