hmbdc
simplify-high-performance-messaging-programming
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Config.hpp"
4 #include "hmbdc/app/netmap/Messages.hpp"
5 #include "hmbdc/app/netmap/Sender.hpp"
6 #include "hmbdc/app/netmap/SendTransportEngine.hpp"
7 #include "hmbdc/app/netmap/RecvTransportEngine.hpp"
8 #include "hmbdc/app/netmap/DefaultUserConfig.hpp"
9 
10 #include "hmbdc/comm/Topic.hpp"
11 #include "hmbdc/pattern/GuardedSingleton.hpp"
12 
13 #include <boost/regex.hpp>
14 
15 #include <vector>
16 #include <mutex>
17 #include <memory>
18 
19 namespace hmbdc { namespace app { namespace netmap {
20 
21 /**
22  * @brief a singleton that holds netmap resources
23  * @details it manages transport engines
24  * see @example client-server-netmap.cpp @example perf-netmap.cpp for usage example
25  */
26 struct NetContext
28 
29  friend class hmbdc::pattern::SingletonGuardian<NetContext>;
31 /**
32  * @brief construct a send transport and remember it
33  * @details After this, user is responsible to get it started within a hmbdc Context,
34  * otherwise the transport is not functioing/running. Don't create the same thing twice.
35  * it might take several seconds (configured by nmResetWaitSec=2) to construct the engine
36  * to avoid message losses at the beginning - netmap seems to require that
37  *
38  * @param cfgIn jason specifing the transport - see example, perf-netmap.cpp and DefaultUserConfig.hpp
39  * @param maxMessageSize max messafe size in bytes to be sent
40  * @return a pointer to the Engine
41  */
42  SendTransportEngine* createSendTransportEngine(Config const& cfgIn, size_t maxMessageSize) {
43  Config dft(DefaultUserConfig, "tx");
44  Config cfg(cfgIn);
45  cfg.setDefaultUserConfig(dft);
46 
47  std::lock_guard<std::mutex> tlock(sendTransportEnginesLock_);
48  auto res = SendTransportEngine::ptr(new SendTransportEngine(cfg, maxMessageSize));
49  sendTransportEngines_.push_back(res);
50  return res.get();
51  }
52 
53 /**
54  * @brief same as above but provide an unified interface - not preferred
55  * @return a pointer to the Engine - don't delete it
56  */
58  , size_t maxMessageSize
59  , tuple<> args) {
60  return createSendTransportEngine(cfg, maxMessageSize);
61  }
62 
63 /**
64  * @brief construct a recv transport and remember it
65  * @details After this, user is responsible to get it started within a hmbdc Context,
66  * otherwise the transport is not functioning/running. Don't create the same thing twice
67  *
68  * @param cfgIn JSON specifying the transport - see example, perf-netmap.cpp and DefaultUserConfig.hpp
69  * @param buffer buffer that recv messages go in, normally the one returned by
70  * app::Context::buffer()
71  * @param arb optionally an arbitrator to decide which messages to keep and drop
72  * if arb is an rvalue, it is passed in by value, if an lvalue, passed in as reference;
73  * it also supports either netmap packet level (BEFORE topic filtering) or
74  * hmbdc message level (AFTER topic filtering) arbitration depending on which one of
75  * int operator()(void* bytes, size_t len) and
76  * int operator()(TransportMessageHeader const* header) is present in the arb passed in
77 
78  * @return a pointer to the Engine
 *
 * NOTE(review): in this capture the listing numbers jump from 80 to 82 and from 89 to 91,
 * so the declarator line and the beginning of the `auto res =` initializer are not
 * visible here - restore them from the original header before building.
79  */
80  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
82  , Buffer& buffer
83  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
 // private copy of the caller's config with the "rx" section of DefaultUserConfig
 // attached as its default user config
84  Config dft(DefaultUserConfig, "rx");
85  Config cfg(cfgIn);
86  cfg.setDefaultUserConfig(dft);
87 
 // the engine is created and registered while holding the recv engines lock
88  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
89  auto res =
91  , forward<MsgArbitrator>(arb));
92  recvTransportEngines_.emplace_back(res);
93  return res;
94  }
95 
96 /**
97  * @brief same as above but to provide a unified interface - not preferred
98  * @return a pointer to the Engine
99  */
100  template <typename Buffer, typename ArgsTuple>
102  Config const& cfg
103  , Buffer& buffer
104  , ArgsTuple&& args) {
105  return createRecvTransportEngine(cfg
106  , buffer
107  , get<0>(args)
108  );
109  }
110 
111 /**
112  * @brief get (or create for the first time) a Sender - whose function is to send messages on
113  * its associated Topic
114  * @details this operation typically might be slow, so caching the return value is recommended.
115  *
116  * @param t - the Topic that the Sender is for
117  */
118  Sender* getSender(Topic const& t) {
119  std::lock_guard<std::mutex> lock(sendersLock_);
120  auto sender = senders_.find(t);
121  if ( sender != senders_.end()) {
122  return sender->second.get();
123  } else {
124  std::lock_guard<std::mutex> slock(sendTransportEnginesLock_);
125  for (auto i = 0u;
126  i < sendTransportEngines_.size();
127  ++i) {
128  auto st = sendTransportEngines_[i];
129  if (st->match(t)) {
130  auto newSender = new Sender(st, t);
131  senders_[t].reset(newSender);
132  return newSender;
133  }
134  }
135  }
136 
137  return nullptr;
138  }
139 
140 /**
141  * @brief This process is interested in a Topic
142  * @details Normally the receiving transport covering this topic
143  * needs to be created - not necessarily running - before calling this
144  *
145  * @param t Topic interested
146  */
147  void listenTo(Topic const& t) {
148  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
149  for (auto ptr : recvTransportEngines_) {
150  ptr->listenTo(t);
151  }
152  }
153 
154 /**
155  * @brief undo the subscription
156  *
157  * @param t Topic
158  */
159  void stopListenTo(Topic const& t) {
160  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
161  for (auto ptr : recvTransportEngines_) {
162  ptr->stopListenTo(t);
163  }
164  }
165 
166 private:
167  NetContext() {
168  }
169 
170  ~NetContext() {
171  }
172 
173 
174  std::vector<SendTransportEngine::ptr> sendTransportEngines_; ///< filled by createSendTransportEngine(), scanned by getSender()
175  std::mutex sendTransportEnginesLock_; ///< held whenever sendTransportEngines_ is touched
176 
177  std::vector<RecvTransport::ptr> recvTransportEngines_; ///< filled by createRecvTransportEngine(), walked by listenTo() / stopListenTo()
178  std::mutex recvTransportEnginesLock_; ///< held whenever recvTransportEngines_ is touched
179 
180  std::map<Topic, Sender::ptr> senders_; ///< one Sender per Topic, created on demand by getSender()
181  uint32_t sendTransportRotationIndex_; ///< NOTE(review): no member function shown in this listing reads it
182  std::mutex sendersLock_; ///< held by getSender() for its whole duration
183 };
184 
185 }
186 
187 }}
188 
class to hold an hmbdc configuration
Definition: Config.hpp:35
void setDefaultUserConfig(Config const &c)
internal use
Definition: Config.hpp:128
void stopListenTo(Topic const &t)
undo the subscription
Definition: NetContext.hpp:159
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
facade class for sending network messages
Definition: Sender.hpp:10
void listenTo(Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:147
Definition: GuardedSingleton.hpp:19
Definition: GuardedSingleton.hpp:12
power a netmap port sending functions
Definition: SendTransportEngine.hpp:45
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:101
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport and remember it
Definition: NetContext.hpp:42
Sender * getSender(Topic const &t)
get (or create for the first time) a Sender - whose function is to send messages on its associated Topic
Definition: NetContext.hpp:118
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a recv transport and remember it
Definition: NetContext.hpp:81
Definition: NetContext.hpp:26
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:56
impl class,
Definition: RecvTransportEngine.hpp:76
Definition: Client.hpp:11
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, tuple<> args)
same as above but provide an unified interface - not preferred
Definition: NetContext.hpp:57