MeterLogger
Data Structures | Macros | Typedefs | Enumerations | Functions
mqtt.h File Reference
#include "mqtt_msg.h"
#include "user_interface.h"
#include "queue.h"
Include dependency graph for mqtt.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  mqtt_event_data_t
 
struct  mqtt_state_t
 
struct  MQTT_Client
 

Macros

#define SEC_NONSSL   0
 
#define SEC_SSL   1
 
#define MQTT_FLAG_CONNECTED   1
 
#define MQTT_FLAG_READY   2
 
#define MQTT_FLAG_EXIT   4
 
#define MQTT_EVENT_TYPE_NONE   0
 
#define MQTT_EVENT_TYPE_CONNECTED   1
 
#define MQTT_EVENT_TYPE_DISCONNECTED   2
 
#define MQTT_EVENT_TYPE_SUBSCRIBED   3
 
#define MQTT_EVENT_TYPE_UNSUBSCRIBED   4
 
#define MQTT_EVENT_TYPE_PUBLISH   5
 
#define MQTT_EVENT_TYPE_PUBLISHED   6
 
#define MQTT_EVENT_TYPE_EXITED   7
 
#define MQTT_EVENT_TYPE_PUBLISH_CONTINUATION   8
 

Typedefs

typedef struct mqtt_event_data_t mqtt_event_data_t
 
typedef struct mqtt_state_t mqtt_state_t
 
typedef void(* MqttCallback) (uint32_t *args)
 
typedef void(* MqttDataCallback) (uint32_t *args, const char *topic, uint32_t topic_len, const char *data, uint32_t lengh)
 

Enumerations

enum  tConnState {
  WIFI_INIT, WIFI_CONNECTING, WIFI_CONNECTING_ERROR, WIFI_CONNECTED,
  DNS_RESOLVE, TCP_DISCONNECTING, TCP_DISCONNECTED, TCP_RECONNECT_DISCONNECTING,
  TCP_RECONNECT_REQ, TCP_RECONNECT, TCP_CONNECTING, TCP_CONNECTING_ERROR,
  TCP_CONNECTED, MQTT_CONNECT_SEND, MQTT_CONNECT_SENDING, MQTT_SUBSCIBE_SEND,
  MQTT_SUBSCIBE_SENDING, MQTT_DATA, MQTT_KEEPALIVE_SEND, MQTT_PUBLISH_RECV,
  MQTT_PUBLISHING, MQTT_DELETING, MQTT_DELETED
}
 

Functions

void ICACHE_FLASH_ATTR MQTT_InitConnection (MQTT_Client *mqttClient, uint8_t *host, uint32_t port, uint8_t security)
 MQTT initialization connection function. More...
 
BOOL ICACHE_FLASH_ATTR MQTT_InitClient (MQTT_Client *mqttClient, uint8_t *client_id, uint8_t *client_user, uint8_t *client_pass, uint32_t keepAliveTime, uint8_t cleanSession)
 MQTT initialization mqtt client function. More...
 
void ICACHE_FLASH_ATTR MQTT_DeleteClient (MQTT_Client *mqttClient)
 
void ICACHE_FLASH_ATTR MQTT_InitLWT (MQTT_Client *mqttClient, uint8_t *will_topic, uint8_t *will_msg, uint8_t will_qos, uint8_t will_retain)
 
void ICACHE_FLASH_ATTR MQTT_OnConnected (MQTT_Client *mqttClient, MqttCallback connectedCb)
 
void ICACHE_FLASH_ATTR MQTT_OnDisconnected (MQTT_Client *mqttClient, MqttCallback disconnectedCb)
 
void ICACHE_FLASH_ATTR MQTT_OnPublished (MQTT_Client *mqttClient, MqttCallback publishedCb)
 
void ICACHE_FLASH_ATTR MQTT_OnTimeout (MQTT_Client *mqttClient, MqttCallback timeoutCb)
 
void ICACHE_FLASH_ATTR MQTT_OnData (MQTT_Client *mqttClient, MqttDataCallback dataCb)
 
BOOL ICACHE_FLASH_ATTR MQTT_Subscribe (MQTT_Client *client, char *topic, uint8_t qos)
 MQTT subscibe function. More...
 
BOOL ICACHE_FLASH_ATTR MQTT_UnSubscribe (MQTT_Client *client, char *topic)
 MQTT un-subscibe function. More...
 
void ICACHE_FLASH_ATTR MQTT_Connect (MQTT_Client *mqttClient)
 Begin connect to MQTT broker. More...
 
void ICACHE_FLASH_ATTR MQTT_Disconnect (MQTT_Client *mqttClient)
 
BOOL ICACHE_FLASH_ATTR MQTT_Publish (MQTT_Client *client, const char *topic, const char *data, int data_length, int qos, int retain)
 MQTT publish function. More...
 
