[Feature][MD310EU][task-view-1009] Create MD310EU project code directory

Change-Id: I57cd3c474efe4493ae8a242d7e2fa643ad8ecbde
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTClient.c b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTClient.c
new file mode 100755
index 0000000..39f6a8e
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTClient.c
@@ -0,0 +1,2352 @@
+/*******************************************************************************

+ * Copyright (c) 2014, 2017 IBM Corp.

+ *

+ * All rights reserved. This program and the accompanying materials

+ * are made available under the terms of the Eclipse Public License v1.0

+ * and Eclipse Distribution License v1.0 which accompany this distribution.

+ *

+ * The Eclipse Public License is available at

+ *    http://www.eclipse.org/legal/epl-v10.html

+ * and the Eclipse Distribution License is available at

+ *   http://www.eclipse.org/org/documents/edl-v10.php.

+ *

+ * Contributors:

+ *   Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation

+ *   Ian Craggs - fix for #96 - check rem_len in readPacket

+ *   Ian Craggs - add ability to set message handler separately #6

+ *******************************************************************************/

+ 

+#include <stdio.h>

+#include <stdarg.h>

+//#include "commontypedef.h"

+#include "MQTTClient.h"

+#ifdef FEATURE_MQTT_TLS_ENABLE

+#include "sha1.h"

+#include "sha256.h"

+#include "md5.h"

+#endif

+//#include "debug_trace.h"

+//#include DEBUG_LOG_HEADER_FILE

+

+char *mqttSendbuf = NULL;

+char *mqttReadbuf = NULL;

+

+unsigned char mqttJsonbuff[256] = {0};

+char mqtt_payload[128] = {0};

+int ec_sensor_temp = 20;

+char ec_data_type = 3;

+int ec_data_len = 0;

+//osMessageQueueId_t mqttRecvMsgHandle = NULL;

+//osMessageQueueId_t mqttSendMsgHandle = NULL;

+//osMessageQueueId_t appMqttMsgHandle = NULL;

+

+///osThreadId_t mqttRecvTaskHandle = NULL;

+

+#ifdef MBTK_OPENCPU_SUPPORT

+#define MAX_RECV_TASK 5

+osThreadId_t mbtk_mqttRecvTaskHandle[MAX_RECV_TASK];

+Mutex mbtk_mqttMutex1[MAX_RECV_TASK];

+#endif

+

+//osThreadId_t mqttSendTaskHandle = NULL;

+//osThreadId_t appMqttTaskHandle = NULL;

+

+//Mutex mqttMutex1;

+//Mutex mqttMutex2;

+MQTTClient mqttClient;

+Network mqttNetwork;

+int mqtt_send_task_status_flag = 0;

+int mqtt_keepalive_retry_count = 0;

+char mqttHb2Hex(unsigned char hb)

+{

+    hb = hb&0xF;

+    return (char)(hb<10 ? '0'+hb : hb-10+'a');

+}

+

+#ifdef FEATURE_MQTT_TLS_ENABLE

+void mqttAliHmacSha1(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)

+{

+    int i;

+    mbedtls_sha1_context ctx;

+    unsigned char k_ipad[ALI_SHA1_KEY_IOPAD_SIZE] = {0};

+    unsigned char k_opad[ALI_SHA1_KEY_IOPAD_SIZE] = {0};

+    unsigned char tempbuf[ALI_SHA1_DIGEST_SIZE];

+

+    memset(k_ipad, 0x36, ALI_SHA1_KEY_IOPAD_SIZE);

+    memset(k_opad, 0x5C, ALI_SHA1_KEY_IOPAD_SIZE);

+

+    for(i=0; i<keylen; i++)

+    {

+        if(i>=ALI_SHA1_KEY_IOPAD_SIZE)

+        {

+            break;

+        }

+        k_ipad[i] ^=key[i];

+        k_opad[i] ^=key[i];

+    }

+    mbedtls_sha1_init(&ctx);

+

+    mbedtls_sha1_starts(&ctx);

+    mbedtls_sha1_update(&ctx, k_ipad, ALI_SHA1_KEY_IOPAD_SIZE);

+    mbedtls_sha1_update(&ctx, input, ilen);

+    mbedtls_sha1_finish(&ctx, tempbuf);

+

+    mbedtls_sha1_starts(&ctx);

+    mbedtls_sha1_update(&ctx, k_opad, ALI_SHA1_KEY_IOPAD_SIZE);

+    mbedtls_sha1_update(&ctx, tempbuf, ALI_SHA1_DIGEST_SIZE);

+    mbedtls_sha1_finish(&ctx, tempbuf);

+

+    for(i=0; i<ALI_SHA1_DIGEST_SIZE; ++i)

+    {

+        output[i*2] = mqttHb2Hex(tempbuf[i]>>4);

+        output[i*2+1] = mqttHb2Hex(tempbuf[i]);

+    }

+    mbedtls_sha1_free(&ctx);

+}

+/*

+ * output = SHA-256( input buffer )

+ */

+void mqttAliHmacSha256(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)

+{

+    int i;

+    mbedtls_sha256_context ctx;

+    unsigned char k_ipad[ALI_SHA256_KEY_IOPAD_SIZE] = {0};

+    unsigned char k_opad[ALI_SHA256_KEY_IOPAD_SIZE] = {0};

+

+    memset(k_ipad, 0x36, 64);

+    memset(k_opad, 0x5C, 64);

+

+    if ((NULL == input) || (NULL == key) || (NULL == output)) {

+        return;

+    }

+

+    if (keylen > ALI_SHA256_KEY_IOPAD_SIZE) {

+        return;

+    }

+

+    for(i=0; i<keylen; i++)

+    {

+        if(i>=ALI_SHA256_KEY_IOPAD_SIZE)

+        {

+            break;

+        }

+        k_ipad[i] ^=key[i];

+        k_opad[i] ^=key[i];

+    }

+    mbedtls_sha256_init(&ctx);

+

+    mbedtls_sha256_starts(&ctx, 0);

+    mbedtls_sha256_update(&ctx, k_ipad, ALI_SHA256_KEY_IOPAD_SIZE);

+    mbedtls_sha256_update(&ctx, input, ilen);

+    mbedtls_sha256_finish(&ctx, output);

+

+    mbedtls_sha256_starts(&ctx, 0);

+    mbedtls_sha256_update(&ctx, k_opad, ALI_SHA256_KEY_IOPAD_SIZE);

+    mbedtls_sha256_update(&ctx, output, ALI_SHA256_DIGEST_SIZE);

+    mbedtls_sha256_finish(&ctx, output);

+

+    mbedtls_sha256_free(&ctx);

+}

+/*

+ * output = MD-5( input buffer )

+ */

+void mqttAliHmacMd5(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)

+{

+    int i;

+    mbedtls_md5_context ctx;

+    unsigned char k_ipad[ALI_MD5_KEY_IOPAD_SIZE] = {0};

+    unsigned char k_opad[ALI_MD5_KEY_IOPAD_SIZE] = {0};

+    unsigned char tempbuf[ALI_MD5_DIGEST_SIZE];

+

+    memset(k_ipad, 0x36, ALI_MD5_KEY_IOPAD_SIZE);

+    memset(k_opad, 0x5C, ALI_MD5_KEY_IOPAD_SIZE);

+

+    for(i=0; i<keylen; i++)

+    {

+        if(i>=ALI_MD5_KEY_IOPAD_SIZE)

+        {

+            break;

+        }

+        k_ipad[i] ^=key[i];

+        k_opad[i] ^=key[i];

+    }

+    mbedtls_md5_init(&ctx);

+

+    mbedtls_md5_starts(&ctx);

+    mbedtls_md5_update(&ctx, k_ipad, ALI_MD5_KEY_IOPAD_SIZE);

+    mbedtls_md5_update(&ctx, input, ilen);

+    mbedtls_md5_finish(&ctx, tempbuf);

+

+    mbedtls_md5_starts(&ctx);

+    mbedtls_md5_update(&ctx, k_opad, ALI_MD5_KEY_IOPAD_SIZE);

+    mbedtls_md5_update(&ctx, tempbuf, ALI_MD5_DIGEST_SIZE);

+    mbedtls_md5_finish(&ctx, tempbuf);

+

+    for(i=0; i<ALI_MD5_DIGEST_SIZE; ++i)

+    {

+        output[i*2] = mqttHb2Hex(tempbuf[i]>>4);

+        output[i*2+1] = mqttHb2Hex(tempbuf[i]);

+    }

+    mbedtls_md5_free(&ctx);

+}

+#endif

+void mqttDefMessageArrived(MessageData* data)

+{

+    char *bufTemp = NULL;

+    bufTemp = malloc(data->message->payloadlen+1);

+    memset(bufTemp, 0, data->message->payloadlen+1);

+    memcpy(bufTemp, data->message->payload, data->message->payloadlen);

+    //ECPLAT_PRINTF(UNILOG_MQTT, mqttRecvTask_2, P_SIG, ".........MQTT topic is:%s", (const uint8_t *)data->topicName->lenstring.data);

+    //ECPLAT_PRINTF(UNILOG_MQTT, mqttRecvTask_1, P_SIG, ".........MQTT_messageArrived is:%s", (const uint8_t *)bufTemp);

+    free(bufTemp);

+}

+

+static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {

+    md->topicName = aTopicName;

+    md->message = aMessage;

+}

+

+

+static int getNextPacketId(MQTTClient *c) {

+    return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;

+}

+

+static int sendPacket(MQTTClient* c, int length, Timer* timer)

+{

+    int rc = FAILURE,

+        sent = 0;

+

+#ifdef FEATURE_MQTT_TLS_ENABLE

+#ifdef MBTK_OPENCPU_SUPPORT

+    if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)

