hmbdc
simplify-high-performance-messaging-programming
NetPerf.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Base.hpp"
4 #include "hmbdc/text/Misc.h"
5 #include "hmbdc/numeric/BitMath.hpp"
6 #include "hmbdc/os/Signals.hpp"
7 
8 #include <iostream>
9 #include <vector>
10 #include <memory>
11 #include <unistd.h>
12 #include <boost/program_options.hpp>
13 
14 extern const char *__progname_full;
15 
16 
17 /**
18  * @namespace hmbdc::app::utils
19  * utilities based on app and serve its higher layers
20  */
21 
22 namespace hmbdc { namespace app { namespace utils {
23 
24 namespace netperf_detail {
25 using namespace tested_module;
26 using namespace hmbdc::time;
27 using namespace hmbdc::numeric;
28 using namespace boost::property_tree;
29 using namespace std;
30 
// Context type used on the sending side (instantiated in runInSenderMode and held by SenderClient).
// NOTE(review): the template argument 24 presumably bounds the size of messages kept in the Context - confirm against Context.hpp.
31 using SendCtx = hmbdc::app::Context<24>;
32 
33 struct Message
34 : hasTag<1001> {
35  Message(uint32_t len)
36  : len(len)
37  , seq(0)
38  {}
39  uint32_t len;
40  uint32_t seq;
41  friend std::ostream& operator << (std::ostream& os, Message const& r) {
42  os << "Message " << ' ' << r.len << ' ' << r.seq;
43  return os;
44  }
45  char padding[65536 - sizeof(seq) - sizeof(len)];
46 } __attribute__((__packed__));
47 
// Test message variant (tag 1002) that carries its own sequence number; streamed as "MessageAtt  <len> <seq>".
// NOTE(review): listing line 49 (the first base class) is not visible here; the `len` member printed below
// is presumably inherited from that base - confirm against the real header.
48 struct MessageAtt
50 , hasTag<1002> {
// sequence numbering starts at 0
51  MessageAtt()
52  : seq(0)
53  {}
54  uint32_t seq;
55  friend std::ostream& operator << (std::ostream& os, MessageAtt const& r) {
56  os << "MessageAtt " << ' ' << r.len << ' ' << r.seq;
57  return os;
58  }
59 };
60 
// NOTE(review): listing lines 60-62 (the struct header and the constructor's name) are not visible here.
// Constructor tail: configures a timer that reoccurs every second, makes its callback invoke report(),
// and schedules the timer starting at the current time.
63  : ReoccuringTimer(Duration::seconds(1u)) {
64  setCallback(
65  [this](TimerManager& tm, SysTime const& now) {
66  report();
67  }
68  );
69  schedule(SysTime::now(), *this);
70  }
71 
// implemented by the sender / receiver clients to print their per-period statistics
72  virtual void report() = 0;
73 };
74 
75 struct SenderClient
76 : Client<SenderClient>
78  SenderClient(SendCtx& ctx, Topic const topic, size_t msgSize, bool allowAtt)
79  : ctx_(ctx)
80  , allowAtt_(allowAtt)
81  , msg_(msgSize)
82  , msgAtt_()
83  , periodicGoodMsgCount_(0)
84  , periodStartAt_(SysTime::now()) {
85  sender_ = NetContext::instance().getSender(topic);
86  if (msg_.len > 1000 && allowAtt) {
87  fileMap_.map(__progname_full);
88  if (fileMap_.len < msgSize) {
89  HMBDC_THROW(std::out_of_range, "exceeds test allowed msg size <=" << fileMap_.len);
90  }
91  msgAtt_.attachment = fileMap_.attachment;
92  msgAtt_.len = msgSize;
93  //don't release memory, since we r reusing the same msfAtt_ everytime
94  msgAtt_.afterConsumedCleanupFunc = nullptr;
95  }
96  }
97 
98  ~SenderClient() {
99  if (msgAtt_.len > 1000 && allowAtt_) fileMap_.unmap();
100  }
101  char const* hmbdcName() const {
102  return "perf-tx";
103  }
104 
105  /*virtual*/
106  void invokedCb(uint16_t) HMBDC_RESTRICT override {
107  //send as fast as possible since the engine has rate control
108  if (hmbdc_likely(!allowAtt_ || msg_.len <= 1000u)) {
109  sender_->send(msg_, msg_.len);
110  msg_.seq++;
111  periodicGoodMsgCount_++;
112  } else {
113  sender_->send(msgAtt_);
114  msgAtt_.seq++;
115  periodicGoodMsgCount_++;
116  }
117  }
118 
119  void stoppedCb(std::exception const& e) override {
120  std::cout << (e.what()) << std::endl;
121  };
122 
123  void report() override {
124  auto rate = size_t(periodicGoodMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
125  auto l = msg_.len;
126  HMBDC_LOG_n("msgSize=", l, " mps=", rate);
127  periodicGoodMsgCount_ = 0u;
128  periodStartAt_ = SysTime::now();
129  }
130 
131  SendCtx& ctx_;
132  bool allowAtt_;
133  Sender* sender_;
134  Message msg_;
135  MessageAtt msgAtt_;
136  size_t periodicGoodMsgCount_;
137  SysTime periodStartAt_;
138  hasMemoryAttachment fileMap_;
139 };
140 
// Run the test as the sender: create a send Context, a send transport engine and a
// SenderClient, start them on the requested cpus and block until the Context's threads exit.
//  config   - configuration handed to the send transport engine factory
//  topic    - topic the SenderClient sends on
//  msgSize  - requested message size in bytes
//  cpus     - cpus[0] is paired with the engine, cpus[1] with the client (each as a 1ul << index mask)
//  allowAtt - when true and msgSize > 1000 the attachment message type is used
// NOTE(review): listing lines 145 and 153 are not visible here; the lambda below is presumably
// registered as the SIGTERM/SIGINT action so that ctrl-c stops the Context - confirm.
141 void runInSenderMode(Config & config, Topic const& topic, size_t msgSize
142  , vector<uint16_t> cpus, bool allowAtt) {
143  using namespace std;
144  SendCtx ctx;
// engine size argument: the larger of MessageAtt / StartMemorySegTrain on the attachment path, msgSize otherwise
146  auto engine = NetContext::instance().createSendTransportEngine(config
147  , msgSize > 1000 && allowAtt
148  ?std::max(sizeof(MessageAtt), sizeof(StartMemorySegTrain))
149  :msgSize);
150  SenderClient client(ctx, topic, msgSize, allowAtt);
151  ctx.start(*engine, 1ul << cpus[0]
152  , client, 1ul << cpus[1]);
154  [&ctx]() {
155  ctx.stop();
156  }
157  );
// wait until all threads of the Context exit
158  ctx.join();
159 }
160 
163 : Client<ReceiverClient, Message, MessageAtt>
165  ReceiverClient(RecvCtx& ctx, size_t msgOverhead)
166  : ctx_(ctx)
167  , periodicGoodMsgCount_(0)
168  , periodicGoodMsgBytes_(0)
169  , periodicMsgCount_(0)
170  , periodicMsgBytes_(0)
171  , periodStartAt_(SysTime::now())
172  , seq_(0)
173  , periodicMissed_(0)
174  , periodicOoo_(0)
175  , periodicDup_(0)
176  , msgSize_(0)
177  , msgOverhead_(msgOverhead) {
178  }
179 
180  char const* hmbdcName() const {
181  return "perf-rx";
182  }
183 
184  template<typename M>
185  void handleMessageCb(M const& r) {
186  msgSize_ = r.len;
187  if (hmbdc_likely(r.seq > seq_)) {
188  periodicGoodMsgCount_++;
189  periodicGoodMsgBytes_ += msgOverhead_ + msgSize_;
190  if (hmbdc_likely(seq_)) periodicMissed_ += r.seq - seq_ - 1;
191  } else if (r.seq == seq_) {
192  periodicDup_++;
193  } else {
194  periodicOoo_++;
195  }
196  seq_ = r.seq;
197  periodicMsgCount_++;
198  periodicMsgBytes_ += msgOverhead_ + msgSize_;
199  }
200 
201  void handleMessageCb(MessageAtt& r) {
202  handleMessageCb<MessageAtt>(r);
203  r.release();
204  }
205 
206  void stoppedCb(std::exception const& e) override {
207  std::cout << (e.what()) << std::endl;
208  };
209 
210  void report() override {
211  auto rateGood = size_t(periodicGoodMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
212  HMBDC_LOG_n("msgSize=", msgSize_, " msgOverhead=", msgOverhead_, " mps(good)=", rateGood, " Mbps(good)=", periodicGoodMsgBytes_*8u >> 20u
213  , " missed=" , periodicMissed_ , '(' , periodicMissed_?periodicMissed_ * 100ul / (periodicMissed_ + periodicMsgCount_):0
214  , "%) outOfOrder=" , periodicOoo_
215  , " duplicate=" , periodicDup_
216  , " Mbps(all)=" , periodicMsgBytes_*8u >> 20u
217  );
218 
219  periodicGoodMsgCount_ = 0u;
220  periodicGoodMsgBytes_ = 0u;
221  periodicMsgCount_ = 0;
222  periodicMsgBytes_ = 0u;
223  periodicMissed_ = 0;
224  periodicDup_ = 0;
225  periodicOoo_ = 0;
226 
227  periodStartAt_ = SysTime::now();
228  }
229 
230  RecvCtx& ctx_;
231  size_t periodicGoodMsgCount_;
232  size_t periodicGoodMsgBytes_;
233  size_t periodicMsgCount_;
234  size_t periodicMsgBytes_;
235  SysTime periodStartAt_;
236  size_t seq_;
237  size_t periodicMissed_;
238  size_t periodicOoo_;
239  size_t periodicDup_;
240  size_t msgSize_;
241  size_t msgOverhead_;
242 };
243 
// Run the test as the receiver: create a receive Context, a receive transport engine and a
// ReceiverClient, listen to the topic, run until the Context's threads exit, then stop listening.
//  config            - configuration handed to the receive transport engine factory
//  topic             - topic to listen to
//  msgSize           - expected message size in bytes
//  inBuferSizePower2 - first argument of the receive Context constructor
//  cpus              - cpus[0] is paired with the engine, cpus[1] with the client (each as a 1ul << index mask)
//  recvCtorArgs      - forwarded to createRecvTransportEngineTuply
//  allowAtt          - when true and msgSize > 1000 the attachment message type is expected
// NOTE(review): listing lines 251 and 260 are not visible here; the lambda below is presumably
// registered as the SIGTERM/SIGINT action so that ctrl-c stops the Context - confirm.
244 template <typename RecvCtorArgsTuple>
245 void runInReceiverMode(Config const& config, Topic const& topic, size_t msgSize, uint16_t inBuferSizePower2
246  , vector<uint16_t> cpus, RecvCtorArgsTuple&& recvCtorArgs, bool allowAtt) {
247  using namespace std;
248  using namespace boost;
// third argument: sizeof(MessageAtt) on the attachment path, msgSize otherwise
249  RecvCtx ctx(inBuferSizePower2, 1, msgSize > 1000 && allowAtt?sizeof(MessageAtt):msgSize);
250 
252  auto& net = NetContext::instance();
253 
254  auto engine = net.createRecvTransportEngineTuply(config, ctx.buffer(), std::forward<RecvCtorArgsTuple>(recvCtorArgs));
// per-message overhead the client adds to the message size when accumulating byte counts
255  auto msgOverhead = 8u + sizeof(TransportMessageHeader) + topic.size();
256  ReceiverClient client(ctx, msgOverhead);
257  net.listenTo(topic);
258  ctx.start(*engine, 1ul << cpus[0]
259  , client, 1ul << cpus[1]);
261  [&ctx]() {
262  ctx.stop();
263  }
264  );
// wait until all threads of the Context exit
265  ctx.join();
266 
267  net.stopListenTo(topic);
268 }
269 
270 template <typename RecvCtorArgsTuple>
271 int startNetPerf(char const* dftUserConfig, char const* helpStrIn, int argc, char** argv
272  , RecvCtorArgsTuple&& recvCtorArgs, bool allowAtt = true) {
273  SingletonGuardian<SyncLogger> logGuard(cout);
274  using namespace std;
275  string role;
276  string userCfg;
277  size_t msgSize;
278  vector<uint16_t> cpus;
279  uint16_t inBuferSizePower2;
280  Topic topic("t");
281  if (argc > 1) role = string(argv[1]);
282  if (role != string("--sender")
283  && role != string("--receiver")) {
284  cerr << "1st arg needs to be either --sender or --receiver" << endl;
285  exit(1);
286  }
287  argc--;
288  argv++;
289 
290  bool isSender = role == string("--sender");
291  auto checkMsgSize = [](size_t s) {
292  if (s < 8) HMBDC_THROW(out_of_range, "msgSize too small (>=8)");
293  };
294  namespace po = boost::program_options;
295  po::options_description desc("additional allowed options (after --sender or --receiver)");
296  string helpStr =
297  "This program can be used to test messaging performance among a group of hosts.\n"
298  "It can run as either a sender or a receiver. Normally one sender for a single interface in a test. \n"
299  "When Sender sends out traffic, receivers subscribe to the traffic and collect stats.\n"
300  "ctrl-c terminate.\n";
301  helpStr += string(helpStrIn);
302  desc.add_options()
303  ("help", helpStr.c_str())
304  ("cfg,c", po::value<std::string>(&userCfg), "use the user provided config file and ignore the ones in command options")
305  ("print", "print effective cfg values")
306  ("cpus", po::value(&cpus)->multitoken()->default_value({0, 1}, "0 1"), "specify the 2 cpu index numbers used in the test (1 for engine and 1 for msg client)")
307  ("msgSize", po::value<size_t>(&msgSize)->default_value(8u)->notifier(checkMsgSize), "size of the message (8-size of this executable file). when > 1000 the message is sent using zero copy memory attachment if it is supported by the transport type and hmbdc posts no limit on attachment size")
308  ("inBuferSizePower2", po::value<uint16_t>(&inBuferSizePower2)->default_value(0u), "2^inBufferSizePower2 is the number of message that can be buffered in the program, default 0 means automatically caulcated based on 32MB as lower bound")
309  ("topic", po::value<Topic>(&topic)->default_value(topic), "topic used in the test");
310 
311  Config dft(dftUserConfig, isSender?"tx":"rx");
312  auto params = dft.content();
313  for (auto it = params.begin(); it != params.end();) {
314  string& name = it->first;
315  string& val = it->second;
316  it++;
317  string& comment = it->second;
318  it++;
319  desc.add_options()
320  (name.c_str(), po::value<string>(&val)->default_value(val), comment.c_str());
321  }
322 
323  po::positional_options_description p;
324  po::variables_map vm;
325  po::store(po::command_line_parser(argc, argv).
326  options(desc).positional(p).run(), vm);
327  po::notify(vm);
328 
329  if (argc == 1 || vm.count("help") || msgSize < 8u || cpus.size() != 2) {
330  cout << desc << "\n";
331  return 0;
332  }
333 
334  Config config;
335  if (userCfg.size()) {
336  std::istringstream cs(userCfg);
337  config = Config(cs, isSender?"tx":"rx");
338  } else {
339  for (auto& p : params) {
340  config.put(p.first, p.second);
341  }
342  }
343  if (msgSize > 1000 && allowAtt && !isSender) {
344  config.put("allowRecvMemoryAttachment", "1002");
345  }
346 
347  if (vm.count("print")) {
348  write_json(cout, config);
349  exit(0);
350  }
351 
352  if (isSender) {
353  runInSenderMode(config, topic, msgSize, cpus, allowAtt);
354  } else {
355  if (!inBuferSizePower2) {
356  inBuferSizePower2 = hmbdc::numeric::log2Upper(32ul * 1024ul * 1024ul / (8ul + msgSize));
357  cout << "auto set --inBuferSizePower2=" << inBuferSizePower2 << endl;
358  }
359  runInReceiverMode(config, topic, msgSize, inBuferSizePower2
360  , cpus, forward<RecvCtorArgsTuple>(recvCtorArgs), allowAtt);
361  }
362  return 0;
363 }
364 } //netperf_detail
365 
366 using netperf_detail::startNetPerf;
367 }}}
368 
class to hold an hmbdc configuration
Definition: Config.hpp:46
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:672
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:681
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
Definition: Timers.hpp:65
each message type has 16 bit tag
Definition: Message.hpp:29
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: NetPerf.hpp:206
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: NetPerf.hpp:119
static void onTermIntDo(std::function< void()> doThis)
specify what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
Definition: Message.hpp:262
Definition: Time.hpp:14
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:461
Definition: Rater.hpp:10
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:328
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:45
void start(Args &&... args)
start the context by specifying what are in it (Pool and/or direct Clients) and their paired up cpu a...
Definition: Context.hpp:662
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:186
void invokedCb(uint16_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: NetPerf.hpp:106
Definition: Base.hpp:12
Definition: Timers.hpp:104
list< pair< string, string > > content(unordered_set< string > const &skipThese=unordered_set< string >()) const
get contents of all the effective configure in the form of list of string pairs
Definition: Config.hpp:289