hmbdc
simplify-high-performance-messaging-programming
ConsoleRunner.hpp
1 #include <iostream>
2 #include "hmbdc/app/utils/ConsoleClient.hpp"
3 #include "hmbdc/os/Signals.hpp"
4 #include "hmbdc/numeric/BitMath.hpp"
5 
6 #include <boost/program_options.hpp>
7 #include <memory>
8 #include <unistd.h>
9 #include <signal.h>
10 
11 namespace hmbdc { namespace app { namespace utils {
12 
13 /**
14  * @example server-cluster-server.json
15  * @example tcpcast.py
16  */
17 
18 namespace consolerunner_detail {
19 using namespace std;
20 using namespace hmbdc::numeric;
21 using namespace hmbdc::pattern;
22 using namespace boost::property_tree;
23 
24 
25 using LoggerContext = HMBDC_LOG_CONTEXT;
26 using MyContext = hmbdc::app::Context<>; //run time sized
27 using MyLogger = LoggerT<LoggerContext>;
28 
29 template <typename NetContext>
30 void run(ptree const& configIn) {
31  Config config(configIn);
32  using namespace std;
33  using namespace boost;
34 
35  SingletonGuardian<MyLogger> logGuard(cerr);
36 
37  auto maxMessageSize = config.get<uint16_t>("maxMessageSize");
38 
39  MyContext ctx(config.get<uint16_t>("inBufferSizePower2")
40  , config.get_child("transports").size() * 2 + 1 //max transports count + a Client
41  , maxMessageSize
42  );
43 
45 
46  auto coreBitMask = config.getHex<uint64_t>("coreBitMask");
47 
48  //make sure coreBitMask is not zero
49  if (coreBitMask == 0) {
50  coreBitMask = (1ul << std::thread::hardware_concurrency()) - 1ul;
51  }
52 
53  //start ctx and add its clients
54  ctx.start(setBitsCount(coreBitMask), coreBitMask);
55  // ctx.start(0, 0, true);
56 
57  for (auto const& t : config.get_child("transports")) {
58  Config tconf(config, t.second);
59  auto poolThreadAffinity = 1ul;
60  if (t.second.get("tx", false)) {
61  auto sengine = NetContext::instance().createSendTransportEngine(
62  tconf, maxMessageSize);
63  // ctx.start(*sengine, poolThreadAffinity, true);
64  ctx.addToPool(*sengine, poolThreadAffinity);
65  }
66  if (t.second.get("rx", false)) {
67  auto rengine = NetContext::instance().createRecvTransportEngine(
68  tconf, ctx.buffer());
69  ctx.addToPool(*rengine, poolThreadAffinity);
70  }
71  poolThreadAffinity <<= 1ul;
72  //wrap around
73  if (poolThreadAffinity == (1ul << setBitsCount(coreBitMask))) {
74  poolThreadAffinity = 1ul;
75  }
76  }
77 
78  std::unordered_set<uint16_t> attTags;
79  try {
80  config(attTags, "allowRecvMemoryAttachment");
81  } catch (...) {} //some transports don't support this config
82 
83  ConsoleClient<NetContext> client(ctx
84  , cin
85  , cout
86  , cerr
87  , config.get("initCmd", "")
88  , attTags);
89 
90  ctx.addToPool(client);
92  [&ctx]() {
93  ctx.stop();
94  });
95 
96  client.waitUntilFinish();
97  sleep(config.get("secWaitAfterInputDone", 1));
98  ctx.stop();
99  ctx.join();
100 }
101 
102 
103 template <typename NetContext>
104 int startConsole(char const* helpStrIn, int argc, char** argv) {
105  using namespace std;
106  if (argc != 2) {
107  cout << helpStrIn << endl;
108  exit(1);
109  }
110 
111  ptree config;
112  string userCfg(argv[1]);
113  std::ifstream cs(userCfg);
114  read_json(cs, config);
115  run<NetContext>(config);
116  return 0;
117 }
118 
119 } //consolerunner_detail
120 
121 using consolerunner_detail::startConsole;
122 
123 }}}
124 
Definition: TypedString.hpp:74
Definition: BitMath.hpp:6
RAII representing the lifespan of the underlying Singleton which also ganrantees the singularity of u...
Definition: GuardedSingleton.hpp:20
Definition: GuardedSingleton.hpp:9
static void onTermIntDo(std::function< void()> doThis)
specfy what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:402
Definition: Base.hpp:12