hmbdc
simplify-high-performance-messaging-programming
Messages.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/comm/Topic.hpp"
5 #include "hmbdc/time/Time.hpp"
6 #include "hmbdc/app/Message.hpp"
7 
8 #include <iostream>
9 #include <string>
10 
11 #include <unistd.h>
12 
13 namespace hmbdc { namespace app { namespace tcpcast {
14 
16  uint8_t topicLen;
17  uint8_t flag; //1 - hasAttachment
18  uint16_t messagePayloadLen;
19 
20  std::pair<char const*, char const*> topic() const {
21  char const* b = reinterpret_cast<const char*>(this)
22  + sizeof(TransportMessageHeader);
23  return std::make_pair(b, b + topicLen);
24  }
25 
26  void const* payload() const {
27  return reinterpret_cast<const char*>(this)
28  + sizeof(TransportMessageHeader) + topicLen;
29  }
30 
31  void * payload() {
32  return reinterpret_cast<char*>(this)
33  + sizeof(TransportMessageHeader) + topicLen;
34  }
35 
36  uint16_t typeTag() const {
37  auto h = static_cast<app::MessageHead const*>(payload());
38  return h->typeTag;
39  }
40 
41  template <typename Message>
42  Message& wrapped() {
43  auto wrap = static_cast<app::MessageWrap<Message>*>(payload());
44  return wrap->payload;
45  }
46 
47  template <typename Message>
48  Message const& wrapped() const {
49  auto wrap = static_cast<app::MessageWrap<Message> const *>(payload());
50  return wrap->payload;
51  }
52 
53  size_t wireSize() const {
54  return sizeof(TransportMessageHeader) + topicLen
55  + messagePayloadLen;
56  }
57 
58  size_t wireSizeContainsTopic() const {
59  return sizeof(TransportMessageHeader) + topicLen;
60  }
61 } __attribute__((packed));
62 
63 struct Subscribe
64 : hasTag<201> {
65  Subscribe(comm::Topic const&topic)
66  : topic(topic){}
67  comm::Topic topic;
68 };
69 
70 struct Unsubscribe
71 : hasTag<202> {
72  Unsubscribe(comm::Topic const&topic)
73  : topic(topic){}
74  comm::Topic topic;
75 };
76 
77 struct TopicSource
78 : hasTag<203> {
79  TopicSource()
80  : pid(0)
81  , loopback(false)
82  , timestamp(time::SysTime::now())
83  {}
84 
85  TopicSource(std::string const&topicRegexIn
86  , std::string const& ipIn
87  , uint16_t portIn
88  , bool loopbackIn)
89  : port(portIn)
90  , pid(getpid())
91  , loopback(loopbackIn)
92  , timestamp(time::SysTime::now()) {
93  strncpy(ip, ipIn.c_str(), sizeof(ip));
94  strncpy(topicRegex, topicRegexIn.c_str(), sizeof(topicRegex));
95  topicRegex[sizeof(topicRegex) - 1] = 0;
96  }
97  char ip[16];
98  char topicRegex[128]; //null terminated string
99  uint16_t port;
100  pid_t pid;
101  bool loopback;
102  time::SysTime timestamp; //when the source started
103 
104  friend
105  std::ostream& operator << (std::ostream& os, TopicSource const& m) {
106  return os << m.ip << ' '
107  << m.port << ' '
108  << m.topicRegex;
109  }
110 
111  friend
112  std::istream& operator >> (std::istream& is, TopicSource& m) {
113  is
114  >> m.ip
115  >> m.port
116  >> m.topicRegex
117  ;
118  return is;
119  }
120 };
121 
122 /**
123  * @class SessionStarted
124  * @brief this message appears in the receiver's buffer indicating a new source is connected
125  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this messages
126  */
128 : hasTag<204> {
129  char ip[16];
130  friend
131  std::ostream& operator << (std::ostream& os, SessionStarted const & m) {
132  return os << "Session to topic source started " << m.ip;
133  }
134 };
135 
136 /**
137  * @class SessionDropped
138  * @brief this message appears in the receiver's buffer indicating a previously connected source is dropped
139  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this messages
140  */
142 : hasTag<205> {
143  char ip[16];
144  friend
145  std::ostream& operator << (std::ostream& os, SessionDropped const & m) {
146  return os << "Session to topic source dropped " << m.ip;
147  }
148 };
149 
150 }}}
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Messages.hpp:70
each message type has 16 bit tag
Definition: Message.hpp:30
Definition: Messages.hpp:63
this message appears in the receiver&#39;s buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:141
Definition: Message.hpp:34
Definition: Time.hpp:13
Definition: Message.hpp:55
Definition: Messages.hpp:77
Definition: Base.hpp:12
this message appears in the receiver&#39;s buffer indicating a new source is connected ...
Definition: Messages.hpp:127