hmbdc
simplify-high-performance-messaging-programming
Messages.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/comm/Topic.hpp"
5 #include "hmbdc/time/Time.hpp"
6 #include "hmbdc/app/Message.hpp"
7 
8 #include <ostream>
9 #include <string>
10 
11 #include <unistd.h>
12 #include <sys/mman.h>
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include <fcntl.h>
16 
17 namespace hmbdc { namespace app { namespace tcpcast {
18 
19 using namespace hmbdc;
20 using namespace hmbdc::comm;
21 
22 
24  enum {
25  flag = 1
26  };
27  size_t len;
28  void* attachment;
29  void free() {
30  ::free(attachment);
31  attachment = nullptr;
32  }
33 } __attribute__((aligned (8)));
34 
36  enum {
37  flag = 2
38  };
39  char file[16];
40  size_t len;
41  void* attachment;
42  size_t map(char const* fileName) {
43  using namespace std;
44  int fd;
45  struct stat sb;
46 
47  fd = open(fileName, O_RDONLY);
48  fstat(fd, &sb);
49  len = (uint64_t)sb.st_size;
50  attachment = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
51  if (attachment == MAP_FAILED) return 0;
52  close(fd);
53 
54  string line(fileName);
55  auto pos = line.find_last_of('/');
56  if (pos == string::npos) pos = 0;
57  strncpy(file, line.c_str() + pos + 1, sizeof(file));
58  return len;
59  }
60 
61  void unmap() {
62  munmap(attachment, len);
63  attachment = nullptr;
64  }
65 } __attribute__((aligned (8)));
66 
68  uint8_t topicLen;
69  uint8_t flag; //1 - hasAttachment
70  uint16_t messagePayloadLen;
71 
72  std::pair<char const*, char const*> topic() const {
73  char const* b = reinterpret_cast<const char*>(this)
74  + sizeof(TransportMessageHeader);
75  return std::make_pair(b, b + topicLen);
76  }
77 
78  void const* payload() const {
79  return reinterpret_cast<const char*>(this)
80  + sizeof(TransportMessageHeader) + topicLen;
81  }
82 
83  void * payload() {
84  return reinterpret_cast<char*>(this)
85  + sizeof(TransportMessageHeader) + topicLen;
86  }
87 
88  uint16_t typeTag() const {
89  auto h = static_cast<app::MessageHead const*>(payload());
90  return h->typeTag;
91  }
92 
93  template <typename Message>
94  Message& wrapped() {
95  auto wrap = static_cast<app::MessageWrap<Message>*>(payload());
96  return wrap->payload;
97  }
98 
99  template <typename Message>
100  Message const& wrapped() const {
101  auto wrap = static_cast<app::MessageWrap<Message> const *>(payload());
102  return wrap->payload;
103  }
104 
105  size_t wireSize() const {
106  return sizeof(TransportMessageHeader) + topicLen
107  + messagePayloadLen;
108  }
109 
110  size_t wireSizeContainsTopic() const {
111  return sizeof(TransportMessageHeader) + topicLen;
112  }
113 } __attribute__((packed));
114 
115 struct Subscribe
116 : hasTag<201> {
117  Subscribe(Topic const&topic)
118  : topic(topic){}
119  Topic topic;
120 };
121 
122 struct Unsubscribe
123 : hasTag<202> {
124  Unsubscribe(Topic const&topic)
125  : topic(topic){}
126  Topic topic;
127 };
128 
129 struct TopicSource
130 : hasTag<203> {
131  TopicSource(std::string const&topicRegexIn
132  , std::string const& ipIn
133  , uint16_t portIn
134  , bool loopbackIn)
135  : port(portIn)
136  , pid(getpid())
137  , loopback(loopbackIn)
138  , timestamp(time::SysTime::now()) {
139  strncpy(ip, ipIn.c_str(), sizeof(ip));
140  strncpy(topicRegex, topicRegexIn.c_str(), sizeof(topicRegex));
141  topicRegex[sizeof(topicRegex) - 1] = 0;
142  }
143  char ip[16];
144  char topicRegex[128]; //null terminated string
145  uint16_t port;
146  pid_t pid;
147  bool loopback;
148  time::SysTime timestamp; //when the source started
149 };
150 
152 : hasTag<204> {
153  char ip[16];
154  friend
155  std::ostream& operator << (std::ostream& os, SessionStarted const & m) {
156  return os << "Session to topic source started " << m.ip;
157  }
158 };
159 
161 : hasTag<205> {
162  char ip[16];
163  friend
164  std::ostream& operator << (std::ostream& os, SessionDropped const & m) {
165  return os << "Session to topic source dropped " << m.ip;
166  }
167 };
168 
169 }}}
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:74
Definition: Messages.hpp:122
Definition: Message.hpp:21
Definition: Messages.hpp:115
Definition: Messages.hpp:160
Definition: Messages.hpp:23
Definition: Message.hpp:25
Definition: Time.hpp:15
Definition: Misc.h:9
Definition: Message.hpp:46
Definition: Messages.hpp:129
Definition: Messages.hpp:35
Definition: Client.hpp:11
Definition: Messages.hpp:151