hmbdc
simplify-high-performance-messaging-programming
Messages.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/comm/Topic.hpp"
5 #include "hmbdc/time/Time.hpp"
6 #include "hmbdc/app/Message.hpp"
7 #include "hmbdc/Endian.hpp"
8 
9 #include <iostream>
10 #include <string>
11 
12 #include <unistd.h>
13 
14 namespace hmbdc { namespace app { namespace tcpcast {
15 
17  uint8_t topicLen;
18  uint8_t flag; //1 - hasAttachment
19  XmitEndian<uint16_t> messagePayloadLen;
20 
21  std::pair<char const*, char const*> topic() const {
22  char const* b = reinterpret_cast<const char*>(this)
23  + sizeof(TransportMessageHeader);
24  return std::make_pair(b, b + topicLen);
25  }
26 
27  void const* payload() const {
28  return reinterpret_cast<const char*>(this)
29  + sizeof(TransportMessageHeader) + topicLen;
30  }
31 
32  void * payload() {
33  return reinterpret_cast<char*>(this)
34  + sizeof(TransportMessageHeader) + topicLen;
35  }
36 
37  uint16_t typeTag() const {
38  auto h = static_cast<app::MessageHead const*>(payload());
39  return h->typeTag;
40  }
41 
42  template <typename Message>
43  Message& wrapped() {
44  auto wrap = static_cast<app::MessageWrap<Message>*>(payload());
45  return wrap->payload;
46  }
47 
48  template <typename Message>
49  Message const& wrapped() const {
50  auto wrap = static_cast<app::MessageWrap<Message> const *>(payload());
51  return wrap->payload;
52  }
53 
54  size_t wireSize() const {
55  return sizeof(TransportMessageHeader) + topicLen
56  + messagePayloadLen;
57  }
58 
59  size_t wireSizeContainsTopic() const {
60  return sizeof(TransportMessageHeader) + topicLen;
61  }
62 } __attribute__((packed));
63 
64 struct Subscribe
65 : hasTag<201> {
66  Subscribe(comm::Topic const&topic)
67  : topic(topic){}
68  comm::Topic topic;
69 };
70 
71 struct Unsubscribe
72 : hasTag<202> {
73  Unsubscribe(comm::Topic const&topic)
74  : topic(topic){}
75  comm::Topic topic;
76 };
77 
78 struct TopicSource
79 : hasTag<203> {
80  TopicSource()
81  : port(0)
82  , pid(0)
83  , loopback(false)
84  {}
85 
86  TopicSource(std::string const&topicRegexIn
87  , std::string const& ipIn
88  , uint16_t portIn
89  , bool loopbackIn)
90  : port(portIn)
91  , pid(getpid())
92  , loopback(loopbackIn) {
93  strncpy(ip, ipIn.c_str(), sizeof(ip));
94  strncpy(topicRegex, topicRegexIn.c_str(), sizeof(topicRegex));
95  topicRegex[sizeof(topicRegex) - 1] = 0;
96  }
97  char ip[16];
98  char topicRegex[128]; //null terminated string
100  XmitEndian<pid_t> pid;
101  bool loopback;
102 
103  friend
104  std::ostream& operator << (std::ostream& os, TopicSource const& m) {
105  return os << m.ip << ' '
106  << m.port << ' '
107  << m.topicRegex;
108  }
109 
110  friend
111  std::istream& operator >> (std::istream& is, TopicSource& m) {
112  is
113  >> m.ip
114  >> m.port
115  >> m.topicRegex
116  ;
117  return is;
118  }
119 } __attribute__((packed));
120 
121 /**
122  * @class SessionStarted
123  * @brief this message appears in the receiver's buffer indicating a new source is connected
124  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this message
125  */
127 : hasTag<204> {
128  char ip[16];
129  friend
130  std::ostream& operator << (std::ostream& os, SessionStarted const & m) {
131  return os << "Session to topic source started " << m.ip;
132  }
133 };
134 
135 /**
136  * @class SessionDropped
137  * @brief this message appears in the receiver's buffer indicating a previously connected source is dropped
138  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this message
139  */
141 : hasTag<205> {
142  char ip[16];
143  friend
144  std::ostream& operator << (std::ostream& os, SessionDropped const & m) {
145  return os << "Session to topic source dropped " << m.ip;
146  }
147 };
148 
149 }}}
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Messages.hpp:71
each message type has 16 bit tag
Definition: Message.hpp:34
Definition: Messages.hpp:64
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:140
Definition: Message.hpp:38
Definition: Message.hpp:72
Definition: Messages.hpp:78
Definition: Base.hpp:12
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:126