#include <iostream>
#define HMBDC_LOG_CONTEXT hmbdc::app::Context<80> //sizeof Hist is
#include "hmbdc/app/netmap/NetContext.hpp"
#include "hmbdc/app/netmap/Sender.hpp"
#include "hmbdc/app/LoggerT.hpp"
#include "hmbdc/app/Client.hpp"
#include "hmbdc/app/Context.hpp"
#include "hmbdc/text/Misc.h"
#include "hmbdc/time/Time.hpp"
#include "hmbdc/time/Rater.hpp"
#include "hmbdc/numeric/StatHistogram.hpp"
#include <iostream>
#include <memory>
#include <unistd.h>
#include <unordered_map>
#include <boost/program_options.hpp>
struct Request
Request(uint32_t compId, uint16_t srcId, uint16_t requestId)
: compId(compId)
, srcId(srcId)
, requestId(requestId)
, ts(SysTime::now())
, dirty(false)
{}
uint32_t compId;
uint16_t srcId;
uint16_t requestId;
bool dirty;
friend std::ostream& operator << (std::ostream& os, Request const& r) {
os << "Request " << r.compId << ' ' << r. srcId << ' ' << r. requestId << ' ' << r.ts << ' ' << r.dirty;
return os;
}
};
struct Response
Response(Request const& r)
: srcId(r.srcId)
, requestId(r.requestId)
, ts(r.ts)
, respTs(SysTime::now())
{}
uint16_t srcId;
uint64_t requestId;
friend std::ostream& operator << (std::ostream& os, Response const& r) {
os << "Response " << r.srcId << ' ' << r.requestId << ' ' << r.ts << ' ' << r.respTs;
return os;
}
};
template <typename MyContext>
struct MyClientT
:
Client<MyClientT<MyContext>, Response> {
MyClientT(MyContext* myCtx, uint64_t workLoad, uint32_t compId, uint32_t srcId)
: myCtx(myCtx)
, compId(compId)
, requestId(0u)
, workLoad(workLoad)
, srcId(srcId)
, sender(NetContext::instance().getSender(
Topic(
"server")))
, rater(Duration::microseconds(1000000), 1u, 1u)
, roundTripHist(Duration::microseconds(50000)) {
NetContext::instance().listenTo(
Topic(to_string(compId)));
}
void handleMessageCb(Response const& resp) {
if (resp.srcId == srcId &&
resp.requestId == requestId) {
HMBDC_LOG_N(srcId, "<--", resp);
auto now = SysTime::now();
workLoad--;
roundTripHist.add(now - resp.ts);
if (!workLoad) {
HMBDC_LOG_N("all done for srcId=", srcId
, " missing response count=", requestId - roundTripHist.sampleSize(), " roundtrip=", roundTripHist);
throw(std::runtime_error(""));
}
}
}
void invokedCb(uint16_t threadId) override {
if (rater.check()) {
Request r(compId, srcId, ++requestId);
HMBDC_LOG_N(srcId, "-->", r);
sender->send(r);
rater.commit();
}
}
void stoppedCb(std::exception const& e){
HMBDC_LOG_N(e.what());
};
char const* hmbdcName() const { return "MyClientT"; }
MyContext* myCtx;
uint32_t compId;
uint16_t requestId;
uint64_t workLoad;
uint16_t srcId;
Hist roundTripHist;
};
template <typename MyContext>
struct MyServerT
:
Client<MyServerT<MyContext>, Request> {
MyServerT(MyContext* myCtx)
: myCtx(myCtx) {
NetContext::instance().listenTo(
Topic(
"server"));
}
void createResponseSender(uint32_t srcId) {
senders[srcId] = myCtx->template
getSender<Sender>(
Topic(std::to_string(srcId)));
}
void handleMessageCb(Request& r) {
if (__sync_bool_compare_and_swap(&r.dirty, false, true)) {
HMBDC_LOG_N("<--", r);
auto it = senders.find(r.compId);
if (it == senders.end()) {
s = NetContext::instance().getSender(
Topic(std::to_string(r.compId)));
if (s) {
senders[r.compId] = s;
} else {
HMBDC_LOG_C(r.compId, " is unknown topic ");
return;
}
} else {
s = it->second;
}
Response resp(r);
HMBDC_LOG_N("-->", resp);
}
}
void stoppedCb(std::exception const& e){
HMBDC_LOG_N(e.what());
};
char const* hmbdcName() const { return "MyServerT"; }
MyContext* myCtx;
std::unordered_map<uint32_t, Sender*> senders;
};
using MyContext = HMBDC_LOG_CONTEXT;
using MyClient = MyClientT<MyContext>;
using MyServer = MyServerT<MyContext>;
void runInServerMode(
Config const& config) {
using namespace boost;
MyContext ctx;
SingletonGuardian<MyLogger> logGuard(std::cout, ctx);
SingletonGuardian<NetContext> g;
ctx.addToPool(MyLogger::instance());
auto& sendEng = *NetContext::instance().createSendTransportEngine(config, MyContext::MAX_MESSAGE_SIZE);
auto& recvEng = *NetContext::instance().createRecvTransportEngine(config, ctx.buffer());
HMBDC_LOG_N("starting in server mode ...");
ctx.start(1u, 0x01ul
, sendEng, 0x02ul
, recvEng, 0x04ul
, true);
uint64_t cpuAffinityMask = 0x04ul;
vector<std::shared_ptr<MyServer>> servers;
auto serverThreads = config.
getExt<uint32_t>(
"serverThreads");
for (auto i = 0u; i < serverThreads; ++i) {
HMBDC_LOG_N("creating server #", i);
auto s = new MyServer(&ctx);
servers.emplace_back(s);
cpuAffinityMask <<= 1u;
HMBDC_LOG_N("starting server #", i, " cpuAffinityMask=", reinterpret_cast<void*>(cpuAffinityMask));
ctx.start(*s, cpuAffinityMask, true);
}
ctx.start();
HMBDC_LOG_N("started, main thread block for 1 hr");
sleep(3600);
ctx.stop();
ctx.join();
}
void runInClientMode(
Config const& config) {
using namespace boost;
MyContext ctx;
SingletonGuardian<MyLogger> logGuard(std::cout, ctx);
SingletonGuardian<NetContext> g;
auto clientHostId = config.
getExt<uint32_t>(
"clientHostId");
auto clientThreads = config.
getExt<uint32_t>(
"clientThreads");
uint64_t cpuAffinityMask = (1ul << clientThreads) - 1u;
HMBDC_LOG_N("starting in client mode ... ");
ctx.start(clientThreads, cpuAffinityMask
, true);
ctx.addToPool(MyLogger::instance());
auto& sendEng = *NetContext::instance().createSendTransportEngine(config, MyContext::MAX_MESSAGE_SIZE);
auto& recvEng = *NetContext::instance().createRecvTransportEngine(config, ctx.buffer());
cpuAffinityMask++;
ctx.start(sendEng, cpuAffinityMask, true);
cpuAffinityMask <<= 1u;
ctx.start(recvEng, cpuAffinityMask, true);
ctx.start();
vector<std::shared_ptr<MyClient>> clients;
for (uint32_t i = 0; i < config.
getExt<uint32_t>(
"clientCount"); ++i) {
auto c =
new MyClient(&ctx, config.
getExt<uint32_t>(
"clientWorkload"), clientHostId, i);
clients.emplace_back(c);
HMBDC_LOG_N("adding into pool client #", i);
ctx.addToPool(*c);
}
while(ctx.clientCountInPool() > 1u) {
sleep(1);
}
NetContext::instance().stopListenTo(
Topic(to_string(clientHostId)));
ctx.stop();
ctx.join();
}
int main(int argc, char** argv) {
std::string userCfg;
uint32_t clientHostId = 0;
bool isServer = true;
uint16_t serverThreads = std::min(std::thread::hardware_concurrency(), MyContext::Buffer::max_parallel_consumer - 2 - 1u);
uint16_t clientThreads = std::min(std::thread::hardware_concurrency(), MyContext::Buffer::max_parallel_consumer - 2u);
uint16_t clientCount;
uint32_t clientWorkload;
uint32_t clientIntervalMs;
uint32_t clientBurst;
string interface;
namespace po = boost::program_options;
po::options_description desc("Allowed options");
auto helpStr =
"This program can be used to test hmbdc-netmap messaging function among a group of hosts. It is based on the client-server-netmap.cpp code example.\n"
"It can run as either server or client mode. One server and multiple clients - they all need to be on different hosts where netmap is installed. \n"
"When Server gets a client's request, it responds using a client specific topic.\n"
"After a client hears the response, it send the next request until workload finishes.\n"
"After all messges (see client-workload) delivered successfully, the client exits.\n"
"Example:\nrunning \n$server-host: ./cs-netmap netmap:eth0 \n"
"$client-host1: ./cs-netmap --client-host-id 1 netmap:p2p3 \n"
"$client-host2: ./cs-netmap --client-host-id 2 netmap:p2p3 \n"
"would start the server using netmap enabled eth0 NIC and two clients using their p2p3 NICs."
;
desc.add_options()
("help", helpStr)
("cfg,c", po::value<std::string>(&userCfg), "use the user provided config file")
("print", "print effective cfg values")
("client-host-id", po::value<uint32_t>(&clientHostId), "run in client mode with unique host number id - server is the default")
("server-threads", po::value<uint16_t>(&serverThreads)->default_value(serverThreads), "how many server threads - running in direct mode")
("client-threads", po::value<uint16_t>(&clientThreads)->default_value(clientThreads), "how many client threads - supporting the pool mode")
("client-count", po::value<uint16_t>(&clientCount)->default_value(MyContext::Buffer::max_parallel_consumer * 4u), "how many Clients in pool")
("client-workload,w", po::value<uint32_t>(&clientWorkload)->default_value(100), "how many requests each Clients send")
("req-interval-millisec,t", po::value<uint32_t>(&clientIntervalMs)->default_value(1000u), "when a Client sending request, how long between 2 bursts")
("req-burst,b", po::value<uint32_t>(&clientBurst)->default_value(10), "when a client sending request, how many requests per burst")
("interface,I", po::value<string>(&interface), "netmap enabled NIC to use, for example: eth0");
po::positional_options_description p;
p.add("interface", -1);
po::variables_map vm;
po::store(po::command_line_parser(argc, argv).
options(desc).positional(p).run(), vm);
po::notify(vm);
if (vm.count("help")) {
cout << desc << "\n";
return 0;
}
if (vm.count("client-host-id")) {
isServer = false;
}
char const* cfgFile =
"{"
" \"outBufferSizePower2\" : 10,"
" \"doChecksum\" : false,"
" \"ttl\" : 1"
"}"
;
if (userCfg.size()) {
} else {
try {
} catch(...) {
std::cout << cfgFile <<std::endl;
exit(1);
}
}
config.put("netmapPort", interface);
config.put("clientHostId", clientHostId);
config.put("intervalInMillisec", clientIntervalMs);
config.put("burst", clientBurst);
config.put("serverThreads", serverThreads);
config.put("clientThreads", clientThreads);
config.put("clientCount", clientCount);
config.put("clientWorkload", clientWorkload);
if (vm.count("print")) {
write_json(cout, config);
exit(0);
}
if (isServer) {
runInServerMode(config);
} else {
runInClientMode(config);
}
}