hmbdc
simplify-high-performance-messaging-programming
NetPerf.hpp
1 #include "hmbdc/app/Message.hpp"
2 #include "hmbdc/app/LoggerT.hpp"
3 #include "hmbdc/app/Client.hpp"
4 #include "hmbdc/app/Context.hpp"
5 #include "hmbdc/text/Misc.h"
6 #include "hmbdc/numeric/BitMath.hpp"
7 #include "hmbdc/os/Signals.hpp"
8 
9 #include <iostream>
10 #include <memory>
11 #include <unistd.h>
12 #include <boost/program_options.hpp>
13 
14 extern const char *__progname_full;
15 
16 
17 /**
18  * @namespace hmbdc::app::utils
19  * utilities based on app and serve its higher layers
20  */
21 
22 namespace hmbdc { namespace app { namespace utils {
23 
24 namespace netperf_detail {
25 using namespace tested_module;
26 using namespace hmbdc::time;
27 using namespace hmbdc::numeric;
28 using namespace boost::property_tree;
29 
30 using SendCtx = hmbdc::app::Context<24>;
31 
32 struct Message
33 : hasTag<1001> {
34  Message()
35  : seq(seq_s++)
36  {}
37  size_t seq;
38  friend std::ostream& operator << (std::ostream& os, Message const& r) {
39  os << "Message " << ' ' << r.seq;
40  return os;
41  }
42  char padding[1000 - sizeof(seq)];
43 
44  static uint64_t seq_s;
45 } __attribute__((__packed__));
46 
47 uint64_t Message::seq_s = 0ul;
48 
// Test message (tag 1002) used for the attachment path: besides a running
// sequence number, each instance copies the attachment pointer, length and
// cleanup function from the static members below at construction time.
// NOTE(review): listing line 50 (between the struct name and ", hasTag<1002>")
// is missing from this view - presumably the first base class, which would be
// where attachment / len / afterConsumedCleanupFunc are declared; confirm
// against the real header.
49 struct MessageAtt
51 , hasTag<1002> {
// stamps seq from the shared counter (post-increment), then copies the three
// static attachment settings into this instance
52  MessageAtt()
53  : seq(seq_s++) {
54  attachment = attachment_s;
55  len = len_s;
56  afterConsumedCleanupFunc = afterConsumedCleanupFunc_s;
57  }
58 
// sequence number assigned at construction
59  size_t seq;
// streams the literal "MessageAtt ", one more space, then the sequence number
60  friend std::ostream& operator << (std::ostream& os, MessageAtt const& r) {
61  os << "MessageAtt " << ' ' << r.seq;
62  return os;
63  }
// shared counter: its current value stamps the next MessageAtt constructed
64  static uint64_t seq_s;
// attachment pointer copied into every new MessageAtt
65  static void* attachment_s;
// attachment length copied into every new MessageAtt
66  static size_t len_s;
// cleanup function copied into every new MessageAtt
67  static hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFunc_s;
68 };
// defaults: sequence starts at 0, no attachment, zero length
69 uint64_t MessageAtt::seq_s = 0ul;
70 void* MessageAtt::attachment_s = nullptr;
71 size_t MessageAtt::len_s = 0;
72 
73 /// we use the file mapped memory for every message repeatedly, so don't need to release anything
74 hasMemoryAttachment::AfterConsumedCleanupFunc MessageAtt::afterConsumedCleanupFunc_s = nullptr;
75 
// NOTE(review): the lines that open this type and name its constructor
// (listing lines 76-77) are missing from this view; what follows is the
// constructor's initializer list and body, then the pure virtual hook.
// The timer base is built with a one second duration, its callback is set to
// invoke report(), and the object is scheduled starting at the current time.
78  : ReoccuringTimer(Duration::seconds(1u)) {
79  setCallback(
// the callback ignores both of its arguments and only forwards to report()
80  [this](TimerManager& tm, SysTime const& now) {
81  report();
82  }
83  );
84  schedule(SysTime::now(), *this);
85  }
86 
// implemented by derived types; called from the timer callback set above
87  virtual void report() = 0;
88 };
89 
90 struct SenderClient
91 : Client<SenderClient>
93  SenderClient(SendCtx& ctx, Topic const topic, size_t msgSize)
94  : ctx_(ctx)
95  , msgSize_(msgSize)
96  , periodicMsgCount_(0)
97  , periodStartAt_(SysTime::now()) {
98  sender_ = NetContext::instance().getSender(topic);
99  if (msgSize_ > 1000) {
100  fileMap_.map(__progname_full);
101  MessageAtt::attachment_s = fileMap_.attachment;
102  MessageAtt::len_s = msgSize;
103  }
104  }
105 
106  ~SenderClient() {
107  if (msgSize_ > 1000) fileMap_.unmap();
108  }
// returns the fixed name "perf-tx" for this client
// (presumably used by hmbdc to label the client's thread - confirm in Client.hpp)
109  char const* hmbdcName() const {
110  return "perf-tx";
111  }
112 
113  /*virtual*/
114  void invokedCb(uint16_t) HMBDC_RESTRICT override {
115  //send as fast as possible since the engine has rate control
116  if (likely(msgSize_ <= 1000u)) {
117  Message m;
118  sender_->send(m, msgSize_);
119  periodicMsgCount_++;
120  } else {
121  MessageAtt m;
122  sender_->send(m);
123  periodicMsgCount_++;
124  }
125  }
126 
127  void stoppedCb(std::exception const& e) override {
128  std::cout << (e.what()) << std::endl;
129  };
130 
131  void report() override {
132  auto rate = size_t(periodicMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
133  HMBDC_LOG_n("msgSize=", msgSize_, " mps=", rate, " totalMsgCount=", Message::seq_s);
134  periodicMsgCount_ = 0u;
135  periodStartAt_ = SysTime::now();
136  }
137 
// the sending Context passed to the constructor (stored; not otherwise used in the visible code)
138  SendCtx& ctx_;
// configured message size in bytes; values above 1000 select the MessageAtt path
139  size_t msgSize_;
// sender obtained from NetContext for the topic given to the constructor
140  Sender* sender_;
// messages sent since the last report()
141  size_t periodicMsgCount_;
// start time of the current reporting period
142  SysTime periodStartAt_;
// mapping of the executable file, only mapped when msgSize_ > 1000
143  hasMemoryAttachment fileMap_;
144 };
145 
// Runs the sender side of the test: sets up the logger, creates the send
// transport engine and a SenderClient, starts both on the Context and then
// blocks in ctx.join().
//   config    - configuration handed to createSendTransportEngine
//   topic     - topic the SenderClient sends on
//   msgSize   - message size in bytes; above 1000 the attachment path is used
//   startCore - index of the first core, used to build the cpu masks
// NOTE(review): listing lines 155 and 164 are missing from this view; line 164
// is the call that receives the lambda below - presumably the SIGTERM/SIGINT
// hook from hmbdc/os/Signals.hpp - confirm against the real header.
146 void runInSenderMode(Config & config, Topic const& topic, size_t msgSize, uint16_t startCore) {
147  using namespace std;
// one bit per core, starting at startCore
148  uint64_t cpuMask = (1ul << startCore);
149  SingletonGuardian<HMBDC_LOGGER> logGuard(std::cout, cpuMask, "SCHED_OTHER");
150 #ifdef HMBDC_LOG_CONTEXT
151  //only use a cpu when async logger is used
152  cpuMask <<= 0x1u;
153 #endif
154  SendCtx ctx;
// second argument: for the attachment path the larger of sizeof(MessageAtt)
// and sizeof(StartMemorySegTrain), otherwise msgSize itself
156  auto engine = NetContext::instance().createSendTransportEngine(config
157  , msgSize > 1000
158  ?std::max(sizeof(MessageAtt), sizeof(StartMemorySegTrain))
159  :msgSize);
160  SenderClient client(ctx, topic, msgSize);
161  ctx.start(*engine, cpuMask, true);
// the client is started with the mask shifted to the next core after the engine's
162  cpuMask <<= 0x1u;
163  ctx.start(client, cpuMask);
// lambda that stops the Context (its receiving call is on the missing line 164)
165  [&ctx]() {
166  ctx.stop();
167  }
168  );
169  ctx.join();
170 }
171 
172 using RecvCtx = Context<>;
174 : Client<ReceiverClient, Message, MessageAtt>
// Stores the Context reference and the configured message size, starts the
// first reporting period at the current time and zeroes every counter.
//   ctx     - the receiving Context; only a reference to it is stored
//   msgSize - message size in bytes, only echoed in report()
176  ReceiverClient(RecvCtx& ctx, size_t msgSize)
177  : ctx_(ctx)
178  , periodicMsgCount_(0)
179  , periodStartAt_(SysTime::now())
// seq_ == 0 doubles as "nothing to compare against" in the handlers
180  , seq_(0)
181  , totalMsgReceived_(0)
182  , missed_(0)
183  , outOfOrder_(0)
184  , duplicate_(0)
185  , msgSize_(msgSize) {
186  }
187 
// returns the fixed name "perf-rx" for this client
// (presumably used by hmbdc to label the client's thread - confirm in Client.hpp)
188  char const* hmbdcName() const {
189  return "perf-rx";
190  }
191 
192  void handleMessageCb(Message const& r) {
193  periodicMsgCount_++;
194  if (seq_ != 0) {
195  if (r.seq > seq_) {
196  missed_ += r.seq - seq_ - 1;
197  } else if (r.seq == seq_) {
198  duplicate_++;
199  } else {
200  outOfOrder_++;
201  }
202  }
203  seq_ = r.seq;
204  totalMsgReceived_++;
205  }
206 
207  void handleMessageCb(MessageAtt const& r) {
208  periodicMsgCount_++;
209  if (seq_ != 0) {
210  if (r.seq > seq_) {
211  missed_ += r.seq - seq_ - 1;
212  } else if (r.seq == seq_) {
213  duplicate_++;
214  } else {
215  outOfOrder_++;
216  }
217  }
218  seq_ = r.seq;
219  totalMsgReceived_++;
220  }
221 
222  void stoppedCb(std::exception const& e) override {
223  std::cout << (e.what()) << std::endl;
224  };
225 
226  void report() override {
227  auto rate = size_t(periodicMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
228  HMBDC_LOG_n("msgSize=", msgSize_, " mps=", rate
229  , " missed=" , missed_ , '(' , missed_?missed_ * 100ul / (missed_ + totalMsgReceived_):0
230  , "%) outOfOrder=" , outOfOrder_
231  , " duplicate=" , duplicate_
232  , " totalMsgCount=" , totalMsgReceived_
233  );
234 
235  periodicMsgCount_ = 0u;
236  periodStartAt_ = SysTime::now();
237  }
238 
// the receiving Context passed to the constructor (stored; not otherwise used in the visible code)
239  RecvCtx& ctx_;
// messages received since the last report()
240  size_t periodicMsgCount_;
// start time of the current reporting period
241  SysTime periodStartAt_;
// sequence number of the most recently received message (0 also means "no comparison yet")
242  size_t seq_;
// messages received since construction
243  size_t totalMsgReceived_;
// sum of the sequence gaps seen between consecutive arrivals
244  size_t missed_;
// arrivals whose sequence number was lower than the previous arrival's
245  size_t outOfOrder_;
// arrivals whose sequence number equalled the previous arrival's
246  size_t duplicate_;
// configured message size in bytes, only echoed in report()
247  size_t msgSize_;
248 };
249 
// Runs the receiver side of the test: creates the receiving Context, the
// receive transport engine and a ReceiverClient, subscribes to the topic,
// starts engine and client, blocks in ctx.join() and finally unsubscribes.
//   config            - configuration handed to the receive engine factory
//   topic             - topic to listen to
//   msgSize           - message size in bytes; above 1000 the attachment path is used
//   inBuferSizePower2 - first constructor argument of the receiving Context
//   startCore         - index of the first core, used to build the cpu masks
//   recvCtorArgs      - forwarded unchanged to createRecvTransportEngineTuply
// NOTE(review): listing lines 258 and 268 are missing from this view; line 268
// is the call that receives the lambda below - presumably the SIGTERM/SIGINT
// hook from hmbdc/os/Signals.hpp - confirm against the real header.
250 template <typename RecvCtorArgsTuple>
251 void runInReceiverMode(Config const& config, Topic const& topic, size_t msgSize, uint16_t inBuferSizePower2, uint16_t startCore
252  , RecvCtorArgsTuple&& recvCtorArgs) {
253  using namespace std;
254  using namespace boost;
255  SingletonGuardian<HMBDC_LOGGER> logGuard(std::cout);
// third argument: sizeof(MessageAtt) on the attachment path, otherwise msgSize
256  RecvCtx ctx(inBuferSizePower2, 1, msgSize > 1000?sizeof(MessageAtt):msgSize);
257 
259  auto& net = NetContext::instance();
260 
// one bit per core, starting at startCore
261  uint64_t cpuMask = (1ul << startCore);
262  auto engine = net.createRecvTransportEngineTuply(config, ctx.buffer(), std::forward<RecvCtorArgsTuple>(recvCtorArgs));
263  ReceiverClient client(ctx, msgSize);
264  net.listenTo(topic);
265  ctx.start(*engine, cpuMask, true);
// the client is started with the mask shifted to the next core after the engine's
266  cpuMask <<= 0x1u;
267  ctx.start(client, cpuMask, false);
// lambda that stops the Context (its receiving call is on the missing line 268)
269  [&ctx]() {
270  ctx.stop();
271  }
272  );
273  ctx.join();
274 
275  net.stopListenTo(topic);
276 }
277 
278 template <typename RecvCtorArgsTuple>
279 int startNetPerf(char const* dftUserConfig, char const* helpStrIn, int argc, char** argv
280  , RecvCtorArgsTuple&& recvCtorArgs) {
281  using namespace std;
282  string role;
283  string userCfg;
284  size_t msgSize;
285  uint16_t startCore;
286  uint16_t inBuferSizePower2;
287  Topic topic("t");
288  if (argc > 1) role = string(argv[1]);
289  if (role != string("--sender")
290  && role != string("--receiver")) {
291  cerr << "1st arg needs to be either --sender or --receiver" << endl;
292  exit(1);
293  }
294  argc--;
295  argv++;
296 
297  bool isSender = role == string("--sender");
298  namespace po = boost::program_options;
299  po::options_description desc("additional allowed options (after --sender or --receiver)");
300  string helpStr =
301  "This program can be used to test messaging performance among a group of hosts.\n"
302  "It can run as either a sender or a receiver. Normally one sender for a single interface in a test. \n"
303  "When Sender sends out traffic, receivers subscribe to the traffic and collect stats.\n"
304  "ctrl-c terminate.\n";
305  helpStr += string(helpStrIn);
306  desc.add_options()
307  ("help", helpStr.c_str())
308  ("cfg,c", po::value<std::string>(&userCfg), "use the user provided config file and ignore the ones in command options")
309  ("print", "print effective cfg values")
310  ("startCore,C", po::value<uint16_t>(&startCore)->default_value(0u), "the test uses 3 cores, specify starting core number")
311  ("msgSize", po::value<size_t>(&msgSize)->default_value(8u), "size of the message (8-size of this executable file). when > 1000 (only applies to transport types that support memory attachment), the message is sent using zero copy memory attachment. hmbdc posts no limit on attachment size")
312  ("inBuferSizePower2", po::value<uint16_t>(&inBuferSizePower2)->default_value(0u), "2^inBufferSizePower2 is the number of message that can be buffered in the program, default 0 means automatically caulcated based on 32MB as lower bound")
313  ("topic", po::value<Topic>(&topic)->default_value(topic), "topic used in the test");
314 
315  Config dft(dftUserConfig, isSender?"tx":"rx");
316  auto params = dft.content();
317  for (auto it = params.begin(); it != params.end();) {
318  string& name = it->first;
319  string& val = it->second;
320  it++;
321  string& comment = it->second;
322  it++;
323  desc.add_options()
324  (name.c_str(), po::value<string>(&val)->default_value(val), comment.c_str());
325  }
326 
327  po::positional_options_description p;
328  po::variables_map vm;
329  po::store(po::command_line_parser(argc, argv).
330  options(desc).positional(p).run(), vm);
331  po::notify(vm);
332 
333  if (argc == 1 || vm.count("help") || msgSize < 8u) {
334  cout << desc << "\n";
335  return 0;
336  }
337 
338  Config config;
339  if (userCfg.size()) {
340  std::istringstream cs(userCfg);
341  config = Config(cs, isSender?"tx":"rx");
342  } else {
343  for (auto& p : params) {
344  config.put(p.first, p.second);
345  }
346  }
347  if (msgSize > 1000 && !isSender) {
348  config.put("allowRecvMemoryAttachment", "1002");
349  }
350 
351  if (vm.count("print")) {
352  write_json(cout, config);
353  exit(0);
354  }
355 
356  if (isSender) {
357  runInSenderMode(config, topic, msgSize, startCore);
358  } else {
359  if (!inBuferSizePower2) {
360  inBuferSizePower2 = hmbdc::numeric::log2Upper(32ul * 1024ul * 1024ul / (8ul + msgSize));
361  cout << "auto set --inBuferSizePower2=" << inBuferSizePower2 << endl;
362  }
363  runInReceiverMode(config, topic, msgSize, inBuferSizePower2, startCore, forward<RecvCtorArgsTuple>(recvCtorArgs));
364  }
365  return 0;
366 }
367 } //netperf_detail
368 
369 using netperf_detail::startNetPerf;
370 }}}
371 
class to hold an hmbdc configuration
Definition: Config.hpp:43
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:622
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:631
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
Definition: Timers.hpp:65
each message type has 16 bit tag
Definition: Message.hpp:30
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask, Args &&...args)
start the context and specify its Pool and direct Clients
Definition: Context.hpp:577
static void onTermIntDo(std::function< void()> doThis)
specify what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
Definition: Message.hpp:224
Definition: Time.hpp:13
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:402
Definition: Rater.hpp:10
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:289
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
list< pair< string, string > > content(unordered_set< string > const &skipThese=unordered_set< string >()) const
get contents of all the effective configure in the form of list of string pairs
Definition: Config.hpp:233
if a specific hmbdc network transport (for example tcpcast) supports message with memory attachment...
Definition: Message.hpp:158
static hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFunc_s
we use the file mapped memory for every message repeatedly, so don't need to release anything ...
Definition: NetPerf.hpp:67
Definition: Base.hpp:12
Definition: Timers.hpp:96