hmbdc
simplify-high-performance-messaging-programming
ConsoleClient.hpp
1 #include "hmbdc/app/Base.hpp"
2 
3 #include <boost/format.hpp>
4 
5 #include <iostream>
6 #include <iomanip>
7 #include <memory>
8 #include <unistd.h>
9 #include <unordered_set>
10 
11 namespace hmbdc { namespace app { namespace utils {
12 
13 namespace consoleclient_detail {
14 using namespace std;
15 /**
16  * @class ConsoleClient<>
17  * @brief a Client that work as a console to send and receive network messages
18  * @details a Client that receives all net messages delivered to the process
19  * it uses JustBytes to indicate it wants all the messages bytes
20  * regardless of their types - still Topic filtering is effective
21  *
22  * @tparam NetCtx some NetContext type for a specific transport mechanism,
23  * for example mcast::NetContext, tcpcast::NetContext
24  */
25 template <typename NetCtx>
27 : Client<ConsoleClient<NetCtx>, JustBytes> {
28  /**
29  * @brief constructor
30  * @details just construct the Client - need to be run in the Context later
31  *
32  * @param ctx a runtime sized context that the Client will run in
33  * @param myCin for incoming commands
34  * @param myCout for output
35  * @param myCerr for error
36  */
38  , istream& myCin = cin
39  , ostream& myCout = cout
40  , ostream& myCerr = cerr
41  , string initCmd = ""
42  , std::unordered_set<uint16_t> memoryAttachmentTags = std::unordered_set<uint16_t>()
43  ) : ctx_(ctx)
44  , hex_(true)
45  , myCin_(myCin)
46  , myCout_(myCout)
47  , myCerr_(myCerr)
48  , sender_(nullptr)
49  , initCmd_(initCmd)
50  , memoryAttachmentTags_(memoryAttachmentTags) {
51  thread stdinThread([this]() {
52  stdinThreadEntrance();
53  });
54  stdinThread_ = move(stdinThread);
55  }
56 
57  char const* hmbdcName() const {
58  return "console";
59  }
60 
61  /**
62  * @brief documentation for all commands the console interprets
63  * @details commands are seperated by \n, when read eof, terminates console
64  * @return a documentation string
65  */
66  static char const* help() {
67  return
68 "\nCONSOLE LANGUAGE:\n"
69 "topic <topic>, set current topic\nexample: topic console\n\n"
70 "ohex, output message in hex format\nexample: ohex\n\n"
71 "sendhex <tag> <hex bytes>, send a binary message with a message tag\nexample: sendhex 1001 01 00 0f ef\n\n"
72 "ostr, output message in string format\nexample: ostr\n\n"
73 "sendstr <tag> <string>, send a string message with a message tag\nexample: sendstr 1001 hello\n\n"
74 "listen <topic>, console start to listen to the topic\nexample: listen my_topic\n\n"
75 "stoplisten <topic>, console stop listen to the topic\nexample: stoplisten john\n\n"
76 ;
77  }
78 
79  void stoppedCb(exception const& e) override {
80  myCout_ << (e.what()) << endl;
81  };
82 
83  /**
84  * @brief callback for JustBytes
85  * @details it is the user's responsibility to figure ouot the size of the message
86  * thru tag and interpret the bytes
87  *
88  * @param tag message tag - starting from 1000
89  * @param bytes contents of the message - will not exceed the associated
90  * Context::maxMessageSize() return value
91  */
92  void handleJustBytesCb(uint16_t tag, uint8_t* bytes) {
93  myCout_ << dec << tag;
94  hasMemoryAttachment* att = nullptr;
95  if (memoryAttachmentTags_.find(tag) != memoryAttachmentTags_.end()) {
96  att = reinterpret_cast<hasMemoryAttachment*>(bytes);
97  bytes += sizeof(hasMemoryAttachment);
98  }
99  if (hex_) {
100  for (auto p = bytes; p != bytes + ctx_.maxMessageSize(); ++p) {
101  myCout_ << ' ' << boost::format("%02x") % (uint16_t)*p;
102  }
103  if (att) {
104  myCout_ << '\n';
105  for (auto p = (uint8_t*)att->attachment; p != (uint8_t*)att->attachment + att->len; ++p) {
106  myCout_ << boost::format("%02x") % (uint16_t)*p << ' ';
107  }
108  }
109  } else {
110  myCout_ << ' ' << string((char*)bytes, strnlen((char*)bytes, ctx_.maxMessageSize()));
111  if (att) {
112  myCout_ << '\n';
113  myCout_ << string((char*)att->attachment, att->len);
114  }
115  }
116  myCout_ << endl;
117  }
118 
119  /**
120  * @brief wait until the console closes
121  */
123  stdinThread_.join();
124  }
125 
126  private:
127  void stdinThreadEntrance() {
128  processCmd(initCmd_);
129  processCmd(myCin_);
130  }
131 
132  void processCmd(istream& is) {
133  string line;
134  while(getline(is, line)) {
135  if (!line.size()) continue;
136  istringstream iss{line};
137  string op;
138  iss >> op;
139  if (op == "topic") {
140  string topic;
141  iss >> topic;
142  Topic tmp(topic.c_str());
143  auto sender = NetCtx::instance().getSender(tmp);
144  if (sender) {
145  topic_ = tmp;
146  sender_ = sender;
147  } else {
148  myCerr_ << " cannot find transport for " << topic << endl;
149  }
150  } else if (op == "sendstr") {
151  if (!sender_) {
152  myCerr_ << " topic is not set yet" << endl;
153  continue;
154  }
155  uint16_t tag;
156  iss >> tag;
157  string msg;
158  getline(iss, msg);
159  if (iss) {
160  sender_->sendBytes(tag, msg.c_str() + 1, //drop leading space
161  min(msg.size(), ctx_.maxMessageSize())
162  );
163  } else {
164  myCerr_ << " sendstr syntax error" << endl;
165  }
166  } else if (op == "sendstratt") {
167  if (!sender_) {
168  myCerr_ << " topic is not set yet" << endl;
169  continue;
170  }
171  uint16_t tag;
172  iss >> tag;
173  string msg;
174  getline(iss, msg);
175  if (!iss || !getline(is, line)) {
176  myCerr_ << " sendstratt syntax error" << endl;
177  continue;
178  }
179  auto att = malloc(line.size());
180  memcpy(att, line.c_str(), line.size());
181  auto rawSize = sizeof(hasMemoryAttachment) + msg.size();
182  char rawMsg[rawSize];
183  auto hma = new (rawMsg) hasMemoryAttachment;
184  hma->attachment = att;
185  hma->len = line.size();
186  memcpy((char*)rawMsg + sizeof(hasMemoryAttachment), msg.c_str() + 1, msg.size()); //drop leading space
187  sender_->sendBytes(tag, hma, min(rawSize, ctx_.maxMessageSize()));
188  } else if (op == "sendhex") {
189  if (!sender_) {
190  myCerr_ << " topic is not set yet" << endl;
191  continue;
192  }
193  uint16_t tag;
194  iss >> tag;
195  iss >> hex;
196  vector<uint8_t> bytes;
197  uint16_t tmp;
198  while (iss >> tmp) {
199  bytes.push_back((uint8_t)tmp);
200  }
201 
202  if (iss.eof()) {
203  sender_->sendBytes(tag, &bytes[0],
204  min(bytes.size(), ctx_.maxMessageSize())
205  );
206  } else {
207  myCerr_ << " sendhex syntax error" << endl;
208  }
209  } else if (op == "sendhexatt") {
210  if (!sender_) {
211  myCerr_ << " topic is not set yet" << endl;
212  continue;
213  }
214 
215  uint16_t tag;
216  iss >> tag;
217  iss >> hex;
218  vector<uint8_t> bytes;
219  uint16_t tmp;
220  while (iss >> tmp) {
221  bytes.push_back((uint8_t)tmp);
222  }
223  if (!iss.eof() || !getline(is, line)) {
224  myCerr_ << " sendhexhex syntax error" << endl;
225  continue;
226  }
227 
228  istringstream iss{line};
229  iss >> hex;
230  vector<uint8_t> attBytes;
231  while (iss >> tmp) {
232  attBytes.push_back((uint8_t)tmp);
233  }
234 
235  auto att = malloc(attBytes.size());
236  memcpy(att, &(attBytes[0]), attBytes.size());
237  auto rawSize = sizeof(hasMemoryAttachment) + attBytes.size();
238  char rawMsg[rawSize];
239  auto hma = new (rawMsg) hasMemoryAttachment;
240  hma->attachment = att;
241  hma->len = attBytes.size();
242  memcpy((char*)rawMsg + sizeof(hasMemoryAttachment), &bytes[0], bytes.size());
243  sender_->sendBytes(tag, hma, min(rawSize, ctx_.maxMessageSize()));
244  } else if (op == "ohex") {
245  hex_ = true;
246  } else if (op == "ostr") {
247  hex_ = false;
248  } else if (op == "listen") {
249  string topic;
250  iss >> topic;
251  NetCtx::instance().listenTo(Topic(topic));
252  } else if (op == "stoplisten") {
253  string topic;
254  iss >> topic;
255  NetCtx::instance().stopListenTo(Topic(topic));
256  } else {
257  myCerr_ << " unknown command " << op << endl;
258  }
259  }
260  }
261 
262  hmbdc::app::Context<>& ctx_;
263  atomic<bool> hex_;
264  thread stdinThread_;
265  istream& myCin_;
266  ostream& myCout_;
267  ostream& myCerr_;
268  Topic topic_;
269  typename NetCtx::Sender* sender_;
270  istringstream initCmd_;
271  std::unordered_set<uint16_t> memoryAttachmentTags_;
272 };
273 
274 } //consoleclient_detail
275 
276 template <typename NetCtx>
278 }}}
279 
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
static char const * help()
documentation for all commands the console interprets
Definition: ConsoleClient.hpp:66
void waitUntilFinish()
wait until the console closes
Definition: ConsoleClient.hpp:122
a Client that work as a console to send and receive network messages
Definition: ConsoleClient.hpp:26
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:402
ConsoleClient(Context<> &ctx, istream &myCin=cin, ostream &myCout=cout, ostream &myCerr=cerr, string initCmd="", std::unordered_set< uint16_t > memoryAttachmentTags=std::unordered_set< uint16_t >())
constructor
Definition: ConsoleClient.hpp:37
void handleJustBytesCb(uint16_t tag, uint8_t *bytes)
callback for JustBytes
Definition: ConsoleClient.hpp:92
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:47
if a specific hmbdc network transport (for example tcpcast) supports message with memory attachment...
Definition: Message.hpp:158
Definition: Base.hpp:12