// hmbdc - simplify high-performance messaging programming
// NetContext.hpp
#include "hmbdc/Copyright.hpp"
#pragma once
#include "hmbdc/app/Config.hpp"
#include "hmbdc/app/tcpcast/Messages.hpp"
#include "hmbdc/app/tcpcast/Sender.hpp"
#include "hmbdc/app/tcpcast/SendTransportEngine.hpp"
#include "hmbdc/app/tcpcast/RecvTransportEngine.hpp"
#include "hmbdc/app/tcpcast/DefaultUserConfig.hpp"

#include "hmbdc/comm/Topic.hpp"
#include "hmbdc/pattern/GuardedSingleton.hpp"

#include <boost/regex.hpp>

#include <map>
#include <memory>
#include <mutex>
#include <tuple>
#include <utility>
#include <vector>

namespace hmbdc { namespace app { namespace tcpcast {
20 
/**
 * @brief a singleton that holds tcpcast resources
 * @details it manages the transport engines
 *
 * see @example perf-tcpcast.cpp and @example chat.cpp for usage.
 *
 */
28 struct NetContext
30 
31  friend class hmbdc::pattern::SingletonGuardian<NetContext>;
33 
34 /**
35  * @brief construct a send transport enngine (and remember it)
36  * @details After this, user is responsible to get it started within a hmbdc Context,
37  * if running in Context pool, it needs to be pinned on a single pool thread, CANNOT span
38  * more than one thread, otherwise the transport is not functioing/running.
39  * Don't create the same thing twice.
40  *
41  * @param cfgIn jason specifing the transport - see example, perf-tcpcast.cpp and DefaultUserConfig.hpp
42  * @param maxMessageSize max messafe size in bytes to be sent
43  * @param minRecvToStart start send when there are that many recipients (processes) online, otherwise
44  * hold the message in buffer
45  * @return a pointer to the Engine
46  */
48  Config const& cfgIn
49  , size_t maxMessageSize
50  , size_t minRecvToStart = 1u) {
51  Config dft(DefaultUserConfig, "tx");
52  Config cfg(cfgIn);
53  cfg.setDefaultUserConfig(dft);
54 
55  std::lock_guard<std::mutex> tlock(sendTransportEnginesLock_);
56  auto res = new SendTransportEngine(
57  cfg, maxMessageSize, minRecvToStart);
58  sendTransports_.emplace_back(res);
59  return res;
60  }
61 
62 /**
63  * @brief same as above but provide an unified interface - not preferred
64  * @return a pointer to the Engine - don't delete it
65  */
67  , size_t maxMessageSize
68  , tuple<size_t> args) {
69  return createSendTransportEngine(cfg, maxMessageSize, get<0>(args));
70  }
71 
72 /**
73  * @brief construct a send transport and remember it
74  * @details After this, user is respoonsible to get it started within a hmbdc Context,
75  * if running in Context pool, it needs to pin at a single pool thread, CANNOT span
76  * more than one thread
77  * otherwise the transport is not functioing/running. Don't create the same thing twice
78  *
79  * @param cfgIn jason specifing the transport - see example, perf-tcpcast.cpp and DefaultUserConfig.hpp
80  * @param buffer buffer that recv messages go, normally the one returned by app::Context::buffer()
81  * @param arb optonally an arbitrator to decide which messages to keep and drop
82  * if arb is an rvalue, it is passed in value, if an lvalue, passed in as reference;
83  * it supports ONLY hmbdc message level (AFTER topic filtering) arbitration if
84  * int operator()(TransportMessageHeader const* header) presents in the arb passed in.
85  * (NO packet level since it is tcp)
86  * @return a pointer to the Engine
87  */
88  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
90  , Buffer& buffer
91  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
92  Config dft(DefaultUserConfig, "rx");
93  Config cfg(cfgIn);
94  cfg.setDefaultUserConfig(dft);
95 
96  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
98  cfg, buffer, forward<MsgArbitrator>(arb));
99  recvTransports_.emplace_back(res);
100  return res;
101  }
102 
103 /**
104  * @brief same as above but to provide a unified interface - not preferred
105  * @details use forward_as_tuple to make the tuple passed in
106  * @return a pointer to the Engine
107  */
108  template <typename Buffer, typename ArgsTuple>
110  Config const& cfg
111  , Buffer& buffer
112  , ArgsTuple&& args) {
113  return createRecvTransportEngine(cfg
114  , buffer
115  , get<0>(args)
116  );
117  }
118 
119 /**
120  * @brief get (or create for the first time) a Sender - whose function is to send messages on
121  * its associated Topic
122  * @details this operation typically might be slow, so caching the return value is recommended.
123  *
124  * @param t - the Topic that the Sender is for
125  */
126  Sender* getSender(Topic const& t) {
127  std::lock_guard<std::mutex> lock(sendersLock_);
128  auto sender = senders_.find(t);
129  if ( sender != senders_.end()) {
130  return sender->second.get();
131  } else {
132  std::lock_guard<std::mutex> slock(sendTransportEnginesLock_);
133  for (auto i = 0u;
134  i < sendTransports_.size();
135  ++i) {
136  auto st = sendTransports_[i];
137  if (st->match(t)) {
138  auto newSender = new Sender(st, t);
139  senders_[t].reset(newSender);
140  return newSender;
141  }
142  }
143  }
144 
145  return nullptr;
146  }
147 
148 /**
149  * @brief This process is interested in a Topic
150  * @details Normally the receiving transport covering this topic
151  * needs to be created - not necessarily running - before calling this
152  *
153  * @param t Topic interested
154  */
155  void listenTo(Topic const& t) {
156  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
157  for (auto ptr : recvTransports_) {
158  ptr->listenTo(t);
159  }
160  }
161 
162 /**
163  * @brief undo the subscription
164  *
165  * @param t Topic
166  */
167  void stopListenTo(Topic const& t) {
168  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
169  for (auto ptr : recvTransports_) {
170  ptr->stopListenTo(t);
171  }
172  }
173 
174 private:
175  NetContext() {
176  }
177 
178  ~NetContext() {
179  }
180 
181 
182  std::vector<SendTransport::ptr> sendTransports_;
183  std::mutex sendTransportEnginesLock_;
184 
185  std::vector<RecvTransport::ptr> recvTransports_;
186  std::mutex recvTransportsLock_;
187 
188  std::map<Topic, Sender::ptr> senders_;
189  std::mutex sendersLock_;
190 };
191 
} //tcpcast

}}
195 
// --- Doxygen index residue (extraction artifact), preserved for reference ---
// class to hold an hmbdc configuration
//     Definition: Config.hpp:35
// SendTransportEngine* createSendTransportEngineTuply(Config const& cfg, size_t maxMessageSize, tuple<size_t> args)
//     same as above but provides a unified interface - not preferred
//     Definition: NetContext.hpp:66
// void setDefaultUserConfig(Config const& c)
//     internal use
//     Definition: Config.hpp:128
// impl class
//     Definition: RecvTransportEngine.hpp:60
// facade class for sending network messages
//     Definition: Sender.hpp:11
// topic as in the publish / subscribe communication paradigm
//     Definition: Topic.hpp:14
// SendTransportEngine* createSendTransportEngine(Config const& cfgIn, size_t maxMessageSize, size_t minRecvToStart=1u)
//     construct a send transport engine (and remember it)
//     Definition: NetContext.hpp:47
// void stopListenTo(Topic const& t)
//     undo the subscription
//     Definition: NetContext.hpp:167
// Definition: GuardedSingleton.hpp:19
// Definition: GuardedSingleton.hpp:12
// auto createRecvTransportEngine(Config const& cfgIn, Buffer& buffer, MsgArbitrator&& arb=RecvTransport::NoOpArb())
//     construct a recv transport engine (and remember it)
//     Definition: NetContext.hpp:89
// void listenTo(Topic const& t)
//     This process is interested in a Topic.
//     Definition: NetContext.hpp:155
// Definition: NetContext.hpp:28
// Definition: SendTransportEngine.hpp:201
// Sender* getSender(Topic const& t)
//     get (or create for the first time) a Sender - whose function is to send messages on its associated Topic
//     Definition: NetContext.hpp:126
// a take-all arbitrator (no arbitration at all)
//     Definition: RecvTransportEngine.hpp:41
// auto createRecvTransportEngineTuply(Config const& cfg, Buffer& buffer, ArgsTuple&& args)
//     same as above but provides a unified interface - not preferred
//     Definition: NetContext.hpp:109
// Definition: Client.hpp:11