1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Config.hpp" 4 #include "hmbdc/app/mcast/Messages.hpp" 5 #include "hmbdc/app/mcast/Sender.hpp" 6 #include "hmbdc/app/mcast/SendTransportEngine.hpp" 7 #include "hmbdc/app/mcast/RecvTransportEngine.hpp" 8 #include "hmbdc/app/mcast/DefaultUserConfig.hpp" 9 #include "hmbdc/comm/Topic.hpp" 10 #include "hmbdc/pattern/GuardedSingleton.hpp" 12 #include <boost/regex.hpp> 24 namespace hmbdc {
namespace app {
namespace mcast {
52 ,
size_t maxMessageSize) {
53 Config dft(DefaultUserConfig,
"tx");
57 std::lock_guard<std::mutex> tlock(sendTransportsLock_);
59 sendTransports_.emplace_back(res);
68 ,
size_t maxMessageSize
91 template <
typename Buffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
95 Config dft(DefaultUserConfig,
"rx");
99 std::lock_guard<std::mutex> tlock(recvTransportsLock_);
102 , forward<MsgArbitrator>(arb));
103 recvTransports_.emplace_back(res);
112 template <
typename Buffer,
typename ArgsTuple>
116 , ArgsTuple&& args) {
132 std::lock_guard<std::mutex> lock(sendersLock_);
133 auto sender = senders_.find(t);
134 if ( sender != senders_.end()) {
135 return sender->second.get();
137 std::lock_guard<std::mutex> slock(sendTransportsLock_);
139 i < sendTransports_.size();
141 auto st = sendTransports_[i];
143 auto newSender =
new Sender(st, t);
144 senders_[t].reset(newSender);
161 std::lock_guard<std::mutex> tlock(recvTransportsLock_);
162 for (
auto ptr : recvTransports_) {
173 std::lock_guard<std::mutex> tlock(recvTransportsLock_);
174 for (
auto ptr : recvTransports_) {
175 ptr->stopListenTo(t);
187 std::vector<SendTransport::ptr> sendTransports_;
188 std::mutex sendTransportsLock_;
190 std::vector<RecvTransport::ptr> recvTransports_;
191 std::mutex recvTransportsLock_;
193 std::map<Topic, Sender::ptr> senders_;
194 std::mutex sendersLock_;
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a send transport and remember it
Definition: NetContext.hpp:92
class to hold an hmbdc configuration
Definition: Config.hpp:35
void setDefaultUserConfig(Config const &c)
internal use
Definition: Config.hpp:128
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: SendTransportEngine.hpp:259
Definition: GuardedSingleton.hpp:19
Definition: GuardedSingleton.hpp:12
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:38
Definition: NetContext.hpp:33
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, tuple<> args)
same as above but provide a unified interface - not preferred
Definition: NetContext.hpp:67
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport engine (and remember it within the class)
Definition: NetContext.hpp:51
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:113
impl class
Definition: RecvTransportEngine.hpp:305
void stopListenTo(Topic const &t)
undo the subscription
Definition: NetContext.hpp:172
void listenTo(Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:160
Definition: Client.hpp:11
Definition: Sender.hpp:15
Sender * getSender(Topic const &t)
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:131