hmbdc
simplify high performance messaging programming
server-cluster.cpp

Context template parameter indicating that each message is sent to one and only one of the Clients within the Context (and its attached ipc Contexts, if applicable). Each message is still subject to the Client's message type filtering. When a Context is specialized using this type, it normally works with homogeneous Clients to achieve load balancing through threads; no coordination is needed between the Clients. Only direct mode Clients are supported; a thread pool is NOT supported by such a Context, and the pool related functions in Context are disabled.

There is no limit on how many Clients can be started in the Context, nor on when a Client can be started in it. See the example below for partition Context usage.
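For orientation, a minimal sketch contrasting a default Context with a partition Context; the 128-byte max message size is an arbitrary placeholder, and the context_property::partition spelling follows the server code later in this example:

using DefaultCtx = hmbdc::app::Context<128>; //each message is dispatched to every interested Client
using PartitionCtx = hmbdc::app::Context<128, hmbdc::app::context_property::partition>; //each message is dispatched to exactly one Client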

//this example shows how to use hmbdc to write simple, scalable, high performance client/server
//programs in a server cluster environment
//
//a client sends a single word per request and asks that the word be reversed in the response
//by one of the servers in a cluster.
//requests are load-balanced (by the first char in the word) across a cluster of server hosts;
//the example shows just one server (with multiple worker cores) for demo purposes
//
//to build:
//g++ server-cluster.cpp -g -O3 -std=c++1y -Wall -Werror -pthread -I /opt/hmbdc/include/ /opt/hmbdc/lib/libhmbdc.a /usr/local/lib/libboost_regex.a /usr/local/lib/libboost_system.a -lpthread -lrt -o /tmp/server-cluster
//
// In the scenario of 1 client (using 3 cores) and 1 server (using 5 cores), we send every word in
// /usr/share/dict/words 30 times to saturate the network, and we get 479828*30=14394840 results back
// peak transactions per sec reach 4.8M tps (lossless).
// see the screen shots below:
/*
[server]$ /tmp/server-cluster 192.168.0.101/24
-------------------------------------------------------------------------------------
[client]$ cat /usr/share/dict/words| /tmp/server-cluster 192.168.0.101/24 30
all requests sent, waiting for results, press ctrl-c to terminate
^C
result count = 14394840
[client]$ wc -l /usr/share/dict/words
479828 /usr/share/dict/words
-------------------------------------------------------------------------------------
[server]$ /tmp/server-cluster 192.168.0.101/24
^C
worker 0 peak tps=1595908
worker 1 peak tps=1528706
worker 2 peak tps=1650952
//
//alternatively, we could use the console to act as the client:
//
//[client]$ for w in `cat /usr/share/dict/words /usr/share/dict/words /usr/share/dict/words`; do echo sendstr 1005 $w; done|./console-mcast examples/server-cluster-client.json > /tmp/res
*/
#include "hmbdc/app/mcast/NetContext.hpp" //use udp multicast for communication,
#include "hmbdc/app/Context.hpp"
#include "hmbdc/time/Timers.hpp" //we count transactions in every sec interval
#include "hmbdc/os/Signals.hpp"
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <algorithm>
#include <string>
#include <memory>
#include <cstring> //strnlen
#include <unistd.h>
using namespace std;
using namespace boost;
using namespace hmbdc;
using namespace hmbdc::time;
using namespace hmbdc::app;
using namespace hmbdc::app::mcast;
Topic REQUEST_TOPIC("req");
struct Request
: hasTag<1005> {
char word[120];
};
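//the numeric tag (1005) identifies this message type on the wire - it is the same
//tag the console's "sendstr 1005" command shown above refers to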
Topic RESULT_TOPIC("res");
struct Result
: hasTag<1004> {
char reversedWord[120];
};
auto CONFIG = R"|(
{
"request" : {
"topicRegex" : "req",
"mcastPort" : 4321,
"mcastAddr" : "232.43.211.234"
},
"result" : {
"topicRegex" : "res",
"mcastPort" : 4321,
"mcastAddr" : "232.43.211.235"
}
}
)|";
using NetCtx = mcast::NetContext;
using ClientContext = Context<sizeof(Result)>;
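//note: the client Context only ever dispatches Result messages (the only type
//WordRequester handles), hence sizeof(Result) as its max message size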
struct WordRequester
: Client<WordRequester, Result> {
WordRequester()
: sender_(NetCtx::instance().getSender(REQUEST_TOPIC))
, resultCount_(0) {
}
void sendRequests(size_t msgDupCount) {
string line;
while(getline(cin, line)) {
for (size_t i = 0; i < msgDupCount; ++i) {
//since words vary in length, use sendBytes
sender_->sendBytes(Request::typeTag
, line.c_str()
, min(line.size() + 1, sizeof(Request::word)));
}
}
}
void handleMessageCb(Result const& m) {
resultCount_++;
// cout << m.reversedWord << endl;
}
Sender* sender_;
size_t resultCount_;
};
/**
* @brief run in client mode
*
*/
void runClient(Config const& config) {
ClientContext ctx; //context to power network transports and other things
SingletonGuardian<NetCtx> g; //RAII for mcast::NetContext resources
auto& net = NetCtx::instance();
// we send out messages, need a send engine
Config requestConfig(config.get_child("request"), config);
auto sengine = net.createSendTransportEngine(requestConfig
, sizeof(Request)); //maximum size of the message the engine can send out
// we receive messages, need a receive engine
Config resultConfig(config.get_child("result"), config);
auto rengine = net.createRecvTransportEngine(resultConfig
, ctx.buffer()); //received messages go to ctx so WordRequester can see
WordRequester wr;
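//start() takes the pool thread count and pool cpu mask first (0, 0 here - no
//thread pool is used), followed by <Client, cpu affinity mask> pairs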
ctx.start(0, 0
, *sengine, 0x01ul
, *rengine, 0x02ul
, wr, 0x04ul);
net.listenTo(RESULT_TOPIC);
//send requests
wr.sendRequests(requestConfig.getExt<size_t>("msgDupCount"));
//requests are sent, wait long enough to receive all the results
//it is stopped by signal SIGTERM or SIGINT
os::HandleSignals::onTermIntDo(
[&ctx] {
ctx.stop();
}
);
//wait until all results processed
cerr << "all requests sent, waiting for results, press ctrl-c to terminate" << endl;
ctx.join();
cout << "\nresult count = " << wr.resultCount_ << endl;
}
/**
* @brief a worker (thread)
*/
struct WordReverser
: Client<WordReverser, Request>
, TimerManager //provides the schedule() call used in the constructor
, ReoccuringTimer {
WordReverser()
: ReoccuringTimer(Duration::seconds(1u)) //count transactions every sec
, sender_(NetCtx::instance().getSender(RESULT_TOPIC))
, periodTps_(0)
, peakTps_(0) {
setCallback( //call this when timer fires
[this](TimerManager& tm, SysTime const& now) {
record();
}
);
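//arm the timer: first fire now, then refire at the 1 sec interval set above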
schedule(SysTime::now(), *this);
}
//although not obvious in this example, all callbacks of a particular Client
//are guaranteed thread-safe (i.e. not called concurrently) by hmbdc, which
//simplifies programming
void handleMessageCb(Request const& m) {
char s[sizeof(Result::reversedWord)];
auto l = min(sizeof(s) -1, strnlen(m.word, sizeof(s) -1));
reverse_copy(m.word, m.word + l, s);
s[l] = 0;
sender_->sendBytes(Result::typeTag, s, l + 1);
periodTps_++;
}
void record() { //record the peak transaction per sec
peakTps_ = max(peakTps_, periodTps_);
periodTps_ = 0;
}
Sender* sender_;
size_t periodTps_;
size_t peakTps_;
};
//use the partition context property so that a message is dispatched to one and only
//one Client in the Context (see the description at the top of this page)
using ServerContext = Context<sizeof(Request), context_property::partition>;
/**
* @brief run server mode
*/
void runServer(Config const& config) {
ServerContext ctx;
SingletonGuardian<NetCtx> g; //RAII for mcast::NetContext resources
auto& net = NetCtx::instance();
// we send out messages, need a send engine
Config resultConfig(config.get_child("result"), config);
auto sengine = net.createSendTransportEngine(resultConfig
, sizeof(Result)); //maximum size of the message the engine can send out
// we receive messages, need a receive engine
Config requestConfig(config.get_child("request"), config);
auto rengine = net.createRecvTransportEngine(requestConfig
, ctx.buffer() //put the incoming message into the server context's buffer
//a receive engine can optionally decide what messages to drop/keep by
//the following
, [](TransportMessageHeader const* h) { //only keep messages in my range
if (h->typeTag() == Request::typeTag) {
auto& m = h->wrapped<Request>();
uint8_t c = (uint8_t) m.word[0];
//basically covers everything ascii here
//to scale the server to multiple hosts, each host could handle a range partition
//so they collectively cover the whole range
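//for example (a hypothetical 2-host split): host A returns 1 when c < 64,
//host B returns 1 when c >= 64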
if (c >= 0 && c <= 127) {
return 1; //good
}
}
return -1; //ignore
}
);
//this is the main context; it hosts the network transport engine threads
//and the worker threads
WordReverser wr[3];
ctx.start(
//3 worker threads on this host
//the buffer type used in the ctx ensures a message goes to
//one and only one of the workers - note transport engines do not participate
wr[0], 0x20ul
, wr[1], 0x02ul
, wr[2], 0x04ul
//normally a partition Context only holds homogeneous Clients like the above
//however, transport engines do not participate in message dispatching, so it is
//safe to run them in any type of Context
, *sengine, 0x08ul
, *rengine, 0x10ul);
//let requests flow in
net.listenTo(REQUEST_TOPIC);
//things are running; we need to provide an exit
//like most servers, it is stopped by signal SIGTERM or SIGINT
os::HandleSignals::onTermIntDo(
[&ctx] {
ctx.stop();
}
);
//block until ctx is stopped by signal handler
ctx.join();
cout << endl;
for (size_t i = 0; i < sizeof(wr) / sizeof(wr[0]); i++) {
cout << "worker " << i << " peak tps=" << wr[i].peakTps_ << endl;
}
}
int main(int argc, char** argv) {
istringstream is(CONFIG);
Config config(is);
if (argc == 3) { //run as client
//to saturate the net link for testing purposes,
//we intentionally send multiple copies of each request
//to increase the load on the system
config.get_child("request")
.put("msgDupCount", lexical_cast<uint32_t>(argv[2]));
} else if (argc == 2) {
//run as server
}
else {
cerr << argv[0] << " incoming-local-ip [msg-dup-count]" << endl;
cerr << "multicast should be enabled on local-ip network" << endl;
return -1;
}
//it looks like we are testing on a single host
//set global parameters for both transports
if (string(argv[1]) == "127.0.0.1") {
config.put("loopback", true);
}
config.put("ifaceAddr", argv[1]);
if (argc == 3) {
runClient(config);
} else {
runServer(config);
}
}
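For a quick single-host smoke test (exercising the loopback branch in main above), something along these lines should work, assuming the binary was built to /tmp/server-cluster as in the build command near the top - run the server in one terminal and the client in another:

[host]$ /tmp/server-cluster 127.0.0.1
[host]$ echo hello | /tmp/server-cluster 127.0.0.1 1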