hmbdc
simplify-high-performance-messaging-programming
Messages.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/comm/Topic.hpp"
5 #include "hmbdc/time/Time.hpp"
6 #include "hmbdc/app/Message.hpp"
7 
8 #include <ostream>
9 #include <string>
10 
11 namespace hmbdc { namespace app { namespace tcpcast {
12 
14  uint8_t topicLen;
15  uint8_t flag; //1 - hasAttachment
16  uint16_t messagePayloadLen;
17 
18  std::pair<char const*, char const*> topic() const {
19  char const* b = reinterpret_cast<const char*>(this)
20  + sizeof(TransportMessageHeader);
21  return std::make_pair(b, b + topicLen);
22  }
23 
24  void const* payload() const {
25  return reinterpret_cast<const char*>(this)
26  + sizeof(TransportMessageHeader) + topicLen;
27  }
28 
29  void * payload() {
30  return reinterpret_cast<char*>(this)
31  + sizeof(TransportMessageHeader) + topicLen;
32  }
33 
34  uint16_t typeTag() const {
35  auto h = static_cast<app::MessageHead const*>(payload());
36  return h->typeTag;
37  }
38 
39  template <typename Message>
40  Message& wrapped() {
41  auto wrap = static_cast<app::MessageWrap<Message>*>(payload());
42  return wrap->payload;
43  }
44 
45  template <typename Message>
46  Message const& wrapped() const {
47  auto wrap = static_cast<app::MessageWrap<Message> const *>(payload());
48  return wrap->payload;
49  }
50 
51  size_t wireSize() const {
52  return sizeof(TransportMessageHeader) + topicLen
53  + messagePayloadLen;
54  }
55 
56  size_t wireSizeContainsTopic() const {
57  return sizeof(TransportMessageHeader) + topicLen;
58  }
59 } __attribute__((packed));
60 
61 struct Subscribe
62 : hasTag<201> {
63  Subscribe(comm::Topic const&topic)
64  : topic(topic){}
65  comm::Topic topic;
66 };
67 
68 struct Unsubscribe
69 : hasTag<202> {
70  Unsubscribe(comm::Topic const&topic)
71  : topic(topic){}
72  comm::Topic topic;
73 };
74 
75 struct TopicSource
76 : hasTag<203> {
77  TopicSource(std::string const&topicRegexIn
78  , std::string const& ipIn
79  , uint16_t portIn
80  , bool loopbackIn)
81  : port(portIn)
82  , pid(getpid())
83  , loopback(loopbackIn)
84  , timestamp(time::SysTime::now()) {
85  strncpy(ip, ipIn.c_str(), sizeof(ip));
86  strncpy(topicRegex, topicRegexIn.c_str(), sizeof(topicRegex));
87  topicRegex[sizeof(topicRegex) - 1] = 0;
88  }
89  char ip[16];
90  char topicRegex[128]; //null terminated string
91  uint16_t port;
92  pid_t pid;
93  bool loopback;
94  time::SysTime timestamp; //when the source started
95 };
96 
97 /**
98  * @class SessionStarted
99  * @brief this message appears in the receiver's buffer indicating a new source is connected
100  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this messages
101  */
103 : hasTag<204> {
104  char ip[16];
105  friend
106  std::ostream& operator << (std::ostream& os, SessionStarted const & m) {
107  return os << "Session to topic source started " << m.ip;
108  }
109 };
110 
111 /**
112  * @class SessionDropped
113  * @brief this message appears in the receiver's buffer indicating a previously connected source is dropped
114  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this messages
115  */
117 : hasTag<205> {
118  char ip[16];
119  friend
120  std::ostream& operator << (std::ostream& os, SessionDropped const & m) {
121  return os << "Session to topic source dropped " << m.ip;
122  }
123 };
124 
125 }}}
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Messages.hpp:68
each message type has 16 bit tag
Definition: Message.hpp:30
Definition: Messages.hpp:61
this message appears in the receiver&#39;s buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:116
Definition: Message.hpp:34
Definition: Time.hpp:13
Definition: Message.hpp:55
Definition: Messages.hpp:75
Definition: Base.hpp:12
this message appears in the receiver&#39;s buffer indicating a new source is connected ...
Definition: Messages.hpp:102