BOOL ICACHE_FLASH_ATTR MQTT_Ping (MQTT_Client *client)
 MQTT ping function. More...
 

Macro Definition Documentation

◆ MQTT_EVENT_TYPE_CONNECTED

#define MQTT_EVENT_TYPE_CONNECTED   1

Definition at line 123 of file mqtt.h.

◆ MQTT_EVENT_TYPE_DISCONNECTED

#define MQTT_EVENT_TYPE_DISCONNECTED   2

Definition at line 124 of file mqtt.h.

◆ MQTT_EVENT_TYPE_EXITED

#define MQTT_EVENT_TYPE_EXITED   7

Definition at line 129 of file mqtt.h.

◆ MQTT_EVENT_TYPE_NONE

#define MQTT_EVENT_TYPE_NONE   0

Definition at line 122 of file mqtt.h.

◆ MQTT_EVENT_TYPE_PUBLISH

#define MQTT_EVENT_TYPE_PUBLISH   5

Definition at line 127 of file mqtt.h.

◆ MQTT_EVENT_TYPE_PUBLISH_CONTINUATION

#define MQTT_EVENT_TYPE_PUBLISH_CONTINUATION   8

Definition at line 130 of file mqtt.h.

◆ MQTT_EVENT_TYPE_PUBLISHED

#define MQTT_EVENT_TYPE_PUBLISHED   6

Definition at line 128 of file mqtt.h.

◆ MQTT_EVENT_TYPE_SUBSCRIBED

#define MQTT_EVENT_TYPE_SUBSCRIBED   3

Definition at line 125 of file mqtt.h.

◆ MQTT_EVENT_TYPE_UNSUBSCRIBED

#define MQTT_EVENT_TYPE_UNSUBSCRIBED   4

Definition at line 126 of file mqtt.h.

◆ MQTT_FLAG_CONNECTED

#define MQTT_FLAG_CONNECTED   1

Definition at line 118 of file mqtt.h.

◆ MQTT_FLAG_EXIT

#define MQTT_FLAG_EXIT   4

Definition at line 120 of file mqtt.h.

◆ MQTT_FLAG_READY

#define MQTT_FLAG_READY   2

Definition at line 119 of file mqtt.h.

◆ SEC_NONSSL

#define SEC_NONSSL   0

Definition at line 115 of file mqtt.h.

◆ SEC_SSL

#define SEC_SSL   1

Definition at line 116 of file mqtt.h.

Typedef Documentation

◆ mqtt_event_data_t

◆ mqtt_state_t

typedef struct mqtt_state_t mqtt_state_t

◆ MqttCallback

typedef void(* MqttCallback) (uint32_t *args)

Definition at line 90 of file mqtt.h.

◆ MqttDataCallback

typedef void(* MqttDataCallback) (uint32_t *args, const char *topic, uint32_t topic_len, const char *data, uint32_t lengh)

Definition at line 91 of file mqtt.h.

Enumeration Type Documentation

◆ tConnState

enum tConnState
Enumerator
WIFI_INIT 
WIFI_CONNECTING 
WIFI_CONNECTING_ERROR 
WIFI_CONNECTED 
DNS_RESOLVE 
TCP_DISCONNECTING 
TCP_DISCONNECTED 
TCP_RECONNECT_DISCONNECTING 
TCP_RECONNECT_REQ 
TCP_RECONNECT 
TCP_CONNECTING 
TCP_CONNECTING_ERROR 
TCP_CONNECTED 
MQTT_CONNECT_SEND 
MQTT_CONNECT_SENDING 
MQTT_SUBSCIBE_SEND 
MQTT_SUBSCIBE_SENDING 
MQTT_DATA 
MQTT_KEEPALIVE_SEND 
MQTT_PUBLISH_RECV 
MQTT_PUBLISHING 
MQTT_DELETING 
MQTT_DELETED 

Definition at line 64 of file mqtt.h.

Function Documentation

◆ MQTT_Connect()

void ICACHE_FLASH_ATTR MQTT_Connect ( MQTT_Client mqttClient)

Begin connect to MQTT broker.

Parameters
clientMQTT_Client reference
Return values
None

Definition at line 892 of file mqtt.c.