+    {

+        ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls3, P_INFO, 0, "...mqttSendPacket..0.");

+        rc = mqttSslSend(c->mqtts_client, &c->mqtts_client->sendBuf[sent], length);

+        ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls4, P_INFO, 1, "...mqttSendPacket..=%d.", rc);

+        TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet

+        return rc;

+    }

+    else

+#endif

+#endif

+    {

+

+    while (sent < length && !TimerIsExpired(timer))

+    {

+        #ifdef MQTT_RAI_OPTIMIZE

+        rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer), 0, false);

+        #else

+        rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));

+        #endif

+        if (rc < 0)  // there was an error writing the data

+            break;

+        sent += rc;

+    }

+    if (sent == length)

+    {

+        TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet

+        rc = SUCCESS;

+    }

+    else

+        rc = FAILURE;

+    return rc;

+

+    }

+}

+

+void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,

+        unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)

+{

+    int i;

+    c->ipstack = network;

+

+    for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)

+        c->messageHandlers[i].topicFilter = 0;

+    c->command_timeout_ms = command_timeout_ms;

+    c->buf = sendbuf;

+    c->buf_size = sendbuf_size;

+    c->readbuf = readbuf;

+    c->readbuf_size = readbuf_size;

+    c->isconnected = 0;

+    c->cleansession = 0;

+    c->ping_outstanding = 0;

+    c->defaultMessageHandler = mqttDefMessageArrived;

+      c->next_packetid = 1;

+    TimerInit(&c->last_sent);

+    TimerInit(&c->last_received);

+	

+#ifdef FEATURE_MQTT_TLS_ENABLE

+#ifdef MBTK_OPENCPU_SUPPORT

+    c->recv_task_num = -1;

+    if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)

+    {

+        c->mqtts_client->sendBuf = sendbuf;

+        c->mqtts_client->sendBufSize = sendbuf_size;

+        c->mqtts_client->readBuf = readbuf;

+        c->mqtts_client->readBufSize = readbuf_size;

+    }

+#endif

+#endif

+

+#if defined(MQTT_TASK)

+      MutexInit(&c->mutex);

+#endif

+}

+

+static int decodePacket(MQTTClient* c, int* value, int timeout)

+{

+    unsigned char i;

+    int multiplier = 1;

+    int len = 0;

+    const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;

+

+    *value = 0;

+    do

+    {

+        int rc = MQTTPACKET_READ_ERROR;

+

+        if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)

+        {

+            rc = MQTTPACKET_READ_ERROR; /* bad data */

+            goto exit;

+        }

+        

+#ifdef FEATURE_MQTT_TLS_ENABLE

+        #ifdef MBTK_OPENCPU_SUPPORT

+        if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)

+        {

+            rc = mqttSslRead(c->mqtts_client, &i, 1, timeout);

+        }

+        else

+        {

+            rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);

+        }

+        #endif

+#else

+        rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);

+#endif

+

+        if (rc != 1)

+            goto exit;

+        *value += (i & 127) * multiplier;

+        multiplier *= 128;

+    } while ((i & 128) != 0);

+exit:

+    return len;

+}

+

+static int readPacket(MQTTClient* c, Timer* timer)

+{

+    MQTTHeader header = {0};

+    int len = 0;

+    int rem_len = 0;

+    int rc = 0;

+

+#ifdef FEATURE_MQTT_TLS_ENABLE

+#ifdef MBTK_OPENCPU_SUPPORT

+    if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)

+    {

+        ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls0, P_INFO, 0, "...mqttReadPacket..0.");

+        /* 1. read the header byte.  This has the packet type in it */

+        rc = mqttSslRead(c->mqtts_client, c->mqtts_client->readBuf, 1, TimerLeftMS(timer));

+        if (rc != 1)

+        {

+            goto exit;

+        }

+

+        len = 1;

+        ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls1, P_INFO, 0, "...mqttReadPacket..1.");

+        /* 2. read the remaining length.  This is variable in itself */

+        decodePacket(c, &rem_len, TimerLeftMS(timer));

+        len += MQTTPacket_encode(c->mqtts_client->readBuf + 1, rem_len); /* put the original remaining length back into the buffer */

+

+        if (rem_len > (c->mqtts_client->readBufSize - len))

+        {

+            ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls202, P_INFO, 0, "...mqttReadPacket..202.");

+            rc = BUFFER_OVERFLOW;

+            goto exit;

+        }

+

+        ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls2, P_INFO, 0, "...mqttReadPacket..2.");

+        /* 3. read the rest of the buffer using a callback to supply the rest of the data */

+        if (rem_len > 0 && (mqttSslRead(c->mqtts_client, c->mqtts_client->readBuf + len, rem_len, TimerLeftMS(timer)) != rem_len))

+        {

+            rc = 0;

+            ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls200, P_INFO, 0, "...mqttReadPacket..200.");

+            goto exit;

+        }

+

+        header.byte = c->mqtts_client->readBuf[0];

+        rc = header.bits.type;

+        if (c->keepAliveInterval > 0)

+        {

+            ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls201, P_INFO, 0, "...mqttReadPacket..201.");

+            TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet

+        }

+    }

+    else

+#endif

+#endif

+    {

+

+    /* 1. read the header byte.  This has the packet type in it */

+    rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));

+    if (rc != 1)

+        goto exit;

+

+    len = 1;

+    /* 2. read the remaining length.  This is variable in itself */

+    decodePacket(c, &rem_len, TimerLeftMS(timer));

+    len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */

+

+    if (rem_len > (c->readbuf_size - len))

+    {

+        rc = BUFFER_OVERFLOW;

+        goto exit;

+    }

+

+    /* 3. read the rest of the buffer using a callback to supply the rest of the data */

+    if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {

+        rc = 0;

+        goto exit;

+    }

+

+    header.byte = c->readbuf[0];

+    rc = header.bits.type;

+    if (c->keepAliveInterval > 0)

+        TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet

+

+    }

+

+exit:

+    return rc;

+}

+

+// assume topic filter and name is in correct format

+// # can only be at end

+// + and # can only be next to separator

+static char isTopicMatched(char* topicFilter, MQTTString* topicName)

+{

+    char* curf = topicFilter;

+    char* curn = topicName->lenstring.data;

+    char* curn_end = curn + topicName->lenstring.len;

+

+    while (*curf && curn < curn_end)

+    {

+        if (*curn == '/' && *curf != '/')

+            break;

+        if (*curf != '+' && *curf != '#' && *curf != *curn)

+            break;

+        if (*curf == '+')

+        {   // skip until we meet the next separator, or end of string

+            char* nextpos = curn + 1;

+            while (nextpos < curn_end && *nextpos != '/')

+                nextpos = ++curn + 1;

+        }

+        else if (*curf == '#')

+            curn = curn_end - 1;    // skip until end of string

+        curf++;

+        curn++;

+    };

+

+    return (curn == curn_end) && (*curf == '\0');

+}

+

+int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)

+{

+    int i;

+    int rc = FAILURE;

+    //ECOMM_TRACE(UNILOG_MQTT, deliverMessage1, P_SIG, 0, "....1....deliverMessage..");

+

+    // we have to find the right message handler - indexed by topic

+    for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)

+    {

+        if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||

+                isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))

+        {

+            if (c->messageHandlers[i].fp != NULL)

+            {

+                MessageData md;

+                NewMessageData(&md, topicName, message);

+                c->messageHandlers[i].fp(&md);

+                rc = SUCCESS;

+                //ECOMM_TRACE(UNILOG_MQTT, deliverMessage2, P_SIG, 0, "....2....deliverMessage..");

+            }

+        }

+    }

+

+    if (rc == FAILURE && c->defaultMessageHandler != NULL)

+    {

+        MessageData md;

+        //ECOMM_TRACE(UNILOG_MQTT, deliverMessage3, P_SIG, 0, "....3....deliverMessage..");

+        NewMessageData(&md, topicName, message);

+        c->defaultMessageHandler(&md);

+        rc = SUCCESS;

+        //ECOMM_TRACE(UNILOG_MQTT, deliverMessage4, P_SIG, 0, "....4....deliverMessage..");

+    }

+

+    return rc;

+}

+

+int keepalive(MQTTClient* c)

+{

+    int rc = SUCCESS;

+

+    if (c->keepAliveInterval == 0)

+    {

+        goto exit;

+    }

+

+    if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))

+    {

+        if (c->ping_outstanding)

+        {

+            mqtt_keepalive_retry_count++;

+            //ECOMM_TRACE(UNILOG_MQTT, keepalive_0, P_SIG, 0, "....keepalive....ping_outstanding..=1..");

+            rc = FAILURE; /* PINGRESP not received in keepalive interval */

+        }

+        else

+        {

+            Timer timer;

+            TimerInit(&timer);

+            TimerCountdownMS(&timer, 1000);

+            

+            #ifdef MBTK_OPENCPU_SUPPORT

+            memset(c->buf, 0, MQTT_SEND_BUFF_LEN);

+            memset(c->readbuf, 0, MQTT_RECV_BUFF_LEN);

+            #endif

+

+            //memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);

+            //memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);

+            int len = MQTTSerialize_pingreq(c->buf, c->buf_size);

+            

+            //ECOMM_TRACE(UNILOG_MQTT, keepalive_1, P_SIG, 0, "....keepalive....send packet..");

+            if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet

+                c->ping_outstanding = 1;

+        }

+    }

+

+exit:

+    return rc;

+}

+

+int keepaliveRetry(MQTTClient* c)

+{

+    int rc = SUCCESS;

+

+    if (c->keepAliveInterval == 0)

+    {

+        goto exit;

+    }

+

+    if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))

