1 #include "hmbdc/app/Base.hpp" 2 #include "hmbdc/text/Misc.h" 3 #include "hmbdc/numeric/BitMath.hpp" 4 #include "hmbdc/os/Signals.hpp" 9 #include <boost/program_options.hpp> 11 extern const char *__progname_full;
19 namespace hmbdc {
namespace app {
namespace utils {
21 namespace netperf_detail {
35 friend std::ostream& operator << (std::ostream& os,
Message const& r) {
36 os <<
"Message " <<
' ' << r.seq;
39 char padding[1000 -
sizeof(seq)];
41 static uint64_t seq_s;
42 } __attribute__((__packed__));
44 uint64_t Message::seq_s = 0ul;
51 attachment = attachment_s;
53 afterConsumedCleanupFunc = afterConsumedCleanupFunc_s;
57 friend std::ostream& operator << (std::ostream& os,
MessageAtt const& r) {
58 os <<
"MessageAtt " <<
' ' << r.seq;
61 static uint64_t seq_s;
62 static void* attachment_s;
66 uint64_t MessageAtt::seq_s = 0ul;
67 void* MessageAtt::attachment_s =
nullptr;
68 size_t MessageAtt::len_s = 0;
81 schedule(SysTime::now(), *
this);
84 virtual void report() = 0;
93 , periodicMsgCount_(0)
94 , periodStartAt_(SysTime::now()) {
95 sender_ = NetContext::instance().getSender(topic);
96 if (msgSize_ > 1000) {
97 fileMap_.map(__progname_full);
98 MessageAtt::attachment_s = fileMap_.attachment;
99 MessageAtt::len_s = msgSize;
104 if (msgSize_ > 1000) fileMap_.unmap();
106 char const* hmbdcName()
const {
111 void invokedCb(uint16_t) HMBDC_RESTRICT
override {
113 if (hmbdc_likely(msgSize_ <= 1000u)) {
115 sender_->send(m, msgSize_);
124 void stoppedCb(std::exception
const& e)
override {
125 std::cout << (e.what()) << std::endl;
128 void report()
override {
129 auto rate = size_t(periodicMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
130 HMBDC_LOG_n(
"msgSize=", msgSize_,
" mps=", rate,
" totalMsgCount=", Message::seq_s);
131 periodicMsgCount_ = 0u;
132 periodStartAt_ = SysTime::now();
138 size_t periodicMsgCount_;
143 void runInSenderMode(
Config & config,
Topic const& topic,
size_t msgSize, uint16_t startCore) {
145 uint64_t cpuMask = (1ul << startCore);
147 #ifdef HMBDC_LOG_CONTEXT 153 auto engine = NetContext::instance().createSendTransportEngine(config
158 ctx.
start(*engine, cpuMask,
true);
160 ctx.
start(client, cpuMask);
171 :
Client<ReceiverClient, Message, MessageAtt>
175 , periodicMsgCount_(0)
176 , periodStartAt_(SysTime::now())
178 , totalMsgReceived_(0)
182 , msgSize_(msgSize) {
185 char const* hmbdcName()
const {
189 void handleMessageCb(
Message const& r) {
193 missed_ += r.seq - seq_ - 1;
194 }
else if (r.seq == seq_) {
208 missed_ += r.seq - seq_ - 1;
209 }
else if (r.seq == seq_) {
219 void stoppedCb(std::exception
const& e)
override {
220 std::cout << (e.what()) << std::endl;
223 void report()
override {
224 auto rate = size_t(periodicMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
225 HMBDC_LOG_n(
"msgSize=", msgSize_,
" mps=", rate
226 ,
" missed=" , missed_ ,
'(' , missed_?missed_ * 100ul / (missed_ + totalMsgReceived_):0
227 ,
"%) outOfOrder=" , outOfOrder_
228 ,
" duplicate=" , duplicate_
229 ,
" totalMsgCount=" , totalMsgReceived_
232 periodicMsgCount_ = 0u;
233 periodStartAt_ = SysTime::now();
237 size_t periodicMsgCount_;
240 size_t totalMsgReceived_;
247 template <
typename RecvCtorArgsTuple>
248 void runInReceiverMode(
Config const& config,
Topic const& topic,
size_t msgSize, uint16_t inBuferSizePower2, uint16_t startCore
249 , RecvCtorArgsTuple&& recvCtorArgs) {
251 using namespace boost;
256 auto& net = NetContext::instance();
258 uint64_t cpuMask = (1ul << startCore);
259 auto engine = net.createRecvTransportEngineTuply(config, ctx.
buffer(), std::forward<RecvCtorArgsTuple>(recvCtorArgs));
262 ctx.
start(*engine, cpuMask,
true);
264 ctx.
start(client, cpuMask,
false);
272 net.stopListenTo(topic);
275 template <
typename RecvCtorArgsTuple>
276 int startNetPerf(
char const* dftUserConfig,
char const* helpStrIn,
int argc,
char** argv
277 , RecvCtorArgsTuple&& recvCtorArgs) {
283 uint16_t inBuferSizePower2;
285 if (argc > 1) role = string(argv[1]);
286 if (role !=
string(
"--sender")
287 && role !=
string(
"--receiver")) {
288 cerr <<
"1st arg needs to be either --sender or --receiver" << endl;
294 bool isSender = role == string(
"--sender");
295 namespace po = boost::program_options;
296 po::options_description desc(
"additional allowed options (after --sender or --receiver)");
298 "This program can be used to test messaging performance among a group of hosts.\n" 299 "It can run as either a sender or a receiver. Normally one sender for a single interface in a test. \n" 300 "When Sender sends out traffic, receivers subscribe to the traffic and collect stats.\n" 301 "ctrl-c terminate.\n";
302 helpStr += string(helpStrIn);
304 (
"help", helpStr.c_str())
305 (
"cfg,c", po::value<std::string>(&userCfg),
"use the user provided config file and ignore the ones in command options")
306 (
"print",
"print effective cfg values")
307 (
"startCore,C", po::value<uint16_t>(&startCore)->default_value(0u),
"the test uses 3 cores, specify starting core number")
308 (
"msgSize", po::value<size_t>(&msgSize)->default_value(8u),
"size of the message (8-size of this executable file). when > 1000 (only applies to transport types that support memory attachment), the message is sent using zero copy memory attachment. hmbdc posts no limit on attachment size")
309 (
"inBuferSizePower2", po::value<uint16_t>(&inBuferSizePower2)->default_value(0u),
"2^inBufferSizePower2 is the number of message that can be buffered in the program, default 0 means automatically caulcated based on 32MB as lower bound")
310 (
"topic", po::value<Topic>(&topic)->default_value(topic),
"topic used in the test");
312 Config dft(dftUserConfig, isSender?
"tx":
"rx");
314 for (
auto it = params.begin(); it != params.end();) {
315 string& name = it->first;
316 string& val = it->second;
318 string& comment = it->second;
321 (name.c_str(), po::value<string>(&val)->default_value(val), comment.c_str());
324 po::positional_options_description p;
325 po::variables_map vm;
326 po::store(po::command_line_parser(argc, argv).
327 options(desc).positional(p).run(), vm);
330 if (argc == 1 || vm.count(
"help") || msgSize < 8u) {
331 cout << desc <<
"\n";
336 if (userCfg.size()) {
337 std::istringstream cs(userCfg);
338 config =
Config(cs, isSender?
"tx":
"rx");
340 for (
auto& p : params) {
341 config.put(p.first, p.second);
344 if (msgSize > 1000 && !isSender) {
345 config.put(
"allowRecvMemoryAttachment",
"1002");
348 if (vm.count(
"print")) {
349 write_json(cout, config);
354 runInSenderMode(config, topic, msgSize, startCore);
356 if (!inBuferSizePower2) {
357 inBuferSizePower2 = hmbdc::numeric::log2Upper(32ul * 1024ul * 1024ul / (8ul + msgSize));
358 cout <<
"auto set --inBuferSizePower2=" << inBuferSizePower2 << endl;
360 runInReceiverMode(config, topic, msgSize, inBuferSizePower2, startCore, forward<RecvCtorArgsTuple>(recvCtorArgs));
366 using netperf_detail::startNetPerf;
class to hold an hmbdc configuration
Definition: Config.hpp:44
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:630
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:639
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
Definition: NetPerf.hpp:170
Definition: Timers.hpp:65
each message type has 16 bit tag
Definition: Message.hpp:30
RAII representing the lifespan of the underlying Singleton which also ganrantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask, Args &&...args)
start the context and specify its Pool and direct Clients
Definition: Context.hpp:585
static void onTermIntDo(std::function< void()> doThis)
specfy what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
Definition: NetPerf.hpp:46
Definition: Message.hpp:224
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:408
Definition: NetPerf.hpp:29
Definition: NetPerf.hpp:73
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:291
Definition: NetPerf.hpp:87
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
list< pair< string, string > > content(unordered_set< string > const &skipThese=unordered_set< string >()) const
get contents of all the effective configure in the form of list of string pairs
Definition: Config.hpp:264
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:158
static hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFunc_s
we use the file mapped memory for every message repeatedly, so don't need to release anything ...
Definition: NetPerf.hpp:64
Definition: Timers.hpp:96