References MQTT_Client::connState, espconn_connect(), espconn_gethostbyname(), ESPCONN_KEEPALIVE, ESPCONN_KEEPCNT, ESPCONN_KEEPIDLE, ESPCONN_KEEPINTVL, ESPCONN_NONE, espconn_port(), espconn_regist_connectcb(), espconn_regist_reconcb(), espconn_secure_connect(), espconn_set_keepalive(), espconn_set_opt(), ESPCONN_TCP, MQTT_Client::host, ICACHE_FLASH_ATTR, INFO, MQTT_Client::ip, MQTT_Client::keepAliveTick, _esp_tcp::local_port, mqtt_dns_found(), mqtt_tcpclient_connect_cb(), mqtt_tcpclient_delete(), mqtt_tcpclient_recon_cb(), mqtt_timer(), MQTT_Client::mqttTimer, os_timer_arm, os_timer_disarm, os_timer_func_t, os_timer_setfn, os_zalloc, MQTT_Client::pCon, MQTT_Client::port, espconn::proto, MQTT_Client::reconnectTick, _esp_tcp::remote_ip, _esp_tcp::remote_port, espconn::reverse, MQTT_Client::security, espconn::state, espconn::tcp, TCP_CONNECTING, espconn::type, and UTILS_StrToIP().

Referenced by MQTT_Task().

893 {
894  uint32_t keeplive;
895 
896  if (mqttClient->pCon) {
897  // Clean up the old connection forcefully - using MQTT_Disconnect
898  // does not actually release the old connection until the
899  // disconnection callback is invoked.
900  mqtt_tcpclient_delete(mqttClient);
901  }
902  mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
903  mqttClient->pCon->type = ESPCONN_TCP;
904  mqttClient->pCon->state = ESPCONN_NONE;
905  mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp));
906  mqttClient->pCon->proto.tcp->local_port = espconn_port();
907  mqttClient->pCon->proto.tcp->remote_port = mqttClient->port;
908  mqttClient->pCon->reverse = mqttClient;
909 
910  // enable TCP keep alive
911  espconn_set_opt(mqttClient->pCon, ESPCONN_KEEPALIVE);
912 
913  // set keepalive: 75s = 60 + 5*3
914  keeplive = 60;
915  espconn_set_keepalive(mqttClient->pCon, ESPCONN_KEEPIDLE, &keeplive);
916 
917  keeplive = 5;
918  espconn_set_keepalive(mqttClient->pCon, ESPCONN_KEEPINTVL, &keeplive);
919 
920  keeplive = 3; //try times
921  espconn_set_keepalive(mqttClient->pCon, ESPCONN_KEEPCNT, &keeplive);
922 
925 
926  mqttClient->keepAliveTick = 0;
927  mqttClient->reconnectTick = 0;
928 
929  os_timer_disarm(&mqttClient->mqttTimer);
930  os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient);
931  os_timer_arm(&mqttClient->mqttTimer, 1000, 1);
932 
933  if (UTILS_StrToIP(mqttClient->host, &mqttClient->pCon->proto.tcp->remote_ip)) {
934  INFO("TCP: Connect to ip %s:%d\r\n", mqttClient->host, mqttClient->port);
935  if (mqttClient->security)
936  {
937 #ifdef MQTT_SSL_ENABLE
938  espconn_secure_connect(mqttClient->pCon);
939 #else
940  INFO("TCP: Do not support SSL\r\n");
941 #endif
942  }
943  else
944  {
945  espconn_connect(mqttClient->pCon);
946  }
947  }
948  else {
949  INFO("TCP: Connect to domain %s:%d\r\n", mqttClient->host, mqttClient->port);
950  espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);
951  }
952  mqttClient->connState = TCP_CONNECTING;
953 }
sint8 espconn_set_opt(struct espconn *espconn, uint8 opt)
Definition: espconn.c:1056
sint8 espconn_connect(struct espconn *espconn)
Definition: espconn.c:260
uint32_t port
Definition: mqtt.h:97
void ICACHE_FLASH_ATTR mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
Tcp client connect repeat callback function.
Definition: mqtt.c:567
void * reverse
Definition: espconn.h:112
ETSTimer mqttTimer
Definition: mqtt.h:106
#define os_timer_disarm
Definition: osapi.h:51
esp_tcp * tcp
Definition: espconn.h:105
int local_port
Definition: espconn.h:72
sint8 espconn_regist_reconcb(struct espconn *espconn, espconn_reconnect_callback recon_cb)
Definition: espconn.c:789
uint8 remote_ip[4]
Definition: espconn.h:74
void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
Definition: mqtt.c:472
#define os_timer_func_t
Definition: os_type.h:35
uint8_t * host
Definition: mqtt.h:96
struct espconn * pCon
Definition: mqtt.h:94
uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const int8_t *str, void *ip)
Definition: mqtt_utils.c:91
#define os_timer_setfn
Definition: osapi.h:52
void ICACHE_FLASH_ATTR mqtt_tcpclient_delete(MQTT_Client *mqttClient)
Delete tcp client and free all memory.
Definition: mqtt.c:179
int remote_port
Definition: espconn.h:71
#define os_zalloc(s)
Definition: mem.h:44
sint8 espconn_set_keepalive(struct espconn *espconn, uint8 level, void *optarg)
Definition: espconn.c:1128
uint32_t reconnectTick
Definition: mqtt.h:108
uint8_t security
Definition: mqtt.h:95
union espconn::@1 proto
tConnState connState
Definition: mqtt.h:110
#define os_timer_arm(a, b, c)
Definition: osapi.h:50
enum espconn_type type
Definition: espconn.h:101
enum espconn_state state
Definition: espconn.h:103
uint32 espconn_port(void)
Definition: espconn.c:1245
sint8 espconn_regist_connectcb(struct espconn *espconn, espconn_connect_callback connect_cb)
Definition: espconn.c:751
void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb(void *arg)
Tcp client connect success callback function.
Definition: mqtt.c:527
LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
Definition: mqtt.c:76
ip_addr_t ip
Definition: mqtt.h:98
#define INFO(...)
Definition: debug.h:17
sint8 espconn_gethostbyname(struct espconn *pespconn, const char *name, ip_addr_t *addr, dns_found_callback found)
Definition: espconn.c:1288
sint8 espconn_secure_connect(struct espconn *espconn)
uint32_t keepAliveTick
Definition: mqtt.h:107
Here is the call graph for this function:
Here is the caller graph for this function:

