hmbdc
simplify-high-performance-messaging-programming
NetPerf.hpp
1 #include "hmbdc/app/Base.hpp"
2 #include "hmbdc/text/Misc.h"
3 #include "hmbdc/numeric/BitMath.hpp"
4 #include "hmbdc/os/Signals.hpp"
5 
6 #include <iostream>
7 #include <memory>
8 #include <unistd.h>
9 #include <boost/program_options.hpp>
10 
11 extern const char *__progname_full;
12 
13 
14 /**
15  * @namespace hmbdc::app::utils
16  * utilities built on top of app that serve its higher layers
17  */
18 
19 namespace hmbdc { namespace app { namespace utils {
20 
21 namespace netperf_detail {
22 using namespace tested_module;
23 using namespace hmbdc::time;
24 using namespace hmbdc::numeric;
25 using namespace boost::property_tree;
26 
27 using SendCtx = hmbdc::app::Context<24>;
28 
29 struct Message
30 : hasTag<1001> {
31  Message()
32  : seq(seq_s++)
33  {}
34  size_t seq;
35  friend std::ostream& operator << (std::ostream& os, Message const& r) {
36  os << "Message " << ' ' << r.seq;
37  return os;
38  }
39  char padding[1000 - sizeof(seq)];
40 
41  static uint64_t seq_s;
42 } __attribute__((__packed__));
43 
44 uint64_t Message::seq_s = 0ul;
45 
46 struct MessageAtt
48 , hasTag<1002> {
49  MessageAtt()
50  : seq(seq_s++) {
51  attachment = attachment_s;
52  len = len_s;
53  afterConsumedCleanupFunc = afterConsumedCleanupFunc_s;
54  }
55 
56  size_t seq;
57  friend std::ostream& operator << (std::ostream& os, MessageAtt const& r) {
58  os << "MessageAtt " << ' ' << r.seq;
59  return os;
60  }
61  static uint64_t seq_s;
62  static void* attachment_s;
63  static size_t len_s;
64  static hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFunc_s;
65 };
66 uint64_t MessageAtt::seq_s = 0ul;
67 void* MessageAtt::attachment_s = nullptr;
68 size_t MessageAtt::len_s = 0;
69 
70 /// we use the file mapped memory for every message repeatedly, so don't need to release anything
71 hasMemoryAttachment::AfterConsumedCleanupFunc MessageAtt::afterConsumedCleanupFunc_s = nullptr;
72 
75  : ReoccuringTimer(Duration::seconds(1u)) {
76  setCallback(
77  [this](TimerManager& tm, SysTime const& now) {
78  report();
79  }
80  );
81  schedule(SysTime::now(), *this);
82  }
83 
84  virtual void report() = 0;
85 };
86 
87 struct SenderClient
88 : Client<SenderClient>
90  SenderClient(SendCtx& ctx, Topic const topic, size_t msgSize)
91  : ctx_(ctx)
92  , msgSize_(msgSize)
93  , periodicMsgCount_(0)
94  , periodStartAt_(SysTime::now()) {
95  sender_ = NetContext::instance().getSender(topic);
96  if (msgSize_ > 1000) {
97  fileMap_.map(__progname_full);
98  MessageAtt::attachment_s = fileMap_.attachment;
99  MessageAtt::len_s = msgSize;
100  }
101  }
102 
103  ~SenderClient() {
104  if (msgSize_ > 1000) fileMap_.unmap();
105  }
106  char const* hmbdcName() const {
107  return "perf-tx";
108  }
109 
110  /*virtual*/
111  void invokedCb(uint16_t) HMBDC_RESTRICT override {
112  //send as fast as possible since the engine has rate control
113  if (hmbdc_likely(msgSize_ <= 1000u)) {
114  Message m;
115  sender_->send(m, msgSize_);
116  periodicMsgCount_++;
117  } else {
118  MessageAtt m;
119  sender_->send(m);
120  periodicMsgCount_++;
121  }
122  }
123 
124  void stoppedCb(std::exception const& e) override {
125  std::cout << (e.what()) << std::endl;
126  };
127 
128  void report() override {
129  auto rate = size_t(periodicMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
130  HMBDC_LOG_n("msgSize=", msgSize_, " mps=", rate, " totalMsgCount=", Message::seq_s);
131  periodicMsgCount_ = 0u;
132  periodStartAt_ = SysTime::now();
133  }
134 
135  SendCtx& ctx_;
136  size_t msgSize_;
137  Sender* sender_;
138  size_t periodicMsgCount_;
139  SysTime periodStartAt_;
140  hasMemoryAttachment fileMap_;
141 };
142 
143 void runInSenderMode(Config & config, Topic const& topic, size_t msgSize, uint16_t startCore) {
144  using namespace std;
145  uint64_t cpuMask = (1ul << startCore);
146  SingletonGuardian<HMBDC_LOGGER> logGuard(std::cout, cpuMask, "SCHED_OTHER");
147 #ifdef HMBDC_LOG_CONTEXT
148  //only use a cpu when async logger is used
149  cpuMask <<= 0x1u;
150 #endif
151  SendCtx ctx;
153  auto engine = NetContext::instance().createSendTransportEngine(config
154  , msgSize > 1000
155  ?std::max(sizeof(MessageAtt), sizeof(StartMemorySegTrain))
156  :msgSize);
157  SenderClient client(ctx, topic, msgSize);
158  ctx.start(*engine, cpuMask, true);
159  cpuMask <<= 0x1u;
160  ctx.start(client, cpuMask);
162  [&ctx]() {
163  ctx.stop();
164  }
165  );
166  ctx.join();
167 }
168 
169 using RecvCtx = Context<>;
171 : Client<ReceiverClient, Message, MessageAtt>
173  ReceiverClient(RecvCtx& ctx, size_t msgSize)
174  : ctx_(ctx)
175  , periodicMsgCount_(0)
176  , periodStartAt_(SysTime::now())
177  , seq_(0)
178  , totalMsgReceived_(0)
179  , missed_(0)
180  , outOfOrder_(0)
181  , duplicate_(0)
182  , msgSize_(msgSize) {
183  }
184 
185  char const* hmbdcName() const {
186  return "perf-rx";
187  }
188 
189  void handleMessageCb(Message const& r) {
190  periodicMsgCount_++;
191  if (seq_ != 0) {
192  if (r.seq > seq_) {
193  missed_ += r.seq - seq_ - 1;
194  } else if (r.seq == seq_) {
195  duplicate_++;
196  } else {
197  outOfOrder_++;
198  }
199  }
200  seq_ = r.seq;
201  totalMsgReceived_++;
202  }
203 
204  void handleMessageCb(MessageAtt const& r) {
205  periodicMsgCount_++;
206  if (seq_ != 0) {
207  if (r.seq > seq_) {
208  missed_ += r.seq - seq_ - 1;
209  } else if (r.seq == seq_) {
210  duplicate_++;
211  } else {
212  outOfOrder_++;
213  }
214  }
215  seq_ = r.seq;
216  totalMsgReceived_++;
217  }
218 
219  void stoppedCb(std::exception const& e) override {
220  std::cout << (e.what()) << std::endl;
221  };
222 
223  void report() override {
224  auto rate = size_t(periodicMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
225  HMBDC_LOG_n("msgSize=", msgSize_, " mps=", rate
226  , " missed=" , missed_ , '(' , missed_?missed_ * 100ul / (missed_ + totalMsgReceived_):0
227  , "%) outOfOrder=" , outOfOrder_
228  , " duplicate=" , duplicate_
229  , " totalMsgCount=" , totalMsgReceived_
230  );
231 
232  periodicMsgCount_ = 0u;
233  periodStartAt_ = SysTime::now();
234  }
235 
236  RecvCtx& ctx_;
237  size_t periodicMsgCount_;
238  SysTime periodStartAt_;
239  size_t seq_;
240  size_t totalMsgReceived_;
241  size_t missed_;
242  size_t outOfOrder_;
243  size_t duplicate_;
244  size_t msgSize_;
245 };
246 
247 template <typename RecvCtorArgsTuple>
248 void runInReceiverMode(Config const& config, Topic const& topic, size_t msgSize, uint16_t inBuferSizePower2, uint16_t startCore
249  , RecvCtorArgsTuple&& recvCtorArgs) {
250  using namespace std;
251  using namespace boost;
252  SingletonGuardian<HMBDC_LOGGER> logGuard(std::cout);
253  RecvCtx ctx(inBuferSizePower2, 1, msgSize > 1000?sizeof(MessageAtt):msgSize);
254 
256  auto& net = NetContext::instance();
257 
258  uint64_t cpuMask = (1ul << startCore);
259  auto engine = net.createRecvTransportEngineTuply(config, ctx.buffer(), std::forward<RecvCtorArgsTuple>(recvCtorArgs));
260  ReceiverClient client(ctx, msgSize);
261  net.listenTo(topic);
262  ctx.start(*engine, cpuMask, true);
263  cpuMask <<= 0x1u;
264  ctx.start(client, cpuMask, false);
266  [&ctx]() {
267  ctx.stop();
268  }
269  );
270  ctx.join();
271 
272  net.stopListenTo(topic);
273 }
274 
275 template <typename RecvCtorArgsTuple>
276 int startNetPerf(char const* dftUserConfig, char const* helpStrIn, int argc, char** argv
277  , RecvCtorArgsTuple&& recvCtorArgs) {
278  using namespace std;
279  string role;
280  string userCfg;
281  size_t msgSize;
282  uint16_t startCore;
283  uint16_t inBuferSizePower2;
284  Topic topic("t");
285  if (argc > 1) role = string(argv[1]);
286  if (role != string("--sender")
287  && role != string("--receiver")) {
288  cerr << "1st arg needs to be either --sender or --receiver" << endl;
289  exit(1);
290  }
291  argc--;
292  argv++;
293 
294  bool isSender = role == string("--sender");
295  namespace po = boost::program_options;
296  po::options_description desc("additional allowed options (after --sender or --receiver)");
297  string helpStr =
298  "This program can be used to test messaging performance among a group of hosts.\n"
299  "It can run as either a sender or a receiver. Normally one sender for a single interface in a test. \n"
300  "When Sender sends out traffic, receivers subscribe to the traffic and collect stats.\n"
301  "ctrl-c terminate.\n";
302  helpStr += string(helpStrIn);
303  desc.add_options()
304  ("help", helpStr.c_str())
305  ("cfg,c", po::value<std::string>(&userCfg), "use the user provided config file and ignore the ones in command options")
306  ("print", "print effective cfg values")
307  ("startCore,C", po::value<uint16_t>(&startCore)->default_value(0u), "the test uses 3 cores, specify starting core number")
308  ("msgSize", po::value<size_t>(&msgSize)->default_value(8u), "size of the message (8-size of this executable file). when > 1000 (only applies to transport types that support memory attachment), the message is sent using zero copy memory attachment. hmbdc posts no limit on attachment size")
309  ("inBuferSizePower2", po::value<uint16_t>(&inBuferSizePower2)->default_value(0u), "2^inBufferSizePower2 is the number of message that can be buffered in the program, default 0 means automatically caulcated based on 32MB as lower bound")
310  ("topic", po::value<Topic>(&topic)->default_value(topic), "topic used in the test");
311 
312  Config dft(dftUserConfig, isSender?"tx":"rx");
313  auto params = dft.content();
314  for (auto it = params.begin(); it != params.end();) {
315  string& name = it->first;
316  string& val = it->second;
317  it++;
318  string& comment = it->second;
319  it++;
320  desc.add_options()
321  (name.c_str(), po::value<string>(&val)->default_value(val), comment.c_str());
322  }
323 
324  po::positional_options_description p;
325  po::variables_map vm;
326  po::store(po::command_line_parser(argc, argv).
327  options(desc).positional(p).run(), vm);
328  po::notify(vm);
329 
330  if (argc == 1 || vm.count("help") || msgSize < 8u) {
331  cout << desc << "\n";
332  return 0;
333  }
334 
335  Config config;
336  if (userCfg.size()) {
337  std::istringstream cs(userCfg);
338  config = Config(cs, isSender?"tx":"rx");
339  } else {
340  for (auto& p : params) {
341  config.put(p.first, p.second);
342  }
343  }
344  if (msgSize > 1000 && !isSender) {
345  config.put("allowRecvMemoryAttachment", "1002");
346  }
347 
348  if (vm.count("print")) {
349  write_json(cout, config);
350  exit(0);
351  }
352 
353  if (isSender) {
354  runInSenderMode(config, topic, msgSize, startCore);
355  } else {
356  if (!inBuferSizePower2) {
357  inBuferSizePower2 = hmbdc::numeric::log2Upper(32ul * 1024ul * 1024ul / (8ul + msgSize));
358  cout << "auto set --inBuferSizePower2=" << inBuferSizePower2 << endl;
359  }
360  runInReceiverMode(config, topic, msgSize, inBuferSizePower2, startCore, forward<RecvCtorArgsTuple>(recvCtorArgs));
361  }
362  return 0;
363 }
364 } //netperf_detail
365 
366 using netperf_detail::startNetPerf;
367 }}}
368 
class to hold an hmbdc configuration
Definition: Config.hpp:44
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:630
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:639
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
Definition: Timers.hpp:65
each message type has 16 bit tag
Definition: Message.hpp:30
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask, Args &&...args)
start the context and specify its Pool and direct Clients
Definition: Context.hpp:585
static void onTermIntDo(std::function< void()> doThis)
specify what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
Definition: Message.hpp:224
Definition: Time.hpp:13
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:408
Definition: Rater.hpp:10
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:291
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
list< pair< string, string > > content(unordered_set< string > const &skipThese=unordered_set< string >()) const
get contents of all the effective configure in the form of list of string pairs
Definition: Config.hpp:264
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:158
static hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFunc_s
we use the file mapped memory for every message repeatedly, so we don't need to release anything ...
Definition: NetPerf.hpp:64
Definition: Base.hpp:12
Definition: Timers.hpp:96