Context template parameter inidcating each message is sent to one and only one of the clients within the Context and its attached ipc Contexts if appllies. each message is still subjected to Client's message type filtering When this Context is specialized using this type, the context normally works with homogeneous Clients to achieve load balance thru threads. No coordination is needed between Clients. Only the direct mode Clients are supported, thread pool is NOT supported by the Context - the pool related functions in Context are also disabled
There is no limit on how many Clients can be started in the Context. Also, there is no limit on when you can start a Client in this Context. see for partition Context usage
#include "hmbdc/app/mcast/NetContext.hpp"
#include "hmbdc/app/Context.hpp"
#include "hmbdc/time/Timers.hpp"
#include "hmbdc/os/Signals.hpp"
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <algorithm>
#include <string>
#include <memory>
#include <unistd.h>
using namespace boost;
Topic REQUEST_TOPIC(
"req");
struct Request
char word[120];
};
Topic RESULT_TOPIC(
"res");
struct Result
char reversedWord[120];
};
auto CONFIG = R"|(
{
"request" : {
"topicRegex" : "req",
"mcastPort" : 4321,
"mcastAddr" : "232.43.211.234"
},
"result" : {
"topicRegex" : "res",
"mcastPort" : 4321,
"mcastAddr" : "232.43.211.235"
}
}
)|";
struct WordRequester
:
Client<WordRequester, Result> {
WordRequester()
: sender_(NetCtx::instance().getSender(REQUEST_TOPIC))
, resultCount_(0) {
}
void sendRequests(size_t msgDupCount) {
string line;
while(getline(cin, line)) {
for (size_t i = 0; i < msgDupCount; ++i) {
sender_->sendBytes(Request::typeTag
, line.c_str()
, min(line.size() + 1, sizeof(Request::word)));
}
}
}
void handleMessageCb(Result const& m) {
resultCount_++;
}
size_t resultCount_;
};
void runClient(
Config const& config) {
ClientContext ctx;
SingletonGuardian<NetCtx> g;
auto& net = NetCtx::instance();
Config requestConfig(config.get_child(
"request"), config);
auto sengine = net.createSendTransportEngine(requestConfig
, sizeof(Request));
Config resultConfig(config.get_child(
"result"), config);
auto rengine = net.createRecvTransportEngine(resultConfig
, ctx.buffer());
WordRequester wr;
ctx.start(0, 0
, *sengine, 0x01ul
, *rengine, 0x02ul
, wr, 0x04ul);
net.listenTo(RESULT_TOPIC);
wr.sendRequests(requestConfig.getExt<size_t>("msgDupCount"));
[&ctx] {
ctx.stop();
}
);
cerr << "all requests sent, waiting for results, press ctrl-c to terminate" << endl;
ctx.join();
cout << "\nresult count = " << wr.resultCount_ << endl;
}
struct WordReverser
:
Client<WordReverser, Request>
WordReverser()
, sender_(NetCtx::instance().getSender(RESULT_TOPIC))
, periodTps_(0)
, peakTps_(0) {
setCallback(
record();
}
);
schedule(SysTime::now(), *this);
}
void handleMessageCb(Request const& m) {
char s[sizeof(Result::reversedWord)];
auto l = min(sizeof(s) -1, strnlen(m.word, sizeof(s) -1));
reverse_copy(m.word, m.word + l, s);
s[l] = 0;
sender_->sendBytes(Result::typeTag, s, l + 1);
periodTps_++;
}
void record() {
peakTps_ = max(peakTps_, periodTps_);
periodTps_ = 0;
}
size_t periodTps_;
size_t peakTps_;
};
void runServer(
Config const& config) {
ServerContext ctx;
SingletonGuardian<NetCtx> g;
auto& net = NetCtx::instance();
Config resultConfig(config.get_child(
"result"), config);
auto sengine = net.createSendTransportEngine(resultConfig
, sizeof(Result));
Config requestConfig(config.get_child(
"request"), config);
auto rengine = net.createRecvTransportEngine(requestConfig
, ctx.buffer()
if (h->typeTag() == Request::typeTag) {
auto& m = h->wrapped<Request>();
uint8_t c = (uint8_t) m.word[0];
if (c >= 0 && c <= 127) {
return 1;
}
}
return -1;
}
);
WordReverser wr[3];
ctx.start(
wr[0], 0x20ul
, wr[1], 0x02ul
, wr[2], 0x04ul
, *sengine, 0x08ul
, *rengine, 0x10ul);
net.listenTo(REQUEST_TOPIC);
[&ctx] {
ctx.stop();
}
);
ctx.join();
cout << endl;
for (size_t i = 0; i < sizeof(wr) / sizeof(wr[0]); i++) {
cout << "worker " << i << " peak tps=" << wr[i].peakTps_ << endl;
}
}
int main(int argc, char** argv) {
istringstream is(CONFIG);
if (argc == 3) {
config.get_child("request")
.put("msgDupCount", lexical_cast<uint32_t>(argv[2]));
} else if (argc == 2) {
}
else {
cerr << argv[0] << " incoming-local-ip [client-send-rate]" << endl;
cerr << "multicast should be enabled on local-ip network" << endl;
return -1;
}
if (string(argv[1]) == "127.0.0.1") {
config.put("loopback", true);
}
config.put("ifaceAddr", argv[1]);
if (argc == 3) {
runClient(config);
} else {
runServer(config);
}
}