◆ MQTT_DeleteClient()

void ICACHE_FLASH_ATTR MQTT_DeleteClient ( MQTT_Client mqttClient)

Definition at line 964 of file mqtt.c.

References MQTT_Client::connState, ICACHE_FLASH_ATTR, MQTT_DELETED, MQTT_TASK_PRIO, NULL, os_param_t, os_timer_disarm, and system_os_post().

965 {
966  if(NULL == mqttClient)
967  return;
968 
969  mqttClient->connState = MQTT_DELETED;
970  // if(TCP_DISCONNECTED == mqttClient->connState) {
971  // mqttClient->connState = MQTT_DELETED;
972  // } else if(MQTT_DELETED != mqttClient->connState) {
973  // mqttClient->connState = MQTT_DELETING;
974  // }
975 
976  system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
977  os_timer_disarm(&mqttClient->mqttTimer);
978 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
ETSTimer mqttTimer
Definition: mqtt.h:106
#define os_timer_disarm
Definition: osapi.h:51
#define NULL
Definition: def.h:47
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
tConnState connState
Definition: mqtt.h:110
#define os_param_t
Definition: os_type.h:31
Here is the call graph for this function:

◆ MQTT_Disconnect()

void ICACHE_FLASH_ATTR MQTT_Disconnect ( MQTT_Client mqttClient)

Definition at line 956 of file mqtt.c.

References MQTT_Client::connState, ICACHE_FLASH_ATTR, MQTT_TASK_PRIO, os_param_t, os_timer_disarm, system_os_post(), and TCP_DISCONNECTING.

957 {
958  mqttClient->connState = TCP_DISCONNECTING;
959  system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
960  os_timer_disarm(&mqttClient->mqttTimer);
961 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
ETSTimer mqttTimer
Definition: mqtt.h:106
#define os_timer_disarm
Definition: osapi.h:51
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
tConnState connState
Definition: mqtt.h:110
#define os_param_t
Definition: os_type.h:31
Here is the call graph for this function:

◆ MQTT_InitClient()

BOOL ICACHE_FLASH_ATTR MQTT_InitClient ( MQTT_Client mqttClient,
uint8_t *  client_id,
uint8_t *  client_user,
uint8_t *  client_pass,
uint32_t  keepAliveTime,
uint8_t  cleanSession 
)

MQTT initialization mqtt client function.

Parameters
clientMQTT_Client reference
clientidMQTT client id
client_user:MQTTclient user
client_pass:MQTTclient password
client_pass:MQTTkeep alive timer, in second
Return values
None

Definition at line 801 of file mqtt.c.

References mqtt_connect_info::clean_session, mqtt_connect_info::client_id, mqtt_state_t::connect_info, MQTT_Client::connect_info, ICACHE_FLASH_ATTR, mqtt_state_t::in_buffer, mqtt_state_t::in_buffer_length, INFO, mqtt_connect_info::keepalive, MQTT_BUF_SIZE, mqtt_state_t::mqtt_connection, mqtt_msg_init(), mqtt_procTaskQueue, MQTT_Client::mqtt_state, MQTT_Task(), MQTT_TASK_PRIO, MQTT_TASK_QUEUE_SIZE, MQTT_Client::msgQueue, os_memset, os_param_t, os_strcpy, os_strlen, os_zalloc, mqtt_state_t::out_buffer, mqtt_state_t::out_buffer_length, mqtt_connect_info::password, QUEUE_BUFFER_SIZE, QUEUE_Init(), system_os_post(), system_os_task(), and mqtt_connect_info::username.

802 {
803  uint32_t temp;
804  INFO("MQTT:InitClient\r\n");
805 
806  os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t));
807 
808  if ( !client_id ) {
809  /* Should be allowed by broker, but clean session flag must be set. */
810 #ifdef PROTOCOL_NAMEv311
811  if (cleanSession) {
812  mqttClient->connect_info.client_id = zero_len_id;
813  } else {
814  INFO("cleanSession must be set to use 0 length client_id\r\n");
815  return false;
816  }
817  /* Not supported. Return. */
818 #else
819  INFO("Client ID required for MQTT < 3.1.1!\r\n");
820  return false;
821 #endif
822  }
823 
824  /* If connect_info's client_id is still NULL and we get here, we can *
825  * assume the passed client_id is non-NULL. */
826  if ( !(mqttClient->connect_info.client_id) )
827  {
828  temp = os_strlen(client_id);
829  mqttClient->connect_info.client_id = (uint8_t*)os_zalloc(temp + 1);
830  os_strcpy(mqttClient->connect_info.client_id, client_id);
831  mqttClient->connect_info.client_id[temp] = 0;
832  }
833 
834  if (client_user)
835  {
836  temp = os_strlen(client_user);
837  mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1);
838  os_strcpy(mqttClient->connect_info.username, client_user);
839  mqttClient->connect_info.username[temp] = 0;
840  }
841 
842  if (client_pass)
843  {
844  temp = os_strlen(client_pass);
845  mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1);
846  os_strcpy(mqttClient->connect_info.password, client_pass);
847  mqttClient->connect_info.password[temp] = 0;
848  }
849 
850 
851  mqttClient->connect_info.keepalive = keepAliveTime;
852  mqttClient->connect_info.clean_session = cleanSession;
853 
854  mqttClient->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
856  mqttClient->mqtt_state.out_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
858  mqttClient->mqtt_state.connect_info = &mqttClient->connect_info;
859 
861 
862  QUEUE_Init(&mqttClient->msgQueue, QUEUE_BUFFER_SIZE);
863 
865  system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
866  return true;
867 }
#define MQTT_TASK_QUEUE_SIZE
Definition: mqtt.c:57
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
char * username
Definition: mqtt_msg.h:99
#define os_zalloc(s)
Definition: mem.h:44
uint8_t * in_buffer
Definition: mqtt.h:51
os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE]
Definition: mqtt.c:69
#define MQTT_BUF_SIZE
Definition: user_config.h:31
int out_buffer_length
Definition: mqtt.h:54
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
bool system_os_task(os_task_t task, uint8 prio, os_event_t *queue, uint8 qlen)
#define os_strlen
Definition: osapi.h:43
#define QUEUE_BUFFER_SIZE
Definition: mqtt.c:61
#define os_memset
Definition: osapi.h:38
int in_buffer_length
Definition: mqtt.h:53
QUEUE msgQueue
Definition: mqtt.h:111
char * client_id
Definition: mqtt_msg.h:98
void ICACHE_FLASH_ATTR QUEUE_Init(QUEUE *queue, int bufferSize)
Definition: queue.c:39
mqtt_connect_info_t connect_info
Definition: mqtt.h:100
uint8_t * out_buffer
Definition: mqtt.h:52
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint16_t buffer_length)
Definition: mqtt_msg.c:128
void ICACHE_FLASH_ATTR MQTT_Task(os_event_t *e)
Definition: mqtt.c:697
uint32_t keepalive
Definition: mqtt_msg.h:103
#define os_strcpy
Definition: osapi.h:42
#define os_param_t
Definition: os_type.h:31
mqtt_connect_info_t * connect_info
Definition: mqtt.h:50
#define INFO(...)
Definition: debug.h:17
mqtt_state_t mqtt_state
Definition: mqtt.h:99
Here is the call graph for this function:

