hmbdc
simplify-high-performance-messaging-programming
Pingpong.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Base.hpp"
4 #include "hmbdc/numeric/Stat.hpp"
5 #include "hmbdc/time/Rater.hpp"
6 #include "hmbdc/os/Signals.hpp"
7 
8 #include <iostream>
9 #include <memory>
10 #include <functional>
11 #include <unistd.h>
12 #include <sys/mman.h>
13 
14 namespace hmbdc { namespace app { namespace utils {
15 
16 namespace pingpong_detail {
17 using namespace hmbdc::time;
18 using namespace hmbdc::numeric;
19 using namespace std;
20 
21 struct Ball
22 : hasTag<1001> {
23  Ball(size_t s)
24  : size(s)
25  , ts(SysTime::now())
26  {}
27  size_t size;
28  SysTime ts;
29  char padding[1013 - 16];
30 };
31 
32 template <typename Sender>
33 struct Pinger
34 : Client<Pinger<Sender>, Ball> {
35  Pinger(Sender* sender, uint16_t msgPerSec, uint16_t msgSize, size_t skipFirst)
36  : rater_(Duration::seconds(1), msgPerSec, 1u)
37  , sender_(sender)
38  , periodicPingCount_(0)
39  , pingCount_(0)
40  , skipped_(skipFirst)
41  , msgPerSec_(msgPerSec)
42  , msgSize_(msgSize) {
43  }
44 
45  char const* hmbdcName() const {
46  return "pinger";
47  }
48 
49  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
50  cout << "Started with the first " << skipped_ << " values ignored(x), press ctrl-c to get results" << endl;
51  };
52 
53  void handleMessageCb(Ball const& m) {
54  auto now = SysTime::now();
55  auto lat = now - m.ts;
56  if (!skipped_) {
57  stat_.add(lat);
58  } else {
59  --skipped_;
60  }
61  }
62 
63  void stoppedCb(exception const& e) override {
64  cerr << e.what() << endl;
65  };
66 
67  void invokedCb(uint16_t) override {
68  if (hmbdc_unlikely(rater_.check())) {
69  if (++periodicPingCount_ == msgPerSec_) {
70  cout << (skipped_?'x':'.') << flush;
71  periodicPingCount_ = 0;
72  }
73  Ball p(msgSize_);
74  sender_->send(p, msgSize_);
75  rater_.commit();
76  if (!skipped_) pingCount_++;
77  }
78  }
79 
80  void finalReport() {
81  cout << "\nround trip time (sec):(" << stat_.sampleSize() << '/' << pingCount_ << "):";
82  cout << stat_;
83  cout << endl;
84  }
85 
86  Rater rater_;
87  Sender* sender_;
88  Stat<Duration> stat_;
89  size_t periodicPingCount_;
90  size_t pingCount_;
91  size_t skipped_;
92  uint16_t msgPerSec_;
93  uint16_t msgSize_;
94 };
95 
96 
97 template <typename Sender>
98 struct Ponger
99 : Client<Ponger<Sender>, Ball> {
100  Ponger(Sender* sender)
101  : sender_(sender) {
102  }
103 
104  char const* hmbdcName() const {
105  return "ponger";
106  }
107 
108  void messageDispatchingStartedCb(uint16_t) override {
109  cout << "Started, press ctrl-c to stop" << endl;
110  };
111 
112  void handleMessageCb(Ball const& m) {
113  sender_->send(m, m.size);
114  }
115 
116  void stoppedCb(exception const& e) override {
117  cerr << e.what() << endl;
118  };
119 
120  Sender* sender_;
121 };
122 
/**
 * @brief run one side of the ping-pong test on top of NetContext: the pinging side when
 * the "ping" config value is true, the echoing side otherwise
 *
 * @details reads "skipFirst", "ping", "runTime" and "msgSize" from config (plus
 * "msgPerSec" on the pinging side), forces msgSize into [16, 512], then starts a
 * receive engine, a send engine and the Pinger or Ponger in a local Context. When
 * runTime is non-zero the function sleeps that many seconds and stops the Context
 * itself; in either case it waits in ctx.join() before returning. The pinging side
 * prints Pinger::finalReport() after the join.
 *
 * NOTE(review): this text is a rendered listing that carries its own line numbers, and
 * those numbers skip 130, 156 and 177 - three source lines are absent here. The gaps are
 * marked below; the code as listed is not complete without them.
 *
 * @tparam NetContext provides instance(), and through it createSendTransportEngine(),
 *  createRecvTransportEngine(), getSender() and listenTo(), plus the nested Sender type
 * @param config source of the values listed above and of the "ping" / "pong" sections
 * @param cpus indexed at 0, 1 and 2, so at least three entries are required; each entry
 *  becomes the bit position of a mask given to ctx.start() for, in order, the receive
 *  engine, the send engine and the client (presumably a CPU affinity mask - confirm)
 * @return always 0
 */
123 template <typename NetContext>
124 int
125 pingpong(Config const& config, vector<uint16_t> cpus) {
126  size_t skipFirst = config.getExt<size_t>("skipFirst");
127  bool ping = config.getExt<bool>("ping");
128  auto runTime = config.getExt<uint32_t>("runTime");
129 
// NOTE(review): listing line 130 is missing at this point - confirm against the original header
131  auto& net = NetContext::instance();
132  auto msgSize = config.getExt<uint16_t>("msgSize");
// clamp the configured size into [16, 512]
133  msgSize = min<uint16_t>(msgSize, 512);
134  msgSize = max<uint16_t>(msgSize, 16);
135  using MyContext = Context<>;
// NOTE(review): the meaning of 14 and 10 is not visible here - see the Context constructor
136  MyContext ctx(14
137  , 10
138  , msgSize
139  );
140  Config pingCfg(config, "ping");
141  Config pongCfg(config, "pong");
// NOTE(review): the literal below is misspelled; it is runtime output and is left untouched here
142  cout << "initailizing..." << endl;
143  if (ping) {
// pinging side: send with the "ping" section, receive with the "pong" section
144  auto sengine = net.createSendTransportEngine(pingCfg, msgSize);
145  auto rengine = net.createRecvTransportEngine(pongCfg, ctx.buffer());
146  Pinger<typename NetContext::Sender> pinger(net.getSender("ping")
147  , config.getExt<uint16_t>("msgPerSec")
148  , msgSize
149  , skipFirst
150  );
151  net.listenTo("pong");
152 
153  ctx.start(*rengine, 1ul << cpus[0]
154  , *sengine, 1ul << cpus[1]
155  , pinger, 1ul << cpus[2]);
// NOTE(review): listing line 156 is missing at this point; it must open the call that
// receives the lambda below (presumably the SIGTERM/SIGINT hook from Signals.hpp - confirm)
157  [&ctx]() {
158  ctx.stop();
159  });
// a non-zero runTime bounds the run; with zero only the lambda above calls ctx.stop()
160  if (runTime) {
161  sleep(runTime);
162  ctx.stop();
163  }
164 
165  ctx.join();
166 
167  pinger.finalReport();
168  } else {
// echoing side: the sections are swapped - send with "pong", receive with "ping"
169  auto sengine = net.createSendTransportEngine(pongCfg, msgSize);
170  auto rengine = net.createRecvTransportEngine(pingCfg, ctx.buffer());
171 
172  Ponger<typename NetContext::Sender> ponger(net.getSender("pong"));
173  net.listenTo("ping");
174  ctx.start(*rengine, 1ul << cpus[0]
175  , *sengine, 1ul << cpus[1]
176  , ponger, 1ul << cpus[2]);
// NOTE(review): listing line 177 is missing at this point; same gap as on the pinging side
178  [&ctx]() {
179  ctx.stop();
180  });
181  if (runTime) {
182  sleep(runTime);
183  ctx.stop();
184  }
185 
186  ctx.join();
187  }
188  return 0;
189 }
190 
191 } //pingpong_detail
192 
193 using pingpong_detail::pingpong;
194 }}}
void stoppedCb(exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: Pingpong.hpp:63
class to hold an hmbdc configuration
Definition: Config.hpp:44
Definition: Pingpong.hpp:21
T getExt(const path_type &param) const
get a value from the config
Definition: Config.hpp:154
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
each message type has 16 bit tag
Definition: Message.hpp:34
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
static void onTermIntDo(std::function< void()> doThis)
specify what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
Definition: Time.hpp:14
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
called before any messages got dispatched - only once
Definition: Pingpong.hpp:49
void stoppedCb(exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: Pingpong.hpp:116
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:455
Definition: Rater.hpp:10
void invokedCb(uint16_t) override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: Pingpong.hpp:67
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:47
Definition: Rater.hpp:11
Definition: Base.hpp:12
void messageDispatchingStartedCb(uint16_t) override
called before any messages got dispatched - only once
Definition: Pingpong.hpp:108
Definition: Stat.hpp:11