+    {

+        {

+            Timer timer;

+            TimerInit(&timer);

+            TimerCountdownMS(&timer, 1000);

+            

+            #ifdef MBTK_OPENCPU_SUPPORT

+            memset(c->buf, 0, MQTT_SEND_BUFF_LEN);

+            memset(c->readbuf, 0, MQTT_RECV_BUFF_LEN);

+            #endif

+

+            //memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);

+            //memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);

+            int len = MQTTSerialize_pingreq(c->buf, c->buf_size);

+            

+            //ECOMM_TRACE(UNILOG_MQTT, keepalive_1pp, P_SIG, 0, "....keepalive....send packet..");

+            if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet

+                c->ping_outstanding = 1;

+        }

+    }

+

+exit:

+    return rc;

+}

+

+void MQTTCleanSession(MQTTClient* c)

+{

+    int i = 0;

+

+    for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)

+        c->messageHandlers[i].topicFilter = NULL;

+}

+

+void MQTTCloseSession(MQTTClient* c)

+{

+    c->ping_outstanding = 0;

+    c->isconnected = 0;

+    if (c->cleansession)

+        MQTTCleanSession(c);

+}

+

+#ifdef MBTK_OPENCPU_SUPPORT

+

+int mqtt_try_resubscribe(MQTTClient* c)

+{

+    int i;

+    for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)

+    {

+        if (c->messageHandlers[i].topicFilter != 0)

+        {

+            MQTTSubscribe(c, c->messageHandlers[i].topicFilter, c->messageHandlers[i].qos,c->messageHandlers[i].fp);

+        }

+    }

+    return 0;

+}

+int mqtt_try_reconnect(MQTTClient* c)

+{

+    int reconnect_flag = 1;

+    int rc = 0;

+    mqtt_keepalive_retry_count = 0;

+    int ret = 0;

+

+    if(c->isconnected == 1)

+    {

+        if(c->is_mqtts == 0)

+        {

+            ret = c->ipstack->disconnect(c->ipstack);

+        }

+        else

+        {

+#ifdef FEATURE_MQTT_TLS_ENABLE

+#ifdef MBTK_OPENCPU_SUPPORT

+            ret = mqttSslClose(c->mqtts_client);

+#endif

+#endif

+        }

+        if(ret == 0)

+        {

+            c->isconnected = 0;

+        }

+    }

+

+    if(c->mqtt_connect_callback != 0)

+    {

+        c->mqtt_connect_callback(MQTT_START_RECONNECT_EVENT,0);

+    }

+

+    c->isconnected = 0;

+#ifdef FEATURE_MQTT_TLS_ENABLE

+#ifdef MBTK_OPENCPU_SUPPORT

+    if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)

+    {

+        if(mqttRecvTaskHandle)

+        rc = mqttSslConn_new(c->mqtts_client, c->mbtk_mqtt_server_url);

+    }

+    else

+#endif

+#endif

+    {

+        rc = NetworkConnectTimeout(c->ipstack, c->mbtk_mqtt_server_url, c->mbtk_mqtt_port, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT);

+    }

+    if(rc < 0)

+    {

+        reconnect_flag = 0;

+    }

+    else

+    {

+        if ((rc = (MQTTConnect(c, c->mbtk_mqtt_options))) != 0)

+        {

+            reconnect_flag = 0;

+        }

+        else

+        {

+            mqtt_try_resubscribe(c);

+            return 0;

+        }

+    }

+

+

+

+    if(reconnect_flag == 0)

+    {

+        if(c->mqtt_connect_callback != NULL)

+        {

+            int socket_status = sock_get_errno(c->ipstack->my_socket);

+            c->mqtt_connect_callback(MQTT_RECONNECT_FAIL_EVENT,socket_status);

+        }

+    }

+

+    return 0;

+}

+#endif

+

+int cycle(MQTTClient* c, Timer* timer)

+{

+    int len = 0,

+        rc = SUCCESS;

+

+    int packet_type = readPacket(c, timer);     /* read the socket, see what work is due */

+    //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0001, P_SIG, 1, ".....mqttRecvTask..packet_type=%d....",packet_type);

+    //ECPLAT_PRINTF(UNILOG_DM1, cycle0, P_SIG, ".....autoReg..packet_type=%d ",packet_type);

+

+#ifdef MBTK_OPENCPU_SUPPORT

+    if(c->is_mqtts == MBTK_MQTT_SSL_NONE)

+    {

+        int socket_status = sock_get_errno(c->ipstack->my_socket);

+        if(socket_status == ENOTCONN && c->isconnected == 1)

+        {

+            if(c->mqtt_connect_callback != NULL)

+            {

+                c->mqtt_connect_callback(MQTT_CONNECT_ABORT_EVENT,socket_status);

+            }

+            if(MBTK_MQTT_RECONNECT_ENABLE == c->mqtt_reconn_enable)

+            {

+                mqtt_try_reconnect(c);

+            }

+            else

+            {

+                goto exit;

+            }

+        }

+        else if(c->isconnected == 0)

+        {

+            goto exit;

+        }

+    }

+    if(c->is_mqtts && packet_type < 0)

+    {

+        if(c->isconnected)

+        {

+            if(c->mqtt_connect_callback != NULL)

+            {

+                c->mqtt_connect_callback(MQTT_CONNECT_ABORT_EVENT,ENOTCONN);

+            }

+            if(MBTK_MQTT_RECONNECT_ENABLE == c->mqtt_reconn_enable)

+            {

+                mqtt_try_reconnect(c);

+            }

+            else

+            {

+                goto exit;

+            }

+        }

+        else

+        {

+            goto exit;

+        }

+    }

+#endif

+    switch (packet_type)

+    {

+        default:

+            /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */

+            rc = packet_type;

+            break;

+        case 0: /* timed out reading packet */

+            break;

+        case CONNACK:

+        case PUBACK:

+        case SUBACK:

+        case UNSUBACK:

+            if(packet_type == SUBACK)

+            {

+                //rc = packet_type;

+            }

+            break;

+        case PUBLISH:

+        {

+            MQTTString topicName;

+            MQTTMessage msg;

+            int intQoS;

+            msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */

+            if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,

+               (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)

+                goto exit;

+            msg.qos = (enum QoS)intQoS;

+            deliverMessage(c, &topicName, &msg);

+            if (msg.qos != QOS0)

+            {

+                if (msg.qos == QOS1)

+                    len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);

+                else if (msg.qos == QOS2)

+                    len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);

+                if (len <= 0)

+                    rc = FAILURE;

+                else

+                    rc = sendPacket(c, len, timer);

+                if (rc == FAILURE)

+                    goto exit; // there was a problem

+            }

+            break;

+        }

+        case PUBREC:

+        case PUBREL:

+        {

+            unsigned short mypacketid;

+            unsigned char dup, type;

+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)

+                rc = FAILURE;

+            else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,

+                (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)

+                rc = FAILURE;

+            else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet

+                rc = FAILURE; // there was a problem

+            if (rc == FAILURE)

+                goto exit; // there was a problem

+            break;

+        }

+

+        case PUBCOMP:

+            break;

+        case PINGRESP:

+            c->ping_outstanding = 0;

+            break;

+    }

+

+    if (keepalive(c) != SUCCESS) {      

+        int socket_stat = 0;        

+#ifndef MBTK_OPENCPU_SUPPORT

+        mqttSendMsg mqttMsg;

+#endif

+        //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT

+        rc = FAILURE;

+        socket_stat = sock_get_errno(c->ipstack->my_socket);

+        if((socket_stat == MQTT_ERR_ABRT)||(socket_stat == MQTT_ERR_RST)||(socket_stat == MQTT_ERR_CLSD)||(socket_stat == MQTT_ERR_BADE))

+        {

+#ifdef MBTK_OPENCPU_SUPPORT

+            if(c->mqtt_reconn_enable == MBTK_MQTT_RECONNECT_ENABLE)

+            {

+                mqtt_try_reconnect(c);

+            }

+#else

+            //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0, P_INFO, 0, ".....now, need reconnect.......");

+            /* send  reconnect msg to send task */

+            memset(&mqttMsg, 0, sizeof(mqttMsg));

+            mqttMsg.cmdType = MQTT_DEMO_MSG_RECONNECT;

+

+            osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, MQTT_MSG_TIMEOUT);

+#endif

+        }

+        else

+        {

+            if(mqtt_keepalive_retry_count>3)

+            {

+#ifdef MBTK_OPENCPU_SUPPORT

+                if(c->mqtt_reconn_enable == MBTK_MQTT_RECONNECT_ENABLE)

+                {

+                    mqtt_try_reconnect(c);

+                }

+#else

+                mqtt_keepalive_retry_count = 0;

+                //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_ee0, P_INFO, 0, ".....now, need reconnect.......");

+                /* send  reconnect msg to send task */

+                memset(&mqttMsg, 0, sizeof(mqttMsg));

+                mqttMsg.cmdType = MQTT_DEMO_MSG_RECONNECT;

+                

+                osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, MQTT_MSG_TIMEOUT);

+#endif

+            }

+            else

+            {

+                keepaliveRetry(c);

+            }

+        }

+    }

+

+exit:

+    if (rc == SUCCESS)

+        rc = packet_type;

+    else if (c->isconnected)

+        ;//MQTTCloseSession(c);

+    return rc;

+}

+

+int MQTTYield(MQTTClient* c, int timeout_ms)

