blob: 39f6a8e2f51c7309537f797851bb09ad087abc3b [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2014, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - fix for #96 - check rem_len in readPacket
* Ian Craggs - add ability to set message handler separately #6
*******************************************************************************/
#include <stdio.h>
#include <stdarg.h>
//#include "commontypedef.h"
#include "MQTTClient.h"
#ifdef FEATURE_MQTT_TLS_ENABLE
#include "sha1.h"
#include "sha256.h"
#include "md5.h"
#endif
//#include "debug_trace.h"
//#include DEBUG_LOG_HEADER_FILE
char *mqttSendbuf = NULL;
char *mqttReadbuf = NULL;
unsigned char mqttJsonbuff[256] = {0};
char mqtt_payload[128] = {0};
int ec_sensor_temp = 20;
char ec_data_type = 3;
int ec_data_len = 0;
//osMessageQueueId_t mqttRecvMsgHandle = NULL;
//osMessageQueueId_t mqttSendMsgHandle = NULL;
//osMessageQueueId_t appMqttMsgHandle = NULL;
///osThreadId_t mqttRecvTaskHandle = NULL;
#ifdef MBTK_OPENCPU_SUPPORT
#define MAX_RECV_TASK 5
osThreadId_t mbtk_mqttRecvTaskHandle[MAX_RECV_TASK];
Mutex mbtk_mqttMutex1[MAX_RECV_TASK];
#endif
//osThreadId_t mqttSendTaskHandle = NULL;
//osThreadId_t appMqttTaskHandle = NULL;
//Mutex mqttMutex1;
//Mutex mqttMutex2;
MQTTClient mqttClient;
Network mqttNetwork;
int mqtt_send_task_status_flag = 0;
int mqtt_keepalive_retry_count = 0;
char mqttHb2Hex(unsigned char hb)
{
hb = hb&0xF;
return (char)(hb<10 ? '0'+hb : hb-10+'a');
}
#ifdef FEATURE_MQTT_TLS_ENABLE
void mqttAliHmacSha1(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)
{
int i;
mbedtls_sha1_context ctx;
unsigned char k_ipad[ALI_SHA1_KEY_IOPAD_SIZE] = {0};
unsigned char k_opad[ALI_SHA1_KEY_IOPAD_SIZE] = {0};
unsigned char tempbuf[ALI_SHA1_DIGEST_SIZE];
memset(k_ipad, 0x36, ALI_SHA1_KEY_IOPAD_SIZE);
memset(k_opad, 0x5C, ALI_SHA1_KEY_IOPAD_SIZE);
for(i=0; i<keylen; i++)
{
if(i>=ALI_SHA1_KEY_IOPAD_SIZE)
{
break;
}
k_ipad[i] ^=key[i];
k_opad[i] ^=key[i];
}
mbedtls_sha1_init(&ctx);
mbedtls_sha1_starts(&ctx);
mbedtls_sha1_update(&ctx, k_ipad, ALI_SHA1_KEY_IOPAD_SIZE);
mbedtls_sha1_update(&ctx, input, ilen);
mbedtls_sha1_finish(&ctx, tempbuf);
mbedtls_sha1_starts(&ctx);
mbedtls_sha1_update(&ctx, k_opad, ALI_SHA1_KEY_IOPAD_SIZE);
mbedtls_sha1_update(&ctx, tempbuf, ALI_SHA1_DIGEST_SIZE);
mbedtls_sha1_finish(&ctx, tempbuf);
for(i=0; i<ALI_SHA1_DIGEST_SIZE; ++i)
{
output[i*2] = mqttHb2Hex(tempbuf[i]>>4);
output[i*2+1] = mqttHb2Hex(tempbuf[i]);
}
mbedtls_sha1_free(&ctx);
}
/*
* output = SHA-256( input buffer )
*/
void mqttAliHmacSha256(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)
{
int i;
mbedtls_sha256_context ctx;
unsigned char k_ipad[ALI_SHA256_KEY_IOPAD_SIZE] = {0};
unsigned char k_opad[ALI_SHA256_KEY_IOPAD_SIZE] = {0};
memset(k_ipad, 0x36, 64);
memset(k_opad, 0x5C, 64);
if ((NULL == input) || (NULL == key) || (NULL == output)) {
return;
}
if (keylen > ALI_SHA256_KEY_IOPAD_SIZE) {
return;
}
for(i=0; i<keylen; i++)
{
if(i>=ALI_SHA256_KEY_IOPAD_SIZE)
{
break;
}
k_ipad[i] ^=key[i];
k_opad[i] ^=key[i];
}
mbedtls_sha256_init(&ctx);
mbedtls_sha256_starts(&ctx, 0);
mbedtls_sha256_update(&ctx, k_ipad, ALI_SHA256_KEY_IOPAD_SIZE);
mbedtls_sha256_update(&ctx, input, ilen);
mbedtls_sha256_finish(&ctx, output);
mbedtls_sha256_starts(&ctx, 0);
mbedtls_sha256_update(&ctx, k_opad, ALI_SHA256_KEY_IOPAD_SIZE);
mbedtls_sha256_update(&ctx, output, ALI_SHA256_DIGEST_SIZE);
mbedtls_sha256_finish(&ctx, output);
mbedtls_sha256_free(&ctx);
}
/*
* output = MD-5( input buffer )
*/
void mqttAliHmacMd5(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)
{
int i;
mbedtls_md5_context ctx;
unsigned char k_ipad[ALI_MD5_KEY_IOPAD_SIZE] = {0};
unsigned char k_opad[ALI_MD5_KEY_IOPAD_SIZE] = {0};
unsigned char tempbuf[ALI_MD5_DIGEST_SIZE];
memset(k_ipad, 0x36, ALI_MD5_KEY_IOPAD_SIZE);
memset(k_opad, 0x5C, ALI_MD5_KEY_IOPAD_SIZE);
for(i=0; i<keylen; i++)
{
if(i>=ALI_MD5_KEY_IOPAD_SIZE)
{
break;
}
k_ipad[i] ^=key[i];
k_opad[i] ^=key[i];
}
mbedtls_md5_init(&ctx);
mbedtls_md5_starts(&ctx);
mbedtls_md5_update(&ctx, k_ipad, ALI_MD5_KEY_IOPAD_SIZE);
mbedtls_md5_update(&ctx, input, ilen);
mbedtls_md5_finish(&ctx, tempbuf);
mbedtls_md5_starts(&ctx);
mbedtls_md5_update(&ctx, k_opad, ALI_MD5_KEY_IOPAD_SIZE);
mbedtls_md5_update(&ctx, tempbuf, ALI_MD5_DIGEST_SIZE);
mbedtls_md5_finish(&ctx, tempbuf);
for(i=0; i<ALI_MD5_DIGEST_SIZE; ++i)
{
output[i*2] = mqttHb2Hex(tempbuf[i]>>4);
output[i*2+1] = mqttHb2Hex(tempbuf[i]);
}
mbedtls_md5_free(&ctx);
}
#endif
void mqttDefMessageArrived(MessageData* data)
{
char *bufTemp = NULL;
bufTemp = malloc(data->message->payloadlen+1);
memset(bufTemp, 0, data->message->payloadlen+1);
memcpy(bufTemp, data->message->payload, data->message->payloadlen);
//ECPLAT_PRINTF(UNILOG_MQTT, mqttRecvTask_2, P_SIG, ".........MQTT topic is:%s", (const uint8_t *)data->topicName->lenstring.data);
//ECPLAT_PRINTF(UNILOG_MQTT, mqttRecvTask_1, P_SIG, ".........MQTT_messageArrived is:%s", (const uint8_t *)bufTemp);
free(bufTemp);
}
static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
md->topicName = aTopicName;
md->message = aMessage;
}
static int getNextPacketId(MQTTClient *c) {
return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
}
static int sendPacket(MQTTClient* c, int length, Timer* timer)
{
int rc = FAILURE,
sent = 0;
#ifdef FEATURE_MQTT_TLS_ENABLE
#ifdef MBTK_OPENCPU_SUPPORT
if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
{
ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls3, P_INFO, 0, "...mqttSendPacket..0.");
rc = mqttSslSend(c->mqtts_client, &c->mqtts_client->sendBuf[sent], length);
ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls4, P_INFO, 1, "...mqttSendPacket..=%d.", rc);
TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
return rc;
}
else
#endif
#endif
{
while (sent < length && !TimerIsExpired(timer))
{
#ifdef MQTT_RAI_OPTIMIZE
rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer), 0, false);
#else
rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
#endif
if (rc < 0) // there was an error writing the data
break;
sent += rc;
}
if (sent == length)
{
TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
rc = SUCCESS;
}
else
rc = FAILURE;
return rc;
}
}
void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
{
int i;
c->ipstack = network;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
c->messageHandlers[i].topicFilter = 0;
c->command_timeout_ms = command_timeout_ms;
c->buf = sendbuf;
c->buf_size = sendbuf_size;
c->readbuf = readbuf;
c->readbuf_size = readbuf_size;
c->isconnected = 0;
c->cleansession = 0;
c->ping_outstanding = 0;
c->defaultMessageHandler = mqttDefMessageArrived;
c->next_packetid = 1;
TimerInit(&c->last_sent);
TimerInit(&c->last_received);
#ifdef FEATURE_MQTT_TLS_ENABLE
#ifdef MBTK_OPENCPU_SUPPORT
c->recv_task_num = -1;
if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
{
c->mqtts_client->sendBuf = sendbuf;
c->mqtts_client->sendBufSize = sendbuf_size;
c->mqtts_client->readBuf = readbuf;
c->mqtts_client->readBufSize = readbuf_size;
}
#endif
#endif
#if defined(MQTT_TASK)
MutexInit(&c->mutex);
#endif
}
static int decodePacket(MQTTClient* c, int* value, int timeout)
{
unsigned char i;
int multiplier = 1;
int len = 0;
const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
*value = 0;
do
{
int rc = MQTTPACKET_READ_ERROR;
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
{
rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit;
}
#ifdef FEATURE_MQTT_TLS_ENABLE
#ifdef MBTK_OPENCPU_SUPPORT
if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
{
rc = mqttSslRead(c->mqtts_client, &i, 1, timeout);
}
else
{
rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
}
#endif
#else
rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
#endif
if (rc != 1)
goto exit;
*value += (i & 127) * multiplier;
multiplier *= 128;
} while ((i & 128) != 0);
exit:
return len;
}
static int readPacket(MQTTClient* c, Timer* timer)
{
MQTTHeader header = {0};
int len = 0;
int rem_len = 0;
int rc = 0;
#ifdef FEATURE_MQTT_TLS_ENABLE
#ifdef MBTK_OPENCPU_SUPPORT
if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
{
ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls0, P_INFO, 0, "...mqttReadPacket..0.");
/* 1. read the header byte. This has the packet type in it */
rc = mqttSslRead(c->mqtts_client, c->mqtts_client->readBuf, 1, TimerLeftMS(timer));
if (rc != 1)
{
goto exit;
}
len = 1;
ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls1, P_INFO, 0, "...mqttReadPacket..1.");
/* 2. read the remaining length. This is variable in itself */
decodePacket(c, &rem_len, TimerLeftMS(timer));
len += MQTTPacket_encode(c->mqtts_client->readBuf + 1, rem_len); /* put the original remaining length back into the buffer */
if (rem_len > (c->mqtts_client->readBufSize - len))
{
ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls202, P_INFO, 0, "...mqttReadPacket..202.");
rc = BUFFER_OVERFLOW;
goto exit;
}
ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls2, P_INFO, 0, "...mqttReadPacket..2.");
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (mqttSslRead(c->mqtts_client, c->mqtts_client->readBuf + len, rem_len, TimerLeftMS(timer)) != rem_len))
{
rc = 0;
ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls200, P_INFO, 0, "...mqttReadPacket..200.");
goto exit;
}
header.byte = c->mqtts_client->readBuf[0];
rc = header.bits.type;
if (c->keepAliveInterval > 0)
{
ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls201, P_INFO, 0, "...mqttReadPacket..201.");
TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
}
}
else
#endif
#endif
{
/* 1. read the header byte. This has the packet type in it */
rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
if (rc != 1)
goto exit;
len = 1;
/* 2. read the remaining length. This is variable in itself */
decodePacket(c, &rem_len, TimerLeftMS(timer));
len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
if (rem_len > (c->readbuf_size - len))
{
rc = BUFFER_OVERFLOW;
goto exit;
}
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
rc = 0;
goto exit;
}
header.byte = c->readbuf[0];
rc = header.bits.type;
if (c->keepAliveInterval > 0)
TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
}
exit:
return rc;
}
// assume topic filter and name is in correct format
// # can only be at end
// + and # can only be next to separator
static char isTopicMatched(char* topicFilter, MQTTString* topicName)
{
char* curf = topicFilter;
char* curn = topicName->lenstring.data;
char* curn_end = curn + topicName->lenstring.len;
while (*curf && curn < curn_end)
{
if (*curn == '/' && *curf != '/')
break;
if (*curf != '+' && *curf != '#' && *curf != *curn)
break;
if (*curf == '+')
{ // skip until we meet the next separator, or end of string
char* nextpos = curn + 1;
while (nextpos < curn_end && *nextpos != '/')
nextpos = ++curn + 1;
}
else if (*curf == '#')
curn = curn_end - 1; // skip until end of string
curf++;
curn++;
};
return (curn == curn_end) && (*curf == '\0');
}
int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
{
int i;
int rc = FAILURE;
//ECOMM_TRACE(UNILOG_MQTT, deliverMessage1, P_SIG, 0, "....1....deliverMessage..");
// we have to find the right message handler - indexed by topic
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
{
if (c->messageHandlers[i].fp != NULL)
{
MessageData md;
NewMessageData(&md, topicName, message);
c->messageHandlers[i].fp(&md);
rc = SUCCESS;
//ECOMM_TRACE(UNILOG_MQTT, deliverMessage2, P_SIG, 0, "....2....deliverMessage..");
}
}
}
if (rc == FAILURE && c->defaultMessageHandler != NULL)
{
MessageData md;
//ECOMM_TRACE(UNILOG_MQTT, deliverMessage3, P_SIG, 0, "....3....deliverMessage..");
NewMessageData(&md, topicName, message);
c->defaultMessageHandler(&md);
rc = SUCCESS;
//ECOMM_TRACE(UNILOG_MQTT, deliverMessage4, P_SIG, 0, "....4....deliverMessage..");
}
return rc;
}
int keepalive(MQTTClient* c)
{
int rc = SUCCESS;
if (c->keepAliveInterval == 0)
{
goto exit;
}
if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
{
if (c->ping_outstanding)
{
mqtt_keepalive_retry_count++;
//ECOMM_TRACE(UNILOG_MQTT, keepalive_0, P_SIG, 0, "....keepalive....ping_outstanding..=1..");
rc = FAILURE; /* PINGRESP not received in keepalive interval */
}
else
{
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, 1000);
#ifdef MBTK_OPENCPU_SUPPORT
memset(c->buf, 0, MQTT_SEND_BUFF_LEN);
memset(c->readbuf, 0, MQTT_RECV_BUFF_LEN);
#endif
//memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
//memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
//ECOMM_TRACE(UNILOG_MQTT, keepalive_1, P_SIG, 0, "....keepalive....send packet..");
if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
c->ping_outstanding = 1;
}
}
exit:
return rc;
}
int keepaliveRetry(MQTTClient* c)
{
int rc = SUCCESS;
if (c->keepAliveInterval == 0)
{
goto exit;
}
if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
{
{
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, 1000);
#ifdef MBTK_OPENCPU_SUPPORT
memset(c->buf, 0, MQTT_SEND_BUFF_LEN);
memset(c->readbuf, 0, MQTT_RECV_BUFF_LEN);
#endif
//memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
//memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
//ECOMM_TRACE(UNILOG_MQTT, keepalive_1pp, P_SIG, 0, "....keepalive....send packet..");
if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
c->ping_outstanding = 1;
}
}
exit:
return rc;
}
void MQTTCleanSession(MQTTClient* c)
{
int i = 0;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
c->messageHandlers[i].topicFilter = NULL;
}
void MQTTCloseSession(MQTTClient* c)
{
c->ping_outstanding = 0;
c->isconnected = 0;
if (c->cleansession)
MQTTCleanSession(c);
}
#ifdef MBTK_OPENCPU_SUPPORT
int mqtt_try_resubscribe(MQTTClient* c)
{
int i;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter != 0)
{
MQTTSubscribe(c, c->messageHandlers[i].topicFilter, c->messageHandlers[i].qos,c->messageHandlers[i].fp);
}
}
return 0;
}
int mqtt_try_reconnect(MQTTClient* c)
{
int reconnect_flag = 1;
int rc = 0;
mqtt_keepalive_retry_count = 0;
int ret = 0;
if(c->isconnected == 1)
{
if(c->is_mqtts == 0)
{
ret = c->ipstack->disconnect(c->ipstack);
}
else
{
#ifdef FEATURE_MQTT_TLS_ENABLE
#ifdef MBTK_OPENCPU_SUPPORT
ret = mqttSslClose(c->mqtts_client);
#endif
#endif
}
if(ret == 0)
{
c->isconnected = 0;
}
}
if(c->mqtt_connect_callback != 0)
{
c->mqtt_connect_callback(MQTT_START_RECONNECT_EVENT,0);
}
c->isconnected = 0;
#ifdef FEATURE_MQTT_TLS_ENABLE
#ifdef MBTK_OPENCPU_SUPPORT
if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
{
if(mqttRecvTaskHandle)
rc = mqttSslConn_new(c->mqtts_client, c->mbtk_mqtt_server_url);
}
else
#endif
#endif
{
rc = NetworkConnectTimeout(c->ipstack, c->mbtk_mqtt_server_url, c->mbtk_mqtt_port, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT);
}
if(rc < 0)
{
reconnect_flag = 0;
}
else
{
if ((rc = (MQTTConnect(c, c->mbtk_mqtt_options))) != 0)
{
reconnect_flag = 0;
}
else
{
mqtt_try_resubscribe(c);
return 0;
}
}
if(reconnect_flag == 0)
{
if(c->mqtt_connect_callback != NULL)
{
int socket_status = sock_get_errno(c->ipstack->my_socket);
c->mqtt_connect_callback(MQTT_RECONNECT_FAIL_EVENT,socket_status);
}
}
return 0;
}
#endif
int cycle(MQTTClient* c, Timer* timer)
{
int len = 0,
rc = SUCCESS;
int packet_type = readPacket(c, timer); /* read the socket, see what work is due */
//ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0001, P_SIG, 1, ".....mqttRecvTask..packet_type=%d....",packet_type);
//ECPLAT_PRINTF(UNILOG_DM1, cycle0, P_SIG, ".....autoReg..packet_type=%d ",packet_type);
#ifdef MBTK_OPENCPU_SUPPORT
if(c->is_mqtts == MBTK_MQTT_SSL_NONE)
{
int socket_status = sock_get_errno(c->ipstack->my_socket);
if(socket_status == ENOTCONN && c->isconnected == 1)
{
if(c->mqtt_connect_callback != NULL)
{
c->mqtt_connect_callback(MQTT_CONNECT_ABORT_EVENT,socket_status);
}
if(MBTK_MQTT_RECONNECT_ENABLE == c->mqtt_reconn_enable)
{
mqtt_try_reconnect(c);
}
else
{
goto exit;
}
}
else if(c->isconnected == 0)
{
goto exit;
}
}
if(c->is_mqtts && packet_type < 0)
{
if(c->isconnected)
{
if(c->mqtt_connect_callback != NULL)
{
c->mqtt_connect_callback(MQTT_CONNECT_ABORT_EVENT,ENOTCONN);
}
if(MBTK_MQTT_RECONNECT_ENABLE == c->mqtt_reconn_enable)
{
mqtt_try_reconnect(c);
}
else
{
goto exit;
}
}
else
{
goto exit;
}
}
#endif
switch (packet_type)
{
default:
/* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
rc = packet_type;
break;
case 0: /* timed out reading packet */
break;
case CONNACK:
case PUBACK:
case SUBACK:
case UNSUBACK:
if(packet_type == SUBACK)
{
//rc = packet_type;
}
break;
case PUBLISH:
{
MQTTString topicName;
MQTTMessage msg;
int intQoS;
msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
goto exit;
msg.qos = (enum QoS)intQoS;
deliverMessage(c, &topicName, &msg);
if (msg.qos != QOS0)
{
if (msg.qos == QOS1)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
else if (msg.qos == QOS2)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
if (len <= 0)
rc = FAILURE;
else
rc = sendPacket(c, len, timer);
if (rc == FAILURE)
goto exit; // there was a problem
}
break;
}
case PUBREC:
case PUBREL:
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
(packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
rc = FAILURE;
else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
rc = FAILURE; // there was a problem
if (rc == FAILURE)
goto exit; // there was a problem
break;
}
case PUBCOMP:
break;
case PINGRESP:
c->ping_outstanding = 0;
break;
}
if (keepalive(c) != SUCCESS) {
int socket_stat = 0;
#ifndef MBTK_OPENCPU_SUPPORT
mqttSendMsg mqttMsg;
#endif
//check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
rc = FAILURE;
socket_stat = sock_get_errno(c->ipstack->my_socket);
if((socket_stat == MQTT_ERR_ABRT)||(socket_stat == MQTT_ERR_RST)||(socket_stat == MQTT_ERR_CLSD)||(socket_stat == MQTT_ERR_BADE))
{
#ifdef MBTK_OPENCPU_SUPPORT
if(c->mqtt_reconn_enable == MBTK_MQTT_RECONNECT_ENABLE)
{
mqtt_try_reconnect(c);
}
#else
//ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0, P_INFO, 0, ".....now, need reconnect.......");
/* send reconnect msg to send task */
memset(&mqttMsg, 0, sizeof(mqttMsg));
mqttMsg.cmdType = MQTT_DEMO_MSG_RECONNECT;
osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, MQTT_MSG_TIMEOUT);
#endif
}
else
{
if(mqtt_keepalive_retry_count>3)
{
#ifdef MBTK_OPENCPU_SUPPORT
if(c->mqtt_reconn_enable == MBTK_MQTT_RECONNECT_ENABLE)
{
mqtt_try_reconnect(c);
}
#else
mqtt_keepalive_retry_count = 0;
//ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_ee0, P_INFO, 0, ".....now, need reconnect.......");
/* send reconnect msg to send task */
memset(&mqttMsg, 0, sizeof(mqttMsg));
mqttMsg.cmdType = MQTT_DEMO_MSG_RECONNECT;
osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, MQTT_MSG_TIMEOUT);
#endif
}
else
{
keepaliveRetry(c);
}
}
}
exit:
if (rc == SUCCESS)
rc = packet_type;
else if (c->isconnected)
;//MQTTCloseSession(c);
return rc;
}
int MQTTYield(MQTTClient* c, int timeout_ms)
{
int rc = SUCCESS;
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, timeout_ms);
do
{
if (cycle(c, &timer) < 0)
{
rc = FAILURE;
break;
}
} while (!TimerIsExpired(&timer));
return rc;
}
int MQTTIsConnected(MQTTClient* client)
{
return client->isconnected;
}
void MQTTRun(void* parm)
{
Timer timer;
MQTTClient* c = (MQTTClient*)&mqttClient;
#ifndef MBTK_OPENCPU_SUPPORT
if(mqttSendMsgHandle == NULL)
{
mqttSendMsgHandle = osMessageQueueNew(16, sizeof(mqttSendMsg), NULL);
}
if(appMqttMsgHandle == NULL)
{
appMqttMsgHandle = osMessageQueueNew(16, sizeof(mqttDataMsg), NULL);
}
#endif
/*
if(mqttMutex1.sem == NULL)
{
MutexInit(&mqttMutex1);
}
*/
TimerInit(&timer);
while (1)
{
#if defined(MQTT_TASK)
//MutexLock(&c->mutex);
#endif
//MutexLock(&mqttMutex1);
TimerCountdownMS(&timer, 1500); /* Don't wait too long if no traffic is incoming */
cycle(c, &timer);
//MutexUnlock(&mqttMutex1);
#if defined(MQTT_TASK)
//MutexUnlock(&c->mutex);
#endif
sleep(200);
}
}
#if defined(MQTT_TASK)
int MQTTStartTask(MQTTClient* client)
{
return ThreadStart(&client->thread, &MQTTRun, client);
}
#endif
int MQTTStartRECVTask(void)
{
//osThreadAttr_t task_attr;
//memset(&task_attr, 0, sizeof(task_attr));
//task_attr.name = "mqttRecv";
//task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
#if defined FEATURE_LITEOS_ENABLE
task_attr.priority = osPriorityBelowNormal4;
#elif defined FEATURE_FREERTOS_ENABLE
task_attr.priority = osPriorityBelowNormal7;
#endif
//mqttRecvTaskHandle = osThreadNew(MQTTRun, NULL,&task_attr);
if(mqttRecvTaskHandle == NULL)
{
return FAILURE;
}
return SUCCESS;
}
int waitfor(MQTTClient* c, int packet_type, Timer* timer)
{
int rc = FAILURE;
do
{
if (TimerIsExpired(timer))
break; // we timed out
rc = cycle(c, timer);
}
while (rc != packet_type && rc >= 0);
return rc;
}
int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
{
Timer connect_timer;
int rc = FAILURE;
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
int len = 0;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
if (c->isconnected) /* don't send connect packet again if we are already connected */
goto exit;
TimerInit(&connect_timer);
TimerCountdownMS(&connect_timer, c->command_timeout_ms);
if (options == 0)
options = &default_options; /* set default options if none were supplied */
c->keepAliveInterval = options->keepAliveInterval;
c->cleansession = options->cleansession;
TimerCountdown(&c->last_received, c->keepAliveInterval);
if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
goto exit;
if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet
goto exit; // there was a problem
// this will be a blocking call, wait for the connack
if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
{
data->rc = 0;
data->sessionPresent = 0;
if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
rc = data->rc;
else
rc = FAILURE;
}
else
rc = FAILURE;
exit:
#ifdef MBTK_OPENCPU_SUPPORT
if (rc == SUCCESS)
{
if(c->mqtt_connect_callback != NULL)
{
c->mqtt_connect_callback(MQTT_CONNECT_SUCCESS_EVENT,0);
}
c->isconnected = 1;
c->ping_outstanding = 0;
}
#endif
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTReConnect(MQTTClient* client, MQTTPacket_connectData* connectData)
{
int ret = FAILURE;
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_13hh0, P_INFO, 0, "...start tcp disconnect ..");
client->ipstack->disconnect(client->ipstack);
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_14hh1, P_INFO, 0, "...start tcp connect ...");
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_16hh3, P_INFO, 0, "...tcp connect ok...");
client->isconnected = 0;
if((NetworkConnectTimeout(client->ipstack, MQTT_SERVER_URI, MQTT_SERVER_PORT, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT)) < 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_17hh4, P_INFO, 0, "...tcp reconnect fail!!!...\r\n");
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_18hh5, P_INFO, 0, "...start mqtt connect ..");
if ((MQTTConnect(client, connectData)) != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_19hh6, P_INFO, 0, "...mqtt reconnect fial!!!...");
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_20hh7, P_INFO, 0, "...mqtt reconnect ok!!!...");
ret = SUCCESS;
}
}
}
return ret;
}
int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
{
MQTTConnackData data;
return MQTTConnectWithResults(c, options, &data);
}
int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
{
int rc = FAILURE;
int i = -1;
/* first check for an existing matching slot */
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)
{
if (messageHandler == NULL) /* remove existing */
{
c->messageHandlers[i].topicFilter = NULL;
c->messageHandlers[i].fp = NULL;
}
rc = SUCCESS; /* return i when adding new subscription */
break;
}
}
/* if no existing, look for empty slot (unless we are removing) */
if (messageHandler != NULL) {
if (rc == FAILURE)
{
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter == NULL)
{
rc = SUCCESS;
break;
}
}
}
if (i < MAX_MESSAGE_HANDLERS)
{
c->messageHandlers[i].topicFilter = topicFilter;
c->messageHandlers[i].fp = messageHandler;
#ifdef MBTK_OPENCPU_SUPPORT
c->messageHandlers[i].qos = c->qos;
#endif
}
}
return rc;
}
#ifdef MBTK_OPENCPU_SUPPORT
void mbtk_MQTTRun(void* parm)
{
Timer timer;
MQTTClient* c = (MQTTClient*)parm;
int i = c->recv_task_num;
if(mbtk_mqttMutex1[i].sem == NULL)
{
MutexInit(&mbtk_mqttMutex1[i]);
}
TimerInit(&timer);
while (1)
{
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
MutexLock(&mbtk_mqttMutex1[i]);
TimerCountdownMS(&timer, 1500);
cycle(c, &timer);
MutexUnlock(&mbtk_mqttMutex1[i]);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
osDelay(200);
}
}
int mbtk_mqtt_demo_recv_task_init(int i,MQTTClient* c)
{
osThreadAttr_t task_attr;
char name[10] = {0};
memset(&task_attr, 0, sizeof(task_attr));
task_attr.stack_size = MQTT_RECV_DEMO_TASK_STACK_SIZE;
task_attr.priority = osPriorityBelowNormal7;
sprintf(name, "mqttRecv%d", i);
task_attr.name = name;
ECOMM_TRACE(UNILOG_MQTT, mbtk_mqtt_demo_recv_task_init_1, P_INFO, 0, "mbtk_mqtt_demo_recv_task_init task_attr.name%s",task_attr.name);
mbtk_mqttRecvTaskHandle[i] = osThreadNew(mbtk_MQTTRun, (void *)c,&task_attr);
if(mbtk_mqttRecvTaskHandle[i] == NULL)
{
return FAILURE;
}
return SUCCESS;
}
#endif
int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
messageHandler messageHandler, MQTTSubackData* data)
{
int rc = FAILURE;
Timer timer;
int len = 0;
int mqttQos = (int)qos;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&mqttQos);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
{
int count = 0;
unsigned short mypacketid;
//data->grantedQoS = QOS0;
mqttQos = QOS0;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&mqttQos, c->readbuf, c->readbuf_size) == 1)
{
if (mqttQos != 0x80)
{//mbtk change
#ifndef MBTK_OPENCPU_SUPPORT
rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
mqttClient = *c;
if(mqttRecvTaskHandle == NULL)
{
mqtt_demo_recv_task_init();
}
#else
rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
int i = 0;
if(c->first_sub == 0)
{
c->first_sub = 1;
for(i = 0; i < MAX_RECV_TASK; i++)
{
if(mbtk_mqttRecvTaskHandle[i] != NULL)
{
ECOMM_TRACE(UNILOG_MQTT, MQTTSubscribeWithResults_0, P_INFO, 0, "mbtk_mqttRecvTaskHandle %d has create",i);
continue;
}
else
{
c->recv_task_num = i;
mbtk_mqtt_demo_recv_task_init(i,c);
break;
}
}
if(i >= MAX_RECV_TASK)
{
ECOMM_TRACE(UNILOG_MQTT, MQTTSubscribeWithResults_1, P_INFO, 0, "create mqtt recv task error");
rc = FAILURE;
}
}
#endif
}//mbtk change, end
}
}
else
rc = FAILURE;
exit:
if (rc == FAILURE)
;//MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
messageHandler messageHandler)
{
MQTTSubackData data;
return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data);
}
int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
{
int rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
int len = 0;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
{
unsigned short mypacketid; // should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
{
/* remove the subscription message handler associated with this topic, if there is one */
MQTTSetMessageHandler(c, topicFilter, NULL);
}
}
else
rc = FAILURE;
exit:
if (rc == FAILURE)
;//MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
{
int rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicName;
int len = 0;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
if (message->qos == QOS1 || message->qos == QOS2)
message->id = getNextPacketId(c);
len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topic, (unsigned char*)message->payload, message->payloadlen);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (message->qos == QOS1)
{
if (waitfor(c, PUBACK, &timer) == PUBACK)
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
}
else
rc = FAILURE;
}
else if (message->qos == QOS2)
{
if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
}
else
rc = FAILURE;
}
exit:
if (rc == FAILURE)
;//MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTDisconnect(MQTTClient* c)
{
int rc = FAILURE;
Timer timer; // we might wait for incomplete incoming publishes to complete
int len = 0;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
len = MQTTSerialize_disconnect(c->buf, c->buf_size);
if (len > 0)
rc = sendPacket(c, len, &timer); // send the disconnect packet
MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTInit(MQTTClient* c, Network* n, unsigned char* sendBuf, unsigned char* readBuf)
{
NetworkInit(n);
MQTTClientInit(c, n, 40000, (unsigned char *)sendBuf, 1000, (unsigned char *)readBuf, 1000);
return 0;
}
int MQTTCreate(MQTTClient* c, Network* n, char* clientID, char* username, char* password, char *serverAddr, int port, MQTTPacket_connectData* connData)
{
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
int clientLen = 0;
int usernameLen = 0;
int passwordLen = 0;
if(connData != NULL)
{
memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));
}
else
{
connectData.MQTTVersion = 4;
connectData.keepAliveInterval = 120;
}
if(clientID != NULL)
{
clientLen = strlen(clientID);
connectData.clientID.cstring = malloc(clientLen+1);
memset(connectData.clientID.cstring, 0, (clientLen+1));
memcpy(connectData.clientID.cstring, clientID, clientLen);
}
if(username != NULL)
{
usernameLen = strlen(username);
connectData.username.cstring = malloc(usernameLen+1);;
memset(connectData.username.cstring, 0, (usernameLen+1));
memcpy(connectData.username.cstring, username, usernameLen);
}
if(password != NULL)
{
passwordLen = strlen(password);
connectData.password.cstring = malloc(passwordLen+1);
memset(connectData.password.cstring, 0, (passwordLen+1));
memcpy(connectData.password.cstring, password, passwordLen);
}
{
if ((NetworkConnectTimeout(n, serverAddr, port, 5000, 5000)) != 0)
{
c->keepAliveInterval = connectData.keepAliveInterval;
c->ping_outstanding = 1;
return 1;
}
else
{
if ((MQTTConnect(c, &connectData)) != 0)
{
c->ping_outstanding = 1;
return 1;
}
else
{
#if defined(MQTT_TASK)
if ((MQTTStartTask(c)) != pdPASS)
{
return 1;
}
else
{
return 0;
}
#endif
}
}
}
return 1;
}
#ifdef FEATURE_CUCC_DM_ENABLE
int MQTTCuccCycle(MQTTClient* c, Timer* timer)
{
int rc = SUCCESS;
int packet_type = readPacket(c, timer); /* read the socket, see what work is due */
//ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0001, P_SIG, 1, ".....mqttRecvTask..packet_type=%d....",packet_type);
//ECPLAT_PRINTF(UNILOG_DM1, cycle0, P_SIG, ".....autoReg..packet_type=%d ",packet_type);
switch (packet_type)
{
default:
/* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
rc = packet_type;
break;
case 0: /* timed out reading packet */
break;
case CONNACK:
case PUBACK:
case SUBACK:
case UNSUBACK:
if(packet_type == SUBACK)
{
//rc = packet_type;
}
break;
case PUBLISH:
{
break;
}
case PUBREC:
case PUBREL:
{
break;
}
case PUBCOMP:
break;
case PINGRESP:
c->ping_outstanding = 0;
break;
}
if (rc == SUCCESS)
rc = packet_type;
else if (c->isconnected)
;//MQTTCloseSession(c);
return rc;
}
int MQTTCuccWaitfor(MQTTClient* c, int packet_type, Timer* timer)
{
int rc = FAILURE;
do
{
if (TimerIsExpired(timer))
break; // we timed out
rc = MQTTCuccCycle(c, timer);
}
while (rc != packet_type && rc >= 0);
return rc;
}
int MQTTCuccCreate(MQTTClient* c, Network* n, char* clientID, char* username, char* password, char *serverAddr, int port, MQTTPacket_connectData* connData)
{
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
int clientLen = 0;
int usernameLen = 0;
int passwordLen = 0;
if(connData != NULL)
{
memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));
}
else
{
connectData.MQTTVersion = 4;
connectData.keepAliveInterval = 120;
}
if(clientID != NULL)
{
clientLen = strlen(clientID);
connectData.clientID.cstring = malloc(clientLen+1);
memset(connectData.clientID.cstring, 0, (clientLen+1));
memcpy(connectData.clientID.cstring, clientID, clientLen);
}
if(username != NULL)
{
usernameLen = strlen(username);
connectData.username.cstring = malloc(usernameLen+1);;
memset(connectData.username.cstring, 0, (usernameLen+1));
memcpy(connectData.username.cstring, username, usernameLen);
}
if(password != NULL)
{
passwordLen = strlen(password);
connectData.password.cstring = malloc(passwordLen+1);
memset(connectData.password.cstring, 0, (passwordLen+1));
memcpy(connectData.password.cstring, password, passwordLen);
}
{
if ((NetworkConnectTimeout(n, serverAddr, port, 5000, 5000)) != 0)
{
c->keepAliveInterval = connectData.keepAliveInterval;
c->ping_outstanding = 1;
return 1;
}
else
{
if ((MQTTConnect(c, &connectData)) != 0)
{
//c->ping_outstanding = 1;
//return 1;
}
else
{
#if defined(MQTT_TASK)
if ((MQTTStartTask(c)) != pdPASS)
{
return 1;
}
else
{
return 0;
}
#endif
}
}
}
return 0;
}
int MQTTCuccConnect(MQTTClient* c, char* clientID, char* username, char* password, MQTTPacket_connectData* connData)
{
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
int clientLen = 0;
int usernameLen = 0;
int passwordLen = 0;
if(connData != NULL)
{
memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));
}
else
{
connectData.MQTTVersion = 4;
connectData.keepAliveInterval = 120;
}
if(clientID != NULL)
{
clientLen = strlen(clientID);
connectData.clientID.cstring = malloc(clientLen+1);
memset(connectData.clientID.cstring, 0, (clientLen+1));
memcpy(connectData.clientID.cstring, clientID, clientLen);
}
if(username != NULL)
{
usernameLen = strlen(username);
connectData.username.cstring = malloc(usernameLen+1);;
memset(connectData.username.cstring, 0, (usernameLen+1));
memcpy(connectData.username.cstring, username, usernameLen);
}
if(password != NULL)
{
passwordLen = strlen(password);
connectData.password.cstring = malloc(passwordLen+1);
memset(connectData.password.cstring, 0, (passwordLen+1));
memcpy(connectData.password.cstring, password, passwordLen);
}
if ((MQTTConnect(c, &connectData)) != 0)
{
c->ping_outstanding = 1;
return 1;
}
return 0;
}
int MQTTCuccPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
{
int rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicName;
int len = 0;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
if (message->qos == QOS1 || message->qos == QOS2)
message->id = getNextPacketId(c);
len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topic, (unsigned char*)message->payload, message->payloadlen);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (message->qos == QOS1)
{
if (waitfor(c, PUBACK, &timer) == PUBACK)
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
}
else
rc = FAILURE;
}
else if (message->qos == QOS2)
{
if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
}
else
rc = FAILURE;
}
exit:
if (rc == FAILURE)
;//MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTCuccSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
int rc = FAILURE;
Timer timer;
int len = 0;
int mqttQos = (int)qos;
//MQTTSubackData data;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, 40000);
len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&mqttQos);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
{
int count = 0;
unsigned short mypacketid;
//data.grantedQoS = QOS0;
mqttQos = QOS0;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&mqttQos, c->readbuf, c->readbuf_size) == 1)
{
if (mqttQos == 0x80)
{
rc = FAILURE;
}
else
{
rc = SUCCESS;
}
}
}
else
{
rc = FAILURE;
}
exit:
if (rc == FAILURE)
;//MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTCuccDisconnect(MQTTClient* c)
{
int rc = FAILURE;
Timer timer; // we might wait for incomplete incoming publishes to complete
int len = 0;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
len = MQTTSerialize_disconnect(c->buf, c->buf_size);
if (len > 0)
rc = sendPacket(c, len, &timer); // send the disconnect packet
MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTCuccWaitForRecv(MQTTClient* c, int packet_type, unsigned int timerOut, MQTTString *topicName, char *outPayload)
{
int rc = FAILURE;
Timer timer;
MQTTMessage msg;
int intQoS;
memset(&msg, 0, sizeof(MQTTMessage));
TimerInit(&timer);
TimerCountdownMS(&timer, timerOut);
do
{
if (TimerIsExpired(&timer))
break; // we timed out
rc = cycle(c, &timer);
}
while (rc != packet_type && rc >= 0);
if(rc == PUBLISH)
{
if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) == 1)
{
memcpy(outPayload, (unsigned char*)msg.payload, msg.payloadlen);
rc = SUCCESS;
}
}
return rc;
}
#endif
#define MQTT_DEMO_EXAMPLE_1_ONENET
void mqtt_demo_onenet(void)
{
int len = 0;
MQTTMessage message;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
connectData.MQTTVersion = 4;
connectData.clientID.cstring = "34392813";
connectData.username.cstring = "122343";
connectData.password.cstring = "test001";
connectData.keepAliveInterval = 120;
//ECOMM_TRACE(UNILOG_MQTT, mqtt_hh00, P_SIG, 0, "mqtt_demo........");
ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet0, P_INFO, "mqtt_demo........");
mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);
mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);
memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
NetworkInit(&mqttNetwork);
MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, 1000, (unsigned char *)mqttReadbuf, 1000);
if((NetworkSetConnTimeout(&mqttNetwork, 5000, 5000)) != 0)
{
mqttClient.keepAliveInterval = connectData.keepAliveInterval;
mqttClient.ping_outstanding = 1;
}
else
{
if ((NetworkConnect(&mqttNetwork, "183.230.40.39", 6002)) != 0)
{
mqttClient.keepAliveInterval = connectData.keepAliveInterval;
mqttClient.ping_outstanding = 1;
}
else
{
ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet1, P_INFO, "mqtt_demo socket connect ok");
if ((MQTTConnect(&mqttClient, &connectData)) != 0)
{
mqttClient.ping_outstanding = 1;
}
else
{
ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet2, P_INFO, "mqtt_demo connect fail");
mqttClient.ping_outstanding = 0;
}
}
if(mqttClient.ping_outstanding == 0)
{
if ((MQTTStartRECVTask()) != SUCCESS)
;
}
}
while(1)
{
sprintf(mqtt_payload,"{\"ec_smart_sensor_data\":%d}", ec_sensor_temp);
len = strlen(mqtt_payload);
ec_data_len = len;
unsigned char *ptr = mqttJsonbuff;
sprintf((char *)mqttJsonbuff,"%c%c%c%s", ec_data_type, ec_data_type,ec_data_type, mqtt_payload);
message.payload = mqttJsonbuff;
message.payloadlen = strlen((char *)mqttJsonbuff);
writeChar(&ptr, ec_data_type);
writeInt(&ptr, ec_data_len);
ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet3, P_INFO, "mqtt_demo send data");
MQTTPublish(&mqttClient, "$dp", &message);
osDelay(10000);
}
/* do not return */
}
#define MQTT_DEMO_EXAMPLE_2_ALI
#ifdef FEATURE_MQTT_TLS_ENABLE
void mqtt_demo_ali(void)
{
//char *pub_topic;
int len = 0;
MQTTMessage message;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
char hmac_source[256] = {0};
char *ali_clientID = NULL;
char *ali_username = NULL;
char *ali_signature = NULL;
/* eigencomm ec_smoke a1xFDTv3InR sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg 8964 ---"eigencomm|securemode=3,signmethod=hmacsha1,timestamp=8964|" "deviceGrape&a1fsx061r0x" */
/* eigencomm deviceGrape a1fsx061r0x WLar6NunAcCJ0aZHbaNw4eQwdsYVKyC9 8964 ---"eigencomm|securemode=3,signmethod=hmacsha1,timestamp=8964|" "deviceGrape&a1fsx061r0x" */
/* clientID deviceName productKey deviceSecret*/
ali_clientID = malloc(128);
ali_username = malloc(64);
ali_signature = malloc(96);
memset(ali_clientID, 0, 128);
memset(ali_username, 0, 64);
memset(ali_signature, 0, 96);
snprintf(hmac_source, sizeof(hmac_source), "clientId%s" "deviceName%s" "productKey%s" "timestamp%s", "eigencomm", "ec_smoke", "a1xFDTv3InR", "8964");
mqttAliHmacSha1((unsigned char *)hmac_source, strlen(hmac_source), (unsigned char *)ali_signature,(unsigned char *)"sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg", strlen("sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg"));
sprintf(ali_clientID,"%s|securemode=3,signmethod=hmacsha1,timestamp=8964|", "eigencomm");
sprintf(ali_username,"%s&%s","ec_smoke","a1xFDTv3InR");
connectData.clientID.cstring = ali_clientID;
connectData.username.cstring = ali_username;
connectData.password.cstring = ali_signature;
connectData.MQTTVersion = 4;
connectData.keepAliveInterval = 120;
//ECOMM_TRACE(UNILOG_MQTT, mqtt_hh001, P_SIG, 0, "mqtt_demo........");
mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);
mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);
memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
NetworkInit(&mqttNetwork);
MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, 1000, (unsigned char *)mqttReadbuf, 1000);
if((NetworkSetConnTimeout(&mqttNetwork, 5000, 5000)) != 0)
{
mqttClient.keepAliveInterval = connectData.keepAliveInterval;
mqttClient.ping_outstanding = 1;
}
else
{
if ((NetworkConnect(&mqttNetwork, "a1xFDTv3InR.iot-as-mqtt.cn-shanghai.aliyuncs.com", 1883)) != 0)
{
mqttClient.keepAliveInterval = connectData.keepAliveInterval;
mqttClient.ping_outstanding = 1;
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqtt_hh012, P_SIG, 0, "mqtt_demo....1....");
if ((MQTTConnect(&mqttClient, &connectData)) != 0)
{
mqttClient.ping_outstanding = 1;
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqtt_hh023, P_SIG, 0, "mqtt_demo....2....");
mqttClient.ping_outstanding = 0;
}
}
if(mqttClient.ping_outstanding == 0)
{
if ((MQTTStartRECVTask()) != SUCCESS)
;
}
}
while(1)
{
memset(mqtt_payload, 0, sizeof(mqtt_payload));
memcpy(mqtt_payload, "update", strlen("update"));
len = strlen(mqtt_payload);
message.payload = mqtt_payload;
message.payloadlen = len;
//ECOMM_TRACE(UNILOG_MQTT, mqtt_hh034, P_SIG, 0, "mqtt_demo....3....");
MQTTPublish(&mqttClient, "a1xFDTv3InR/ec_smoke/user/get", &message);
osDelay(10000);
}
/* do not return */
}
#endif
#define MQTT_DEMO_EXAMPLE_3_APP
void mqtt_demo_send_task(void *argument)
{
int ret = FAILURE;
int msgType = 0xff;
mqttSendMsg mqttMsg;
mqttDataMsg mqttMessage;
int socket_stat = -1;
int socket_err = -1;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
connectData.MQTTVersion = 4;
connectData.clientID.cstring = "34392813";
connectData.username.cstring = "122343";
connectData.password.cstring = "test001";
connectData.keepAliveInterval = 120;
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_0, P_SIG, 0, "mqttSendTask........");
mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);
mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);
memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
NetworkInit(&mqttNetwork);
MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, MQTT_SEND_BUFF_LEN, (unsigned char *)mqttReadbuf, MQTT_RECV_BUFF_LEN);
if((NetworkSetConnTimeout(&mqttNetwork, 10000, 10000)) != 0)
{
mqttClient.keepAliveInterval = connectData.keepAliveInterval;
mqttClient.ping_outstanding = 1;
}
else
{
if ((NetworkConnect(&mqttNetwork, "183.230.40.39", 6002)) != 0)
{
mqttClient.keepAliveInterval = connectData.keepAliveInterval;
mqttClient.ping_outstanding = 1;
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_1, P_SIG, 0, "mqttSendTask..tcp connect fail....");
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_2, P_SIG, 0, "mqttSendTask..tcp connect ok....");
if ((MQTTConnect(&mqttClient, &connectData)) != 0)
{
mqttClient.ping_outstanding = 1;
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_3, P_SIG, 0, "mqttSendTask..mqtt connect fail....");
}
else
{
mqttClient.keepAliveInterval = connectData.keepAliveInterval;
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4, P_SIG, 0, "mqttSendTask..mqtt connect ok....");
mqtt_send_task_status_flag = 1;
mqttClient.ping_outstanding = 0;
}
}
}
/* sub topic*/
//MQTTSubscribe(&mqttClient, topic_data, 0, NULL);
/*start recv task*/
if(mqttClient.ping_outstanding == 0)
{
mqtt_demo_recv_task_init();
}
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4000, P_SIG, 0, "mqttSendTask..start recv msg....");
while(1)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4001, P_SIG, 1, "mqttSendTask..recv msg hand=0x%x....",mqttSendMsgHandle);
/* recv msg (block mode) */
osMessageQueueGet(mqttSendMsgHandle, &mqttMsg, 0, osWaitForever);
msgType = mqttMsg.cmdType;
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4002, P_SIG, 1, "mqttSendTask..recv msg=%d....",msgType);
switch(msgType)
{
case MQTT_DEMO_MSG_PUBLISH:
/* send packet */
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_500, P_INFO, 0, ".....start send mqtt publish packet.......");
MutexLock(&mqttMutex1);
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_5, P_INFO, 0, ".....start send mqtt publish packet.......");
memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
ret = MQTTPublish(&mqttClient, mqttMsg.topic, &mqttMsg.message);
if(mqttMsg.topic != NULL)
{
free(mqttMsg.topic);
mqttMsg.topic = NULL;
}
if(mqttMsg.message.payload != NULL)
{
free(mqttMsg.message.payload);
mqttMsg.message.payload = NULL;
}
/* send result to at task */
if(ret == SUCCESS)
{
memset(&mqttMessage, 0, sizeof(mqttMessage));
mqttMessage.cmdType = MQTT_DEMO_MSG_PUBLISH_ACK;
mqttMessage.ret = SUCCESS;
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_600, P_INFO, 0, ".....send mqtt publish packet ok.......");
osMessageQueuePut(appMqttMsgHandle, &mqttMessage, 0, MQTT_MSG_TIMEOUT);
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_6, P_INFO, 0, ".....send mqtt publish packet fail.......");
socket_stat = sock_get_errno(mqttClient.ipstack->my_socket);
socket_err = socket_error_is_fatal(socket_stat);
if(socket_err == 1)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_7, P_INFO, 0, ".....find need reconnect when publish packet.......");
MQTTReConnect(&mqttClient, &connectData);
}
else
{
memset(&mqttMessage, 0, sizeof(mqttMessage));
mqttMessage.cmdType = MQTT_DEMO_MSG_PUBLISH_ACK;
mqttMessage.ret = SUCCESS;
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_600ii, P_INFO, 0, ".....send mqtt publish packet ok.......");
osMessageQueuePut(appMqttMsgHandle, &mqttMessage, 0, MQTT_MSG_TIMEOUT);
}
//memset(&mqttMessage, 0, sizeof(mqttMessage));
//mqttMessage.cmdType = MQTT_MSG_RECONNECT;
//mqttMessage.ret = FAILURE;
//xQueueSend(appMqttMsgHandle, &mqttMessage, MQTT_MSG_TIMEOUT);
}
MutexUnlock(&mqttMutex1);
break;
#if 0
case MQTT_DEMO_MSG_KEEPALIVE:
/* send keepalive packet */
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_8, P_INFO, 0, ".....start send mqtt keeplive packet.......");
ret = keepalive(&mqttClient);
if(ret == SUCCESS) // send the ping packet
{
mqttClient->ping_outstanding = 1;
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_9, P_INFO, 1, ".....mqtt keeplive send ok.......");
}
else
{
socket_stat = sock_get_errno(mqttNewContext->mqtt_client->ipstack->my_socket);
socket_err = socket_error_is_fatal(socket_stat);
if(socket_err == 1)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_10, P_INFO, 0, ".....find need reconnect when send keeplive packet.......");
ret = MQTT_RECONNECT;
}
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_11, P_INFO, 1, ".....mqtt send keeplive Packet fail");
}
break;
#endif
case MQTT_DEMO_MSG_RECONNECT:
MutexLock(&mqttMutex1);
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_12, P_INFO, 0, ".....find need reconnect when read packet.......");
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_13, P_INFO, 0, "...start tcp disconnect ..");
mqttClient.ipstack->disconnect(mqttClient.ipstack);
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_14, P_INFO, 0, "...start tcp connect ...");
if ((NetworkSetConnTimeout(mqttClient.ipstack, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT)) != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_15, P_INFO, 0, "...tcp socket set timeout fail...");
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_16, P_INFO, 0, "...tcp connect ok...");
mqttClient.isconnected = 0;
if((NetworkConnect(mqttClient.ipstack, MQTT_SERVER_URI, MQTT_SERVER_PORT)) < 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_17, P_INFO, 0, "...tcp reconnect fail!!!...\r\n");
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_18, P_INFO, 0, "...start mqtt connect ..");
if ((MQTTConnect(&mqttClient, &connectData)) != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_19, P_INFO, 0, "...mqtt reconnect fial!!!...");
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_20, P_INFO, 0, "...mqtt reconnect ok!!!...");
}
}
}
MutexUnlock(&mqttMutex1);
break;
}
}
}
void app_mqtt_demo_task(void *argument)
{
int msgType = 1;
int payloadLen = 0;
mqttSendMsg mqttMsg;
mqttDataMsg mqttMessage;
int ret = FAILURE;
/*init driver*/
/*start mqtt send task*/
ret = mqtt_demo_send_task_init();
if(ret == FAILURE)
{
;
}
while(1)
{
if(mqtt_send_task_status_flag == 1)
{
break;
}
osDelay(4000);
}
/*state machine*/
while(1)
{
/*read data*/
osDelay(4000);
/*send msg to mqtt send task, and wait for excute result*/
//ECOMM_TRACE(UNILOG_MQTT, appMqttTask_0, P_SIG, 0, "appMqttTask...send start.....");
memset(&mqttMsg, 0, sizeof(mqttSendMsg));
mqttMsg.cmdType = MQTT_DEMO_MSG_PUBLISH;
mqttMsg.topic = malloc(128);
memset(mqttMsg.topic, 0, 128);
memcpy(mqttMsg.topic, "$dp", strlen("$dp"));
mqttMsg.message.qos = QOS0;
mqttMsg.message.retained = 0;
mqttMsg.message.id = 0;
mqttMsg.message.payload = malloc(32);
memset(mqttMsg.message.payload, 0, 32);
memcpy(mqttMsg.message.payload, "{\"data\":90}", strlen("{\"data\":90}"));
sprintf(mqttMsg.message.payload, "%c%c%c%s",msgType, msgType, msgType, "{\"data\":90}");
payloadLen = strlen(mqttMsg.message.payload);
unsigned char *ptr = (unsigned char *)mqttMsg.message.payload;
writeChar(&ptr,3);
writeInt(&ptr,strlen("{\"data\":90}"));
mqttMsg.message.payloadlen = payloadLen;
osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, (2*MQTT_MSG_TIMEOUT));
osMessageQueueGet(appMqttMsgHandle, &mqttMessage, 0, cmsisMAX_DELAY);
if(mqttMessage.cmdType == MQTT_DEMO_MSG_PUBLISH_ACK)
{
//ECOMM_TRACE(UNILOG_MQTT, appMqttTask_1, P_SIG, 0, "appMqttTask...send ok.....");
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, appMqttTask_2, P_SIG, 0, "appMqttTask...send fail.....");
}
}
}
int mqtt_demo_recv_task_init(void)
{
osThreadAttr_t task_attr;
memset(&task_attr, 0, sizeof(task_attr));
task_attr.name = "mqttRecv";
#ifdef MBTK_OPENCPU_SUPPORT
task_attr.stack_size = MQTT_RECV_DEMO_TASK_STACK_SIZE;
#else
task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
#endif
#if defined FEATURE_LITEOS_ENABLE
task_attr.priority = osPriorityBelowNormal4;
#elif defined FEATURE_FREERTOS_ENABLE
task_attr.priority = osPriorityBelowNormal7;
#endif
mqttRecvTaskHandle = osThreadNew(MQTTRun, NULL,&task_attr);
if(mqttRecvTaskHandle == NULL)
{
return FAILURE;
}
return SUCCESS;
}
int mqtt_demo_send_task_init(void)
{
osThreadAttr_t task_attr;
memset(&task_attr, 0, sizeof(task_attr));
task_attr.name = "mqttSend";
task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
#if defined FEATURE_LITEOS_ENABLE
task_attr.priority = osPriorityBelowNormal3;
#elif defined FEATURE_FREERTOS_ENABLE
task_attr.priority = osPriorityBelowNormal6;
#endif
mqttSendTaskHandle = osThreadNew(mqtt_demo_send_task, NULL,&task_attr);
if(mqttSendTaskHandle == NULL)
{
return FAILURE;
}
return SUCCESS;
}
int app_mqtt_demo_task_init(void)
{
osThreadAttr_t task_attr;
if(mqttSendMsgHandle == NULL)
{
mqttSendMsgHandle = osMessageQueueNew(16, sizeof(mqttSendMsg), NULL);
}
if(appMqttMsgHandle == NULL)
{
appMqttMsgHandle = osMessageQueueNew(16, sizeof(mqttDataMsg), NULL);
}
if(mqttMutex1.sem == NULL)
{
MutexInit(&mqttMutex1);
}
memset(&task_attr, 0, sizeof(task_attr));
task_attr.name = "appTask";
task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
#if defined FEATURE_LITEOS_ENABLE
task_attr.priority = osPriorityBelowNormal2;
#elif defined FEATURE_FREERTOS_ENABLE
task_attr.priority = osPriorityBelowNormal5;
#endif
appMqttTaskHandle = osThreadNew(app_mqtt_demo_task, NULL,&task_attr);
if(appMqttTaskHandle == NULL)
{
return FAILURE;
}
return SUCCESS;
}