hmbdc
simplify high-performance messaging programming
hmbdc.cpp
///
/// A simple example to show inter-thread messaging using hmbdc lib
///
/// we construct 40 hmbdc Clients; each one sends out a Request, listens for the Response sent back by each
/// of the Clients, and then sends out the next Request. In addition, a monitor Client records the stats of
/// the message passing
///
///to build (adjust the hmbdc paths for your installation):
///g++ hmbdc.cpp -g -std=c++1y -Wall -Werror -pthread -Ipath-to-hmbdc-lib-include path-to-hmbdc-lib/libhmbdc.a -lpthread -lrt -o hmbdc
///
#include "hmbdc/app/Base.hpp"
#include <iostream>
#include <memory>
#include <future>
#include <unistd.h>
using namespace hmbdc::app;
using namespace hmbdc::time;
///
/// user defined messages
/// tag numbers from 0 to 1000 are reserved for internal usage
///
/// Normally a message only contains data types that don't have significant destructors
/// (POD is typical), because the dtor will not be called after the message resides in the buffer.
/// If the above is not possible, the recipients of the message can call
/// the in place dtor explicitly. For example, for a Message m containing the
/// member std::string str; calling:
/// m.str.~string();
/// in one and only one of the recipient Clients does the job and avoids a memory leak
///
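///
/// for illustration only (hypothetical message, not used by this test): a message holding a
/// std::string member and the explicit in place dtor call that one and only one recipient
/// Client is expected to make to avoid leaking the string's heap allocation
///
#include <string> //only needed for the illustration below
struct NamedRequest
: hasTag<1005> { //1005 is an arbitrary unused tag picked for this illustration
explicit NamedRequest(char const* n)
: name(n)
{}
std::string name; //non-trivial dtor - a recipient must destroy it explicitly
};
inline void releaseNamedRequest(NamedRequest& m) {
using std::string;
m.name.~string(); //in place dtor - call this in one and only one recipient Client
}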
struct Request
: hasTag<1001> {
Request(uint16_t srcId, uint64_t id)
: srcId(srcId)
, id(id)
{}
uint16_t srcId;
uint64_t id;
};
struct Response
: hasTag<1002> {
Response(Request const& r)
: srcId(r.srcId)
, respToId(r.id)
{}
uint16_t srcId;
uint64_t respToId;
SysTime sendTs; //no significant dtor for SysTime
};
//! [define a message]
struct Finish
: hasTag<1004> {
Finish(uint16_t srcId)
: srcId(srcId)
{}
uint16_t srcId;
};
//! [define a message]
//! [explicit using broadcast]
///
/// declare the Context type - 32 bytes is the maximum message size this Context can handle
/// (it would not compile if the size were too small; 32 happens to be big enough for all the messages
/// defined above)
/// the exact template parameters may vary with the hmbdc version in use
///
using MyContext = Context<32, context_property::broadcast<>>;
//! [explicit using broadcast]
//! [write a Client]
/**
* @brief A MyClient does two things:
* - it sends out a Request, listens for the Responses to that Request, then sends out the next Request
* - it also listens to all peer MyClients' Requests, including its own, and sends out a Response for each one
*
* @details When the specified (workLoad) number of Requests have been sent and each has received a Response
* from every peer MyClient, it declares completion by sending out a Finish message
*
*/
struct MyClient
: Client<MyClient, Request, Response> {
/**
* @brief Ctor
*
* @param myCtx the Context
* @param workLoad how many Requests to send out before considering itself done
* @param clientCount how many MyClient instances are participating in this test
* @param srcId each MyClient has an id
*/
MyClient(MyContext* myCtx, uint64_t workLoad, uint64_t clientCount, uint16_t srcId)
: myCtx(myCtx)
, srcId(srcId)
, requestId(0u)
, workLoad(workLoad)
, clientCount(clientCount)
, respCount(0u){
}
/**
* @brief this callback is called once and only once,
* before the first message is ever dispatched
* to the Client. We send out the first Request here
*/
void messageDispatchingStartedCb(uint16_t) override {
if (workLoad) {
myCtx->send(Request(srcId, requestId));
}
}
/**
* @brief callback
* @details all callbacks for an hmbdc Client are guaranteed to be called from
* a single OS thread at any given time, so the programmer can assume a single-threaded programming model
* callback-wise
*
* @param r received Message
*/
void handleMessageCb(Request const& r) {
//got a Request - send a Response
//use sendInPlace to avoid a Response copy in memory
myCtx->template sendInPlace<Response>(r);
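//for comparison only, the copy-making alternative would be:
//  Response resp(r);
//  myCtx->send(resp);
//sendInPlace constructs the Response directly in the Context's buffer instead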
}
/**
* @brief another callback
* @details it is allowed to change the signature to
* void handleMessageCb(Response& resp) in case resp needs to be modified (locked, for example)
*
* @param resp received Response message
*/
void handleMessageCb(Response const& resp) {
//got a Response - check if it was triggered by the Request sent by me
if (resp.srcId == srcId &&
resp.respToId == requestId) {
if (++respCount == clientCount) {
workLoad--;
respCount = 0u;
++requestId;
//ready to send the next Request if needed
if (workLoad) {
myCtx->send(Request(srcId, requestId));
} else {
//all done, declare it by sending a Finish out
Finish fin(srcId);
myCtx->send(fin);
}
}
}
}
/**
* @brief callback called when this Client is taken out of message dispatching
* @details any exception thrown here is ignored
*
* @param e The exception that caused the Client to be taken out of message dispatching
* e could be thrown by the Client itself in a callback function
* to voluntarily take itself out
*/
void stoppedCb(std::exception const& e) override {
std::cout << e.what() << std::endl;
};
MyContext* myCtx;
uint16_t srcId;
uint64_t requestId;
uint64_t workLoad;
uint64_t clientCount;
uint64_t respCount;
};
//! [write a Client]
/**
* @brief a special Client that monitors other Clients activities
* @details it collects some statistics and prints them out
*
*/
struct Monitor
: Client<Monitor, Request, Response, Finish> {
Monitor(uint16_t count)
: count(count)
, totalRequest(0u)
, totalResponse(0u)
{}
char const* hmbdcName() const { return "monitor"; }
void handleMessageCb(Finish const&) {
count--;
if (!count) {
allDone.set_value(true);
}
}
void handleMessageCb(Request const&) {
totalRequest++;
}
void handleMessageCb(Response const&) {
totalResponse++;
}
void report() const {
std::cout << "totalRequest=" << totalRequest << std::endl;
std::cout << "totalResponse=" << totalResponse << std::endl;
}
uint16_t count; //how many Clients remain un-Finished
size_t totalRequest;
size_t totalResponse;
std::promise<bool> allDone; //indicator that things are all done
};
int main() {
const uint16_t clientCount = 40u; //we will construct 40 Clients
const uint64_t workLoad = 1000u; //each one will send out 1000 Requests
MyContext ctx(20u, clientCount); //2^20 = 1M messages could be buffered
std::unique_ptr<MyClient> clients[clientCount]; //clients are auto freed at exit
Monitor mon(clientCount); //one monitor to monitor 40 Clients
//! [start pool then client]
//add 40 clients into ctx's thread pool so they can be run later
for (uint16_t i = 0; i < clientCount; ++i) {
clients[i].reset(new MyClient(&ctx, workLoad, clientCount, i));
ctx.addToPool(*clients[i]); //addToPool could also happen after ctx started.
}
//starting context (message dispatching)
ctx.start(3, 0x07ul //3 threads to power the thread pool, and they run on cores 0-2
, mon, 0ul); //mon runs in the direct mode (vs in the pool) and hmbdc picks a core for it
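//(0x07ul is a cpu affinity mask: binary 111 selects cores 0, 1 and 2 for the 3 pool threads)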
//at this point, all clients are running and listening to messages
//this test counts on all messages being dispatched to all clients so they can exit
//when everything is done. in the real world that is hardly the case and the starting
//and exiting of clients can be more casual
//! [start pool then client]
//wait until all done
mon.allDone.get_future().get();
//exit gracefully
ctx.stop();
ctx.join();
//the monitor can report its stats now
mon.report();
}