+{

+    int rc = SUCCESS;

+    Timer timer;

+

+    TimerInit(&timer);

+    TimerCountdownMS(&timer, timeout_ms);

+

+      do

+    {

+        if (cycle(c, &timer) < 0)

+        {

+            rc = FAILURE;

+            break;

+        }

+    } while (!TimerIsExpired(&timer));

+

+    return rc;

+}

+

+int MQTTIsConnected(MQTTClient* client)

+{

+  return client->isconnected;

+}

+

+void MQTTRun(void* parm)

+{

+    Timer timer;

+    MQTTClient* c = (MQTTClient*)&mqttClient;

+

+#ifndef MBTK_OPENCPU_SUPPORT

+    if(mqttSendMsgHandle == NULL)

+    {

+        mqttSendMsgHandle = osMessageQueueNew(16, sizeof(mqttSendMsg), NULL);

+    }

+    

+    if(appMqttMsgHandle == NULL)

+    {

+        appMqttMsgHandle = osMessageQueueNew(16, sizeof(mqttDataMsg), NULL);

+    }

+#endif

+    /*

+    if(mqttMutex1.sem == NULL)

+    {

+        MutexInit(&mqttMutex1);

+    }

+    */

+

+    TimerInit(&timer);

+

+    while (1)

+    {

+#if defined(MQTT_TASK)

+        //MutexLock(&c->mutex);

+#endif

+        //MutexLock(&mqttMutex1);

+

+        TimerCountdownMS(&timer, 1500); /* Don't wait too long if no traffic is incoming */

+        cycle(c, &timer);

+        //MutexUnlock(&mqttMutex1);

+        

+#if defined(MQTT_TASK)

+        //MutexUnlock(&c->mutex);

+#endif

+        sleep(200);

+    }

+}

+

+#if defined(MQTT_TASK)

+int MQTTStartTask(MQTTClient* client)

+{

+    return ThreadStart(&client->thread, &MQTTRun, client);

+}

+#endif

+

+int MQTTStartRECVTask(void)

+{

+    //osThreadAttr_t task_attr;

+

+    //memset(&task_attr, 0, sizeof(task_attr));

+    //task_attr.name = "mqttRecv";

+    //task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;

+    #if defined FEATURE_LITEOS_ENABLE

+    task_attr.priority = osPriorityBelowNormal4;

+    #elif defined FEATURE_FREERTOS_ENABLE

+    task_attr.priority = osPriorityBelowNormal7;

+    #endif

+

+    //mqttRecvTaskHandle = osThreadNew(MQTTRun, NULL,&task_attr);

+    if(mqttRecvTaskHandle == NULL)

+    {

+        return FAILURE;

+    }

+

+    return SUCCESS;

+}

+

+int waitfor(MQTTClient* c, int packet_type, Timer* timer)

+{

+    int rc = FAILURE;

+

+    do

+    {

+        if (TimerIsExpired(timer))

+            break; // we timed out

+        rc = cycle(c, timer);

+    }

+    while (rc != packet_type && rc >= 0);

+

+    return rc;

+}

+

+int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)

+{

+    Timer connect_timer;

+    int rc = FAILURE;

+    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;

+    int len = 0;

+

+#if defined(MQTT_TASK)

+      MutexLock(&c->mutex);

+#endif

+      if (c->isconnected) /* don't send connect packet again if we are already connected */

+          goto exit;

+

+    TimerInit(&connect_timer);

+    TimerCountdownMS(&connect_timer, c->command_timeout_ms);

+

+    if (options == 0)

+        options = &default_options; /* set default options if none were supplied */

+

+    c->keepAliveInterval = options->keepAliveInterval;

+    c->cleansession = options->cleansession;

+    TimerCountdown(&c->last_received, c->keepAliveInterval);

+    if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)

+        goto exit;

+    if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS)  // send the connect packet

+        goto exit; // there was a problem

+

+    // this will be a blocking call, wait for the connack

+    if (waitfor(c, CONNACK, &connect_timer) == CONNACK)

+    {

+        data->rc = 0;

+        data->sessionPresent = 0;

+        if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)

+            rc = data->rc;

+        else

+            rc = FAILURE;

+    }

+    else

+        rc = FAILURE;

+

+exit:

+#ifdef MBTK_OPENCPU_SUPPORT

+    if (rc == SUCCESS)

+    {

+        if(c->mqtt_connect_callback != NULL)

+        {

+            c->mqtt_connect_callback(MQTT_CONNECT_SUCCESS_EVENT,0);

+        }

+        c->isconnected = 1;

+        c->ping_outstanding = 0;

+    }

+#endif

+

+#if defined(MQTT_TASK)

+      MutexUnlock(&c->mutex);

+#endif

+

+    return rc;

+}

+

+int MQTTReConnect(MQTTClient* client, MQTTPacket_connectData* connectData)

+{

+    int ret = FAILURE;

+    

+    //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_13hh0, P_INFO, 0, "...start tcp disconnect ..");

+    client->ipstack->disconnect(client->ipstack);

+    

+    //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_14hh1, P_INFO, 0, "...start tcp connect ...");

+    {

+        //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_16hh3, P_INFO, 0, "...tcp connect ok...");

+        client->isconnected = 0;

+        if((NetworkConnectTimeout(client->ipstack, MQTT_SERVER_URI, MQTT_SERVER_PORT, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT)) < 0)

+        {

+            //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_17hh4, P_INFO, 0, "...tcp reconnect fail!!!...\r\n");

+        }

+        else

+        {

+            //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_18hh5, P_INFO, 0, "...start mqtt connect ..");

+    

+            if ((MQTTConnect(client, connectData)) != 0)

+            {

+                //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_19hh6, P_INFO, 0, "...mqtt reconnect fial!!!...");

+            }

+            else

+            {

+                //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_20hh7, P_INFO, 0, "...mqtt reconnect ok!!!...");

+                ret = SUCCESS;

+            }

+        }

+    }

+    return ret;

+}

+

+int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)

+{

+    MQTTConnackData data;

+    return MQTTConnectWithResults(c, options, &data);

+}

+

+int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)

+{

+    int rc = FAILURE;

+    int i = -1;

+

+    /* first check for an existing matching slot */

+    for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)

+    {

+        if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)

+        {

+            if (messageHandler == NULL) /* remove existing */

+            {

+                c->messageHandlers[i].topicFilter = NULL;

+                c->messageHandlers[i].fp = NULL;

+            }

+            rc = SUCCESS; /* return i when adding new subscription */

+            break;

+        }

+    }

+    /* if no existing, look for empty slot (unless we are removing) */

+    if (messageHandler != NULL) {

+        if (rc == FAILURE)

+        {

+            for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)

+            {

+                if (c->messageHandlers[i].topicFilter == NULL)

+                {

+                    rc = SUCCESS;

+                    break;

+                }

+            }

+        }

+        if (i < MAX_MESSAGE_HANDLERS)

+        {

+            c->messageHandlers[i].topicFilter = topicFilter;

+            c->messageHandlers[i].fp = messageHandler;

+#ifdef MBTK_OPENCPU_SUPPORT

+            c->messageHandlers[i].qos = c->qos;

+#endif

+        }

+    }

+    return rc;

+}

+

+#ifdef MBTK_OPENCPU_SUPPORT

+

+void mbtk_MQTTRun(void* parm)

+{

+    Timer timer;

+    MQTTClient* c = (MQTTClient*)parm;

+    int i = c->recv_task_num;

+

+    if(mbtk_mqttMutex1[i].sem == NULL)

+    {

+        MutexInit(&mbtk_mqttMutex1[i]);

+    }

+

+    TimerInit(&timer);

+

+    while (1)

+    {

+#if defined(MQTT_TASK)

+        MutexLock(&c->mutex);

+#endif

+        MutexLock(&mbtk_mqttMutex1[i]);

+

+        TimerCountdownMS(&timer, 1500); 

+        cycle(c, &timer);

+        MutexUnlock(&mbtk_mqttMutex1[i]);

+        

+#if defined(MQTT_TASK)

+        MutexUnlock(&c->mutex);

+#endif

+        osDelay(200);

+    }

+}

+

+

+int mbtk_mqtt_demo_recv_task_init(int i,MQTTClient* c)

+{

+    osThreadAttr_t task_attr;

+    char name[10] = {0};

+    memset(&task_attr, 0, sizeof(task_attr));

+    task_attr.stack_size = MQTT_RECV_DEMO_TASK_STACK_SIZE;

+    task_attr.priority = osPriorityBelowNormal7;

+    sprintf(name, "mqttRecv%d", i);

+    task_attr.name = name;

+    ECOMM_TRACE(UNILOG_MQTT, mbtk_mqtt_demo_recv_task_init_1, P_INFO, 0, "mbtk_mqtt_demo_recv_task_init task_attr.name%s",task_attr.name);

+

+

+    mbtk_mqttRecvTaskHandle[i] = osThreadNew(mbtk_MQTTRun, (void *)c,&task_attr);

+    if(mbtk_mqttRecvTaskHandle[i] == NULL)

+    {

+        return FAILURE;

+    }

+    return SUCCESS;

+}

+

+

+#endif

+int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,

+       messageHandler messageHandler, MQTTSubackData* data)

+{

+    int rc = FAILURE;

+    Timer timer;

+    int len = 0;

+    int mqttQos = (int)qos;

+    MQTTString topic = MQTTString_initializer;

+    topic.cstring = (char *)topicFilter;

+

+#if defined(MQTT_TASK)

+      MutexLock(&c->mutex);

+#endif

+      if (!c->isconnected)

+            goto exit;

+

+    TimerInit(&timer);

+    TimerCountdownMS(&timer, c->command_timeout_ms);

+

+    len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&mqttQos);

+    if (len <= 0)

+        goto exit;

+    if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet

+        goto exit;             // there was a problem

