1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Base.hpp" 4 #include "hmbdc/text/Misc.h" 5 #include "hmbdc/numeric/BitMath.hpp" 6 #include "hmbdc/os/Signals.hpp" 12 #include <boost/program_options.hpp> 14 extern const char *__progname_full;
22 namespace hmbdc {
namespace app {
namespace utils {
24 namespace netperf_detail {
41 friend std::ostream& operator << (std::ostream& os,
Message const& r) {
42 os <<
"Message " <<
' ' << r.len <<
' ' << r.seq;
45 char padding[65536 -
sizeof(seq) -
sizeof(len)];
46 } __attribute__((__packed__));
55 friend std::ostream& operator << (std::ostream& os,
MessageAtt const& r) {
56 os <<
"MessageAtt " <<
' ' << r.len <<
' ' << r.seq;
69 schedule(SysTime::now(), *
this);
72 virtual void report() = 0;
83 , periodicGoodMsgCount_(0)
84 , periodStartAt_(SysTime::now()) {
85 sender_ = NetContext::instance().getSender(topic);
86 if (msg_.len > 1000 && allowAtt) {
87 fileMap_.map(__progname_full);
88 if (fileMap_.len < msgSize) {
89 HMBDC_THROW(std::out_of_range,
"exceeds test allowed msg size <=" << fileMap_.len);
91 msgAtt_.attachment = fileMap_.attachment;
92 msgAtt_.len = msgSize;
94 msgAtt_.afterConsumedCleanupFunc =
nullptr;
99 if (msgAtt_.len > 1000 && allowAtt_) fileMap_.unmap();
101 char const* hmbdcName()
const {
108 if (hmbdc_likely(!allowAtt_ || msg_.len <= 1000u)) {
109 sender_->send(msg_, msg_.len);
111 periodicGoodMsgCount_++;
113 sender_->send(msgAtt_);
115 periodicGoodMsgCount_++;
120 std::cout << (e.what()) << std::endl;
123 void report()
override {
124 auto rate = size_t(periodicGoodMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
126 HMBDC_LOG_n(
"msgSize=", l,
" mps=", rate);
127 periodicGoodMsgCount_ = 0u;
128 periodStartAt_ = SysTime::now();
136 size_t periodicGoodMsgCount_;
141 void runInSenderMode(
Config & config,
Topic const& topic,
size_t msgSize
142 , vector<uint16_t> cpus,
bool allowAtt) {
146 auto engine = NetContext::instance().createSendTransportEngine(config
147 , msgSize > 1000 && allowAtt
151 ctx.
start(*engine, 1ul << cpus[0]
152 , client, 1ul << cpus[1]);
163 :
Client<ReceiverClient, Message, MessageAtt>
167 , periodicGoodMsgCount_(0)
168 , periodicGoodMsgBytes_(0)
169 , periodicMsgCount_(0)
170 , periodicMsgBytes_(0)
171 , periodStartAt_(SysTime::now())
177 , msgOverhead_(msgOverhead) {
180 char const* hmbdcName()
const {
185 void handleMessageCb(M
const& r) {
187 if (hmbdc_likely(r.seq > seq_)) {
188 periodicGoodMsgCount_++;
189 periodicGoodMsgBytes_ += msgOverhead_ + msgSize_;
190 if (hmbdc_likely(seq_)) periodicMissed_ += r.seq - seq_ - 1;
191 }
else if (r.seq == seq_) {
198 periodicMsgBytes_ += msgOverhead_ + msgSize_;
202 handleMessageCb<MessageAtt>(r);
207 std::cout << (e.what()) << std::endl;
210 void report()
override {
211 auto rateGood = size_t(periodicGoodMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
212 HMBDC_LOG_n(
"msgSize=", msgSize_,
" msgOverhead=", msgOverhead_,
" mps(good)=", rateGood,
" Mbps(good)=", periodicGoodMsgBytes_*8u >> 20u
213 ,
" missed=" , periodicMissed_ ,
'(' , periodicMissed_?periodicMissed_ * 100ul / (periodicMissed_ + periodicMsgCount_):0
214 ,
"%) outOfOrder=" , periodicOoo_
215 ,
" duplicate=" , periodicDup_
216 ,
" Mbps(all)=" , periodicMsgBytes_*8u >> 20u
219 periodicGoodMsgCount_ = 0u;
220 periodicGoodMsgBytes_ = 0u;
221 periodicMsgCount_ = 0;
222 periodicMsgBytes_ = 0u;
227 periodStartAt_ = SysTime::now();
231 size_t periodicGoodMsgCount_;
232 size_t periodicGoodMsgBytes_;
233 size_t periodicMsgCount_;
234 size_t periodicMsgBytes_;
237 size_t periodicMissed_;
244 template <
typename RecvCtorArgsTuple>
245 void runInReceiverMode(
Config const& config,
Topic const& topic,
size_t msgSize, uint16_t inBuferSizePower2
246 , vector<uint16_t> cpus, RecvCtorArgsTuple&& recvCtorArgs,
bool allowAtt) {
248 using namespace boost;
249 RecvCtx ctx(inBuferSizePower2, 1, msgSize > 1000 && allowAtt?
sizeof(
MessageAtt):msgSize);
252 auto& net = NetContext::instance();
254 auto engine = net.createRecvTransportEngineTuply(config, ctx.
buffer(), std::forward<RecvCtorArgsTuple>(recvCtorArgs));
255 auto msgOverhead = 8u +
sizeof(TransportMessageHeader) + topic.size();
258 ctx.
start(*engine, 1ul << cpus[0]
259 , client, 1ul << cpus[1]);
267 net.stopListenTo(topic);
270 template <
typename RecvCtorArgsTuple>
271 int startNetPerf(
char const* dftUserConfig,
char const* helpStrIn,
int argc,
char** argv
272 , RecvCtorArgsTuple&& recvCtorArgs,
bool allowAtt =
true) {
278 vector<uint16_t> cpus;
279 uint16_t inBuferSizePower2;
281 if (argc > 1) role = string(argv[1]);
282 if (role !=
string(
"--sender")
283 && role !=
string(
"--receiver")) {
284 cerr <<
"1st arg needs to be either --sender or --receiver" << endl;
290 bool isSender = role == string(
"--sender");
291 auto checkMsgSize = [](
size_t s) {
292 if (s < 8) HMBDC_THROW(out_of_range,
"msgSize too small (>=8)");
294 namespace po = boost::program_options;
295 po::options_description desc(
"additional allowed options (after --sender or --receiver)");
297 "This program can be used to test messaging performance among a group of hosts.\n" 298 "It can run as either a sender or a receiver. Normally one sender for a single interface in a test. \n" 299 "When Sender sends out traffic, receivers subscribe to the traffic and collect stats.\n" 300 "ctrl-c terminate.\n";
301 helpStr += string(helpStrIn);
303 (
"help", helpStr.c_str())
304 (
"cfg,c", po::value<std::string>(&userCfg),
"use the user provided config file and ignore the ones in command options")
305 (
"print",
"print effective cfg values")
306 (
"cpus", po::value(&cpus)->multitoken()->default_value({0, 1},
"0 1"),
"specify the 2 cpu index numbers used in the test (1 for engine and 1 for msg client)")
307 (
"msgSize", po::value<size_t>(&msgSize)->default_value(8u)->notifier(checkMsgSize),
"size of the message (8-size of this executable file). when > 1000 the message is sent using zero copy memory attachment if it is supported by the transport type and hmbdc posts no limit on attachment size")
308 (
"inBuferSizePower2", po::value<uint16_t>(&inBuferSizePower2)->default_value(0u),
"2^inBufferSizePower2 is the number of message that can be buffered in the program, default 0 means automatically caulcated based on 32MB as lower bound")
309 (
"topic", po::value<Topic>(&topic)->default_value(topic),
"topic used in the test");
311 Config dft(dftUserConfig, isSender?
"tx":
"rx");
313 for (
auto it = params.begin(); it != params.end();) {
314 string& name = it->first;
315 string& val = it->second;
317 string& comment = it->second;
320 (name.c_str(), po::value<string>(&val)->default_value(val), comment.c_str());
323 po::positional_options_description p;
324 po::variables_map vm;
325 po::store(po::command_line_parser(argc, argv).
326 options(desc).positional(p).run(), vm);
329 if (argc == 1 || vm.count(
"help") || msgSize < 8u || cpus.size() != 2) {
330 cout << desc <<
"\n";
335 if (userCfg.size()) {
336 std::istringstream cs(userCfg);
337 config =
Config(cs, isSender?
"tx":
"rx");
339 for (
auto& p : params) {
340 config.put(p.first, p.second);
343 if (msgSize > 1000 && allowAtt && !isSender) {
344 config.put(
"allowRecvMemoryAttachment",
"1002");
347 if (vm.count(
"print")) {
348 write_json(cout, config);
353 runInSenderMode(config, topic, msgSize, cpus, allowAtt);
355 if (!inBuferSizePower2) {
356 inBuferSizePower2 = hmbdc::numeric::log2Upper(32ul * 1024ul * 1024ul / (8ul + msgSize));
357 cout <<
"auto set --inBuferSizePower2=" << inBuferSizePower2 << endl;
359 runInReceiverMode(config, topic, msgSize, inBuferSizePower2
360 , cpus, forward<RecvCtorArgsTuple>(recvCtorArgs), allowAtt);
366 using netperf_detail::startNetPerf;
class to hold an hmbdc configuration
Definition: Config.hpp:46
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:672
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:681
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
Definition: NetPerf.hpp:162
Definition: Timers.hpp:65
each message type has 16 bit tag
Definition: Message.hpp:29
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: NetPerf.hpp:206
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: NetPerf.hpp:119
static void onTermIntDo(std::function< void()> doThis)
specify what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
Definition: NetPerf.hpp:48
Definition: Message.hpp:262
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:461
Definition: NetPerf.hpp:33
Definition: NetPerf.hpp:61
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:328
Definition: NetPerf.hpp:75
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object can participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:45
void start(Args &&... args)
start the context by specifying what is in it (Pool and/or direct Clients) and their paired up cpu a...
Definition: Context.hpp:662
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:186
void invokedCb(uint16_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: NetPerf.hpp:106
Definition: Timers.hpp:104
list< pair< string, string > > content(unordered_set< string > const &skipThese=unordered_set< string >()) const
get the contents of the effective configuration in the form of a list of string pairs
Definition: Config.hpp:289