hmbdc
simplify-high-performance-messaging-programming
Messages.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/comm/Topic.hpp"
5 #include "hmbdc/comm/inet/Endpoint.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/app/Message.hpp"
8 #include "hmbdc/Endian.hpp"
9 
10 #include <iostream>
11 #include <string>
12 
13 #include <unistd.h>
14 #include <arpa/inet.h>
15 
16 namespace hmbdc { namespace app { namespace tcpcast {
17 
19  uint8_t topicLen;
20  uint8_t flag; //1 - hasAttachment
21  XmitEndian<uint16_t> messagePayloadLen;
22 
23  std::pair<char const*, char const*> topic() const {
24  char const* b = reinterpret_cast<const char*>(this)
25  + sizeof(TransportMessageHeader);
26  return std::make_pair(b, b + topicLen);
27  }
28 
29  void const* payload() const {
30  return reinterpret_cast<const char*>(this)
31  + sizeof(TransportMessageHeader) + topicLen;
32  }
33 
34  void * payload() {
35  return reinterpret_cast<char*>(this)
36  + sizeof(TransportMessageHeader) + topicLen;
37  }
38 
39  uint16_t typeTag() const {
40  auto h = static_cast<app::MessageHead const*>(payload());
41  return h->typeTag;
42  }
43 
44  template <typename Message>
45  Message& wrapped() {
46  auto wrap = static_cast<app::MessageWrap<Message>*>(payload());
47  return wrap->payload;
48  }
49 
50  template <typename Message>
51  Message const& wrapped() const {
52  auto wrap = static_cast<app::MessageWrap<Message> const *>(payload());
53  return wrap->payload;
54  }
55 
56  size_t wireSize() const {
57  return sizeof(TransportMessageHeader) + topicLen
58  + messagePayloadLen;
59  }
60 
61  size_t wireSizeContainsTopic() const {
62  return sizeof(TransportMessageHeader) + topicLen;
63  }
64 } __attribute__((packed));
65 
66 struct Subscribe
67 : hasTag<201> {
68  Subscribe(comm::Topic const&topic)
69  : topic(topic){}
70  comm::Topic topic;
71 };
72 
73 struct Unsubscribe
74 : hasTag<202> {
75  Unsubscribe(comm::Topic const&topic)
76  : topic(topic){}
77  comm::Topic topic;
78 };
79 static_assert(sizeof(Unsubscribe) == 64
80  , "do you have a pack pragma unclosed that influencs the above struct packing unexpectedly?");
81 
82 struct TopicSource
83 : hasTag<203> {
84  TopicSource()
85  : port(0)
86  , pid(0)
87  , loopback(false)
88  {}
89 
90  TopicSource(std::string const&topicRegexIn
91  , std::string const& ipIn
92  , uint16_t portIn
93  , bool loopbackIn)
94  : port(portIn)
95  , pid(getpid())
96  , loopback(loopbackIn) {
97  strncpy(ip, ipIn.c_str(), sizeof(ip));
98  strncpy(topicRegex, topicRegexIn.c_str(), sizeof(topicRegex));
99  topicRegex[sizeof(topicRegex) - 1] = 0;
100  }
101  char ip[16];
102  char topicRegex[128]; //null terminated string
104  XmitEndian<pid_t> pid;
105  bool loopback;
106  uint64_t connKey;
107 
108  friend
109  std::ostream& operator << (std::ostream& os, TopicSource const& m) {
110  return os << m.ip << ' '
111  << m.port << ' '
112  << m.topicRegex;
113  }
114 
115  friend
116  std::istream& operator >> (std::istream& is, TopicSource& m) {
117  is
118  >> m.ip
119  >> m.port
120  >> m.topicRegex
121  ;
122  return is;
123  }
124 } __attribute__((packed));
125 
126 /**
127  * @class SessionStarted
128  * @brief this message appears in the receiver's buffer indicating a new source is connected
129  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this message
130  */
132 : hasTag<204> {
133  char ip[16];
134  friend
135  std::ostream& operator << (std::ostream& os, SessionStarted const & m) {
136  return os << "Session to topic source started " << m.ip;
137  }
138 };
139 
140 /**
141  * @class SessionDropped
142  * @brief this message appears in the receiver's buffer indicating a previously connected source is dropped
143  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this message
144  */
146 : hasTag<205> {
147  char ip[16];
148  friend
149  std::ostream& operator << (std::ostream& os, SessionDropped const & m) {
150  return os << "Session to topic source dropped " << m.ip;
151  }
152 };
153 
154 struct TopicSink
155 : hasTag<206> {
156  TopicSink()
157  {}
158 
159  TopicSink(std::string const& ipIn
160  , uint16_t portIn) {
161  auto tmp = comm::inet::Endpoint{ipIn, portIn};
162  sinkAddr = tmp.v;
163  }
164  sockaddr_in sinkAddr;
165 } __attribute__((aligned (8)));
166 
167 }}}
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Endpoint.hpp:15
Definition: Messages.hpp:73
each message type has 16 bit tag
Definition: Message.hpp:29
Definition: Messages.hpp:66
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:145
Definition: Message.hpp:42
Definition: Messages.hpp:154
Definition: Message.hpp:76
Definition: Messages.hpp:82
Definition: Base.hpp:12
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:131