hmbdc
simplify-high-performance-messaging-programming
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Config.hpp"
4 #include "hmbdc/app/mcast/Messages.hpp"
5 #include "hmbdc/app/mcast/Sender.hpp"
6 #include "hmbdc/app/mcast/SendTransportEngine.hpp"
7 #include "hmbdc/app/mcast/RecvTransportEngine.hpp"
8 #include "hmbdc/app/mcast/DefaultUserConfig.hpp"
9 #include "hmbdc/comm/Topic.hpp"
10 #include "hmbdc/pattern/GuardedSingleton.hpp"
11 
12 #include <boost/regex.hpp>
13 
14 #include <vector>
15 #include <mutex>
16 #include <memory>
17 
18 /**
19  * @namespace hmbdc::app::mcast
20  * UDP multicast transport
21  */
22 namespace hmbdc { namespace app { namespace mcast {
23 
24 /**
25 * @example server-cluster.cpp
26 * @example perf-mcast.cpp
27 * @example ping-pong-mcast.cpp
28 * @example mcast-sniff.cpp
29 * @example rmcast-cp.cpp
30 */
31 
32 /**
33  * @class NetContext
34  * @brief a singleton that holds mcast resources
35  * @details it manages transport engines
36  */
37 struct NetContext
39 
40  friend struct hmbdc::pattern::SingletonGuardian<NetContext>;
42 
43 /**
44  * @brief construct a send transport engine (and remember it within the class)
45  * @details After this, user is responsible to get it started within a hmbdc Context,
46  * if running in Context pool, it needs to pin at a single pool thread, CANNOT span
47  * more than one thread
48  * otherwise the transport is not functioing/running. Don't create the same thing twice
49  *
50  * @param cfgIn config specifing the transport - hmbdc/app/mcast/DefaultUserConfig.hpp
51  * @param maxMessageSize max messafe size in bytes to be sent
52  * trasnport engines, see ctor of SendTransportEngine for details
53  * @return a pointer to the Engine - don't delete it
54  */
56  , size_t maxMessageSize) {
57  Config dft(DefaultUserConfig, "tx");
58  Config cfg(cfgIn);
59  cfg.setDefaultUserConfig(dft);
60 
61  std::lock_guard<std::mutex> tlock(sendTransportsLock_);
62  auto res = new SendTransportEngine(cfg, maxMessageSize);
63  sendTransports_.emplace_back(res);
64  return res;
65  }
66 
67 /**
68  * @brief same as above but provide a unified interface - not preferred
69  * @return a pointer to the Engine - don't delete it
70  */
72  , size_t maxMessageSize
73  , std::tuple<> args) {
74  return createSendTransportEngine(cfg, maxMessageSize);
75  }
76 
77 /**
78  * @brief construct a recv transport engine and remember it
79  * @details After this, the user is responsible for getting it started within a hmbdc Context,
80  * if running in a Context pool, it needs to be pinned to a single pool thread, it CANNOT span
81  * more than one thread
82  * otherwise the transport is not functioning/running. Don't create the same thing twice
83  *
84  * @param cfgIn json specifying the transport - see hmbdc/app/mcast/DefaultUserConfig.hpp
85  * @param buffer buffer that recv messages go to, normally the one returned by app::Context::buffer()
86  * @param arb optionally an arbitrator to decide which messages to keep and drop
87  * if arb is an rvalue, it is passed in by value, if an lvalue, passed in as a reference;
88  * it supports either udp packet (BEFORE topic filtering) or
89  * hmbdc message (AFTER topic filtering) level arbitration depending on which one of
90  * int operator()(void* bytes, size_t len) and
91  * int operator()(TransportMessageHeader const* header) is present in the arb passed in
92  * transport engines, see ctor of RecvTransportEngineImpl for details
93  * see an example use in mcast-sniff.cpp @snippet mcast-sniff.cpp define packet arb
94  * @return a pointer to the Engine
95  */
96  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
98  , Buffer& buffer
99  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
100  Config dft(DefaultUserConfig, "rx");
101  Config cfg(cfgIn);
102  cfg.setDefaultUserConfig(dft);
103 
104  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
105  auto res =
107  , std::forward<MsgArbitrator>(arb));
108  recvTransports_.emplace_back(res);
109  return res;
110  }
111 
112 /**
113  * @brief same as above but to provide a unified interface - not preferred
114  * @details use forward_as_tuple to make the tuple passed in
115  * @return a pointer to the Engine
116  */
117  template <typename Buffer, typename ArgsTuple>
119  Config const& cfg
120  , Buffer& buffer
121  , ArgsTuple&& args) {
122  return createRecvTransportEngine(cfg
123  , buffer
124  , std::get<0>(args)
125  );
126  }
127 
128 /**
129  * @brief get (or create for the first time) a Sender - whose function is to send messages on
130  * its associated Topic
131  * @details this operation typically might be slow, so caching the return value is recommended.
132  *
133  * @param t - the Topic that the Sender is for
134  */
135  Sender* getSender(Topic const& t) {
136  std::lock_guard<std::mutex> lock(sendersLock_);
137  auto sender = senders_.find(t);
138  if ( sender != senders_.end()) {
139  return sender->second.get();
140  } else {
141  std::lock_guard<std::mutex> slock(sendTransportsLock_);
142  for (auto i = 0u;
143  i < sendTransports_.size();
144  ++i) {
145  auto st = sendTransports_[i];
146  if (st->match(t)) {
147  auto newSender = new Sender(st, t);
148  senders_[t].reset(newSender);
149  return newSender;
150  }
151  }
152  }
153 
154  return nullptr;
155  }
156 
157 /**
158  * @brief This process is interested in a Topic
159  * @details Normally the receiving transport covering this topic
160  * needs to be created - not necessarily running - before calling this
161  *
162  * @param t Topic interested
163  */
164  void listenTo(Topic const& t) {
165  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
166  for (auto ptr : recvTransports_) {
167  ptr->listenTo(t);
168  }
169  }
170 
171 /**
172  * @brief undo the subscription
173  *
174  * @param t Topic
175  */
176  void stopListenTo(Topic const& t) {
177  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
178  for (auto ptr : recvTransports_) {
179  ptr->stopListenTo(t);
180  }
181  }
182 
183 private:
184  NetContext() {
185  }
186 
187  ~NetContext() {
188  }
189 
190 
191  std::vector<SendTransport::ptr> sendTransports_;
192  std::mutex sendTransportsLock_;
193 
194  std::vector<RecvTransport::ptr> recvTransports_;
195  std::mutex recvTransportsLock_;
196 
197  std::map<Topic, Sender::ptr> senders_;
198  std::mutex sendersLock_;
199 };
200 
201 }
202 }}
203 
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a recv transport engine and remember it
Definition: NetContext.hpp:97
void setDefaultUserConfig(Config const &c)
internal use
Definition: Config.hpp:136
class to hold an hmbdc configuration
Definition: Config.hpp:43
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:35
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
a singleton that holds mcast resources
Definition: NetContext.hpp:37
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport engine (and remember it within the class)
Definition: NetContext.hpp:55
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:118
void stopListenTo(Topic const &t)
undo the subscription
Definition: NetContext.hpp:176
char const *const DefaultUserConfig
Definition: DefaultUserConfig.hpp:10
void listenTo(Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:164
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, std::tuple<> args)
same as above but provide a unified interface - not preferred
Definition: NetContext.hpp:71
impl class
Definition: RecvTransportEngine.hpp:306
Definition: Base.hpp:12
facade class for sending network messages
Definition: Sender.hpp:14
Sender * getSender(Topic const &t)
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:135