MeterLogger
mqtt_msg.c
Go to the documentation of this file.
1 /*
2 * Copyright (c) 2014, Stephen Robinson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the copyright holder nor the names of its
15 * contributors may be used to endorse or promote products derived
16 * from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 *
30 */
31 
32 #include <string.h>
33 #include "mqtt_msg.h"
34 #include "user_config.h"
35 #define MQTT_MAX_FIXED_HEADER_SIZE 3
36 
38 {
44 };
45 
/*
 * Byte-exact layout of the variable header that mqtt_msg_connect() writes
 * directly into the outgoing buffer.
 *
 * lengthMsb/lengthLsb hold the big-endian length of the protocol name that
 * follows in magic[]; the size of magic[] is chosen at compile time to fit
 * the protocol name of the selected MQTT version (6 bytes for v3.1,
 * 4 bytes for v3.1.1). The struct is packed and consists only of bytes,
 * so it can be overlaid on the buffer at any offset.
 */
struct __attribute__((__packed__)) mqtt_connect_variable_header
{
  uint8_t lengthMsb;      /* protocol name length, high byte */
  uint8_t lengthLsb;      /* protocol name length, low byte */
#ifdef PROTOCOL_NAMEv31
  uint8_t magic[6];       /* protocol name for MQTT v3.1 */
#else
# ifdef PROTOCOL_NAMEv311
  uint8_t magic[4];       /* protocol name for MQTT v3.1.1 */
# else
#  error "Please define protocol name"
# endif
#endif
  uint8_t version;        /* protocol level */
  uint8_t flags;          /* connect flags bit field */
  uint8_t keepaliveMsb;   /* keep-alive interval, high byte */
  uint8_t keepaliveLsb;   /* keep-alive interval, low byte */
};
62 
63 static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t* connection, const char* string, int len)
64 {
65  if (connection->message.length + len + 2 > connection->buffer_length)
66  return -1;
67 
68  connection->buffer[connection->message.length++] = len >> 8;
69  connection->buffer[connection->message.length++] = len & 0xff;
70  memcpy(connection->buffer + connection->message.length, string, len);
71  connection->message.length += len;
72 
73  return len + 2;
74 }
75 
76 static uint16_t ICACHE_FLASH_ATTR append_message_id(mqtt_connection_t* connection, uint16_t message_id)
77 {
78  // If message_id is zero then we should assign one, otherwise
79  // we'll use the one supplied by the caller
80  while (message_id == 0)
81  message_id = ++connection->message_id;
82 
83  if (connection->message.length + 2 > connection->buffer_length)
84  return 0;
85 
86  connection->buffer[connection->message.length++] = message_id >> 8;
87  connection->buffer[connection->message.length++] = message_id & 0xff;
88 
89  return message_id;
90 }
91 
93 {
96 }
97 
99 {
100  connection->message.data = connection->buffer;
101  connection->message.length = 0;
102  return &connection->message;
103 }
104 
105 static mqtt_message_t* ICACHE_FLASH_ATTR fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain)
106 {
107  int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
108 
109  if (remaining_length > 127)
110  {
111  connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
112  connection->buffer[1] = 0x80 | (remaining_length % 128);
113  connection->buffer[2] = remaining_length / 128;
114  connection->message.length = remaining_length + 3;
115  connection->message.data = connection->buffer;
116  }
117  else
118  {
119  connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
120  connection->buffer[2] = remaining_length;
121  connection->message.length = remaining_length + 2;
122  connection->message.data = connection->buffer + 1;
123  }
124 
125  return &connection->message;
126 }
127 
128 void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length)
129 {
130  memset(connection, 0, sizeof(mqtt_connection_t));
131  connection->buffer = buffer;
132  connection->buffer_length = buffer_length;
133 }
134 
/*
 * Compute the total length of the MQTT packet that starts at buffer.
 *
 * Decodes the variable-length "remaining length" field that follows the
 * first byte (7 data bits per byte, bit 7 set meaning "more bytes
 * follow") and adds the size of the fixed header itself.
 *
 * buffer: start of the packet
 * length: number of valid bytes in buffer
 *
 * Returns remaining length plus fixed header size. At most four length
 * bytes are decoded, which is the maximum MQTT allows; this also keeps
 * the shift below the width of int for malformed input that never clears
 * the continuation bit. If the field is truncated or unterminated the
 * result is the sum of what was decoded plus the index reached.
 */