+

+    if (waitfor(c, SUBACK, &timer) == SUBACK)      // wait for suback

+    {

+        int count = 0;

+        unsigned short mypacketid;

+        //data->grantedQoS = QOS0;

+        mqttQos = QOS0;

+        if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&mqttQos, c->readbuf, c->readbuf_size) == 1)

+        {

+            if (mqttQos != 0x80)

+            {//mbtk change

+            #ifndef MBTK_OPENCPU_SUPPORT

+                rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);

+                mqttClient = *c;

+                

+                if(mqttRecvTaskHandle == NULL)

+                {

+                    mqtt_demo_recv_task_init();

+                }

+

+           #else

+            rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);

+            int i = 0;

+            if(c->first_sub == 0)

+            {

+                c->first_sub = 1;

+                for(i = 0; i < MAX_RECV_TASK; i++)

+                {

+                    if(mbtk_mqttRecvTaskHandle[i] != NULL)

+                    {

+                        ECOMM_TRACE(UNILOG_MQTT, MQTTSubscribeWithResults_0, P_INFO, 0, "mbtk_mqttRecvTaskHandle %d has create",i);

+                        continue;

+                    }

+                    else

+                    {

+                        c->recv_task_num = i;

+                        mbtk_mqtt_demo_recv_task_init(i,c);

+                        break;

+                    }

+                }

+                if(i >= MAX_RECV_TASK)

+                {

+                

+                    ECOMM_TRACE(UNILOG_MQTT, MQTTSubscribeWithResults_1, P_INFO, 0, "create mqtt recv task error");

+                    rc = FAILURE;

+                }

+            }

+

+            #endif

+            }//mbtk change, end

+        }

+    }

+    else

+        rc = FAILURE;

+

+exit:

+    if (rc == FAILURE)

+        ;//MQTTCloseSession(c);

+#if defined(MQTT_TASK)

+      MutexUnlock(&c->mutex);

+#endif

+    return rc;

+}

+

+int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,

+       messageHandler messageHandler)

+{

+    MQTTSubackData data;

+    return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data);

+}

+

+int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)

+{

+    int rc = FAILURE;

+    Timer timer;

+    MQTTString topic = MQTTString_initializer;

+    topic.cstring = (char *)topicFilter;

+    int len = 0;

+

+#if defined(MQTT_TASK)

+      MutexLock(&c->mutex);

+#endif

+      if (!c->isconnected)

+          goto exit;

+

+    TimerInit(&timer);

+    TimerCountdownMS(&timer, c->command_timeout_ms);

+

+    if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)

+        goto exit;

+    if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet

+        goto exit; // there was a problem

+

+    if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)

+    {

+        unsigned short mypacketid;  // should be the same as the packetid above

+        if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)

+        {

+            /* remove the subscription message handler associated with this topic, if there is one */

+            MQTTSetMessageHandler(c, topicFilter, NULL);

+        }

+    }

+    else

+        rc = FAILURE;

+

+exit:

+    if (rc == FAILURE)

+        ;//MQTTCloseSession(c);

+#if defined(MQTT_TASK)

+      MutexUnlock(&c->mutex);

+#endif

+    return rc;

+}

+

+int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)

+{

+    int rc = FAILURE;

+    Timer timer;

+    MQTTString topic = MQTTString_initializer;

+    topic.cstring = (char *)topicName;

+    int len = 0;

+

+#if defined(MQTT_TASK)

+      MutexLock(&c->mutex);

+#endif

+      if (!c->isconnected)

+            goto exit;

+

+    TimerInit(&timer);

+    TimerCountdownMS(&timer, c->command_timeout_ms);

+

+    if (message->qos == QOS1 || message->qos == QOS2)

+        message->id = getNextPacketId(c);

+

+    len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,

+              topic, (unsigned char*)message->payload, message->payloadlen);

+    if (len <= 0)

+        goto exit;

+    if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet

+        goto exit; // there was a problem

+

+    if (message->qos == QOS1)

+    {

+        if (waitfor(c, PUBACK, &timer) == PUBACK)

+        {

+            unsigned short mypacketid;

+            unsigned char dup, type;

+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)

+                rc = FAILURE;

+        }

+        else

+            rc = FAILURE;

+    }

+    else if (message->qos == QOS2)

+    {

+        if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)

+        {

+            unsigned short mypacketid;

+            unsigned char dup, type;

+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)

+                rc = FAILURE;

+        }

+        else

+            rc = FAILURE;

+    }

+

+exit:

+    if (rc == FAILURE)

+        ;//MQTTCloseSession(c);

+#if defined(MQTT_TASK)

+      MutexUnlock(&c->mutex);

+#endif

+    return rc;

+}

+

+int MQTTDisconnect(MQTTClient* c)

+{

+    int rc = FAILURE;

+    Timer timer;     // we might wait for incomplete incoming publishes to complete

+    int len = 0;

+

+#if defined(MQTT_TASK)

+    MutexLock(&c->mutex);

+#endif

+    TimerInit(&timer);

+    TimerCountdownMS(&timer, c->command_timeout_ms);

+

+      len = MQTTSerialize_disconnect(c->buf, c->buf_size);

+    if (len > 0)

+        rc = sendPacket(c, len, &timer);            // send the disconnect packet

+    MQTTCloseSession(c);

+

+#if defined(MQTT_TASK)

+      MutexUnlock(&c->mutex);

+#endif

+    return rc;

+}

+

+int MQTTInit(MQTTClient* c, Network* n, unsigned char* sendBuf, unsigned char* readBuf)

+{    

+    NetworkInit(n);

+    MQTTClientInit(c, n, 40000, (unsigned char *)sendBuf, 1000, (unsigned char *)readBuf, 1000);

+

+    return 0;

+}

+

+int MQTTCreate(MQTTClient* c, Network* n, char* clientID, char* username, char* password, char *serverAddr, int port, MQTTPacket_connectData* connData)

+{

+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;

+    int clientLen = 0;

+    int usernameLen = 0;

+    int passwordLen = 0;

+

+    if(connData != NULL)

+    {

+        memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));

+    }

+    else

+    {

+        connectData.MQTTVersion = 4;

+        connectData.keepAliveInterval = 120;

+    }

+

+    if(clientID != NULL)

+    {

+        clientLen = strlen(clientID);

+        connectData.clientID.cstring = malloc(clientLen+1);

+        memset(connectData.clientID.cstring, 0, (clientLen+1));

+        memcpy(connectData.clientID.cstring, clientID, clientLen);

+    }

+

+    if(username != NULL)

+    {

+        usernameLen = strlen(username);

+        connectData.username.cstring = malloc(usernameLen+1);;

+        memset(connectData.username.cstring, 0, (usernameLen+1));

+        memcpy(connectData.username.cstring, username, usernameLen);

+    }

+

+    if(password != NULL)

+    {

+        passwordLen = strlen(password);

+        connectData.password.cstring = malloc(passwordLen+1);

+        memset(connectData.password.cstring, 0, (passwordLen+1));

+        memcpy(connectData.password.cstring, password, passwordLen);

+    } 

+      

+    {

+        if ((NetworkConnectTimeout(n, serverAddr, port, 5000, 5000)) != 0)

+        {

+            c->keepAliveInterval = connectData.keepAliveInterval;

+            c->ping_outstanding = 1;

+            return 1;

+        }

+        else

+        {

+            if ((MQTTConnect(c, &connectData)) != 0)

+            {

+                c->ping_outstanding = 1;

+                return 1;

+            }

+            else

+            {

+                #if defined(MQTT_TASK)

+                    if ((MQTTStartTask(c)) != pdPASS)

+                    {

+                        return 1;

+                    }

+                    else

+                    {

+                        return 0;

+                    }

+                #endif

+            }

+        }

+    }

+    return 1;

+}

+

+#ifdef FEATURE_CUCC_DM_ENABLE

+

+int MQTTCuccCycle(MQTTClient* c, Timer* timer)

+{

+    int rc = SUCCESS;

+

+    int packet_type = readPacket(c, timer);     /* read the socket, see what work is due */

+    //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0001, P_SIG, 1, ".....mqttRecvTask..packet_type=%d....",packet_type);

+    //ECPLAT_PRINTF(UNILOG_DM1, cycle0, P_SIG, ".....autoReg..packet_type=%d ",packet_type);

+

+    switch (packet_type)

+    {

+        default:

+            /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */

+            rc = packet_type;

+            break;

+        case 0: /* timed out reading packet */

+            break;

+        case CONNACK:

+        case PUBACK:

+        case SUBACK:

+        case UNSUBACK:

+            if(packet_type == SUBACK)

+            {

+                //rc = packet_type;

+            }

+            break;

+        case PUBLISH:

+        {

+            break;

+        }

+        case PUBREC:

+        case PUBREL:

+        {

+            break;

+        }

+

+        case PUBCOMP:

+            break;

+        case PINGRESP:

+            c->ping_outstanding = 0;

+            break;

+    }

+

+    if (rc == SUCCESS)

+        rc = packet_type;

+    else if (c->isconnected)

+        ;//MQTTCloseSession(c);

+    return rc;

+}

+

+int MQTTCuccWaitfor(MQTTClient* c, int packet_type, Timer* timer)

+{

+    int rc = FAILURE;

+

+    do

+    {

+        if (TimerIsExpired(timer))

+            break; // we timed out

+        rc = MQTTCuccCycle(c, timer);

+    }

+    while (rc != packet_type && rc >= 0);

+

+    return rc;

+}

+

+int MQTTCuccCreate(MQTTClient* c, Network* n, char* clientID, char* username, char* password, char *serverAddr, int port, MQTTPacket_connectData* connData)

