1 #include "hmbdc/app/LoggerT.hpp" 2 #include "hmbdc/app/Client.hpp" 3 #include "hmbdc/app/Context.hpp" 4 #include "hmbdc/text/Misc.h" 5 #include "hmbdc/numeric/BitMath.hpp" 6 #include "hmbdc/os/Signals.hpp" 11 #include <boost/program_options.hpp> 13 namespace hmbdc {
namespace app {
namespace utils {
15 using namespace hmbdc;
30 friend std::ostream& operator << (std::ostream& os,
Message const& r) {
31 os <<
"Message " <<
' ' << r.seq;
34 char padding[512 -
sizeof(seq)];
36 static uint64_t seq_s;
37 } __attribute__((__packed__));
39 uint64_t Message::seq_s = 0ul;
49 , periodicMsgCount_(0) {
50 sender_ = NetContext::instance().getSender(topic);
56 schedule(SysTime::now(), *
this);
59 char const* hmbdcName()
const {
64 void invokedCb(uint16_t) __restrict__
override {
67 sender_->send(m, msgSize_);
71 void stoppedCb(std::exception
const& e){
72 std::cout << (e.what()) << std::endl;
76 HMBDC_LOG_n(
"msgSize=", msgSize_,
" mps=", periodicMsgCount_,
" totalMsgCount=", Message::seq_s);
77 periodicMsgCount_ = 0u;
83 size_t periodicMsgCount_;
86 void runInSenderMode(
Config & config,
Topic const& topic, uint16_t msgSize, uint16_t startCore) {
88 uint64_t cpuMask = (1ul << startCore);
90 #ifdef HMBDC_LOG_CONTEXT 97 auto engine = NetContext::instance().createSendTransportEngine(
100 ctx.
start(*engine, cpuMask,
true);
102 ctx.
start(client, cpuMask);
113 :
Client<ReceiverClient, Message>
119 , periodicMsgCount_(0)
121 , totalMsgReceived_(0)
132 schedule(SysTime::now(), *
this);
135 char const* hmbdcName()
const {
139 void handleMessageCb(
Message const& r) {
143 missed_ += r.seq - seq_ - 1;
144 }
else if (r.seq == seq_) {
154 void stoppedCb(std::exception
const& e){
155 std::cout << (e.what()) << std::endl;
159 HMBDC_LOG_n(
"msgSize=", msgSize_,
" mps=", periodicMsgCount_
160 ,
" missed=" , missed_ ,
'(' , missed_?missed_ * 100ul / (missed_ + totalMsgReceived_):0
161 ,
"%) outOfOrder=" , outOfOrder_
162 ,
" duplicate=" , duplicate_
163 ,
" totalMsgCount=" , totalMsgReceived_
166 periodicMsgCount_ = 0u;
170 size_t periodicMsgCount_;
172 size_t totalMsgReceived_;
179 void runInReceiverMode(
Config const& config,
Topic const& topic, uint16_t msgSize, uint16_t inBuferSizePower2, uint16_t startCore) {
181 using namespace boost;
183 RecvCtx ctx(inBuferSizePower2, 1, msgSize);
186 auto& net = NetContext::instance();
188 uint64_t cpuMask = (1ul << startCore);
189 auto engine = net.createRecvTransportEngine(config, ctx.
buffer());
192 ctx.
start(*engine, cpuMask,
true);
194 ctx.
start(client, cpuMask,
false);
202 net.stopListenTo(topic);
205 int startNetPerf(
char const* dftUserConfig,
char const* helpStrIn,
int argc,
char** argv) {
211 uint16_t inBuferSizePower2;
213 if (argc > 1) role = string(argv[1]);
214 if (role !=
string(
"--sender")
215 && role !=
string(
"--receiver")) {
216 cerr <<
"1st arg needs to be either --sender or --receiver" << endl;
222 bool isSender = role == string(
"--sender");
223 namespace po = boost::program_options;
224 po::options_description desc(
"additional allowed options (after --sender or --receiver)");
226 "This program can be used to test messaging performance among a group of hosts.\n" 227 "It can run as either a sender or a receiver. Normally one sender for a single interface in a test. \n" 228 "When Sender sends out traffic, receivers subscribe to the traffic and collect stats.\n" 229 "ctrl-c terminate.\n";
230 helpStr += string(helpStrIn);
232 (
"help", helpStr.c_str())
233 (
"cfg,c", po::value<std::string>(&userCfg),
"use the user provided config file and ignore the ones in command options")
234 (
"print",
"print effective cfg values")
235 (
"startCore,C", po::value<uint16_t>(&startCore)->default_value(0u),
"the test uses 3 cores, specify starting core number")
236 (
"msgSize", po::value<uint16_t>(&msgSize)->default_value(8u),
"size of the message (8-1024)")
237 (
"inBuferSizePower2", po::value<uint16_t>(&inBuferSizePower2)->default_value(0u),
"2^inBufferSizePower2 is the number of message that can be buffered in the program, default 0 means automatically caulcated based on 32MB as lower bound")
238 (
"topic", po::value<Topic>(&topic)->default_value(topic),
"topic used in the test");
241 Config dft(dftUserConfig, isSender?
"tx":
"rx");
243 for (
auto it = params.begin(); it != params.end();) {
244 string& name = it->first;
245 string& val = it->second;
247 string& comment = it->second;
250 (name.c_str(), po::value<string>(&val)->default_value(val), comment.c_str());
253 po::positional_options_description p;
254 po::variables_map vm;
255 po::store(po::command_line_parser(argc, argv).
256 options(desc).positional(p).run(), vm);
259 if (argc == 1 || vm.count(
"help") || msgSize < 8u) {
260 cout << desc <<
"\n";
265 if (userCfg.size()) {
266 std::istringstream cs(userCfg);
267 config =
Config(cs, isSender?
"tx":
"rx");
269 for (
auto& p : params) {
270 config.put(p.first, p.second);
274 if (vm.count(
"print")) {
275 write_json(cout, config);
280 runInSenderMode(config, topic, msgSize, startCore);
282 if (!inBuferSizePower2) {
283 inBuferSizePower2 = hmbdc::numeric::log2Upper(32ul * 1024ul * 1024ul / (8ul + msgSize));
284 cout <<
"auto set --inBuferSizePower2=" << inBuferSizePower2 << endl;
286 runInReceiverMode(config, topic, msgSize, inBuferSizePower2, startCore);
class to hold an hmbdc configuration
Definition: Config.hpp:35
Definition: Client.hpp:11
Definition: NetPerf.hpp:112
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:588
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:597
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
Definition: Timers.hpp:69
Definition: Message.hpp:21
Definition: NetPerf.hpp:41
Definition: GuardedSingleton.hpp:12
Definition: NetPerf.hpp:24
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:274
list< pair< string, string > > content(unordered_set< string > const &skipThese=unordered_set< string >()) const
get contents of all the effective configure in the form of list of string pairs
Definition: Config.hpp:198
void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask, Args &&...args)
start the context and specify its Pool and direct Clients
Definition: Context.hpp:553
Definition: Context.hpp:384
static void onTermIntDo(function< void()> doThis)
specfy what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:29
Definition: Client.hpp:39
Definition: Client.hpp:11
Definition: Timers.hpp:100