hmbdc
simplify-high-performance-messaging-programming
ConsoleRunner.hpp
1 #include <iostream>
2 #include "hmbdc/app/utils/ConsoleClient.hpp"
3 #include "hmbdc/app/LoggerT.hpp"
4 #include "hmbdc/app/Client.hpp"
5 #include "hmbdc/app/Context.hpp"
6 #include "hmbdc/app/Config.hpp"
7 #include "hmbdc/os/Signals.hpp"
8 #include "hmbdc/numeric/BitMath.hpp"
9 
10 #include <boost/program_options.hpp>
11 #include <memory>
12 #include <unistd.h>
13 #include <signal.h>
14 
15 using namespace std;
16 using namespace hmbdc;
17 using namespace hmbdc::numeric;
18 using namespace hmbdc::app;
19 using namespace boost::property_tree;
20 
21 
22 namespace hmbdc { namespace app { namespace utils {
23 using LoggerContext = HMBDC_LOG_CONTEXT;
24 using MyContext = hmbdc::app::Context<>; //run time sized
25 using MyLogger = LoggerT<LoggerContext>;
26 
27 template <typename NetContext>
28 void run(ptree const& configIn) {
29  Config config(configIn);
30  using namespace std;
31  using namespace boost;
32 
33  SingletonGuardian<MyLogger> logGuard(cerr);
34 
35  auto maxMessageSize = config.get<uint16_t>("maxMessageSize");
36 
37  MyContext ctx(config.get<uint16_t>("inBufferSizePower2")
38  , config.get_child("transports").size() * 2 + 1 //max transports count + a Client
39  , maxMessageSize
40  );
41 
43 
44  auto coreBitMask = config.getHex<uint64_t>("coreBitMask");
45 
46  //make sure coreBitMask is not zero
47  if (coreBitMask == 0) {
48  coreBitMask = (1ul << std::thread::hardware_concurrency()) - 1ul;
49  }
50 
51  //start ctx and add its clients
52  ctx.start(setBitsCount(coreBitMask), coreBitMask);
53  // ctx.start(0, 0, true);
54 
55  for (auto const& t : config.get_child("transports")) {
56  Config tconf(config, t.second);
57  auto poolThreadAffinity = 1ul;
58  if (t.second.get("tx", false)) {
59  auto sengine = NetContext::instance().createSendTransportEngine(
60  tconf, maxMessageSize);
61  // ctx.start(*sengine, poolThreadAffinity, true);
62  ctx.addToPool(*sengine, poolThreadAffinity);
63  }
64  if (t.second.get("rx", false)) {
65  auto rengine = NetContext::instance().createRecvTransportEngine(
66  tconf, ctx.buffer());
67  ctx.addToPool(*rengine, poolThreadAffinity);
68  }
69  poolThreadAffinity <<= 1ul;
70  //wrap around
71  if (poolThreadAffinity == (1ul << setBitsCount(coreBitMask))) {
72  poolThreadAffinity = 1ul;
73  }
74  }
75 
76  ConsoleClient<NetContext> client(ctx
77  , cin
78  , cout
79  , cerr
80  , config.get("initCmd", ""));
81 
82  ctx.addToPool(client);
84  [&ctx]() {
85  ctx.stop();
86  });
87 
88  client.waitUntilFinish();
89  sleep(config.get("secWaitAfterInputDone", 1));
90  ctx.stop();
91  ctx.join();
92 }
93 
94 
95 template <typename NetContext>
96 int startConsole(char const* helpStrIn, int argc, char** argv) {
97  using namespace std;
98  if (argc != 2) {
99  cout << helpStrIn << endl;
100  exit(1);
101  }
102 
103  ptree config;
104  string userCfg(argv[1]);
105  std::ifstream cs(userCfg);
106  read_json(cs, config);
107  run<NetContext>(config);
108 }
109 
110 }}}
111 
class to hold an hmbdc configuration
Definition: Config.hpp:35
Definition: Client.hpp:11
Definition: LoggerT.hpp:90
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
Definition: GuardedSingleton.hpp:12
Definition: Context.hpp:384
static void onTermIntDo(function< void()> doThis)
specfy what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:29
Definition: Client.hpp:11