1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Config.hpp" 4 #include "hmbdc/app/tcpcast/Messages.hpp" 5 #include "hmbdc/app/tcpcast/Sender.hpp" 6 #include "hmbdc/app/tcpcast/SendTransportEngine.hpp" 7 #include "hmbdc/app/tcpcast/RecvTransportEngine.hpp" 8 #include "hmbdc/app/tcpcast/DefaultUserConfig.hpp" 10 #include "hmbdc/comm/Topic.hpp" 11 #include "hmbdc/pattern/GuardedSingleton.hpp" 13 #include <boost/regex.hpp> 19 namespace hmbdc {
namespace app {
namespace tcpcast {
49 ,
size_t maxMessageSize
50 ,
size_t minRecvToStart = 1u) {
51 Config dft(DefaultUserConfig,
"tx");
55 std::lock_guard<std::mutex> tlock(sendTransportEnginesLock_);
57 cfg, maxMessageSize, minRecvToStart);
58 sendTransports_.emplace_back(res);
67 ,
size_t maxMessageSize
68 , tuple<size_t> args) {
88 template <
typename Buffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
92 Config dft(DefaultUserConfig,
"rx");
96 std::lock_guard<std::mutex> tlock(recvTransportsLock_);
98 cfg, buffer, forward<MsgArbitrator>(arb));
99 recvTransports_.emplace_back(res);
108 template <
typename Buffer,
typename ArgsTuple>
112 , ArgsTuple&& args) {
127 std::lock_guard<std::mutex> lock(sendersLock_);
128 auto sender = senders_.find(t);
129 if ( sender != senders_.end()) {
130 return sender->second.get();
132 std::lock_guard<std::mutex> slock(sendTransportEnginesLock_);
134 i < sendTransports_.size();
136 auto st = sendTransports_[i];
138 auto newSender =
new Sender(st, t);
139 senders_[t].reset(newSender);
156 std::lock_guard<std::mutex> tlock(recvTransportsLock_);
157 for (
auto ptr : recvTransports_) {
168 std::lock_guard<std::mutex> tlock(recvTransportsLock_);
169 for (
auto ptr : recvTransports_) {
170 ptr->stopListenTo(t);
182 std::vector<SendTransport::ptr> sendTransports_;
183 std::mutex sendTransportEnginesLock_;
185 std::vector<RecvTransport::ptr> recvTransports_;
186 std::mutex recvTransportsLock_;
188 std::map<Topic, Sender::ptr> senders_;
189 std::mutex sendersLock_;
class to hold an hmbdc configuration
Definition: Config.hpp:35
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, tuple< size_t > args)
same as above but provides a unified interface - not preferred
Definition: NetContext.hpp:66
void setDefaultUserConfig(Config const &c)
internal use
Definition: Config.hpp:128
impl class
Definition: RecvTransportEngine.hpp:60
facade class for sending network messages
Definition: Sender.hpp:11
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize, size_t minRecvToStart=1u)
construct a send transport engine (and remember it)
Definition: NetContext.hpp:47
void stopListenTo(Topic const &t)
undo the subscription
Definition: NetContext.hpp:167
Definition: GuardedSingleton.hpp:19
Definition: GuardedSingleton.hpp:12
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a recv transport engine and remember it
Definition: NetContext.hpp:89
void listenTo(Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:155
Definition: NetContext.hpp:28
Definition: SendTransportEngine.hpp:201
Sender * getSender(Topic const &t)
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:126
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:41
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:109
Definition: Client.hpp:11