// A partition Context rightfully doesn't contain a thread pool, and all its Clients are in direct mode. Pool-related interfaces are turned off at compile time.
#include "hmbdc/app/mcast/NetContext.hpp"
#include "hmbdc/time/Timers.hpp"
#include "hmbdc/os/Signals.hpp"
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <algorithm>
#include <string>
#include <memory>
#include <unistd.h>
using namespace boost;
// Topic named "req"; WordRequester obtains its sender for this topic and
// runServer() listens to it.
Topic REQUEST_TOPIC("req");
// Request message payload: a fixed 120-byte character buffer holding the word
// to be reversed.
//
// NOTE(review): other code in this file refers to Request::typeTag, which this
// definition does not declare. Presumably a tag-providing base class was lost
// together with the opening brace -- confirm against the message framework and
// restore it.
struct Request {
    char word[120];
};
// Topic named "res"; WordReverser obtains its sender for this topic and
// runClient() listens to it.
Topic RESULT_TOPIC("res");
// Result message payload: a fixed 120-byte character buffer holding the
// reversed word.
//
// NOTE(review): other code in this file refers to Result::typeTag, which this
// definition does not declare. Presumably a tag-providing base class was lost
// together with the opening brace -- confirm against the message framework and
// restore it.
struct Result {
    char reversedWord[120];
};
// Built-in JSON configuration text. It carries a top-level
// "outBufferSizePower2" value plus a "request" and a "result" section, each
// with its own "topicRegex", "mcastPort" and "mcastAddr". main() wraps this
// text in an istringstream.
char const* CONFIG = R"|(
{
"outBufferSizePower2" : 13,
"request" : {
"topicRegex" : "req",
"mcastPort" : 4321,
"mcastAddr" : "232.43.211.234"
},
"result" : {
"topicRegex" : "res",
"mcastPort" : 4321,
"mcastAddr" : "232.43.211.235"
}
}
)|";
// Client that publishes the lines read from stdin as Request messages and
// counts the Result messages handed to handleMessageCb().
struct WordRequester
:
Client<WordRequester, Result> {
    // Fetches the sender for REQUEST_TOPIC from the NetCtx singleton and
    // starts the result counter at zero.
    WordRequester()
    : sender_(NetCtx::instance().getSender(REQUEST_TOPIC))
    , resultCount_(0) {
    }

    // Reads stdin line by line until getline() fails, sending each line `dup`
    // times as a Request.
    //   dup - number of times every line is sent (0 sends nothing).
    void sendRequests(size_t dup) {
        string line;
        while (getline(cin, line)) {
            // "+ 1" includes the terminating NUL of c_str(); the length is
            // capped at the capacity of Request::word, so a line of 120 or
            // more characters goes out truncated and without a NUL (the
            // receiving side in this file bounds its read with strnlen).
            auto const byteCount = min(line.size() + 1, sizeof(Request::word));
            for (size_t i = 0; i < dup; ++i) {
                sender_->sendBytes(Request::typeTag
                    , line.c_str()
                    , byteCount);
            }
        }
    }

    // Invoked for each Result message; only the count is kept, the payload is
    // ignored.
    void handleMessageCb(Result const&) {
        ++resultCount_;
    }

    // Declared with the exact type getSender() returns: the member was
    // initialised and used above but had no declaration. Kept ahead of
    // resultCount_ so declaration order matches the initialiser list.
    decltype(NetCtx::instance().getSender(REQUEST_TOPIC)) sender_;
    // Number of Result messages received so far; printed by runClient().
    size_t resultCount_;
};
// Runs the requesting side: builds a send engine from the "request" section
// and a receive engine from the "result" section, starts both together with a
// WordRequester, sends every stdin line msgDupCount times, then waits for the
// context to finish and prints the number of results counted.
//   config      - configuration holding the "request" and "result" children
//   msgDupCount - forwarded to WordRequester::sendRequests as the per-line
//                 send count
void runClient(
Config const& config,
size_t msgDupCount) {
ClientContext ctx;
auto& net = NetCtx::instance();
// Each section config is built from its child node plus the whole config.
// NOTE(review): presumably the second argument supplies fallback values --
// confirm against Config's constructor.
Config requestConfig(config.get_child(
"request"), config);
auto sengine = net.createSendTransportEngine(requestConfig
, sizeof(Request));
Config resultConfig(config.get_child(
"result"), config);
auto rengine = net.createRecvTransportEngine(resultConfig
, ctx.buffer());
WordRequester wr;
// NOTE(review): the value paired with each participant looks like a CPU
// affinity mask (the usage text in main() talks about core counts) -- confirm.
ctx.start(*sengine, 0x01ul
, *rengine, 0x02ul
, wr, 0x04ul);
net.listenTo(RESULT_TOPIC);
// Returns only once stdin is exhausted (sendRequests loops on getline).
wr.sendRequests(msgDupCount);
// NOTE(review): the call that should receive this lambda is missing -- the
// statement starts with a bare lambda and ends with ");". Presumably it
// registers a termination-signal handler (hmbdc/os/Signals.hpp is included);
// restore the callee before building.
[&ctx] {
ctx.stop();
}
);
cerr << "all requests sent, waiting for results, press ctrl-d, ctrl-c to terminate" << endl;
ctx.join();
cout << "\nresult count = " << wr.resultCount_ << endl;
}
// Client that receives Request messages, sends back the reversed word as a
// Result, and tracks a per-period message count and its peak.
//
// NOTE(review): this definition is incomplete as shown -- the opening brace
// after the base list is missing, the constructor's initialiser list starts
// with "," (its first entry is gone), the argument to setCallback() has lost
// its opening, and sender_ is used without a visible declaration. The
// setCallback()/schedule() calls suggest timer-related base classes were also
// dropped -- confirm against the original file.
struct WordReverser
:
Client<WordReverser, Request>
WordReverser()
, sender_(NetCtx::instance().getSender(RESULT_TOPIC))
, periodTps_(0)
, peakTps_(0) {
// The callback body calls record().
setCallback(
record();
}
);
// Schedules this object starting at the current time.
schedule(SysTime::now(), *this);
}
// Reverses the word carried by the request and sends it as a Result.
void handleMessageCb(Request const& m) {
char s[sizeof(Result::reversedWord)];
// strnlen already caps the length at sizeof(s) - 1, so the read of m.word is
// bounded even when it carries no terminating NUL; the outer min() is then
// redundant.
auto l = min(sizeof(s) -1, strnlen(m.word, sizeof(s) -1));
reverse_copy(m.word, m.word + l, s);
s[l] = 0;
// Sends the reversed characters plus the terminating NUL.
sender_->sendBytes(Result::typeTag, s, l + 1);
periodTps_++;
}
// Folds the current period's count into the peak and starts a new period.
void record() {
peakTps_ = max(peakTps_, periodTps_);
periodTps_ = 0;
}
// Messages handled since the last record() call.
size_t periodTps_;
// Largest periodTps_ observed by record(); printed by runServer().
size_t peakTps_;
};
// Runs the reversing side: builds a send engine from the "result" section and
// a receive engine from the "request" section, starts three WordReverser
// instances together with both engines, waits for the context to finish and
// prints each worker's peak count.
//   config - configuration holding the "request" and "result" children
void runServer(
Config const& config) {
ServerContext ctx;
auto& net = NetCtx::instance();
Config resultConfig(config.get_child(
"result"), config);
auto sengine = net.createSendTransportEngine(resultConfig
, sizeof(Result));
Config requestConfig(config.get_child(
"request"), config);
// NOTE(review): the opening of the third argument is missing -- there is no
// comma or lambda introducer after ctx.buffer(). The body below uses `h`,
// presumably the callable's parameter. It returns 1 for a Request whose first
// byte is at most 127 and -1 otherwise; what those return values mean to the
// engine is not visible here.
auto rengine = net.createRecvTransportEngine(requestConfig
, ctx.buffer()
if (h->typeTag() == Request::typeTag) {
auto& m = h->wrapped<Request>();
uint8_t c = (uint8_t) m.word[0];
// c is unsigned, so "c >= 0" is always true; the test reduces to c <= 127.
if (c >= 0 && c <= 127) {
return 1;
}
}
return -1;
}
);
WordReverser wr[3];
// NOTE(review): the value paired with each participant looks like a CPU
// affinity mask (the usage text in main() talks about core counts) -- confirm.
ctx.start(
wr[0], 0x08ul
, wr[1], 0x10ul
, wr[2], 0x20ul
, *sengine, 0x40ul
, *rengine, 0x80ul);
net.listenTo(REQUEST_TOPIC);
// NOTE(review): the call that should receive this lambda is missing -- the
// statement starts with a bare lambda and ends with ");". Presumably it
// registers a termination-signal handler (hmbdc/os/Signals.hpp is included);
// restore the callee before building.
[&ctx] {
ctx.stop();
}
);
ctx.join();
cout << endl;
for (size_t i = 0; i < sizeof(wr) / sizeof(wr[0]); i++) {
cout << "worker " << i << " peak tps=" << wr[i].peakTps_ << endl;
}
}
// Entry point. With two command-line arguments it runs the client side,
// passing the second argument (converted to size_t) to runClient(); with one
// argument it runs the server side. Any other argument count prints usage to
// stderr and returns -1.
//
// NOTE(review): `config` is used below but never declared, and both branches
// of the first argc check are empty -- the lines that declare config and load
// it from `is` appear to be missing. istringstream also needs <sstream>, which
// is not among the visible includes.
int main(int argc, char** argv) {
istringstream is(CONFIG);
if (argc == 3) {
} else if (argc == 2) {
}
else {
// NOTE(review): the usage text labels the optional argument as
// sendBytesPerSec, but the code below passes argv[2] to runClient() as the
// message duplication count -- confirm which is intended.
cerr << argv[0] << " incoming-local-ip [sendBytesPerSec - use 0 here for no rate control]" << endl;
cerr << "multicast should be enabled on local-ip network, server sides requires 6 cores and client side requires 2" << endl;
return -1;
}
// The first argument is stored in the config under "ifaceAddr".
config.put("ifaceAddr", argv[1]);
if (argc == 3) {
// lexical_cast throws if argv[2] is not a valid size_t; nothing here catches it.
runClient(config, lexical_cast<size_t>(argv[2]));
} else {
runServer(config);
}
return 0;
}