◆ MQTT_InitConnection()

void ICACHE_FLASH_ATTR MQTT_InitConnection ( MQTT_Client mqttClient,
uint8_t *  host,
uint32_t  port,
uint8_t  security 
)

MQTT initialization connection function.

Parameters
clientMQTT_Client reference
hostDomain or IP string
portPort to connect
security1 for ssl, 0 for none
Return values
None

Definition at line 777 of file mqtt.c.

References BOOL, MQTT_Client::host, ICACHE_FLASH_ATTR, INFO, os_memset, os_strcpy, os_strlen, os_zalloc, MQTT_Client::port, and MQTT_Client::security.

778 {
779  uint32_t temp;
780  INFO("MQTT:InitConnection\r\n");
781  os_memset(mqttClient, 0, sizeof(MQTT_Client));
782  temp = os_strlen(host);
783  mqttClient->host = (uint8_t*)os_zalloc(temp + 1);
784  os_strcpy(mqttClient->host, host);
785  mqttClient->host[temp] = 0;
786  mqttClient->port = port;
787  mqttClient->security = security;
788 
789 }
uint32_t port
Definition: mqtt.h:97
uint8_t * host
Definition: mqtt.h:96
#define os_zalloc(s)
Definition: mem.h:44
#define os_strlen
Definition: osapi.h:43
#define os_memset
Definition: osapi.h:38
uint8_t security
Definition: mqtt.h:95
#define os_strcpy
Definition: osapi.h:42
#define INFO(...)
Definition: debug.h:17

