hmbdc
simplify-high-performance-messaging-programming
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/utils/NetContextUtil.hpp"
4 #include "hmbdc/app/Config.hpp"
5 #include "hmbdc/app/udpcast/Messages.hpp"
6 #include "hmbdc/app/udpcast/Sender.hpp"
7 #include "hmbdc/app/udpcast/SendTransportEngine.hpp"
8 #include "hmbdc/app/udpcast/RecvTransportEngine.hpp"
9 #include "hmbdc/app/udpcast/DefaultUserConfig.hpp"
10 #include "hmbdc/comm/Topic.hpp"
11 #include "hmbdc/pattern/GuardedSingleton.hpp"
12 
13 #include <boost/regex.hpp>
14 
15 #include <vector>
16 #include <mutex>
17 #include <memory>
18 
19 /**
20  * @namespace hmbdc::app::udpcast
21  * UDP multicast/unicast transport
22  */
23 namespace hmbdc { namespace app { namespace udpcast {
24 
25 /**
26 * @example server-cluster.cpp
27 * @example perf-udpcast.cpp
28 * @example ping-pong-udpcast.cpp
29 * @example udpcast-sniff.cpp
30 * @example rudpcast-cp.cpp
31 */
32 
33 /**
34  * @class NetContext
35  * @brief a singleton that holds udpcast resources
36  * @details it manages transport engines
37  */
38 struct NetContext
41 
42  friend struct hmbdc::pattern::SingletonGuardian<NetContext>;
44 
45 /**
46  * @brief construct a send transport engine (and remember it within the class)
47  * @details After this, user is responsible to get it started within a hmbdc Context
48  * Don't create the same thing twice
49  *
50  * @param cfgIn config specifying the transport - see hmbdc/app/udpcast/DefaultUserConfig.hpp
51  * @param maxMessageSize max message size in bytes to be sent
52  * transport engines, see ctor of SendTransportEngine for details
53  * @return a pointer to the Engine - don't delete it
54  */
56  , size_t maxMessageSize) {
57  Config dft(DefaultUserConfig, "tx");
58  Config cfg(cfgIn);
59  cfg.setDefaultUserConfig(dft);
60 
61  std::lock_guard<std::mutex> tlock(sendTransportsLock_);
62  auto res = new SendTransportEngine(cfg, maxMessageSize);
63  sendTransports_.emplace_back(res);
64  return res;
65  }
66 
67 /**
68  * @brief same as above but provide a unified interface - not preferred
69  * @return a pointer to the Engine - don't delete it
70  */
72  , size_t maxMessageSize
73  , std::tuple<> args) {
74  return createSendTransportEngine(cfg, maxMessageSize);
75  }
76 
77 /**
78  * @brief construct a recv transport engine and remember it
79  * @details After this, user is responsible to get it started within a hmbdc Context,
80  * if running in Context pool, it needs to pin at a single pool thread, CANNOT span
81  * more than one thread
82  * otherwise the transport is not functioning/running. Don't create the same thing twice
83  *
84  * @param cfgIn JSON specifying the transport - see hmbdc/app/udpcast/DefaultUserConfig.hpp
85  * @param buffer buffer that received messages go into, normally the one returned by app::Context::buffer()
86  * @param arb optionally an arbitrator to decide which messages to keep and drop
87  * if arb is an rvalue, it is passed by value, if an lvalue, passed by reference;
88  * it supports either udp packet (BEFORE topic filtering) or
89  * hmbdc message (AFTER topic filtering) level arbitration depending on which one of
90  * int operator()(void* bytes, size_t len) and
91  * int operator()(TransportMessageHeader const* header) is present in the arb passed in
92  * transport engines, see ctor of RecvTransportEngineImpl for details
93  * see in example use in udpcast-sniff.cpp @snippet udpcast-sniff.cpp define packet arb
94  * see in example use in server-cluster.cpp @snippet server-cluster.cpp define msg arbitrator
95  * @return a pointer to the Engine
96  */
97  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
99  , Buffer& buffer
100  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
101  Config dft(DefaultUserConfig, "rx");
102  Config cfg(cfgIn);
103  cfg.setDefaultUserConfig(dft);
104 
105  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
106  auto res =
108  , std::forward<MsgArbitrator>(arb));
109  recvTransports_.emplace_back(res);
110  return res;
111  }
112 
113 /**
114  * @brief same as above but to provide a unified interface - not preferred
115  * @details use forward_as_tuple to make the tuple passed in
116  * @return a pointer to the Engine
117  */
118  template <typename Buffer, typename ArgsTuple>
120  Config const& cfg
121  , Buffer& buffer
122  , ArgsTuple&& args) {
123  return createRecvTransportEngine(cfg
124  , buffer
125  , std::get<0>(args)
126  );
127  }
128 
129 /**
130  * @brief get (or create for the first time) a Sender - whose function is to send messages on
131  * its associated Topic
132  * @details this operation typically might be slow, so caching the return value is recommended.
133  *
134  * @param t - the Topic that the Sender is for - default to be "_"
135  */
136  Sender* getSender(Topic const& t = Topic("_")) {
137  std::lock_guard<std::mutex> lock(sendersLock_);
138  auto sender = senders_.find(t);
139  if ( sender != senders_.end()) {
140  return sender->second.get();
141  } else {
142  std::lock_guard<std::mutex> slock(sendTransportsLock_);
143  for (auto i = 0u;
144  i < sendTransports_.size();
145  ++i) {
146  auto st = sendTransports_[i];
147  if (st->match(t)) {
148  auto newSender = new Sender(st, t);
149  senders_[t].reset(newSender);
150  return newSender;
151  }
152  }
153  }
154 
155  return nullptr;
156  }
157 
158 /**
159  * @brief This process is interested in a Topic
160  * @details Normally the receiving transport covering this topic
161  * needs to be created - not necessarily running - before calling this
162  *
163  * @param t Topic interested
164  */
165  void listenTo(Topic const& t) {
166  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
167  for (auto ptr : recvTransports_) {
168  ptr->listenTo(t);
169  }
170  }
171 
172 /**
173  * @brief undo the subscription
174  *
175  * @param t Topic
176  */
177  void stopListenTo(Topic const& t) {
178  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
179  for (auto ptr : recvTransports_) {
180  ptr->stopListenTo(t);
181  }
182  }
183 
184 private:
185 /**
186  * @brief this ctor is to create some commonly used basic engine setup to start with.
187  * This is private and only meant to be used through SingletonGuardian,
188  * see in example chat.cpp
189  * @snippet chat.cpp create NetContext with initial tx/rx capabilities
190  * @details It creates one send transport engine and/or one recv transport engine based on cfgIn.
191  * and run them using a single OS thread indicated by runningUsingThreadIndex
192  * If recv engine is created, this method also automatically makes the NetContext listen to
193  * a default topic (listenToTopic).
194  *
195  * @param ctx a Context that manages the send / recv transport engines, and hold the received messages
196  * @param cfgIn optional Config for the send AND recv transport engines
197  * @param sendSec the section name for send transport engine in above cfgIn, special values:
198  * nullptr - create send engine using no section config values
199  * "" - do not create send engine
200  * @param recvSec the section name for recv transport engine in above cfgIn
201  * nullptr - create recv engine using no section config values
202  * "" - do not create recv engine
203  * @param maxMessageSize max message size in bytes to be sent, if 0, uses ctx's maxMessageSize()
204  * @param runningUsingThreadIndex see details above
205  * @tparam CcContext ctx Context type
206  */
207  template <typename CcContext>
208  NetContext(CcContext& ctx
209  , Config::Base const* cfgIn = nullptr
210  , char const* sendSec = nullptr
211  , char const* recvSec = nullptr
212  , size_t maxMessageSize = 0
213  , uint8_t runningUsingThreadIndex = 0
214  , char const* listenToTopic = "_")
215  : runningCtx(0u, 2ul) {
216  checkEpollTaskInitialization();
217  createMinimumNetContext(
218  *this, runningCtx, ctx, cfgIn, sendSec, recvSec, maxMessageSize, runningUsingThreadIndex, listenToTopic);
219  }
220 
221 /**
222  * @brief this is for users that want finer control of engine creation - from a blank NetContext.
223  * @details it does not do anything that the previous ctor does
224  */
226  checkEpollTaskInitialization();
227  }
228 
229  ~NetContext() {
230  }
231 
232  std::vector<SendTransport::ptr> sendTransports_;
233  std::mutex sendTransportsLock_;
234 
235  std::vector<RecvTransport::ptr> recvTransports_;
236  std::mutex recvTransportsLock_;
237 
238  std::map<Topic, Sender::ptr> senders_;
239  std::mutex sendersLock_;
241 };
242 
243 }
244 }}
245 
void setDefaultUserConfig(Config const &c)
internal use
Definition: Config.hpp:140
class to hold an hmbdc configuration
Definition: Config.hpp:46
a singleton that holds udpcast resources
Definition: NetContext.hpp:38
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
NetContext()
this is for users that want finer control of engine creation - from a blank NetContext.
Definition: NetContext.hpp:225
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:34
void listenTo(Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:165
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport engine (and remember it within the class)
Definition: NetContext.hpp:55
Definition: NetContextUtil.hpp:10
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:35
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a recv transport engine and remember it
Definition: NetContext.hpp:98
facade class for sending network messages
Definition: Sender.hpp:14
void stopListenTo(Topic const &t)
undo the subscription
Definition: NetContext.hpp:177
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, std::tuple<> args)
same as above but provide a unified interface - not preferred
Definition: NetContext.hpp:71
NetContext(CcContext &ctx, Config::Base const *cfgIn=nullptr, char const *sendSec=nullptr, char const *recvSec=nullptr, size_t maxMessageSize=0, uint8_t runningUsingThreadIndex=0, char const *listenToTopic="_")
this ctor is to create some commonly used basic engine setup to start with. This is private and only ...
Definition: NetContext.hpp:208
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:461
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:119
Definition: Base.hpp:12
Sender * getSender(Topic const &t=Topic("_"))
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:136