hmbdc
simplify-high-performance-messaging-programming
Pingpong.hpp
1 #include "hmbdc/app/Base.hpp"
2 #include "hmbdc/numeric/StatHistogram.hpp"
3 #include "hmbdc/time/Rater.hpp"
4 #include "hmbdc/os/Signals.hpp"
5 
6 #include <iostream>
7 #include <memory>
8 #include <functional>
9 #include <unistd.h>
10 #include <sys/mman.h>
11 
12 namespace hmbdc { namespace app { namespace utils {
13 
14 namespace pingpong_detail {
15 using namespace hmbdc::time;
16 using namespace hmbdc::numeric;
17 using namespace std;
18 
20 
21 struct Ball
22 : hasTag<1001> {
23  Ball(size_t s)
24  : size(s)
25  , ts(SysTime::now())
26  {}
27  size_t size;
28  SysTime ts;
29  char padding[1013 - 16];
30 };
31 
32 template <typename Sender>
33 struct Pinger
34 : Client<Pinger<Sender>, Ball> {
35  Pinger(Sender* sender, uint16_t msgPerSec, uint16_t msgSize, size_t skipFirst)
36  : rater_(Duration::seconds(1), msgPerSec, 1u)
37  , sender_(sender)
38  , hist_(Duration::microseconds(0), Duration::microseconds(1000), 50000)
39  , periodicPingCount_(0)
40  , pingCount_(0)
41  , skipped_(skipFirst)
42  , msgPerSec_(msgPerSec)
43  , msgSize_(msgSize) {
44  }
45 
46  char const* hmbdcName() const {
47  return "pinger";
48  }
49 
50  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
51  cout << "Started with the first " << skipped_ << " values ignored(x), press ctrl-c to get results" << endl;
52  };
53 
54  void handleMessageCb(Ball const& m) {
55  auto now = SysTime::now();
56  auto lat = now - m.ts;
57  if (!skipped_) {
58  hist_.add(lat);
59  } else {
60  --skipped_;
61  }
62  }
63 
64  void stoppedCb(exception const& e) override {
65  cerr << e.what() << endl;
66  };
67 
68  void invokedCb(uint16_t) override {
69  if (hmbdc_unlikely(rater_.check())) {
70  if (++periodicPingCount_ == msgPerSec_) {
71  cout << (skipped_?'x':'.') << flush;
72  periodicPingCount_ = 0;
73  }
74  Ball p(msgSize_);
75  sender_->send(p, msgSize_);
76  rater_.commit();
77  if (!skipped_) pingCount_++;
78  }
79  }
80 
81  void finalReport() {
82  cout << "\nround trip latency:(" << hist_.sampleSize() << '/' << pingCount_ << "):";
83  hist_.display(cout, {0, 25, 50, 75, 90, 95, 99, 99.9, 99.99, 100});
84  cout << endl;
85  }
86 
87  Rater rater_;
88  Sender* sender_;
89  Hist hist_;
90  size_t periodicPingCount_;
91  size_t pingCount_;
92  size_t skipped_;
93  uint16_t msgPerSec_;
94  uint16_t msgSize_;
95 };
96 
97 
98 template <typename Sender>
99 struct Ponger
100 : Client<Ponger<Sender>, Ball> {
101  Ponger(Sender* sender)
102  : sender_(sender) {
103  }
104 
105  char const* hmbdcName() const {
106  return "ponger";
107  }
108 
109  void messageDispatchingStartedCb(uint16_t) override {
110  cout << "Started, press ctrl-c to stop" << endl;
111  };
112 
113  void handleMessageCb(Ball const& m) {
114  sender_->send(m, m.size);
115  }
116 
117  void stoppedCb(exception const& e) override {
118  cerr << e.what() << endl;
119  };
120 
121  Sender* sender_;
122 };
123 
124 template <typename NetContext>
125 int
126 pingpong(Config const& config) {
127  uint16_t startCore = config.getExt<uint16_t>("startCore");
128  size_t skipFirst = config.getExt<size_t>("skipFirst");
129  bool ping = config.getExt<bool>("ping");
130  auto runTime = config.getExt<uint32_t>("runTime");
131 
133  auto& net = NetContext::instance();
134  auto msgSize = config.getExt<uint16_t>("msgSize");
135  msgSize = min<uint16_t>(msgSize, 512);
136  msgSize = max<uint16_t>(msgSize, 16);
137  using MyContext = Context<>;
138  MyContext ctx(10
139  , 10
140  , msgSize
141  );
142  Config pingCfg(config, "ping");
143  Config pongCfg(config, "pong");
144  cout << "initailizing..." << endl;
145  if (ping) {
146  auto sengine = net.createSendTransportEngine(pingCfg, msgSize);
147  auto rengine = net.createRecvTransportEngine(pongCfg, ctx.buffer());
148  Pinger<typename NetContext::Sender> pinger(net.getSender("ping")
149  , min<uint16_t>(config.getExt<uint16_t>("msgPerSec"), 1000u)
150  , msgSize
151  , skipFirst
152  );
153  net.listenTo("pong");
154 
155  uint64_t cpuMask = (1ul << startCore);
156 
157  ctx.start(1, cpuMask, true);
158  ctx.addToPool(*sengine);
159  ctx.addToPool(*rengine);
160  cpuMask <<= 1u;
161  ctx.start(pinger, cpuMask);
163  [&ctx]() {
164  ctx.stop();
165  });
166  if (runTime) {
167  sleep(runTime);
168  ctx.stop();
169  }
170 
171  ctx.join();
172 
173  pinger.finalReport();
174  } else {
175  //ponger has to assume the max size message for sending
176  auto sengine = net.createSendTransportEngine(pongCfg, sizeof(Ball));
177  auto rengine = net.createRecvTransportEngine(pingCfg, ctx.buffer());
178 
179  Ponger<typename NetContext::Sender> ponger(net.getSender("pong"));
180  net.listenTo("ping");
181 
182  uint64_t cpuMask = (1ul << startCore);
183 
184  ctx.start(1, cpuMask, true);
185  ctx.addToPool(*sengine);
186  ctx.addToPool(*rengine);
187 
188  cpuMask <<= 1u;
189  ctx.start(ponger, cpuMask);
191  [&ctx]() {
192  ctx.stop();
193  });
194  if (runTime) {
195  sleep(runTime);
196  ctx.stop();
197  }
198 
199  ctx.join();
200  }
201  return 0;
202 }
203 
204 } //pingpong_detail
205 
206 using pingpong_detail::pingpong;
207 }}}
class to hold an hmbdc configuration
Definition: Config.hpp:44
Definition: Pingpong.hpp:21
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
each message type has 16 bit tag
Definition: Message.hpp:30
RAII representing the lifespan of the underlying Singleton which also ganrantees the singularity of u...
Definition: GuardedSingleton.hpp:20
static void onTermIntDo(std::function< void()> doThis)
specfy what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
Definition: Time.hpp:13
collect sample values and keep histogram for top percentages
Definition: StatHistogram.hpp:40
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:408
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
Definition: Rater.hpp:11
Definition: Base.hpp:12