[Feature][MD310EU][task-view-1009] Create MD310EU project code directory
Change-Id: I57cd3c474efe4493ae8a242d7e2fa643ad8ecbde
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTClient.h b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTClient.h
new file mode 100755
index 0000000..9d9eb7a
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTClient.h
@@ -0,0 +1,372 @@
+/*******************************************************************************
+ * Copyright (c) 2014, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs - documentation and platform specific header
+ * Ian Craggs - add setMessageHandler function
+ *******************************************************************************/
+
+#if !defined(__MQTT_CLIENT_C_)
+#define __MQTT_CLIENT_C_
+
+#if defined(__cplusplus)
+ extern "C" {
+#endif
+
+#if defined(WIN32_DLL) || defined(WIN64_DLL) /* Windows DLL build: use __declspec import/export */
+ #define DLLImport __declspec(dllimport)
+ #define DLLExport __declspec(dllexport)
+#elif defined(LINUX_SO) /* shared-object build: exported symbols get default visibility */
+ #define DLLImport extern
+ #define DLLExport __attribute__ ((visibility ("default")))
+#else /* none of the above defined: both markers expand to nothing */
+ #define DLLImport
+ #define DLLExport
+#endif
+
+#include "MQTTLinux.h"
+#include "MQTTPacket.h"
+#if defined FEATURE_FREERTOS_ENABLE
+#include "MQTTFreeRTOS.h"
+#endif
+
+#if defined FEATURE_LITEOS_ENABLE
+#include "MQTTLiteOS.h"
+#endif
+
+#if defined(MQTTCLIENT_PLATFORM_HEADER)
+/* The following sequence of macros converts the MQTTCLIENT_PLATFORM_HEADER value
+ * into a string constant suitable for use with include.
+ */
+#define xstr(s) str(s)
+#define str(s) #s
+#include xstr(MQTTCLIENT_PLATFORM_HEADER)
+#endif
+
+#define MAX_PACKET_ID 65535 /* according to the MQTT specification - do not change! */
+
+#if !defined(MAX_MESSAGE_HANDLERS) /* a build-supplied value wins; it sizes MQTTClient.messageHandlers[] */
+#ifdef MBTK_OPENCPU_SUPPORT
+#define MAX_MESSAGE_HANDLERS 10 /* default when MBTK_OPENCPU_SUPPORT is defined */
+#else
+#define MAX_MESSAGE_HANDLERS 5 /* redefinable - how many subscriptions do you want? */
+#endif
+#endif
+
+enum QoS { QOS0, QOS1, QOS2, SUBFAIL=0x80 }; /* QOS0..QOS2 are 0..2; SUBFAIL is 0x80, the SUBACK failure return code in MQTT 3.1.1 */
+
+/* all failure return codes must be negative; SUCCESS is 0 */
+enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
+
+/* The Platform specific header must define the Network and Timer structures and functions
+ * which operate on them.
+ *
+typedef struct Network
+{
+ int (*mqttread)(Network*, unsigned char* read_buffer, int, int);
+ int (*mqttwrite)(Network*, unsigned char* send_buffer, int, int);
+} Network;*/
+
+/* Timer is declared by the platform specific header; the platform layer is
+ * also expected to provide the functions below, which operate on it. */
+void TimerInit(Timer* timer);
+char TimerIsExpired(Timer* timer);
+void TimerCountdownMS(Timer* timer, unsigned int timeout_ms);
+void TimerCountdown(Timer* timer, unsigned int timeout);
+int TimerLeftMS(Timer* timer);
+
+#ifdef MBTK_OPENCPU_SUPPORT
+enum MBTK_MQTT_SSL_TYPE /* NOTE(review): presumably the values stored in MQTTClient.is_mqtts - confirm */
+{
+ MBTK_MQTT_SSL_NONE = 0, /* no SSL */
+ MBTK_MQTT_SSL_HAVE = 1, /* SSL in use */
+};
+typedef enum /* connection life-cycle events; NOTE(review): presumably the 'event' argument of connect_event_cb_t - confirm */
+{
+ MQTT_CONNECT_SUCCESS_EVENT = 0,
+ MQTT_DISCONNECT_SUCCESS_EVENT = 1,
+ MQTT_CONNECT_ABORT_EVENT = 2,
+ MQTT_START_RECONNECT_EVENT = 3,
+ MQTT_RECONNECT_FAIL_EVENT = 4,
+}MqttConnectionEvent;
+
+typedef enum /* note the polarity: ENABLE is 0 and DISABLE is 1, so the value cannot be tested as a plain boolean "enabled" */
+{
+ MBTK_MQTT_RECONNECT_ENABLE = 0,
+ MBTK_MQTT_RECONNECT_DISABLE = 1,
+}MBTK_Mqtt_Reconnect_FLAG; /* NOTE(review): presumably the values of MQTTClient.mqtt_reconn_enable - confirm */
+
+typedef void (*connect_event_cb_t)(uint32_t event,uint32_t err_code); /* connection event callback; 2nd parameter must not be named "errno": <errno.h> (included via MQTTLinux.h) defines errno as a macro, which would rewrite the parameter's type */
+
+#endif
+
+typedef struct MQTTMessage /* one application message: passed to MQTTPublish and referenced from MessageData */
+{
+ enum QoS qos; /* quality of service of this message */
+ unsigned char retained; /* retained flag */
+ unsigned char dup; /* duplicate-delivery flag */
+ unsigned short id; /* packet identifier */
+ void *payload; /* payload data; NOTE(review): ownership is not stated in this header */
+ size_t payloadlen; /* presumably the payload length in bytes - confirm */
+} MQTTMessage;
+
+typedef struct MessageData /* argument type of the messageHandler callbacks */
+{
+ MQTTMessage* message; /* the message */
+ MQTTString* topicName; /* the topic name that accompanies it */
+} MessageData;
+
+typedef struct MQTTConnackData /* result argument of MQTTConnectWithResults */
+{
+ unsigned char rc; /* presumably the CONNACK return code (see enum MQTTConnackReturnCodes) - confirm */
+ unsigned char sessionPresent; /* presumably the CONNACK session-present flag - confirm */
+} MQTTConnackData;
+
+typedef struct MQTTSubackData /* result argument of MQTTSubscribeWithResults */
+{
+ enum QoS grantedQoS; /* granted QoS; enum QoS also provides SUBFAIL (0x80) for a refused subscription */
+} MQTTSubackData;
+
+typedef void (*messageHandler)(MessageData*); /* callback type taken by MQTTSubscribe* and MQTTSetMessageHandler */
+
+typedef struct MQTTClient
+{
+ unsigned int next_packetid; /* packet id bookkeeping (ids are bounded by MAX_PACKET_ID) */
+ unsigned int command_timeout_ms; /* command timeout, in milliseconds per its name */
+ size_t buf_size; /* size of buf */
+ size_t readbuf_size; /* size of readbuf */
+ unsigned char *buf; /* send buffer */
+ unsigned char *readbuf; /* receive buffer */
+ unsigned int keepAliveInterval;
+ char ping_outstanding;
+ int isconnected;
+ int cleansession;
+
+ struct MessageHandlers
+ {
+ const char* topicFilter;
+ messageHandler fp;
+#if defined(MBTK_OPENCPU_SUPPORT)
+ int qos;
+#endif
+ } messageHandlers[MAX_MESSAGE_HANDLERS]; /* one slot per subscription: topic filter plus its callback */
+
+ messageHandler defaultMessageHandler;
+
+ Network* ipstack;
+ Timer last_sent, last_received;
+#ifdef MQTT_TASK
+ Mutex mutex;
+ Thread thread;
+#endif
+
+#if defined(MBTK_OPENCPU_SUPPORT)
+ int qos;
+ char* mbtk_mqtt_server_url;
+ int mbtk_mqtt_port;
+ MQTTPacket_connectData* mbtk_mqtt_options;
+ int is_mqtts;
+ connect_event_cb_t mqtt_connect_callback;
+ int mqtt_reconn_enable;
+ int first_sub;
+ int recv_task_num;
+ #if defined(FEATURE_MQTT_TLS_ENABLE)
+ mqttsClientContext* mqtts_client;
+ #endif
+#endif
+
+} MQTTClient;
+
+typedef struct /* NOTE(review): presumably a work item for the demo send task - confirm against the demo sources */
+{
+ int cmdType; /* presumably an enum MQTT_MSG_CMD_ value - confirm */
+ char *topic;
+ int topicLen; /* presumably the length of topic - confirm */
+ MQTTMessage message;
+}mqttSendMsg;
+
+typedef struct /* NOTE(review): presumably a result/data item exchanged with the demo tasks - confirm */
+{
+ int cmdType; /* presumably an enum MQTT_MSG_CMD_ value - confirm */
+ int ret;
+ char *data;
+ int dataLen; /* presumably the length of data - confirm */
+}mqttDataMsg;
+enum MQTT_MSG_CMD_ /* command codes; the last three take implicit values */
+{
+ MQTT_DEMO_MSG_PUBLISH = 1,
+ MQTT_DEMO_MSG_PUBLISH_ACK = 2,
+ MQTT_DEMO_MSG_SUB, /* 3 */
+ MQTT_DEMO_MSG_UNSUB, /* 4 */
+ MQTT_DEMO_MSG_RECONNECT, /* 5 */
+};
+
+#define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0} /* nine positional initializers; presumably for MQTTClient, where they cover next_packetid..isconnected and the rest is zero-initialized */
+
+#define ALI_SHA1_KEY_IOPAD_SIZE (64) /* 64 = SHA-1 block size in bytes (length of the HMAC ipad/opad) */
+#define ALI_SHA1_DIGEST_SIZE (20) /* SHA-1 digest length in bytes */
+
+#define ALI_SHA256_KEY_IOPAD_SIZE (64) /* 64 = SHA-256 block size in bytes */
+#define ALI_SHA256_DIGEST_SIZE (32) /* SHA-256 digest length in bytes */
+
+#define ALI_MD5_KEY_IOPAD_SIZE (64) /* 64 = MD5 block size in bytes */
+#define ALI_MD5_DIGEST_SIZE (16) /* MD5 digest length in bytes */
+
+#define ALI_HMAC_USED (1)
+#define ALI_HMAC_NOT_USED (0)
+
+#define MQTT_DEMO_TASK_STACK_SIZE 2048
+
+#ifdef MBTK_OPENCPU_SUPPORT
+#define MQTT_RECV_DEMO_TASK_STACK_SIZE (4096 * 2)
+#endif
+
+#define MQTT_SEND_BUFF_LEN (1024)
+#define MQTT_RECV_BUFF_LEN (1024)
+
+#define MQTT_SERVER_URI "183.230.40.39" /* hard-coded broker address; NOTE(review): presumably only meant for the demo code - confirm */
+
+#define MQTT_SERVER_PORT (6002)
+#define MQTT_SERVER_TOPIC_0 "$dp" /* the three topic macros currently all expand to the same string */
+#define MQTT_SERVER_TOPIC_1 "$dp"
+#define MQTT_SERVER_TOPIC_2 "$dp"
+
+#define MQTT_MSG_TIMEOUT 1000 /* NOTE(review): unit is not stated here (the *_ms naming elsewhere suggests milliseconds) - confirm */
+
+#define MQTT_ERR_ABRT (-13) /* NOTE(review): -13/-14/-15 look like lwIP's ERR_ABRT/ERR_RST/ERR_CLSD - confirm */
+#define MQTT_ERR_RST (-14)
+#define MQTT_ERR_CLSD (-15)
+#define MQTT_ERR_BADE (9) /* the only positive value in this group */
+
+#define MQTT_SEND_TIMEOUT 2000
+#define MQTT_RECV_TIMEOUT 5000
+
+/**
+ * Create an MQTT client object
+ * @param client - the client object to initialise
+ * @param network - the Network object the client is to use
+ * @param command_timeout_ms - command timeout (milliseconds, per its name)
+ * @param sendbuf, sendbuf_size, readbuf, readbuf_size - caller-provided send and read buffers with their sizes
+ */
+DLLExport void MQTTClientInit(MQTTClient* client, Network* network, unsigned int command_timeout_ms,
+ unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size);
+
+/** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
+ * The network object must be connected to the network endpoint before calling this
+ * @param options - connect options; data - presumably receives the Connack details (see MQTTConnackData)
+ * @return success code
+ */
+DLLExport int MQTTConnectWithResults(MQTTClient* client, MQTTPacket_connectData* options,
+ MQTTConnackData* data);
+
+/** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
+ * The network object must be connected to the network endpoint before calling this
+ * @param options - connect options
+ * @return success code
+ */
+DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options);
+
+/** MQTT Publish - send an MQTT publish packet, then wait until every ack required by the message's QoS has completed
+ * @param client - the client object to use
+ * @param topic - name of the topic to publish to
+ * @param message - the message to be sent
+ * @return success code
+ */
+DLLExport int MQTTPublish(MQTTClient* client, const char* topic, MQTTMessage* message);
+
+/** MQTT SetMessageHandler - set or remove a per topic message handler
+ * @param c - the client object to use
+ * @param topicFilter - the topic filter to set the message handler for
+ * @param messageHandler - pointer to the message handler function or NULL to remove
+ * @return success code
+ */
+DLLExport int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler);
+
+/** MQTT Subscribe - send an MQTT subscribe packet; returns only after waiting for the suback.
+ * @param client - the client object to use
+ * @param topicFilter - topic filter of the subscription
+ * @param qos, handler - the requested QoS and the message handler for this subscription
+ * @return success code
+ */
+DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS qos, messageHandler handler);
+
+/** MQTT Subscribe - send an MQTT subscribe packet; returns only after waiting for the suback.
+ * @param client - the client object to use
+ * @param topicFilter - topic filter of the subscription
+ * @param qos, handler - the requested QoS and the message handler for this subscription
+ * @param data - suback granted QoS returned
+ * @return success code
+ */
+DLLExport int MQTTSubscribeWithResults(MQTTClient* client, const char* topicFilter, enum QoS qos, messageHandler handler, MQTTSubackData* data);
+
+/** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for unsuback before returning.
+ * @param client - the client object to use
+ * @param topicFilter - the topic filter to unsubscribe from
+ * @return success code
+ */
+DLLExport int MQTTUnsubscribe(MQTTClient* client, const char* topicFilter);
+
+/** MQTT Disconnect - send an MQTT disconnect packet and close the connection
+ * @param client - the client object to use
+ * @return success code (presumably an enum returnCode value, whose failures are negative - confirm)
+ */
+DLLExport int MQTTDisconnect(MQTTClient* client);
+
+/** MQTT Yield - give the client time to run its MQTT background processing
+ * @param client - the client object to use
+ * @param time - the time, in milliseconds, to yield for
+ * @return success code
+ */
+DLLExport int MQTTYield(MQTTClient* client, int time);
+
+/** MQTT isConnected - query the connection state of a client
+ * @param client - the client object to use
+ * @return truth value indicating whether the client is connected to the server
+ */
+DLLExport int MQTTIsConnected(MQTTClient* client);
+
+#if defined(MQTT_TASK) /* same switch that adds the mutex/thread members to MQTTClient */
+/** MQTT start background thread for a client. After this, MQTTYield should not be called.
+* @param client - the client object to use
+* @return success code
+*/
+DLLExport int MQTTStartTask(MQTTClient* client);
+#endif
+
+int MQTTInit(MQTTClient* c, Network* n, unsigned char* sendBuf, unsigned char* readBuf); /* declared without DLLExport and without buffer sizes; contract not documented here - TODO document */
+int MQTTCreate(MQTTClient* c, Network* n, char* clientID, char* username, char* password, char *serverAddr, int port, MQTTPacket_connectData* connData); /* contract not documented here - TODO document */
+
+#ifdef FEATURE_CUCC_DM_ENABLE /* MQTTCucc* variants, declared only when this feature macro is defined */
+int MQTTCuccCreate(MQTTClient* c, Network* n, char* clientID, char* username, char* password, char *serverAddr, int port, MQTTPacket_connectData* connData);
+int MQTTCuccConnect(MQTTClient* c, char* clientID, char* username, char* password, MQTTPacket_connectData* connData);
+int MQTTCuccPublish(MQTTClient* c, const char* topicName, MQTTMessage* message);
+int MQTTCuccSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler);
+int MQTTCuccDisconnect(MQTTClient* c);
+int MQTTCuccWaitForRecv(MQTTClient* c, int packet_type, unsigned int timerOut, MQTTString *topicName, char *outPayload); /* NOTE(review): outPayload has no accompanying size parameter - check the caller's buffer sizing */
+#endif
+
+void mqtt_demo_onenet(void); /* demo entry points */
+void mqtt_demo_ali(void);
+/* demo task bodies (void* argument) and their *_init functions */
+void mqtt_demo_send_task(void *argument);
+void app_mqtt_demo_task(void *argument);
+int mqtt_demo_recv_task_init(void);
+int mqtt_demo_send_task_init(void);
+int app_mqtt_demo_task_init(void);
+
+
+#if defined(__cplusplus)
+ }
+#endif
+
+#endif
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTConnect.h b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTConnect.h
new file mode 100755
index 0000000..189a592
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTConnect.h
@@ -0,0 +1,151 @@
+/*******************************************************************************
+ * Copyright (c) 2014, 2023 IBM Corp. and others
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs - add connack return code definitions
+ * Xiang Rong - 442039 Add makefile to Embedded C client
+ * Ian Craggs - fix for issue #64, bit order in connack response
+ *******************************************************************************/
+
+#ifndef MQTTCONNECT_H_
+#define MQTTCONNECT_H_
+
+#include <stdint.h>
+
+enum MQTTConnackReturnCodes /* CONNACK return codes 0..5 as defined by MQTT 3.1.1 */
+{
+ MQTTCONNACK_CONNECTION_ACCEPTED = 0,
+ MQTTCONNACK_UNNACCEPTABLE_PROTOCOL = 1, /* (sic) the misspelling is part of the public name; renaming it would break callers */
+ MQTTCONNACK_CLIENTID_REJECTED = 2,
+ MQTTCONNACK_SERVER_UNAVAILABLE = 3,
+ MQTTCONNACK_BAD_USERNAME_OR_PASSWORD = 4,
+ MQTTCONNACK_NOT_AUTHORIZED = 5,
+};
+
+#if !defined(DLLImport) /* fall back to an empty marker when no including header has defined it */
+ #define DLLImport
+#endif
+#if !defined(DLLExport) /* same fallback for the export marker */
+ #define DLLExport
+#endif
+
+
+typedef union /* REVERSED selects the declaration order of the bit-fields; how they overlay 'all' depends on the compiler's bit-field allocation */
+{
+ unsigned char all; /**< all connect flags */
+#if defined(REVERSED)
+ struct
+ {
+ unsigned int username : 1; /**< 3.1 user name */
+ unsigned int password : 1; /**< 3.1 password */
+ unsigned int willRetain : 1; /**< will retain setting */
+ unsigned int willQoS : 2; /**< will QoS value */
+ unsigned int will : 1; /**< will flag */
+ unsigned int cleansession : 1; /**< clean session flag */
+ unsigned int : 1; /**< unused */
+ } bits;
+#else
+ struct
+ {
+ unsigned int : 1; /**< unused */
+ unsigned int cleansession : 1; /**< cleansession flag */
+ unsigned int will : 1; /**< will flag */
+ unsigned int willQoS : 2; /**< will QoS value */
+ unsigned int willRetain : 1; /**< will retain setting */
+ unsigned int password : 1; /**< 3.1 password */
+ unsigned int username : 1; /**< 3.1 user name */
+ } bits;
+#endif
+} MQTTConnectFlags; /**< connect flags byte */
+
+
+
+/**
+ * Defines the MQTT "Last Will and Testament" (LWT) settings for
+ * the connect packet.
+ */
+typedef struct
+{
+ /** The eyecatcher for this structure. must be MQTW. */
+ char struct_id[4];
+ /** The version number of this structure. Must be 0 */
+ int struct_version;
+ /** The LWT topic to which the LWT message will be published. */
+ MQTTString topicName;
+ /** The LWT payload. */
+ MQTTString message;
+ /**
+ * The retained flag for the LWT message.
+ */
+ unsigned char retained;
+ /**
+ * The quality of service setting for the LWT message
+ * (the connect flags reserve two bits, willQoS, for it).
+ */
+ char qos;
+} MQTTPacket_willOptions;
+
+
+#define MQTTPacket_willOptions_initializer { {'M', 'Q', 'T', 'W'}, 0, {NULL, {0, NULL}}, {NULL, {0, NULL}}, 0, 0 } /* "MQTW", version 0, empty topic and message, retained 0, qos 0 */
+
+
+typedef struct /* connect options, as taken by MQTTSerialize_connect / MQTTDeserialize_connect */
+{
+ /** The eyecatcher for this structure. must be MQTC. */
+ char struct_id[4];
+ /** The version number of this structure. Must be 0 */
+ int struct_version;
+ /** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1
+ */
+ unsigned char MQTTVersion;
+ MQTTString clientID; /* client identifier */
+ unsigned short keepAliveInterval; /* keep-alive interval; NOTE(review): presumably seconds, as in the MQTT CONNECT packet - confirm */
+ unsigned char cleansession; /* clean session flag */
+ unsigned char willFlag; /* presumably non-zero when 'will' below is to be used - confirm */
+ MQTTPacket_willOptions will; /* LWT settings */
+ MQTTString username;
+ MQTTString password;
+} MQTTPacket_connectData;
+
+typedef union /* REVERSED selects the declaration order of the two bit-fields */
+{
+ unsigned char all; /**< all connack flags */
+#if defined(REVERSED)
+ struct
+ {
+ unsigned int reserved : 7; /**< unused */
+ unsigned int sessionpresent : 1; /**< session present flag */
+ } bits;
+#else
+ struct
+ {
+ unsigned int sessionpresent : 1; /**< session present flag */
+ unsigned int reserved: 7; /**< unused */
+ } bits;
+#endif
+} MQTTConnackFlags; /**< connack flags byte */
+
+#define MQTTPacket_connectData_initializer { {'M', 'Q', 'T', 'C'}, 0, 4, {NULL, {0, NULL}}, 60, 1, 0, \
+ MQTTPacket_willOptions_initializer, {NULL, {0, NULL}}, {NULL, {0, NULL}} } /* "MQTC", version 0, MQTTVersion 4, empty clientID, keepAliveInterval 60, cleansession 1, willFlag 0, default will, empty username/password */
+
+DLLExport int MQTTSerialize_connect(unsigned char* buf, int32_t buflen, MQTTPacket_connectData* options); /* upstream Paho documents the result as the serialized length - confirm for this port */
+DLLExport int MQTTDeserialize_connect(MQTTPacket_connectData* data, unsigned char* buf, int32_t len);
+/* CONNACK: written from / parsed into the return code and the session-present flag */
+DLLExport int MQTTSerialize_connack(unsigned char* buf, int32_t buflen, unsigned char connack_rc, unsigned char sessionPresent);
+DLLExport int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int32_t buflen);
+/* DISCONNECT and PINGREQ take nothing beyond the buffer and its length */
+DLLExport int MQTTSerialize_disconnect(unsigned char* buf, int32_t buflen);
+DLLExport int MQTTDeserialize_disconnect(unsigned char* buf, int32_t buflen);
+DLLExport int MQTTSerialize_pingreq(unsigned char* buf, int32_t buflen);
+
+#endif /* MQTTCONNECT_H_ */
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTFormat.h b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTFormat.h
new file mode 100755
index 0000000..cbae3fb
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTFormat.h
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Copyright (c) 2014, 2023 IBM Corp., Ian Craggs and others
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+#if !defined(MQTTFORMAT_H)
+#define MQTTFORMAT_H
+
+#include "StackTrace.h"
+#include "MQTTPacket.h"
+
+const char* MQTTPacket_getName(unsigned short packetid); /* NOTE(review): parameter is called packetid, but the function name suggests a lookup by packet type - confirm */
+int MQTTStringFormat_connect(char* strbuf, int strbuflen, MQTTPacket_connectData* data); /* the MQTTStringFormat_* family writes into strbuf (capacity strbuflen) */
+int MQTTStringFormat_connack(char* strbuf, int strbuflen, unsigned char connack_rc, unsigned char sessionPresent);
+int MQTTStringFormat_publish(char* strbuf, int strbuflen, unsigned char dup, int qos, unsigned char retained,
+ unsigned short packetid, MQTTString topicName, unsigned char* payload, int payloadlen);
+int MQTTStringFormat_ack(char* strbuf, int strbuflen, unsigned char packettype, unsigned char dup, unsigned short packetid);
+int MQTTStringFormat_subscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, int count,
+ MQTTString topicFilters[], unsigned char requestedQoSs[]);
+int MQTTStringFormat_suback(char* strbuf, int strbuflen, unsigned short packetid, int count, unsigned char* grantedQoSs);
+int MQTTStringFormat_unsubscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid,
+ int count, MQTTString topicFilters[]);
+char* MQTTFormat_toClientString(char* strbuf, int strbuflen, unsigned char* buf, int32_t buflen); /* buf/buflen: presumably a serialized packet to describe - confirm */
+char* MQTTFormat_toServerString(char* strbuf, int strbuflen, unsigned char* buf, int32_t buflen);
+
+#endif
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTLinux.h b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTLinux.h
new file mode 100755
index 0000000..749cd37
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTLinux.h
@@ -0,0 +1,83 @@
+/*******************************************************************************
+ * Copyright (c) 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Allan Stockdill-Mander - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+#if !defined(__MQTT_LINUX_)
+#define __MQTT_LINUX_
+
+#if defined(WIN32_DLL) || defined(WIN64_DLL) /* Windows DLL build: use __declspec import/export */
+ #define DLLImport __declspec(dllimport)
+ #define DLLExport __declspec(dllexport)
+#elif defined(LINUX_SO) /* shared-object build: exported symbols get default visibility */
+ #define DLLImport extern
+ #define DLLExport __attribute__ ((visibility ("default")))
+#else /* none of the above defined: both markers expand to nothing */
+ #define DLLImport
+ #define DLLExport
+#endif
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/param.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <openssl/evp.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+
+typedef struct Timer /* Linux Timer type used by the MQTT client */
+{
+ struct timeval end_time; /* presumably the absolute expiry time set by TimerCountdown* - confirm in the implementation */
+} Timer;
+
+void TimerInit(Timer* timer);
+char TimerIsExpired(Timer* timer);
+void TimerCountdownMS(Timer* timer, unsigned int timeout_ms);
+void TimerCountdown(Timer* timer, unsigned int timeout);
+int TimerLeftMS(Timer* timer);
+
+typedef struct Network /* Linux transport object: a socket plus read/write hooks, optionally with an OpenSSL session */
+{
+ int my_socket; /* socket descriptor */
+ int (*mqttread) (struct Network*, unsigned char*, int, int); /* read hook; NOTE(review): the two ints are presumably length and timeout - confirm */
+ int (*mqttwrite) (struct Network*, unsigned char*, int, int); /* write hook, same parameter shape */
+ SSL *ssl; /* OpenSSL session handle */
+ int useSSL; /* presumably non-zero when ssl is to be used - confirm */
+} Network;
+
+int linux_read(Network*, unsigned char*, int, int); /* signature matches Network.mqttread */
+int linux_write(Network*, unsigned char*, int, int); /* signature matches Network.mqttwrite */
+
+DLLExport void NetworkInit(Network*);
+DLLExport int NetworkConnect(Network*, char*, int); /* NOTE(review): parameters are unnamed; presumably address and port - confirm */
+DLLExport int NetworkConnectBySSL(Network*, const char*, int, const char*); /* NOTE(review): meaning of the unnamed parameters is not documented - TODO name them */
+DLLExport int NetworkConnectNotSSL(Network*, const char*, const char*); /* NOTE(review): takes two strings and no int, unlike NetworkConnect - TODO document */
+DLLExport void NetworkDisconnect(Network*);
+DLLExport void ShowCerts(SSL *);
+
+#endif
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTPacket.h b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTPacket.h
new file mode 100755
index 0000000..6a55f3f
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTPacket.h
@@ -0,0 +1,139 @@
+/*******************************************************************************
+ * Copyright (c) 2014, 2023 IBM Corp., Ian Craggs and others
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Xiang Rong - 442039 Add makefile to Embedded C client
+ *******************************************************************************/
+
+#ifndef MQTTPACKET_H_
+#define MQTTPACKET_H_
+
+#include <stdint.h>
+
+#if defined(__cplusplus) /* If this is a C++ compiler, use C linkage */
+extern "C" {
+#endif
+
+#if defined(WIN32_DLL) || defined(WIN64_DLL) /* Windows DLL build: use __declspec import/export */
+ #define DLLImport __declspec(dllimport)
+ #define DLLExport __declspec(dllexport)
+#elif defined(LINUX_SO) /* shared-object build: exported symbols get default visibility */
+ #define DLLImport extern
+ #define DLLExport __attribute__ ((visibility ("default")))
+#else /* none of the above defined: both markers expand to nothing */
+ #define DLLImport
+ #define DLLExport
+#endif
+
+enum errors /* packet-level status codes; errors are negative */
+{
+ MQTTPACKET_BUFFER_TOO_SHORT = -2,
+ MQTTPACKET_READ_ERROR = -1,
+ MQTTPACKET_READ_COMPLETE /* implicit value 0 */
+};
+
+enum msgTypes /* MQTT control packet types; values start at 1 (CONNECT) and run to 14 (DISCONNECT) */
+{
+ CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
+ PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
+ PINGREQ, PINGRESP, DISCONNECT
+#if defined(MQTTV5)
+ , AUTH /* 15, only when MQTTV5 is defined */
+#endif
+};
+
+/**
+ * Bitfields for the MQTT header byte; REVERSED selects the declaration order of the fields.
+ */
+typedef union
+{
+ unsigned char byte; /**< the whole byte */
+#if defined(REVERSED)
+ struct
+ {
+ unsigned int type : 4; /**< message type nibble */
+ unsigned int dup : 1; /**< DUP flag bit */
+ unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */
+ unsigned int retain : 1; /**< retained flag bit */
+ } bits;
+#else
+ struct
+ {
+ unsigned int retain : 1; /**< retained flag bit */
+ unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */
+ unsigned int dup : 1; /**< DUP flag bit */
+ unsigned int type : 4; /**< message type nibble */
+ } bits;
+#endif
+} MQTTHeader;
+
+typedef struct /* string given as pointer plus explicit length */
+{
+ int32_t len; /* length of data */
+ char* data; /* NOTE(review): presumably not required to be NUL-terminated, since len is carried separately - confirm */
+} MQTTLenString;
+
+typedef struct /* string in one of two forms */
+{
+ char* cstring; /* C string form; NOTE(review): upstream Paho uses lenstring only when this is NULL - confirm for this port */
+ MQTTLenString lenstring; /* length-delimited form */
+} MQTTString;
+
+#define MQTTString_initializer {NULL, {0, NULL}} /* empty string: cstring NULL, lenstring {0, NULL} */
+
+int MQTTstrlen(MQTTString mqttstring); /* takes the MQTTString by value */
+
+#include "MQTTConnect.h"
+#include "MQTTPublish.h"
+#include "MQTTSubscribe.h"
+#include "MQTTUnsubscribe.h"
+#include "MQTTFormat.h"
+
+DLLExport int32_t MQTTSerialize_ack(unsigned char* buf, int32_t buflen, unsigned char type, unsigned char dup, unsigned short packetid);
+DLLExport int32_t MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int32_t buflen);
+/* length helpers; NOTE(review): rem_len is presumably the MQTT "remaining length" field value - confirm */
+int32_t MQTTPacket_VBIlen(int32_t rem_len);
+int32_t MQTTPacket_len(int32_t rem_len);
+DLLExport int MQTTPacket_equals(MQTTString* a, char* b);
+/* encode / decode of a length value; decode reads either through a callback or from a buffer */
+DLLExport int32_t MQTTPacket_encode(unsigned char* buf, int32_t length);
+int32_t MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int32_t* value);
+int32_t MQTTPacket_decodeBuf(unsigned char* buf, int32_t* value);
+/* cursor-style primitives: pptr is a pointer to the caller's buffer pointer (upstream Paho advances it - confirm) */
+int readInt(unsigned char** pptr);
+char readChar(unsigned char** pptr);
+void writeChar(unsigned char** pptr, char c);
+void writeInt(unsigned char** pptr, int anInt);
+int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata);
+void writeCString(unsigned char** pptr, const char* string);
+void writeMQTTString(unsigned char** pptr, MQTTString mqttstring);
+/* read a packet into buf using the supplied getfn callback */
+DLLExport int MQTTPacket_read(unsigned char* buf, int32_t buflen, int (*getfn)(unsigned char*, int));
+
+typedef struct { /* transport descriptor passed to MQTTPacket_readnb */
+ int (*getfn)(void *, unsigned char*, int); /* must return -1 for error, 0 for call again, or the number of bytes read */
+ void *sck; /* pointer to whatever the system may use to identify the transport */
+ int multiplier; /* NOTE(review): multiplier, rem_len, len and state are presumably decoder state kept between MQTTPacket_readnb calls - confirm */
+ int rem_len;
+ int32_t len;
+ char state;
+}MQTTTransport;
+
+int MQTTPacket_readnb(unsigned char* buf, int32_t buflen, MQTTTransport *trp); /* variant of MQTTPacket_read that takes an MQTTTransport; trp->getfn may return 0 for "call again" */
+
+#ifdef __cplusplus /* If this is a C++ compiler, use C linkage */
+}
+#endif
+
+
+#endif /* MQTTPACKET_H_ */
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTPublish.h b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTPublish.h
new file mode 100755
index 0000000..afd1c42
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTPublish.h
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Copyright (c) 2014, 2023 IBM Corp., Ian Craggs and others
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Xiang Rong - 442039 Add makefile to Embedded C client
+ *******************************************************************************/
+
+#ifndef MQTTPUBLISH_H_
+#define MQTTPUBLISH_H_
+
+#if !defined(DLLImport) /* fall back to empty markers when no including header has defined them */
+ #define DLLImport
+#endif
+#if !defined(DLLExport)
+ #define DLLExport
+#endif
+/* PUBLISH: upstream Paho documents the serialize result as the serialized length, <= 0 on error - confirm for this port */
+DLLExport int32_t MQTTSerialize_publish(unsigned char* buf, int32_t buflen, unsigned char dup, unsigned char qos, unsigned char retained, unsigned short packetid,
+ MQTTString topicName, unsigned char* payload, int32_t payloadlen);
+/* parse a PUBLISH held in buf/len; results are returned through the pointer arguments */
+DLLExport int32_t MQTTDeserialize_publish(unsigned char* dup, unsigned char* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName,
+ unsigned char** payload, int32_t* payloadlen, unsigned char* buf, int32_t len);
+/* acknowledgement packets for a given packetid */
+DLLExport int32_t MQTTSerialize_puback(unsigned char* buf, int32_t buflen, unsigned short packetid);
+DLLExport int32_t MQTTSerialize_pubrel(unsigned char* buf, int32_t buflen, unsigned char dup, unsigned short packetid);
+DLLExport int32_t MQTTSerialize_pubcomp(unsigned char* buf, int32_t buflen, unsigned short packetid);
+
+#endif /* MQTTPUBLISH_H_ */
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTSubscribe.h b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTSubscribe.h
new file mode 100755
index 0000000..18f53cf
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTSubscribe.h
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Copyright (c) 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Xiang Rong - 442039 Add makefile to Embedded C client
+ *******************************************************************************/
+
+#ifndef MQTTSUBSCRIBE_H_
+#define MQTTSUBSCRIBE_H_
+
+#if !defined(DLLImport) /* fall back to empty markers when no including header has defined them */
+ #define DLLImport
+#endif
+#if !defined(DLLExport)
+ #define DLLExport
+#endif
+/* SUBSCRIBE: topicFilters[] and requestedQoSs[] are parallel arrays of 'count' entries */
+DLLExport int32_t MQTTSerialize_subscribe(unsigned char* buf, int32_t buflen, unsigned char dup, unsigned short packetid,
+ int count, MQTTString topicFilters[], unsigned char requestedQoSs[]);
+/* maxcount: presumably the capacity of the output arrays; *count receives the number parsed - confirm */
+DLLExport int32_t MQTTDeserialize_subscribe(unsigned char* dup, unsigned short* packetid,
+ int maxcount, int* count, MQTTString topicFilters[], unsigned char requestedQoSs[], unsigned char* buf, int32_t len);
+/* SUBACK: one granted-QoS byte per topic filter */
+DLLExport int32_t MQTTSerialize_suback(unsigned char* buf, int32_t buflen, unsigned short packetid, int count, unsigned char* grantedQoSs);
+
+DLLExport int32_t MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, unsigned char grantedQoSs[], unsigned char* buf, int32_t len);
+
+
+#endif /* MQTTSUBSCRIBE_H_ */
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTTls.h b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTTls.h
new file mode 100755
index 0000000..65d1538
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTTls.h
@@ -0,0 +1,71 @@
+
+#ifndef MQTT_DTLS_H
+#define MQTT_DTLS_H
+
+
+#include "mbedtls/net.h"
+#include "mbedtls/ssl.h"
+#include "mbedtls/certs.h"
+#include "mbedtls/entropy.h"
+#include "mbedtls/ctr_drbg.h"
+
+
+
+
+#define MQTT_MAX_TIMEOUT (10 * 60) //10 min, i.e. the value is expressed in seconds
+
+
+typedef struct mqttsClientSslTag /* bundle of the mbedtls objects belonging to one MQTTS connection */
+{
+ mbedtls_ssl_context sslContext;
+ mbedtls_net_context netContext;
+ mbedtls_ssl_config sslConfig;
+ mbedtls_entropy_context entropyContext;
+ mbedtls_ctr_drbg_context ctrDrbgContext;
+ mbedtls_x509_crt_profile crtProfile;
+ mbedtls_x509_crt caCert; /* CA certificate (chain) */
+ mbedtls_x509_crt clientCert; /* client certificate */
+ mbedtls_pk_context pkContext; /* presumably the client private key - confirm */
+}mqttsClientSsl;
+
+typedef struct mqttsClientContextTag /* per-connection context taken by the mqttSsl* functions */
+{
+ int socket;
+ int timeout_s; /* NOTE(review): presumably the send timeout - confirm */
+ int timeout_r; /* NOTE(review): presumably the receive timeout - confirm */
+ int isMqtts;
+ int method;
+ uint16_t port;
+ unsigned int keepAliveInterval;
+ size_t sendBufSize;
+ size_t readBufSize;
+ unsigned char *sendBuf;
+ unsigned char *readBuf;
+
+ mqttsClientSsl * ssl; /* mbedtls state, see mqttsClientSsl */
+ char *caCert;
+ char *clientCert;
+ char *clientPk;
+ char *hostName;
+ char *psk_key;
+ char *psk_identity;
+ int caCertLen; /* lengths of the caCert / clientCert / clientPk buffers above */
+ int clientCertLen;
+ int clientPkLen;
+ uint8_t seclevel;//0: no verification; 1: verify the server; 2: mutual verification
+ int32_t ciphersuite[2];//e.g. 0x0035 (TLS_RSA_WITH_AES_256_CBC_SHA); ciphersuite[1] must be 0 (an integer, not NULL), presumably as the list terminator - confirm
+ uint8_t pdpId;//PDP context id (cid); 0 is the default
+
+}mqttsClientContext;
+
+
+
+int mqttSslConn_old(mqttsClientContext* context, char* host); /* NOTE(review): how _old differs from mqttSslConn_new is not documented - TODO document which one callers should use */
+int mqttSslSend(mqttsClientContext* context, unsigned char* buf, int len);
+int mqttSslRecv(mqttsClientContext* context, unsigned char* buf, int minLen, int maxLen, int* pReadLen); /* pReadLen: presumably receives the number of bytes read - confirm */
+int mqttSslRead(mqttsClientContext* context, unsigned char *buffer, int len, int timeout_ms);
+int mqttSslClose(mqttsClientContext* context);
+int mqttSslConn_new(mqttsClientContext* context, char* host);
+
+#endif
+
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTUnsubscribe.h b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTUnsubscribe.h
new file mode 100755
index 0000000..866da98
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/MQTTUnsubscribe.h
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Copyright (c) 2014, 2023 IBM Corp., Ian Craggs and others
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Xiang Rong - 442039 Add makefile to Embedded C client
+ *******************************************************************************/
+
+#ifndef MQTTUNSUBSCRIBE_H_
+#define MQTTUNSUBSCRIBE_H_
+
+#if !defined(DLLImport)
+ #define DLLImport
+#endif
+#if !defined(DLLExport)
+ #define DLLExport
+#endif
+
+DLLExport int32_t MQTTSerialize_unsubscribe(unsigned char* buf, int32_t buflen, unsigned char dup, unsigned short packetid,
+ int count, MQTTString topicFilters[]); /* serialize an UNSUBSCRIBE packet for count topic filters into buf (capacity buflen) */
+
+DLLExport int32_t MQTTDeserialize_unsubscribe(unsigned char* dup, unsigned short* packetid, int max_count, int* count, MQTTString topicFilters[],
+ unsigned char* buf, int32_t len); /* parse an UNSUBSCRIBE packet from buf; at most max_count filters are stored and *count receives how many */
+
+DLLExport int32_t MQTTSerialize_unsuback(unsigned char* buf, int32_t buflen, unsigned short packetid); /* serialize an UNSUBACK packet for packetid into buf */
+
+DLLExport int32_t MQTTDeserialize_unsuback(unsigned short* packetid, unsigned char* buf, int32_t len); /* parse an UNSUBACK packet from buf and report its packet id */
+
+#endif /* MQTTUNSUBSCRIBE_H_ */
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/StackTrace.h b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/StackTrace.h
new file mode 100755
index 0000000..2808a0d
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/include/StackTrace.h
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Copyright (c) 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs - fix for bug #434081
+ *******************************************************************************/
+
+#ifndef STACKTRACE_H_
+#define STACKTRACE_H_
+
+#include <stdio.h>
+#define NOSTACKTRACE 1
+
+/*
+ * Function entry/exit tracing hooks.
+ *
+ * With NOSTACKTRACE defined every FUNC_* macro expands to nothing. Otherwise the macros
+ * forward the current function name and line number to StackTrace_entry()/StackTrace_exit();
+ * the *_RC(x) variants also pass the address of the return-code variable x.
+ */
+#if defined(NOSTACKTRACE)
+#define FUNC_ENTRY
+#define FUNC_ENTRY_NOLOG
+#define FUNC_ENTRY_MED
+#define FUNC_ENTRY_MAX
+#define FUNC_EXIT
+#define FUNC_EXIT_NOLOG
+#define FUNC_EXIT_MED
+#define FUNC_EXIT_MAX
+#define FUNC_EXIT_RC(x)
+#define FUNC_EXIT_MED_RC(x)
+#define FUNC_EXIT_MAX_RC(x)
+
+#else
+
+#if defined(WIN32)
+#define inline __inline
+#define FUNC_ENTRY StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MINIMUM)
+#define FUNC_ENTRY_NOLOG StackTrace_entry(__FUNCTION__, __LINE__, -1)
+#define FUNC_ENTRY_MED StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MEDIUM)
+#define FUNC_ENTRY_MAX StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MAXIMUM)
+#define FUNC_EXIT StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MINIMUM)
+/* StackTrace_exit() takes four arguments; the return-value pointer is NULL here. */
+#define FUNC_EXIT_NOLOG StackTrace_exit(__FUNCTION__, __LINE__, NULL, -1)
+#define FUNC_EXIT_MED StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MEDIUM)
+#define FUNC_EXIT_MAX StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MAXIMUM)
+#define FUNC_EXIT_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &(x), TRACE_MINIMUM)
+#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &(x), TRACE_MEDIUM)
+#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &(x), TRACE_MAXIMUM)
+#else
+#define FUNC_ENTRY StackTrace_entry(__func__, __LINE__, TRACE_MINIMUM)
+#define FUNC_ENTRY_NOLOG StackTrace_entry(__func__, __LINE__, -1)
+#define FUNC_ENTRY_MED StackTrace_entry(__func__, __LINE__, TRACE_MEDIUM)
+#define FUNC_ENTRY_MAX StackTrace_entry(__func__, __LINE__, TRACE_MAXIMUM)
+#define FUNC_EXIT StackTrace_exit(__func__, __LINE__, NULL, TRACE_MINIMUM)
+#define FUNC_EXIT_NOLOG StackTrace_exit(__func__, __LINE__, NULL, -1)
+#define FUNC_EXIT_MED StackTrace_exit(__func__, __LINE__, NULL, TRACE_MEDIUM)
+#define FUNC_EXIT_MAX StackTrace_exit(__func__, __LINE__, NULL, TRACE_MAXIMUM)
+#define FUNC_EXIT_RC(x) StackTrace_exit(__func__, __LINE__, &(x), TRACE_MINIMUM)
+#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__func__, __LINE__, &(x), TRACE_MEDIUM)
+#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__func__, __LINE__, &(x), TRACE_MAXIMUM)
+#endif
+
+/* Declared for both platform branches so the WIN32 macros also see a prototype. */
+void StackTrace_entry(const char* name, int line, int trace);
+void StackTrace_exit(const char* name, int line, void* return_value, int trace);
+
+void StackTrace_printStack(FILE* dest);
+char* StackTrace_get(unsigned long);
+
+#endif
+
+
+
+
+#endif /* STACKTRACE_H_ */
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/lib/libpaho-embed-mqtt3c.so b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/lib/libpaho-embed-mqtt3c.so
new file mode 120000
index 0000000..ef0786f
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/lib/libpaho-embed-mqtt3c.so
@@ -0,0 +1 @@
+libpaho-embed-mqtt3c.so.1
\ No newline at end of file
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/lib/libpaho-embed-mqtt3c.so.1 b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/lib/libpaho-embed-mqtt3c.so.1
new file mode 120000
index 0000000..b1804cc
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/lib/libpaho-embed-mqtt3c.so.1
@@ -0,0 +1 @@
+libpaho-embed-mqtt3c.so.1.0
\ No newline at end of file
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/lib/libpaho-embed-mqtt3c.so.1.0 b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/lib/libpaho-embed-mqtt3c.so.1.0
new file mode 100755
index 0000000..fb6d5d5
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/lib/libpaho-embed-mqtt3c.so.1.0
Binary files differ
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/*.o b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/*.o
new file mode 100755
index 0000000..a08962c
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/*.o
Binary files differ
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTClient.c b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTClient.c
new file mode 100755
index 0000000..39f6a8e
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTClient.c
@@ -0,0 +1,2352 @@
+/*******************************************************************************
+ * Copyright (c) 2014, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs - fix for #96 - check rem_len in readPacket
+ * Ian Craggs - add ability to set message handler separately #6
+ *******************************************************************************/
+
+#include <stdio.h>
+#include <stdarg.h>
+//#include "commontypedef.h"
+#include "MQTTClient.h"
+#ifdef FEATURE_MQTT_TLS_ENABLE
+#include "sha1.h"
+#include "sha256.h"
+#include "md5.h"
+#endif
+//#include "debug_trace.h"
+//#include DEBUG_LOG_HEADER_FILE
+
+char *mqttSendbuf = NULL; /* file-level send buffer pointer; only referenced from commented-out code in the visible part of this file */
+char *mqttReadbuf = NULL; /* file-level receive buffer pointer; same remark as mqttSendbuf */
+
+unsigned char mqttJsonbuff[256] = {0}; /* NOTE(review): not used in the visible part of this file */
+char mqtt_payload[128] = {0}; /* NOTE(review): not used in the visible part of this file */
+int ec_sensor_temp = 20; /* NOTE(review): presumably demo sensor data -- confirm */
+char ec_data_type = 3; /* NOTE(review): meaning of 3 is not visible here */
+int ec_data_len = 0;
+//osMessageQueueId_t mqttRecvMsgHandle = NULL;
+//osMessageQueueId_t mqttSendMsgHandle = NULL;
+//osMessageQueueId_t appMqttMsgHandle = NULL;
+
+///osThreadId_t mqttRecvTaskHandle = NULL;
+
+#ifdef MBTK_OPENCPU_SUPPORT
+#define MAX_RECV_TASK 5
+osThreadId_t mbtk_mqttRecvTaskHandle[MAX_RECV_TASK]; /* one receive-task handle per slot */
+Mutex mbtk_mqttMutex1[MAX_RECV_TASK]; /* one mutex per receive-task slot */
+#endif
+
+//osThreadId_t mqttSendTaskHandle = NULL;
+//osThreadId_t appMqttTaskHandle = NULL;
+
+//Mutex mqttMutex1;
+//Mutex mqttMutex2;
+MQTTClient mqttClient; /* the client instance MQTTRun() operates on */
+Network mqttNetwork;
+int mqtt_send_task_status_flag = 0;
+int mqtt_keepalive_retry_count = 0; /* incremented by keepalive() while a PINGREQ is unanswered; reset on reconnect */
+/* Map the low four bits of hb to the matching lowercase hex digit ('0'-'9', 'a'-'f'). */
+char mqttHb2Hex(unsigned char hb)
+{
+    unsigned char nibble = hb & 0xF;
+
+    if (nibble < 10)
+    {
+        return (char)('0' + nibble);
+    }
+    return (char)(nibble - 10 + 'a');
+}
+
+#ifdef FEATURE_MQTT_TLS_ENABLE
+/**
+ * HMAC-SHA1 of `input` keyed with `key`, written to `output` as lowercase hex text.
+ *
+ * @param input   message bytes
+ * @param ilen    number of message bytes
+ * @param output  receives 2 * ALI_SHA1_DIGEST_SIZE hex characters; no NUL terminator is added
+ * @param key     key bytes
+ * @param keylen  number of key bytes; only the first ALI_SHA1_KEY_IOPAD_SIZE bytes are used
+ *                (a longer key is truncated here, not pre-hashed as RFC 2104 describes)
+ *
+ * Returns without touching `output` when a pointer is NULL or `ilen` is negative.
+ */
+void mqttAliHmacSha1(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)
+{
+    int i;
+    mbedtls_sha1_context ctx;
+    unsigned char k_ipad[ALI_SHA1_KEY_IOPAD_SIZE];
+    unsigned char k_opad[ALI_SHA1_KEY_IOPAD_SIZE];
+    unsigned char tempbuf[ALI_SHA1_DIGEST_SIZE];
+
+    if ((NULL == input) || (NULL == key) || (NULL == output) || (ilen < 0))
+    {
+        return;
+    }
+
+    /* Start from the HMAC pad constants, then fold the key in. */
+    memset(k_ipad, 0x36, sizeof(k_ipad));
+    memset(k_opad, 0x5C, sizeof(k_opad));
+
+    for (i = 0; (i < keylen) && (i < ALI_SHA1_KEY_IOPAD_SIZE); i++)
+    {
+        k_ipad[i] ^= key[i];
+        k_opad[i] ^= key[i];
+    }
+
+    mbedtls_sha1_init(&ctx);
+
+    /* inner hash: H(k_ipad || input) */
+    mbedtls_sha1_starts(&ctx);
+    mbedtls_sha1_update(&ctx, k_ipad, ALI_SHA1_KEY_IOPAD_SIZE);
+    mbedtls_sha1_update(&ctx, input, ilen);
+    mbedtls_sha1_finish(&ctx, tempbuf);
+
+    /* outer hash: H(k_opad || inner) */
+    mbedtls_sha1_starts(&ctx);
+    mbedtls_sha1_update(&ctx, k_opad, ALI_SHA1_KEY_IOPAD_SIZE);
+    mbedtls_sha1_update(&ctx, tempbuf, ALI_SHA1_DIGEST_SIZE);
+    mbedtls_sha1_finish(&ctx, tempbuf);
+
+    /* two hex characters per digest byte, high nibble first */
+    for (i = 0; i < ALI_SHA1_DIGEST_SIZE; ++i)
+    {
+        output[i*2] = mqttHb2Hex(tempbuf[i]>>4);
+        output[i*2+1] = mqttHb2Hex(tempbuf[i]);
+    }
+    mbedtls_sha1_free(&ctx);
+}
+/**
+ * HMAC-SHA256 of `input` keyed with `key`.
+ *
+ * Unlike mqttAliHmacSha1()/mqttAliHmacMd5(), the result is the raw binary digest, not hex text.
+ *
+ * @param input   message bytes
+ * @param ilen    number of message bytes
+ * @param output  receives ALI_SHA256_DIGEST_SIZE digest bytes; also used as scratch for the inner hash
+ * @param key     key bytes
+ * @param keylen  number of key bytes; must not exceed ALI_SHA256_KEY_IOPAD_SIZE
+ *
+ * Returns without touching `output` when a pointer is NULL, `ilen` is negative, or the key is
+ * longer than ALI_SHA256_KEY_IOPAD_SIZE.
+ */
+void mqttAliHmacSha256(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)
+{
+    int i;
+    mbedtls_sha256_context ctx;
+    unsigned char k_ipad[ALI_SHA256_KEY_IOPAD_SIZE];
+    unsigned char k_opad[ALI_SHA256_KEY_IOPAD_SIZE];
+
+    if ((NULL == input) || (NULL == key) || (NULL == output) || (ilen < 0)) {
+        return;
+    }
+
+    if (keylen > ALI_SHA256_KEY_IOPAD_SIZE) {
+        return;
+    }
+
+    /* Size the fill from the arrays themselves rather than a literal 64. */
+    memset(k_ipad, 0x36, sizeof(k_ipad));
+    memset(k_opad, 0x5C, sizeof(k_opad));
+
+    for (i = 0; i < keylen; i++)
+    {
+        k_ipad[i] ^= key[i];
+        k_opad[i] ^= key[i];
+    }
+
+    mbedtls_sha256_init(&ctx);
+
+    /* inner hash: H(k_ipad || input); second argument 0 selects SHA-256 rather than SHA-224 */
+    mbedtls_sha256_starts(&ctx, 0);
+    mbedtls_sha256_update(&ctx, k_ipad, ALI_SHA256_KEY_IOPAD_SIZE);
+    mbedtls_sha256_update(&ctx, input, ilen);
+    mbedtls_sha256_finish(&ctx, output);
+
+    /* outer hash: H(k_opad || inner) */
+    mbedtls_sha256_starts(&ctx, 0);
+    mbedtls_sha256_update(&ctx, k_opad, ALI_SHA256_KEY_IOPAD_SIZE);
+    mbedtls_sha256_update(&ctx, output, ALI_SHA256_DIGEST_SIZE);
+    mbedtls_sha256_finish(&ctx, output);
+
+    mbedtls_sha256_free(&ctx);
+}
+/**
+ * HMAC-MD5 of `input` keyed with `key`, written to `output` as lowercase hex text.
+ *
+ * @param input   message bytes
+ * @param ilen    number of message bytes
+ * @param output  receives 2 * ALI_MD5_DIGEST_SIZE hex characters; no NUL terminator is added
+ * @param key     key bytes
+ * @param keylen  number of key bytes; only the first ALI_MD5_KEY_IOPAD_SIZE bytes are used
+ *                (a longer key is truncated here, not pre-hashed as RFC 2104 describes)
+ *
+ * Returns without touching `output` when a pointer is NULL or `ilen` is negative.
+ */
+void mqttAliHmacMd5(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)
+{
+    int i;
+    mbedtls_md5_context ctx;
+    unsigned char k_ipad[ALI_MD5_KEY_IOPAD_SIZE];
+    unsigned char k_opad[ALI_MD5_KEY_IOPAD_SIZE];
+    unsigned char tempbuf[ALI_MD5_DIGEST_SIZE];
+
+    if ((NULL == input) || (NULL == key) || (NULL == output) || (ilen < 0))
+    {
+        return;
+    }
+
+    /* Start from the HMAC pad constants, then fold the key in. */
+    memset(k_ipad, 0x36, sizeof(k_ipad));
+    memset(k_opad, 0x5C, sizeof(k_opad));
+
+    for (i = 0; (i < keylen) && (i < ALI_MD5_KEY_IOPAD_SIZE); i++)
+    {
+        k_ipad[i] ^= key[i];
+        k_opad[i] ^= key[i];
+    }
+
+    mbedtls_md5_init(&ctx);
+
+    /* inner hash: H(k_ipad || input) */
+    mbedtls_md5_starts(&ctx);
+    mbedtls_md5_update(&ctx, k_ipad, ALI_MD5_KEY_IOPAD_SIZE);
+    mbedtls_md5_update(&ctx, input, ilen);
+    mbedtls_md5_finish(&ctx, tempbuf);
+
+    /* outer hash: H(k_opad || inner) */
+    mbedtls_md5_starts(&ctx);
+    mbedtls_md5_update(&ctx, k_opad, ALI_MD5_KEY_IOPAD_SIZE);
+    mbedtls_md5_update(&ctx, tempbuf, ALI_MD5_DIGEST_SIZE);
+    mbedtls_md5_finish(&ctx, tempbuf);
+
+    /* two hex characters per digest byte, high nibble first */
+    for (i = 0; i < ALI_MD5_DIGEST_SIZE; ++i)
+    {
+        output[i*2] = mqttHb2Hex(tempbuf[i]>>4);
+        output[i*2+1] = mqttHb2Hex(tempbuf[i]);
+    }
+    mbedtls_md5_free(&ctx);
+}
+#endif
+/**
+ * Default handler for incoming PUBLISH messages (installed by MQTTClientInit()).
+ *
+ * Makes a NUL-terminated copy of the payload so it can be logged as a string; the trace calls
+ * are currently commented out, so the copy is simply released again.
+ *
+ * @param data  message and topic as delivered by deliverMessage(); ignored when NULL
+ */
+void mqttDefMessageArrived(MessageData* data)
+{
+    char *bufTemp = NULL;
+
+    if ((data == NULL) || (data->message == NULL))
+    {
+        return;
+    }
+
+    bufTemp = malloc(data->message->payloadlen+1);
+    if (bufTemp == NULL)
+    {
+        /* out of memory: nothing to log, and nothing must be dereferenced */
+        return;
+    }
+
+    memset(bufTemp, 0, data->message->payloadlen+1);
+    if ((data->message->payload != NULL) && (data->message->payloadlen > 0))
+    {
+        memcpy(bufTemp, data->message->payload, data->message->payloadlen);
+    }
+    //ECPLAT_PRINTF(UNILOG_MQTT, mqttRecvTask_2, P_SIG, ".........MQTT topic is:%s", (const uint8_t *)data->topicName->lenstring.data);
+    //ECPLAT_PRINTF(UNILOG_MQTT, mqttRecvTask_1, P_SIG, ".........MQTT_messageArrived is:%s", (const uint8_t *)bufTemp);
+    free(bufTemp);
+}
+
+/* Fill md so that it refers to the given message and topic name (no copies are made). */
+static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage)
+{
+    md->message = aMessage;
+    md->topicName = aTopicName;
+}
+
+
+/* Advance c->next_packetid, wrapping from MAX_PACKET_ID back to 1, and return the new id. */
+static int getNextPacketId(MQTTClient *c)
+{
+    if (c->next_packetid == MAX_PACKET_ID)
+    {
+        c->next_packetid = 1;
+    }
+    else
+    {
+        c->next_packetid = c->next_packetid + 1;
+    }
+    return c->next_packetid;
+}
+
+/**
+ * Transmit the first `length` bytes of the client's send buffer.
+ *
+ * TLS clients (FEATURE_MQTT_TLS_ENABLE and MBTK_OPENCPU_SUPPORT builds with c->is_mqtts set)
+ * hand the packet to mqttSslSend() in one call and return that call's result unchanged.
+ * All other clients write through c->ipstack->mqttwrite(), repeating after partial writes
+ * until everything is sent, a write reports an error, or `timer` expires.
+ *
+ * @param c       client whose buffer holds the serialized packet
+ * @param length  number of bytes to send
+ * @param timer   deadline for the plain-socket path
+ * @return plain path: SUCCESS when all bytes were written, FAILURE otherwise;
+ *         TLS path: the mqttSslSend() result.
+ */
+static int sendPacket(MQTTClient* c, int length, Timer* timer)
+{
+    int rc = FAILURE,
+        sent = 0;
+
+#ifdef FEATURE_MQTT_TLS_ENABLE
+#ifdef MBTK_OPENCPU_SUPPORT
+    if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
+    {
+        ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls3, P_INFO, 0, "...mqttSendPacket..0.");
+        rc = mqttSslSend(c->mqtts_client, &c->mqtts_client->sendBuf[sent], length);
+        ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls4, P_INFO, 1, "...mqttSendPacket..=%d.", rc);
+        TimerCountdown(&c->last_sent, c->keepAliveInterval); // restart the keep-alive send timer
+        return rc;
+    }
+    else
+#endif
+#endif
+    {
+        while (sent < length && !TimerIsExpired(timer))
+        {
+            /* Only the bytes not yet written are offered: after a partial write the
+             * remaining count is length - sent, starting at offset sent. */
+            #ifdef MQTT_RAI_OPTIMIZE
+            rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length - sent, TimerLeftMS(timer), 0, false);
+            #else
+            rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length - sent, TimerLeftMS(timer));
+            #endif
+            if (rc < 0) // there was an error writing the data
+                break;
+            sent += rc;
+        }
+        if (sent == length)
+        {
+            TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
+            rc = SUCCESS;
+        }
+        else
+            rc = FAILURE;
+        return rc;
+    }
+}
+
+void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
+ unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size) /* Reset client c: attach the network and the caller-owned send/read buffers, clear all message handlers and connection flags, install mqttDefMessageArrived as default handler and initialise the keep-alive timers. */
+{
+ int i;
+ c->ipstack = network;
+
+ for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
+ c->messageHandlers[i].topicFilter = 0; /* 0 marks a free handler slot */
+ c->command_timeout_ms = command_timeout_ms;
+ c->buf = sendbuf;
+ c->buf_size = sendbuf_size;
+ c->readbuf = readbuf;
+ c->readbuf_size = readbuf_size;
+ c->isconnected = 0;
+ c->cleansession = 0;
+ c->ping_outstanding = 0;
+ c->defaultMessageHandler = mqttDefMessageArrived;
+ c->next_packetid = 1;
+ TimerInit(&c->last_sent);
+ TimerInit(&c->last_received);
+
+#ifdef FEATURE_MQTT_TLS_ENABLE
+#ifdef MBTK_OPENCPU_SUPPORT
+ c->recv_task_num = -1;
+ if(c->is_mqtts == MBTK_MQTT_SSL_HAVE) /* NOTE(review): is_mqtts and mqtts_client are read here but not set by this function -- the caller must fill them in beforehand */
+ {
+ c->mqtts_client->sendBuf = sendbuf; /* the TLS context shares the same buffers as the plain client */
+ c->mqtts_client->sendBufSize = sendbuf_size;
+ c->mqtts_client->readBuf = readbuf;
+ c->mqtts_client->readBufSize = readbuf_size;
+ }
+#endif
+#endif
+
+#if defined(MQTT_TASK)
+ MutexInit(&c->mutex);
+#endif
+}
+
+/**
+ * Read the MQTT "remaining length" field, a variable-length integer of up to four bytes,
+ * one byte at a time from the connection.
+ *
+ * The TLS read path is used only in builds with both FEATURE_MQTT_TLS_ENABLE and
+ * MBTK_OPENCPU_SUPPORT when c->is_mqtts is set; every other configuration reads through
+ * c->ipstack->mqttread(), matching sendPacket() and readPacket().
+ *
+ * @param c        client to read from
+ * @param value    receives the decoded length (partial when a read fails)
+ * @param timeout  per-byte read timeout, passed through to the read function
+ * @return the number of length bytes attempted, including a failed one
+ */
+static int decodePacket(MQTTClient* c, int* value, int timeout)
+{
+    unsigned char i = 0;
+    int multiplier = 1;
+    int len = 0;
+    const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
+
+    *value = 0;
+    do
+    {
+        int rc = MQTTPACKET_READ_ERROR;
+
+        if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
+        {
+            rc = MQTTPACKET_READ_ERROR; /* bad data */
+            goto exit;
+        }
+
+#if defined(FEATURE_MQTT_TLS_ENABLE) && defined(MBTK_OPENCPU_SUPPORT)
+        if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
+        {
+            rc = mqttSslRead(c->mqtts_client, &i, 1, timeout);
+        }
+        else
+#endif
+        {
+            rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
+        }
+
+        if (rc != 1)
+            goto exit;
+        *value += (i & 127) * multiplier; /* low 7 bits carry data */
+        multiplier *= 128;
+    } while ((i & 128) != 0); /* top bit set: another length byte follows */
+exit:
+    return len;
+}
+
+static int readPacket(MQTTClient* c, Timer* timer) /* Read one complete MQTT packet into the read buffer. Returns the packet type from the fixed header, 0 when the body could not be read completely, BUFFER_OVERFLOW when the packet does not fit, or the failed header read's result. */
+{
+ MQTTHeader header = {0};
+ int len = 0;
+ int rem_len = 0;
+ int rc = 0;
+
+#ifdef FEATURE_MQTT_TLS_ENABLE
+#ifdef MBTK_OPENCPU_SUPPORT
+ if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
+ {
+ ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls0, P_INFO, 0, "...mqttReadPacket..0.");
+ /* 1. read the header byte. This has the packet type in it */
+ rc = mqttSslRead(c->mqtts_client, c->mqtts_client->readBuf, 1, TimerLeftMS(timer));
+ if (rc != 1)
+ {
+ goto exit;
+ }
+
+ len = 1;
+ ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls1, P_INFO, 0, "...mqttReadPacket..1.");
+ /* 2. read the remaining length. This is variable in itself */
+ decodePacket(c, &rem_len, TimerLeftMS(timer)); /* NOTE(review): the return value is ignored, so a failed length read is not detected here */
+ len += MQTTPacket_encode(c->mqtts_client->readBuf + 1, rem_len); /* put the original remaining length back into the buffer */
+
+ if (rem_len > (c->mqtts_client->readBufSize - len)) /* compares int with size_t, so rem_len is converted to unsigned */
+ {
+ ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls202, P_INFO, 0, "...mqttReadPacket..202.");
+ rc = BUFFER_OVERFLOW;
+ goto exit;
+ }
+
+ ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls2, P_INFO, 0, "...mqttReadPacket..2.");
+ /* 3. read the rest of the buffer using a callback to supply the rest of the data */
+ if (rem_len > 0 && (mqttSslRead(c->mqtts_client, c->mqtts_client->readBuf + len, rem_len, TimerLeftMS(timer)) != rem_len))
+ {
+ rc = 0; /* incomplete body is reported like a timeout */
+ ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls200, P_INFO, 0, "...mqttReadPacket..200.");
+ goto exit;
+ }
+
+ header.byte = c->mqtts_client->readBuf[0];
+ rc = header.bits.type;
+ if (c->keepAliveInterval > 0)
+ {
+ ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls201, P_INFO, 0, "...mqttReadPacket..201.");
+ TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
+ }
+ }
+ else
+#endif
+#endif
+ {
+
+ /* 1. read the header byte. This has the packet type in it */
+ rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
+ if (rc != 1)
+ goto exit;
+
+ len = 1;
+ /* 2. read the remaining length. This is variable in itself */
+ decodePacket(c, &rem_len, TimerLeftMS(timer)); /* NOTE(review): the return value is ignored, so a failed length read is not detected here */
+ len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
+
+ if (rem_len > (c->readbuf_size - len)) /* compares int with size_t, so rem_len is converted to unsigned */
+ {
+ rc = BUFFER_OVERFLOW;
+ goto exit;
+ }
+
+ /* 3. read the rest of the buffer using a callback to supply the rest of the data */
+ if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
+ rc = 0; /* incomplete body is reported like a timeout */
+ goto exit;
+ }
+
+ header.byte = c->readbuf[0];
+ rc = header.bits.type;
+ if (c->keepAliveInterval > 0)
+ TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
+
+ }
+
+exit:
+ return rc;
+}
+
+// Match a received topic name against a subscription filter that may contain wildcards.
+// Assumes the topic filter and the topic name are already well formed:
+// '#' may only appear at the end of the filter, and
+// '+' and '#' may only appear directly next to a '/' separator.
+// Returns non-zero when the whole name and the whole filter were consumed, 0 otherwise.
+static char isTopicMatched(char* topicFilter, MQTTString* topicName)
+{
+ char* curf = topicFilter; // cursor in the NUL-terminated filter
+ char* curn = topicName->lenstring.data; // cursor in the length-delimited topic name
+ char* curn_end = curn + topicName->lenstring.len;
+
+ while (*curf && curn < curn_end)
+ {
+ if (*curn == '/' && *curf != '/') // name starts a new level but the filter does not
+ break;
+ if (*curf != '+' && *curf != '#' && *curf != *curn) // literal character mismatch
+ break;
+ if (*curf == '+')
+ { // skip until we meet the next separator, or end of string
+ char* nextpos = curn + 1;
+ while (nextpos < curn_end && *nextpos != '/')
+ nextpos = ++curn + 1;
+ }
+ else if (*curf == '#')
+ curn = curn_end - 1; // skip until end of string
+ curf++;
+ curn++;
+ };
+
+ return (curn == curn_end) && (*curf == '\0');
+}
+
+/**
+ * Hand an incoming message to every registered handler whose topic filter equals or
+ * wildcard-matches topicName; when no handler took it, fall back to the client's default
+ * handler (if one is set).
+ *
+ * @return SUCCESS when at least one handler was invoked, FAILURE otherwise
+ */
+int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
+{
+    int result = FAILURE;
+    int slot;
+
+    // we have to find the right message handler - indexed by topic
+    for (slot = 0; slot < MAX_MESSAGE_HANDLERS; ++slot)
+    {
+        MessageData md;
+
+        if (c->messageHandlers[slot].topicFilter == 0)
+            continue; /* free slot */
+        if (!MQTTPacket_equals(topicName, (char*)c->messageHandlers[slot].topicFilter) &&
+            !isTopicMatched((char*)c->messageHandlers[slot].topicFilter, topicName))
+            continue; /* filter does not apply to this topic */
+        if (c->messageHandlers[slot].fp == NULL)
+            continue; /* filter registered without a callback */
+
+        NewMessageData(&md, topicName, message);
+        c->messageHandlers[slot].fp(&md);
+        result = SUCCESS;
+    }
+
+    if (result != FAILURE || c->defaultMessageHandler == NULL)
+        return result;
+
+    {
+        MessageData md;
+
+        NewMessageData(&md, topicName, message);
+        c->defaultMessageHandler(&md);
+    }
+    return SUCCESS;
+}
+
+/**
+ * Keep-alive housekeeping, called once per cycle().
+ *
+ * Does nothing when keep-alive is disabled (interval 0) or neither the last-sent nor the
+ * last-received timer has expired. Otherwise:
+ *  - if a PINGREQ is still unanswered, counts the miss in mqtt_keepalive_retry_count and
+ *    reports FAILURE;
+ *  - else serializes and sends a PINGREQ and marks it outstanding.
+ *
+ * @return SUCCESS, FAILURE for an unanswered ping, or the sendPacket() result
+ */
+int keepalive(MQTTClient* c)
+{
+    int rc = SUCCESS;
+
+    if (c->keepAliveInterval == 0)
+    {
+        goto exit;
+    }
+
+    if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
+    {
+        if (c->ping_outstanding)
+        {
+            mqtt_keepalive_retry_count++;
+            //ECOMM_TRACE(UNILOG_MQTT, keepalive_0, P_SIG, 0, "....keepalive....ping_outstanding..=1..");
+            rc = FAILURE; /* PINGRESP not received in keepalive interval */
+        }
+        else
+        {
+            Timer timer;
+            int len;
+
+            TimerInit(&timer);
+            TimerCountdownMS(&timer, 1000);
+
+            #ifdef MBTK_OPENCPU_SUPPORT
+            /* Clear with the sizes this client was initialised with, so a buffer smaller
+             * than the compile-time MQTT_*_BUFF_LEN limits is never overrun. */
+            memset(c->buf, 0, c->buf_size);
+            memset(c->readbuf, 0, c->readbuf_size);
+            #endif
+
+            len = MQTTSerialize_pingreq(c->buf, c->buf_size);
+
+            //ECOMM_TRACE(UNILOG_MQTT, keepalive_1, P_SIG, 0, "....keepalive....send packet..");
+            if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
+                c->ping_outstanding = 1;
+        }
+    }
+
+exit:
+    return rc;
+}
+
+/**
+ * Send another PINGREQ after keepalive() reported an unanswered ping.
+ *
+ * Unlike keepalive(), this does not look at c->ping_outstanding: whenever keep-alive is
+ * enabled and one of the keep-alive timers has expired, a fresh PINGREQ is sent.
+ *
+ * @return SUCCESS when nothing had to be sent or the ping went out, otherwise the
+ *         sendPacket() result
+ */
+int keepaliveRetry(MQTTClient* c)
+{
+    int rc = SUCCESS;
+
+    if (c->keepAliveInterval == 0)
+    {
+        goto exit;
+    }
+
+    if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
+    {
+        Timer timer;
+        int len;
+
+        TimerInit(&timer);
+        TimerCountdownMS(&timer, 1000);
+
+        #ifdef MBTK_OPENCPU_SUPPORT
+        /* Clear with the sizes this client was initialised with, so a buffer smaller
+         * than the compile-time MQTT_*_BUFF_LEN limits is never overrun. */
+        memset(c->buf, 0, c->buf_size);
+        memset(c->readbuf, 0, c->readbuf_size);
+        #endif
+
+        len = MQTTSerialize_pingreq(c->buf, c->buf_size);
+
+        //ECOMM_TRACE(UNILOG_MQTT, keepalive_1pp, P_SIG, 0, "....keepalive....send packet..");
+        if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
+            c->ping_outstanding = 1;
+    }
+
+exit:
+    return rc;
+}
+
+/* Forget every registered topic filter, freeing all message handler slots. */
+void MQTTCleanSession(MQTTClient* c)
+{
+    int slot = 0;
+
+    while (slot < MAX_MESSAGE_HANDLERS)
+    {
+        c->messageHandlers[slot].topicFilter = NULL;
+        ++slot;
+    }
+}
+
+/* Mark the client as disconnected with no ping pending; for clean-session clients the
+ * registered topic filters are dropped as well. */
+void MQTTCloseSession(MQTTClient* c)
+{
+    c->isconnected = 0;
+    c->ping_outstanding = 0;
+
+    if (!c->cleansession)
+    {
+        return;
+    }
+    MQTTCleanSession(c);
+}
+
+#ifdef MBTK_OPENCPU_SUPPORT
+
+/* Re-issue MQTTSubscribe() for every handler slot that still holds a topic filter.
+ * The individual subscribe results are not checked; the function always returns 0. */
+int mqtt_try_resubscribe(MQTTClient* c)
+{
+    int slot = 0;
+
+    while (slot < MAX_MESSAGE_HANDLERS)
+    {
+        if (c->messageHandlers[slot].topicFilter == 0)
+        {
+            ++slot;
+            continue; /* free slot, nothing to restore */
+        }
+        MQTTSubscribe(c, c->messageHandlers[slot].topicFilter, c->messageHandlers[slot].qos,c->messageHandlers[slot].fp);
+        ++slot;
+    }
+    return 0;
+}
+int mqtt_try_reconnect(MQTTClient* c) /* Drop the current connection, reconnect the transport and the MQTT session, then resubscribe. Progress is reported only through c->mqtt_connect_callback; the function itself always returns 0. */
+{
+ int reconnect_flag = 1; /* cleared when the transport or the MQTT connect fails */
+ int rc = 0;
+ mqtt_keepalive_retry_count = 0;
+ int ret = 0;
+
+ if(c->isconnected == 1)
+ {
+ if(c->is_mqtts == 0)
+ {
+ ret = c->ipstack->disconnect(c->ipstack);
+ }
+ else
+ {
+#ifdef FEATURE_MQTT_TLS_ENABLE
+#ifdef MBTK_OPENCPU_SUPPORT
+ ret = mqttSslClose(c->mqtts_client);
+#endif
+#endif
+ }
+ if(ret == 0)
+ {
+ c->isconnected = 0;
+ }
+ }
+
+ if(c->mqtt_connect_callback != 0)
+ {
+ c->mqtt_connect_callback(MQTT_START_RECONNECT_EVENT,0);
+ }
+
+ c->isconnected = 0; /* cleared unconditionally, even when the close above reported an error */
+#ifdef FEATURE_MQTT_TLS_ENABLE
+#ifdef MBTK_OPENCPU_SUPPORT
+ if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
+ {
+ if(mqttRecvTaskHandle) /* NOTE(review): the definition of mqttRecvTaskHandle is commented out in this file -- confirm it is declared elsewhere; when it is NULL rc stays 0 and the code proceeds to MQTTConnect() without a TLS connection */
+ rc = mqttSslConn_new(c->mqtts_client, c->mbtk_mqtt_server_url);
+ }
+ else
+#endif
+#endif
+ {
+ rc = NetworkConnectTimeout(c->ipstack, c->mbtk_mqtt_server_url, c->mbtk_mqtt_port, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT);
+ }
+ if(rc < 0)
+ {
+ reconnect_flag = 0;
+ }
+ else
+ {
+ if ((rc = (MQTTConnect(c, c->mbtk_mqtt_options))) != 0)
+ {
+ reconnect_flag = 0;
+ }
+ else
+ {
+ mqtt_try_resubscribe(c); /* restore the subscriptions kept in the handler table */
+ return 0;
+ }
+ }
+
+
+
+ if(reconnect_flag == 0)
+ {
+ if(c->mqtt_connect_callback != NULL)
+ {
+ int socket_status = sock_get_errno(c->ipstack->my_socket);
+ c->mqtt_connect_callback(MQTT_RECONNECT_FAIL_EVENT,socket_status);
+ }
+ }
+
+ return 0;
+}
+#endif
+
+int cycle(MQTTClient* c, Timer* timer) /* One receive/dispatch iteration: read at most one packet, react to it (deliver PUBLISH, acknowledge QoS 1/2, clear the ping flag on PINGRESP), then run keep-alive handling. Returns the packet type read (0 for none) when everything succeeded, otherwise FAILURE or the negative read result. */
+{
+ int len = 0,
+ rc = SUCCESS;
+
+ int packet_type = readPacket(c, timer); /* read the socket, see what work is due */
+ //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0001, P_SIG, 1, ".....mqttRecvTask..packet_type=%d....",packet_type);
+ //ECPLAT_PRINTF(UNILOG_DM1, cycle0, P_SIG, ".....autoReg..packet_type=%d ",packet_type);
+
+#ifdef MBTK_OPENCPU_SUPPORT
+ if(c->is_mqtts == MBTK_MQTT_SSL_NONE) /* plain TCP: detect a dropped connection through the socket error state */
+ {
+ int socket_status = sock_get_errno(c->ipstack->my_socket);
+ if(socket_status == ENOTCONN && c->isconnected == 1)
+ {
+ if(c->mqtt_connect_callback != NULL)
+ {
+ c->mqtt_connect_callback(MQTT_CONNECT_ABORT_EVENT,socket_status);
+ }
+ if(MBTK_MQTT_RECONNECT_ENABLE == c->mqtt_reconn_enable)
+ {
+ mqtt_try_reconnect(c);
+ }
+ else
+ {
+ goto exit;
+ }
+ }
+ else if(c->isconnected == 0)
+ {
+ goto exit;
+ }
+ }
+ if(c->is_mqtts && packet_type < 0) /* TLS: a negative read result is treated as a lost connection */
+ {
+ if(c->isconnected)
+ {
+ if(c->mqtt_connect_callback != NULL)
+ {
+ c->mqtt_connect_callback(MQTT_CONNECT_ABORT_EVENT,ENOTCONN);
+ }
+ if(MBTK_MQTT_RECONNECT_ENABLE == c->mqtt_reconn_enable)
+ {
+ mqtt_try_reconnect(c);
+ }
+ else
+ {
+ goto exit;
+ }
+ }
+ else
+ {
+ goto exit;
+ }
+ }
+#endif
+ switch (packet_type) /* NOTE(review): after a reconnect above, packet_type still holds the result of the read that preceded it */
+ {
+ default:
+ /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
+ rc = packet_type;
+ break;
+ case 0: /* timed out reading packet */
+ break;
+ case CONNACK:
+ case PUBACK:
+ case SUBACK:
+ case UNSUBACK:
+ if(packet_type == SUBACK)
+ {
+ //rc = packet_type;
+ }
+ break;
+ case PUBLISH:
+ {
+ MQTTString topicName;
+ MQTTMessage msg;
+ int intQoS;
+ msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
+ if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
+ (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
+ goto exit;
+ msg.qos = (enum QoS)intQoS;
+ deliverMessage(c, &topicName, &msg);
+ if (msg.qos != QOS0) /* QoS 1 is answered with PUBACK, QoS 2 with PUBREC */
+ {
+ if (msg.qos == QOS1)
+ len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
+ else if (msg.qos == QOS2)
+ len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
+ if (len <= 0)
+ rc = FAILURE;
+ else
+ rc = sendPacket(c, len, timer);
+ if (rc == FAILURE)
+ goto exit; // there was a problem
+ }
+ break;
+ }
+ case PUBREC:
+ case PUBREL:
+ {
+ unsigned short mypacketid;
+ unsigned char dup, type;
+ if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
+ rc = FAILURE;
+ else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
+ (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
+ rc = FAILURE;
+ else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
+ rc = FAILURE; // there was a problem
+ if (rc == FAILURE)
+ goto exit; // there was a problem
+ break;
+ }
+
+ case PUBCOMP:
+ break;
+ case PINGRESP:
+ c->ping_outstanding = 0;
+ break;
+ }
+
+ if (keepalive(c) != SUCCESS) { /* a PINGREQ went unanswered or could not be sent */
+ int socket_stat = 0;
+#ifndef MBTK_OPENCPU_SUPPORT
+ mqttSendMsg mqttMsg;
+#endif
+ //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
+ rc = FAILURE;
+ socket_stat = sock_get_errno(c->ipstack->my_socket);
+ if((socket_stat == MQTT_ERR_ABRT)||(socket_stat == MQTT_ERR_RST)||(socket_stat == MQTT_ERR_CLSD)||(socket_stat == MQTT_ERR_BADE)) /* the socket itself reports a broken connection: reconnect at once */
+ {
+#ifdef MBTK_OPENCPU_SUPPORT
+ if(c->mqtt_reconn_enable == MBTK_MQTT_RECONNECT_ENABLE)
+ {
+ mqtt_try_reconnect(c);
+ }
+#else
+ //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0, P_INFO, 0, ".....now, need reconnect.......");
+ /* send reconnect msg to send task */
+ memset(&mqttMsg, 0, sizeof(mqttMsg));
+ mqttMsg.cmdType = MQTT_DEMO_MSG_RECONNECT;
+
+ osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, MQTT_MSG_TIMEOUT);
+#endif
+ }
+ else
+ {
+ if(mqtt_keepalive_retry_count>3) /* more than three missed pings: give up and reconnect */
+ {
+#ifdef MBTK_OPENCPU_SUPPORT
+ if(c->mqtt_reconn_enable == MBTK_MQTT_RECONNECT_ENABLE)
+ {
+ mqtt_try_reconnect(c);
+ }
+#else
+ mqtt_keepalive_retry_count = 0;
+ //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_ee0, P_INFO, 0, ".....now, need reconnect.......");
+ /* send reconnect msg to send task */
+ memset(&mqttMsg, 0, sizeof(mqttMsg));
+ mqttMsg.cmdType = MQTT_DEMO_MSG_RECONNECT;
+
+ osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, MQTT_MSG_TIMEOUT);
+#endif
+ }
+ else
+ {
+ keepaliveRetry(c); /* otherwise just send another PINGREQ */
+ }
+ }
+ }
+
+exit:
+ if (rc == SUCCESS)
+ rc = packet_type;
+ else if (c->isconnected)
+ ;//MQTTCloseSession(c);
+ return rc;
+}
+
+/**
+ * Process incoming traffic for up to timeout_ms milliseconds by calling cycle() repeatedly.
+ * At least one cycle() is always run.
+ *
+ * @return FAILURE as soon as a cycle() reports a negative result, SUCCESS once the time is up
+ */
+int MQTTYield(MQTTClient* c, int timeout_ms)
+{
+    Timer deadline;
+
+    TimerInit(&deadline);
+    TimerCountdownMS(&deadline, timeout_ms);
+
+    for (;;)
+    {
+        if (cycle(c, &deadline) < 0)
+        {
+            return FAILURE;
+        }
+        if (TimerIsExpired(&deadline))
+        {
+            return SUCCESS;
+        }
+    }
+}
+
+/* Report the client's isconnected flag (non-zero while an MQTT session is established). */
+int MQTTIsConnected(MQTTClient* client)
+{
+    int connected = client->isconnected;
+
+    return connected;
+}
+
+void MQTTRun(void* parm) /* Receive-task body: loops forever, running one cycle() on the global mqttClient and then sleeping. Never returns. */
+{
+ Timer timer;
+ MQTTClient* c = (MQTTClient*)&mqttClient; /* NOTE(review): parm is ignored -- MQTTStartTask() passes its client here, but the global client is used instead */
+
+#ifndef MBTK_OPENCPU_SUPPORT
+ if(mqttSendMsgHandle == NULL)
+ {
+ mqttSendMsgHandle = osMessageQueueNew(16, sizeof(mqttSendMsg), NULL);
+ }
+
+ if(appMqttMsgHandle == NULL)
+ {
+ appMqttMsgHandle = osMessageQueueNew(16, sizeof(mqttDataMsg), NULL);
+ }
+#endif
+ /*
+ if(mqttMutex1.sem == NULL)
+ {
+ MutexInit(&mqttMutex1);
+ }
+ */
+
+ TimerInit(&timer);
+
+ while (1)
+ {
+#if defined(MQTT_TASK)
+ //MutexLock(&c->mutex);
+#endif
+ //MutexLock(&mqttMutex1);
+
+ TimerCountdownMS(&timer, 1500); /* Don't wait too long if no traffic is incoming */
+ cycle(c, &timer);
+ //MutexUnlock(&mqttMutex1);
+
+#if defined(MQTT_TASK)
+ //MutexUnlock(&c->mutex);
+#endif
+ sleep(200); /* NOTE(review): if this is POSIX sleep() the pause is 200 seconds, which would outlast typical keep-alive intervals -- confirm whether 200 ms was intended */
+ }
+}
+
+#if defined(MQTT_TASK)
+/* Start MQTTRun() on the client's own thread object; returns the ThreadStart() result. */
+int MQTTStartTask(MQTTClient* client)
+{
+    int started = ThreadStart(&client->thread, &MQTTRun, client);
+
+    return started;
+}
+#endif
+
+int MQTTStartRECVTask(void) /* Report whether the receive task exists: SUCCESS when mqttRecvTaskHandle is non-NULL, FAILURE otherwise. The thread creation itself is commented out below. */
+{
+ //osThreadAttr_t task_attr;
+
+ //memset(&task_attr, 0, sizeof(task_attr));
+ //task_attr.name = "mqttRecv";
+ //task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
+ #if defined FEATURE_LITEOS_ENABLE
+ task_attr.priority = osPriorityBelowNormal4; /* NOTE(review): task_attr is only declared in a comment above, so this branch cannot compile when FEATURE_LITEOS_ENABLE is defined */
+ #elif defined FEATURE_FREERTOS_ENABLE
+ task_attr.priority = osPriorityBelowNormal7; /* NOTE(review): same problem when FEATURE_FREERTOS_ENABLE is defined */
+ #endif
+
+ //mqttRecvTaskHandle = osThreadNew(MQTTRun, NULL,&task_attr);
+ if(mqttRecvTaskHandle == NULL) /* NOTE(review): the definition of mqttRecvTaskHandle is commented out in this file -- confirm it is declared elsewhere */
+ {
+ return FAILURE;
+ }
+
+ return SUCCESS;
+}
+
+/**
+ * Keep running cycle() until it returns the wanted packet type, a negative result, or
+ * the timer expires.
+ *
+ * @return the last cycle() result, or FAILURE when the timer had already expired on entry
+ */
+int waitfor(MQTTClient* c, int packet_type, Timer* timer)
+{
+    int result = FAILURE;
+
+    while (!TimerIsExpired(timer))
+    {
+        result = cycle(c, timer);
+        if (result == packet_type || result < 0)
+        {
+            break;
+        }
+    }
+
+    return result;
+}
+
+/**
+ * Send an MQTT CONNECT and wait (blocking, up to c->command_timeout_ms) for the CONNACK.
+ *
+ * @param c        client; its transport must already be connected
+ * @param options  connect options, or NULL to use MQTTPacket_connectData_initializer
+ * @param data     receives the CONNACK return code and session-present flag; may be NULL
+ *                 when the caller does not need them
+ * @return the CONNACK return code (0 on success), or FAILURE when the client is already
+ *         connected, the packet could not be built or sent, or no valid CONNACK arrived
+ *
+ * On success the client is marked connected and any pending ping is cleared in every build
+ * configuration; MBTK_OPENCPU_SUPPORT builds additionally fire MQTT_CONNECT_SUCCESS_EVENT.
+ */
+int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
+{
+    Timer connect_timer;
+    int rc = FAILURE;
+    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+    MQTTConnackData local_data;
+    int len = 0;
+
+    if (data == NULL)
+        data = &local_data; /* caller is not interested in the CONNACK details */
+
+#if defined(MQTT_TASK)
+    MutexLock(&c->mutex);
+#endif
+    if (c->isconnected) /* don't send connect packet again if we are already connected */
+        goto exit;
+
+    TimerInit(&connect_timer);
+    TimerCountdownMS(&connect_timer, c->command_timeout_ms);
+
+    if (options == 0)
+        options = &default_options; /* set default options if none were supplied */
+
+    c->keepAliveInterval = options->keepAliveInterval;
+    c->cleansession = options->cleansession;
+    TimerCountdown(&c->last_received, c->keepAliveInterval);
+    if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
+        goto exit;
+    if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet
+        goto exit; // there was a problem
+
+    // this will be a blocking call, wait for the connack
+    if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
+    {
+        data->rc = 0;
+        data->sessionPresent = 0;
+        if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
+            rc = data->rc;
+        else
+            rc = FAILURE;
+    }
+    else
+        rc = FAILURE;
+
+exit:
+    if (rc == SUCCESS)
+    {
+#ifdef MBTK_OPENCPU_SUPPORT
+        if(c->mqtt_connect_callback != NULL)
+        {
+            c->mqtt_connect_callback(MQTT_CONNECT_SUCCESS_EVENT,0);
+        }
+#endif
+        /* Connection state is updated in every build, not only MBTK_OPENCPU_SUPPORT ones. */
+        c->isconnected = 1;
+        c->ping_outstanding = 0;
+    }
+
+#if defined(MQTT_TASK)
+    MutexUnlock(&c->mutex);
+#endif
+
+    return rc;
+}
+
+/**
+ * Tear down the current transport and establish a fresh MQTT session.
+ *
+ * Steps: disconnect the network stack, clear the connected flag, reconnect
+ * to MQTT_SERVER_URI:MQTT_SERVER_PORT with the configured send/receive
+ * timeouts, then issue an MQTT CONNECT.
+ *
+ * @param client       client to reconnect.
+ * @param connectData  options passed through to MQTTConnect().
+ * @return SUCCESS when both the network connect and the MQTT connect
+ *         succeeded, FAILURE otherwise.
+ */
+int MQTTReConnect(MQTTClient* client, MQTTPacket_connectData* connectData)
+{
+    /* Drop whatever transport is currently open before dialing again. */
+    client->ipstack->disconnect(client->ipstack);
+    client->isconnected = 0;
+
+    if (NetworkConnectTimeout(client->ipstack, MQTT_SERVER_URI, MQTT_SERVER_PORT, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT) < 0)
+        return FAILURE;  /* network-level reconnect failed */
+
+    if (MQTTConnect(client, connectData) != 0)
+        return FAILURE;  /* MQTT-level connect was rejected or timed out */
+
+    return SUCCESS;
+}
+
+/**
+ * Connect to the broker, discarding the detailed CONNACK contents.
+ *
+ * Thin wrapper over MQTTConnectWithResults() that supplies a throw-away
+ * MQTTConnackData.
+ *
+ * @return the result of MQTTConnectWithResults().
+ */
+int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
+{
+    MQTTConnackData ignored_connack;
+    int rc = MQTTConnectWithResults(c, options, &ignored_connack);
+
+    return rc;
+}
+
+/**
+ * Install, replace or remove the handler bound to a topic filter.
+ *
+ * @param c               client owning the handler table.
+ * @param topicFilter     filter string; the pointer itself is stored, not a copy.
+ * @param messageHandler  handler to install, or NULL to remove the entry
+ *                        whose filter compares equal to topicFilter.
+ * @return SUCCESS when an entry was found (and updated or removed) or a free
+ *         slot was used; FAILURE when removing a filter that is not
+ *         registered or when the table is full.
+ */
+int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
+{
+    int status = FAILURE;
+    int slot;
+
+    /* Pass 1: is this filter already registered? */
+    for (slot = 0; slot < MAX_MESSAGE_HANDLERS; slot++)
+    {
+        const char* registered = c->messageHandlers[slot].topicFilter;
+
+        if (registered == NULL || strcmp(registered, topicFilter) != 0)
+            continue;
+
+        if (messageHandler == NULL)
+        {
+            /* removal request: free the slot */
+            c->messageHandlers[slot].topicFilter = NULL;
+            c->messageHandlers[slot].fp = NULL;
+        }
+        status = SUCCESS;
+        break;
+    }
+
+    if (messageHandler == NULL)
+        return status;  /* nothing to install when removing */
+
+    /* Pass 2: no existing entry, so look for the first empty slot. */
+    if (status == FAILURE)
+    {
+        slot = 0;
+        while (slot < MAX_MESSAGE_HANDLERS && c->messageHandlers[slot].topicFilter != NULL)
+            slot++;
+        if (slot < MAX_MESSAGE_HANDLERS)
+            status = SUCCESS;
+    }
+
+    /* slot is either the matching entry, a free entry, or out of range. */
+    if (slot < MAX_MESSAGE_HANDLERS)
+    {
+        c->messageHandlers[slot].topicFilter = topicFilter;
+        c->messageHandlers[slot].fp = messageHandler;
+#ifdef MBTK_OPENCPU_SUPPORT
+        c->messageHandlers[slot].qos = c->qos;
+#endif
+    }
+
+    return status;
+}
+
+#ifdef MBTK_OPENCPU_SUPPORT
+
+/**
+ * Receive-task body for one MQTT client (MBTK build).
+ *
+ * Lazily initializes the per-task mutex selected by the client's
+ * recv_task_num, then loops forever: take the lock(s), run one cycle() with
+ * a 1500 ms budget, release, and delay 200 ticks via osDelay().
+ *
+ * @param parm  the MQTTClient* to service, passed as void*.
+ */
+void mbtk_MQTTRun(void* parm)
+{
+    MQTTClient* client = (MQTTClient*)parm;
+    const int task_idx = client->recv_task_num;  /* sampled once at start */
+    Timer poll_timer;
+
+    if (mbtk_mqttMutex1[task_idx].sem == NULL)
+        MutexInit(&mbtk_mqttMutex1[task_idx]);
+
+    TimerInit(&poll_timer);
+
+    for (;;)
+    {
+#if defined(MQTT_TASK)
+        MutexLock(&client->mutex);
+#endif
+        MutexLock(&mbtk_mqttMutex1[task_idx]);
+
+        TimerCountdownMS(&poll_timer, 1500);
+        cycle(client, &poll_timer);
+
+        MutexUnlock(&mbtk_mqttMutex1[task_idx]);
+#if defined(MQTT_TASK)
+        MutexUnlock(&client->mutex);
+#endif
+        osDelay(200);
+    }
+}
+
+
+/**
+ * Create the receive task for receive-slot i, servicing client c.
+ *
+ * @param i  index into mbtk_mqttRecvTaskHandle; also appended to the task name.
+ * @param c  client passed to mbtk_MQTTRun as the thread argument.
+ * @return SUCCESS when osThreadNew() returned a handle, FAILURE otherwise.
+ */
+int mbtk_mqtt_demo_recv_task_init(int i,MQTTClient* c)
+{
+    osThreadAttr_t task_attr;
+    /*
+     * "mqttRecv" plus any int plus the terminator needs at most 20 bytes.
+     * The former 10-byte buffer written with sprintf() overflowed as soon
+     * as i had two digits.
+     * NOTE(review): name lives on this stack frame; confirm that the RTOS
+     * copies the thread name rather than keeping the pointer.
+     */
+    char name[24] = {0};
+
+    memset(&task_attr, 0, sizeof(task_attr));
+    task_attr.stack_size = MQTT_RECV_DEMO_TASK_STACK_SIZE;
+    task_attr.priority = osPriorityBelowNormal7;
+    snprintf(name, sizeof(name), "mqttRecv%d", i);
+    task_attr.name = name;
+    ECOMM_TRACE(UNILOG_MQTT, mbtk_mqtt_demo_recv_task_init_1, P_INFO, 0, "mbtk_mqtt_demo_recv_task_init task_attr.name%s",task_attr.name);
+
+    mbtk_mqttRecvTaskHandle[i] = osThreadNew(mbtk_MQTTRun, (void *)c,&task_attr);
+    if (mbtk_mqttRecvTaskHandle[i] == NULL)
+    {
+        return FAILURE;
+    }
+    return SUCCESS;
+}
+
+
+#endif
+/**
+ * Subscribe to one topic filter and wait for the SUBACK.
+ *
+ * On a granted subscription the handler is registered via
+ * MQTTSetMessageHandler() and the receive task is started if it does not
+ * exist yet.
+ *
+ * @param c               connected client.
+ * @param topicFilter     filter to subscribe to.
+ * @param qos             requested QoS.
+ * @param messageHandler  callback for messages matching the filter.
+ * @param data            if non-NULL, receives the QoS granted by the broker
+ *                        (0x80 means the broker refused the subscription).
+ * @return FAILURE when not connected, on serialize/send failure, on SUBACK
+ *         timeout, or when the receive task could not be created; otherwise
+ *         the result of sendPacket()/MQTTSetMessageHandler().
+ */
+int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
+       messageHandler messageHandler, MQTTSubackData* data)
+{
+    int rc = FAILURE;
+    Timer timer;
+    int len = 0;
+    int mqttQos = (int)qos;  /* int-sized scratch for the (de)serializer */
+    MQTTString topic = MQTTString_initializer;
+    topic.cstring = (char *)topicFilter;
+
+#if defined(MQTT_TASK)
+    MutexLock(&c->mutex);
+#endif
+    if (!c->isconnected)
+        goto exit;
+
+    TimerInit(&timer);
+    TimerCountdownMS(&timer, c->command_timeout_ms);
+
+    len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&mqttQos);
+    if (len <= 0)
+        goto exit;
+    if ((rc = sendPacket(c, len, &timer)) != SUCCESS) /* send the subscribe packet */
+        goto exit;             /* there was a problem */
+
+    if (waitfor(c, SUBACK, &timer) == SUBACK)      /* wait for suback */
+    {
+        int count = 0;
+        unsigned short mypacketid;
+        mqttQos = QOS0;
+        if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&mqttQos, c->readbuf, c->readbuf_size) == 1)
+        {
+            /* Hand the broker's answer back; it used to be dropped. */
+            if (data != NULL)
+                data->grantedQoS = (enum QoS)mqttQos;
+
+            if (mqttQos != 0x80)
+            {
+#ifndef MBTK_OPENCPU_SUPPORT
+                rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
+                mqttClient = *c;
+
+                if (mqttRecvTaskHandle == NULL)
+                {
+                    mqtt_demo_recv_task_init();
+                }
+#else
+                int i = 0;
+
+                rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
+                if (c->first_sub == 0)
+                {
+                    c->first_sub = 1;
+                    /* claim the first receive-task slot that is still free */
+                    for (i = 0; i < MAX_RECV_TASK; i++)
+                    {
+                        if (mbtk_mqttRecvTaskHandle[i] != NULL)
+                        {
+                            ECOMM_TRACE(UNILOG_MQTT, MQTTSubscribeWithResults_0, P_INFO, 0, "mbtk_mqttRecvTaskHandle %d has create",i);
+                            continue;
+                        }
+
+                        c->recv_task_num = i;
+                        /* without a receive task nothing is ever delivered */
+                        if (mbtk_mqtt_demo_recv_task_init(i, c) != SUCCESS)
+                            rc = FAILURE;
+                        break;
+                    }
+                    if (i >= MAX_RECV_TASK)
+                    {
+                        ECOMM_TRACE(UNILOG_MQTT, MQTTSubscribeWithResults_1, P_INFO, 0, "create mqtt recv task error");
+                        rc = FAILURE;
+                    }
+                }
+#endif
+            }
+        }
+    }
+    else
+        rc = FAILURE;
+
+exit:
+    /* the session is intentionally left open on failure */
+#if defined(MQTT_TASK)
+    MutexUnlock(&c->mutex);
+#endif
+    return rc;
+}
+
+/**
+ * Subscribe to a topic filter without exposing the SUBACK details.
+ *
+ * Thin wrapper over MQTTSubscribeWithResults() with a throw-away
+ * MQTTSubackData.
+ *
+ * @return the result of MQTTSubscribeWithResults().
+ */
+int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
+       messageHandler messageHandler)
+{
+    MQTTSubackData ignored_suback;
+    int rc = MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &ignored_suback);
+
+    return rc;
+}
+
+/**
+ * Unsubscribe from one topic filter and wait for the UNSUBACK.
+ *
+ * When a valid UNSUBACK is received, the handler registered for the filter
+ * is removed.
+ *
+ * @return FAILURE when not connected, when serialization fails, or when no
+ *         UNSUBACK arrives within the command timeout; otherwise the result
+ *         of sendPacket().
+ */
+int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
+{
+    int rc = FAILURE;
+    int len = 0;
+    Timer timer;
+    MQTTString topic = MQTTString_initializer;
+
+    topic.cstring = (char *)topicFilter;
+
+#if defined(MQTT_TASK)
+    MutexLock(&c->mutex);
+#endif
+    if (!c->isconnected)
+        goto exit;
+
+    TimerInit(&timer);
+    TimerCountdownMS(&timer, c->command_timeout_ms);
+
+    len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic);
+    if (len <= 0)
+        goto exit;
+
+    rc = sendPacket(c, len, &timer);  /* send the unsubscribe packet */
+    if (rc != SUCCESS)
+        goto exit;
+
+    if (waitfor(c, UNSUBACK, &timer) != UNSUBACK)
+    {
+        rc = FAILURE;
+        goto exit;
+    }
+
+    {
+        unsigned short acked_id;  /* should equal the packet id sent above */
+
+        if (MQTTDeserialize_unsuback(&acked_id, c->readbuf, c->readbuf_size) == 1)
+        {
+            /* drop the handler bound to this filter, if there is one */
+            MQTTSetMessageHandler(c, topicFilter, NULL);
+        }
+    }
+
+exit:
+    /* the session is intentionally left open on failure */
+#if defined(MQTT_TASK)
+    MutexUnlock(&c->mutex);
+#endif
+    return rc;
+}
+
+/**
+ * Publish a message and, for QoS 1/2, wait for the final acknowledgement.
+ *
+ * QoS 1 waits for PUBACK, QoS 2 waits for PUBCOMP.  For QoS 1/2 a fresh
+ * packet id is written into message->id.
+ *
+ * @return FAILURE when not connected, when serialization fails, or when the
+ *         acknowledgement is missing or malformed; otherwise the result of
+ *         sendPacket().
+ */
+int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
+{
+    int rc = FAILURE;
+    int len = 0;
+    Timer timer;
+    MQTTString topic = MQTTString_initializer;
+
+    topic.cstring = (char *)topicName;
+
+#if defined(MQTT_TASK)
+    MutexLock(&c->mutex);
+#endif
+    if (!c->isconnected)
+        goto exit;
+
+    TimerInit(&timer);
+    TimerCountdownMS(&timer, c->command_timeout_ms);
+
+    if (message->qos == QOS1 || message->qos == QOS2)
+        message->id = getNextPacketId(c);
+
+    len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
+              topic, (unsigned char*)message->payload, message->payloadlen);
+    if (len <= 0)
+        goto exit;
+
+    rc = sendPacket(c, len, &timer);  /* send the publish packet */
+    if (rc != SUCCESS)
+        goto exit;
+
+    if (message->qos == QOS1 || message->qos == QOS2)
+    {
+        /* the packet type that completes the handshake for this QoS */
+        const int expected_ack = (message->qos == QOS1) ? PUBACK : PUBCOMP;
+
+        if (waitfor(c, expected_ack, &timer) == expected_ack)
+        {
+            unsigned short ack_id;
+            unsigned char ack_dup, ack_type;
+
+            if (MQTTDeserialize_ack(&ack_type, &ack_dup, &ack_id, c->readbuf, c->readbuf_size) != 1)
+                rc = FAILURE;
+        }
+        else
+            rc = FAILURE;
+    }
+
+exit:
+    /* the session is intentionally left open on failure */
+#if defined(MQTT_TASK)
+    MutexUnlock(&c->mutex);
+#endif
+    return rc;
+}
+
+/**
+ * Send an MQTT DISCONNECT and close the client session.
+ *
+ * MQTTCloseSession() is called whether or not the packet could be sent.
+ *
+ * @return the result of sendPacket(), or FAILURE when the DISCONNECT packet
+ *         could not be serialized.
+ */
+int MQTTDisconnect(MQTTClient* c)
+{
+    Timer send_timer;
+    int status = FAILURE;
+    int packet_len;
+
+#if defined(MQTT_TASK)
+    MutexLock(&c->mutex);
+#endif
+    TimerInit(&send_timer);
+    TimerCountdownMS(&send_timer, c->command_timeout_ms);
+
+    packet_len = MQTTSerialize_disconnect(c->buf, c->buf_size);
+    if (packet_len > 0)
+    {
+        status = sendPacket(c, packet_len, &send_timer);  /* send the disconnect packet */
+    }
+    MQTTCloseSession(c);
+
+#if defined(MQTT_TASK)
+    MutexUnlock(&c->mutex);
+#endif
+    return status;
+}
+
+/**
+ * Initialize a network object and bind it, with the two buffers, to a client.
+ *
+ * The client is set up with a 40000 ms command timeout and both buffers are
+ * declared to be 1000 bytes long.
+ *
+ * @return always 0.
+ */
+int MQTTInit(MQTTClient* c, Network* n, unsigned char* sendBuf, unsigned char* readBuf)
+{
+    enum { COMMAND_TIMEOUT_MS = 40000, BUFFER_LEN = 1000 };
+
+    NetworkInit(n);
+    MQTTClientInit(c, n, COMMAND_TIMEOUT_MS, sendBuf, BUFFER_LEN, readBuf, BUFFER_LEN);
+
+    return 0;
+}
+
+/**
+ * Open the network connection to a broker and perform the MQTT CONNECT.
+ *
+ * @param c           initialized client.
+ * @param n           network object used for the TCP connection.
+ * @param clientID    client identifier, or NULL to keep the one in connData.
+ * @param username    user name, or NULL to keep the one in connData.
+ * @param password    password, or NULL to keep the one in connData.
+ * @param serverAddr  broker host.
+ * @param port        broker port.
+ * @param connData    base connect options, or NULL for MQTT 3.1.1 with a
+ *                    120 s keep-alive.
+ * @return 0 on success, 1 on any failure.  On failure c->ping_outstanding
+ *         is set to 1.
+ */
+int MQTTCreate(MQTTClient* c, Network* n, char* clientID, char* username, char* password, char *serverAddr, int port, MQTTPacket_connectData* connData)
+{
+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
+
+    if (connData != NULL)
+    {
+        memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));
+    }
+    else
+    {
+        connectData.MQTTVersion = 4;
+        connectData.keepAliveInterval = 120;
+    }
+
+    /*
+     * connectData only lives for the duration of this call and MQTTConnect()
+     * serializes the strings into the client's send buffer before returning,
+     * so the caller's strings can be referenced directly.  The previous
+     * unchecked malloc'ed copies were never freed and leaked on every call.
+     */
+    if (clientID != NULL)
+        connectData.clientID.cstring = clientID;
+    if (username != NULL)
+        connectData.username.cstring = username;
+    if (password != NULL)
+        connectData.password.cstring = password;
+
+    if ((NetworkConnectTimeout(n, serverAddr, port, 5000, 5000)) != 0)
+    {
+        c->keepAliveInterval = connectData.keepAliveInterval;
+        c->ping_outstanding = 1;
+        return 1;
+    }
+
+    if ((MQTTConnect(c, &connectData)) != 0)
+    {
+        c->ping_outstanding = 1;
+        return 1;
+    }
+
+#if defined(MQTT_TASK)
+    if ((MQTTStartTask(c)) != pdPASS)
+    {
+        return 1;
+    }
+#endif
+    /*
+     * Connected.  Without MQTT_TASK the old code fell through to the final
+     * "return 1", so success could never be reported.
+     */
+    return 0;
+}
+
+#ifdef FEATURE_CUCC_DM_ENABLE
+
+/**
+ * Read one packet and report its type without dispatching it.
+ *
+ * The only side effect besides readPacket() is clearing
+ * c->ping_outstanding when a PINGRESP is read.
+ *
+ * @return the value returned by readPacket(): a packet type, 0 when the
+ *         read timed out, or its error value.
+ */
+int MQTTCuccCycle(MQTTClient* c, Timer* timer)
+{
+    int rc;
+    int packet_type = readPacket(c, timer);  /* read the socket, see what work is due */
+
+    switch (packet_type)
+    {
+        case 0:        /* timed out reading packet */
+        case CONNACK:
+        case PUBACK:
+        case SUBACK:
+        case UNSUBACK:
+        case PUBLISH:
+        case PUBREC:
+        case PUBREL:
+        case PUBCOMP:
+            rc = SUCCESS;  /* recognized, nothing to do here */
+            break;
+        case PINGRESP:
+            c->ping_outstanding = 0;
+            rc = SUCCESS;
+            break;
+        default:
+            /* no more data to read, or readPacket() failed on a network error */
+            rc = packet_type;
+            break;
+    }
+
+    /* the session is intentionally left open on error */
+    return (rc == SUCCESS) ? packet_type : rc;
+}
+
+/**
+ * Run MQTTCuccCycle() repeatedly until a given packet type is read.
+ *
+ * @return the last value returned by MQTTCuccCycle(), or FAILURE when the
+ *         timer had already expired before the first call.
+ */
+int MQTTCuccWaitfor(MQTTClient* c, int packet_type, Timer* timer)
+{
+    int last = FAILURE;
+
+    while (!TimerIsExpired(timer))
+    {
+        last = MQTTCuccCycle(c, timer);
+        if (last == packet_type || last < 0)
+            break;  /* wanted packet seen, or the cycle failed */
+    }
+
+    return last;
+}
+
+/**
+ * CUCC variant of MQTTCreate(): open the network connection and attempt the
+ * MQTT CONNECT.
+ *
+ * Parameters are the same as for MQTTCreate().
+ *
+ * @return 1 when the network connection fails (c->ping_outstanding is set
+ *         to 1) or, with MQTT_TASK, when the task cannot be started;
+ *         0 otherwise.  A failed MQTT CONNECT is not reported here; the
+ *         original code has that error return commented out.
+ *         NOTE(review): confirm that ignoring the CONNECT result is intended.
+ */
+int MQTTCuccCreate(MQTTClient* c, Network* n, char* clientID, char* username, char* password, char *serverAddr, int port, MQTTPacket_connectData* connData)
+{
+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
+
+    if (connData != NULL)
+    {
+        memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));
+    }
+    else
+    {
+        connectData.MQTTVersion = 4;
+        connectData.keepAliveInterval = 120;
+    }
+
+    /*
+     * Reference the caller's strings directly: connectData is local and
+     * MQTTConnect() serializes it before returning.  The previous unchecked
+     * malloc'ed copies were never freed.
+     */
+    if (clientID != NULL)
+        connectData.clientID.cstring = clientID;
+    if (username != NULL)
+        connectData.username.cstring = username;
+    if (password != NULL)
+        connectData.password.cstring = password;
+
+    if ((NetworkConnectTimeout(n, serverAddr, port, 5000, 5000)) != 0)
+    {
+        c->keepAliveInterval = connectData.keepAliveInterval;
+        c->ping_outstanding = 1;
+        return 1;
+    }
+
+    if ((MQTTConnect(c, &connectData)) == 0)
+    {
+#if defined(MQTT_TASK)
+        if ((MQTTStartTask(c)) != pdPASS)
+        {
+            return 1;
+        }
+#endif
+    }
+
+    return 0;
+}
+
+/**
+ * Perform the MQTT CONNECT on an already-open network connection.
+ *
+ * @param c         initialized client.
+ * @param clientID  client identifier, or NULL to keep the one in connData.
+ * @param username  user name, or NULL to keep the one in connData.
+ * @param password  password, or NULL to keep the one in connData.
+ * @param connData  base connect options, or NULL for MQTT 3.1.1 with a
+ *                  120 s keep-alive.
+ * @return 0 on success; 1 on failure, in which case c->ping_outstanding is
+ *         set to 1.
+ */
+int MQTTCuccConnect(MQTTClient* c, char* clientID, char* username, char* password, MQTTPacket_connectData* connData)
+{
+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
+
+    if (connData != NULL)
+    {
+        memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));
+    }
+    else
+    {
+        connectData.MQTTVersion = 4;
+        connectData.keepAliveInterval = 120;
+    }
+
+    /*
+     * Reference the caller's strings directly: connectData is local and
+     * MQTTConnect() serializes it before returning.  The previous unchecked
+     * malloc'ed copies were never freed.
+     */
+    if (clientID != NULL)
+        connectData.clientID.cstring = clientID;
+    if (username != NULL)
+        connectData.username.cstring = username;
+    if (password != NULL)
+        connectData.password.cstring = password;
+
+    if ((MQTTConnect(c, &connectData)) != 0)
+    {
+        c->ping_outstanding = 1;
+        return 1;
+    }
+
+    return 0;
+}
+
+/**
+ * CUCC entry point for publishing a message.
+ *
+ * The former body was a statement-for-statement copy of MQTTPublish()
+ * (same locking, same serialization, same waitfor()-based PUBACK/PUBCOMP
+ * handling), so it now delegates to keep the two from drifting apart.
+ *
+ * @return the result of MQTTPublish().
+ */
+int MQTTCuccPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
+{
+    return MQTTPublish(c, topicName, message);
+}
+
+/**
+ * Subscribe to one topic filter and wait up to 40000 ms for the SUBACK.
+ *
+ * Unlike MQTTSubscribe(), no message handler is registered; the
+ * messageHandler argument is not used.
+ *
+ * @return SUCCESS when the broker granted the subscription; FAILURE when
+ *         not connected, when serialization fails, when the SUBACK is
+ *         missing, or when the broker answered 0x80 (refused); otherwise
+ *         the result of sendPacket().
+ */
+int MQTTCuccSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
+{
+    int rc = FAILURE;
+    int len = 0;
+    int granted = (int)qos;  /* requested QoS going out, granted QoS coming back */
+    Timer timer;
+    MQTTString topic = MQTTString_initializer;
+
+    topic.cstring = (char *)topicFilter;
+
+#if defined(MQTT_TASK)
+    MutexLock(&c->mutex);
+#endif
+    if (!c->isconnected)
+        goto exit;
+
+    TimerInit(&timer);
+    TimerCountdownMS(&timer, 40000);
+
+    len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, &granted);
+    if (len <= 0)
+        goto exit;
+
+    rc = sendPacket(c, len, &timer);  /* send the subscribe packet */
+    if (rc != SUCCESS)
+        goto exit;
+
+    if (waitfor(c, SUBACK, &timer) != SUBACK)
+    {
+        rc = FAILURE;
+        goto exit;
+    }
+
+    {
+        int count = 0;
+        unsigned short acked_id;
+
+        granted = QOS0;
+        if (MQTTDeserialize_suback(&acked_id, 1, &count, &granted, c->readbuf, c->readbuf_size) == 1)
+        {
+            /* 0x80 is the broker's "subscription refused" code */
+            rc = (granted == 0x80) ? FAILURE : SUCCESS;
+        }
+    }
+
+exit:
+    /* the session is intentionally left open on failure */
+#if defined(MQTT_TASK)
+    MutexUnlock(&c->mutex);
+#endif
+    return rc;
+}
+
+
+/**
+ * CUCC entry point for disconnecting.
+ *
+ * The former body was a statement-for-statement copy of MQTTDisconnect(),
+ * so it now delegates to keep the two from drifting apart.
+ *
+ * @return the result of MQTTDisconnect().
+ */
+int MQTTCuccDisconnect(MQTTClient* c)
+{
+    return MQTTDisconnect(c);
+}
+
+/**
+ * Wait for a packet of the given type and, if it is a PUBLISH, copy out its
+ * topic and payload.
+ *
+ * @param c            client to service with cycle().
+ * @param packet_type  packet type that ends the wait.
+ * @param timerOut     time budget in milliseconds.
+ * @param topicName    receives the topic of a decoded PUBLISH.
+ * @param outPayload   receives the payload bytes of a decoded PUBLISH.
+ *                     NOTE(review): the copy is not bounded by this
+ *                     function; the caller must supply a buffer large
+ *                     enough for any payload that fits in c->readbuf.
+ * @return SUCCESS when a PUBLISH was decoded and copied; otherwise the last
+ *         value returned by cycle(), or FAILURE if the timer expired before
+ *         the first cycle.
+ */
+int MQTTCuccWaitForRecv(MQTTClient* c, int packet_type, unsigned int timerOut, MQTTString *topicName, char *outPayload)
+{
+    int rc = FAILURE;
+    int intQoS;
+    Timer timer;
+    MQTTMessage msg;
+
+    memset(&msg, 0, sizeof(MQTTMessage));
+    TimerInit(&timer);
+    TimerCountdownMS(&timer, timerOut);
+
+    while (!TimerIsExpired(&timer))
+    {
+        rc = cycle(c, &timer);
+        if (rc == packet_type || rc < 0)
+            break;
+    }
+
+    if (rc != PUBLISH)
+        return rc;
+
+    if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, topicName,
+           (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
+        return rc;
+
+    memcpy(outPayload, (unsigned char*)msg.payload, msg.payloadlen);
+    return SUCCESS;
+}
+
+
+#endif
+
+#define MQTT_DEMO_EXAMPLE_1_ONENET
+/**
+ * Demo 1: connect to the OneNET broker and publish a sensor value to "$dp"
+ * every 10000 ticks.
+ *
+ * Does not return once the buffers are allocated; returns early only when
+ * the send/receive buffers cannot be allocated.
+ */
+void mqtt_demo_onenet(void)
+{
+    int len = 0;
+    MQTTMessage message;
+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
+
+    /* qos/retained/dup/id were read uninitialized by MQTTPublish() before */
+    memset(&message, 0, sizeof(message));
+
+    connectData.MQTTVersion = 4;
+    connectData.clientID.cstring = "34392813";
+    connectData.username.cstring = "122343";
+    connectData.password.cstring = "test001";
+    connectData.keepAliveInterval = 120;
+    ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet0, P_INFO, "mqtt_demo........");
+
+    mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);
+    mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);
+    if (mqttSendbuf == NULL || mqttReadbuf == NULL)
+    {
+        /* cannot run the demo without buffers */
+        free(mqttSendbuf);
+        free(mqttReadbuf);
+        mqttSendbuf = NULL;
+        mqttReadbuf = NULL;
+        return;
+    }
+    memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
+    memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
+
+    NetworkInit(&mqttNetwork);
+    /* declare the real allocation sizes instead of a hard-coded 1000 */
+    MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, MQTT_SEND_BUFF_LEN, (unsigned char *)mqttReadbuf, MQTT_RECV_BUFF_LEN);
+
+    if ((NetworkSetConnTimeout(&mqttNetwork, 5000, 5000)) != 0)
+    {
+        mqttClient.keepAliveInterval = connectData.keepAliveInterval;
+        mqttClient.ping_outstanding = 1;
+    }
+    else
+    {
+        if ((NetworkConnect(&mqttNetwork, "183.230.40.39", 6002)) != 0)
+        {
+            mqttClient.keepAliveInterval = connectData.keepAliveInterval;
+            mqttClient.ping_outstanding = 1;
+        }
+        else
+        {
+            ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet1, P_INFO, "mqtt_demo socket connect ok");
+            if ((MQTTConnect(&mqttClient, &connectData)) != 0)
+            {
+                mqttClient.ping_outstanding = 1;
+            }
+            else
+            {
+                /* this is the success branch; the log used to say "fail" */
+                ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet2, P_INFO, "mqtt_demo connect ok");
+                mqttClient.ping_outstanding = 0;
+            }
+        }
+        /* ping_outstanding == 0 is used here as the "connected" marker */
+        if (mqttClient.ping_outstanding == 0)
+        {
+            (void)MQTTStartRECVTask();  /* best effort; result is ignored */
+        }
+    }
+
+    while (1)
+    {
+        unsigned char *ptr = mqttJsonbuff;
+
+        sprintf(mqtt_payload,"{\"ec_smart_sensor_data\":%d}", ec_sensor_temp);
+        len = strlen(mqtt_payload);
+        ec_data_len = len;
+        /* three placeholder bytes reserve room for the header written below */
+        sprintf((char *)mqttJsonbuff,"%c%c%c%s", ec_data_type, ec_data_type,ec_data_type, mqtt_payload);
+        message.payload = mqttJsonbuff;
+        message.payloadlen = strlen((char *)mqttJsonbuff);
+        writeChar(&ptr, ec_data_type);
+        writeInt(&ptr, ec_data_len);
+        ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet3, P_INFO, "mqtt_demo send data");
+
+        MQTTPublish(&mqttClient, "$dp", &message);
+
+        osDelay(10000);
+    }
+    /* do not return */
+}
+
+#define MQTT_DEMO_EXAMPLE_2_ALI
+#ifdef FEATURE_MQTT_TLS_ENABLE
+/**
+ * Demo 2: connect to the Aliyun IoT broker with an HMAC-SHA1 signed login
+ * and publish "update" every 10000 ticks.
+ *
+ * Does not return once the buffers are allocated; returns early only when
+ * an allocation fails.
+ *
+ * NOTE(review): the device name, product key and device secret are
+ * hard-coded below; move them to provisioning data before using this
+ * outside a demo.
+ */
+void mqtt_demo_ali(void)
+{
+    int len = 0;
+    MQTTMessage message;
+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
+    char hmac_source[256] = {0};
+    char *ali_clientID = NULL;
+    char *ali_username = NULL;
+    char *ali_signature = NULL;
+
+    /* qos/retained/dup/id were read uninitialized by MQTTPublish() before */
+    memset(&message, 0, sizeof(message));
+
+    /* login fields: clientID, deviceName&productKey, HMAC signature */
+    ali_clientID = malloc(128);
+    ali_username = malloc(64);
+    ali_signature = malloc(96);
+    mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);
+    mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);
+    if (ali_clientID == NULL || ali_username == NULL || ali_signature == NULL ||
+        mqttSendbuf == NULL || mqttReadbuf == NULL)
+    {
+        /* cannot run the demo without these buffers */
+        free(ali_clientID);
+        free(ali_username);
+        free(ali_signature);
+        free(mqttSendbuf);
+        free(mqttReadbuf);
+        mqttSendbuf = NULL;
+        mqttReadbuf = NULL;
+        return;
+    }
+
+    memset(ali_clientID, 0, 128);
+    memset(ali_username, 0, 64);
+    memset(ali_signature, 0, 96);
+    memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
+    memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
+
+    snprintf(hmac_source, sizeof(hmac_source), "clientId%s" "deviceName%s" "productKey%s" "timestamp%s", "eigencomm", "ec_smoke", "a1xFDTv3InR", "8964");
+
+    mqttAliHmacSha1((unsigned char *)hmac_source, strlen(hmac_source), (unsigned char *)ali_signature,(unsigned char *)"sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg", strlen("sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg"));
+
+    snprintf(ali_clientID, 128, "%s|securemode=3,signmethod=hmacsha1,timestamp=8964|", "eigencomm");
+    snprintf(ali_username, 64, "%s&%s","ec_smoke","a1xFDTv3InR");
+    connectData.clientID.cstring = ali_clientID;
+    connectData.username.cstring = ali_username;
+    connectData.password.cstring = ali_signature;
+
+    connectData.MQTTVersion = 4;
+    connectData.keepAliveInterval = 120;
+
+    NetworkInit(&mqttNetwork);
+    /* declare the real allocation sizes instead of a hard-coded 1000 */
+    MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, MQTT_SEND_BUFF_LEN, (unsigned char *)mqttReadbuf, MQTT_RECV_BUFF_LEN);
+
+    if ((NetworkSetConnTimeout(&mqttNetwork, 5000, 5000)) != 0)
+    {
+        mqttClient.keepAliveInterval = connectData.keepAliveInterval;
+        mqttClient.ping_outstanding = 1;
+    }
+    else
+    {
+        if ((NetworkConnect(&mqttNetwork, "a1xFDTv3InR.iot-as-mqtt.cn-shanghai.aliyuncs.com", 1883)) != 0)
+        {
+            mqttClient.keepAliveInterval = connectData.keepAliveInterval;
+            mqttClient.ping_outstanding = 1;
+        }
+        else
+        {
+            if ((MQTTConnect(&mqttClient, &connectData)) != 0)
+            {
+                mqttClient.ping_outstanding = 1;
+            }
+            else
+            {
+                mqttClient.ping_outstanding = 0;
+            }
+        }
+        /* ping_outstanding == 0 is used here as the "connected" marker */
+        if (mqttClient.ping_outstanding == 0)
+        {
+            (void)MQTTStartRECVTask();  /* best effort; result is ignored */
+        }
+    }
+
+    while (1)
+    {
+        memset(mqtt_payload, 0, sizeof(mqtt_payload));
+        memcpy(mqtt_payload, "update", strlen("update"));
+        len = strlen(mqtt_payload);
+        message.payload = mqtt_payload;
+        message.payloadlen = len;
+
+        MQTTPublish(&mqttClient, "a1xFDTv3InR/ec_smoke/user/get", &message);
+
+        osDelay(10000);
+    }
+    /* do not return */
+}
+#endif
+#define MQTT_DEMO_EXAMPLE_3_APP
+/**
+ * Demo 3 send task: owns the MQTT connection and executes commands that
+ * arrive on mqttSendMsgHandle.
+ *
+ * Start-up: allocate the buffers, connect to the broker, start the receive
+ * task when connected.  Main loop:
+ *  - MQTT_DEMO_MSG_PUBLISH: publish the queued message, free its topic and
+ *    payload, reconnect if the socket error is fatal, and always answer on
+ *    appMqttMsgHandle with MQTT_DEMO_MSG_PUBLISH_ACK.
+ *  - MQTT_DEMO_MSG_RECONNECT: drop and re-establish the connection.
+ *
+ * @param argument  unused.
+ */
+void mqtt_demo_send_task(void *argument)
+{
+    int ret = FAILURE;
+    int msgType = 0xff;
+    mqttSendMsg mqttMsg;
+    mqttDataMsg mqttMessage;
+    int socket_stat = -1;
+    int socket_err = -1;
+    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
+
+    (void)argument;
+
+    connectData.MQTTVersion = 4;
+    connectData.clientID.cstring = "34392813";
+    connectData.username.cstring = "122343";
+    connectData.password.cstring = "test001";
+    connectData.keepAliveInterval = 120;
+
+    /* NOTE(review): allocation results are not checked here. */
+    mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);
+    mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);
+    memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
+    memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
+
+    NetworkInit(&mqttNetwork);
+    MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, MQTT_SEND_BUFF_LEN, (unsigned char *)mqttReadbuf, MQTT_RECV_BUFF_LEN);
+
+    if ((NetworkSetConnTimeout(&mqttNetwork, 10000, 10000)) != 0)
+    {
+        mqttClient.keepAliveInterval = connectData.keepAliveInterval;
+        mqttClient.ping_outstanding = 1;
+    }
+    else
+    {
+        if ((NetworkConnect(&mqttNetwork, "183.230.40.39", 6002)) != 0)
+        {
+            mqttClient.keepAliveInterval = connectData.keepAliveInterval;
+            mqttClient.ping_outstanding = 1;
+        }
+        else
+        {
+            if ((MQTTConnect(&mqttClient, &connectData)) != 0)
+            {
+                mqttClient.ping_outstanding = 1;
+            }
+            else
+            {
+                mqttClient.keepAliveInterval = connectData.keepAliveInterval;
+                mqtt_send_task_status_flag = 1;  /* tells the app task we are ready */
+                mqttClient.ping_outstanding = 0;
+            }
+        }
+    }
+
+    /* start the receive task; ping_outstanding == 0 marks "connected" */
+    if (mqttClient.ping_outstanding == 0)
+    {
+        mqtt_demo_recv_task_init();
+    }
+
+    while (1)
+    {
+        /* receive the next command (blocking) */
+        osMessageQueueGet(mqttSendMsgHandle, &mqttMsg, 0, osWaitForever);
+        msgType = mqttMsg.cmdType;
+
+        switch (msgType)
+        {
+            case MQTT_DEMO_MSG_PUBLISH:
+                MutexLock(&mqttMutex1);
+                memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
+                ret = MQTTPublish(&mqttClient, mqttMsg.topic, &mqttMsg.message);
+
+                /* the sender allocated both; this task owns and frees them */
+                free(mqttMsg.topic);
+                mqttMsg.topic = NULL;
+                free(mqttMsg.message.payload);
+                mqttMsg.message.payload = NULL;
+
+                if (ret != SUCCESS)
+                {
+                    socket_stat = sock_get_errno(mqttClient.ipstack->my_socket);
+                    socket_err = socket_error_is_fatal(socket_stat);
+                    if (socket_err == 1)
+                    {
+                        /* fatal socket error: rebuild the connection */
+                        MQTTReConnect(&mqttClient, &connectData);
+                    }
+                }
+
+                /*
+                 * Always answer the app task.  It blocks without timeout on
+                 * this ACK, and the reconnect path used to send none, which
+                 * stalled the demo for good.  The ACK also reports the real
+                 * outcome instead of SUCCESS for a failed publish.
+                 */
+                memset(&mqttMessage, 0, sizeof(mqttMessage));
+                mqttMessage.cmdType = MQTT_DEMO_MSG_PUBLISH_ACK;
+                mqttMessage.ret = (ret == SUCCESS) ? SUCCESS : FAILURE;
+                osMessageQueuePut(appMqttMsgHandle, &mqttMessage, 0, MQTT_MSG_TIMEOUT);
+
+                MutexUnlock(&mqttMutex1);
+                break;
+
+            case MQTT_DEMO_MSG_RECONNECT:
+                MutexLock(&mqttMutex1);
+                mqttClient.ipstack->disconnect(mqttClient.ipstack);
+
+                if ((NetworkSetConnTimeout(mqttClient.ipstack, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT)) == 0)
+                {
+                    mqttClient.isconnected = 0;
+                    if ((NetworkConnect(mqttClient.ipstack, MQTT_SERVER_URI, MQTT_SERVER_PORT)) >= 0)
+                    {
+                        /* result intentionally ignored; the next command retries */
+                        (void)MQTTConnect(&mqttClient, &connectData);
+                    }
+                }
+                MutexUnlock(&mqttMutex1);
+                break;
+
+            default:
+                /* unknown command: ignore */
+                break;
+        }
+    }
+}
+
+/**
+ * Demo 3 application task: starts the send task, waits until it reports a
+ * live connection, then queues one publish request every 4000 ticks and
+ * waits for its acknowledgement.
+ *
+ * Each request carries a heap-allocated topic and payload; ownership passes
+ * to the send task once the request is queued.
+ *
+ * @param argument  unused.
+ */
+void app_mqtt_demo_task(void *argument)
+{
+    int msgType = 1;
+    int payloadLen = 0;
+    mqttSendMsg mqttMsg;
+    mqttDataMsg mqttMessage;
+    int ret = FAILURE;
+
+    (void)argument;
+
+    /* start mqtt send task */
+    ret = mqtt_demo_send_task_init();
+    (void)ret;  /* a failed start simply leaves the flag below unset */
+
+    /* poll until the send task has connected */
+    while (mqtt_send_task_status_flag != 1)
+    {
+        osDelay(4000);
+    }
+
+    /* state machine */
+    while (1)
+    {
+        unsigned char *ptr;
+
+        osDelay(4000);
+
+        /* build the publish request */
+        memset(&mqttMsg, 0, sizeof(mqttSendMsg));
+        mqttMsg.cmdType = MQTT_DEMO_MSG_PUBLISH;
+
+        mqttMsg.topic = malloc(128);
+        mqttMsg.message.payload = malloc(32);
+        if (mqttMsg.topic == NULL || mqttMsg.message.payload == NULL)
+        {
+            /* out of memory: drop this round and try again later */
+            free(mqttMsg.topic);
+            free(mqttMsg.message.payload);
+            continue;
+        }
+
+        memset(mqttMsg.topic, 0, 128);
+        memcpy(mqttMsg.topic, "$dp", strlen("$dp"));
+        mqttMsg.message.qos = QOS0;
+        mqttMsg.message.retained = 0;
+        mqttMsg.message.id = 0;
+        memset(mqttMsg.message.payload, 0, 32);
+
+        /* three placeholder bytes reserve room for the header written below */
+        snprintf(mqttMsg.message.payload, 32, "%c%c%c%s",msgType, msgType, msgType, "{\"data\":90}");
+        payloadLen = strlen(mqttMsg.message.payload);
+        ptr = (unsigned char *)mqttMsg.message.payload;
+        writeChar(&ptr,3);
+        writeInt(&ptr,strlen("{\"data\":90}"));
+        mqttMsg.message.payloadlen = payloadLen;
+
+        if (osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, (2*MQTT_MSG_TIMEOUT)) != osOK)
+        {
+            /*
+             * The request never reached the send task, so nobody else will
+             * free the buffers and no ACK will ever arrive.
+             */
+            free(mqttMsg.topic);
+            free(mqttMsg.message.payload);
+            continue;
+        }
+
+        /* wait for the send task to report the outcome */
+        osMessageQueueGet(appMqttMsgHandle, &mqttMessage, 0, cmsisMAX_DELAY);
+        if (mqttMessage.cmdType == MQTT_DEMO_MSG_PUBLISH_ACK)
+        {
+            /* publish was processed */
+        }
+        else
+        {
+            /* unexpected reply */
+        }
+    }
+}
+
+/**
+ * Create the "mqttRecv" thread running MQTTRun and store its handle in
+ * mqttRecvTaskHandle.
+ *
+ * @return SUCCESS when osThreadNew() returned a handle, FAILURE otherwise.
+ */
+int mqtt_demo_recv_task_init(void)
+{
+    osThreadAttr_t attr;
+
+    memset(&attr, 0, sizeof(attr));
+    attr.name = "mqttRecv";
+#ifdef MBTK_OPENCPU_SUPPORT
+    attr.stack_size = MQTT_RECV_DEMO_TASK_STACK_SIZE;
+#else
+    attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
+#endif
+#if defined FEATURE_LITEOS_ENABLE
+    attr.priority = osPriorityBelowNormal4;
+#elif defined FEATURE_FREERTOS_ENABLE
+    attr.priority = osPriorityBelowNormal7;
+#endif
+
+    mqttRecvTaskHandle = osThreadNew(MQTTRun, NULL, &attr);
+
+    return (mqttRecvTaskHandle == NULL) ? FAILURE : SUCCESS;
+}
+
+/**
+ * Create the "mqttSend" thread running mqtt_demo_send_task and store its
+ * handle in mqttSendTaskHandle.
+ *
+ * @return SUCCESS when osThreadNew() returned a handle, FAILURE otherwise.
+ */
+int mqtt_demo_send_task_init(void)
+{
+    osThreadAttr_t attr;
+
+    memset(&attr, 0, sizeof(attr));
+    attr.name = "mqttSend";
+    attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
+#if defined FEATURE_LITEOS_ENABLE
+    attr.priority = osPriorityBelowNormal3;
+#elif defined FEATURE_FREERTOS_ENABLE
+    attr.priority = osPriorityBelowNormal6;
+#endif
+
+    mqttSendTaskHandle = osThreadNew(mqtt_demo_send_task, NULL, &attr);
+
+    return (mqttSendTaskHandle == NULL) ? FAILURE : SUCCESS;
+}
+
+/**
+ * Create the demo's two message queues and its mutex (each only if not yet
+ * created), then start the "appTask" thread running app_mqtt_demo_task.
+ *
+ * @return SUCCESS when both queues exist and the thread was created,
+ *         FAILURE otherwise.
+ */
+int app_mqtt_demo_task_init(void)
+{
+    osThreadAttr_t task_attr;
+
+    if (mqttSendMsgHandle == NULL)
+    {
+        mqttSendMsgHandle = osMessageQueueNew(16, sizeof(mqttSendMsg), NULL);
+    }
+
+    if (appMqttMsgHandle == NULL)
+    {
+        appMqttMsgHandle = osMessageQueueNew(16, sizeof(mqttDataMsg), NULL);
+    }
+
+    /* the tasks use both queues unconditionally; do not start without them */
+    if (mqttSendMsgHandle == NULL || appMqttMsgHandle == NULL)
+    {
+        return FAILURE;
+    }
+
+    if (mqttMutex1.sem == NULL)
+    {
+        MutexInit(&mqttMutex1);
+    }
+
+    memset(&task_attr, 0, sizeof(task_attr));
+    task_attr.name = "appTask";
+    task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
+#if defined FEATURE_LITEOS_ENABLE
+    task_attr.priority = osPriorityBelowNormal2;
+#elif defined FEATURE_FREERTOS_ENABLE
+    task_attr.priority = osPriorityBelowNormal5;
+#endif
+    appMqttTaskHandle = osThreadNew(app_mqtt_demo_task, NULL,&task_attr);
+    if (appMqttTaskHandle == NULL)
+    {
+        return FAILURE;
+    }
+
+    return SUCCESS;
+}
+
+
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTLinux.c b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTLinux.c
new file mode 100755
index 0000000..baa02dc
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTLinux.c
@@ -0,0 +1,166 @@
+/*******************************************************************************
+ * Copyright (c) 2014, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Allan Stockdill-Mander - initial API and implementation and/or initial documentation
+ * Ian Craggs - return codes from linux_read
+ *******************************************************************************/
+
+#include "MQTTLinux.h"
+
+/* Reset the timer so that its deadline is the zero timeval. */
+void TimerInit(Timer* timer) {
+    timerclear(&timer->end_time);
+}
+
+/* Return 1 when the current time has reached timer->end_time, otherwise 0. */
+char TimerIsExpired(Timer* timer) {
+    struct timeval current, remaining;
+    gettimeofday(&current, NULL);
+    timersub(&timer->end_time, &current, &remaining);
+    return !(remaining.tv_sec > 0 || (remaining.tv_sec == 0 && remaining.tv_usec > 0));
+}
+
+
+/* Arm the timer to expire `timeout` milliseconds after the current time. */
+void TimerCountdownMS(Timer* timer, unsigned int timeout) {
+    struct timeval start;
+    struct timeval delta = {timeout / 1000, (timeout % 1000) * 1000};
+    gettimeofday(&start, NULL);
+    timeradd(&start, &delta, &timer->end_time);
+}
+
+
+/* Arm the timer to expire `timeout` seconds after the current time. */
+void TimerCountdown(Timer* timer, unsigned int timeout) {
+    struct timeval start;
+    struct timeval delta = {timeout, 0};
+    gettimeofday(&start, NULL);
+    timeradd(&start, &delta, &timer->end_time);
+}
+
+
+/* Milliseconds left until timer->end_time; 0 once the deadline has passed. */
+int TimerLeftMS(Timer* timer) {
+    struct timeval current, remaining;
+    gettimeofday(&current, NULL);
+    timersub(&timer->end_time, &current, &remaining);
+    if (remaining.tv_sec < 0) { return 0; }
+    return remaining.tv_sec * 1000 + remaining.tv_usec / 1000;
+}
+
+
+/*
+ * Receive up to `len` bytes from the socket of `n` into `buffer`, with timeout_ms as the
+ * SO_RCVTIMEO receive timeout (zero or negative values become 100 microseconds).
+ * Returns the bytes gathered when the read completed or timed out, 0 if recv()
+ * returned 0, or -1 on a recv() error other than EAGAIN/EWOULDBLOCK.
+ */
+int linux_read(Network* n, unsigned char* buffer, int len, int timeout_ms) {
+    struct timeval rcv_timeout = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
+    int total = 0;
+    if (rcv_timeout.tv_sec < 0 || (rcv_timeout.tv_sec == 0 && rcv_timeout.tv_usec <= 0)) {
+        rcv_timeout.tv_sec = 0;
+        rcv_timeout.tv_usec = 100;
+    }
+    setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&rcv_timeout, sizeof(struct timeval));
+    while (total < len) {
+        int got = recv(n->my_socket, &buffer[total], (size_t)(len - total), 0);
+        if (got == -1) {
+            /* A timeout keeps the bytes read so far; any other error yields -1. */
+            if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                total = -1;
+            }
+            break;
+        }
+        if (got == 0) {
+            total = 0;   /* recv() returned 0: bytes gathered so far are discarded */
+            break;
+        }
+        total += got;
+    }
+    return total;
+}
+
+
+/*
+ * Write `len` bytes from `buffer` to the socket of `n`, with timeout_ms installed
+ * as the socket send timeout (SO_SNDTIMEO). Returns the result of write():
+ * the number of bytes written, or -1 on error.
+ */
+int linux_write(Network* n, unsigned char* buffer, int len, int timeout_ms) {
+    /* Split into whole seconds and microseconds: tv_usec has to stay below 1000000. */
+    struct timeval tv = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
+    setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval));
+    return write(n->my_socket, buffer, len);
+}
+
+
+/* Prepare `n` for use: socket handle set to 0, I/O callbacks set to linux_read/linux_write. */
+void NetworkInit(Network* n) {
+    n->mqttwrite = linux_write;
+    n->mqttread = linux_read;
+    n->my_socket = 0;
+}
+
+
+/*
+ * Resolve `addr`, pick the first IPv4 result and open a TCP connection to it on
+ * `port`; the descriptor is stored in n->my_socket. Returns 0 on success, the
+ * non-zero getaddrinfo() code when resolution fails, or -1 when no IPv4 address
+ * was found or socket()/connect() failed. A socket whose connect() failed is
+ * left open in n->my_socket.
+ */
+int NetworkConnect(Network* n, char* addr, int port)
+{
+    int type = SOCK_STREAM;
+    struct sockaddr_in address = {0};
+    int rc = -1;
+    sa_family_t family = AF_INET;
+    struct addrinfo *result = NULL;
+    struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
+
+    if ((rc = getaddrinfo(addr, NULL, &hints, &result)) == 0)
+    {
+        /* Search for the first IPv4 entry; `result` keeps pointing at the list head. */
+        struct addrinfo* res = result;
+        while (res != NULL && res->ai_family != AF_INET)
+            res = res->ai_next;
+        if (res != NULL)
+        {
+            address.sin_port = htons(port);
+            address.sin_family = family = AF_INET;
+            address.sin_addr = ((struct sockaddr_in*)(res->ai_addr))->sin_addr;
+        }
+        else
+            rc = -1;
+
+        /* Free from the head so entries ahead of the chosen one are released too. */
+        freeaddrinfo(result);
+    }
+
+    if (rc == 0)
+    {
+        n->my_socket = socket(family, type, 0);
+        if (n->my_socket != -1)
+            rc = connect(n->my_socket, (struct sockaddr*)&address, sizeof(address));
+        else
+            rc = -1;    /* socket() failed: do not report success */
+    }
+    return rc;
+}
+
+
+/* Close the socket descriptor held in n->my_socket. */
+void NetworkDisconnect(Network* n) {
+    close(n->my_socket);
+}
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTLinux.o b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTLinux.o
new file mode 100755
index 0000000..a08962c
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTLinux.o
Binary files differ
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTTls.c b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTTls.c
new file mode 100755
index 0000000..c53c1da
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTTls.c
@@ -0,0 +1,637 @@
+
+#include "sha1.h"
+#include "sha256.h"
+#include "md5.h"
+//#include DEBUG_LOG_HEADER_FILE
+
+#include "MQTTTls.h"
+#include "MQTTLinux.h"
+#include "error.h"
+#include <sys/socket.h>
+
+
+/*
+ * RNG callback given to mbedtls_ssl_conf_rng(): fills output[0..output_len) with
+ * bytes from rand() and returns 0; p_rng is ignored. NOTE(review): rand() is not
+ * a cryptographic generator - callers here pass a seeded CTR-DRBG as p_rng.
+ */
+int mqttSslRandom(void *p_rng, unsigned char *output, size_t output_len)
+{
+    /* Index with size_t so that buffers longer than 255 bytes are filled completely. */
+    for (size_t i = 0; i < output_len; i++) {
+        output[i] = (unsigned char)rand();
+    }
+    return 0;
+}
+
+/* Debug callback registered with mbedtls_ssl_conf_dbg(); it discards every message. */
+static void mqttSslDebug(void *ctx, int level, const char *file, int line, const char *str)
+{
+    (void)ctx; (void)level; (void)file;
+    (void)line; (void)str;
+}
+
+/*
+ * Non-blocking BIO receive callback: reads up to `len` bytes into `buf` from the
+ * descriptor of the mbedtls_net_context passed as netContext (MSG_DONTWAIT).
+ * Returns recv()'s byte count on success; -1 for a negative descriptor or when
+ * errno is EWOULDBLOCK; MBEDTLS_ERR_NET_CONN_RESET for EPIPE/ECONNRESET;
+ * MBEDTLS_ERR_SSL_WANT_READ for EINTR; MBEDTLS_ERR_NET_RECV_FAILED otherwise.
+ */
+int mqttSslNonblockRecv(void *netContext, uint8_t *buf, size_t len)
+{
+    const int sock = ((mbedtls_net_context *)netContext)->fd;
+    int got;
+
+    if (sock < 0) {
+        return -1;
+    }
+
+    got = (int)recv(sock, buf, len, MSG_DONTWAIT);
+    if (got >= 0) {
+        return got;
+    }
+    /* recv() failed: translate errno, testing the cases in a fixed order. */
+    if (errno == ECONNRESET || errno == EPIPE) {
+        return MBEDTLS_ERR_NET_CONN_RESET;
+    }
+    if (errno == EINTR) {
+        return MBEDTLS_ERR_SSL_WANT_READ;
+    }
+    if (got == -1 && errno == EWOULDBLOCK) {
+        return got;   /* the raw -1 is passed through, not a WANT_READ code */
+    }
+    return MBEDTLS_ERR_NET_RECV_FAILED;
+}
+extern void mbedtls_debug_set_threshold( int threshold );
+
+/*
+ * Open a TLS client connection to `host` on context->port and run the handshake.
+ *
+ * The session state (mqttsClientSsl) is allocated here and stored in context->ssl.
+ * context->seclevel selects the setup: a non-zero level loads context->caCert (when
+ * set) and makes peer verification mandatory, level 2 additionally loads the
+ * client certificate and private key, level 0 forces "verify none". An optional
+ * pre-shared key is taken from context->psk_key / context->psk_identity.
+ * `host` is used for the TCP connect and, unless context->hostName is set, as the TLS server name.
+ *
+ * Returns 0 on success and -1 on any failure. On failure after the allocation,
+ * context->ssl still points at the partially set-up session; this function does
+ * not release it.
+ */
+int mqttSslConn_new(mqttsClientContext* context, char* host)
+{
+    int value;
+    mqttsClientSsl *ssl;
+    const char *custom = "mqtts";   /* personalization data for the DRBG seed */
+    char port[10] = {0};
+    int authmode = MBEDTLS_SSL_VERIFY_NONE;
+    uint32_t flag;
+
+    ssl = malloc(sizeof(mqttsClientSsl));
+    context->ssl = ssl;
+    if (ssl == NULL) {
+        /* out of memory: every call below would dereference ssl */
+        return -1;
+    }
+
+    /*
+     * 0. Initialize the RNG and the session data
+     */
+#if defined(MBEDTLS_DEBUG_C)
+    mbedtls_debug_set_threshold((int)2);
+#endif
+    mbedtls_net_init(&ssl->netContext);
+    mbedtls_ssl_init(&ssl->sslContext);
+    mbedtls_ssl_config_init(&ssl->sslConfig);
+    mbedtls_x509_crt_init(&ssl->caCert);
+    mbedtls_x509_crt_init(&ssl->clientCert);
+    mbedtls_pk_init(&ssl->pkContext);
+    mbedtls_ctr_drbg_init(&ssl->ctrDrbgContext);
+    mbedtls_entropy_init(&ssl->entropyContext);
+
+    if ((value = mbedtls_ctr_drbg_seed(&ssl->ctrDrbgContext,
+                                       mbedtls_entropy_func,
+                                       &ssl->entropyContext,
+                                       (const unsigned char*)custom,
+                                       strlen(custom))) != 0) {
+        /* the DRBG could not be seeded */
+        return -1;
+    }
+
+    /*
+     * 0. Initialize certificates
+     */
+    if (context->seclevel != 0) {
+        if (NULL != context->caCert) {
+            /* a CA certificate makes peer verification mandatory */
+            authmode = MBEDTLS_SSL_VERIFY_REQUIRED;
+            if (0 != (value = mbedtls_x509_crt_parse(&(ssl->caCert), (const unsigned char *)context->caCert, context->caCertLen))) {
+                /* the CA certificate could not be parsed */
+                return -1;
+            }
+        }
+    }
+
+    /* Setup Client Cert/Key (security level 2 only) */
+    if (context->seclevel == 2) {
+        if (context->clientCert != NULL && context->clientPk != NULL) {
+            value = mbedtls_x509_crt_parse(&(ssl->clientCert), (const unsigned char *) context->clientCert, context->clientCertLen);
+            if (value != 0) {
+                /* the client certificate could not be parsed */
+                return -1;
+            }
+
+            value = mbedtls_pk_parse_key(&ssl->pkContext, (const unsigned char *) context->clientPk, context->clientPkLen, NULL, 0);
+            if (value != 0) {
+                /* the client private key could not be parsed */
+                return -1;
+            }
+        }
+    }
+
+    if (context->seclevel == 0) {
+        /* level 0: the caller asked for no peer verification */
+        authmode = MBEDTLS_SSL_VERIFY_NONE;
+    }
+
+    /* Optional pre-shared key (the original note says Ali MQTTS uses PSK TLS). */
+    if ((context->psk_key != NULL) && (context->psk_identity != NULL)) {
+        mbedtls_ssl_conf_psk(&ssl->sslConfig, (const unsigned char *)context->psk_key, strlen(context->psk_key),
+                             (const unsigned char *)context->psk_identity, strlen(context->psk_identity));
+    }
+
+    /*
+     * 1. Start the connection
+     */
+    snprintf(port, sizeof(port), "%d", context->port);   /* the connect call takes the port as text */
+    if (0 != (value = mbedtls_net_connect(&ssl->netContext, host, port, MBEDTLS_NET_PROTO_TCP, 1))) {
+        /* the TCP connection could not be established */
+        return -1;
+    }
+
+    /*
+     * 2. Setup stuff
+     */
+    if ((value = mbedtls_ssl_config_defaults(&(ssl->sslConfig), MBEDTLS_SSL_IS_CLIENT, MBEDTLS_SSL_TRANSPORT_STREAM,
+                                             MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
+        /* the default client configuration could not be loaded */
+        return -1;
+    }
+
+    /* Minimum and maximum protocol version are both pinned to 3.3. */
+    mbedtls_ssl_conf_max_version(&ssl->sslConfig, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3);
+    mbedtls_ssl_conf_min_version(&ssl->sslConfig, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3);
+
+    /* Keep a private copy of the default certificate profile in the session. */
+    memcpy(&(ssl->crtProfile), ssl->sslConfig.cert_profile, sizeof(mbedtls_x509_crt_profile));
+    mbedtls_ssl_conf_authmode(&(ssl->sslConfig), authmode);
+
+#if defined(MBEDTLS_SSL_MAX_FRAGMENT_LENGTH)
+    if ((value = mbedtls_ssl_conf_max_frag_len(&(ssl->sslConfig), MBEDTLS_SSL_MAX_FRAG_LEN_4096)) != 0) {
+        /* the maximum fragment length could not be configured */
+        return -1;
+    }
+#endif
+
+#if defined(MBEDTLS_X509_CRT_PARSE_C)
+    mbedtls_ssl_conf_cert_profile(&ssl->sslConfig, &ssl->crtProfile);
+    mbedtls_ssl_conf_ca_chain(&(ssl->sslConfig), &(ssl->caCert), NULL);
+    if (context->clientCert) {
+        if ((value = mbedtls_ssl_conf_own_cert(&(ssl->sslConfig), &(ssl->clientCert), &(ssl->pkContext))) != 0) {
+            /* the client certificate/key pair was rejected */
+            return -1;
+        }
+    }
+#endif
+
+    /* A first entry of 0xFFFF means "keep the library's default ciphersuites". */
+    if (context->ciphersuite[0] != 0xFFFF) {
+        mbedtls_ssl_conf_ciphersuites(&(ssl->sslConfig), (const int *)(context->ciphersuite));
+    }
+
+    /* NOTE(review): mqttSslRandom() fills its output from rand(); the DRBG context
+     * passed here as its argument is not used by that callback. */
+    mbedtls_ssl_conf_rng(&(ssl->sslConfig), mqttSslRandom, &(ssl->ctrDrbgContext));
+    mbedtls_ssl_conf_dbg(&(ssl->sslConfig), mqttSslDebug, NULL);
+
+#if defined(MBEDTLS_SSL_ALPN)
+    /*
+     * static storage: the library records this pointer instead of copying the
+     * list, so it must stay valid for as long as the configuration is in use.
+     * NOTE(review): "http/1.1" is advertised by an MQTT client - confirm.
+     */
+    static const char *alpn_list[] = { "http/1.1", NULL };
+    mbedtls_ssl_conf_alpn_protocols(&(ssl->sslConfig), alpn_list);
+#endif
+
+    if (context->timeout_r > 0) {
+        uint32_t recvTimeout;
+        /* timeout_r, capped at MQTT_MAX_TIMEOUT, is multiplied by 1000 for the read timeout */
+        recvTimeout = context->timeout_r > MQTT_MAX_TIMEOUT ? MQTT_MAX_TIMEOUT * 1000 : context->timeout_r * 1000;
+        mbedtls_ssl_conf_read_timeout(&(ssl->sslConfig), recvTimeout);
+    }
+
+    if ((value = mbedtls_ssl_setup(&(ssl->sslContext), &(ssl->sslConfig))) != 0) {
+        /* the SSL context could not be set up from the configuration */
+        return -1;
+    }
+
+    /* Server name used for the handshake: context->hostName wins over `host`. */
+    if (context->hostName != NULL) {
+        mbedtls_ssl_set_hostname(&(ssl->sslContext), context->hostName);
+    } else {
+        mbedtls_ssl_set_hostname(&(ssl->sslContext), host);
+    }
+    mbedtls_ssl_set_bio(&(ssl->sslContext), &(ssl->netContext), (mbedtls_ssl_send_t*)mbedtls_net_send, (mbedtls_ssl_recv_t*)mbedtls_net_recv, (mbedtls_ssl_recv_timeout_t*)mbedtls_net_recv_timeout);
+
+    /*
+     * 3. Handshake
+     */
+    while ((value = mbedtls_ssl_handshake(&(ssl->sslContext))) != 0) {
+        /* WANT_READ / WANT_WRITE only mean "call again"; anything else is fatal */
+        if ((value != MBEDTLS_ERR_SSL_WANT_READ) && (value != MBEDTLS_ERR_SSL_WANT_WRITE)) {
+            return -1;
+        }
+    }
+
+    /*
+     * 4. Verify the server certificate
+     * NOTE(review): any non-zero flag fails the connection, even when authmode is
+     * MBEDTLS_SSL_VERIFY_NONE - confirm this is intended for security level 0.
+     */
+    flag = mbedtls_ssl_get_verify_result(&(ssl->sslContext));
+    if (flag != 0) {
+        /* the peer certificate did not verify cleanly */
+        return -1;
+    }
+
+    /* handshake and verification succeeded */
+    return 0;
+}
+
+/*
+ * Older variant of the TLS connect routine: open a TLS client connection to
+ * `host` on context->port and run the handshake.
+ * The session state (mqttsClientSsl) is allocated here and stored in context->ssl.
+ * This variant does not read context->seclevel: the CA certificate and the client
+ * certificate/key are loaded whenever they are set, and the authentication mode
+ * passed to the library is always MBEDTLS_SSL_VERIFY_NONE.
+ * Returns 0 on success and -1 on any failure. On failure after the allocation,
+ * context->ssl still points at the partially set-up session; this function does
+ * not release it.
+ */
+int mqttSslConn_old(mqttsClientContext* context, char* host)
+{
+    int32_t value;
+    mqttsClientSsl *ssl;
+    const char *custom = "mqtts";   /* personalization data for the DRBG seed */
+    char port[10] = {0};
+    int32_t authmode = MBEDTLS_SSL_VERIFY_NONE;
+    uint32_t flag;
+
+    ssl = malloc(sizeof(mqttsClientSsl));
+    context->ssl = ssl;
+    if (ssl == NULL)
+    {
+        /* out of memory: every call below would dereference ssl */
+        return -1;
+    }
+
+    /*
+     * 0. Initialize the RNG and the session data
+     */
+#if defined(MBEDTLS_DEBUG_C)
+    mbedtls_debug_set_threshold((int)2);
+#endif
+    mbedtls_net_init(&ssl->netContext);
+    mbedtls_ssl_init(&ssl->sslContext);
+    mbedtls_ssl_config_init(&ssl->sslConfig);
+    mbedtls_x509_crt_init(&ssl->caCert);
+    mbedtls_x509_crt_init(&ssl->clientCert);
+    mbedtls_pk_init(&ssl->pkContext);
+    mbedtls_ctr_drbg_init(&ssl->ctrDrbgContext);
+    mbedtls_entropy_init(&ssl->entropyContext);
+
+    /* Optional pre-shared key, configured when both key and identity are set. */
+    if ((context->psk_key != NULL) && (context->psk_identity != NULL))
+    {
+        mbedtls_ssl_conf_psk(&ssl->sslConfig, (const unsigned char *)context->psk_key, strlen(context->psk_key),
+                             (const unsigned char *)context->psk_identity, strlen(context->psk_identity));
+    }
+
+    if ((value = mbedtls_ctr_drbg_seed(&ssl->ctrDrbgContext,
+                                       mbedtls_entropy_func,
+                                       &ssl->entropyContext,
+                                       (const unsigned char*)custom,
+                                       strlen(custom))) != 0)
+    {
+        /* the DRBG could not be seeded */
+        return -1;
+    }
+
+    /*
+     * 0. Initialize certificates
+     */
+    if (NULL != context->caCert)
+    {
+        /* authmode stays at VERIFY_NONE in this variant (the switch to VERIFY_REQUIRED is disabled) */
+        if (0 != (value = mbedtls_x509_crt_parse(&(ssl->caCert), (const unsigned char *)context->caCert, context->caCertLen)))
+        {
+            /* the CA certificate could not be parsed */
+            return -1;
+        }
+    }
+
+    /* Setup Client Cert/Key */
+    if (context->clientCert != NULL && context->clientPk != NULL)
+    {
+        value = mbedtls_x509_crt_parse(&(ssl->clientCert), (const unsigned char *) context->clientCert, context->clientCertLen);
+        if (value != 0)
+        {
+            /* the client certificate could not be parsed */
+            return -1;
+        }
+
+        value = mbedtls_pk_parse_key(&ssl->pkContext, (const unsigned char *) context->clientPk, context->clientPkLen, NULL, 0);
+        if (value != 0)
+        {
+            /* the client private key could not be parsed */
+            return -1;
+        }
+    }
+
+    /*
+     * 1. Start the connection
+     */
+    snprintf(port, sizeof(port), "%d", context->port);   /* the connect call takes the port as text */
+    if (0 != (value = mbedtls_net_connect(&ssl->netContext, host, port, MBEDTLS_NET_PROTO_TCP, 1)))
+    {
+        /* the TCP connection could not be established */
+        return -1;
+    }
+
+    /*
+     * 2. Setup stuff
+     */
+    if ((value = mbedtls_ssl_config_defaults(&(ssl->sslConfig), MBEDTLS_SSL_IS_CLIENT, MBEDTLS_SSL_TRANSPORT_STREAM,
+                                             MBEDTLS_SSL_PRESET_DEFAULT)) != 0)
+    {
+        /* the default client configuration could not be loaded */
+        return -1;
+    }
+
+    /* Minimum and maximum protocol version are both pinned to 3.3. */
+    mbedtls_ssl_conf_max_version(&ssl->sslConfig, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3);
+    mbedtls_ssl_conf_min_version(&ssl->sslConfig, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3);
+
+    /* Keep a private copy of the default certificate profile in the session. */
+    memcpy(&(ssl->crtProfile), ssl->sslConfig.cert_profile, sizeof(mbedtls_x509_crt_profile));
+    mbedtls_ssl_conf_authmode(&(ssl->sslConfig), authmode);
+
+#if defined(MBEDTLS_SSL_MAX_FRAGMENT_LENGTH)
+    if ((mbedtls_ssl_conf_max_frag_len(&(ssl->sslConfig), MBEDTLS_SSL_MAX_FRAG_LEN_1024)) != 0)
+    {
+        /* the maximum fragment length could not be configured */
+        return -1;
+    }
+#endif
+
+#if defined(MBEDTLS_X509_CRT_PARSE_C)
+    mbedtls_ssl_conf_cert_profile(&ssl->sslConfig, &ssl->crtProfile);
+    mbedtls_ssl_conf_ca_chain(&(ssl->sslConfig), &(ssl->caCert), NULL);
+    if (context->clientCert)
+    {
+        if ((value = mbedtls_ssl_conf_own_cert(&(ssl->sslConfig), &(ssl->clientCert), &(ssl->pkContext))) != 0)
+        {
+            /* the client certificate/key pair was rejected */
+            return -1;
+        }
+    }
+#endif
+
+    /* A first entry of 0xFFFF means "keep the library's default ciphersuites". */
+    if (context->ciphersuite[0] != 0xFFFF)
+    {
+        mbedtls_ssl_conf_ciphersuites(&(ssl->sslConfig), (const int *)(context->ciphersuite));
+    }
+
+    mbedtls_ssl_conf_rng(&(ssl->sslConfig), mqttSslRandom, &(ssl->ctrDrbgContext));
+    mbedtls_ssl_conf_dbg(&(ssl->sslConfig), mqttSslDebug, NULL);
+
+    if (context->timeout_r > 0)
+    {
+        uint32_t recvTimeout;
+        /* timeout_r, capped at MQTT_MAX_TIMEOUT, is multiplied by 1000 for the read timeout */
+        recvTimeout = context->timeout_r > MQTT_MAX_TIMEOUT ? MQTT_MAX_TIMEOUT * 1000 : context->timeout_r * 1000;
+        mbedtls_ssl_conf_read_timeout(&(ssl->sslConfig), recvTimeout);
+    }
+
+    if ((value = mbedtls_ssl_setup(&(ssl->sslContext), &(ssl->sslConfig))) != 0)
+    {
+        /* the SSL context could not be set up from the configuration */
+        return -1;
+    }
+
+    /* Server name used for the handshake: context->hostName wins over `host`. */
+    if (context->hostName != NULL)
+    {
+        mbedtls_ssl_set_hostname(&(ssl->sslContext), context->hostName);
+    }
+    else
+    {
+        mbedtls_ssl_set_hostname(&(ssl->sslContext), host);
+    }
+    mbedtls_ssl_set_bio(&(ssl->sslContext), &(ssl->netContext), mbedtls_net_send, mbedtls_net_recv, mbedtls_net_recv_timeout);
+
+    /*
+     * 3. Handshake
+     */
+    while ((value = mbedtls_ssl_handshake(&(ssl->sslContext))) != 0)
+    {
+        /* WANT_READ / WANT_WRITE only mean "call again"; anything else is fatal */
+        if ((value != MBEDTLS_ERR_SSL_WANT_READ) && (value != MBEDTLS_ERR_SSL_WANT_WRITE))
+        {
+            return -1;
+        }
+    }
+
+    /*
+     * 4. Verify the server certificate
+     * NOTE(review): authmode is always MBEDTLS_SSL_VERIFY_NONE here, yet any
+     * non-zero verification flag fails the connection - confirm this is intended.
+     */
+    flag = mbedtls_ssl_get_verify_result(&(ssl->sslContext));
+    if (flag != 0)
+    {
+        /* the peer certificate did not verify cleanly */
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Send `len` bytes of `buf` over the TLS session in context->ssl, calling
+ * mbedtls_ssl_write() repeatedly until nothing is left to send.
+ * Returns 0 once all bytes were written, 0 as well if a write call returned 0,
+ * and 1 if a write call returned a negative value.
+ */
+int mqttSslSend(mqttsClientContext* context, unsigned char* buf, int len)
+{
+    int32_t remaining = len;
+
+    do
+    {
+        /* The unsent tail starts `remaining` bytes before the end of buf. */
+        int32_t written = mbedtls_ssl_write(&(context->ssl->sslContext), (unsigned char *)(buf + len - remaining), remaining);
+
+        if (written < 0)
+        {
+            return 1;
+        }
+        if (written == 0)
+        {
+            /* a zero-byte write ends the transfer and is reported as success */
+            return 0;
+        }
+        remaining -= written;
+    } while (remaining > 0);
+
+    return 0;
+}
+
+/*
+ * Read between minLen and maxLen bytes from the TLS session in context->ssl into buf.
+ * While fewer than minLen bytes have arrived the read uses mbedtls_net_recv;
+ * after that it switches to mqttSslNonblockRecv and stops at EWOULDBLOCK.
+ * Returns 0 on success (count in *pReadLen), 1 on a connection error, 2 when the
+ * peer sent close-notify. NOTE(review): buf[readLen] is written, so buf presumably
+ * must hold maxLen + 1 bytes - confirm against callers.
+ */
+int mqttSslRecv(mqttsClientContext* context, unsigned char* buf, int minLen, int maxLen, int* pReadLen)
+{
+ int32_t readLen = 0;
+ int32_t ret;
+
+ while (readLen < maxLen)
+ {
+ mqttsClientSsl *ssl = (mqttsClientSsl *)context->ssl;
+ if (readLen < minLen)
+ {
+ mbedtls_ssl_set_bio(&(ssl->sslContext), &(ssl->netContext), mbedtls_net_send, mbedtls_net_recv, NULL);
+ ret = mbedtls_ssl_read(&(ssl->sslContext), (unsigned char *)(buf+readLen), minLen-readLen);
+ if(ret == 0)
+ {
+ /* a 0-byte result while still below minLen is treated as a connection error (*pReadLen is not updated) */
+ return 1;
+ }
+ }
+ else
+ {
+ mbedtls_ssl_set_bio(&(ssl->sslContext), &(ssl->netContext), mbedtls_net_send, mqttSslNonblockRecv, NULL);
+ ret = mbedtls_ssl_read(&(ssl->sslContext), (unsigned char*)(buf+readLen), maxLen-readLen);
+ if(ret == -1 && errno == EWOULDBLOCK)
+ {
+ /* nothing more is pending: keep what was read and finish normally */
+ break;
+ }
+ }
+ if(ret == MBEDTLS_ERR_SSL_PEER_CLOSE_NOTIFY)
+ {
+ return 2;
+ }
+
+ if (ret > 0)
+ {
+ readLen += ret;
+ }
+ else if ( ret == 0 )
+ {
+ break;
+ }
+ else
+ {
+ /* any other negative result: report the partial count and fail */
+ *pReadLen = readLen;
+ return 1;
+ }
+ }
+ buf[readLen] = '\0'; // terminator makes the received data easier to inspect
+ *pReadLen = readLen;
+ return 0;
+}
+
+/*
+ * Read up to `len` bytes from the TLS session in context->ssl into `buffer`, with
+ * timeout_ms installed as the mbedtls read timeout. Returns the number of bytes
+ * read (possibly fewer than `len`, or 0, when a timeout/EOF-type code ended the read),
+ * -2 on a 0-byte read or on close-notify with nothing read, -1 on any other TLS error.
+ */
+int mqttSslRead(mqttsClientContext* context, unsigned char *buffer, int len, int timeout_ms)
+{
+    uint32_t readLen = 0;
+    static int net_status = 0;   /* NOTE(review): static, so shared by every caller of this function */
+    int32_t ret = -1;
+    char err_str[33] = "";       /* initialised because it is printed while mbedtls_strerror() is disabled */
+    mqttsClientSsl *ssl = (mqttsClientSsl *)context->ssl;
+
+    mbedtls_ssl_conf_read_timeout(&(ssl->sslConfig), timeout_ms);
+    while (readLen < len) {
+        ret = mbedtls_ssl_read(&(ssl->sslContext), (unsigned char *)(buffer + readLen), (len - readLen));
+        if (ret > 0) {
+            readLen += ret;
+            net_status = 0;
+        } else if (ret == 0) {
+            net_status = -2;
+            return -2;   /* 0-byte read: reported as closed, any partial count is dropped */
+        } else {
+            if (MBEDTLS_ERR_SSL_PEER_CLOSE_NOTIFY == ret) {
+                printf("ssl recv error: code = -0x%04X, err_str = '%s'\n", -ret, err_str);
+                net_status = -2; /* connection is closed */
+                break;
+            } else if ((MBEDTLS_ERR_SSL_TIMEOUT == ret)
+                       || (MBEDTLS_ERR_SSL_CONN_EOF == ret)
+                       || (MBEDTLS_ERR_SSL_SESSION_TICKET_EXPIRED == ret)
+                       || (MBEDTLS_ERR_SSL_NON_FATAL == ret)) {
+                /* nothing more to read for now: hand back what has been collected */
+                return readLen;
+            } else {
+                printf("ssl recv error: code = -0x%04X, err_str = '%s'\n", -ret, err_str);
+                net_status = -1;
+                return -1; /* Connection error */
+            }
+        }
+    }
+    return (readLen > 0) ? readLen : net_status;
+}
+
+/*
+ * Shut down and release the TLS session in context->ssl: send close-notify, free
+ * every mbedtls object in the session, free the session itself and set
+ * context->ssl to NULL. Certificate/key buffers in `context` stay with the caller.
+ * Returns 0 on success, or -1 when context->ssl is already NULL.
+ */
+int mqttSslClose(mqttsClientContext* context)
+{
+    mqttsClientSsl *session = (mqttsClientSsl *)context->ssl;
+
+    if (NULL == session) {
+        return -1;
+    }
+
+    mbedtls_ssl_close_notify(&session->sslContext);
+    mbedtls_net_free(&session->netContext);
+    mbedtls_x509_crt_free(&session->caCert);
+    mbedtls_x509_crt_free(&session->clientCert);
+    mbedtls_pk_free(&session->pkContext);
+    mbedtls_ssl_free(&session->sslContext);
+    mbedtls_ssl_config_free(&session->sslConfig);
+    mbedtls_ctr_drbg_free(&session->ctrDrbgContext);
+    mbedtls_entropy_free(&session->entropyContext);
+    context->ssl = NULL;
+    free(session);
+    return 0;
+}
+
+
diff --git a/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTTls.o b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTTls.o
new file mode 100755
index 0000000..6c17101
--- /dev/null
+++ b/lynq/MD310EU/ap/app/dmp-test/libpaho-embed-mqtt/src/MQTTTls.o
Binary files differ