hmbdc
simplify-high-performance-messaging-programming
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Config.hpp"
4 #include "hmbdc/app/tcpcast/Messages.hpp"
5 #include "hmbdc/app/tcpcast/Sender.hpp"
6 #include "hmbdc/app/tcpcast/SendTransportEngine.hpp"
7 #include "hmbdc/app/tcpcast/RecvTransportEngine.hpp"
8 #include "hmbdc/app/tcpcast/DefaultUserConfig.hpp"
9 
10 #include "hmbdc/comm/Topic.hpp"
11 #include "hmbdc/pattern/GuardedSingleton.hpp"
12 
13 #include <boost/regex.hpp>
14 
15 #include <vector>
16 #include <mutex>
17 #include <memory>
18 
19 
20 /**
21  * @namespace hmbdc::app::tcpcast
22  * reliable tcpcast (based on TCP) transport
23  */
24 
25 namespace hmbdc { namespace app { namespace tcpcast {
26 
27 
28 /**
29 * @example chat.cpp
30 * @example perf-tcpcast.cpp
31 * @example rmcast-cp.cpp
32 * @example ping-pong-tcpcast.cpp
33 */
34 
35 /**
36  * @class NetContext
37  * @brief a singleton that holds tcpcast resources
38  * @details it manages transport engines
39  *
40  * see perf-tcpcast.cpp chat.cpp for usage.
41  *
42  */
43 struct NetContext
45 
46  friend struct hmbdc::pattern::SingletonGuardian<NetContext>;
48 
49 /**
50  * @brief construct a send transport engine (and remember it)
51  * @details After this, the user is responsible for getting it started within a hmbdc Context,
52  * if running in a Context pool, it needs to be pinned on a single pool thread, CANNOT span
53  * more than one thread, otherwise the transport is not functioning/running.
54  * Don't create the same thing twice.
55  *
56  * @param cfgIn JSON specifying the transport - see perf-tcpcast.cpp and hmbdc/app/tcpcast/DefaultUserConfig.hpp
57  * @param maxMessageSize max message size in bytes to be sent
58  * @param minRecvToStart start send when there are that many recipients (processes) online, otherwise
59  * hold the message in buffer
60  * @return a pointer to the Engine
61  */
63  Config const& cfgIn
64  , size_t maxMessageSize
65  , size_t minRecvToStart = 1u) {
66  Config dft(DefaultUserConfig, "tx");
67  Config cfg(cfgIn);
68  cfg.setDefaultUserConfig(dft);
69 
70  std::lock_guard<std::mutex> tlock(sendTransportEnginesLock_);
71  auto res = new SendTransportEngine(
72  cfg, maxMessageSize, minRecvToStart);
73  sendTransports_.emplace_back(res);
74  return res;
75  }
76 
77 /**
78  * @brief same as above but provides a unified interface - not preferred
79  * @return a pointer to the Engine - don't delete it
80  */
82  , size_t maxMessageSize
83  , std::tuple<size_t> args) {
84  return createSendTransportEngine(cfg, maxMessageSize, std::get<0>(args));
85  }
86 
87 /**
88  * @brief construct a recv transport engine and remember it
89  * @details After this, the user is responsible for getting it started within a hmbdc Context,
90  * if running in a Context pool, it needs to be pinned on a single pool thread, CANNOT span
91  * more than one thread
92  * otherwise the transport is not functioning/running. Don't create the same thing twice
93  *
94  * @param cfgIn JSON specifying the transport - see perf-tcpcast.cpp and hmbdc/app/tcpcast/DefaultUserConfig.hpp
95  * @param buffer buffer that recv messages go to, normally the one returned by app::Context::buffer()
96  * @param arb optionally an arbitrator to decide which messages to keep and drop
97  * if arb is an rvalue, it is passed by value, if an lvalue, passed in as reference;
98  * it supports ONLY hmbdc message level (AFTER topic filtering) arbitration if
99  * int operator()(TransportMessageHeader const* header) is present in the arb passed in.
100  * (NO packet level since it is tcp)
101  * @return a pointer to the Engine
102  */
103  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
105  , Buffer& buffer
106  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
107  Config dft(DefaultUserConfig, "rx");
108  Config cfg(cfgIn);
109  cfg.setDefaultUserConfig(dft);
110 
111  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
113  cfg, buffer, std::forward<MsgArbitrator>(arb));
114  recvTransports_.emplace_back(res);
115  return res;
116  }
117 
118 /**
119  * @brief same as above but to provide a unified interface - not preferred
120  * @details use forward_as_tuple to make the tuple passed in
121  * @return a pointer to the Engine
122  */
123  template <typename Buffer, typename ArgsTuple>
125  Config const& cfg
126  , Buffer& buffer
127  , ArgsTuple&& args) {
128  return createRecvTransportEngine(cfg
129  , buffer
130  , std::get<0>(args)
131  );
132  }
133 
134 /**
135  * @brief get (or create for the first time) a Sender - whose function is to send messages on
136  * its associated Topic
137  * @details this operation typically might be slow, so caching the return value is recommended.
138  *
139  * @param t - the Topic that the Sender is for
140  */
141  Sender* getSender(comm::Topic const& t) {
142  std::lock_guard<std::mutex> lock(sendersLock_);
143  auto sender = senders_.find(t);
144  if ( sender != senders_.end()) {
145  return sender->second.get();
146  } else {
147  std::lock_guard<std::mutex> slock(sendTransportEnginesLock_);
148  for (auto i = 0u;
149  i < sendTransports_.size();
150  ++i) {
151  auto st = sendTransports_[i];
152  if (st->match(t)) {
153  auto newSender = new Sender(st, t);
154  senders_[t].reset(newSender);
155  return newSender;
156  }
157  }
158  }
159 
160  return nullptr;
161  }
162 
163 /**
164  * @brief This process is interested in a Topic
165  * @details Normally the receiving transport covering this topic
166  * needs to be created - not necessarily running - before calling this
167  *
168  * @param t Topic interested
169  */
170  void listenTo(comm::Topic const& t) {
171  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
172  for (auto ptr : recvTransports_) {
173  ptr->listenTo(t);
174  }
175  }
176 
177 /**
178  * @brief undo the subscription
179  *
180  * @param t Topic
181  */
182  void stopListenTo(comm::Topic const& t) {
183  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
184  for (auto ptr : recvTransports_) {
185  ptr->stopListenTo(t);
186  }
187  }
188 
189 private:
190  NetContext() {
191  }
192 
193  ~NetContext() {
194  }
195 
196 
// Send transport engines remembered by createSendTransportEngine(), in creation order.
197  std::vector<SendTransport::ptr> sendTransports_;
// Held while sendTransports_ is appended to or scanned (see getSender()).
198  std::mutex sendTransportEnginesLock_;
199 
// Recv transport engines remembered by createRecvTransportEngine().
200  std::vector<RecvTransport::ptr> recvTransports_;
// Held while recvTransports_ is appended to or iterated (listenTo()/stopListenTo()).
201  std::mutex recvTransportsLock_;
202 
// Per-Topic cache of the Senders handed out by getSender().
203  std::map<comm::Topic, Sender::ptr> senders_;
// Held for the whole of getSender(), covering both lookup and insertion in senders_.
204  std::mutex sendersLock_;
205 };
206 
207 } //tcpcast
208 
209 }}
210 
char const *const DefaultUserConfig
Definition: DefaultUserConfig.hpp:10
void setDefaultUserConfig(Config const &c)
internal use
Definition: Config.hpp:136
class to hold an hmbdc configuration
Definition: Config.hpp:43
facade class for sending network messages
Definition: Sender.hpp:11
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize, size_t minRecvToStart=1u)
construct a send transport engine (and remember it)
Definition: NetContext.hpp:62
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:35
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a recv transport engine and remember it
Definition: NetContext.hpp:104
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:170
Definition: SendTransportEngine.hpp:193
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, std::tuple< size_t > args)
same as above but provide an unified interface - not preferred
Definition: NetContext.hpp:81
a singleton that holds tcpcast resources
Definition: NetContext.hpp:43
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:182
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:36
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:124
Sender * getSender(comm::Topic const &t)
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:141
impl class
Definition: RecvTransportEngine.hpp:61
Definition: Base.hpp:12