hmbdc
simplify-high-performance-messaging-programming
NetPerf.hpp
1 #include "hmbdc/app/LoggerT.hpp"
2 #include "hmbdc/app/Client.hpp"
3 #include "hmbdc/app/Context.hpp"
4 #include "hmbdc/text/Misc.h"
5 #include "hmbdc/numeric/BitMath.hpp"
6 #include "hmbdc/os/Signals.hpp"
7 
8 #include <iostream>
9 #include <memory>
10 #include <unistd.h>
11 #include <boost/program_options.hpp>
12 
13 namespace hmbdc { namespace app { namespace utils {
14 
15 using namespace hmbdc;
16 using namespace hmbdc::app;
17 using namespace tested_module;
18 using namespace hmbdc::time;
19 using namespace hmbdc::numeric;
20 using namespace boost::property_tree;
21 
22 using SendCtx = hmbdc::app::Context<24>;
23 
24 struct Message
25 : hasTag<1001> {
26  Message()
27  : seq(seq_s++)
28  {}
29  size_t seq;
30  friend std::ostream& operator << (std::ostream& os, Message const& r) {
31  os << "Message " << ' ' << r.seq;
32  return os;
33  }
34  char padding[512 - sizeof(seq)];
35 
36  static uint64_t seq_s;
37 } __attribute__((__packed__));
38 
39 uint64_t Message::seq_s = 0ul;
40 
41 struct SenderClient
42 : Client<SenderClient>
45  SenderClient(SendCtx& ctx, Topic const topic, size_t msgSize)
46  : ReoccuringTimer(Duration::seconds(1u))
47  , ctx_(ctx)
48  , msgSize_(msgSize)
49  , periodicMsgCount_(0) {
50  sender_ = NetContext::instance().getSender(topic);
51  setCallback(
52  [this](TimerManager& tm, SysTime const& now) {
53  report();
54  }
55  );
56  schedule(SysTime::now(), *this);
57  }
58 
59  char const* hmbdcName() const {
60  return "perf-tx";
61  }
62 
63  /*virtual*/
64  void invokedCb(uint16_t) __restrict__ override {
65  //send as fast as possible since the engine has rate control
66  Message m;
67  sender_->send(m, msgSize_);
68  periodicMsgCount_++;
69  }
70 
71  void stoppedCb(std::exception const& e){
72  std::cout << (e.what()) << std::endl;
73  };
74 
75  void report() {
76  HMBDC_LOG_n("msgSize=", msgSize_, " mps=", periodicMsgCount_, " totalMsgCount=", Message::seq_s);
77  periodicMsgCount_ = 0u;
78  }
79 
80  SendCtx& ctx_;
81  size_t msgSize_;
82  Sender* sender_;
83  size_t periodicMsgCount_;
84 };
85 
86 void runInSenderMode(Config & config, Topic const& topic, uint16_t msgSize, uint16_t startCore) {
87  using namespace std;
88  uint64_t cpuMask = (1ul << startCore);
89  SingletonGuardian<HMBDC_LOGGER> logGuard(std::cout, cpuMask, "SCHED_OTHER");
90 #ifdef HMBDC_LOG_CONTEXT
91  //only use a cpu when async logger is used
92  cpuMask <<= 0x1u;
93 #endif
94  SendCtx ctx;
96 
97  auto engine = NetContext::instance().createSendTransportEngine(
98  config, msgSize);
99  SenderClient client(ctx, topic, msgSize);
100  ctx.start(*engine, cpuMask, true);
101  cpuMask <<= 0x1u;
102  ctx.start(client, cpuMask);
104  [&ctx]() {
105  ctx.stop();
106  }
107  );
108  ctx.join();
109 }
110 
111 using RecvCtx = Context<>;
113 : Client<ReceiverClient, Message>
114 , TimerManager
115 , ReoccuringTimer {
116  ReceiverClient(RecvCtx& ctx, size_t msgSize)
117  : ReoccuringTimer(Duration::seconds(1u))
118  , ctx_(ctx)
119  , periodicMsgCount_(0)
120  , seq_(0)
121  , totalMsgReceived_(0)
122  , missed_(0)
123  , outOfOrder_(0)
124  , duplicate_(0)
125  , msgSize_(msgSize)
126  {
127  setCallback(
128  [this](TimerManager& tm, SysTime const& now) {
129  report();
130  }
131  );
132  schedule(SysTime::now(), *this);
133  }
134 
135  char const* hmbdcName() const {
136  return "perf-rx";
137  }
138 
139  void handleMessageCb(Message const& r) {
140  periodicMsgCount_++;
141  if (seq_ != 0) {
142  if (r.seq > seq_) {
143  missed_ += r.seq - seq_ - 1;
144  } else if (r.seq == seq_) {
145  duplicate_++;
146  } else {
147  outOfOrder_++;
148  }
149  }
150  seq_ = r.seq;
151  totalMsgReceived_++;
152  }
153 
154  void stoppedCb(std::exception const& e){
155  std::cout << (e.what()) << std::endl;
156  };
157 
158  void report() {
159  HMBDC_LOG_n("msgSize=", msgSize_, " mps=", periodicMsgCount_
160  , " missed=" , missed_ , '(' , missed_?missed_ * 100ul / (missed_ + totalMsgReceived_):0
161  , "%) outOfOrder=" , outOfOrder_
162  , " duplicate=" , duplicate_
163  , " totalMsgCount=" , totalMsgReceived_
164  );
165 
166  periodicMsgCount_ = 0u;
167  }
168 
169  RecvCtx& ctx_;
170  size_t periodicMsgCount_;
171  size_t seq_;
172  size_t totalMsgReceived_;
173  size_t missed_;
174  size_t outOfOrder_;
175  size_t duplicate_;
176  size_t msgSize_;
177 };
178 
179 void runInReceiverMode(Config const& config, Topic const& topic, uint16_t msgSize, uint16_t inBuferSizePower2, uint16_t startCore) {
180  using namespace std;
181  using namespace boost;
182  SingletonGuardian<HMBDC_LOGGER> logGuard(std::cout);
183  RecvCtx ctx(inBuferSizePower2, 1, msgSize);
184 
186  auto& net = NetContext::instance();
187 
188  uint64_t cpuMask = (1ul << startCore);
189  auto engine = net.createRecvTransportEngine(config, ctx.buffer());
190  ReceiverClient client(ctx, msgSize);
191  net.listenTo(topic);
192  ctx.start(*engine, cpuMask, true);
193  cpuMask <<= 0x1u;
194  ctx.start(client, cpuMask, false);
196  [&ctx]() {
197  ctx.stop();
198  }
199  );
200  ctx.join();
201 
202  net.stopListenTo(topic);
203 }
204 
205 int startNetPerf(char const* dftUserConfig, char const* helpStrIn, int argc, char** argv) {
206  using namespace std;
207  string role;
208  string userCfg;
209  uint16_t msgSize;
210  uint16_t startCore;
211  uint16_t inBuferSizePower2;
212  Topic topic("t");
213  if (argc > 1) role = string(argv[1]);
214  if (role != string("--sender")
215  && role != string("--receiver")) {
216  cerr << "1st arg needs to be either --sender or --receiver" << endl;
217  exit(1);
218  }
219  argc--;
220  argv++;
221 
222  bool isSender = role == string("--sender");
223  namespace po = boost::program_options;
224  po::options_description desc("additional allowed options (after --sender or --receiver)");
225  string helpStr =
226  "This program can be used to test messaging performance among a group of hosts.\n"
227  "It can run as either a sender or a receiver. Normally one sender for a single interface in a test. \n"
228  "When Sender sends out traffic, receivers subscribe to the traffic and collect stats.\n"
229  "ctrl-c terminate.\n";
230  helpStr += string(helpStrIn);
231  desc.add_options()
232  ("help", helpStr.c_str())
233  ("cfg,c", po::value<std::string>(&userCfg), "use the user provided config file and ignore the ones in command options")
234  ("print", "print effective cfg values")
235  ("startCore,C", po::value<uint16_t>(&startCore)->default_value(0u), "the test uses 3 cores, specify starting core number")
236  ("msgSize", po::value<uint16_t>(&msgSize)->default_value(8u), "size of the message (8-1024)")
237  ("inBuferSizePower2", po::value<uint16_t>(&inBuferSizePower2)->default_value(0u), "2^inBufferSizePower2 is the number of message that can be buffered in the program, default 0 means automatically caulcated based on 32MB as lower bound")
238  ("topic", po::value<Topic>(&topic)->default_value(topic), "topic used in the test");
239 
240 
241  Config dft(dftUserConfig, isSender?"tx":"rx");
242  auto params = dft.content();
243  for (auto it = params.begin(); it != params.end();) {
244  string& name = it->first;
245  string& val = it->second;
246  it++;
247  string& comment = it->second;
248  it++;
249  desc.add_options()
250  (name.c_str(), po::value<string>(&val)->default_value(val), comment.c_str());
251  }
252 
253  po::positional_options_description p;
254  po::variables_map vm;
255  po::store(po::command_line_parser(argc, argv).
256  options(desc).positional(p).run(), vm);
257  po::notify(vm);
258 
259  if (argc == 1 || vm.count("help") || msgSize < 8u) {
260  cout << desc << "\n";
261  return 0;
262  }
263 
264  Config config;
265  if (userCfg.size()) {
266  std::istringstream cs(userCfg);
267  config = Config(cs, isSender?"tx":"rx");
268  } else {
269  for (auto& p : params) {
270  config.put(p.first, p.second);
271  }
272  }
273 
274  if (vm.count("print")) {
275  write_json(cout, config);
276  exit(0);
277  }
278 
279  if (isSender) {
280  runInSenderMode(config, topic, msgSize, startCore);
281  } else {
282  if (!inBuferSizePower2) {
283  inBuferSizePower2 = hmbdc::numeric::log2Upper(32ul * 1024ul * 1024ul / (8ul + msgSize));
284  cout << "auto set --inBuferSizePower2=" << inBuferSizePower2 << endl;
285  }
286  runInReceiverMode(config, topic, msgSize, inBuferSizePower2, startCore);
287  }
288 }
289 
290 }}}
291 
class to hold an hmbdc configuration
Definition: Config.hpp:35
Definition: Client.hpp:11
Definition: NetPerf.hpp:112
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:588
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:597
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
Definition: Timers.hpp:69
Definition: Message.hpp:21
Definition: NetPerf.hpp:41
Definition: GuardedSingleton.hpp:12
Definition: NetPerf.hpp:24
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:274
list< pair< string, string > > content(unordered_set< string > const &skipThese=unordered_set< string >()) const
get contents of all the effective configure in the form of list of string pairs
Definition: Config.hpp:198
void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask, Args &&...args)
start the context and specify its Pool and direct Clients
Definition: Context.hpp:553
Definition: Time.hpp:15
Definition: Context.hpp:384
Definition: Rater.hpp:10
static void onTermIntDo(function< void()> doThis)
specfy what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:29
Definition: Client.hpp:39
Definition: Client.hpp:11
Definition: Timers.hpp:100