51 #include "user_config.h" 56 #define MQTT_TASK_PRIO USER_TASK_PRIO_1 57 #define MQTT_TASK_QUEUE_SIZE 100 58 #define MQTT_SEND_TIMOUT 5 60 #ifndef QUEUE_BUFFER_SIZE 61 #define QUEUE_BUFFER_SIZE 2048 71 #ifdef PROTOCOL_NAMEv311 72 LOCAL uint8_t zero_len_id[2] = { 0, 0 };
84 INFO(
"DNS: Found, but got no ip, try to reconnect\r\n");
89 INFO(
"DNS: found ip %d.%d.%d.%d\n",
90 *((
uint8 *) &ipaddr->addr),
91 *((
uint8 *) &ipaddr->addr + 1),
92 *((
uint8 *) &ipaddr->addr + 2),
93 *((
uint8 *) &ipaddr->addr + 3));
95 if (client->
ip.addr == 0 && ipaddr->addr != 0)
99 #ifdef MQTT_SSL_ENABLE 102 INFO(
"TCP: Do not support SSL\r\n");
110 INFO(
"TCP: connecting...\r\n");
140 INFO(
"\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->
host, client->
port);
151 #ifdef MQTT_SSL_ENABLE 154 INFO(
"TCP: Do not support SSL\r\n");
182 INFO(
"TCP: Free memory\r\n");
205 if(mqttClient ==
NULL)
284 INFO(
"MQTT: client already deleted\r\n");
301 uint8_t msg_conn_ret;
308 INFO(
"TCP: data received %d bytes\r\n", len);
310 if (len < MQTT_BUF_SIZE && len > 0) {
320 INFO(
"MQTT: Invalid packet\r\n");
322 #ifdef MQTT_SSL_ENABLE 325 INFO(
"TCP: Do not support SSL\r\n");
333 switch (msg_conn_ret) {
335 INFO(
"MQTT: Connected to %s:%d\r\n", client->
host, client->
port);
344 INFO(
"MQTT: Connection refuse, reason code: %d\r\n", msg_conn_ret);
346 if (client->security) {
347 #ifdef MQTT_SSL_ENABLE 350 INFO(
"TCP: Do not support SSL\r\n");
370 INFO(
"MQTT: Subscribe successful\r\n");
374 INFO(
"MQTT: UnSubscribe successful\r\n");
379 else if (msg_qos == 2)
381 if (msg_qos == 1 || msg_qos == 2) {
382 INFO(
"MQTT: Queue response QoS: %d\r\n", msg_qos);
384 INFO(
"MQTT: Queue full\r\n");
392 INFO(
"MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
399 INFO(
"MQTT: Queue full\r\n");
405 INFO(
"MQTT: Queue full\r\n");
410 INFO(
"MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n");
416 INFO(
"MQTT: Queue full\r\n");
437 INFO(
"Get another published message\r\n");
445 INFO(
"ERROR: Message too long\r\n");
460 INFO(
"TCP: Sent\r\n");
489 if (client->timeoutCb)
490 client->timeoutCb((uint32_t*)client);
503 INFO(
"TCP: Disconnected callback\r\n");
535 INFO(
"MQTT: Connected to broker %s:%d\r\n", client->
host, client->
port);
546 #ifdef MQTT_SSL_ENABLE 549 INFO(
"TCP: Do not support SSL\r\n");
572 INFO(
"TCP: Reconnect to %s:%d\r\n", client->
host, client->
port);
596 topic, data, data_length,
600 INFO(
"MQTT: Queuing publish failed\r\n");
605 INFO(
"MQTT: Queue full\r\n");
607 INFO(
"MQTT: Serious buffer error\r\n");
633 INFO(
"MQTT: Queue full\r\n");
635 INFO(
"MQTT: Serious buffer error\r\n");
659 INFO(
"MQTT: Queue full\r\n");
661 INFO(
"MQTT: Serious buffer error\r\n");
681 INFO(
"MQTT: Queuing publish failed\r\n");
686 INFO(
"MQTT: Queue full\r\n");
688 INFO(
"MQTT: Serious buffer error\r\n");
711 INFO(
"TCP: Reconnect to: %s:%d\r\n", client->
host, client->
port);
718 #ifdef MQTT_SSL_ENABLE 721 INFO(
"TCP: Do not support SSL\r\n");
729 INFO(
"MQTT: Disconnected\r\n");
733 INFO(
"MQTT: Deleted client\r\n");
751 #ifdef MQTT_SSL_ENABLE 754 INFO(
"TCP: Do not support SSL\r\n");
780 INFO(
"MQTT:InitConnection\r\n");
785 mqttClient->
host[temp] = 0;
786 mqttClient->
port = port;
801 MQTT_InitClient(
MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession)
804 INFO(
"MQTT:InitClient\r\n");
810 #ifdef PROTOCOL_NAMEv311 814 INFO(
"cleanSession must be set to use 0 length client_id\r\n");
819 INFO(
"Client ID required for MQTT < 3.1.1!\r\n");
896 if (mqttClient->
pCon) {
934 INFO(
"TCP: Connect to ip %s:%d\r\n", mqttClient->
host, mqttClient->
port);
937 #ifdef MQTT_SSL_ENABLE 940 INFO(
"TCP: Do not support SSL\r\n");
949 INFO(
"TCP: Connect to domain %s:%d\r\n", mqttClient->
host, mqttClient->
port);
966 if(
NULL == mqttClient)
995 mqttClient->
dataCb = dataCb;
sint8 espconn_regist_sentcb(struct espconn *espconn, espconn_sent_callback sent_cb)
#define MQTT_RECONNECT_TIMEOUT
#define MQTT_TASK_QUEUE_SIZE
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
unsigned int default_certificate_len
unsigned char * default_certificate
sint8 espconn_set_opt(struct espconn *espconn, uint8 opt)
sint8 espconn_regist_recvcb(struct espconn *espconn, espconn_recv_callback recv_cb)
sint8 espconn_connect(struct espconn *espconn)
void ICACHE_FLASH_ATTR mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
TCP client reconnect callback function.
void ICACHE_FLASH_ATTR mqtt_client_delete(MQTT_Client *mqttClient)
Delete MQTT client and free all memory.
sint8 espconn_send(struct espconn *espconn, uint8 *psent, uint16 length) __attribute__((alias("espconn_sent")))
BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char *topic, const char *data, int data_length, int qos, int retain)
MQTT publish function.
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id)
int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t *buffer, uint16_t length)
void ICACHE_FLASH_ATTR MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t *host, uint32_t port, uint8_t security)
MQTT initialization connection function.
sint8 espconn_regist_reconcb(struct espconn *espconn, espconn_reconnect_callback recon_cb)
void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient)
Begin connect to MQTT broker.
BOOL ICACHE_FLASH_ATTR MQTT_UnSubscribe(MQTT_Client *client, char *topic)
MQTT unsubscribe function.
#define ICACHE_FLASH_ATTR
void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id)
LOCAL void ICACHE_FLASH_ATTR deliver_publish(MQTT_Client *client, uint8_t *message, int length)
void ICACHE_FLASH_ATTR MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb)
uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const int8_t *str, void *ip)
void ICACHE_FLASH_ATTR mqtt_tcpclient_delete(MQTT_Client *mqttClient)
Delete tcp client and free all memory.
os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE]
void ICACHE_FLASH_ATTR MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t *will_topic, uint8_t *will_msg, uint8_t will_qos, uint8_t will_retain)
void ICACHE_FLASH_ATTR MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb)
bool system_os_task(os_task_t task, uint8 prio, os_event_t *queue, uint8 qlen)
void ICACHE_FLASH_ATTR mqtt_send_keepalive(MQTT_Client *client)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info)
BOOL ICACHE_FLASH_ATTR MQTT_Ping(MQTT_Client *client)
MQTT ping function.
#define QUEUE_BUFFER_SIZE
const char *ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t *buffer, uint16_t *length)
void(* MqttCallback)(uint32_t *args)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id)
void ICACHE_FLASH_ATTR MQTT_DeleteClient(MQTT_Client *mqttClient)
sint8 espconn_disconnect(struct espconn *espconn)
BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char *topic, uint8_t qos)
MQTT subscribe function.
void ICACHE_FLASH_ATTR MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id)
BOOL ICACHE_FLASH_ATTR QUEUE_IsEmpty(QUEUE *queue)
typedefPACK_STRUCT_END struct ip_addr ip_addr_t
uint16_t message_length_read
void ICACHE_FLASH_ATTR mqtt_tcpclient_discon_cb(void *arg)
void ICACHE_FLASH_ATTR QUEUE_Init(QUEUE *queue, int bufferSize)
sint8 espconn_set_keepalive(struct espconn *espconn, uint8 level, void *optarg)
mqtt_connect_info_t connect_info
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t *buffer, uint16_t length)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id)
sint8 espconn_regist_disconcb(struct espconn *espconn, espconn_connect_callback discon_cb)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id)
sint8 espconn_secure_disconnect(struct espconn *espconn)
mqtt_connection_t mqtt_connection
mqtt_message_t * outbound_message
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint16_t buffer_length)
sint8 espconn_secure_send(struct espconn *espconn, uint8 *psent, uint16 length)
sint8 espconn_delete(struct espconn *espconn)
void ICACHE_FLASH_ATTR MQTT_Disconnect(MQTT_Client *mqttClient)
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t *buffer, uint16_t len)
void ICACHE_FLASH_ATTR mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
Client received callback function.
static int ICACHE_FLASH_ATTR mqtt_get_connect_return_code(uint8_t *buffer)
void ICACHE_FLASH_ATTR MQTT_Task(os_event_t *e)
MqttCallback disconnectedCb
#define os_timer_arm(a, b, c)
unsigned char * default_private_key
uint32 espconn_port(void)
void ICACHE_FLASH_ATTR MQTT_OnConnected(MQTT_Client *mqttClient, MqttCallback connectedCb)
sint8 espconn_regist_connectcb(struct espconn *espconn, espconn_connect_callback connect_cb)
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t *buffer, uint16_t *len, uint16_t maxLen)
mqtt_connect_info_t * connect_info
void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb(void *arg)
Tcp client connect success callback function.
LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
unsigned int default_private_key_len
BOOL ICACHE_FLASH_ATTR MQTT_InitClient(MQTT_Client *mqttClient, uint8_t *client_id, uint8_t *client_user, uint8_t *client_pass, uint32_t keepAliveTime, uint8_t cleanSession)
MQTT initialization mqtt client function.
void(* MqttDataCallback)(uint32_t *args, const char *topic, uint32_t topic_len, const char *data, uint32_t lengh)
static int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t *buffer)
sint8 espconn_gethostbyname(struct espconn *pespconn, const char *name, ip_addr_t *addr, dns_found_callback found)
void ICACHE_FLASH_ATTR mqtt_tcpclient_sent_cb(void *arg)
Client send over callback function.
sint8 espconn_secure_connect(struct espconn *espconn)
const char *ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t *buffer, uint16_t *length)
void ICACHE_FLASH_ATTR MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb)
static int ICACHE_FLASH_ATTR mqtt_get_qos(uint8_t *buffer)
sint8 ICACHE_FLASH_ATTR espconn_abort(struct espconn *espconn)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t *connection)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t *connection)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id)