◆ MQTT_InitLWT()

void ICACHE_FLASH_ATTR MQTT_InitLWT ( MQTT_Client mqttClient,
uint8_t *  will_topic,
uint8_t *  will_msg,
uint8_t  will_qos,
uint8_t  will_retain 
)

Definition at line 869 of file mqtt.c.

References MQTT_Client::connect_info, ICACHE_FLASH_ATTR, os_strcpy, os_strlen, os_zalloc, mqtt_connect_info::will_message, mqtt_connect_info::will_qos, mqtt_connect_info::will_retain, and mqtt_connect_info::will_topic.

870 {
871  uint32_t temp;
872  temp = os_strlen(will_topic);
873  mqttClient->connect_info.will_topic = (uint8_t*)os_zalloc(temp + 1);
874  os_strcpy(mqttClient->connect_info.will_topic, will_topic);
875  mqttClient->connect_info.will_topic[temp] = 0;
876 
877  temp = os_strlen(will_msg);
878  mqttClient->connect_info.will_message = (uint8_t*)os_zalloc(temp + 1);
879  os_strcpy(mqttClient->connect_info.will_message, will_msg);
880  mqttClient->connect_info.will_message[temp] = 0;
881 
882 
883  mqttClient->connect_info.will_qos = will_qos;
884  mqttClient->connect_info.will_retain = will_retain;
885 }
char * will_message
Definition: mqtt_msg.h:102
char * will_topic
Definition: mqtt_msg.h:101
#define os_zalloc(s)
Definition: mem.h:44
#define os_strlen
Definition: osapi.h:43
mqtt_connect_info_t connect_info
Definition: mqtt.h:100
#define os_strcpy
Definition: osapi.h:42

◆ MQTT_OnConnected()

void ICACHE_FLASH_ATTR MQTT_OnConnected ( MQTT_Client mqttClient,
MqttCallback  connectedCb 
)

Definition at line 981 of file mqtt.c.

References MQTT_Client::connectedCb, and ICACHE_FLASH_ATTR.

982 {
983  mqttClient->connectedCb = connectedCb;
984 }
MqttCallback connectedCb
Definition: mqtt.h:101

◆ MQTT_OnData()

void ICACHE_FLASH_ATTR MQTT_OnData ( MQTT_Client mqttClient,
MqttDataCallback  dataCb 
)

Definition at line 993 of file mqtt.c.

References MQTT_Client::dataCb, and ICACHE_FLASH_ATTR.

994 {
995  mqttClient->dataCb = dataCb;
996 }
MqttDataCallback dataCb
Definition: mqtt.h:105

◆ MQTT_OnDisconnected()

void ICACHE_FLASH_ATTR MQTT_OnDisconnected ( MQTT_Client mqttClient,
MqttCallback  disconnectedCb 
)

Definition at line 987 of file mqtt.c.

References MQTT_Client::disconnectedCb, and ICACHE_FLASH_ATTR.

988 {
989  mqttClient->disconnectedCb = disconnectedCb;
990 }
MqttCallback disconnectedCb
Definition: mqtt.h:102

◆ MQTT_OnPublished()

void ICACHE_FLASH_ATTR MQTT_OnPublished ( MQTT_Client mqttClient,
MqttCallback  publishedCb 
)

Definition at line 999 of file mqtt.c.

References ICACHE_FLASH_ATTR, and MQTT_Client::publishedCb.

1000 {
1001  mqttClient->publishedCb = publishedCb;
1002 }
MqttCallback publishedCb
Definition: mqtt.h:103

◆ MQTT_OnTimeout()

void ICACHE_FLASH_ATTR MQTT_OnTimeout ( MQTT_Client mqttClient,
MqttCallback  timeoutCb 
)

Definition at line 1005 of file mqtt.c.

References MQTT_Client::timeoutCb.

1006 {
1007  mqttClient->timeoutCb = timeoutCb;
1008 }
MqttCallback timeoutCb
Definition: mqtt.h:104

◆ MQTT_Ping()

BOOL ICACHE_FLASH_ATTR MQTT_Ping ( MQTT_Client client)

MQTT ping function.