+{

+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;

+    int clientLen = 0;

+    int usernameLen = 0;

+    int passwordLen = 0;

+

+    if(connData != NULL)

+    {

+        memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));

+    }

+    else

+    {

+        connectData.MQTTVersion = 4;

+        connectData.keepAliveInterval = 120;

+    }

+

+    if(clientID != NULL)

+    {

+        clientLen = strlen(clientID);

+        connectData.clientID.cstring = malloc(clientLen+1);

+        memset(connectData.clientID.cstring, 0, (clientLen+1));

+        memcpy(connectData.clientID.cstring, clientID, clientLen);

+    }

+

+    if(username != NULL)

+    {

+        usernameLen = strlen(username);

+        connectData.username.cstring = malloc(usernameLen+1);;

+        memset(connectData.username.cstring, 0, (usernameLen+1));

+        memcpy(connectData.username.cstring, username, usernameLen);

+    }

+

+    if(password != NULL)

+    {

+        passwordLen = strlen(password);

+        connectData.password.cstring = malloc(passwordLen+1);

+        memset(connectData.password.cstring, 0, (passwordLen+1));

+        memcpy(connectData.password.cstring, password, passwordLen);

+    } 

+      

+    {

+        if ((NetworkConnectTimeout(n, serverAddr, port, 5000, 5000)) != 0)

+        {

+            c->keepAliveInterval = connectData.keepAliveInterval;

+            c->ping_outstanding = 1;

+            return 1;

+        }

+        else

+        {

+            if ((MQTTConnect(c, &connectData)) != 0)

+            {

+                //c->ping_outstanding = 1;

+                //return 1;

+            }

+            else

+            {

+                #if defined(MQTT_TASK)

+                    if ((MQTTStartTask(c)) != pdPASS)

+                    {

+                        return 1;

+                    }

+                    else

+                    {

+                        return 0;

+                    }

+                #endif

+            }

+        }

+    }

+    return 0;

+}

+

+int MQTTCuccConnect(MQTTClient* c, char* clientID, char* username, char* password, MQTTPacket_connectData* connData)

+{

+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;

+    int clientLen = 0;

+    int usernameLen = 0;

+    int passwordLen = 0;

+

+    if(connData != NULL)

+    {

+        memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));

+    }

+    else

+    {

+        connectData.MQTTVersion = 4;

+        connectData.keepAliveInterval = 120;

+    }

+

+    if(clientID != NULL)

+    {

+        clientLen = strlen(clientID);

+        connectData.clientID.cstring = malloc(clientLen+1);

+        memset(connectData.clientID.cstring, 0, (clientLen+1));

+        memcpy(connectData.clientID.cstring, clientID, clientLen);

+    }

+

+    if(username != NULL)

+    {

+        usernameLen = strlen(username);

+        connectData.username.cstring = malloc(usernameLen+1);;

+        memset(connectData.username.cstring, 0, (usernameLen+1));

+        memcpy(connectData.username.cstring, username, usernameLen);

+    }

+

+    if(password != NULL)

+    {

+        passwordLen = strlen(password);

+        connectData.password.cstring = malloc(passwordLen+1);

+        memset(connectData.password.cstring, 0, (passwordLen+1));

+        memcpy(connectData.password.cstring, password, passwordLen);

+    } 

+      

+    if ((MQTTConnect(c, &connectData)) != 0)

+    {

+        c->ping_outstanding = 1;

+        return 1;

+    }

+

+    return 0;

+}

+

+int MQTTCuccPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)

+{

+    int rc = FAILURE;

+    Timer timer;

+    MQTTString topic = MQTTString_initializer;

+    topic.cstring = (char *)topicName;

+    int len = 0;

+

+#if defined(MQTT_TASK)

+      MutexLock(&c->mutex);

+#endif

+      if (!c->isconnected)

+            goto exit;

+

+    TimerInit(&timer);

+    TimerCountdownMS(&timer, c->command_timeout_ms);

+

+    if (message->qos == QOS1 || message->qos == QOS2)

+        message->id = getNextPacketId(c);

+

+    len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,

+              topic, (unsigned char*)message->payload, message->payloadlen);

+    if (len <= 0)

+        goto exit;

+    if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet

+        goto exit; // there was a problem

+

+    if (message->qos == QOS1)

+    {

+        if (waitfor(c, PUBACK, &timer) == PUBACK)

+        {

+            unsigned short mypacketid;

+            unsigned char dup, type;

+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)

+                rc = FAILURE;

+        }

+        else

+            rc = FAILURE;

+    }

+    else if (message->qos == QOS2)

+    {

+        if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)

+        {

+            unsigned short mypacketid;

+            unsigned char dup, type;

+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)

+                rc = FAILURE;

+        }

+        else

+            rc = FAILURE;

+    }

+

+exit:

+    if (rc == FAILURE)

+        ;//MQTTCloseSession(c);

+#if defined(MQTT_TASK)

+      MutexUnlock(&c->mutex);

+#endif

+    return rc;

+}

+

+int MQTTCuccSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)

+{

+    int rc = FAILURE;

+    Timer timer;

+    int len = 0;

+    int mqttQos = (int)qos;

+    //MQTTSubackData data;

+    MQTTString topic = MQTTString_initializer;

+    topic.cstring = (char *)topicFilter;

+

+#if defined(MQTT_TASK)

+      MutexLock(&c->mutex);

+#endif

+      if (!c->isconnected)

+            goto exit;

+

+    TimerInit(&timer);

+    TimerCountdownMS(&timer, 40000);

+

+    len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&mqttQos);

+    if (len <= 0)

+        goto exit;

+    if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet

+        goto exit;             // there was a problem

+

+    if (waitfor(c, SUBACK, &timer) == SUBACK)      // wait for suback

+    {

+        int count = 0;

+        unsigned short mypacketid;

+        //data.grantedQoS = QOS0;

+        mqttQos = QOS0;

+        if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&mqttQos, c->readbuf, c->readbuf_size) == 1)

+        {

+            if (mqttQos == 0x80)

+            {

+                rc = FAILURE;

+            }

+            else

+            {

+                rc = SUCCESS;

+            }

+        }

+    }

+    else

+    {

+        rc = FAILURE;

+    }

+

+exit:

+    if (rc == FAILURE)

+        ;//MQTTCloseSession(c);

+#if defined(MQTT_TASK)

+      MutexUnlock(&c->mutex);

+#endif

+    return rc;

+}

+

+

+int MQTTCuccDisconnect(MQTTClient* c)

+{

+    int rc = FAILURE;

+    Timer timer;     // we might wait for incomplete incoming publishes to complete

+    int len = 0;

+

+#if defined(MQTT_TASK)

+    MutexLock(&c->mutex);

+#endif

+    TimerInit(&timer);

+    TimerCountdownMS(&timer, c->command_timeout_ms);

+

+      len = MQTTSerialize_disconnect(c->buf, c->buf_size);

+    if (len > 0)

+        rc = sendPacket(c, len, &timer);            // send the disconnect packet

+    MQTTCloseSession(c);

+

+#if defined(MQTT_TASK)

+      MutexUnlock(&c->mutex);

+#endif

+    return rc;

+}

+

+int MQTTCuccWaitForRecv(MQTTClient* c, int packet_type, unsigned int timerOut, MQTTString *topicName, char *outPayload)

+{

+    int rc = FAILURE;

+    Timer timer;

+    MQTTMessage msg;

+    int intQoS;

+

+    memset(&msg, 0, sizeof(MQTTMessage));

+    TimerInit(&timer);

+    TimerCountdownMS(&timer, timerOut);

+

+    do

+    {

+        if (TimerIsExpired(&timer))

+            break; // we timed out

+        rc = cycle(c, &timer);

+    }

+    while (rc != packet_type && rc >= 0);

+

+    if(rc == PUBLISH)

+    {

+        if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, topicName,

+                                (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) == 1)

+        {

+            memcpy(outPayload, (unsigned char*)msg.payload, msg.payloadlen);

+            rc = SUCCESS;

+        }

+    }

+    

+    return rc;

+}

+

+

+#endif

+

+#define MQTT_DEMO_EXAMPLE_1_ONENET

+void mqtt_demo_onenet(void)

+{

+    int len = 0;

+    MQTTMessage message;

+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;

+    

+    connectData.MQTTVersion = 4;

+    connectData.clientID.cstring = "34392813";

+    connectData.username.cstring = "122343";

+    connectData.password.cstring = "test001";

+    connectData.keepAliveInterval = 120;

+    //ECOMM_TRACE(UNILOG_MQTT, mqtt_hh00, P_SIG, 0, "mqtt_demo........");

+    ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet0, P_INFO, "mqtt_demo........");

+    mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);

+    mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);

+    memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);

+    memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);

+    

+    NetworkInit(&mqttNetwork);

+    MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, 1000, (unsigned char *)mqttReadbuf, 1000);

+

+    if((NetworkSetConnTimeout(&mqttNetwork, 5000, 5000)) != 0)

+    {

+        mqttClient.keepAliveInterval = connectData.keepAliveInterval;

+        mqttClient.ping_outstanding = 1;

+    }

+    else

+    {

+        if ((NetworkConnect(&mqttNetwork, "183.230.40.39", 6002)) != 0)

+        {

+            mqttClient.keepAliveInterval = connectData.keepAliveInterval;

+            mqttClient.ping_outstanding = 1;

+        }

+        else

+        {

+            ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet1, P_INFO, "mqtt_demo socket connect ok");

+            if ((MQTTConnect(&mqttClient, &connectData)) != 0)

+            {

+                mqttClient.ping_outstanding = 1;

+            }

+            else

+            {

+                ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet2, P_INFO, "mqtt_demo connect fail");

+                mqttClient.ping_outstanding = 0;

+            }