int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t* buffer, uint16_t length)
{
  enum { MAX_LENGTH_BYTES = 4 };
  int i;
  int totlen = 0;

  for (i = 1; i < length && i <= MAX_LENGTH_BYTES; ++i)
  {
    totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
    if ((buffer[i] & 0x80) == 0)
    {
      /* Last length byte: step past it so i equals the header size. */
      ++i;
      break;
    }
  }
  totlen += i;

  return totlen;
}
153 
/*
 * Locate the topic name inside a PUBLISH packet.
 *
 * buffer: start of the packet
 * length: in:  number of valid bytes in buffer
 *         out: length of the topic in bytes (only written on success)
 *
 * Returns a pointer into buffer at the first topic byte (the topic is not
 * NUL terminated), or NULL if the packet is too short to contain the
 * topic length field or the topic it announces.
 */
const char* ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length)
{
  int i;
  int topiclen;

  /* Skip the remaining-length field: it ends at the first byte whose
     continuation bit (0x80) is clear. Its value is not needed here. */
  for (i = 1; i < *length; ++i)
  {
    if ((buffer[i] & 0x80) == 0)
    {
      ++i;
      break;
    }
  }

  if (i + 2 >= *length)
    return NULL;
  topiclen = buffer[i++] << 8;
  topiclen |= buffer[i++];

  if (i + topiclen > *length)
    return NULL;

  *length = topiclen;
  return (const char*)(buffer + i);
}
182 
183 const char* ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t* buffer, uint16_t* length)
184 {
185  int i;
186  int totlen = 0;
187  int topiclen;
188  int blength = *length;
189  *length = 0;
190 
191  for (i = 1; i < blength; ++i)
192  {
193  totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
194  if ((buffer[i] & 0x80) == 0)
195  {
196  ++i;
197  break;
198  }
199  }
200  totlen += i;
201 
202  if (i + 2 >= blength)
203  return NULL;
204  topiclen = buffer[i++] << 8;
205  topiclen |= buffer[i++];
206 
207  if (i + topiclen >= blength)
208  return NULL;
209 
210  i += topiclen;
211 
212  if (mqtt_get_qos(buffer) > 0)
213  {
214  if (i + 2 >= blength)
215  return NULL;
216  i += 2;
217  }
218 
219  if (totlen < i)
220  return NULL;
221 
222  if (totlen <= blength)
223  *length = totlen - i;
224  else
225  *length = blength - i;
226  return (const char*)(buffer + i);
227 }
228 
229 uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t* buffer, uint16_t length)
230 {
231  if (length < 1)
232  return 0;
233 
234  switch (mqtt_get_type(buffer))
235  {
237  {
238  int i;
239  int topiclen;
240 
241  for (i = 1; i < length; ++i)
242  {
243  if ((buffer[i] & 0x80) == 0)
244  {
245  ++i;
246  break;
247  }
248  }
249 
250  if (i + 2 >= length)
251  return 0;
252  topiclen = buffer[i++] << 8;
253  topiclen |= buffer[i++];
254 
255  if (i + topiclen >= length)
256  return 0;
257  i += topiclen;
258 
259  if (mqtt_get_qos(buffer) > 0)
260  {
261  if (i + 2 >= length)
262  return 0;
263  //i += 2;
264  } else {
265  return 0;
266  }
267 
268  return (buffer[i] << 8) | buffer[i + 1];
269  }
277  {
278  // This requires the remaining length to be encoded in 1 byte,
279  // which it should be.
280  if (length >= 4 && (buffer[1] & 0x80) == 0)
281  return (buffer[2] << 8) | buffer[3];
282  else
283  return 0;
284  }
285 
286  default:
287  return 0;
288  }
289 }
290 
292 {
293  struct mqtt_connect_variable_header* variable_header;
294 
295  init_message(connection);
296 
297  if (connection->message.length + sizeof(*variable_header) > connection->buffer_length)
298  return fail_message(connection);
299  variable_header = (void*)(connection->buffer + connection->message.length);
300  connection->message.length += sizeof(*variable_header);
301 
302  variable_header->lengthMsb = 0;
303 #if defined(PROTOCOL_NAMEv31)
304  variable_header->lengthLsb = 6;
305  memcpy(variable_header->magic, "MQIsdp", 6);
306  variable_header->version = 3;
307 #elif defined(PROTOCOL_NAMEv311)
308  variable_header->lengthLsb = 4;
309  memcpy(variable_header->magic, "MQTT", 4);
310  variable_header->version = 4;
311 #else
312 #error "Please define protocol name"
313 #endif
314 
315  variable_header->flags = 0;
316  variable_header->keepaliveMsb = info->keepalive >> 8;
317  variable_header->keepaliveLsb = info->keepalive & 0xff;
318 
319  if (info->clean_session)
320  variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
321 
322  if (info->client_id == NULL)
323  {
324  /* Never allowed */
325  return fail_message(connection);
326  }
327  else if (info->client_id[0] == '\0')
328  {
329 #ifdef PROTOCOL_NAMEv311
330  /* Allowed. Format 0 Length ID */
331  append_string(connection, info->client_id, 2) ;
332 #else
333  /* 0 Length not allowed */
334  return fail_message(connection);
335 #endif
336  }
337  else
338  {
339  /* No 0 data and at least 1 long. Good to go. */
340  if(append_string(connection, info->client_id, strlen(info->client_id)) < 0)
341  return fail_message(connection);
342  }
343 
344  if (info->will_topic != NULL && info->will_topic[0] != '\0')
345  {
346  if (append_string(connection, info->will_topic, strlen(info->will_topic)) < 0)
347  return fail_message(connection);
348 
349  if (append_string(connection, info->will_message, strlen(info->will_message)) < 0)
350  return fail_message(connection);
351 
352  variable_header->flags |= MQTT_CONNECT_FLAG_WILL;
353  if (info->will_retain)
354  variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
355  variable_header->flags |= (info->will_qos & 3) << 3;
356  }
357 
358  if (info->username != NULL && info->username[0] != '\0')
359  {
360  if (append_string(connection, info->username, strlen(info->username)) < 0)
361  return fail_message(connection);
362 
363  variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME;
364  }
365 
366  if (info->password != NULL && info->password[0] != '\0')
367  {
368  if (append_string(connection, info->password, strlen(info->password)) < 0)
369  return fail_message(connection);
370 
371  variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD;
372  }
373 
374  return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);
375 }
376 
377 mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id)
378 {
379  init_message(connection);
380 
381  if (topic == NULL || topic[0] == '\0')
382  return fail_message(connection);
383 
384  if (append_string(connection, topic, strlen(topic)) < 0)
385  return fail_message(connection);
386 
387  if (qos > 0)
388  {
389  if ((*message_id = append_message_id(connection, 0)) == 0)
390  return fail_message(connection);
391  }
392  else
393  *message_id = 0;
394 
395  if (connection->message.length + data_length > connection->buffer_length)
396  return fail_message(connection);
397  memcpy(connection->buffer + connection->message.length, data, data_length);
398  connection->message.length += data_length;
399 
400  return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
401 }
402 
404 {
405  init_message(connection);
406  if (append_message_id(connection, message_id) == 0)
407  return fail_message(connection);
408  return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
409 }
410 
412 {
413  init_message(connection);
414  if (append_message_id(connection, message_id) == 0)
415  return fail_message(connection);
416  return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
417 }
418 
420 {
421  init_message(connection);
422  if (append_message_id(connection, message_id) == 0)
423  return fail_message(connection);
424  return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
425 }
426 
428 {
429  init_message(connection);
430  if (append_message_id(connection, message_id) == 0)
431  return fail_message(connection);
432  return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
433 }
434 
435 mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id)
436 {
437  init_message(connection);
438 
439  if (topic == NULL || topic[0] == '\0')
440  return fail_message(connection);
441 
442  if ((*message_id = append_message_id(connection, 0)) == 0)
443  return fail_message(connection);
444 
445  if (append_string(connection, topic, strlen(topic)) < 0)
446  return fail_message(connection);
447 
448  if (connection->message.length + 1 > connection->buffer_length)
449  return fail_message(connection);
450  connection->buffer[connection->message.length++] = qos;
451 
452  return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
453 }
454 
455 mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id)
456 {
457  init_message(connection);
458 
459  if (topic == NULL || topic[0] == '\0')
460  return fail_message(connection);
461 
462  if ((*message_id = append_message_id(connection, 0)) == 0)
463  return fail_message(connection);
464 
465  if (append_string(connection, topic, strlen(topic)) < 0)
466  return fail_message(connection);
467 
468  return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0);
469 }
470 
472 {
473  init_message(connection);
474  return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
475 }
476 
478 {
479  init_message(connection);
480  return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
481 }
482 
484 {
485  init_message(connection);
486  return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
487 }
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id)
Definition: mqtt_msg.c:455
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info)
Definition: mqtt_msg.c:291
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id)
Definition: mqtt_msg.c:403
#define memset(x, a, b)
Definition: platform.h:21
static mqtt_message_t *ICACHE_FLASH_ATTR fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
Definition: mqtt_msg.c:105
const char *ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t *buffer, uint16_t *length)
Definition: mqtt_msg.c:154
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_disconnect(mqtt_connection_t *connection)
Definition: mqtt_msg.c:483
static mqtt_message_t *ICACHE_FLASH_ATTR fail_message(mqtt_connection_t *connection)
Definition: mqtt_msg.c:98
char * will_message
Definition: mqtt_msg.h:102
char * will_topic
Definition: mqtt_msg.h:101
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t *buffer, uint16_t length)
Definition: mqtt_msg.c:229
struct __attribute((__packed__))
Definition: mqtt_msg.c:46
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id)
Definition: mqtt_msg.c:377
#define NULL
Definition: def.h:47
static uint16_t ICACHE_FLASH_ATTR append_message_id(mqtt_connection_t *connection, uint16_t message_id)
Definition: mqtt_msg.c:76
#define ICACHE_FLASH_ATTR
Definition: c_types.h:99
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint16_t buffer_length)
Definition: mqtt_msg.c:128
uint16_t message_id
Definition: mqtt_msg.h:90
char * username
Definition: mqtt_msg.h:99
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t *connection)
Definition: mqtt_msg.c:471
mqtt_message_t message
Definition: mqtt_msg.h:88
static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t *connection, const char *string, int len)
Definition: mqtt_msg.c:63
char * client_id
Definition: mqtt_msg.h:98
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t *connection)
Definition: mqtt_msg.c:477
static int ICACHE_FLASH_ATTR init_message(mqtt_connection_t *connection)
Definition: mqtt_msg.c:92
uint8_t * data
Definition: mqtt_msg.h:81
#define MQTT_MAX_FIXED_HEADER_SIZE
Definition: mqtt_msg.c:35
const char *ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t *buffer, uint16_t *length)
Definition: mqtt_msg.c:183
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id)
Definition: mqtt_msg.c:427
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id)
Definition: mqtt_msg.c:411
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id)
Definition: mqtt_msg.c:419
uint16_t length
Definition: mqtt_msg.h:82
int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t *buffer, uint16_t length)
Definition: mqtt_msg.c:135
uint32_t keepalive
Definition: mqtt_msg.h:103
#define memcpy(x, a, b)
Definition: platform.h:22
static int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t *buffer)
Definition: mqtt_msg.h:111
uint8_t * buffer
Definition: mqtt_msg.h:91
#define strlen(a)
Definition: platform.h:25
uint16_t buffer_length
Definition: mqtt_msg.h:92
static int ICACHE_FLASH_ATTR mqtt_get_qos(uint8_t *buffer)
Definition: mqtt_msg.h:114
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id)
Definition: mqtt_msg.c:435
mqtt_connect_flag
Definition: mqtt_msg.c:37