1 #include "hmbdc/app/Message.hpp" 2 #include "hmbdc/app/LoggerT.hpp" 3 #include "hmbdc/app/Client.hpp" 4 #include "hmbdc/app/Context.hpp" 5 #include "hmbdc/text/Misc.h" 6 #include "hmbdc/numeric/BitMath.hpp" 7 #include "hmbdc/os/Signals.hpp" 12 #include <boost/program_options.hpp> 14 extern const char *__progname_full;
22 namespace hmbdc {
namespace app {
namespace utils {
24 namespace netperf_detail {
38 friend std::ostream& operator << (std::ostream& os,
Message const& r) {
39 os <<
"Message " <<
' ' << r.seq;
42 char padding[1000 -
sizeof(seq)];
44 static uint64_t seq_s;
45 } __attribute__((__packed__));
47 uint64_t Message::seq_s = 0ul;
54 attachment = attachment_s;
56 afterConsumedCleanupFunc = afterConsumedCleanupFunc_s;
60 friend std::ostream& operator << (std::ostream& os,
MessageAtt const& r) {
61 os <<
"MessageAtt " <<
' ' << r.seq;
64 static uint64_t seq_s;
65 static void* attachment_s;
69 uint64_t MessageAtt::seq_s = 0ul;
70 void* MessageAtt::attachment_s =
nullptr;
71 size_t MessageAtt::len_s = 0;
84 schedule(SysTime::now(), *
this);
87 virtual void report() = 0;
96 , periodicMsgCount_(0)
97 , periodStartAt_(SysTime::now()) {
98 sender_ = NetContext::instance().getSender(topic);
99 if (msgSize_ > 1000) {
100 fileMap_.map(__progname_full);
101 MessageAtt::attachment_s = fileMap_.attachment;
102 MessageAtt::len_s = msgSize;
107 if (msgSize_ > 1000) fileMap_.unmap();
109 char const* hmbdcName()
const {
114 void invokedCb(uint16_t) HMBDC_RESTRICT
override {
116 if (likely(msgSize_ <= 1000u)) {
118 sender_->send(m, msgSize_);
127 void stoppedCb(std::exception
const& e)
override {
128 std::cout << (e.what()) << std::endl;
131 void report()
override {
132 auto rate = size_t(periodicMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
133 HMBDC_LOG_n(
"msgSize=", msgSize_,
" mps=", rate,
" totalMsgCount=", Message::seq_s);
134 periodicMsgCount_ = 0u;
135 periodStartAt_ = SysTime::now();
141 size_t periodicMsgCount_;
146 void runInSenderMode(
Config & config,
Topic const& topic,
size_t msgSize, uint16_t startCore) {
148 uint64_t cpuMask = (1ul << startCore);
150 #ifdef HMBDC_LOG_CONTEXT 156 auto engine = NetContext::instance().createSendTransportEngine(config
161 ctx.
start(*engine, cpuMask,
true);
163 ctx.
start(client, cpuMask);
174 :
Client<ReceiverClient, Message, MessageAtt>
178 , periodicMsgCount_(0)
179 , periodStartAt_(SysTime::now())
181 , totalMsgReceived_(0)
185 , msgSize_(msgSize) {
188 char const* hmbdcName()
const {
192 void handleMessageCb(
Message const& r) {
196 missed_ += r.seq - seq_ - 1;
197 }
else if (r.seq == seq_) {
211 missed_ += r.seq - seq_ - 1;
212 }
else if (r.seq == seq_) {
222 void stoppedCb(std::exception
const& e)
override {
223 std::cout << (e.what()) << std::endl;
226 void report()
override {
227 auto rate = size_t(periodicMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
228 HMBDC_LOG_n(
"msgSize=", msgSize_,
" mps=", rate
229 ,
" missed=" , missed_ ,
'(' , missed_?missed_ * 100ul / (missed_ + totalMsgReceived_):0
230 ,
"%) outOfOrder=" , outOfOrder_
231 ,
" duplicate=" , duplicate_
232 ,
" totalMsgCount=" , totalMsgReceived_
235 periodicMsgCount_ = 0u;
236 periodStartAt_ = SysTime::now();
240 size_t periodicMsgCount_;
243 size_t totalMsgReceived_;
250 template <
typename RecvCtorArgsTuple>
251 void runInReceiverMode(
Config const& config,
Topic const& topic,
size_t msgSize, uint16_t inBuferSizePower2, uint16_t startCore
252 , RecvCtorArgsTuple&& recvCtorArgs) {
254 using namespace boost;
259 auto& net = NetContext::instance();
261 uint64_t cpuMask = (1ul << startCore);
262 auto engine = net.createRecvTransportEngineTuply(config, ctx.
buffer(), std::forward<RecvCtorArgsTuple>(recvCtorArgs));
265 ctx.
start(*engine, cpuMask,
true);
267 ctx.
start(client, cpuMask,
false);
275 net.stopListenTo(topic);
278 template <
typename RecvCtorArgsTuple>
279 int startNetPerf(
char const* dftUserConfig,
char const* helpStrIn,
int argc,
char** argv
280 , RecvCtorArgsTuple&& recvCtorArgs) {
286 uint16_t inBuferSizePower2;
288 if (argc > 1) role = string(argv[1]);
289 if (role !=
string(
"--sender")
290 && role !=
string(
"--receiver")) {
291 cerr <<
"1st arg needs to be either --sender or --receiver" << endl;
297 bool isSender = role == string(
"--sender");
298 namespace po = boost::program_options;
299 po::options_description desc(
"additional allowed options (after --sender or --receiver)");
301 "This program can be used to test messaging performance among a group of hosts.\n" 302 "It can run as either a sender or a receiver. Normally one sender for a single interface in a test. \n" 303 "When Sender sends out traffic, receivers subscribe to the traffic and collect stats.\n" 304 "ctrl-c terminate.\n";
305 helpStr += string(helpStrIn);
307 (
"help", helpStr.c_str())
308 (
"cfg,c", po::value<std::string>(&userCfg),
"use the user provided config file and ignore the ones in command options")
309 (
"print",
"print effective cfg values")
310 (
"startCore,C", po::value<uint16_t>(&startCore)->default_value(0u),
"the test uses 3 cores, specify starting core number")
311 (
"msgSize", po::value<size_t>(&msgSize)->default_value(8u),
"size of the message (8-size of this executable file). when > 1000 (only applies to transport types that support memory attachment), the message is sent using zero copy memory attachment. hmbdc posts no limit on attachment size")
312 (
"inBuferSizePower2", po::value<uint16_t>(&inBuferSizePower2)->default_value(0u),
"2^inBufferSizePower2 is the number of message that can be buffered in the program, default 0 means automatically caulcated based on 32MB as lower bound")
313 (
"topic", po::value<Topic>(&topic)->default_value(topic),
"topic used in the test");
315 Config dft(dftUserConfig, isSender?
"tx":
"rx");
317 for (
auto it = params.begin(); it != params.end();) {
318 string& name = it->first;
319 string& val = it->second;
321 string& comment = it->second;
324 (name.c_str(), po::value<string>(&val)->default_value(val), comment.c_str());
327 po::positional_options_description p;
328 po::variables_map vm;
329 po::store(po::command_line_parser(argc, argv).
330 options(desc).positional(p).run(), vm);
333 if (argc == 1 || vm.count(
"help") || msgSize < 8u) {
334 cout << desc <<
"\n";
339 if (userCfg.size()) {
340 std::istringstream cs(userCfg);
341 config =
Config(cs, isSender?
"tx":
"rx");
343 for (
auto& p : params) {
344 config.put(p.first, p.second);
347 if (msgSize > 1000 && !isSender) {
348 config.put(
"allowRecvMemoryAttachment",
"1002");
351 if (vm.count(
"print")) {
352 write_json(cout, config);
357 runInSenderMode(config, topic, msgSize, startCore);
359 if (!inBuferSizePower2) {
360 inBuferSizePower2 = hmbdc::numeric::log2Upper(32ul * 1024ul * 1024ul / (8ul + msgSize));
361 cout <<
"auto set --inBuferSizePower2=" << inBuferSizePower2 << endl;
363 runInReceiverMode(config, topic, msgSize, inBuferSizePower2, startCore, forward<RecvCtorArgsTuple>(recvCtorArgs));
369 using netperf_detail::startNetPerf;
class to hold an hmbdc configuration
Definition: Config.hpp:43
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:622
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:631
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
Definition: NetPerf.hpp:173
Definition: Timers.hpp:65
each message type has 16 bit tag
Definition: Message.hpp:30
RAII representing the lifespan of the underlying Singleton which also ganrantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask, Args &&...args)
start the context and specify its Pool and direct Clients
Definition: Context.hpp:577
static void onTermIntDo(std::function< void()> doThis)
specfy what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
Definition: NetPerf.hpp:49
Definition: Message.hpp:224
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:402
Definition: NetPerf.hpp:32
Definition: NetPerf.hpp:76
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:289
Definition: NetPerf.hpp:90
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
list< pair< string, string > > content(unordered_set< string > const &skipThese=unordered_set< string >()) const
get contents of all the effective configure in the form of list of string pairs
Definition: Config.hpp:233
if a specific hmbdc network transport (for example tcpcast) supports message with memory attachment...
Definition: Message.hpp:158
static hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFunc_s
we use the file mapped memory for every message repeatedly, so don't need to release anything ...
Definition: NetPerf.hpp:67
Definition: Timers.hpp:96