+        }

+        if(mqttClient.ping_outstanding == 0)

+        {

+            if ((MQTTStartRECVTask()) != SUCCESS)

+                ;

+        }

+

+    }

+    while(1)

+    {

+        sprintf(mqtt_payload,"{\"ec_smart_sensor_data\":%d}", ec_sensor_temp);

+        len = strlen(mqtt_payload);

+        ec_data_len = len;

+        unsigned char *ptr = mqttJsonbuff;

+        sprintf((char *)mqttJsonbuff,"%c%c%c%s", ec_data_type, ec_data_type,ec_data_type, mqtt_payload);

+        message.payload = mqttJsonbuff;

+        message.payloadlen = strlen((char *)mqttJsonbuff);

+        writeChar(&ptr, ec_data_type);

+        writeInt(&ptr, ec_data_len);

+        ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet3, P_INFO, "mqtt_demo send data");

+

+        MQTTPublish(&mqttClient, "$dp", &message);

+

+        osDelay(10000);

+    }

+    /* do not return */

+}

+

+#define MQTT_DEMO_EXAMPLE_2_ALI

+#ifdef FEATURE_MQTT_TLS_ENABLE

+void mqtt_demo_ali(void)

+{

+    //char *pub_topic;

+    int len = 0;

+    MQTTMessage message;

+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;

+    char hmac_source[256] = {0};

+    char *ali_clientID = NULL;

+    char *ali_username = NULL;

+    char *ali_signature = NULL;

+

+    /* eigencomm  ec_smoke     a1xFDTv3InR  sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg  8964 ---"eigencomm|securemode=3,signmethod=hmacsha1,timestamp=8964|" "deviceGrape&a1fsx061r0x" */

+    /* eigencomm  deviceGrape  a1fsx061r0x  WLar6NunAcCJ0aZHbaNw4eQwdsYVKyC9  8964 ---"eigencomm|securemode=3,signmethod=hmacsha1,timestamp=8964|" "deviceGrape&a1fsx061r0x" */

+    /* clientID   deviceName   productKey   deviceSecret*/

+    ali_clientID = malloc(128);

+    ali_username = malloc(64);

+    ali_signature = malloc(96);

+    

+    memset(ali_clientID, 0, 128);

+    memset(ali_username, 0, 64);

+    memset(ali_signature, 0, 96);

+    

+    snprintf(hmac_source, sizeof(hmac_source), "clientId%s" "deviceName%s" "productKey%s" "timestamp%s", "eigencomm", "ec_smoke", "a1xFDTv3InR", "8964");

+    

+    mqttAliHmacSha1((unsigned char *)hmac_source, strlen(hmac_source), (unsigned char *)ali_signature,(unsigned char *)"sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg", strlen("sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg"));

+    

+    sprintf(ali_clientID,"%s|securemode=3,signmethod=hmacsha1,timestamp=8964|", "eigencomm");

+    sprintf(ali_username,"%s&%s","ec_smoke","a1xFDTv3InR");

+    connectData.clientID.cstring = ali_clientID;

+    connectData.username.cstring = ali_username;

+    connectData.password.cstring = ali_signature;

+    

+    connectData.MQTTVersion = 4;

+    connectData.keepAliveInterval = 120;

+    //ECOMM_TRACE(UNILOG_MQTT, mqtt_hh001, P_SIG, 0, "mqtt_demo........");

+    mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);

+    mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);

+    memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);

+    memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);

+    

+    NetworkInit(&mqttNetwork);

+    MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, 1000, (unsigned char *)mqttReadbuf, 1000);

+

+    if((NetworkSetConnTimeout(&mqttNetwork, 5000, 5000)) != 0)

+    {

+        mqttClient.keepAliveInterval = connectData.keepAliveInterval;

+        mqttClient.ping_outstanding = 1;

+    }

+    else

+    {

+        if ((NetworkConnect(&mqttNetwork, "a1xFDTv3InR.iot-as-mqtt.cn-shanghai.aliyuncs.com", 1883)) != 0)

+        {

+            mqttClient.keepAliveInterval = connectData.keepAliveInterval;

+            mqttClient.ping_outstanding = 1;

+        }

+        else

+        {

+            //ECOMM_TRACE(UNILOG_MQTT, mqtt_hh012, P_SIG, 0, "mqtt_demo....1....");

+            if ((MQTTConnect(&mqttClient, &connectData)) != 0)

+            {

+                mqttClient.ping_outstanding = 1;

+            }

+            else

+            {

+                //ECOMM_TRACE(UNILOG_MQTT, mqtt_hh023, P_SIG, 0, "mqtt_demo....2....");

+                mqttClient.ping_outstanding = 0;

+            }

+        }

+        if(mqttClient.ping_outstanding == 0)

+        {

+            if ((MQTTStartRECVTask()) != SUCCESS)

+                ;

+        }

+

+    }

+    while(1)

+    {

+        memset(mqtt_payload, 0, sizeof(mqtt_payload));

+        memcpy(mqtt_payload, "update", strlen("update"));

+        len = strlen(mqtt_payload);

+        message.payload = mqtt_payload;

+        message.payloadlen = len;

+        //ECOMM_TRACE(UNILOG_MQTT, mqtt_hh034, P_SIG, 0, "mqtt_demo....3....");

+

+        MQTTPublish(&mqttClient, "a1xFDTv3InR/ec_smoke/user/get", &message);

+

+        osDelay(10000);

+    }

+    /* do not return */

+}

+#endif

+#define MQTT_DEMO_EXAMPLE_3_APP

+void mqtt_demo_send_task(void *argument)

+{

+    int ret = FAILURE;

+    int msgType = 0xff;

+    mqttSendMsg mqttMsg;

+    mqttDataMsg mqttMessage;

+    int socket_stat = -1;

+    int socket_err = -1;    

+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;

+    

+    connectData.MQTTVersion = 4;

+    connectData.clientID.cstring = "34392813";

+    connectData.username.cstring = "122343";

+    connectData.password.cstring = "test001";

+    connectData.keepAliveInterval = 120;

+    

+    //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_0, P_SIG, 0, "mqttSendTask........");

+    mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);

+    mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);

+    memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);

+    memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);

+

+    NetworkInit(&mqttNetwork);

+    MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, MQTT_SEND_BUFF_LEN, (unsigned char *)mqttReadbuf, MQTT_RECV_BUFF_LEN);

+

+    if((NetworkSetConnTimeout(&mqttNetwork, 10000, 10000)) != 0)

+    {

+        mqttClient.keepAliveInterval = connectData.keepAliveInterval;

+        mqttClient.ping_outstanding = 1;

+    }

+    else

+    {

+        if ((NetworkConnect(&mqttNetwork, "183.230.40.39", 6002)) != 0)

+        {

+            mqttClient.keepAliveInterval = connectData.keepAliveInterval;

+            mqttClient.ping_outstanding = 1;

+            //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_1, P_SIG, 0, "mqttSendTask..tcp connect fail....");

+        }

+        else

+        {

+            //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_2, P_SIG, 0, "mqttSendTask..tcp connect ok....");

+            if ((MQTTConnect(&mqttClient, &connectData)) != 0)

+            {

+                mqttClient.ping_outstanding = 1;

+                //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_3, P_SIG, 0, "mqttSendTask..mqtt connect fail....");

+            }

+            else

+            {

+                mqttClient.keepAliveInterval = connectData.keepAliveInterval;

+                //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4, P_SIG, 0, "mqttSendTask..mqtt connect ok....");

+                mqtt_send_task_status_flag = 1;             

+                mqttClient.ping_outstanding = 0;

+            }

+        }

+    }

+    

+    /* sub topic*/

+    //MQTTSubscribe(&mqttClient, topic_data, 0, NULL);       

+    

+    /*start recv task*/

+    if(mqttClient.ping_outstanding == 0)

+    {

+        mqtt_demo_recv_task_init();

+    }

+    //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4000, P_SIG, 0, "mqttSendTask..start recv msg....");

+    

+    while(1)

