hmbdc
simplify-high-performance-messaging-programming
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/utils/NetContextUtil.hpp"
4 #include "hmbdc/app/Config.hpp"
5 #include "hmbdc/app/netmap/Messages.hpp"
6 #include "hmbdc/app/netmap/Sender.hpp"
7 #include "hmbdc/app/netmap/SendTransportEngine.hpp"
8 #include "hmbdc/app/netmap/RecvTransportEngine.hpp"
9 #include "hmbdc/app/netmap/DefaultUserConfig.hpp"
10 
11 #include "hmbdc/comm/Topic.hpp"
12 #include "hmbdc/pattern/GuardedSingleton.hpp"
13 
14 #include <boost/regex.hpp>
15 
16 #include <vector>
17 #include <mutex>
18 #include <memory>
19 
20 
21 /**
22  * @namespace hmbdc::app::netmap
23  * netmap multicast transport
24  */
25 namespace hmbdc { namespace app { namespace netmap {
26 
27 /**
28 * @example client-server-netmap.cpp
29 * @example perf-netmap.cpp
30 */
31 
32 /**
33  * @class NetContext
34  * @brief a singleton that holds netmap resources
35  * @details it manages transport engines
36  * see perf-netmap.cpp for usage
37  */
38 struct NetContext
41 
42  friend struct hmbdc::pattern::SingletonGuardian<NetContext>;
44 /**
45  * @brief construct a send transport and remember it
46  * @details After this, the user is responsible for getting it started within an hmbdc Context,
47  * otherwise the transport is not functioning/running. Don't create the same thing twice.
48  * It might take several seconds (configured by nmResetWaitSec=2) to construct the engine
49  * to avoid message losses at the beginning - netmap seems to require that
50  *
51  * @param cfgIn JSON specifying the transport - see hmbdc/app/netmap/DefaultUserConfig.hpp
52  * @param maxMessageSize max message size in bytes to be sent
53  * @return a pointer to the Engine
54  */
55  SendTransportEngine* createSendTransportEngine(Config const& cfgIn, size_t maxMessageSize) {
56  Config dft(DefaultUserConfig, "tx");
57  Config cfg(cfgIn);
58  cfg.setDefaultUserConfig(dft);
59 
60  std::lock_guard<std::mutex> tlock(sendTransportEnginesLock_);
61  auto res = SendTransportEngine::ptr(new SendTransportEngine(cfg, maxMessageSize));
62  sendTransportEngines_.push_back(res);
63  return res.get();
64  }
65 
66 /**
67  * @brief same as above but provides a unified interface - not preferred
68  * @return a pointer to the Engine - don't delete it
69  */
71  , size_t maxMessageSize
72  , std::tuple<> args) {
73  return createSendTransportEngine(cfg, maxMessageSize);
74  }
75 
76 /**
77  * @brief construct a recv transport and remember it
78  * @details After this, the user is responsible for getting it started within an hmbdc Context,
79  * otherwise the transport is not functioning/running. Don't create the same thing twice
80  *
81  * @param cfgIn JSON specifying the transport - see perf-netmap.cpp and hmbdc/app/netmap/DefaultUserConfig.hpp
82  * @param buffer buffer that recv messages go in, normally the one returned by
83  * app::Context::buffer()
84  * @param arb optionally an arbitrator to decide which messages to keep and drop
85  * if arb is an rvalue, it is passed in by value, if an lvalue, passed in by reference;
86  * it also supports either netmap packet level (BEFORE topic filtering) or
87  * hmbdc message level (AFTER topic filtering) arbitration depending on which one of
88  * int operator()(void* bytes, size_t len) and
89  * int operator()(TransportMessageHeader const* header) is present in the arb passed in
90 
91  * @return a pointer to the Engine
92  */
93  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
95  , Buffer& buffer
96  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
97  Config dft(DefaultUserConfig, "rx");
98  Config cfg(cfgIn);
99  cfg.setDefaultUserConfig(dft);
100 
101  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
102  auto res =
104  , std::forward<MsgArbitrator>(arb));
105  recvTransportEngines_.emplace_back(res);
106  return res;
107  }
108 
109 /**
110  * @brief same as above but to provide a unified interface - not preferred
111  * @return a pointer to the Engine
112  */
113  template <typename Buffer, typename ArgsTuple>
115  Config const& cfg
116  , Buffer& buffer
117  , ArgsTuple&& args) {
118  return createRecvTransportEngine(cfg
119  , buffer
120  , std::get<0>(args)
121  );
122  }
123 
124 /**
125  * @brief get (or create for the first time) a Sender - whose function is to send messages on
126  * its associated Topic
127  * @details this operation typically might be slow, so caching the return value is recommended.
128  *
129  * @param t - the Topic that the Sender is for
130  */
131  Sender* getSender(comm::Topic const& t) {
132  std::lock_guard<std::mutex> lock(sendersLock_);
133  auto sender = senders_.find(t);
134  if ( sender != senders_.end()) {
135  return sender->second.get();
136  } else {
137  std::lock_guard<std::mutex> slock(sendTransportEnginesLock_);
138  for (auto i = 0u;
139  i < sendTransportEngines_.size();
140  ++i) {
141  auto st = sendTransportEngines_[i];
142  if (st->match(t)) {
143  auto newSender = new Sender(st, t);
144  senders_[t].reset(newSender);
145  return newSender;
146  }
147  }
148  }
149 
150  return nullptr;
151  }
152 
153 /**
154  * @brief This process is interested in a Topic
155  * @details Normally the receiving transport covering this topic
156  * needs to be created - not necessarily running - before calling this
157  *
158  * @param t Topic interested
159  */
160  void listenTo(comm::Topic const& t) {
161  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
162  for (auto ptr : recvTransportEngines_) {
163  ptr->listenTo(t);
164  }
165  }
166 
167 /**
168  * @brief undo the subscription
169  *
170  * @param t Topic
171  */
172  void stopListenTo(comm::Topic const& t) {
173  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
174  for (auto ptr : recvTransportEngines_) {
175  ptr->stopListenTo(t);
176  }
177  }
178 
179 private:
180 /**
181  * @brief this ctor is to create some commonly used basic engine setup to start with.
182  * This is private and only meant to be used through SingletonGuardian,
183  * see in example chat.cpp
184  * @snippet chat.cpp create NetContext with initial tx/rx capabilities
185  * @details It creates one send transport engine and/or one recv transport engine based on cfgIn.
186  * and run them using a single OS thread indicated by runningUsingThreadIndex
187  * If recv engine is created, this method also automatically makes the NetContext listen to
188  * a default topic (listenToTopic).
189  *
190  * @param ctx a Context that manages the send / recv transport engines, and hold the received messages
191  * @param cfgIn optional Config for the send AND recv transport engines
192  * @param sendSec the section name for send transport engine in above cfgIn, special values:
193  * nullptr - create send engine using no section config values
194  * "" - do not create send engine
195  * @param recvSec the section name for recv transport engine in above cfgIn
196  * nullptr - create recv engine using no section config values
197  * "" - do not create recv engine
198  * @param maxMessageSize max message size in bytes to be sent, if 0, uses ctx's maxMessageSize()
199  * @param runningUsingThreadIndex see details above
200  * @tparam CcContext ctx Context type
201  */
202  template <typename CcContext>
203  NetContext(CcContext& ctx
204  , Config::Base const* cfgIn = nullptr
205  , char const* sendSec = nullptr
206  , char const* recvSec = nullptr
207  , size_t maxMessageSize = 0
208  , uint8_t runningUsingThreadIndex = 0
209  , char const* listenToTopic = "_")
210  : runningCtx(0u, 2ul) {
211  createMinimumNetContext(
212  *this, runningCtx, ctx, cfgIn, sendSec, recvSec, maxMessageSize, runningUsingThreadIndex, listenToTopic);
213  }
214 
215 /**
216  * @brief this is for users that want finer control of engine creation - from a blank NetContext.
217  * @details it does not do anything that the previous ctor does
218  */
220  }
221 
222  ~NetContext() {
223  }
224 
225 
226  std::vector<SendTransportEngine::ptr> sendTransportEngines_;
227  std::mutex sendTransportEnginesLock_;
228 
229  std::vector<RecvTransport::ptr> recvTransportEngines_;
230  std::mutex recvTransportEnginesLock_;
231 
232  std::map<comm::Topic, Sender::ptr> senders_;
233  uint32_t sendTransportRotationIndex_;
234  std::mutex sendersLock_;
235  Context<> runningCtx;
236 };
237 
238 }
239 
240 }}
241 
void setDefaultUserConfig(Config const &c)
internal use
Definition: Config.hpp:140
class to hold an hmbdc configuration
Definition: Config.hpp:46
NetContext()
this is for users that want finer control of engine creation - from a blank NetContext.
Definition: NetContext.hpp:219
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
facade class for sending network messages
Definition: Sender.hpp:10
Definition: NetContextUtil.hpp:10
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:35
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:160
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, std::tuple<> args)
same as above but provide an unified interface - not preferred
Definition: NetContext.hpp:70
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:114
power a netmap port sending functions
Definition: SendTransportEngine.hpp:43
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport and remember it
Definition: NetContext.hpp:55
impl class,
Definition: RecvTransportEngine.hpp:70
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:461
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a recv transport and remember it
Definition: NetContext.hpp:94
char const *const DefaultUserConfig
Definition: DefaultUserConfig.hpp:10
a singleton that holding netmap resources
Definition: NetContext.hpp:38
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:50
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:172
Sender * getSender(comm::Topic const &t)
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:131
Definition: Base.hpp:12
NetContext(CcContext &ctx, Config::Base const *cfgIn=nullptr, char const *sendSec=nullptr, char const *recvSec=nullptr, size_t maxMessageSize=0, uint8_t runningUsingThreadIndex=0, char const *listenToTopic="_")
this ctor is to create some commonly used basic engine setup to start with. This is private and only ...
Definition: NetContext.hpp:203