1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Config.hpp" 4 #include "hmbdc/app/netmap/Messages.hpp" 5 #include "hmbdc/app/netmap/Sender.hpp" 6 #include "hmbdc/app/netmap/SendTransportEngine.hpp" 7 #include "hmbdc/app/netmap/RecvTransportEngine.hpp" 8 #include "hmbdc/app/netmap/DefaultUserConfig.hpp" 10 #include "hmbdc/comm/Topic.hpp" 11 #include "hmbdc/pattern/GuardedSingleton.hpp" 13 #include <boost/regex.hpp> 19 namespace hmbdc {
namespace app {
namespace netmap {
43 Config dft(DefaultUserConfig,
"tx");
47 std::lock_guard<std::mutex> tlock(sendTransportEnginesLock_);
49 sendTransportEngines_.push_back(res);
58 ,
size_t maxMessageSize
80 template <
typename Buffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
84 Config dft(DefaultUserConfig,
"rx");
88 std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
91 , forward<MsgArbitrator>(arb));
92 recvTransportEngines_.emplace_back(res);
100 template <
typename Buffer,
typename ArgsTuple>
104 , ArgsTuple&& args) {
119 std::lock_guard<std::mutex> lock(sendersLock_);
120 auto sender = senders_.find(t);
121 if ( sender != senders_.end()) {
122 return sender->second.get();
124 std::lock_guard<std::mutex> slock(sendTransportEnginesLock_);
126 i < sendTransportEngines_.size();
128 auto st = sendTransportEngines_[i];
130 auto newSender =
new Sender(st, t);
131 senders_[t].reset(newSender);
148 std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
149 for (
auto ptr : recvTransportEngines_) {
160 std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
161 for (
auto ptr : recvTransportEngines_) {
162 ptr->stopListenTo(t);
174 std::vector<SendTransportEngine::ptr> sendTransportEngines_;
175 std::mutex sendTransportEnginesLock_;
177 std::vector<RecvTransport::ptr> recvTransportEngines_;
178 std::mutex recvTransportEnginesLock_;
180 std::map<Topic, Sender::ptr> senders_;
181 uint32_t sendTransportRotationIndex_;
182 std::mutex sendersLock_;
class to hold an hmbdc configuration
Definition: Config.hpp:35
void setDefaultUserConfig(Config const &c)
internal use
Definition: Config.hpp:128
void stopListenTo(Topic const &t)
undo the subscription
Definition: NetContext.hpp:159
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
facade class for sending network messages
Definition: Sender.hpp:10
void listenTo(Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:147
Definition: GuardedSingleton.hpp:19
Definition: GuardedSingleton.hpp:12
power a netmap port sending functions
Definition: SendTransportEngine.hpp:45
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above, but provides a unified interface - not preferred
Definition: NetContext.hpp:101
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport and remember it
Definition: NetContext.hpp:42
Sender * getSender(Topic const &t)
get (or create for the first time) a Sender - whose function is to send messages on its associated Topic
Definition: NetContext.hpp:118
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a receive transport engine and remember it
Definition: NetContext.hpp:81
Definition: NetContext.hpp:26
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:56
impl class,
Definition: RecvTransportEngine.hpp:76
Definition: Client.hpp:11
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, tuple<> args)
same as above, but provides a unified interface - not preferred
Definition: NetContext.hpp:57