Parameters
clientMQTT_Client reference
Return values
TRUEif success queue

Definition at line 675 of file mqtt.c.

References mqtt_message::data, FALSE, RINGBUF::fill_cnt, ICACHE_FLASH_ATTR, INFO, mqtt_message::length, MQTT_BUF_SIZE, mqtt_state_t::mqtt_connection, mqtt_msg_pingreq(), MQTT_Client::mqtt_state, MQTT_TASK_PRIO, MQTT_Client::msgQueue, os_param_t, mqtt_state_t::outbound_message, QUEUE_Gets(), QUEUE_Puts(), QUEUE::rb, RINGBUF::size, system_os_post(), and TRUE.

676 {
677  uint8_t dataBuffer[MQTT_BUF_SIZE];
678  uint16_t dataLen;
680  if(client->mqtt_state.outbound_message->length == 0){
681  INFO("MQTT: Queuing publish failed\r\n");
682  return FALSE;
683  }
684  INFO("MQTT: queuing publish, length: %d, queue size(%ld/%ld)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
685  while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
686  INFO("MQTT: Queue full\r\n");
687  if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
688  INFO("MQTT: Serious buffer error\r\n");
689  return FALSE;
690  }
691  }
693  return TRUE;
694 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
#define FALSE
Definition: c_types.h:111
#define TRUE
Definition: c_types.h:110
#define MQTT_BUF_SIZE
Definition: user_config.h:31
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
QUEUE msgQueue
Definition: mqtt.h:111
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t *buffer, uint16_t len)
Definition: queue.c:44
uint16_t length
Definition: mqtt_msg.h:82
RINGBUF rb
Definition: queue.h:37
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t *buffer, uint16_t *len, uint16_t maxLen)
Definition: queue.c:61
#define os_param_t
Definition: os_type.h:31
#define INFO(...)
Definition: debug.h:17
volatile I32 fill_cnt
Definition: ringbuf.h:11
mqtt_state_t mqtt_state
Definition: mqtt.h:99
I32 size
Definition: ringbuf.h:12
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t *connection)
Definition: mqtt_msg.c:471
Here is the call graph for this function:

◆ MQTT_Publish()

BOOL ICACHE_FLASH_ATTR MQTT_Publish ( MQTT_Client client,
const char *  topic,
const char *  data,
int  data_length,
int  qos,
int  retain 
)

MQTT publish function.

Parameters
clientMQTT_Client reference
topicstring topic will publish to
databuffer data send point to
data_lengthlength of data
qosqos
retainretain
Return values
TRUEif success queue

Definition at line 591 of file mqtt.c.

References BOOL, mqtt_message::data, FALSE, RINGBUF::fill_cnt, ICACHE_FLASH_ATTR, INFO, mqtt_message::length, MQTT_BUF_SIZE, mqtt_state_t::mqtt_connection, mqtt_msg_publish(), MQTT_Client::mqtt_state, MQTT_TASK_PRIO, MQTT_Client::msgQueue, os_param_t, mqtt_state_t::outbound_message, mqtt_state_t::pending_msg_id, QUEUE_Gets(), QUEUE_Puts(), QUEUE::rb, RINGBUF::size, system_os_post(), and TRUE.

Referenced by en61107_received_task(), en61107_request_send(), kmp_received_task(), and kmp_request_send().

592 {
593  uint8_t dataBuffer[MQTT_BUF_SIZE];
594  uint16_t dataLen;
596  topic, data, data_length,
597  qos, retain,
598  &client->mqtt_state.pending_msg_id);
599  if (client->mqtt_state.outbound_message->length == 0) {
600  INFO("MQTT: Queuing publish failed\r\n");
601  return FALSE;
602  }
603  INFO("MQTT: queuing publish, length: %d, queue size(%ld/%ld)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
604  while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
605  INFO("MQTT: Queue full\r\n");
606  if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
607  INFO("MQTT: Serious buffer error\r\n");
608  return FALSE;
609  }
610  }
612  return TRUE;
613 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
#define FALSE
Definition: c_types.h:111
#define TRUE
Definition: c_types.h:110
#define MQTT_BUF_SIZE
Definition: user_config.h:31
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
QUEUE msgQueue
Definition: mqtt.h:111
uint16_t pending_msg_id
Definition: mqtt.h:59
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id)
Definition: mqtt_msg.c:377
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t *buffer, uint16_t len)
Definition: queue.c:44
uint16_t length
Definition: mqtt_msg.h:82
RINGBUF rb
Definition: queue.h:37
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t *buffer, uint16_t *len, uint16_t maxLen)
Definition: queue.c:61
#define os_param_t
Definition: os_type.h:31
#define INFO(...)
Definition: debug.h:17
volatile I32 fill_cnt
Definition: ringbuf.h:11
mqtt_state_t mqtt_state
Definition: mqtt.h:99
I32 size
Definition: ringbuf.h:12
Here is the call graph for this function:
Here is the caller graph for this function:

◆ MQTT_Subscribe()

BOOL ICACHE_FLASH_ATTR MQTT_Subscribe ( MQTT_Client client,
char *  topic,
uint8_t  qos 
)

MQTT subscibe function.

Parameters
clientMQTT_Client reference
topicstring topic will subscribe
qosqos
Return values
TRUEif success queue

Definition at line 623 of file mqtt.c.

References BOOL, mqtt_message::data, FALSE, ICACHE_FLASH_ATTR, INFO, mqtt_message::length, MQTT_BUF_SIZE, mqtt_state_t::mqtt_connection, mqtt_msg_subscribe(), MQTT_Client::mqtt_state, MQTT_TASK_PRIO, MQTT_Client::msgQueue, os_param_t, mqtt_state_t::outbound_message, mqtt_state_t::pending_msg_id, QUEUE_Gets(), QUEUE_Puts(), system_os_post(), and TRUE.

624 {
625  uint8_t dataBuffer[MQTT_BUF_SIZE];
626  uint16_t dataLen;
627 
629  topic, qos,
630  &client->mqtt_state.pending_msg_id);
631  INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
632  while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
633  INFO("MQTT: Queue full\r\n");
634  if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
635  INFO("MQTT: Serious buffer error\r\n");
636  return FALSE;
637  }
638  }
640  return TRUE;
641 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id)
Definition: mqtt_msg.c:435
#define FALSE
Definition: c_types.h:111
#define TRUE
Definition: c_types.h:110
#define MQTT_BUF_SIZE
Definition: user_config.h:31
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
QUEUE msgQueue
Definition: mqtt.h:111
uint16_t pending_msg_id
Definition: mqtt.h:59
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t *buffer, uint16_t len)
Definition: queue.c:44
uint16_t length
Definition: mqtt_msg.h:82
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t *buffer, uint16_t *len, uint16_t maxLen)
Definition: queue.c:61
#define os_param_t
Definition: os_type.h:31
#define INFO(...)
Definition: debug.h:17
mqtt_state_t mqtt_state
Definition: mqtt.h:99
Here is the call graph for this function:

◆ MQTT_UnSubscribe()

BOOL ICACHE_FLASH_ATTR MQTT_UnSubscribe ( MQTT_Client client,
char *  topic 
)

MQTT un-subscibe function.

Parameters
clientMQTT_Client reference
topicString topic will un-subscribe
Return values
TRUEif success queue

Definition at line 650 of file mqtt.c.

References BOOL, mqtt_message::data, FALSE, ICACHE_FLASH_ATTR, INFO, mqtt_message::length, MQTT_BUF_SIZE, mqtt_state_t::mqtt_connection, mqtt_msg_unsubscribe(), MQTT_Client::mqtt_state, MQTT_TASK_PRIO, MQTT_Client::msgQueue, os_param_t, mqtt_state_t::outbound_message, mqtt_state_t::pending_msg_id, QUEUE_Gets(), QUEUE_Puts(), system_os_post(), and TRUE.

651 {
652  uint8_t dataBuffer[MQTT_BUF_SIZE];
653  uint16_t dataLen;
655  topic,
656  &client->mqtt_state.pending_msg_id);
657  INFO("MQTT: queue un-subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
658  while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
659  INFO("MQTT: Queue full\r\n");
660  if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
661  INFO("MQTT: Serious buffer error\r\n");
662  return FALSE;
663  }
664  }
666  return TRUE;
667 }
bool system_os_post(uint8 prio, os_signal_t sig, os_param_t par)
#define FALSE
Definition: c_types.h:111
#define TRUE
Definition: c_types.h:110
#define MQTT_BUF_SIZE
Definition: user_config.h:31
#define MQTT_TASK_PRIO
Definition: mqtt.c:56
QUEUE msgQueue
Definition: mqtt.h:111
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id)
Definition: mqtt_msg.c:455
uint16_t pending_msg_id
Definition: mqtt.h:59
mqtt_connection_t mqtt_connection
Definition: mqtt.h:58
uint8_t * data
Definition: mqtt_msg.h:81
mqtt_message_t * outbound_message
Definition: mqtt.h:57
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t *buffer, uint16_t len)
Definition: queue.c:44
uint16_t length
Definition: mqtt_msg.h:82
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t *buffer, uint16_t *len, uint16_t maxLen)
Definition: queue.c:61
#define os_param_t
Definition: os_type.h:31
#define INFO(...)
Definition: debug.h:17
mqtt_state_t mqtt_state
Definition: mqtt.h:99
Here is the call graph for this function: