MeterLogger
Macros | Functions | Variables
mqtt.c File Reference
#include <esp8266.h>
#include <lwip/ip.h>
#include <lwip/udp.h>
#include <lwip/tcp_impl.h>
#include <netif/etharp.h>
#include <lwip/netif.h>
#include <lwip/lwip_napt.h>
#include <lwip/dns.h>
#include <lwip/app/dhcpserver.h>
#include <lwip/opt.h>
#include <espconn.h>
#include "mqtt_msg.h"
#include "debug.h"
#include "user_config.h"
#include "mqtt.h"
#include "mqtt_utils.h"
#include "queue.h"
Include dependency graph for mqtt.c:

Go to the source code of this file.

Macros

#define MQTT_TASK_PRIO   USER_TASK_PRIO_1
 
#define MQTT_TASK_QUEUE_SIZE   100
 
#define MQTT_SEND_TIMOUT   5
 
#define QUEUE_BUFFER_SIZE   2048
 

Functions

LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found (const char *name, ip_addr_t *ipaddr, void *arg)
 
LOCAL void ICACHE_FLASH_ATTR deliver_publish (MQTT_Client *client, uint8_t *message, int length)
 
void ICACHE_FLASH_ATTR mqtt_send_keepalive (MQTT_Client *client)
 
void ICACHE_FLASH_ATTR mqtt_tcpclient_delete (MQTT_Client *mqttClient)
 Delete tcp client and free all memory. More...
 
void ICACHE_FLASH_ATTR mqtt_client_delete (MQTT_Client *mqttClient)
 Delete MQTT client and free all memory. More...
 
void ICACHE_FLASH_ATTR mqtt_tcpclient_recv (void *arg, char *pdata, unsigned short len)
 Client received callback function. More...
 
void ICACHE_FLASH_ATTR mqtt_tcpclient_sent_cb (void *arg)
 Client send-complete callback function. More...
 
void ICACHE_FLASH_ATTR mqtt_timer (void *arg)
 
void ICACHE_FLASH_ATTR mqtt_tcpclient_discon_cb (void *arg)
 
void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb (void *arg)
 Tcp client connect success callback function. More...
 
void ICACHE_FLASH_ATTR mqtt_tcpclient_recon_cb (void *arg, sint8 errType)
 TCP client reconnect callback function. More...
 
BOOL ICACHE_FLASH_ATTR MQTT_Publish (MQTT_Client *client, const char *topic, const char *data, int data_length, int qos, int retain)
 MQTT publish function. More...
 
BOOL ICACHE_FLASH_ATTR MQTT_Subscribe (MQTT_Client *client, char *topic, uint8_t qos)
 MQTT subscribe function. More...
 
BOOL ICACHE_FLASH_ATTR MQTT_UnSubscribe (MQTT_Client *client, char *topic)
 MQTT unsubscribe function. More...
 
BOOL ICACHE_FLASH_ATTR MQTT_Ping (MQTT_Client *client)
 MQTT ping function. More...
 
void ICACHE_FLASH_ATTR MQTT_Task (os_event_t *e)
 
void ICACHE_FLASH_ATTR MQTT_InitConnection (MQTT_Client *mqttClient, uint8_t *host, uint32_t port, uint8_t security)
 MQTT initialization connection function. More...
 
BOOL ICACHE_FLASH_ATTR MQTT_InitClient (MQTT_Client *mqttClient, uint8_t *client_id, uint8_t *client_user, uint8_t *client_pass, uint32_t keepAliveTime, uint8_t cleanSession)
 MQTT initialization mqtt client function. More...
 
void ICACHE_FLASH_ATTR MQTT_InitLWT (MQTT_Client *mqttClient, uint8_t *will_topic, uint8_t *will_msg, uint8_t will_qos, uint8_t will_retain)
 
void ICACHE_FLASH_ATTR MQTT_Connect (MQTT_Client *mqttClient)
 Begin connect to MQTT broker. More...
 
void ICACHE_FLASH_ATTR MQTT_Disconnect (MQTT_Client *mqttClient)
 
void ICACHE_FLASH_ATTR MQTT_DeleteClient (MQTT_Client *mqttClient)
 
void ICACHE_FLASH_ATTR MQTT_OnConnected (MQTT_Client *mqttClient, MqttCallback connectedCb)
 
void ICACHE_FLASH_ATTR MQTT_OnDisconnected (MQTT_Client *mqttClient, MqttCallback disconnectedCb)
 
void ICACHE_FLASH_ATTR MQTT_OnData (MQTT_Client *mqttClient, MqttDataCallback dataCb)
 
void ICACHE_FLASH_ATTR MQTT_OnPublished (MQTT_Client *mqttClient, MqttCallback publishedCb)
 
void ICACHE_FLASH_ATTR MQTT_OnTimeout (MQTT_Client *mqttClient, MqttCallback timeoutCb)
 

Variables

unsigned char * default_certificate
 
unsigned int default_certificate_len = 0
 
unsigned char * default_private_key
 
unsigned int default_private_key_len = 0
 
os_event_t mqtt_procTaskQueue [MQTT_TASK_QUEUE_SIZE]
 

Macro Definition Documentation

◆ MQTT_SEND_TIMOUT

#define MQTT_SEND_TIMOUT   5

Definition at line 58 of file mqtt.c.

Referenced by mqtt_send_keepalive(), MQTT_Task(), and mqtt_tcpclient_connect_cb().

◆ MQTT_TASK_PRIO

#define MQTT_TASK_PRIO   USER_TASK_PRIO_1

◆ MQTT_TASK_QUEUE_SIZE

#define MQTT_TASK_QUEUE_SIZE   100

Definition at line 57 of file mqtt.c.

Referenced by MQTT_InitClient().

◆ QUEUE_BUFFER_SIZE

#define QUEUE_BUFFER_SIZE   2048

Definition at line 61 of file mqtt.c.

Referenced by MQTT_InitClient().

Function Documentation

◆ deliver_publish()

LOCAL void ICACHE_FLASH_ATTR deliver_publish ( MQTT_Client client,
uint8_t *  message,
int  length 
)

Definition at line 119 of file mqtt.c.

References mqtt_event_data_t::data, mqtt_event_data_t::data_length, MQTT_Client::dataCb, ICACHE_FLASH_ATTR, mqtt_get_publish_data(), mqtt_get_publish_topic(), mqtt_event_data_t::topic, and mqtt_event_data_t::topic_length.

Referenced by mqtt_tcpclient_recv().

120 {
121  mqtt_event_data_t event_data;
122 
123  event_data.topic_length = length;
124  event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length);
125  event_data.data_length = length;
126  event_data.data = mqtt_get_publish_data(message, &event_data.data_length);
127 
128  if (client->dataCb)
129  client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length);
130 
131 }
MqttDataCallback dataCb
Definition: mqtt.h:105
const char * topic
Definition: mqtt.h:39
const char *ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t *buffer, uint16_t *length)
Definition: mqtt_msg.c:183
const char * data
Definition: mqtt.h:40
uint16_t topic_length
Definition: mqtt.h:41
uint16_t data_length
Definition: mqtt.h:42
const char *ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t *buffer, uint16_t *length)
Definition: mqtt_msg.c:154
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mqtt_client_delete()

void ICACHE_FLASH_ATTR mqtt_client_delete ( MQTT_Client mqttClient)

Delete MQTT client and free all memory.

Parameters
mqttClient  The MQTT client
Return values
None

Definition at line 203 of file mqtt.c.

References QUEUE::buf, mqtt_connection::buffer, mqtt_connect_info::client_id, MQTT_Client::connect_info, MQTT_Client::connectedCb, MQTT_Client::connState, mqtt_message::data, MQTT_Client::dataCb, MQTT_Client::disconnectedCb, MQTT_Client::host, ICACHE_FLASH_ATTR, mqtt_state_t::in_buffer, INFO, mqtt_state_t::mqtt_connection, MQTT_Client::mqtt_state, mqtt_tcpclient_delete(), MQTT_Client::msgQueue, NULL, os_free, mqtt_state_t::out_buffer, mqtt_state_t::outbound_message, mqtt_connect_info::password, MQTT_Client::pCon, MQTT_Client::publishedCb, MQTT_Client::timeoutCb, MQTT_Client::user_data, mqtt_connect_info::username, WIFI_INIT, mqtt_connect_info::will_message, and mqtt_connect_info::will_topic.

Referenced by MQTT_Task().

204 {
205  if(mqttClient == NULL)
206  return;
207 
208  if (mqttClient->pCon != NULL){
209  mqtt_tcpclient_delete(mqttClient);
210  }
211 
212  if (mqttClient->host != NULL) {
213  os_free(mqttClient->host);
214  mqttClient->host = NULL;
215  }
216 
217  if (mqttClient->user_data != NULL) {
218  os_free(mqttClient->user_data);
219  mqttClient->user_data = NULL;
220  }
221 
222  if(mqttClient->mqtt_state.in_buffer != NULL) {
223  os_free(mqttClient->mqtt_state.in_buffer);
224  mqttClient->mqtt_state.in_buffer = NULL;
225  }
226 
227  if(mqttClient->mqtt_state.out_buffer != NULL) {
228  os_free(mqttClient->mqtt_state.out_buffer);
229  mqttClient->mqtt_state.out_buffer = NULL;
230  }
231 
232  if(mqttClient->mqtt_state.outbound_message != NULL) {
233  if(mqttClient->mqtt_state.outbound_message->data != NULL)
234  {
235  os_free(mqttClient->mqtt_state.outbound_message->data);
236  mqttClient->mqtt_state.outbound_message->data = NULL;
237  }
238  }
239 
240  if(mqttClient->mqtt_state.mqtt_connection.buffer != NULL) {
241  // Already freed but not NULL
242  mqttClient->mqtt_state.mqtt_connection.buffer = NULL;
243  }
244 
245  if(mqttClient->connect_info.client_id != NULL) {
246  os_free(mqttClient->connect_info.client_id);
247  mqttClient->connect_info.client_id = NULL;
248  }
249 
250  if(mqttClient->connect_info.username != NULL) {
251  os_free(mqttClient->connect_info.username);
252  mqttClient->connect_info.username = NULL;
253  }
254 
255  if(mqttClient->connect_info.password != NULL) {
256  os_free(mqttClient->connect_info.password);
257  mqttClient->connect_info.password = NULL;
258  }
259 
260  if(mqttClient->connect_info.will_topic != NULL) {
261  os_free(mqttClient->connect_info.will_topic);
262  mqttClient->connect_info.will_topic = NULL;
263  }
264 
265  if(mqttClient->connect_info.will_message != NULL) {
266  os_free(mqttClient->connect_info.will_message);
267  mqttClient->connect_info.will_message = NULL;
268  }
269 
270  if (mqttClient->msgQueue.buf != NULL) {
271  os_free(mqttClient->msgQueue.buf);
272  mqttClient->msgQueue.buf = NULL;
273  }
274 
275  // Initialize state
276  mqttClient->connState = WIFI_INIT;
277  // Clear callback functions to avoid abnormal callback
278  mqttClient->connectedCb = NULL;
279  mqttClient->disconnectedCb = NULL;
280  mqttClient->publishedCb = NULL;
281  mqttClient->timeoutCb = NULL;
282  mqttClient->dataCb = NULL;
283 
284  INFO("MQTT: client already deleted\r\n");
285 }
Definition: mqtt.h:65
char * will_message
Definition: mqtt_msg.h:102
MqttDataCallback dataCb
Definition: mqtt.h:105
char * will_topic
Definition: mqtt_msg.h:101
#define NULL
Definition: def.h:47
uint8_t * host
Definition: mqtt.h:96
MqttCallback publishedCb
Definition: mqtt.h:103
struct espconn * pCon
Definition: mqtt.h:94
char * username
Definition: mqtt_msg.h:99
void ICACHE_FLASH_ATTR mqtt_tcpclient_delete(MQTT_Client *mqttClient)
Delete tcp client and free all memory.
Definition: mqtt.c:179
void * user_data
Definition: mqtt.h:112
uint8_t * in_buffer
Definition: mqtt.h:51
MqttCallback connectedCb
Definition: mqtt.h:101
MqttCallback timeoutCb
Definition: mqtt.h:104
QUEUE msgQueue
Definition: mqtt.h:111
char * client_id
Definition: mqtt_msg.h:98
mqtt_connect_info_t connect_info
Definition: mqtt.h:100
uint8_t * out_buffer
Definition: mqtt.h:52
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
#define os_free(s)
Definition: mem.h:40
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
uint8_t * buf
Definition: queue.h:36
tConnState connState
Definition: mqtt.h:110
MqttCallback disconnectedCb
Definition: mqtt.h:102
#define INFO(...)
Definition: debug.h:17
uint8_t * buffer
Definition: mqtt_msg.h:91
mqtt_state_t mqtt_state
Definition: mqtt.h:99
Here is the call graph for this function:
Here is the caller graph for this function:

◆ MQTT_Connect()

void ICACHE_FLASH_ATTR MQTT_Connect ( MQTT_Client mqttClient)

Begin connect to MQTT broker.

Parameters
mqttClient  MQTT_Client reference
Return values
None

Definition at line 892 of file mqtt.c.

References MQTT_Client::connState, espconn_connect(), espconn_gethostbyname(), ESPCONN_KEEPALIVE, ESPCONN_KEEPCNT, ESPCONN_KEEPIDLE, ESPCONN_KEEPINTVL, ESPCONN_NONE, espconn_port(), espconn_regist_connectcb(), espconn_regist_reconcb(), espconn_secure_connect(), espconn_set_keepalive(), espconn_set_opt(), ESPCONN_TCP, MQTT_Client::host, ICACHE_FLASH_ATTR, INFO, MQTT_Client::ip, MQTT_Client::keepAliveTick, _esp_tcp::local_port, mqtt_dns_found(), mqtt_tcpclient_connect_cb(), mqtt_tcpclient_delete(), mqtt_tcpclient_recon_cb(), mqtt_timer(), MQTT_Client::mqttTimer, os_timer_arm, os_timer_disarm, os_timer_func_t, os_timer_setfn, os_zalloc, MQTT_Client::pCon, MQTT_Client::port, espconn::proto, MQTT_Client::reconnectTick, _esp_tcp::remote_ip, _esp_tcp::remote_port, espconn::reverse, MQTT_Client::security, espconn::state, espconn::tcp, TCP_CONNECTING, espconn::type, and UTILS_StrToIP().

Referenced by MQTT_Task().

893 {
894  uint32_t keeplive;
895 
896  if (mqttClient->pCon) {
897  // Clean up the old connection forcefully - using MQTT_Disconnect
898  // does not actually release the old connection until the
899  // disconnection callback is invoked.
900  mqtt_tcpclient_delete(mqttClient);
901  }
902  mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
903  mqttClient->pCon->type = ESPCONN_TCP;
904  mqttClient->pCon->state = ESPCONN_NONE;
905  mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp));
906  mqttClient->pCon->proto.tcp->local_port = espconn_port();
907  mqttClient->pCon->proto.tcp->remote_port = mqttClient->port;
908  mqttClient->pCon->reverse = mqttClient;
909 
910  // enable TCP keep alive
911  espconn_set_opt(mqttClient->pCon, ESPCONN_KEEPALIVE);
912 
913  // set keepalive: 75s = 60 + 5*3
914  keeplive = 60;
915  espconn_set_keepalive(mqttClient->pCon, ESPCONN_KEEPIDLE, &keeplive);
916 
917  keeplive = 5;
918  espconn_set_keepalive(mqttClient->pCon, ESPCONN_KEEPINTVL, &keeplive);
919 
920  keeplive = 3; //try times
921  espconn_set_keepalive(mqttClient->pCon, ESPCONN_KEEPCNT, &keeplive);
922 
925 
926  mqttClient->keepAliveTick = 0;
927  mqttClient->reconnectTick = 0;
928 
929  os_timer_disarm(&mqttClient->mqttTimer);
930  os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient);
931  os_timer_arm(&mqttClient->mqttTimer, 1000, 1);
932 
933  if (UTILS_StrToIP(mqttClient->host, &mqttClient->pCon->proto.tcp->remote_ip)) {
934  INFO("TCP: Connect to ip %s:%d\r\n", mqttClient->host, mqttClient->port);
935  if (mqttClient->security)
936  {
937 #ifdef MQTT_SSL_ENABLE
938  espconn_secure_connect(mqttClient->pCon);
939 #else
940  INFO("TCP: Do not support SSL\r\n");
941 #endif
942  }
943  else
944  {
945  espconn_connect(mqttClient->pCon);
946  }
947  }
948  else {
949  INFO("TCP: Connect to domain %s:%d\r\n", mqttClient->host, mqttClient->port);
950  espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);
951  }
952  mqttClient->connState = TCP_CONNECTING;
953 }
sint8 espconn_set_opt(struct espconn *espconn, uint8 opt)
Definition: espconn.c:1056
sint8 espconn_connect(struct espconn *espconn)
Definition: espconn.c:260
uint32_t port
Definition: mqtt.h:97
void ICACHE_FLASH_ATTR mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
TCP client reconnect callback function.
Definition: mqtt.c:567
void * reverse
Definition: espconn.h:112
ETSTimer mqttTimer
Definition: mqtt.h:106
#define os_timer_disarm
Definition: osapi.h:51
esp_tcp * tcp
Definition: espconn.h:105
int local_port
Definition: espconn.h:72
sint8 espconn_regist_reconcb(struct espconn *espconn, espconn_reconnect_callback recon_cb)
Definition: espconn.c:789
uint8 remote_ip[4]
Definition: espconn.h:74
void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
Definition: mqtt.c:472
#define os_timer_func_t
Definition: os_type.h:35
uint8_t * host
Definition: mqtt.h:96
struct espconn * pCon
Definition: mqtt.h:94
uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const int8_t *str, void *ip)
Definition: mqtt_utils.c:91
#define os_timer_setfn
Definition: osapi.h:52
void ICACHE_FLASH_ATTR mqtt_tcpclient_delete(MQTT_Client *mqttClient)
Delete tcp client and free all memory.
Definition: mqtt.c:179
int remote_port
Definition: espconn.h:71
#define os_zalloc(s)
Definition: mem.h:44
sint8 espconn_set_keepalive(struct espconn *espconn, uint8 level, void *optarg)
Definition: espconn.c:1128
uint32_t reconnectTick
Definition: mqtt.h:108
uint8_t security
Definition: mqtt.h:95
union espconn::@1 proto
tConnState connState
Definition: mqtt.h:110
#define os_timer_arm(a, b, c)
Definition: osapi.h:50
enum espconn_type type
Definition: espconn.h:101
enum espconn_state state
Definition: espconn.h:103
uint32 espconn_port(void)
Definition: espconn.c:1245
sint8 espconn_regist_connectcb(struct espconn *espconn, espconn_connect_callback connect_cb)
Definition: espconn.c:751
void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb(void *arg)
Tcp client connect success callback function.
Definition: mqtt.c:527
LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
Definition: mqtt.c:76
ip_addr_t ip
Definition: mqtt.h:98
#define INFO(...)
Definition: debug.h:17
sint8 espconn_gethostbyname(struct espconn *pespconn, const char *name, ip_addr_t *addr, dns_found_callback found)
Definition: espconn.c:1288
sint8 espconn_secure_connect(struct espconn *espconn)
uint32_t keepAliveTick
Definition: mqtt.h:107
Here is the call graph for this function:
Here is the caller graph for this function:

◆ MQTT_DeleteClient()

void ICACHE_FLASH_ATTR MQTT_DeleteClient ( MQTT_Client mqttClient)

Definition at line 964 of file mqtt.c.

References MQTT_Client::connState, ICACHE_FLASH_ATTR, MQTT_DELETED, MQTT_TASK_PRIO, NULL, os_param_t, os_timer_disarm, and system_os_post().

965 {
966  if(NULL == mqttClient)
967  return;
968 
969  mqttClient->connState = MQTT_DELETED;
970  // if(TCP_DISCONNECTED == mqttClient->connState) {
971  // mqttClient->connState = MQTT_DELETED;
972  // } else if(MQTT_DELETED != mqttClient->connState) {
973  // mqttClient->connState = MQTT_DELETING;
974  // }
975 
976  system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
977  os_timer_disarm(&mqttClient->mqttTimer);
978 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
ETSTimer mqttTimer
Definition: mqtt.h:106
#define os_timer_disarm
Definition: osapi.h:51
#define NULL
Definition: def.h:47
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
tConnState connState
Definition: mqtt.h:110
#define os_param_t
Definition: os_type.h:31
Here is the call graph for this function:

◆ MQTT_Disconnect()

void ICACHE_FLASH_ATTR MQTT_Disconnect ( MQTT_Client mqttClient)

Definition at line 956 of file mqtt.c.

References MQTT_Client::connState, ICACHE_FLASH_ATTR, MQTT_TASK_PRIO, os_param_t, os_timer_disarm, system_os_post(), and TCP_DISCONNECTING.

957 {
958  mqttClient->connState = TCP_DISCONNECTING;
959  system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
960  os_timer_disarm(&mqttClient->mqttTimer);
961 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
ETSTimer mqttTimer
Definition: mqtt.h:106
#define os_timer_disarm
Definition: osapi.h:51
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
tConnState connState
Definition: mqtt.h:110
#define os_param_t
Definition: os_type.h:31
Here is the call graph for this function:

◆ mqtt_dns_found()

LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found ( const char *  name,
ip_addr_t ipaddr,
void *  arg 
)

Definition at line 76 of file mqtt.c.

References MQTT_Client::connState, espconn_connect(), espconn_secure_connect(), ICACHE_FLASH_ATTR, INFO, MQTT_Client::ip, LOCAL, MQTT_TASK_PRIO, NULL, os_memcpy, os_param_t, MQTT_Client::pCon, espconn::proto, _esp_tcp::remote_ip, espconn::reverse, MQTT_Client::security, system_os_post(), espconn::tcp, TCP_CONNECTING, and TCP_RECONNECT_REQ.

Referenced by MQTT_Connect().

77 {
78  struct espconn *pConn = (struct espconn *)arg;
79  MQTT_Client* client = (MQTT_Client *)pConn->reverse;
80 
81 
82  if (ipaddr == NULL)
83  {
84  INFO("DNS: Found, but got no ip, try to reconnect\r\n");
85  client->connState = TCP_RECONNECT_REQ;
86  return;
87  }
88 
89  INFO("DNS: found ip %d.%d.%d.%d\n",
90  *((uint8 *) &ipaddr->addr),
91  *((uint8 *) &ipaddr->addr + 1),
92  *((uint8 *) &ipaddr->addr + 2),
93  *((uint8 *) &ipaddr->addr + 3));
94 
95  if (client->ip.addr == 0 && ipaddr->addr != 0)
96  {
97  os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4);
98  if (client->security) {
99 #ifdef MQTT_SSL_ENABLE
100  espconn_secure_connect(client->pCon);
101 #else
102  INFO("TCP: Do not support SSL\r\n");
103 #endif
104  }
105  else {
106  espconn_connect(client->pCon);
107  }
108 
109  client->connState = TCP_CONNECTING;
110  INFO("TCP: connecting...\r\n");
111  }
112 
114 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
sint8 espconn_connect(struct espconn *espconn)
Definition: espconn.c:260
void * reverse
Definition: espconn.h:112
esp_tcp * tcp
Definition: espconn.h:105
#define NULL
Definition: def.h:47
uint8 remote_ip[4]
Definition: espconn.h:74
struct espconn * pCon
Definition: mqtt.h:94
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
#define os_memcpy
Definition: osapi.h:36
unsigned char uint8
Definition: c_types.h:45
uint8_t security
Definition: mqtt.h:95
union espconn::@1 proto
tConnState connState
Definition: mqtt.h:110
#define os_param_t
Definition: os_type.h:31
ip_addr_t ip
Definition: mqtt.h:98
#define INFO(...)
Definition: debug.h:17
sint8 espconn_secure_connect(struct espconn *espconn)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ MQTT_InitClient()

BOOL ICACHE_FLASH_ATTR MQTT_InitClient ( MQTT_Client mqttClient,
uint8_t *  client_id,
uint8_t *  client_user,
uint8_t *  client_pass,
uint32_t  keepAliveTime,
uint8_t  cleanSession 
)

MQTT initialization mqtt client function.

Parameters
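mqttClient  MQTT_Client reference
client_id  MQTT client ID
client_user  MQTT client user name
client_pass  MQTT client password
keepAliveTime  MQTT keep-alive time, in seconds
cleanSession  MQTT clean-session flag
Return values
true  if the client was initialized
false  if client_id is NULL and a zero-length client ID cannot be used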

Definition at line 801 of file mqtt.c.

References mqtt_connect_info::clean_session, mqtt_connect_info::client_id, mqtt_state_t::connect_info, MQTT_Client::connect_info, ICACHE_FLASH_ATTR, mqtt_state_t::in_buffer, mqtt_state_t::in_buffer_length, INFO, mqtt_connect_info::keepalive, MQTT_BUF_SIZE, mqtt_state_t::mqtt_connection, mqtt_msg_init(), mqtt_procTaskQueue, MQTT_Client::mqtt_state, MQTT_Task(), MQTT_TASK_PRIO, MQTT_TASK_QUEUE_SIZE, MQTT_Client::msgQueue, os_memset, os_param_t, os_strcpy, os_strlen, os_zalloc, mqtt_state_t::out_buffer, mqtt_state_t::out_buffer_length, mqtt_connect_info::password, QUEUE_BUFFER_SIZE, QUEUE_Init(), system_os_post(), system_os_task(), and mqtt_connect_info::username.

802 {
803  uint32_t temp;
804  INFO("MQTT:InitClient\r\n");
805 
806  os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t));
807 
808  if ( !client_id ) {
809  /* Should be allowed by broker, but clean session flag must be set. */
810 #ifdef PROTOCOL_NAMEv311
811  if (cleanSession) {
812  mqttClient->connect_info.client_id = zero_len_id;
813  } else {
814  INFO("cleanSession must be set to use 0 length client_id\r\n");
815  return false;
816  }
817  /* Not supported. Return. */
818 #else
819  INFO("Client ID required for MQTT < 3.1.1!\r\n");
820  return false;
821 #endif
822  }
823 
824  /* If connect_info's client_id is still NULL and we get here, we can *
825  * assume the passed client_id is non-NULL. */
826  if ( !(mqttClient->connect_info.client_id) )
827  {
828  temp = os_strlen(client_id);
829  mqttClient->connect_info.client_id = (uint8_t*)os_zalloc(temp + 1);
830  os_strcpy(mqttClient->connect_info.client_id, client_id);
831  mqttClient->connect_info.client_id[temp] = 0;
832  }
833 
834  if (client_user)
835  {
836  temp = os_strlen(client_user);
837  mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1);
838  os_strcpy(mqttClient->connect_info.username, client_user);
839  mqttClient->connect_info.username[temp] = 0;
840  }
841 
842  if (client_pass)
843  {
844  temp = os_strlen(client_pass);
845  mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1);
846  os_strcpy(mqttClient->connect_info.password, client_pass);
847  mqttClient->connect_info.password[temp] = 0;
848  }
849 
850 
851  mqttClient->connect_info.keepalive = keepAliveTime;
852  mqttClient->connect_info.clean_session = cleanSession;
853 
854  mqttClient->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
856  mqttClient->mqtt_state.out_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
858  mqttClient->mqtt_state.connect_info = &mqttClient->connect_info;
859 
861 
862  QUEUE_Init(&mqttClient->msgQueue, QUEUE_BUFFER_SIZE);
863 
865  system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
866  return true;
867 }
#define MQTT_TASK_QUEUE_SIZE
Definition: mqtt.c:57
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
char * username
Definition: mqtt_msg.h:99
#define os_zalloc(s)
Definition: mem.h:44
uint8_t * in_buffer
Definition: mqtt.h:51
os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE]
Definition: mqtt.c:69
#define MQTT_BUF_SIZE
Definition: user_config.h:31
int out_buffer_length
Definition: mqtt.h:54
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
bool system_os_task(os_task_t task, uint8 prio, os_event_t *queue, uint8 qlen)
#define os_strlen
Definition: osapi.h:43
#define QUEUE_BUFFER_SIZE
Definition: mqtt.c:61
#define os_memset
Definition: osapi.h:38
int in_buffer_length
Definition: mqtt.h:53
QUEUE msgQueue
Definition: mqtt.h:111
char * client_id
Definition: mqtt_msg.h:98
void ICACHE_FLASH_ATTR QUEUE_Init(QUEUE *queue, int bufferSize)
Definition: queue.c:39
mqtt_connect_info_t connect_info
Definition: mqtt.h:100
uint8_t * out_buffer
Definition: mqtt.h:52
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint16_t buffer_length)
Definition: mqtt_msg.c:128
void ICACHE_FLASH_ATTR MQTT_Task(os_event_t *e)
Definition: mqtt.c:697
uint32_t keepalive
Definition: mqtt_msg.h:103
#define os_strcpy
Definition: osapi.h:42
#define os_param_t
Definition: os_type.h:31
mqtt_connect_info_t * connect_info
Definition: mqtt.h:50
#define INFO(...)
Definition: debug.h:17
mqtt_state_t mqtt_state
Definition: mqtt.h:99
Here is the call graph for this function:

◆ MQTT_InitConnection()

void ICACHE_FLASH_ATTR MQTT_InitConnection ( MQTT_Client mqttClient,
uint8_t *  host,
uint32_t  port,
uint8_t  security 
)

MQTT initialization connection function.

Parameters
mqttClient  MQTT_Client reference
host  Domain name or IP address string
port  Port to connect to
security  1 for SSL, 0 for none
Return values
None

Definition at line 777 of file mqtt.c.

References BOOL, MQTT_Client::host, ICACHE_FLASH_ATTR, INFO, os_memset, os_strcpy, os_strlen, os_zalloc, MQTT_Client::port, and MQTT_Client::security.

778 {
779  uint32_t temp;
780  INFO("MQTT:InitConnection\r\n");
781  os_memset(mqttClient, 0, sizeof(MQTT_Client));
782  temp = os_strlen(host);
783  mqttClient->host = (uint8_t*)os_zalloc(temp + 1);
784  os_strcpy(mqttClient->host, host);
785  mqttClient->host[temp] = 0;
786  mqttClient->port = port;
787  mqttClient->security = security;
788 
789 }
uint32_t port
Definition: mqtt.h:97
uint8_t * host
Definition: mqtt.h:96
#define os_zalloc(s)
Definition: mem.h:44
#define os_strlen
Definition: osapi.h:43
#define os_memset
Definition: osapi.h:38
uint8_t security
Definition: mqtt.h:95
#define os_strcpy
Definition: osapi.h:42
#define INFO(...)
Definition: debug.h:17

◆ MQTT_InitLWT()

void ICACHE_FLASH_ATTR MQTT_InitLWT ( MQTT_Client mqttClient,
uint8_t *  will_topic,
uint8_t *  will_msg,
uint8_t  will_qos,
uint8_t  will_retain 
)

Definition at line 869 of file mqtt.c.

References MQTT_Client::connect_info, ICACHE_FLASH_ATTR, os_strcpy, os_strlen, os_zalloc, mqtt_connect_info::will_message, mqtt_connect_info::will_qos, mqtt_connect_info::will_retain, and mqtt_connect_info::will_topic.

870 {
871  uint32_t temp;
872  temp = os_strlen(will_topic);
873  mqttClient->connect_info.will_topic = (uint8_t*)os_zalloc(temp + 1);
874  os_strcpy(mqttClient->connect_info.will_topic, will_topic);
875  mqttClient->connect_info.will_topic[temp] = 0;
876 
877  temp = os_strlen(will_msg);
878  mqttClient->connect_info.will_message = (uint8_t*)os_zalloc(temp + 1);
879  os_strcpy(mqttClient->connect_info.will_message, will_msg);
880  mqttClient->connect_info.will_message[temp] = 0;
881 
882 
883  mqttClient->connect_info.will_qos = will_qos;
884  mqttClient->connect_info.will_retain = will_retain;
885 }
char * will_message
Definition: mqtt_msg.h:102
char * will_topic
Definition: mqtt_msg.h:101
#define os_zalloc(s)
Definition: mem.h:44
#define os_strlen
Definition: osapi.h:43
mqtt_connect_info_t connect_info
Definition: mqtt.h:100
#define os_strcpy
Definition: osapi.h:42

◆ MQTT_OnConnected()

void ICACHE_FLASH_ATTR MQTT_OnConnected ( MQTT_Client mqttClient,
MqttCallback  connectedCb 
)

Definition at line 981 of file mqtt.c.

References MQTT_Client::connectedCb, and ICACHE_FLASH_ATTR.

982 {
983  mqttClient->connectedCb = connectedCb;
984 }
MqttCallback connectedCb
Definition: mqtt.h:101

◆ MQTT_OnData()

void ICACHE_FLASH_ATTR MQTT_OnData ( MQTT_Client mqttClient,
MqttDataCallback  dataCb 
)

Definition at line 993 of file mqtt.c.

References MQTT_Client::dataCb, and ICACHE_FLASH_ATTR.

994 {
995  mqttClient->dataCb = dataCb;
996 }
MqttDataCallback dataCb
Definition: mqtt.h:105

◆ MQTT_OnDisconnected()

void ICACHE_FLASH_ATTR MQTT_OnDisconnected ( MQTT_Client mqttClient,
MqttCallback  disconnectedCb 
)

Definition at line 987 of file mqtt.c.

References MQTT_Client::disconnectedCb, and ICACHE_FLASH_ATTR.

988 {
989  mqttClient->disconnectedCb = disconnectedCb;
990 }
MqttCallback disconnectedCb
Definition: mqtt.h:102

◆ MQTT_OnPublished()

void ICACHE_FLASH_ATTR MQTT_OnPublished ( MQTT_Client mqttClient,
MqttCallback  publishedCb 
)

Definition at line 999 of file mqtt.c.

References ICACHE_FLASH_ATTR, and MQTT_Client::publishedCb.

1000 {
1001  mqttClient->publishedCb = publishedCb;
1002 }
MqttCallback publishedCb
Definition: mqtt.h:103

◆ MQTT_OnTimeout()

void ICACHE_FLASH_ATTR MQTT_OnTimeout ( MQTT_Client mqttClient,
MqttCallback  timeoutCb 
)

Definition at line 1005 of file mqtt.c.

References MQTT_Client::timeoutCb.

1006 {
1007  mqttClient->timeoutCb = timeoutCb;
1008 }
MqttCallback timeoutCb
Definition: mqtt.h:104

◆ MQTT_Ping()

BOOL ICACHE_FLASH_ATTR MQTT_Ping ( MQTT_Client client)

MQTT ping function.

Parameters
client  MQTT_Client reference
Return values
TRUE  if the message was queued successfully

Definition at line 675 of file mqtt.c.

References mqtt_message::data, FALSE, RINGBUF::fill_cnt, ICACHE_FLASH_ATTR, INFO, mqtt_message::length, MQTT_BUF_SIZE, mqtt_state_t::mqtt_connection, mqtt_msg_pingreq(), MQTT_Client::mqtt_state, MQTT_TASK_PRIO, MQTT_Client::msgQueue, os_param_t, mqtt_state_t::outbound_message, QUEUE_Gets(), QUEUE_Puts(), QUEUE::rb, RINGBUF::size, system_os_post(), and TRUE.

676 {
677  uint8_t dataBuffer[MQTT_BUF_SIZE];
678  uint16_t dataLen;
680  if(client->mqtt_state.outbound_message->length == 0){
681  INFO("MQTT: Queuing publish failed\r\n");
682  return FALSE;
683  }
684  INFO("MQTT: queuing publish, length: %d, queue size(%ld/%ld)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
685  while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
686  INFO("MQTT: Queue full\r\n");
687  if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
688  INFO("MQTT: Serious buffer error\r\n");
689  return FALSE;
690  }
691  }
693  return TRUE;
694 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
#define FALSE
Definition: c_types.h:111
#define TRUE
Definition: c_types.h:110
#define MQTT_BUF_SIZE
Definition: user_config.h:31
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
QUEUE msgQueue
Definition: mqtt.h:111
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t *buffer, uint16_t len)
Definition: queue.c:44
uint16_t length
Definition: mqtt_msg.h:82
RINGBUF rb
Definition: queue.h:37
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t *buffer, uint16_t *len, uint16_t maxLen)
Definition: queue.c:61
#define os_param_t
Definition: os_type.h:31
#define INFO(...)
Definition: debug.h:17
volatile I32 fill_cnt
Definition: ringbuf.h:11
mqtt_state_t mqtt_state
Definition: mqtt.h:99
I32 size
Definition: ringbuf.h:12
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t *connection)
Definition: mqtt_msg.c:471
Here is the call graph for this function:

◆ MQTT_Publish()

BOOL ICACHE_FLASH_ATTR MQTT_Publish ( MQTT_Client client,
const char *  topic,
const char *  data,
int  data_length,
int  qos,
int  retain 
)

MQTT publish function.

Parameters
client: MQTT_Client reference
topic: topic string to publish to
data: pointer to the data buffer to send
data_length: length of data
qos: qos
retain: retain
Return values
TRUE: if the message was queued successfully

Definition at line 591 of file mqtt.c.

References BOOL, mqtt_message::data, FALSE, RINGBUF::fill_cnt, ICACHE_FLASH_ATTR, INFO, mqtt_message::length, MQTT_BUF_SIZE, mqtt_state_t::mqtt_connection, mqtt_msg_publish(), MQTT_Client::mqtt_state, MQTT_TASK_PRIO, MQTT_Client::msgQueue, os_param_t, mqtt_state_t::outbound_message, mqtt_state_t::pending_msg_id, QUEUE_Gets(), QUEUE_Puts(), QUEUE::rb, RINGBUF::size, system_os_post(), and TRUE.

Referenced by en61107_received_task(), en61107_request_send(), kmp_received_task(), and kmp_request_send().

592 {
593  uint8_t dataBuffer[MQTT_BUF_SIZE];
594  uint16_t dataLen;
596  topic, data, data_length,
597  qos, retain,
598  &client->mqtt_state.pending_msg_id);
599  if (client->mqtt_state.outbound_message->length == 0) {
600  INFO("MQTT: Queuing publish failed\r\n");
601  return FALSE;
602  }
603  INFO("MQTT: queuing publish, length: %d, queue size(%ld/%ld)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
604  while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
605  INFO("MQTT: Queue full\r\n");
606  if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
607  INFO("MQTT: Serious buffer error\r\n");
608  return FALSE;
609  }
610  }
612  return TRUE;
613 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
#define FALSE
Definition: c_types.h:111
#define TRUE
Definition: c_types.h:110
#define MQTT_BUF_SIZE
Definition: user_config.h:31
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
QUEUE msgQueue
Definition: mqtt.h:111
uint16_t pending_msg_id
Definition: mqtt.h:59
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id)
Definition: mqtt_msg.c:377
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t *buffer, uint16_t len)
Definition: queue.c:44
uint16_t length
Definition: mqtt_msg.h:82
RINGBUF rb
Definition: queue.h:37
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t *buffer, uint16_t *len, uint16_t maxLen)
Definition: queue.c:61
#define os_param_t
Definition: os_type.h:31
#define INFO(...)
Definition: debug.h:17
volatile I32 fill_cnt
Definition: ringbuf.h:11
mqtt_state_t mqtt_state
Definition: mqtt.h:99
I32 size
Definition: ringbuf.h:12
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mqtt_send_keepalive()

void ICACHE_FLASH_ATTR mqtt_send_keepalive ( MQTT_Client *  client)

Definition at line 134 of file mqtt.c.

References MQTT_Client::connState, mqtt_message::data, ESPCONN_OK, espconn_secure_send(), espconn_send(), ESPCONN_WRITE, MQTT_Client::host, ICACHE_FLASH_ATTR, INFO, MQTT_Client::keepAliveTick, mqtt_message::length, mqtt_state_t::mqtt_connection, MQTT_DATA, mqtt_get_id(), mqtt_get_type(), mqtt_msg_pingreq(), MQTT_MSG_TYPE_PINGREQ, MQTT_SEND_TIMOUT, MQTT_Client::mqtt_state, MQTT_TASK_PRIO, NULL, os_param_t, mqtt_state_t::outbound_message, MQTT_Client::pCon, mqtt_state_t::pending_msg_id, mqtt_state_t::pending_msg_type, MQTT_Client::port, MQTT_Client::security, MQTT_Client::sendTimeout, espconn::state, system_os_post(), and TCP_RECONNECT_DISCONNECTING.

Referenced by MQTT_Task().

134  {
135  if (client->pCon->state == ESPCONN_WRITE) {
137  return;
138  }
139 
140  INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
145 
146 
147  client->sendTimeout = MQTT_SEND_TIMOUT;
148  INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
149  err_t result = ESPCONN_OK;
150  if (client->security) {
151 #ifdef MQTT_SSL_ENABLE
153 #else
154  INFO("TCP: Do not support SSL\r\n");
155 #endif
156  }
157  else {
158  result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
159  }
160 
161  client->mqtt_state.outbound_message = NULL;
162  if(ESPCONN_OK == result) {
163  client->keepAliveTick = 0;
164  client->connState = MQTT_DATA;
166  }
167  else {
170  }
171 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
uint32_t sendTimeout
Definition: mqtt.h:109
uint32_t port
Definition: mqtt.h:97
sint8 espconn_send(struct espconn *espconn, uint8 *psent, uint16 length) __attribute__((alias("espconn_sent")))
#define NULL
Definition: def.h:47
#define MQTT_SEND_TIMOUT
Definition: mqtt.c:58
#define ESPCONN_OK
Definition: espconn.h:20
uint8_t * host
Definition: mqtt.h:96
struct espconn * pCon
Definition: mqtt.h:94
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
s8_t err_t
Definition: err.h:47
Definition: mqtt.h:82
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t *buffer, uint16_t length)
Definition: mqtt_msg.c:229
uint16_t pending_msg_id
Definition: mqtt.h:59
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
sint8 espconn_secure_send(struct espconn *espconn, uint8 *psent, uint16 length)
uint8_t security
Definition: mqtt.h:95
uint16_t length
Definition: mqtt_msg.h:82
tConnState connState
Definition: mqtt.h:110
enum espconn_state state
Definition: espconn.h:103
#define os_param_t
Definition: os_type.h:31
int pending_msg_type
Definition: mqtt.h:60
#define INFO(...)
Definition: debug.h:17
static int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t *buffer)
Definition: mqtt_msg.h:111
mqtt_state_t mqtt_state
Definition: mqtt.h:99
uint32_t keepAliveTick
Definition: mqtt.h:107
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t *connection)
Definition: mqtt_msg.c:471
Here is the call graph for this function:
Here is the caller graph for this function:

◆ MQTT_Subscribe()

BOOL ICACHE_FLASH_ATTR MQTT_Subscribe ( MQTT_Client *  client,
char *  topic,
uint8_t  qos 
)

MQTT subscribe function.

Parameters
client: MQTT_Client reference
topic: topic string to subscribe to
qos: qos
Return values
TRUE: if the message was queued successfully

Definition at line 623 of file mqtt.c.

References BOOL, mqtt_message::data, FALSE, ICACHE_FLASH_ATTR, INFO, mqtt_message::length, MQTT_BUF_SIZE, mqtt_state_t::mqtt_connection, mqtt_msg_subscribe(), MQTT_Client::mqtt_state, MQTT_TASK_PRIO, MQTT_Client::msgQueue, os_param_t, mqtt_state_t::outbound_message, mqtt_state_t::pending_msg_id, QUEUE_Gets(), QUEUE_Puts(), system_os_post(), and TRUE.

624 {
625  uint8_t dataBuffer[MQTT_BUF_SIZE];
626  uint16_t dataLen;
627 
629  topic, qos,
630  &client->mqtt_state.pending_msg_id);
631  INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
632  while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
633  INFO("MQTT: Queue full\r\n");
634  if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
635  INFO("MQTT: Serious buffer error\r\n");
636  return FALSE;
637  }
638  }
640  return TRUE;
641 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id)
Definition: mqtt_msg.c:435
#define FALSE
Definition: c_types.h:111
#define TRUE
Definition: c_types.h:110
#define MQTT_BUF_SIZE
Definition: user_config.h:31
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
QUEUE msgQueue
Definition: mqtt.h:111
uint16_t pending_msg_id
Definition: mqtt.h:59
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t *buffer, uint16_t len)
Definition: queue.c:44
uint16_t length
Definition: mqtt_msg.h:82
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t *buffer, uint16_t *len, uint16_t maxLen)
Definition: queue.c:61
#define os_param_t
Definition: os_type.h:31
#define INFO(...)
Definition: debug.h:17
mqtt_state_t mqtt_state
Definition: mqtt.h:99
Here is the call graph for this function:

◆ MQTT_Task()

void ICACHE_FLASH_ATTR MQTT_Task ( os_event_t *  e)

Definition at line 697 of file mqtt.c.

References MQTT_Client::connState, espconn_disconnect(), espconn_secure_disconnect(), espconn_secure_send(), espconn_send(), MQTT_Client::host, ICACHE_FLASH_ATTR, INFO, MQTT_BUF_SIZE, mqtt_client_delete(), MQTT_Connect(), MQTT_DATA, MQTT_DELETED, MQTT_DELETING, mqtt_get_id(), mqtt_get_type(), MQTT_KEEPALIVE_SEND, mqtt_send_keepalive(), MQTT_SEND_TIMOUT, MQTT_Client::mqtt_state, mqtt_tcpclient_delete(), MQTT_Client::msgQueue, NULL, mqtt_state_t::outbound_message, MQTT_Client::pCon, mqtt_state_t::pending_msg_id, mqtt_state_t::pending_msg_type, MQTT_Client::port, QUEUE_Gets(), QUEUE_IsEmpty(), MQTT_Client::security, MQTT_Client::sendTimeout, TCP_CONNECTING, TCP_DISCONNECTED, TCP_DISCONNECTING, TCP_RECONNECT, TCP_RECONNECT_DISCONNECTING, and TCP_RECONNECT_REQ.

Referenced by MQTT_InitClient().

698 {
699  MQTT_Client* client = (MQTT_Client*)e->par;
700  uint8_t dataBuffer[MQTT_BUF_SIZE];
701  uint16_t dataLen;
702  if (e->par == 0)
703  return;
704  switch (client->connState) {
705 
706  case TCP_RECONNECT_REQ:
707  break;
708  case TCP_RECONNECT:
709  mqtt_tcpclient_delete(client);
710  MQTT_Connect(client);
711  INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);
712  client->connState = TCP_CONNECTING;
713  break;
714  case MQTT_DELETING:
715  case TCP_DISCONNECTING:
717  if (client->security) {
718 #ifdef MQTT_SSL_ENABLE
720 #else
721  INFO("TCP: Do not support SSL\r\n");
722 #endif
723  }
724  else {
725  espconn_disconnect(client->pCon);
726  }
727  break;
728  case TCP_DISCONNECTED:
729  INFO("MQTT: Disconnected\r\n");
730  mqtt_tcpclient_delete(client);
731  break;
732  case MQTT_DELETED:
733  INFO("MQTT: Deleted client\r\n");
734  mqtt_client_delete(client);
735  break;
736  case MQTT_KEEPALIVE_SEND:
737  mqtt_send_keepalive(client);
738  break;
739  case MQTT_DATA:
740  if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) {
741  break;
742  }
743  if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0) {
744  client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer);
745  client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen);
746 
747 
748  client->sendTimeout = MQTT_SEND_TIMOUT;
749  INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
750  if (client->security) {
751 #ifdef MQTT_SSL_ENABLE
752  espconn_secure_send(client->pCon, dataBuffer, dataLen);
753 #else
754  INFO("TCP: Do not support SSL\r\n");
755 #endif
756  }
757  else {
758  espconn_send(client->pCon, dataBuffer, dataLen);
759  }
760 
761  client->mqtt_state.outbound_message = NULL;
762  break;
763  }
764  break;
765  }
766 }
uint32_t sendTimeout
Definition: mqtt.h:109
uint32_t port
Definition: mqtt.h:97
void ICACHE_FLASH_ATTR mqtt_client_delete(MQTT_Client *mqttClient)
Delete MQTT client and free all memory.
Definition: mqtt.c:203
sint8 espconn_send(struct espconn *espconn, uint8 *psent, uint16 length) __attribute__((alias("espconn_sent")))
#define NULL
Definition: def.h:47
#define MQTT_SEND_TIMOUT
Definition: mqtt.c:58
void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient)
Begin connect to MQTT broker.
Definition: mqtt.c:892
uint8_t * host
Definition: mqtt.h:96
struct espconn * pCon
Definition: mqtt.h:94
void ICACHE_FLASH_ATTR mqtt_tcpclient_delete(MQTT_Client *mqttClient)
Delete tcp client and free all memory.
Definition: mqtt.c:179
#define MQTT_BUF_SIZE
Definition: user_config.h:31
void ICACHE_FLASH_ATTR mqtt_send_keepalive(MQTT_Client *client)
Definition: mqtt.c:134
sint8 espconn_disconnect(struct espconn *espconn)
Definition: espconn.c:942
BOOL ICACHE_FLASH_ATTR QUEUE_IsEmpty(QUEUE *queue)
Definition: queue.c:66
QUEUE msgQueue
Definition: mqtt.h:111
Definition: mqtt.h:82
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t *buffer, uint16_t length)
Definition: mqtt_msg.c:229
uint16_t pending_msg_id
Definition: mqtt.h:59
sint8 espconn_secure_disconnect(struct espconn *espconn)
mqtt_message_t * outbound_message
Definition: mqtt.h:57
sint8 espconn_secure_send(struct espconn *espconn, uint8 *psent, uint16 length)
uint8_t security
Definition: mqtt.h:95
tConnState connState
Definition: mqtt.h:110
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t *buffer, uint16_t *len, uint16_t maxLen)
Definition: queue.c:61
int pending_msg_type
Definition: mqtt.h:60
#define INFO(...)
Definition: debug.h:17
static int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t *buffer)
Definition: mqtt_msg.h:111
mqtt_state_t mqtt_state
Definition: mqtt.h:99
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mqtt_tcpclient_connect_cb()

void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb ( void *  arg)

Tcp client connect success callback function.

Parameters
arg: contains the IP link information
Return values
None

Definition at line 527 of file mqtt.c.

References mqtt_state_t::connect_info, MQTT_Client::connState, mqtt_message::data, espconn_regist_disconcb(), espconn_regist_recvcb(), espconn_regist_sentcb(), espconn_secure_send(), espconn_send(), MQTT_Client::host, ICACHE_FLASH_ATTR, INFO, mqtt_message::length, MQTT_CONNECT_SENDING, mqtt_state_t::mqtt_connection, mqtt_get_id(), mqtt_get_type(), mqtt_msg_connect(), mqtt_msg_init(), MQTT_SEND_TIMOUT, MQTT_Client::mqtt_state, MQTT_TASK_PRIO, mqtt_tcpclient_discon_cb(), mqtt_tcpclient_recv(), mqtt_tcpclient_sent_cb(), NULL, os_param_t, mqtt_state_t::out_buffer, mqtt_state_t::out_buffer_length, mqtt_state_t::outbound_message, MQTT_Client::pCon, mqtt_state_t::pending_msg_id, mqtt_state_t::pending_msg_type, MQTT_Client::port, espconn::reverse, MQTT_Client::security, MQTT_Client::sendTimeout, and system_os_post().

Referenced by MQTT_Connect().

528 {
529  struct espconn *pCon = (struct espconn *)arg;
530  MQTT_Client* client = (MQTT_Client *)pCon->reverse;
531 
535  INFO("MQTT: Connected to broker %s:%d\r\n", client->host, client->port);
536 
541 
542 
543  client->sendTimeout = MQTT_SEND_TIMOUT;
544  INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
545  if (client->security) {
546 #ifdef MQTT_SSL_ENABLE
548 #else
549  INFO("TCP: Do not support SSL\r\n");
550 #endif
551  }
552  else {
554  }
555 
556  client->mqtt_state.outbound_message = NULL;
559 }
sint8 espconn_regist_sentcb(struct espconn *espconn, espconn_sent_callback sent_cb)
Definition: espconn.c:712
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
uint32_t sendTimeout
Definition: mqtt.h:109
sint8 espconn_regist_recvcb(struct espconn *espconn, espconn_recv_callback recv_cb)
Definition: espconn.c:770
uint32_t port
Definition: mqtt.h:97
void * reverse
Definition: espconn.h:112
sint8 espconn_send(struct espconn *espconn, uint8 *psent, uint16 length) __attribute__((alias("espconn_sent")))
#define NULL
Definition: def.h:47
#define MQTT_SEND_TIMOUT
Definition: mqtt.c:58
uint8_t * host
Definition: mqtt.h:96
struct espconn * pCon
Definition: mqtt.h:94
int out_buffer_length
Definition: mqtt.h:54
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info)
Definition: mqtt_msg.c:291
void ICACHE_FLASH_ATTR mqtt_tcpclient_discon_cb(void *arg)
Definition: mqtt.c:498
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t *buffer, uint16_t length)
Definition: mqtt_msg.c:229
sint8 espconn_regist_disconcb(struct espconn *espconn, espconn_connect_callback discon_cb)
Definition: espconn.c:807
uint16_t pending_msg_id
Definition: mqtt.h:59
uint8_t * out_buffer
Definition: mqtt.h:52
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint16_t buffer_length)
Definition: mqtt_msg.c:128
sint8 espconn_secure_send(struct espconn *espconn, uint8 *psent, uint16 length)
uint8_t security
Definition: mqtt.h:95
void ICACHE_FLASH_ATTR mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
Client received callback function.
Definition: mqtt.c:296
uint16_t length
Definition: mqtt_msg.h:82
tConnState connState
Definition: mqtt.h:110
#define os_param_t
Definition: os_type.h:31
int pending_msg_type
Definition: mqtt.h:60
mqtt_connect_info_t * connect_info
Definition: mqtt.h:50
#define INFO(...)
Definition: debug.h:17
static int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t *buffer)
Definition: mqtt_msg.h:111
void ICACHE_FLASH_ATTR mqtt_tcpclient_sent_cb(void *arg)
Client send over callback function.
Definition: mqtt.c:456
mqtt_state_t mqtt_state
Definition: mqtt.h:99
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mqtt_tcpclient_delete()

void ICACHE_FLASH_ATTR mqtt_tcpclient_delete ( MQTT_Client *  mqttClient)

Delete tcp client and free all memory.

Parameters
mqttClient: the MQTT client which contains the TCP client
Return values
None

Definition at line 179 of file mqtt.c.

References espconn_abort(), espconn_delete(), ICACHE_FLASH_ATTR, INFO, NULL, os_free, MQTT_Client::pCon, espconn::proto, and espconn::tcp.

Referenced by mqtt_client_delete(), MQTT_Connect(), and MQTT_Task().

180 {
181  if (mqttClient->pCon != NULL) {
182  INFO("TCP: Free memory\r\n");
183  // Force abort connections
184  espconn_abort(mqttClient->pCon);
185  // Delete connections
186  espconn_delete(mqttClient->pCon);
187 
188  if (mqttClient->pCon->proto.tcp) {
189  os_free(mqttClient->pCon->proto.tcp);
190  mqttClient->pCon->proto.tcp = NULL;
191  }
192  os_free(mqttClient->pCon);
193  mqttClient->pCon = NULL;
194  }
195 }
esp_tcp * tcp
Definition: espconn.h:105
#define NULL
Definition: def.h:47
struct espconn * pCon
Definition: mqtt.h:94
#define os_free(s)
Definition: mem.h:40
sint8 espconn_delete(struct espconn *espconn)
Definition: espconn.c:1217
union espconn::@1 proto
#define INFO(...)
Definition: debug.h:17
sint8 ICACHE_FLASH_ATTR espconn_abort(struct espconn *espconn)
Definition: espconn.c:972
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mqtt_tcpclient_discon_cb()

void ICACHE_FLASH_ATTR mqtt_tcpclient_discon_cb ( void *  arg)

Definition at line 498 of file mqtt.c.

References MQTT_Client::connState, MQTT_Client::disconnectedCb, ICACHE_FLASH_ATTR, INFO, MQTT_DELETED, MQTT_DELETING, MQTT_TASK_PRIO, os_param_t, espconn::reverse, system_os_post(), TCP_DISCONNECTED, TCP_DISCONNECTING, and TCP_RECONNECT_REQ.

Referenced by mqtt_tcpclient_connect_cb().

499 {
500 
501  struct espconn *pespconn = (struct espconn *)arg;
502  MQTT_Client* client = (MQTT_Client *)pespconn->reverse;
503  INFO("TCP: Disconnected callback\r\n");
504  if(TCP_DISCONNECTING == client->connState) {
505  client->connState = TCP_DISCONNECTED;
506  }
507  else if(MQTT_DELETING == client->connState) {
508  client->connState = MQTT_DELETED;
509  }
510  else {
511  client->connState = TCP_RECONNECT_REQ;
512  }
513  if (client->disconnectedCb)
514  client->disconnectedCb((uint32_t*)client);
515 
517 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
void * reverse
Definition: espconn.h:112
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
tConnState connState
Definition: mqtt.h:110
MqttCallback disconnectedCb
Definition: mqtt.h:102
#define os_param_t
Definition: os_type.h:31
#define INFO(...)
Definition: debug.h:17
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mqtt_tcpclient_recon_cb()

void ICACHE_FLASH_ATTR mqtt_tcpclient_recon_cb ( void *  arg,
sint8  errType 
)

Tcp client connect repeat callback function.

Parameters
arg: contains the IP link information
Return values
None

Definition at line 567 of file mqtt.c.

References BOOL, MQTT_Client::connState, MQTT_Client::host, ICACHE_FLASH_ATTR, INFO, MQTT_TASK_PRIO, os_param_t, MQTT_Client::port, espconn::reverse, system_os_post(), and TCP_RECONNECT_REQ.

Referenced by MQTT_Connect().

568 {
569  struct espconn *pCon = (struct espconn *)arg;
570  MQTT_Client* client = (MQTT_Client *)pCon->reverse;
571 
572  INFO("TCP: Reconnect to %s:%d\r\n", client->host, client->port);
573 
574  client->connState = TCP_RECONNECT_REQ;
575 
577 
578 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
uint32_t port
Definition: mqtt.h:97
void * reverse
Definition: espconn.h:112
uint8_t * host
Definition: mqtt.h:96
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
tConnState connState
Definition: mqtt.h:110
#define os_param_t
Definition: os_type.h:31
#define INFO(...)
Definition: debug.h:17
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mqtt_tcpclient_recv()

void ICACHE_FLASH_ATTR mqtt_tcpclient_recv ( void *  arg,
char *  pdata,
unsigned short  len 
)

Client received callback function.

Parameters
arg: contains the IP link information
pdata: received data
len: the length of the received data
Return values
None

Definition at line 296 of file mqtt.c.

References MQTT_Client::connectedCb, CONNECTION_ACCEPTED, CONNECTION_REFUSE_BAD_USERNAME, CONNECTION_REFUSE_NOT_AUTHORIZED, CONNECTION_REFUSE_PROTOCOL, CONNECTION_REFUSE_SERVER_UNAVAILABLE, MQTT_Client::connState, mqtt_message::data, deliver_publish(), espconn_disconnect(), espconn_secure_disconnect(), MQTT_Client::host, ICACHE_FLASH_ATTR, mqtt_state_t::in_buffer, mqtt_state_t::in_buffer_length, INFO, MQTT_Client::keepAliveTick, mqtt_message::length, mqtt_state_t::message_length, mqtt_state_t::message_length_read, MQTT_CONNECT_SENDING, mqtt_state_t::mqtt_connection, MQTT_DATA, mqtt_get_connect_return_code(), mqtt_get_id(), mqtt_get_qos(), mqtt_get_total_length(), mqtt_get_type(), MQTT_KEEPALIVE_SEND, mqtt_msg_pingresp(), mqtt_msg_puback(), mqtt_msg_pubcomp(), mqtt_msg_pubrec(), mqtt_msg_pubrel(), MQTT_MSG_TYPE_CONNACK, MQTT_MSG_TYPE_CONNECT, MQTT_MSG_TYPE_PINGREQ, MQTT_MSG_TYPE_PINGRESP, MQTT_MSG_TYPE_PUBACK, MQTT_MSG_TYPE_PUBCOMP, MQTT_MSG_TYPE_PUBLISH, MQTT_MSG_TYPE_PUBREC, MQTT_MSG_TYPE_PUBREL, MQTT_MSG_TYPE_SUBACK, MQTT_MSG_TYPE_SUBSCRIBE, MQTT_MSG_TYPE_UNSUBACK, MQTT_MSG_TYPE_UNSUBSCRIBE, MQTT_Client::mqtt_state, MQTT_TASK_PRIO, MQTT_Client::msgQueue, os_memcpy, os_param_t, mqtt_state_t::outbound_message, MQTT_Client::pCon, mqtt_state_t::pending_msg_id, mqtt_state_t::pending_msg_type, MQTT_Client::port, QUEUE_Puts(), espconn::reverse, MQTT_Client::security, and system_os_post().

Referenced by mqtt_tcpclient_connect_cb().

297 {
298  uint8_t msg_type;
299  uint8_t msg_qos;
300  uint16_t msg_id;
301  uint8_t msg_conn_ret;
302 
303  struct espconn *pCon = (struct espconn*)arg;
304  MQTT_Client *client = (MQTT_Client *)pCon->reverse;
305 
306  client->keepAliveTick = 0;
307 READPACKET:
308  INFO("TCP: data received %d bytes\r\n", len);
309  // INFO("STATE: %d\r\n", client->connState);
310  if (len < MQTT_BUF_SIZE && len > 0) {
311  os_memcpy(client->mqtt_state.in_buffer, pdata, len);
312 
313  msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
314  msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
315  msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
316  switch (client->connState) {
318  if (msg_type == MQTT_MSG_TYPE_CONNACK) {
320  INFO("MQTT: Invalid packet\r\n");
321  if (client->security) {
322 #ifdef MQTT_SSL_ENABLE
324 #else
325  INFO("TCP: Do not support SSL\r\n");
326 #endif
327  }
328  else {
329  espconn_disconnect(client->pCon);
330  }
331  } else {
332  msg_conn_ret = mqtt_get_connect_return_code(client->mqtt_state.in_buffer);
333  switch (msg_conn_ret) {
334  case CONNECTION_ACCEPTED:
335  INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port);
336  client->connState = MQTT_DATA;
337  if (client->connectedCb)
338  client->connectedCb((uint32_t*)client);
339  break;
344  INFO("MQTT: Connection refuse, reason code: %d\r\n", msg_conn_ret);
345  default:
346  if (client->security) {
347 #ifdef MQTT_SSL_ENABLE
348  espconn_secure_disconnect(client->pCon);
349 #else
350  INFO("TCP: Do not support SSL\r\n");
351 #endif
352  }
353  else {
354  espconn_disconnect(client->pCon);
355  }
356  }
357  }
358 
359  }
360  break;
361  case MQTT_DATA:
362  case MQTT_KEEPALIVE_SEND:
363  client->mqtt_state.message_length_read = len;
365 
366 
367  switch (msg_type) {
370  INFO("MQTT: Subscribe successful\r\n");
371  break;
374  INFO("MQTT: UnSubscribe successful\r\n");
375  break;
377  if (msg_qos == 1)
379  else if (msg_qos == 2)
381  if (msg_qos == 1 || msg_qos == 2) {
382  INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);
383  if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
384  INFO("MQTT: Queue full\r\n");
385  }
386  }
387 
389  break;
391  if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) {
392  INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
393  }
394 
395  break;
398  if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
399  INFO("MQTT: Queue full\r\n");
400  }
401  break;
404  if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
405  INFO("MQTT: Queue full\r\n");
406  }
407  break;
409  if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) {
410  INFO("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n");
411  }
412  break;
415  if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
416  INFO("MQTT: Queue full\r\n");
417  }
418  break;
420  // Ignore
421  break;
422  }
423  // NOTE: this is done down here and not in the switch case above
424  // because the PSOCK_READBUF_LEN() won't work inside a switch
425  // statement due to the way protothreads resume.
426  if (msg_type == MQTT_MSG_TYPE_PUBLISH)
427  {
428  len = client->mqtt_state.message_length_read;
429 
431  {
432  //client->connState = MQTT_PUBLISH_RECV;
433  //Not Implement yet
434  len -= client->mqtt_state.message_length;
435  pdata += client->mqtt_state.message_length;
436 
437  INFO("Get another published message\r\n");
438  goto READPACKET;
439  }
440 
441  }
442  break;
443  }
444  } else {
445  INFO("ERROR: Message too long\r\n");
446  }
448 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
uint32_t port
Definition: mqtt.h:97
void * reverse
Definition: espconn.h:112
uint16_t message_length
Definition: mqtt.h:55
int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t *buffer, uint16_t length)
Definition: mqtt_msg.c:135
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id)
Definition: mqtt_msg.c:411
uint8_t * host
Definition: mqtt.h:96
struct espconn * pCon
Definition: mqtt.h:94
LOCAL void ICACHE_FLASH_ATTR deliver_publish(MQTT_Client *client, uint8_t *message, int length)
Definition: mqtt.c:119
uint8_t * in_buffer
Definition: mqtt.h:51
MqttCallback connectedCb
Definition: mqtt.h:101
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
#define os_memcpy
Definition: osapi.h:36
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id)
Definition: mqtt_msg.c:427
sint8 espconn_disconnect(struct espconn *espconn)
Definition: espconn.c:942
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id)
Definition: mqtt_msg.c:403
int in_buffer_length
Definition: mqtt.h:53
QUEUE msgQueue
Definition: mqtt.h:111
Definition: mqtt.h:82
uint16_t message_length_read
Definition: mqtt.h:56
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t *buffer, uint16_t length)
Definition: mqtt_msg.c:229
uint16_t pending_msg_id
Definition: mqtt.h:59
sint8 espconn_secure_disconnect(struct espconn *espconn)
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
uint8_t security
Definition: mqtt.h:95
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t *buffer, uint16_t len)
Definition: queue.c:44
static int ICACHE_FLASH_ATTR mqtt_get_connect_return_code(uint8_t *buffer)
Definition: mqtt_msg.h:112
uint16_t length
Definition: mqtt_msg.h:82
tConnState connState
Definition: mqtt.h:110
#define os_param_t
Definition: os_type.h:31
int pending_msg_type
Definition: mqtt.h:60
#define INFO(...)
Definition: debug.h:17
static int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t *buffer)
Definition: mqtt_msg.h:111
mqtt_state_t mqtt_state
Definition: mqtt.h:99
uint32_t keepAliveTick
Definition: mqtt.h:107
static int ICACHE_FLASH_ATTR mqtt_get_qos(uint8_t *buffer)
Definition: mqtt_msg.h:114
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t *connection)
Definition: mqtt_msg.c:477
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id)
Definition: mqtt_msg.c:419
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mqtt_tcpclient_sent_cb()

void ICACHE_FLASH_ATTR mqtt_tcpclient_sent_cb ( void *  arg)

Client send over callback function.

Parameters
arg: contains the IP link information
Return values
None

Definition at line 456 of file mqtt.c.

References MQTT_Client::connState, INFO, MQTT_Client::keepAliveTick, MQTT_DATA, MQTT_KEEPALIVE_SEND, MQTT_MSG_TYPE_PUBLISH, MQTT_Client::mqtt_state, MQTT_TASK_PRIO, os_param_t, mqtt_state_t::pending_msg_type, MQTT_Client::publishedCb, espconn::reverse, MQTT_Client::sendTimeout, and system_os_post().

Referenced by mqtt_tcpclient_connect_cb().

457 {
458  struct espconn *pCon = (struct espconn *)arg;
459  MQTT_Client* client = (MQTT_Client *)pCon->reverse;
460  INFO("TCP: Sent\r\n");
461  client->sendTimeout = 0;
462  client->keepAliveTick =0;
463 
464  if ((client->connState == MQTT_DATA || client->connState == MQTT_KEEPALIVE_SEND)
466  if (client->publishedCb)
467  client->publishedCb((uint32_t*)client);
468  }
470 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
uint32_t sendTimeout
Definition: mqtt.h:109
void * reverse
Definition: espconn.h:112
MqttCallback publishedCb
Definition: mqtt.h:103
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
Definition: mqtt.h:82
tConnState connState
Definition: mqtt.h:110
#define os_param_t
Definition: os_type.h:31
int pending_msg_type
Definition: mqtt.h:60
#define INFO(...)
Definition: debug.h:17
mqtt_state_t mqtt_state
Definition: mqtt.h:99
uint32_t keepAliveTick
Definition: mqtt.h:107
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mqtt_timer()

void ICACHE_FLASH_ATTR mqtt_timer ( void *  arg)

Definition at line 472 of file mqtt.c.

References mqtt_state_t::connect_info, MQTT_Client::connState, ICACHE_FLASH_ATTR, mqtt_connect_info::keepalive, MQTT_Client::keepAliveTick, MQTT_DATA, MQTT_KEEPALIVE_SEND, MQTT_RECONNECT_TIMEOUT, MQTT_Client::mqtt_state, MQTT_TASK_PRIO, os_param_t, MQTT_Client::reconnectTick, MQTT_Client::sendTimeout, system_os_post(), TCP_RECONNECT, and TCP_RECONNECT_REQ.

Referenced by MQTT_Connect().

473 {
474  MQTT_Client* client = (MQTT_Client*)arg;
475 
476  if (client->connState == MQTT_DATA) {
477  client->keepAliveTick ++;
478  if (client->keepAliveTick > (client->mqtt_state.connect_info->keepalive / 2)) {
479  client->connState = MQTT_KEEPALIVE_SEND;
481  }
482 
483  } else if (client->connState == TCP_RECONNECT_REQ) {
484  client->reconnectTick ++;
485  if (client->reconnectTick > MQTT_RECONNECT_TIMEOUT) {
486  client->reconnectTick = 0;
487  client->connState = TCP_RECONNECT;
489  if (client->timeoutCb)
490  client->timeoutCb((uint32_t*)client);
491  }
492  }
493  if (client->sendTimeout > 0)
494  client->sendTimeout --;
495 }
#define MQTT_RECONNECT_TIMEOUT
Definition: user_config.h:41
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
uint32_t sendTimeout
Definition: mqtt.h:109
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
MqttCallback timeoutCb
Definition: mqtt.h:104
Definition: mqtt.h:82
uint32_t reconnectTick
Definition: mqtt.h:108
tConnState connState
Definition: mqtt.h:110
uint32_t keepalive
Definition: mqtt_msg.h:103
#define os_param_t
Definition: os_type.h:31
mqtt_connect_info_t * connect_info
Definition: mqtt.h:50
mqtt_state_t mqtt_state
Definition: mqtt.h:99
uint32_t keepAliveTick
Definition: mqtt.h:107
Here is the call graph for this function:
Here is the caller graph for this function:

◆ MQTT_UnSubscribe()

BOOL ICACHE_FLASH_ATTR MQTT_UnSubscribe ( MQTT_Client *  client,
char *  topic 
)

MQTT unsubscribe function.

Parameters
client: MQTT_Client reference
topic: topic string to unsubscribe from
Return values
TRUE: if the unsubscribe message was queued successfully

Definition at line 650 of file mqtt.c.

References BOOL, mqtt_message::data, FALSE, ICACHE_FLASH_ATTR, INFO, mqtt_message::length, MQTT_BUF_SIZE, mqtt_state_t::mqtt_connection, mqtt_msg_unsubscribe(), MQTT_Client::mqtt_state, MQTT_TASK_PRIO, MQTT_Client::msgQueue, os_param_t, mqtt_state_t::outbound_message, mqtt_state_t::pending_msg_id, QUEUE_Gets(), QUEUE_Puts(), system_os_post(), and TRUE.

651 {
652  uint8_t dataBuffer[MQTT_BUF_SIZE];
653  uint16_t dataLen;
655  topic,
656  &client->mqtt_state.pending_msg_id);
657  INFO("MQTT: queue un-subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
658  while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
659  INFO("MQTT: Queue full\r\n");
660  if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
661  INFO("MQTT: Serious buffer error\r\n");
662  return FALSE;
663  }
664  }
666  return TRUE;
667 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
#define FALSE
Definition: c_types.h:111
#define TRUE
Definition: c_types.h:110
#define MQTT_BUF_SIZE
Definition: user_config.h:31
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
QUEUE msgQueue
Definition: mqtt.h:111
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id)
Definition: mqtt_msg.c:455
uint16_t pending_msg_id
Definition: mqtt.h:59
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t *buffer, uint16_t len)
Definition: queue.c:44
uint16_t length
Definition: mqtt_msg.h:82
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t *buffer, uint16_t *len, uint16_t maxLen)
Definition: queue.c:61
#define os_param_t
Definition: os_type.h:31
#define INFO(...)
Definition: debug.h:17
mqtt_state_t mqtt_state
Definition: mqtt.h:99
Here is the call graph for this function:

Variable Documentation

◆ default_certificate

unsigned char* default_certificate

Definition at line 64 of file mqtt.c.

◆ default_certificate_len

unsigned int default_certificate_len = 0

Definition at line 65 of file mqtt.c.

◆ default_private_key

unsigned char* default_private_key

Definition at line 66 of file mqtt.c.

◆ default_private_key_len

unsigned int default_private_key_len = 0

Definition at line 67 of file mqtt.c.

◆ mqtt_procTaskQueue

os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE]

Definition at line 69 of file mqtt.c.

Referenced by MQTT_InitClient().