hmbdc
simplify-high-performance-messaging-programming
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Config.hpp"
4 #include "hmbdc/app/mcast/Messages.hpp"
5 #include "hmbdc/app/mcast/Sender.hpp"
6 #include "hmbdc/app/mcast/SendTransportEngine.hpp"
7 #include "hmbdc/app/mcast/RecvTransportEngine.hpp"
8 #include "hmbdc/app/mcast/DefaultUserConfig.hpp"
9 #include "hmbdc/comm/Topic.hpp"
10 #include "hmbdc/pattern/GuardedSingleton.hpp"
11 
12 #include <boost/regex.hpp>
13 
14 #include <vector>
15 #include <mutex>
16 #include <memory>
17 
18 //commands used to build and install netmap for intel NIC
19 //./configure --kernel-dir=/usr/src/kernels/3.10.0-327.el7.x86_64/ --kernel-sources=/home/urname/rpmbuild/SOURCES/linux-3.10.0-327.36.3.el7 --drivers=e1000e
20 //make
21 //sudo make install
22 
23 
namespace hmbdc { namespace app { namespace mcast {

/**
 * @brief a singleton that holds mcast resources
 * @details it manages the send and receive transport engines
 *
 * see @example perf-mcast.cpp and @example server-cluster.cpp for usage.
 *
 */
struct NetContext
: pattern::GuardedSingleton<NetContext> {
    friend class hmbdc::pattern::SingletonGuardian<NetContext>;

    /**
     * @brief construct a send transport engine (and remember it within the class)
     * @details after this, the user is responsible for getting it started within a hmbdc Context;
     * if running in a Context pool, it needs to be pinned to a single pool thread and CANNOT span
     * more than one thread, otherwise the transport is not functioning/running.
     * Don't create the same engine twice. See the usage sketch at the end of this listing.
     *
     * @param cfgIn config specifying the transport - see the example in perf-mcast.cpp and DefaultUserConfig.hpp
     * @param maxMessageSize max message size in bytes to be sent through this
     * transport engine - see the ctor of SendTransportEngine for details
     * @return a pointer to the Engine - don't delete it
     */
    SendTransportEngine* createSendTransportEngine(Config const& cfgIn
        , size_t maxMessageSize) {
        Config dft(DefaultUserConfig, "tx");
        Config cfg(cfgIn);
        cfg.setDefaultUserConfig(dft);

        std::lock_guard<std::mutex> tlock(sendTransportsLock_);
        auto res = new SendTransportEngine(cfg, maxMessageSize);
        sendTransports_.emplace_back(res);
        return res;
    }

    /**
     * @brief same as above but provides a unified interface - not preferred
     * @return a pointer to the Engine - don't delete it
     */
    SendTransportEngine* createSendTransportEngineTuply(Config const& cfg
        , size_t maxMessageSize
        , tuple<> args) {
        return createSendTransportEngine(cfg, maxMessageSize);
    }

    /**
     * @brief construct a receive transport engine and remember it
     * @details after this, the user is responsible for getting it started within a hmbdc Context;
     * if running in a Context pool, it needs to be pinned to a single pool thread and CANNOT span
     * more than one thread, otherwise the transport is not functioning/running.
     * Don't create the same engine twice.
     *
     * @param cfgIn config specifying the transport - see the examples perf-mcast.cpp and DefaultUserConfig.hpp
     * @param buffer buffer that received messages go into, normally the one returned by app::Context::buffer()
     * @param arb optionally an arbitrator to decide which messages to keep and which to drop
     * (see the arbitrator sketch at the end of this listing);
     * if arb is an rvalue it is passed in by value, if an lvalue it is passed in by reference;
     * it supports arbitration either at the udp packet level (BEFORE topic filtering) or at the
     * hmbdc message level (AFTER topic filtering), depending on which one of
     * int operator()(void* bytes, size_t len) and
     * int operator()(TransportMessageHeader const* header) is present in the arb passed in -
     * see the ctor of RecvTransportEngineImpl for details
     * @return a pointer to the Engine
     */
    template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
    auto createRecvTransportEngine(Config const& cfgIn
        , Buffer& buffer
        , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
        Config dft(DefaultUserConfig, "rx");
        Config cfg(cfgIn);
        cfg.setDefaultUserConfig(dft);

        std::lock_guard<std::mutex> tlock(recvTransportsLock_);
        auto res =
            new RecvTransportEngineImpl<Buffer, MsgArbitrator>(cfg, buffer
            , forward<MsgArbitrator>(arb));
        recvTransports_.emplace_back(res);
        return res;
    }

    /**
     * @brief same as above but provides a unified interface - not preferred
     * @details use forward_as_tuple to make the tuple passed in
     * @return a pointer to the Engine
     */
    template <typename Buffer, typename ArgsTuple>
    auto createRecvTransportEngineTuply(
        Config const& cfg
        , Buffer& buffer
        , ArgsTuple&& args) {
        return createRecvTransportEngine(cfg
            , buffer
            , get<0>(args)
            , get<1>(args)
        );
    }

    /**
     * @brief get (or create for the first time) a Sender - whose function is to send messages on
     * its associated Topic
     * @details this operation can be slow, so caching the returned value is recommended;
     * returns nullptr if no send transport engine matches the Topic
     *
     * @param t - the Topic that the Sender is for
     */
    Sender* getSender(Topic const& t) {
        std::lock_guard<std::mutex> lock(sendersLock_);
        auto sender = senders_.find(t);
        if (sender != senders_.end()) {
            return sender->second.get();
        } else {
            std::lock_guard<std::mutex> slock(sendTransportsLock_);
            for (auto i = 0u;
                i < sendTransports_.size();
                ++i) {
                auto st = sendTransports_[i];
                if (st->match(t)) {
                    auto newSender = new Sender(st, t);
                    senders_[t].reset(newSender);
                    return newSender;
                }
            }
        }

        return nullptr;
    }

    /**
     * @brief this process is interested in a Topic
     * @details normally the receiving transport covering this topic
     * needs to be created - not necessarily running - before calling this
     *
     * @param t the Topic interested in
     */
    void listenTo(Topic const& t) {
        std::lock_guard<std::mutex> tlock(recvTransportsLock_);
        for (auto ptr : recvTransports_) {
            ptr->listenTo(t);
        }
    }

    /**
     * @brief undo the subscription
     *
     * @param t Topic
     */
    void stopListenTo(Topic const& t) {
        std::lock_guard<std::mutex> tlock(recvTransportsLock_);
        for (auto ptr : recvTransports_) {
            ptr->stopListenTo(t);
        }
    }

private:
    NetContext() {
    }

    ~NetContext() {
    }

    std::vector<SendTransport::ptr> sendTransports_;
    std::mutex sendTransportsLock_;

    std::vector<RecvTransport::ptr> recvTransports_;
    std::mutex recvTransportsLock_;

    std::map<Topic, Sender::ptr> senders_;
    std::mutex sendersLock_;
};

}
}}
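
The sketch below is not part of NetContext.hpp; it is a minimal outline of how the members above fit together: guard the singleton, create the transport engines, then obtain a per-Topic Sender and the subscriptions. It assumes that NetContext::instance() is the accessor supplied by GuardedSingleton, that a default-constructed SingletonGuardian builds the NetContext, and that Topic is constructible from a string - none of which is shown in this header. The app::Context in which the engines must actually be started (pinned to a single pool thread) is elided, and names such as "md.prices" and the 1000-byte message size are placeholders.

// usage sketch - illustrative only, see the assumptions stated above
#include "hmbdc/app/mcast/NetContext.hpp"

int main() {
    using namespace hmbdc;
    using namespace hmbdc::app::mcast;

    // RAII guard that creates (and on scope exit destroys) the NetContext singleton
    pattern::SingletonGuardian<NetContext> netGuard;
    auto& net = NetContext::instance();               // accessor assumed to come from GuardedSingleton

    // send side: one engine per transport config; reuses the Config ctor seen in the header above
    Config txCfg(DefaultUserConfig, "tx");
    auto* sendEngine = net.createSendTransportEngine(txCfg, 1000);
    // ... start sendEngine within an app::Context here, pinned to a single pool thread ...
    (void)sendEngine;

    // per-Topic Sender: look it up once and cache it; nullptr means no engine matches the Topic
    auto* sender = net.getSender(Topic("md.prices")); // Topic assumed string-constructible
    if (!sender) return 1;
    // ... use sender (interface in Sender.hpp, not shown here) to publish on "md.prices" ...

    // receive side (commented out because it needs a running app::Context and its buffer):
    // Config rxCfg(DefaultUserConfig, "rx");
    // auto* recvEngine = net.createRecvTransportEngine(rxCfg, ctx.buffer());
    // net.listenTo(Topic("md.prices"));              // subscribe once the engine exists
    // net.stopListenTo(Topic("md.prices"));          // undo the subscription
    return 0;
}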
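
The arb parameter of createRecvTransportEngine() deserves its own sketch: the engine selects udp-packet-level or hmbdc-message-level arbitration based on which call operator the functor defines. The functor below is hypothetical, and the meaning of the returned int (keep versus drop) is an assumption; confirm the exact convention against the RecvTransportEngineImpl ctor in RecvTransportEngine.hpp before relying on it.

// arbitrator sketch - illustrative only
#include "hmbdc/app/mcast/NetContext.hpp"

namespace example {
using namespace hmbdc::app;
using namespace hmbdc::app::mcast;

// message-level arbitration: called AFTER topic filtering, per hmbdc message
struct EveryOtherMessageArb {
    int operator()(TransportMessageHeader const*) {
        // assumed convention: a positive value keeps the message, a non-positive value drops it
        return (count_++ % 2 == 0) ? 1 : -1;
    }
    size_t count_ = 0;
};

// packet-level arbitration would instead define
//     int operator()(void* bytes, size_t len);
// and be called BEFORE topic filtering, per udp packet.

// passing it in (engine and Context creation elided - see the previous sketch):
//     auto* recvEngine = NetContext::instance().createRecvTransportEngine(
//         rxCfg, ctx.buffer(), EveryOtherMessageArb());
} // namespace example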