hmbdc
simplify-high-performance-messaging-programming
ConsoleClient.hpp
1 #include "hmbdc/app/LoggerT.hpp"
2 #include "hmbdc/app/Context.hpp"
3 
4 #include <boost/format.hpp>
5 
6 #include <iostream>
7 #include <iomanip>
8 #include <memory>
9 #include <unistd.h>
10 
11 namespace hmbdc { namespace app { namespace utils {
12 
13 using namespace std;
14 /**
15  * @brief a Client that work as a console to send and receive network messages
16  * @details a Client that receives all net messages deleived to the process
17  * it uses JustBytes to indicate it wants all the messages bytes
18  * regardless of their types - still Topic filtering is effective
19  *
20  * @tparam NetCtx some NetContext type for a specific transport mechanism,
21  * for example mcast::NetContext, tcpcast::NetContext
22  */
23 template <typename NetCtx>
25 : Client<ConsoleClient<NetCtx>, JustBytes> {
26  /**
27  * @brief construct
28  * @details just construct the Client - need to be run in the Context later
29  *
30  * @param ctx a runtime sized context that the Client will run in
31  * @param myCin for incoming commands
32  * @param myCout for output
33  * @param myCerr for error
34  */
36  , istream& myCin = cin
37  , ostream& myCout = cout
38  , ostream& myCerr = cerr
39  , string initCmd = "")
40 
41  : ctx_(ctx)
42  , hex_(true)
43  , myCin_(myCin)
44  , myCout_(myCout)
45  , myCerr_(myCerr)
46  , sender_(nullptr)
47  , initCmd_(initCmd) {
48  thread stdinThread([this]() {
49  stdinThreadEntrance();
50  });
51  stdinThread_ = move(stdinThread);
52  }
53 
54  char const* hmbdcName() const {
55  return "console";
56  }
57 
58  /**
59  * @brief documentation for all commands the console interprets
60  * @details commands are seperated by \n, when read eof, terminates console
61  * @return a documentation string
62  */
63  static char const* help() {
64  return
65 "\nCONSOLE LANGUAGE:\n"
66 "topic <topic>, set current topic\nexample: topic console\n\n"
67 "ohex, output message in hex format\nexample: ohex\n\n"
68 "sendhex <tag> <hex bytes>, send a binary message with a message tag\nexample: sendhex 1001 01 00 0f ef\n\n"
69 "ostr, output message in string format\nexample: ostr\n\n"
70 "sendstr <tag> <string>, send a string message with a message tag\nexample: sendstr 1001 hello\n\n"
71 "listen <topic>, console start to listen to the topic\nexample: listen my_topic\n\n"
72 "stoplisten <topic>, console stop listen to the topic\nexample: stoplisten john\n\n"
73 ;
74  }
75 
76  void stoppedCb(exception const& e){
77  myCout_ << (e.what()) << endl;
78  };
79 
80  /**
81  * @brief callback for JustBytes
82  * @details it is the user's responsibility to figure ouot the size of the message
83  * thru tag and interpret the bytes
84  *
85  * @param tag message tag - starting from 1000
86  * @param bytes contents of the message - will not exceed the associated
87  * Context::maxMessageSize() return value
88  */
89  void handleJustBytesCb(uint16_t tag, uint8_t* bytes) {
90  myCout_ << dec << tag;
91  if (hex_) {
92  for (auto p = bytes; p != bytes + ctx_.maxMessageSize(); ++p) {
93  myCout_ << ' ' << boost::format("%02x") % (uint16_t)*p;
94  }
95  } else {
96  myCout_ << ' ' << string((char*)bytes, strnlen((char*)bytes, ctx_.maxMessageSize()));
97  }
98  myCout_ << endl;
99  }
100 
101  /**
102  * @brief wait until the console closes
103  */
105  stdinThread_.join();
106  }
107 
108  private:
109  void stdinThreadEntrance() {
110  processCmd(initCmd_);
111  processCmd(myCin_);
112  }
113 
114  void processCmd(istream& is) {
115  string line;
116  while(getline(is, line)) {
117  if (!line.size()) continue;
118  istringstream iss{line};
119  string op;
120  iss >> op;
121  if (op == "topic") {
122  string topic;
123  iss >> topic;
124  Topic tmp(topic.c_str());
125  auto sender = NetCtx::instance().getSender(tmp);
126  if (sender) {
127  topic_ = tmp;
128  sender_ = sender;
129  } else {
130  myCerr_ << " cannot find transport for " << topic << endl;
131  }
132  } else if (op == "sendstr") {
133  if (!sender_) {
134  myCerr_ << " topic is not set yet" << endl;
135  continue;
136  }
137  uint16_t tag;
138  iss >> tag;
139  string msg;
140  iss >> msg;
141  if (iss) {
142  sender_->sendBytes(tag, msg.c_str(),
143  min(msg.size() + 1u, ctx_.maxMessageSize())
144  );
145  } else {
146  myCerr_ << " sendstr grammar error" << endl;
147  }
148  } else if (op == "sendhex") {
149  if (!sender_) {
150  myCerr_ << " topic is not set yet" << endl;
151  continue;
152  }
153  uint16_t tag;
154  iss >> tag;
155  iss >> hex;
156  vector<uint8_t> bytes;
157  uint16_t tmp;
158  while (iss >> tmp) {
159  bytes.push_back((uint8_t)tmp);
160  }
161 
162  if (iss) {
163  sender_->sendBytes(tag, &bytes[0],
164  min(bytes.size(), ctx_.maxMessageSize())
165  );
166  } else {
167  myCerr_ << " sendhex grammar error" << endl;
168  }
169  } else if (op == "ohex") {
170  hex_ = true;
171  } else if (op == "ostr") {
172  hex_ = false;
173  } else if (op == "listen") {
174  string topic;
175  iss >> topic;
176  NetCtx::instance().listenTo(Topic(topic));
177  } else if (op == "stoplisten") {
178  string topic;
179  iss >> topic;
180  NetCtx::instance().stopListenTo(Topic(topic));
181  } else {
182  myCerr_ << " unknown command " << op << endl;
183  }
184  }
185  }
186 
187  hmbdc::app::Context<>& ctx_;
188  atomic<bool> hex_;
189  thread stdinThread_;
190  istream& myCin_;
191  ostream& myCout_;
192  ostream& myCerr_;
193  Topic topic_;
194  typename NetCtx::Sender* sender_;
195  istringstream initCmd_;
196 };
197 
198 }}}
199 
static char const * help()
documentation for all commands the console interprets
Definition: ConsoleClient.hpp:63
void handleJustBytesCb(uint16_t tag, uint8_t *bytes)
callback for JustBytes
Definition: ConsoleClient.hpp:89
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
a Client that work as a console to send and receive network messages
Definition: ConsoleClient.hpp:24
Definition: TypedString.hpp:74
ConsoleClient(Context<> &ctx, istream &myCin=cin, ostream &myCout=cout, ostream &myCerr=cerr, string initCmd="")
construct
Definition: ConsoleClient.hpp:35
Definition: Context.hpp:384
void waitUntilFinish()
wait until the console closes
Definition: ConsoleClient.hpp:104
Definition: Client.hpp:39
Definition: Client.hpp:11