+    {

+        //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4001, P_SIG, 1, "mqttSendTask..recv msg hand=0x%x....",mqttSendMsgHandle);

+        /* recv msg (block mode) */

+        osMessageQueueGet(mqttSendMsgHandle, &mqttMsg, 0, osWaitForever);

+        msgType = mqttMsg.cmdType;

+        //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4002, P_SIG, 1, "mqttSendTask..recv msg=%d....",msgType);

+

+        switch(msgType)

+        {

+            case MQTT_DEMO_MSG_PUBLISH:

+                /* send packet */

+                //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_500, P_INFO, 0, ".....start send mqtt publish packet.......");

+                MutexLock(&mqttMutex1);

+                //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_5, P_INFO, 0, ".....start send mqtt publish packet.......");

+                memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);

+                ret = MQTTPublish(&mqttClient, mqttMsg.topic, &mqttMsg.message);

+                

+                if(mqttMsg.topic != NULL)

+                {

+                    free(mqttMsg.topic);

+                    mqttMsg.topic = NULL;

+                }

+                if(mqttMsg.message.payload != NULL)

+                {

+                    free(mqttMsg.message.payload);

+                    mqttMsg.message.payload = NULL;

+                }           

+                

+                /* send result to at task */

+                if(ret == SUCCESS)

+                {

+                    memset(&mqttMessage, 0, sizeof(mqttMessage));

+                    mqttMessage.cmdType = MQTT_DEMO_MSG_PUBLISH_ACK;

+                    mqttMessage.ret = SUCCESS;

+                    //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_600, P_INFO, 0, ".....send mqtt publish packet ok.......");

+                

+                    osMessageQueuePut(appMqttMsgHandle, &mqttMessage, 0, MQTT_MSG_TIMEOUT);

+                }

+                else

+                {

+                    //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_6, P_INFO, 0, ".....send mqtt publish packet fail.......");

+                    socket_stat = sock_get_errno(mqttClient.ipstack->my_socket);

+                    socket_err = socket_error_is_fatal(socket_stat);

+                    if(socket_err == 1)

+                    {

+                        //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_7, P_INFO, 0, ".....find need reconnect when publish packet.......");

+                        MQTTReConnect(&mqttClient, &connectData);

+

+                    }

+                    else

+                    {

+                        memset(&mqttMessage, 0, sizeof(mqttMessage));

+                        mqttMessage.cmdType = MQTT_DEMO_MSG_PUBLISH_ACK;

+                        mqttMessage.ret = SUCCESS;

+                        //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_600ii, P_INFO, 0, ".....send mqtt publish packet ok.......");

+                    

+                        osMessageQueuePut(appMqttMsgHandle, &mqttMessage, 0, MQTT_MSG_TIMEOUT);

+                    }

+                    //memset(&mqttMessage, 0, sizeof(mqttMessage));

+                    //mqttMessage.cmdType = MQTT_MSG_RECONNECT;

+                    //mqttMessage.ret = FAILURE;

+                

+                    //xQueueSend(appMqttMsgHandle, &mqttMessage, MQTT_MSG_TIMEOUT);

+                }

+                MutexUnlock(&mqttMutex1);

+                break;

+#if 0

+            case MQTT_DEMO_MSG_KEEPALIVE:

+                /* send keepalive packet */

+                //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_8, P_INFO, 0, ".....start send mqtt keeplive packet.......");

+                ret = keepalive(&mqttClient);

+

+                if(ret == SUCCESS) // send the ping packet

+                {

+                    mqttClient->ping_outstanding = 1;

+                    //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_9, P_INFO, 1, ".....mqtt keeplive send ok.......");

+                }

+                else

+                {

+                    socket_stat = sock_get_errno(mqttNewContext->mqtt_client->ipstack->my_socket);

+                    socket_err = socket_error_is_fatal(socket_stat);

+                    if(socket_err == 1)

+                    {

+                        //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_10, P_INFO, 0, ".....find need reconnect when send keeplive packet.......");

+                        ret = MQTT_RECONNECT;

+                    }

+                    //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_11, P_INFO, 1, ".....mqtt send keeplive Packet fail");

+                }

+                break;

+#endif

+            case MQTT_DEMO_MSG_RECONNECT:

+                MutexLock(&mqttMutex1);

+                //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_12, P_INFO, 0, ".....find need reconnect when read packet.......");

+                //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_13, P_INFO, 0, "...start tcp disconnect ..");

+                mqttClient.ipstack->disconnect(mqttClient.ipstack);

+                

+                //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_14, P_INFO, 0, "...start tcp connect ...");

+                if ((NetworkSetConnTimeout(mqttClient.ipstack, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT)) != 0)

+                {

+                    //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_15, P_INFO, 0, "...tcp socket set timeout fail...");

+                }

+                else

+                {

+                    //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_16, P_INFO, 0, "...tcp connect ok...");

+                    mqttClient.isconnected = 0;

+                    if((NetworkConnect(mqttClient.ipstack, MQTT_SERVER_URI, MQTT_SERVER_PORT)) < 0)

+                    {

+                        //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_17, P_INFO, 0, "...tcp reconnect fail!!!...\r\n");

+                    }

+                    else

+                    {

+                        //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_18, P_INFO, 0, "...start mqtt connect ..");

+

+                        if ((MQTTConnect(&mqttClient, &connectData)) != 0)

+                        {

+                            //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_19, P_INFO, 0, "...mqtt reconnect fial!!!...");

+                        }

+                        else

+                        {

+                            //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_20, P_INFO, 0, "...mqtt reconnect ok!!!...");

+                        }

+                    }

+                }

+                MutexUnlock(&mqttMutex1);

+                break;

+            }

+    }

+}

+

+void app_mqtt_demo_task(void *argument)

+{

+    int msgType = 1;

+    int payloadLen = 0;

+    mqttSendMsg mqttMsg;

+    mqttDataMsg mqttMessage;

+    int ret = FAILURE;

+    

+    /*init driver*/

+

+    /*start mqtt send task*/

+    ret = mqtt_demo_send_task_init();

+    if(ret == FAILURE)

+    {

+        ;

+    }

+    while(1)

+    {

+        if(mqtt_send_task_status_flag == 1)

+        {

+            break;

+        }

+        osDelay(4000);

+    }

+    

+    /*state machine*/

+    while(1)

+    {

+        /*read data*/

+

+        osDelay(4000);

+

+        /*send msg to mqtt send task, and wait for excute result*/

+        //ECOMM_TRACE(UNILOG_MQTT, appMqttTask_0, P_SIG, 0, "appMqttTask...send start.....");

+        memset(&mqttMsg, 0, sizeof(mqttSendMsg));

+        mqttMsg.cmdType = MQTT_DEMO_MSG_PUBLISH;

+

+        mqttMsg.topic = malloc(128);

+        memset(mqttMsg.topic, 0, 128);

+        memcpy(mqttMsg.topic, "$dp", strlen("$dp"));        

+        mqttMsg.message.qos = QOS0;

+        mqttMsg.message.retained = 0;

+        mqttMsg.message.id = 0;

+        mqttMsg.message.payload = malloc(32);

+        memset(mqttMsg.message.payload, 0, 32);

+        memcpy(mqttMsg.message.payload, "{\"data\":90}", strlen("{\"data\":90}"));

+        

+        sprintf(mqttMsg.message.payload, "%c%c%c%s",msgType, msgType, msgType, "{\"data\":90}");

+        payloadLen = strlen(mqttMsg.message.payload);

+        unsigned char *ptr = (unsigned char *)mqttMsg.message.payload;

+        writeChar(&ptr,3);

+        writeInt(&ptr,strlen("{\"data\":90}"));

+        mqttMsg.message.payloadlen = payloadLen;

+

+        osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, (2*MQTT_MSG_TIMEOUT));

+        

+        osMessageQueueGet(appMqttMsgHandle, &mqttMessage, 0, cmsisMAX_DELAY);

+        if(mqttMessage.cmdType == MQTT_DEMO_MSG_PUBLISH_ACK)

+        {

+            //ECOMM_TRACE(UNILOG_MQTT, appMqttTask_1, P_SIG, 0, "appMqttTask...send ok.....");

+        }

+        else

+        {

+            //ECOMM_TRACE(UNILOG_MQTT, appMqttTask_2, P_SIG, 0, "appMqttTask...send fail.....");

+        }

+    }

+}

+

+int mqtt_demo_recv_task_init(void)

+{

+    osThreadAttr_t task_attr;

+

+    memset(&task_attr, 0, sizeof(task_attr));

+    task_attr.name = "mqttRecv";

+#ifdef MBTK_OPENCPU_SUPPORT

+    task_attr.stack_size = MQTT_RECV_DEMO_TASK_STACK_SIZE;

+#else

+    task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;

+#endif

+    #if defined FEATURE_LITEOS_ENABLE

+    task_attr.priority = osPriorityBelowNormal4;

+    #elif defined FEATURE_FREERTOS_ENABLE

+    task_attr.priority = osPriorityBelowNormal7;

+    #endif

+

+    mqttRecvTaskHandle = osThreadNew(MQTTRun, NULL,&task_attr);

+    if(mqttRecvTaskHandle == NULL)

+    {

+        return FAILURE;

+    }

+

+    return SUCCESS;

+}

+

+int mqtt_demo_send_task_init(void)

+{

+    osThreadAttr_t task_attr;

+    

+    memset(&task_attr, 0, sizeof(task_attr));

+    task_attr.name = "mqttSend";

+    task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;

+    #if defined FEATURE_LITEOS_ENABLE

+    task_attr.priority = osPriorityBelowNormal3;

+    #elif defined FEATURE_FREERTOS_ENABLE

+    task_attr.priority = osPriorityBelowNormal6;

+    #endif

+    mqttSendTaskHandle = osThreadNew(mqtt_demo_send_task, NULL,&task_attr);

+    if(mqttSendTaskHandle == NULL)

+    {

+        return FAILURE;

+    }

+

+    return SUCCESS;

+}

+

+int app_mqtt_demo_task_init(void)

+{

+    osThreadAttr_t task_attr;

+

+    if(mqttSendMsgHandle == NULL)

+    {

+        mqttSendMsgHandle = osMessageQueueNew(16, sizeof(mqttSendMsg), NULL);

+    }

+    

+    if(appMqttMsgHandle == NULL)

+    {

+        appMqttMsgHandle = osMessageQueueNew(16, sizeof(mqttDataMsg), NULL);

+    }

+    

+    if(mqttMutex1.sem == NULL)

+    {

+        MutexInit(&mqttMutex1);

+    }

+

+    memset(&task_attr, 0, sizeof(task_attr));

+    task_attr.name = "appTask";

+    task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;

+    #if defined FEATURE_LITEOS_ENABLE

+    task_attr.priority = osPriorityBelowNormal2;

+    #elif defined FEATURE_FREERTOS_ENABLE

+    task_attr.priority = osPriorityBelowNormal5;

+    #endif

+    appMqttTaskHandle = osThreadNew(app_mqtt_demo_task, NULL,&task_attr);

+    if(appMqttTaskHandle == NULL)

+    {

+        return FAILURE;

+    }

+

+    return SUCCESS;

+}

+

+