hmbdc
simplify-high-performance-messaging-programming
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Config.hpp"
4 #include "hmbdc/app/netmap/Messages.hpp"
5 #include "hmbdc/app/netmap/Sender.hpp"
6 #include "hmbdc/app/netmap/SendTransportEngine.hpp"
7 #include "hmbdc/app/netmap/RecvTransportEngine.hpp"
8 #include "hmbdc/app/netmap/DefaultUserConfig.hpp"
9 
10 #include "hmbdc/comm/Topic.hpp"
11 #include "hmbdc/pattern/GuardedSingleton.hpp"
12 
13 #include <boost/regex.hpp>
14 
15 #include <vector>
16 #include <mutex>
17 #include <memory>
18 
19 
20 /**
21  * @namespace hmbdc::app::netmap
22  * netmap multicast transport
23  */
24 namespace hmbdc { namespace app { namespace netmap {
25 
26 /**
27 * @example client-server-netmap.cpp
28 * @example perf-netmap.cpp
29 */
30 
31 /**
32  * @class NetContext
33  * @brief a singleton that holds netmap resources
34  * @details it manages transport engines
35  * see perf-netmap.cpp for usage
36  */
37 struct NetContext
39 
40  friend struct hmbdc::pattern::SingletonGuardian<NetContext>;
42 /**
43  * @brief construct a send transport and remember it
44  * @details After this, the user is responsible for getting it started within a hmbdc Context,
45  * otherwise the transport is not functioning/running. Don't create the same thing twice.
46  * it might take several seconds (configured by nmResetWaitSec=2) to construct the engine
47  * to avoid message losses at the beginning - netmap seems to require that
48  *
49  * @param cfgIn JSON specifying the transport - see hmbdc/app/netmap/DefaultUserConfig.hpp
50  * @param maxMessageSize max message size in bytes to be sent
51  * @return a pointer to the Engine
52  */
53  SendTransportEngine* createSendTransportEngine(Config const& cfgIn, size_t maxMessageSize) {
54  Config dft(DefaultUserConfig, "tx");
55  Config cfg(cfgIn);
56  cfg.setDefaultUserConfig(dft);
57 
58  std::lock_guard<std::mutex> tlock(sendTransportEnginesLock_);
59  auto res = SendTransportEngine::ptr(new SendTransportEngine(cfg, maxMessageSize));
60  sendTransportEngines_.push_back(res);
61  return res.get();
62  }
63 
64 /**
65  * @brief same as above but provides a unified interface - not preferred
66  * @return a pointer to the Engine - don't delete it
67  */
69  , size_t maxMessageSize
70  , std::tuple<> args) {
71  return createSendTransportEngine(cfg, maxMessageSize);
72  }
73 
74 /**
75  * @brief construct a recv transport and remember it
76  * @details After this, the user is responsible for getting it started within a hmbdc Context,
77  * otherwise the transport is not functioning/running. Don't create the same thing twice
78  *
79  * @param cfgIn JSON specifying the transport - see perf-netmap.cpp and hmbdc/app/netmap/DefaultUserConfig.hpp
80  * @param buffer buffer that recv messages go in, normally the one returned by
81  * app::Context::buffer()
82  * @param arb optionally an arbitrator to decide which messages to keep or drop
83  * if arb is an rvalue, it is passed in by value; if an lvalue, it is passed in by reference;
84  * it also supports either netmap packet level (BEFORE topic filtering) or
85  * hmbdc message level (AFTER topic filtering) arbitration depending on which one of
86  * int operator()(void* bytes, size_t len) and
87  * int operator()(TransportMessageHeader const* header) is present in the arb passed in
88 
89  * @return a pointer to the Engine
90  */
91  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
93  , Buffer& buffer
94  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
95  Config dft(DefaultUserConfig, "rx");
96  Config cfg(cfgIn);
97  cfg.setDefaultUserConfig(dft);
98 
99  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
100  auto res =
102  , std::forward<MsgArbitrator>(arb));
103  recvTransportEngines_.emplace_back(res);
104  return res;
105  }
106 
107 /**
108  * @brief same as above but to provide a unified interface - not preferred
109  * @return a pointer to the Engine
110  */
111  template <typename Buffer, typename ArgsTuple>
113  Config const& cfg
114  , Buffer& buffer
115  , ArgsTuple&& args) {
116  return createRecvTransportEngine(cfg
117  , buffer
118  , std::get<0>(args)
119  );
120  }
121 
122 /**
123  * @brief get (or create for the first time) a Sender - whose function is to send messages on
124  * its associated Topic
125  * @details this operation typically might be slow, so caching the return value is recommended.
126  *
127  * @param t - the Topic that the Sender is for
128  */
129  Sender* getSender(comm::Topic const& t) {
130  std::lock_guard<std::mutex> lock(sendersLock_);
131  auto sender = senders_.find(t);
132  if ( sender != senders_.end()) {
133  return sender->second.get();
134  } else {
135  std::lock_guard<std::mutex> slock(sendTransportEnginesLock_);
136  for (auto i = 0u;
137  i < sendTransportEngines_.size();
138  ++i) {
139  auto st = sendTransportEngines_[i];
140  if (st->match(t)) {
141  auto newSender = new Sender(st, t);
142  senders_[t].reset(newSender);
143  return newSender;
144  }
145  }
146  }
147 
148  return nullptr;
149  }
150 
151 /**
152  * @brief This process is interested in a Topic
153  * @details Normally the receiving transport covering this topic
154  * needs to be created - not necessarily running - before calling this
155  *
156  * @param t Topic interested
157  */
158  void listenTo(comm::Topic const& t) {
159  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
160  for (auto ptr : recvTransportEngines_) {
161  ptr->listenTo(t);
162  }
163  }
164 
165 /**
166  * @brief undo the subscription
167  *
168  * @param t Topic
169  */
170  void stopListenTo(comm::Topic const& t) {
171  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
172  for (auto ptr : recvTransportEngines_) {
173  ptr->stopListenTo(t);
174  }
175  }
176 
177 private:
178  NetContext() {
179  }
180 
181  ~NetContext() {
182  }
183 
184 
185  std::vector<SendTransportEngine::ptr> sendTransportEngines_;
186  std::mutex sendTransportEnginesLock_;
187 
188  std::vector<RecvTransport::ptr> recvTransportEngines_;
189  std::mutex recvTransportEnginesLock_;
190 
191  std::map<comm::Topic, Sender::ptr> senders_;
192  uint32_t sendTransportRotationIndex_;
193  std::mutex sendersLock_;
194 };
195 
196 }
197 
198 }}
199 
void setDefaultUserConfig(Config const &c)
internal use
Definition: Config.hpp:136
class to hold an hmbdc configuration
Definition: Config.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
facade class for sending network messages
Definition: Sender.hpp:10
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:35
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:158
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, std::tuple<> args)
same as above but provide an unified interface - not preferred
Definition: NetContext.hpp:68
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:112
power a netmap port sending functions
Definition: SendTransportEngine.hpp:43
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport and remember it
Definition: NetContext.hpp:53
impl class,
Definition: RecvTransportEngine.hpp:70
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a recv transport and remember it
Definition: NetContext.hpp:92
char const *const DefaultUserConfig
Definition: DefaultUserConfig.hpp:10
a singleton that holding netmap resources
Definition: NetContext.hpp:37
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:50
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:170
Sender * getSender(comm::Topic const &t)
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:129
Definition: Base.hpp:12