hmbdc
simplify-high-performance-messaging-programming
Pingpong.hpp
1 #include "hmbdc/app/utils/changeSched.hpp"
2 #include "hmbdc/app/Client.hpp"
3 #include "hmbdc/app/Context.hpp"
4 #include "hmbdc/app/Config.hpp"
5 #include "hmbdc/pattern/GuardedSingleton.hpp"
6 #include "hmbdc/numeric/StatHistogram.hpp"
7 #include "hmbdc/time/Rater.hpp"
8 #include "hmbdc/os/Signals.hpp"
9 
10 #include <iostream>
11 #include <memory>
12 #include <functional>
13 #include <unistd.h>
14 #include <sys/mman.h>
15 
16 namespace hmbdc { namespace app { namespace utils {
17 
18 using namespace hmbdc::app;
19 using namespace hmbdc::pattern;
20 using namespace hmbdc::time;
21 using namespace hmbdc::numeric;
22 using namespace std;
23 
25 
26 struct Ball
27 : hasTag<1001> {
28  Ball(size_t s)
29  : size(s)
30  , ts(SysTime::now())
31  {}
32  size_t size;
33  SysTime ts;
34  char padding[1024 - 16];
35 };
36 
37 template <typename Sender>
38 struct Pinger
39 : Client<Pinger<Sender>, Ball> {
40  Pinger(Sender* sender, uint16_t msgPerSec, uint16_t msgSize, size_t skipFirst)
41  : rater_(Duration::seconds(1), msgPerSec, 1u)
42  , sender_(sender)
43  , hist_(Duration::microseconds(0), Duration::microseconds(1000), 50000)
44  , periodicPingCount_(0)
45  , pingCount_(0)
46  , skipped_(skipFirst)
47  , msgPerSec_(msgPerSec)
48  , msgSize_(msgSize) {
49  }
50 
51  char const* hmbdcName() const {
52  return "pinger";
53  }
54 
55  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
56  cout << "Started with the first " << skipped_ << " values ignored(x), press ctrl-c to get results" << endl;
57  };
58 
59  void handleMessageCb(Ball const& m) {
60  auto now = SysTime::now();
61  auto lat = now - m.ts;
62  if (!skipped_) {
63  hist_.add(lat);
64  } else {
65  --skipped_;
66  }
67  }
68 
69  void stoppedCb(exception const& e){
70  cerr << e.what() << endl;
71  };
72 
73  void invokedCb(uint16_t) override {
74  if (unlikely(rater_.check())) {
75  if (++periodicPingCount_ == msgPerSec_) {
76  cout << (skipped_?'x':'.') << flush;
77  periodicPingCount_ = 0;
78  }
79  Ball p(msgSize_);
80  sender_->send(p, msgSize_);
81  rater_.commit();
82  if (!skipped_) pingCount_++;
83  }
84  }
85 
86  void finalReport() {
87  cout << "\nround trip latency:(" << hist_.sampleSize() << '/' << pingCount_ << "):";
88  hist_.display(cout, {0, 25, 50, 75, 90, 95, 99, 99.9, 99.99, 100});
89  cout << endl;
90  }
91 
92  Rater rater_;
93  Sender* sender_;
94  Hist hist_;
95  size_t periodicPingCount_;
96  size_t pingCount_;
97  size_t skipped_;
98  uint16_t msgPerSec_;
99  uint16_t msgSize_;
100 };
101 
102 
103 template <typename Sender>
104 struct Ponger
105 : Client<Ponger<Sender>, Ball> {
106  Ponger(Sender* sender)
107  : sender_(sender) {
108  }
109 
110  char const* hmbdcName() const {
111  return "ponger";
112  }
113 
114  void messageDispatchingStartedCb(uint16_t) override {
115  cout << "Started, press ctrl-c to stop" << endl;
116  };
117 
118  void handleMessageCb(Ball const& m) {
119  sender_->send(m, m.size);
120  }
121 
122  void stoppedCb(std::exception const& e){
123  std::cerr << e.what() << std::endl;
124  };
125 
126  Sender* sender_;
127 };
128 
129 template <typename NetContext>
130 int
131 pingpong(Config const& config) {
132  uint16_t startCore = config.getExt<uint16_t>("startCore");
133  size_t skipFirst = config.getExt<size_t>("skipFirst");
134  bool ping = config.getExt<bool>("ping");
135  auto runTime = config.getExt<uint32_t>("runTime");
136 
138  auto& net = NetContext::instance();
139  auto msgSize = config.getExt<uint16_t>("msgSize");
140  msgSize = min<uint16_t>(msgSize, 512);
141  msgSize = max<uint16_t>(msgSize, 16);
142  using MyContext = Context<>;
143  MyContext ctx(10
144  , 10
145  , msgSize
146  );
147  Config pingCfg(config, "ping");
148  Config pongCfg(config, "pong");
149  cout << "initailizing..." << endl;
150  if (ping) {
151  auto sengine = net.createSendTransportEngine(pingCfg, msgSize);
152  auto rengine = net.createRecvTransportEngine(pongCfg, ctx.buffer());
153  Pinger<typename NetContext::Sender> pinger(net.getSender("ping")
154  , min<uint16_t>(config.getExt<uint16_t>("msgPerSec"), 1000u)
155  , msgSize
156  , skipFirst
157  );
158  net.listenTo("pong");
159 
160  uint64_t cpuMask = (1ul << startCore);
161 
162  ctx.start(1, cpuMask, true);
163  ctx.addToPool(*sengine);
164  ctx.addToPool(*rengine);
165  cpuMask <<= 1u;
166  ctx.start(pinger, cpuMask);
168  [&ctx]() {
169  ctx.stop();
170  });
171  if (runTime) {
172  sleep(runTime);
173  ctx.stop();
174  }
175 
176  ctx.join();
177 
178  pinger.finalReport();
179  } else {
180  //ponger has to assume the max size message for sending
181  auto sengine = net.createSendTransportEngine(pongCfg, sizeof(Ball));
182  auto rengine = net.createRecvTransportEngine(pingCfg, ctx.buffer());
183 
184  Ponger<typename NetContext::Sender> ponger(net.getSender("pong"));
185  net.listenTo("ping");
186 
187  uint64_t cpuMask = (1ul << startCore);
188 
189  ctx.start(1, cpuMask, true);
190  ctx.addToPool(*sengine);
191  ctx.addToPool(*rengine);
192 
193  cpuMask <<= 1u;
194  ctx.start(ponger, cpuMask);
196  [&ctx]() {
197  ctx.stop();
198  });
199  if (runTime) {
200  sleep(runTime);
201  ctx.stop();
202  }
203 
204  ctx.join();
205  }
206  return 0;
207 }
208 
209 
210 }}}
Definition: Pingpong.hpp:104
Definition: Pingpong.hpp:38
class to hold an hmbdc configuration
Definition: Config.hpp:35
Definition: Client.hpp:11
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:588
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:597
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
Definition: Message.hpp:21
Definition: GuardedSingleton.hpp:12
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:274
Definition: GuardedSingleton.hpp:9
void start(uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask, Args &&...args)
start the context and specify its Pool and direct Clients
Definition: Context.hpp:553
Definition: Pingpong.hpp:26
Definition: Time.hpp:15
void addToPool(Client &client, uint64_t poolThreadAffinityIn=0xfffffffffffffffful)
add a client to Context&#39;s pool - the Client is running in pool mode
Definition: Context.hpp:477
Definition: Context.hpp:384
Definition: Rater.hpp:10
static void onTermIntDo(function< void()> doThis)
specfy what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:29
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:143
Definition: Client.hpp:39
Definition: Rater.hpp:13
Definition: Client.hpp:11