// Covers the inter-thread and IPC communication facade. This type's interface is exposed through Context, and the type itself is not directly used by users; see perf-base-lat.cpp for usage.
#define HMBDC_LOG_CONTEXT hmbdc::app::Context<24, hmbdc::app::context_property::partition>
#include "hmbdc/app/LoggerT.hpp"
#include "hmbdc/app/Client.hpp"
#include "hmbdc/app/Context.hpp"
#include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
#include "hmbdc/time/Time.hpp"
#include "hmbdc/os/Signals.hpp"
#include "hmbdc/numeric/StatHistogram.hpp"
#include <boost/program_options.hpp>
#include <boost/range/combine.hpp>
#include <iostream>
#include <memory>
#include <functional>
#include <unistd.h>
#include <sys/mman.h>
#include <signal.h>
// Payload type of the test: SenderClient sends arrays of these and
// ReceiverClient counts them as they arrive. It carries one sequence number.
// NOTE(review): no opening brace (or base-class list) follows the struct name
// in this view of the file - a line appears to be missing; confirm against
// the original source.
struct Message
// Default-constructs a message whose sequence number is 0.
Message() : seq(0)
{}
size_t seq;
// Streams the message as "Message ", a space character, then seq. The literal
// already ends in a space, so two spaces separate the word and the number.
friend ostream& operator << (ostream& os, Message const& r) {
os << "Message " << ' ' << r.seq;
return os;
}
// The packed attribute asks the compiler not to insert padding in the layout.
} __attribute__((__packed__));
// Test client that pushes Message objects into a context and keeps a count of
// how many it has sent since the last report.
// Context is the context type the messages are sent through.
// NOTE(review): several lines look to be missing from this view - there is no
// opening brace after the base list, the constructor's initializer list starts
// with a comma, the callback passed to setCallback() has no lambda header, and
// no declaration of ctx_ is visible. Confirm against the original source.
template <typename Context>
struct SenderClient
:
Client<SenderClient<Context>>
// Shared-ownership handle; run() and runIpc() hold senders through this type.
using ptr = shared_ptr<SenderClient>;
// Stores the context reference and the sender id, zeroes the message counter,
// registers a callback whose body calls report(), and passes *this to
// schedule() together with the current SysTime.
// ctx: context used by invokedCb() to send messages.
// id:  number printed in the report line.
SenderClient(
Context& ctx, uint16_t
id)
, ctx_(ctx)
, id_(id)
, periodicMessageCount_(0) {
setCallback(
report();
}
);
schedule(SysTime::now(), *this);
}
// Name of this client as a C string.
char const* hmbdcName() const {
return "perf-tx";
}
// Builds 20 default-constructed messages (seq == 0), hands all of them to
// ctx_.send() in a single call, then adds 20 to the running count.
// NOTE(review): presumably called repeatedly by the thread that runs this
// client - the caller is not visible here; confirm.
void invokedCb(uint16_t) __restrict__ override {
Message m[20];
ctx_.send(m[0]
, m[1]
, m[2]
, m[3]
, m[4]
, m[5]
, m[6]
, m[7]
, m[8]
, m[9]
, m[10]
, m[11]
, m[12]
, m[13]
, m[14]
, m[15]
, m[16]
, m[17]
, m[18]
, m[19]
);
periodicMessageCount_+=20;
}
// Prints the exception's what() text to stdout.
void stoppedCb(exception const& e){
cout << (e.what()) << endl;
};
// Logs the sender id, sizeof(Message), rawMessageSize_ and the number of
// messages sent since the previous report, then resets that count to zero.
// NOTE(review): the count is labelled "mps"; that is only messages-per-second
// if report() runs once a second - the interval is not visible here.
void report() {
HMBDC_LOG_n("sender ", id_, ":msgSize=", sizeof(Message), " rawMessageSize="
, rawMessageSize_, " mps=", periodicMessageCount_);
periodicMessageCount_ = 0u;
}
// Identifier printed by report().
uint16_t id_;
// Messages sent since the last report() call.
size_t periodicMessageCount_;
// NOTE(review): printed by report() but never assigned anywhere in the visible
// code, so the logged value may be indeterminate - confirm.
size_t rawMessageSize_;
};
// Test client that receives Message objects and keeps a count of how many
// have arrived since the last report. Message is listed in the Client<> base's
// template arguments after the client type itself.
// Context is the context type this client is constructed with.
// NOTE(review): several lines look to be missing from this view - there is no
// opening brace after the base list, the constructor's initializer list starts
// with a comma, the callback passed to setCallback() has no lambda header, and
// no declaration of ctx_ is visible. Confirm against the original source.
template <typename Context>
struct ReceiverClient
:
Client<ReceiverClient<Context>, Message>
// Shared-ownership handle; run() and runIpc() hold receivers through this type.
using ptr = shared_ptr<ReceiverClient>;
// Stores the context reference and the receiver id, zeroes the message
// counter, registers a callback whose body calls report(), and passes *this to
// schedule() together with the current SysTime.
// ctx: context this receiver is associated with.
// id:  number printed in the report line.
ReceiverClient(
Context& ctx, uint16_t
id)
, ctx_(ctx)
, id_(id)
, periodicMessageCount_(0)
{
setCallback(
report();
}
);
schedule(SysTime::now(), *this);
}
// Name of this client as a C string.
char const* hmbdcName() const {
return "perf-rx";
}
// Counts one received message; the message content itself is not inspected.
void handleMessageCb(Message const& r) {
periodicMessageCount_++;
}
// Prints the exception's what() text to stdout.
void stoppedCb(exception const& e){
cout << (e.what()) << endl;
};
// Logs the receiver id and the number of messages received since the previous
// report, then resets that count to zero.
// NOTE(review): the count is labelled "mps"; that is only messages-per-second
// if report() runs once a second - the interval is not visible here.
void report() {
HMBDC_LOG_n("receiver ", id_, ": mps=", periodicMessageCount_
);
periodicMessageCount_ = 0u;
}
// Identifier printed by report().
uint16_t id_;
// Messages received since the last report() call.
size_t periodicMessageCount_;
};
// File-local run-time settings. main() binds each one to a command-line
// option; run() and runIpc() read them. All start out zero.
namespace {
    // --partition: use the partition context instead of the broadcast one.
    bool part{};
    // --startCore / -C: bit position of the first CPU-mask bit handed out.
    uint16_t startCore{};
    // --senderCount / -s: number of sender clients to start.
    int senderCount{};
    // --receiverCount / -r: number of receiver clients to start.
    int receiverCount{};
    // --bufferSizePower2 / -b: log2 of the message buffer size.
    uint16_t bufferSizePower2{};
    // --ipc: role selector character ('n', 'c', 's' or 'r').
    char ipcRole{};
}
// Logging context type used by run() and runIpc(): whatever the
// HMBDC_LOG_CONTEXT macro expands to.
using MyLogCtx = HMBDC_LOG_CONTEXT;
// Runs the in-process test for context type Ctx: creates a logging context and
// a test context, starts receiverCount receivers followed by senderCount
// senders (each with its own CPU mask), starts the context, and then blocks
// until both contexts have been joined.
template <typename Ctx>
void run() {
// Single bit at position startCore; shifted left once per client started, so
// every client gets a different bit.
uint64_t cpuMask = 1ul << startCore;
MyLogCtx logCtx(10u);
Ctx ctx(bufferSizePower2);
SingletonGuardian<MyLogger> logGuard(cout, logCtx);
// The logger is started with a mask whose low hardware_concurrency() bits are set.
// NOTE(review): the shift is undefined if hardware_concurrency() is not smaller
// than the bit width of unsigned long, and the mask is empty if it returns 0.
logCtx.start(MyLogger::instance(), (1ul << thread::hardware_concurrency()) - 1ul);
vector<typename ReceiverClient<Ctx>::ptr> receivers;
// Post-decrement in the condition: ids run from receiverCount-1 down to 0 and
// the global is left at -1 once the loop ends.
while (receiverCount-- > 0) {
// Ownership of the raw pointer passes to the shared_ptr built by emplace_back.
auto r = new ReceiverClient<Ctx>(ctx, receiverCount);
receivers.emplace_back(r);
// NOTE(review): meaning of the third argument (true) is defined by Ctx::start,
// which is not visible here.
ctx.start(*r, cpuMask, true);
cpuMask <<= 1ul;
}
vector<typename SenderClient<Ctx>::ptr> senders;
// Same countdown pattern as above; senders continue from the next mask bit.
while (senderCount-- > 0) {
auto s = new SenderClient<Ctx>(ctx, senderCount);
senders.emplace_back(s);
ctx.start(*s, cpuMask, true);
cpuMask <<= 1ul;
}
ctx.start();
// Callback that stops the test context and then the logging context.
// NOTE(review): the lambda is followed by ");" but the line opening that call
// is not visible - presumably a termination-signal handler registration (the
// file includes hmbdc/os/Signals.hpp); confirm against the original source.
[&ctx, &logCtx]() {
ctx.stop();
logCtx.stop();
}
);
// Block until both contexts have finished.
ctx.join();
logCtx.join();
}
// Runs one participant of the IPC test using context type Ctx.
// sender:   true to start a single SenderClient in this process.
// receiver: true to start a single ReceiverClient in this process.
// When both flags are equal no client is created and only the context is
// started; main() uses (false, false) for the creator role.
template <typename Ctx>
void runIpc(bool sender, bool receiver) {
MyLogCtx logCtx(10u);
// Single bit at position startCore, used for the one client this process runs.
uint64_t cpuMask = 1ul << startCore;
// The context is constructed with the fixed name "hmbdcperf" and the buffer
// size exponent from the command line.
Ctx ctx("hmbdcperf", bufferSizePower2);
ctx.setSecondsBetweenPurge(2);
SingletonGuardian<MyLogger> logGuard(cout, logCtx);
// The logger is only started when this process runs a sender or a receiver.
// NOTE(review): the shift is undefined if hardware_concurrency() is not smaller
// than the bit width of unsigned long, and the mask is empty if it returns 0.
if (sender||receiver) {
logCtx.start(MyLogger::instance()
, (1ul << thread::hardware_concurrency()) - 1ul);
}
// At most one of these is populated, depending on the requested role.
typename SenderClient<Ctx>::ptr s;
typename ReceiverClient<Ctx>::ptr r;
if (sender == receiver) {
ctx.start();
} else if (sender) {
s.reset(new SenderClient<Ctx>(ctx, 0));
ctx.start(*s, cpuMask);
} else {
r.reset(new ReceiverClient<Ctx>(ctx, 0));
ctx.start(*r, cpuMask);
}
// NOTE(review): on the sender == receiver path ctx.start() has already been
// called just above, so it runs twice there - confirm this is intended.
ctx.start();
// Callback that stops the test context and then the logging context.
// NOTE(review): the lambda is followed by ");" but the line opening that call
// is not visible - presumably a termination-signal handler registration (the
// file includes hmbdc/os/Signals.hpp); confirm against the original source.
[&ctx, &logCtx]() {
ctx.stop();
logCtx.stop();
}
);
cout << "ctrl-c to stop" << endl;
// Block until both contexts have finished.
ctx.join();
logCtx.join();
}
// Context type aliases that main() picks from according to --partition and --ipc.
// NOTE(review): most of the alias declarations in this section are truncated in
// this view - only trailing template arguments and the closing ">;" remain, so
// the names being declared (and their leading arguments) cannot be read here.
// Confirm against the original source.
>;
, hmbdc::app::context_property::broadcast<>
>;
// IPC receiver and IPC sender both use the attacher broadcast context type.
using IpcReceiverBroadcastContext = IpcAttacherBroadcastContext;
using IpcSenderBroadcastContext = IpcAttacherBroadcastContext;
, hmbdc::app::context_property::ipc_creator
>;
, hmbdc::app::context_property::partition
, hmbdc::app::context_property::ipc_attacher
>;
// IPC receiver and IPC sender both use the attacher partition context type.
using IpcReceiverPartitionContext = IpcAttacherPartitionContext;
using IpcSenderPartitionContext = IpcAttacherPartitionContext;
int main(int argc, char** argv) {
namespace po = boost::program_options;
po::options_description desc("Allowed options");
auto helpStr =
"This program can be used to test hmbdc-free thread and ipc messaging throughput performance among a group of cores."
"The source code is in example dir: perf-base-thru.cpp"
"senders cores send messages as fast as possible and receiver cores receive and count them.\n"
"Example:\nrunning \n$ ./perf-base-thru \n"
"would start the test using default setting."
;
desc.add_options()
("help", helpStr)
("partition", po::value<bool>(&::part)->default_value(false), "test using partion context (vs the default broadcast context)")
("bufferSizePower2,b", po::value<uint16_t>(&bufferSizePower2)->default_value(15u), "2^bufferSizePower2 is the message buffer size")
("startCore,C", po::value<uint16_t>(&startCore)->default_value(0u), "specify starting core number")
("senderCount,s", po::value<int>(&senderCount)->default_value(1u), "how many senders, 1 sender takes up a core ...")
("receiverCount,r", po::value<int>(&receiverCount)->default_value(1u), "how many receivers, 1 receiver take up a core")
("ipc", po::value<char>(&ipcRole)->default_value('n'),
"testing ipc, start one ipc creator (c), ipc sender (s) or ipc receiver (r). "
"only --partition -b and -C are effective and they need to be the same when starting "
"the creator (c) sender (s) and receiver (r). Only one creator is allowed in a test.");
po::positional_options_description p;
po::variables_map vm;
po::store(po::command_line_parser(argc, argv).
options(desc).positional(p).run(), vm);
po::notify(vm);
if (vm.count("help")) {
cout << desc << "\n";
return 0;
}
switch (ipcRole) {
case 'n': {
if (part) run<PartionContext>();
else run<BroadcastContext>();
} break;
case 'c': {
if (part) runIpc<IpcCreatorPartitionContext>(false, false);
else runIpc<IpcCreatorBroadcastContext>(false, false);
} break;
case 's': {
if (part) runIpc<IpcSenderPartitionContext>(true, false);
else runIpc<IpcSenderBroadcastContext>(true, false);
} break;
case 'r': {
if (part) runIpc<IpcReceiverPartitionContext>(false, true);
else runIpc<IpcReceiverBroadcastContext>(false, true);
} break;
}
return 0;
}