hmbdc
simplify-high-performance-messaging-programming
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/utils/NetContextUtil.hpp"
4 #include "hmbdc/app/Config.hpp"
5 #include "hmbdc/app/mcast/Messages.hpp"
6 #include "hmbdc/app/mcast/Sender.hpp"
7 #include "hmbdc/app/mcast/SendTransportEngine.hpp"
8 #include "hmbdc/app/mcast/RecvTransportEngine.hpp"
9 #include "hmbdc/app/mcast/DefaultUserConfig.hpp"
10 #include "hmbdc/comm/Topic.hpp"
11 #include "hmbdc/pattern/GuardedSingleton.hpp"
12 
13 #include <boost/regex.hpp>
14 
15 #include <vector>
16 #include <mutex>
17 #include <memory>
18 
19 /**
20  * @namespace hmbdc::app::mcast
21  * UDP multicast transport
22  */
23 namespace hmbdc { namespace app { namespace mcast {
24 
25 /**
26 * @example server-cluster.cpp
27 * @example perf-mcast.cpp
28 * @example ping-pong-mcast.cpp
29 * @example mcast-sniff.cpp
30 * @example rmcast-cp.cpp
31 */
32 
33 /**
34  * @class NetContext
35  * @brief a singleton that holding mcast resources
36  * @details it manages transport engines
37  */
38 struct NetContext
41 
42  friend struct hmbdc::pattern::SingletonGuardian<NetContext>;
44 
45 /**
46  * @brief construct a send transport engine (and remember it within the class)
47  * @details After this, user is responsible to get it started within a hmbdc Context,
48  * if running in Context pool, it needs to pin at a single pool thread, CANNOT span
49  * more than one thread
50  * otherwise the transport is not functioing/running. Don't create the same thing twice
51  *
52  * @param cfgIn config specifing the transport - hmbdc/app/mcast/DefaultUserConfig.hpp
53  * @param maxMessageSize max messafe size in bytes to be sent
54  * trasnport engines, see ctor of SendTransportEngine for details
55  * @return a pointer to the Engine - don't delete it
56  */
58  , size_t maxMessageSize) {
59  Config dft(DefaultUserConfig, "tx");
60  Config cfg(cfgIn);
61  cfg.setDefaultUserConfig(dft);
62 
63  std::lock_guard<std::mutex> tlock(sendTransportsLock_);
64  auto res = new SendTransportEngine(cfg, maxMessageSize);
65  sendTransports_.emplace_back(res);
66  return res;
67  }
68 
69 /**
70  * @brief same as above but provide a unified interface - not preferred
71  * @return a pointer to the Engine - don't delete it
72  */
74  , size_t maxMessageSize
75  , std::tuple<> args) {
76  return createSendTransportEngine(cfg, maxMessageSize);
77  }
78 
79 /**
80  * @brief construct a send transport and remember it
81  * @details After this, user is responsible to get it started within a hmbdc Context,
82  * if running in Context pool, it needs to pin at a single pool thread, CANNOT span
83  * more than one thread
84  * otherwise the transport is not functioing/running. Don't create the same thing twice
85  *
86  * @param cfgIn jason specifing the transport - see hmbdc/app/mcast/DefaultUserConfig.hpp
87  * @param buffer buffer that recv messages go, normally the one returned by app::Context::buffer()
88  * @param arb optonally an arbitrator to decide which messages to keep and drop
89  * if arb is an rvalue, it is passed in value, if an lvalue, passed in as reference;
90  * it supports either udp packet (BEFORE topic filtering) or
91  * hmbdc message (AFTER topic filtering) level arbitration depending on which one of
92  * int operator()(void* bytes, size_t len) and
93  * int operator()(TransportMessageHeader const* header) presents in the arb passed in
94  * trasnport engines, see ctor of RecvTransportEngineImpl for details
95  * see in example use in mcast-sniff.cpp @snippet mcast-sniff.cpp define packet arb
96  * see in example use in server-cluster.cpp @snippet server-cluster.cpp define msg arbitrator
97  * @return a pointer to the Engine
98  */
99  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
101  , Buffer& buffer
102  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
103  Config dft(DefaultUserConfig, "rx");
104  Config cfg(cfgIn);
105  cfg.setDefaultUserConfig(dft);
106 
107  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
108  auto res =
110  , std::forward<MsgArbitrator>(arb));
111  recvTransports_.emplace_back(res);
112  return res;
113  }
114 
115 /**
116  * @brief same as above but to provide a unified interface - not preferred
117  * @details use forward_as_tuple to make the tuple passed in
118  * @return a pointer to the Engine
119  */
120  template <typename Buffer, typename ArgsTuple>
122  Config const& cfg
123  , Buffer& buffer
124  , ArgsTuple&& args) {
125  return createRecvTransportEngine(cfg
126  , buffer
127  , std::get<0>(args)
128  );
129  }
130 
131 /**
132  * @brief get (or create for the first time) a Sender - whose function is to send messages on
133  * its associated Topic
134  * @details this operation typically might be slow, so caching the return value is recommended.
135  *
136  * @param t - the Topic that the Sender is for - default to be "_"
137  */
138  Sender* getSender(Topic const& t = Topic("_")) {
139  std::lock_guard<std::mutex> lock(sendersLock_);
140  auto sender = senders_.find(t);
141  if ( sender != senders_.end()) {
142  return sender->second.get();
143  } else {
144  std::lock_guard<std::mutex> slock(sendTransportsLock_);
145  for (auto i = 0u;
146  i < sendTransports_.size();
147  ++i) {
148  auto st = sendTransports_[i];
149  if (st->match(t)) {
150  auto newSender = new Sender(st, t);
151  senders_[t].reset(newSender);
152  return newSender;
153  }
154  }
155  }
156 
157  return nullptr;
158  }
159 
160 /**
161  * @brief This process is interested in a Topic
162  * @details Normally the receiving transport covering this topic
163  * needs to be created - not necessarily running - before calling this
164  *
165  * @param t Topic interested
166  */
167  void listenTo(Topic const& t) {
168  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
169  for (auto ptr : recvTransports_) {
170  ptr->listenTo(t);
171  }
172  }
173 
174 /**
175  * @brief undo the subscription
176  *
177  * @param t Topic
178  */
179  void stopListenTo(Topic const& t) {
180  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
181  for (auto ptr : recvTransports_) {
182  ptr->stopListenTo(t);
183  }
184  }
185 
186 private:
187 /**
188  * @brief this ctor is to create some commonly used basic engine setup to start with.
189  * This is private and only meant to be used through SingletonGuardian,
190  * see in example chat.cpp
191  * @snippet chat.cpp create NetContext with initial tx/rx capabilities
192  * @details It creates one send transport engine and/or one recv transport engine based on cfgIn.
193  * and run them using a single OS thread indicated by runningUsingThreadIndex
194  * If recv engine is created, this method also automaticallly makes the NetContext listen to
195  * a default topic (listenToTopic).
196  *
197  * @param ctx a Context that manages the send / recv transport engines, and hold the received messages
198  * @param cfgIn optional Config for the send AND recv transport engines
199  * @param sendSec the section name for send transport engine in above cfgIn, special values:
200  * nullptr - create send engine using no section config values
201  * "" - do not create send engine
202  * @param recvSec the section name for recv transport engine in above cfgIn
203  * nullptr - create recv engine using no section config values
204  * "" - do not create recv engine
205  * @param maxMessageSize max message size in bytes to be sent, if 0, uses ctx's maxMessageSize()
206  * @param runningUsingThreadIndex see details above
207  * @tparam CcContext ctx Context type
208  */
209  template <typename CcContext>
210  NetContext(CcContext& ctx
211  , Config::Base const* cfgIn = nullptr
212  , char const* sendSec = nullptr
213  , char const* recvSec = nullptr
214  , size_t maxMessageSize = 0
215  , uint8_t runningUsingThreadIndex = 0
216  , char const* listenToTopic = "_")
217  : runningCtx(0u, 2ul) {
218  createMinimumNetContext(
219  *this, runningCtx, ctx, cfgIn, sendSec, recvSec, maxMessageSize, runningUsingThreadIndex, listenToTopic);
220  }
221 
222 /**
223  * @brief this is for users that want finer control of engine creation - from a blank NetContext.
224  * @details it does not do anything that the previous ctor does
225  */
227  }
228 
229  ~NetContext() {
230  }
231 
232 
233  std::vector<SendTransport::ptr> sendTransports_;
234  std::mutex sendTransportsLock_;
235 
236  std::vector<RecvTransport::ptr> recvTransports_;
237  std::mutex recvTransportsLock_;
238 
239  std::map<Topic, Sender::ptr> senders_;
240  std::mutex sendersLock_;
241  Context<> runningCtx;
242 };
243 
244 }
245 }}
246 
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a send transport and remember it
Definition: NetContext.hpp:100
void setDefaultUserConfig(Config const &c)
internal use
Definition: Config.hpp:139
class to hold an hmbdc configuration
Definition: Config.hpp:44
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: NetContextUtil.hpp:7
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:35
RAII representing the lifespan of the underlying Singleton which also ganrantees the singularity of u...
Definition: GuardedSingleton.hpp:20
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
a singleton that holding mcast resources
Definition: NetContext.hpp:38
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport engine (and remember it within the class)
Definition: NetContext.hpp:57
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:121
Sender * getSender(Topic const &t=Topic("_"))
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:138
NetContext(CcContext &ctx, Config::Base const *cfgIn=nullptr, char const *sendSec=nullptr, char const *recvSec=nullptr, size_t maxMessageSize=0, uint8_t runningUsingThreadIndex=0, char const *listenToTopic="_")
this ctor is to create some commonly used basic engine setup to start with. This is private and only ...
Definition: NetContext.hpp:210
void stopListenTo(Topic const &t)
undo the subscription
Definition: NetContext.hpp:179
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:408
NetContext()
this is for users that want finer control of engine creation - from a blank NetContext.
Definition: NetContext.hpp:226
char const *const DefaultUserConfig
Definition: DefaultUserConfig.hpp:10
void listenTo(Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:167
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, std::tuple<> args)
same as above but provide a unified interface - not preferred
Definition: NetContext.hpp:73
impl class
Definition: RecvTransportEngine.hpp:306
Definition: Base.hpp:12
fascade class for sending network messages
Definition: Sender.hpp:14