blob: 39f6a8e2f51c7309537f797851bb09ad087abc3b [file] [log] [blame]
yuezonghec78e2ef2025-02-13 17:57:46 -08001/*******************************************************************************
2 * Copyright (c) 2014, 2017 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
15 * Ian Craggs - fix for #96 - check rem_len in readPacket
16 * Ian Craggs - add ability to set message handler separately #6
17 *******************************************************************************/
18
19#include <stdio.h>
20#include <stdarg.h>
21//#include "commontypedef.h"
22#include "MQTTClient.h"
23#ifdef FEATURE_MQTT_TLS_ENABLE
24#include "sha1.h"
25#include "sha256.h"
26#include "md5.h"
27#endif
28//#include "debug_trace.h"
29//#include DEBUG_LOG_HEADER_FILE
30
/* Global send/receive buffer pointers. In the visible part of this file they
 * are only mentioned in commented-out memset() calls.
 * NOTE(review): presumably allocated and used by application code elsewhere - confirm. */
char *mqttSendbuf = NULL;
char *mqttReadbuf = NULL;

/* Application/demo scratch data. None of these are referenced by the visible
 * part of this file.
 * NOTE(review): confirm external users before resizing or removing. */
unsigned char mqttJsonbuff[256] = {0};
char mqtt_payload[128] = {0};
int ec_sensor_temp = 20;
char ec_data_type = 3;
int ec_data_len = 0;
39//osMessageQueueId_t mqttRecvMsgHandle = NULL;
40//osMessageQueueId_t mqttSendMsgHandle = NULL;
41//osMessageQueueId_t appMqttMsgHandle = NULL;
42
43///osThreadId_t mqttRecvTaskHandle = NULL;
44
#ifdef MBTK_OPENCPU_SUPPORT
/* Number of receive-task slots; also the size of the two tables below. */
#define MAX_RECV_TASK 5
/* Thread handle per receive-task slot, written by mbtk_mqtt_demo_recv_task_init(). */
osThreadId_t mbtk_mqttRecvTaskHandle[MAX_RECV_TASK];
/* Per-slot mutex held by mbtk_MQTTRun() around each cycle() call. */
Mutex mbtk_mqttMutex1[MAX_RECV_TASK];
#endif
50
51//osThreadId_t mqttSendTaskHandle = NULL;
52//osThreadId_t appMqttTaskHandle = NULL;
53
54//Mutex mqttMutex1;
55//Mutex mqttMutex2;
/* Global client instance; MQTTRun() services this object. */
MQTTClient mqttClient;
/* Global network object. Not referenced in the visible part of this file. */
Network mqttNetwork;
/* Not referenced in the visible part of this file.
 * NOTE(review): presumably owned by the send task - confirm. */
int mqtt_send_task_status_flag = 0;
/* Incremented by keepalive() each time the keepalive timers expire while a
 * ping is still outstanding; cycle() compares it against 3 and
 * mqtt_try_reconnect() resets it to 0. */
int mqtt_keepalive_retry_count = 0;
/**
 * Convert the low nibble of a byte to its lowercase hexadecimal character.
 *
 * @param hb  Input byte; only bits 0..3 are used.
 * @return    '0'..'9' or 'a'..'f'.
 */
char mqttHb2Hex(unsigned char hb)
{
    static const char hex_digits[] = "0123456789abcdef";

    return hex_digits[hb & 0x0F];
}
65
66#ifdef FEATURE_MQTT_TLS_ENABLE
67void mqttAliHmacSha1(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)
68{
69 int i;
70 mbedtls_sha1_context ctx;
71 unsigned char k_ipad[ALI_SHA1_KEY_IOPAD_SIZE] = {0};
72 unsigned char k_opad[ALI_SHA1_KEY_IOPAD_SIZE] = {0};
73 unsigned char tempbuf[ALI_SHA1_DIGEST_SIZE];
74
75 memset(k_ipad, 0x36, ALI_SHA1_KEY_IOPAD_SIZE);
76 memset(k_opad, 0x5C, ALI_SHA1_KEY_IOPAD_SIZE);
77
78 for(i=0; i<keylen; i++)
79 {
80 if(i>=ALI_SHA1_KEY_IOPAD_SIZE)
81 {
82 break;
83 }
84 k_ipad[i] ^=key[i];
85 k_opad[i] ^=key[i];
86 }
87 mbedtls_sha1_init(&ctx);
88
89 mbedtls_sha1_starts(&ctx);
90 mbedtls_sha1_update(&ctx, k_ipad, ALI_SHA1_KEY_IOPAD_SIZE);
91 mbedtls_sha1_update(&ctx, input, ilen);
92 mbedtls_sha1_finish(&ctx, tempbuf);
93
94 mbedtls_sha1_starts(&ctx);
95 mbedtls_sha1_update(&ctx, k_opad, ALI_SHA1_KEY_IOPAD_SIZE);
96 mbedtls_sha1_update(&ctx, tempbuf, ALI_SHA1_DIGEST_SIZE);
97 mbedtls_sha1_finish(&ctx, tempbuf);
98
99 for(i=0; i<ALI_SHA1_DIGEST_SIZE; ++i)
100 {
101 output[i*2] = mqttHb2Hex(tempbuf[i]>>4);
102 output[i*2+1] = mqttHb2Hex(tempbuf[i]);
103 }
104 mbedtls_sha1_free(&ctx);
105}
106/*
107 * output = SHA-256( input buffer )
108 */
109void mqttAliHmacSha256(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)
110{
111 int i;
112 mbedtls_sha256_context ctx;
113 unsigned char k_ipad[ALI_SHA256_KEY_IOPAD_SIZE] = {0};
114 unsigned char k_opad[ALI_SHA256_KEY_IOPAD_SIZE] = {0};
115
116 memset(k_ipad, 0x36, 64);
117 memset(k_opad, 0x5C, 64);
118
119 if ((NULL == input) || (NULL == key) || (NULL == output)) {
120 return;
121 }
122
123 if (keylen > ALI_SHA256_KEY_IOPAD_SIZE) {
124 return;
125 }
126
127 for(i=0; i<keylen; i++)
128 {
129 if(i>=ALI_SHA256_KEY_IOPAD_SIZE)
130 {
131 break;
132 }
133 k_ipad[i] ^=key[i];
134 k_opad[i] ^=key[i];
135 }
136 mbedtls_sha256_init(&ctx);
137
138 mbedtls_sha256_starts(&ctx, 0);
139 mbedtls_sha256_update(&ctx, k_ipad, ALI_SHA256_KEY_IOPAD_SIZE);
140 mbedtls_sha256_update(&ctx, input, ilen);
141 mbedtls_sha256_finish(&ctx, output);
142
143 mbedtls_sha256_starts(&ctx, 0);
144 mbedtls_sha256_update(&ctx, k_opad, ALI_SHA256_KEY_IOPAD_SIZE);
145 mbedtls_sha256_update(&ctx, output, ALI_SHA256_DIGEST_SIZE);
146 mbedtls_sha256_finish(&ctx, output);
147
148 mbedtls_sha256_free(&ctx);
149}
150/*
151 * output = MD-5( input buffer )
152 */
153void mqttAliHmacMd5(const unsigned char *input, int ilen, unsigned char *output,const unsigned char *key, int keylen)
154{
155 int i;
156 mbedtls_md5_context ctx;
157 unsigned char k_ipad[ALI_MD5_KEY_IOPAD_SIZE] = {0};
158 unsigned char k_opad[ALI_MD5_KEY_IOPAD_SIZE] = {0};
159 unsigned char tempbuf[ALI_MD5_DIGEST_SIZE];
160
161 memset(k_ipad, 0x36, ALI_MD5_KEY_IOPAD_SIZE);
162 memset(k_opad, 0x5C, ALI_MD5_KEY_IOPAD_SIZE);
163
164 for(i=0; i<keylen; i++)
165 {
166 if(i>=ALI_MD5_KEY_IOPAD_SIZE)
167 {
168 break;
169 }
170 k_ipad[i] ^=key[i];
171 k_opad[i] ^=key[i];
172 }
173 mbedtls_md5_init(&ctx);
174
175 mbedtls_md5_starts(&ctx);
176 mbedtls_md5_update(&ctx, k_ipad, ALI_MD5_KEY_IOPAD_SIZE);
177 mbedtls_md5_update(&ctx, input, ilen);
178 mbedtls_md5_finish(&ctx, tempbuf);
179
180 mbedtls_md5_starts(&ctx);
181 mbedtls_md5_update(&ctx, k_opad, ALI_MD5_KEY_IOPAD_SIZE);
182 mbedtls_md5_update(&ctx, tempbuf, ALI_MD5_DIGEST_SIZE);
183 mbedtls_md5_finish(&ctx, tempbuf);
184
185 for(i=0; i<ALI_MD5_DIGEST_SIZE; ++i)
186 {
187 output[i*2] = mqttHb2Hex(tempbuf[i]>>4);
188 output[i*2+1] = mqttHb2Hex(tempbuf[i]);
189 }
190 mbedtls_md5_free(&ctx);
191}
192#endif
193void mqttDefMessageArrived(MessageData* data)
194{
195 char *bufTemp = NULL;
196 bufTemp = malloc(data->message->payloadlen+1);
197 memset(bufTemp, 0, data->message->payloadlen+1);
198 memcpy(bufTemp, data->message->payload, data->message->payloadlen);
199 //ECPLAT_PRINTF(UNILOG_MQTT, mqttRecvTask_2, P_SIG, ".........MQTT topic is:%s", (const uint8_t *)data->topicName->lenstring.data);
200 //ECPLAT_PRINTF(UNILOG_MQTT, mqttRecvTask_1, P_SIG, ".........MQTT_messageArrived is:%s", (const uint8_t *)bufTemp);
201 free(bufTemp);
202}
203
204static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
205 md->topicName = aTopicName;
206 md->message = aMessage;
207}
208
209
210static int getNextPacketId(MQTTClient *c) {
211 return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
212}
213
214static int sendPacket(MQTTClient* c, int length, Timer* timer)
215{
216 int rc = FAILURE,
217 sent = 0;
218
219#ifdef FEATURE_MQTT_TLS_ENABLE
220#ifdef MBTK_OPENCPU_SUPPORT
221 if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
222 {
223 ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls3, P_INFO, 0, "...mqttSendPacket..0.");
224 rc = mqttSslSend(c->mqtts_client, &c->mqtts_client->sendBuf[sent], length);
225 ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls4, P_INFO, 1, "...mqttSendPacket..=%d.", rc);
226 TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
227 return rc;
228 }
229 else
230#endif
231#endif
232 {
233
234 while (sent < length && !TimerIsExpired(timer))
235 {
236 #ifdef MQTT_RAI_OPTIMIZE
237 rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer), 0, false);
238 #else
239 rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
240 #endif
241 if (rc < 0) // there was an error writing the data
242 break;
243 sent += rc;
244 }
245 if (sent == length)
246 {
247 TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
248 rc = SUCCESS;
249 }
250 else
251 rc = FAILURE;
252 return rc;
253
254 }
255}
256
257void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
258 unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
259{
260 int i;
261 c->ipstack = network;
262
263 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
264 c->messageHandlers[i].topicFilter = 0;
265 c->command_timeout_ms = command_timeout_ms;
266 c->buf = sendbuf;
267 c->buf_size = sendbuf_size;
268 c->readbuf = readbuf;
269 c->readbuf_size = readbuf_size;
270 c->isconnected = 0;
271 c->cleansession = 0;
272 c->ping_outstanding = 0;
273 c->defaultMessageHandler = mqttDefMessageArrived;
274 c->next_packetid = 1;
275 TimerInit(&c->last_sent);
276 TimerInit(&c->last_received);
277
278#ifdef FEATURE_MQTT_TLS_ENABLE
279#ifdef MBTK_OPENCPU_SUPPORT
280 c->recv_task_num = -1;
281 if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
282 {
283 c->mqtts_client->sendBuf = sendbuf;
284 c->mqtts_client->sendBufSize = sendbuf_size;
285 c->mqtts_client->readBuf = readbuf;
286 c->mqtts_client->readBufSize = readbuf_size;
287 }
288#endif
289#endif
290
291#if defined(MQTT_TASK)
292 MutexInit(&c->mutex);
293#endif
294}
295
296static int decodePacket(MQTTClient* c, int* value, int timeout)
297{
298 unsigned char i;
299 int multiplier = 1;
300 int len = 0;
301 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
302
303 *value = 0;
304 do
305 {
306 int rc = MQTTPACKET_READ_ERROR;
307
308 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
309 {
310 rc = MQTTPACKET_READ_ERROR; /* bad data */
311 goto exit;
312 }
313
314#ifdef FEATURE_MQTT_TLS_ENABLE
315 #ifdef MBTK_OPENCPU_SUPPORT
316 if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
317 {
318 rc = mqttSslRead(c->mqtts_client, &i, 1, timeout);
319 }
320 else
321 {
322 rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
323 }
324 #endif
325#else
326 rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
327#endif
328
329 if (rc != 1)
330 goto exit;
331 *value += (i & 127) * multiplier;
332 multiplier *= 128;
333 } while ((i & 128) != 0);
334exit:
335 return len;
336}
337
338static int readPacket(MQTTClient* c, Timer* timer)
339{
340 MQTTHeader header = {0};
341 int len = 0;
342 int rem_len = 0;
343 int rc = 0;
344
345#ifdef FEATURE_MQTT_TLS_ENABLE
346#ifdef MBTK_OPENCPU_SUPPORT
347 if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
348 {
349 ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls0, P_INFO, 0, "...mqttReadPacket..0.");
350 /* 1. read the header byte. This has the packet type in it */
351 rc = mqttSslRead(c->mqtts_client, c->mqtts_client->readBuf, 1, TimerLeftMS(timer));
352 if (rc != 1)
353 {
354 goto exit;
355 }
356
357 len = 1;
358 ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls1, P_INFO, 0, "...mqttReadPacket..1.");
359 /* 2. read the remaining length. This is variable in itself */
360 decodePacket(c, &rem_len, TimerLeftMS(timer));
361 len += MQTTPacket_encode(c->mqtts_client->readBuf + 1, rem_len); /* put the original remaining length back into the buffer */
362
363 if (rem_len > (c->mqtts_client->readBufSize - len))
364 {
365 ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls202, P_INFO, 0, "...mqttReadPacket..202.");
366 rc = BUFFER_OVERFLOW;
367 goto exit;
368 }
369
370 ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls2, P_INFO, 0, "...mqttReadPacket..2.");
371 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
372 if (rem_len > 0 && (mqttSslRead(c->mqtts_client, c->mqtts_client->readBuf + len, rem_len, TimerLeftMS(timer)) != rem_len))
373 {
374 rc = 0;
375 ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls200, P_INFO, 0, "...mqttReadPacket..200.");
376 goto exit;
377 }
378
379 header.byte = c->mqtts_client->readBuf[0];
380 rc = header.bits.type;
381 if (c->keepAliveInterval > 0)
382 {
383 ECOMM_TRACE(UNILOG_MQTT, mqtt_task_tls201, P_INFO, 0, "...mqttReadPacket..201.");
384 TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
385 }
386 }
387 else
388#endif
389#endif
390 {
391
392 /* 1. read the header byte. This has the packet type in it */
393 rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
394 if (rc != 1)
395 goto exit;
396
397 len = 1;
398 /* 2. read the remaining length. This is variable in itself */
399 decodePacket(c, &rem_len, TimerLeftMS(timer));
400 len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
401
402 if (rem_len > (c->readbuf_size - len))
403 {
404 rc = BUFFER_OVERFLOW;
405 goto exit;
406 }
407
408 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
409 if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
410 rc = 0;
411 goto exit;
412 }
413
414 header.byte = c->readbuf[0];
415 rc = header.bits.type;
416 if (c->keepAliveInterval > 0)
417 TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
418
419 }
420
421exit:
422 return rc;
423}
424
425// assume topic filter and name is in correct format
426// # can only be at end
427// + and # can only be next to separator
428static char isTopicMatched(char* topicFilter, MQTTString* topicName)
429{
430 char* curf = topicFilter;
431 char* curn = topicName->lenstring.data;
432 char* curn_end = curn + topicName->lenstring.len;
433
434 while (*curf && curn < curn_end)
435 {
436 if (*curn == '/' && *curf != '/')
437 break;
438 if (*curf != '+' && *curf != '#' && *curf != *curn)
439 break;
440 if (*curf == '+')
441 { // skip until we meet the next separator, or end of string
442 char* nextpos = curn + 1;
443 while (nextpos < curn_end && *nextpos != '/')
444 nextpos = ++curn + 1;
445 }
446 else if (*curf == '#')
447 curn = curn_end - 1; // skip until end of string
448 curf++;
449 curn++;
450 };
451
452 return (curn == curn_end) && (*curf == '\0');
453}
454
455int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
456{
457 int i;
458 int rc = FAILURE;
459 //ECOMM_TRACE(UNILOG_MQTT, deliverMessage1, P_SIG, 0, "....1....deliverMessage..");
460
461 // we have to find the right message handler - indexed by topic
462 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
463 {
464 if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
465 isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
466 {
467 if (c->messageHandlers[i].fp != NULL)
468 {
469 MessageData md;
470 NewMessageData(&md, topicName, message);
471 c->messageHandlers[i].fp(&md);
472 rc = SUCCESS;
473 //ECOMM_TRACE(UNILOG_MQTT, deliverMessage2, P_SIG, 0, "....2....deliverMessage..");
474 }
475 }
476 }
477
478 if (rc == FAILURE && c->defaultMessageHandler != NULL)
479 {
480 MessageData md;
481 //ECOMM_TRACE(UNILOG_MQTT, deliverMessage3, P_SIG, 0, "....3....deliverMessage..");
482 NewMessageData(&md, topicName, message);
483 c->defaultMessageHandler(&md);
484 rc = SUCCESS;
485 //ECOMM_TRACE(UNILOG_MQTT, deliverMessage4, P_SIG, 0, "....4....deliverMessage..");
486 }
487
488 return rc;
489}
490
491int keepalive(MQTTClient* c)
492{
493 int rc = SUCCESS;
494
495 if (c->keepAliveInterval == 0)
496 {
497 goto exit;
498 }
499
500 if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
501 {
502 if (c->ping_outstanding)
503 {
504 mqtt_keepalive_retry_count++;
505 //ECOMM_TRACE(UNILOG_MQTT, keepalive_0, P_SIG, 0, "....keepalive....ping_outstanding..=1..");
506 rc = FAILURE; /* PINGRESP not received in keepalive interval */
507 }
508 else
509 {
510 Timer timer;
511 TimerInit(&timer);
512 TimerCountdownMS(&timer, 1000);
513
514 #ifdef MBTK_OPENCPU_SUPPORT
515 memset(c->buf, 0, MQTT_SEND_BUFF_LEN);
516 memset(c->readbuf, 0, MQTT_RECV_BUFF_LEN);
517 #endif
518
519 //memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
520 //memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
521 int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
522
523 //ECOMM_TRACE(UNILOG_MQTT, keepalive_1, P_SIG, 0, "....keepalive....send packet..");
524 if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
525 c->ping_outstanding = 1;
526 }
527 }
528
529exit:
530 return rc;
531}
532
533int keepaliveRetry(MQTTClient* c)
534{
535 int rc = SUCCESS;
536
537 if (c->keepAliveInterval == 0)
538 {
539 goto exit;
540 }
541
542 if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
543 {
544 {
545 Timer timer;
546 TimerInit(&timer);
547 TimerCountdownMS(&timer, 1000);
548
549 #ifdef MBTK_OPENCPU_SUPPORT
550 memset(c->buf, 0, MQTT_SEND_BUFF_LEN);
551 memset(c->readbuf, 0, MQTT_RECV_BUFF_LEN);
552 #endif
553
554 //memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
555 //memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
556 int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
557
558 //ECOMM_TRACE(UNILOG_MQTT, keepalive_1pp, P_SIG, 0, "....keepalive....send packet..");
559 if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
560 c->ping_outstanding = 1;
561 }
562 }
563
564exit:
565 return rc;
566}
567
568void MQTTCleanSession(MQTTClient* c)
569{
570 int i = 0;
571
572 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
573 c->messageHandlers[i].topicFilter = NULL;
574}
575
576void MQTTCloseSession(MQTTClient* c)
577{
578 c->ping_outstanding = 0;
579 c->isconnected = 0;
580 if (c->cleansession)
581 MQTTCleanSession(c);
582}
583
584#ifdef MBTK_OPENCPU_SUPPORT
585
586int mqtt_try_resubscribe(MQTTClient* c)
587{
588 int i;
589 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
590 {
591 if (c->messageHandlers[i].topicFilter != 0)
592 {
593 MQTTSubscribe(c, c->messageHandlers[i].topicFilter, c->messageHandlers[i].qos,c->messageHandlers[i].fp);
594 }
595 }
596 return 0;
597}
598int mqtt_try_reconnect(MQTTClient* c)
599{
600 int reconnect_flag = 1;
601 int rc = 0;
602 mqtt_keepalive_retry_count = 0;
603 int ret = 0;
604
605 if(c->isconnected == 1)
606 {
607 if(c->is_mqtts == 0)
608 {
609 ret = c->ipstack->disconnect(c->ipstack);
610 }
611 else
612 {
613#ifdef FEATURE_MQTT_TLS_ENABLE
614#ifdef MBTK_OPENCPU_SUPPORT
615 ret = mqttSslClose(c->mqtts_client);
616#endif
617#endif
618 }
619 if(ret == 0)
620 {
621 c->isconnected = 0;
622 }
623 }
624
625 if(c->mqtt_connect_callback != 0)
626 {
627 c->mqtt_connect_callback(MQTT_START_RECONNECT_EVENT,0);
628 }
629
630 c->isconnected = 0;
631#ifdef FEATURE_MQTT_TLS_ENABLE
632#ifdef MBTK_OPENCPU_SUPPORT
633 if(c->is_mqtts == MBTK_MQTT_SSL_HAVE)
634 {
635 if(mqttRecvTaskHandle)
636 rc = mqttSslConn_new(c->mqtts_client, c->mbtk_mqtt_server_url);
637 }
638 else
639#endif
640#endif
641 {
642 rc = NetworkConnectTimeout(c->ipstack, c->mbtk_mqtt_server_url, c->mbtk_mqtt_port, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT);
643 }
644 if(rc < 0)
645 {
646 reconnect_flag = 0;
647 }
648 else
649 {
650 if ((rc = (MQTTConnect(c, c->mbtk_mqtt_options))) != 0)
651 {
652 reconnect_flag = 0;
653 }
654 else
655 {
656 mqtt_try_resubscribe(c);
657 return 0;
658 }
659 }
660
661
662
663 if(reconnect_flag == 0)
664 {
665 if(c->mqtt_connect_callback != NULL)
666 {
667 int socket_status = sock_get_errno(c->ipstack->my_socket);
668 c->mqtt_connect_callback(MQTT_RECONNECT_FAIL_EVENT,socket_status);
669 }
670 }
671
672 return 0;
673}
674#endif
675
/**
 * Run one iteration of the client's receive state machine.
 *
 * Reads at most one packet, reacts to it (acknowledges QoS 1/2 publishes,
 * continues the QoS 2 handshake, clears the outstanding-ping flag on
 * PINGRESP, delivers PUBLISH messages to handlers) and then performs
 * keepalive processing.
 *
 * With MBTK_OPENCPU_SUPPORT the function first checks whether the link was
 * lost. In that case it reports MQTT_CONNECT_ABORT_EVENT through the connect
 * callback and either attempts a reconnect (when enabled) or leaves early.
 *
 * @param c      Client to service.
 * @param timer  Deadline used for reading and for sending acknowledgements.
 * @return       The packet type that was read (0 when nothing arrived) while
 *               rc is still SUCCESS at exit; otherwise the failing rc. A
 *               negative packet_type from readPacket() is returned as is.
 */
int cycle(MQTTClient* c, Timer* timer)
{
    int len = 0,
        rc = SUCCESS;

    int packet_type = readPacket(c, timer);     /* read the socket, see what work is due */
    //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0001, P_SIG, 1, ".....mqttRecvTask..packet_type=%d....",packet_type);
    //ECPLAT_PRINTF(UNILOG_DM1, cycle0, P_SIG, ".....autoReg..packet_type=%d ",packet_type);

#ifdef MBTK_OPENCPU_SUPPORT
    /* Plain socket: link loss is detected through the socket's error state. */
    if(c->is_mqtts == MBTK_MQTT_SSL_NONE)
    {
        int socket_status = sock_get_errno(c->ipstack->my_socket);
        if(socket_status == ENOTCONN && c->isconnected == 1)
        {
            if(c->mqtt_connect_callback != NULL)
            {
                c->mqtt_connect_callback(MQTT_CONNECT_ABORT_EVENT,socket_status);
            }
            if(MBTK_MQTT_RECONNECT_ENABLE == c->mqtt_reconn_enable)
            {
                mqtt_try_reconnect(c);
            }
            else
            {
                goto exit;
            }
        }
        else if(c->isconnected == 0)
        {
            /* not connected: nothing to process */
            goto exit;
        }
    }
    /* TLS: a negative read result is treated as a lost connection. */
    if(c->is_mqtts && packet_type < 0)
    {
        if(c->isconnected)
        {
            if(c->mqtt_connect_callback != NULL)
            {
                c->mqtt_connect_callback(MQTT_CONNECT_ABORT_EVENT,ENOTCONN);
            }
            if(MBTK_MQTT_RECONNECT_ENABLE == c->mqtt_reconn_enable)
            {
                mqtt_try_reconnect(c);
            }
            else
            {
                goto exit;
            }
        }
        else
        {
            goto exit;
        }
    }
#endif
    switch (packet_type)
    {
        default:
            /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
            rc = packet_type;
            break;
        case 0: /* timed out reading packet */
            break;
        case CONNACK:
        case PUBACK:
        case SUBACK:
        case UNSUBACK:
            /* Nothing to do here: callers blocked in waitfor() pick these up
             * through the return value. */
            if(packet_type == SUBACK)
            {
                //rc = packet_type;
            }
            break;
        case PUBLISH:
        {
            MQTTString topicName;
            MQTTMessage msg;
            int intQoS;
            msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
            /* On a malformed PUBLISH rc is still SUCCESS, so the exit path
             * returns packet_type (PUBLISH). */
            if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
               (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
                goto exit;
            msg.qos = (enum QoS)intQoS;
            deliverMessage(c, &topicName, &msg);
            if (msg.qos != QOS0)
            {
                /* QoS 1 is answered with PUBACK, QoS 2 with PUBREC */
                if (msg.qos == QOS1)
                    len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
                else if (msg.qos == QOS2)
                    len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
                if (len <= 0)
                    rc = FAILURE;
                else
                    rc = sendPacket(c, len, timer);
                if (rc == FAILURE)
                    goto exit; // there was a problem
            }
            break;
        }
        case PUBREC:
        case PUBREL:
        {
            /* QoS 2 handshake: PUBREC is answered with PUBREL, PUBREL with PUBCOMP */
            unsigned short mypacketid;
            unsigned char dup, type;
            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
                rc = FAILURE;
            else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
                (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
                rc = FAILURE;
            else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
                rc = FAILURE; // there was a problem
            if (rc == FAILURE)
                goto exit; // there was a problem
            break;
        }

        case PUBCOMP:
            break;
        case PINGRESP:
            /* the broker answered the outstanding keepalive ping */
            c->ping_outstanding = 0;
            break;
    }

    if (keepalive(c) != SUCCESS) {
        int socket_stat = 0;
#ifndef MBTK_OPENCPU_SUPPORT
        mqttSendMsg mqttMsg;
#endif
        //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
        rc = FAILURE;
        socket_stat = sock_get_errno(c->ipstack->my_socket);
        /* Socket reports a hard error: reconnect right away. */
        if((socket_stat == MQTT_ERR_ABRT)||(socket_stat == MQTT_ERR_RST)||(socket_stat == MQTT_ERR_CLSD)||(socket_stat == MQTT_ERR_BADE))
        {
#ifdef MBTK_OPENCPU_SUPPORT
            if(c->mqtt_reconn_enable == MBTK_MQTT_RECONNECT_ENABLE)
            {
                mqtt_try_reconnect(c);
            }
#else
            //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0, P_INFO, 0, ".....now, need reconnect.......");
            /* send reconnect msg to send task */
            memset(&mqttMsg, 0, sizeof(mqttMsg));
            mqttMsg.cmdType = MQTT_DEMO_MSG_RECONNECT;

            osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, MQTT_MSG_TIMEOUT);
#endif
        }
        else
        {
            /* No hard socket error: retry the ping until more than 3
             * keepalive failures were counted, then reconnect. */
            if(mqtt_keepalive_retry_count>3)
            {
#ifdef MBTK_OPENCPU_SUPPORT
                if(c->mqtt_reconn_enable == MBTK_MQTT_RECONNECT_ENABLE)
                {
                    mqtt_try_reconnect(c);
                }
#else
                mqtt_keepalive_retry_count = 0;
                //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_ee0, P_INFO, 0, ".....now, need reconnect.......");
                /* send reconnect msg to send task */
                memset(&mqttMsg, 0, sizeof(mqttMsg));
                mqttMsg.cmdType = MQTT_DEMO_MSG_RECONNECT;

                osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, MQTT_MSG_TIMEOUT);
#endif
            }
            else
            {
                keepaliveRetry(c);
            }
        }
    }

exit:
    /* On success report which packet was handled. The session is
     * deliberately not closed on failure (the call is disabled). */
    if (rc == SUCCESS)
        rc = packet_type;
    else if (c->isconnected)
        ;//MQTTCloseSession(c);
    return rc;
}
856
857int MQTTYield(MQTTClient* c, int timeout_ms)
858{
859 int rc = SUCCESS;
860 Timer timer;
861
862 TimerInit(&timer);
863 TimerCountdownMS(&timer, timeout_ms);
864
865 do
866 {
867 if (cycle(c, &timer) < 0)
868 {
869 rc = FAILURE;
870 break;
871 }
872 } while (!TimerIsExpired(&timer));
873
874 return rc;
875}
876
877int MQTTIsConnected(MQTTClient* client)
878{
879 return client->isconnected;
880}
881
882void MQTTRun(void* parm)
883{
884 Timer timer;
885 MQTTClient* c = (MQTTClient*)&mqttClient;
886
887#ifndef MBTK_OPENCPU_SUPPORT
888 if(mqttSendMsgHandle == NULL)
889 {
890 mqttSendMsgHandle = osMessageQueueNew(16, sizeof(mqttSendMsg), NULL);
891 }
892
893 if(appMqttMsgHandle == NULL)
894 {
895 appMqttMsgHandle = osMessageQueueNew(16, sizeof(mqttDataMsg), NULL);
896 }
897#endif
898 /*
899 if(mqttMutex1.sem == NULL)
900 {
901 MutexInit(&mqttMutex1);
902 }
903 */
904
905 TimerInit(&timer);
906
907 while (1)
908 {
909#if defined(MQTT_TASK)
910 //MutexLock(&c->mutex);
911#endif
912 //MutexLock(&mqttMutex1);
913
914 TimerCountdownMS(&timer, 1500); /* Don't wait too long if no traffic is incoming */
915 cycle(c, &timer);
916 //MutexUnlock(&mqttMutex1);
917
918#if defined(MQTT_TASK)
919 //MutexUnlock(&c->mutex);
920#endif
921 sleep(200);
922 }
923}
924
925#if defined(MQTT_TASK)
926int MQTTStartTask(MQTTClient* client)
927{
928 return ThreadStart(&client->thread, &MQTTRun, client);
929}
930#endif
931
932int MQTTStartRECVTask(void)
933{
934 //osThreadAttr_t task_attr;
935
936 //memset(&task_attr, 0, sizeof(task_attr));
937 //task_attr.name = "mqttRecv";
938 //task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
939 #if defined FEATURE_LITEOS_ENABLE
940 task_attr.priority = osPriorityBelowNormal4;
941 #elif defined FEATURE_FREERTOS_ENABLE
942 task_attr.priority = osPriorityBelowNormal7;
943 #endif
944
945 //mqttRecvTaskHandle = osThreadNew(MQTTRun, NULL,&task_attr);
946 if(mqttRecvTaskHandle == NULL)
947 {
948 return FAILURE;
949 }
950
951 return SUCCESS;
952}
953
954int waitfor(MQTTClient* c, int packet_type, Timer* timer)
955{
956 int rc = FAILURE;
957
958 do
959 {
960 if (TimerIsExpired(timer))
961 break; // we timed out
962 rc = cycle(c, timer);
963 }
964 while (rc != packet_type && rc >= 0);
965
966 return rc;
967}
968
969int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
970{
971 Timer connect_timer;
972 int rc = FAILURE;
973 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
974 int len = 0;
975
976#if defined(MQTT_TASK)
977 MutexLock(&c->mutex);
978#endif
979 if (c->isconnected) /* don't send connect packet again if we are already connected */
980 goto exit;
981
982 TimerInit(&connect_timer);
983 TimerCountdownMS(&connect_timer, c->command_timeout_ms);
984
985 if (options == 0)
986 options = &default_options; /* set default options if none were supplied */
987
988 c->keepAliveInterval = options->keepAliveInterval;
989 c->cleansession = options->cleansession;
990 TimerCountdown(&c->last_received, c->keepAliveInterval);
991 if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
992 goto exit;
993 if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet
994 goto exit; // there was a problem
995
996 // this will be a blocking call, wait for the connack
997 if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
998 {
999 data->rc = 0;
1000 data->sessionPresent = 0;
1001 if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
1002 rc = data->rc;
1003 else
1004 rc = FAILURE;
1005 }
1006 else
1007 rc = FAILURE;
1008
1009exit:
1010#ifdef MBTK_OPENCPU_SUPPORT
1011 if (rc == SUCCESS)
1012 {
1013 if(c->mqtt_connect_callback != NULL)
1014 {
1015 c->mqtt_connect_callback(MQTT_CONNECT_SUCCESS_EVENT,0);
1016 }
1017 c->isconnected = 1;
1018 c->ping_outstanding = 0;
1019 }
1020#endif
1021
1022#if defined(MQTT_TASK)
1023 MutexUnlock(&c->mutex);
1024#endif
1025
1026 return rc;
1027}
1028
1029int MQTTReConnect(MQTTClient* client, MQTTPacket_connectData* connectData)
1030{
1031 int ret = FAILURE;
1032
1033 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_13hh0, P_INFO, 0, "...start tcp disconnect ..");
1034 client->ipstack->disconnect(client->ipstack);
1035
1036 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_14hh1, P_INFO, 0, "...start tcp connect ...");
1037 {
1038 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_16hh3, P_INFO, 0, "...tcp connect ok...");
1039 client->isconnected = 0;
1040 if((NetworkConnectTimeout(client->ipstack, MQTT_SERVER_URI, MQTT_SERVER_PORT, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT)) < 0)
1041 {
1042 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_17hh4, P_INFO, 0, "...tcp reconnect fail!!!...\r\n");
1043 }
1044 else
1045 {
1046 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_18hh5, P_INFO, 0, "...start mqtt connect ..");
1047
1048 if ((MQTTConnect(client, connectData)) != 0)
1049 {
1050 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_19hh6, P_INFO, 0, "...mqtt reconnect fial!!!...");
1051 }
1052 else
1053 {
1054 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_20hh7, P_INFO, 0, "...mqtt reconnect ok!!!...");
1055 ret = SUCCESS;
1056 }
1057 }
1058 }
1059 return ret;
1060}
1061
1062int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
1063{
1064 MQTTConnackData data;
1065 return MQTTConnectWithResults(c, options, &data);
1066}
1067
1068int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
1069{
1070 int rc = FAILURE;
1071 int i = -1;
1072
1073 /* first check for an existing matching slot */
1074 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
1075 {
1076 if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)
1077 {
1078 if (messageHandler == NULL) /* remove existing */
1079 {
1080 c->messageHandlers[i].topicFilter = NULL;
1081 c->messageHandlers[i].fp = NULL;
1082 }
1083 rc = SUCCESS; /* return i when adding new subscription */
1084 break;
1085 }
1086 }
1087 /* if no existing, look for empty slot (unless we are removing) */
1088 if (messageHandler != NULL) {
1089 if (rc == FAILURE)
1090 {
1091 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
1092 {
1093 if (c->messageHandlers[i].topicFilter == NULL)
1094 {
1095 rc = SUCCESS;
1096 break;
1097 }
1098 }
1099 }
1100 if (i < MAX_MESSAGE_HANDLERS)
1101 {
1102 c->messageHandlers[i].topicFilter = topicFilter;
1103 c->messageHandlers[i].fp = messageHandler;
1104#ifdef MBTK_OPENCPU_SUPPORT
1105 c->messageHandlers[i].qos = c->qos;
1106#endif
1107 }
1108 }
1109 return rc;
1110}
1111
1112#ifdef MBTK_OPENCPU_SUPPORT
1113
1114void mbtk_MQTTRun(void* parm)
1115{
1116 Timer timer;
1117 MQTTClient* c = (MQTTClient*)parm;
1118 int i = c->recv_task_num;
1119
1120 if(mbtk_mqttMutex1[i].sem == NULL)
1121 {
1122 MutexInit(&mbtk_mqttMutex1[i]);
1123 }
1124
1125 TimerInit(&timer);
1126
1127 while (1)
1128 {
1129#if defined(MQTT_TASK)
1130 MutexLock(&c->mutex);
1131#endif
1132 MutexLock(&mbtk_mqttMutex1[i]);
1133
1134 TimerCountdownMS(&timer, 1500);
1135 cycle(c, &timer);
1136 MutexUnlock(&mbtk_mqttMutex1[i]);
1137
1138#if defined(MQTT_TASK)
1139 MutexUnlock(&c->mutex);
1140#endif
1141 osDelay(200);
1142 }
1143}
1144
1145
1146int mbtk_mqtt_demo_recv_task_init(int i,MQTTClient* c)
1147{
1148 osThreadAttr_t task_attr;
1149 char name[10] = {0};
1150 memset(&task_attr, 0, sizeof(task_attr));
1151 task_attr.stack_size = MQTT_RECV_DEMO_TASK_STACK_SIZE;
1152 task_attr.priority = osPriorityBelowNormal7;
1153 sprintf(name, "mqttRecv%d", i);
1154 task_attr.name = name;
1155 ECOMM_TRACE(UNILOG_MQTT, mbtk_mqtt_demo_recv_task_init_1, P_INFO, 0, "mbtk_mqtt_demo_recv_task_init task_attr.name%s",task_attr.name);
1156
1157
1158 mbtk_mqttRecvTaskHandle[i] = osThreadNew(mbtk_MQTTRun, (void *)c,&task_attr);
1159 if(mbtk_mqttRecvTaskHandle[i] == NULL)
1160 {
1161 return FAILURE;
1162 }
1163 return SUCCESS;
1164}
1165
1166
1167#endif
1168int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
1169 messageHandler messageHandler, MQTTSubackData* data)
1170{
1171 int rc = FAILURE;
1172 Timer timer;
1173 int len = 0;
1174 int mqttQos = (int)qos;
1175 MQTTString topic = MQTTString_initializer;
1176 topic.cstring = (char *)topicFilter;
1177
1178#if defined(MQTT_TASK)
1179 MutexLock(&c->mutex);
1180#endif
1181 if (!c->isconnected)
1182 goto exit;
1183
1184 TimerInit(&timer);
1185 TimerCountdownMS(&timer, c->command_timeout_ms);
1186
1187 len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&mqttQos);
1188 if (len <= 0)
1189 goto exit;
1190 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
1191 goto exit; // there was a problem
1192
1193 if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
1194 {
1195 int count = 0;
1196 unsigned short mypacketid;
1197 //data->grantedQoS = QOS0;
1198 mqttQos = QOS0;
1199 if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&mqttQos, c->readbuf, c->readbuf_size) == 1)
1200 {
1201 if (mqttQos != 0x80)
1202 {//mbtk change
1203 #ifndef MBTK_OPENCPU_SUPPORT
1204 rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
1205 mqttClient = *c;
1206
1207 if(mqttRecvTaskHandle == NULL)
1208 {
1209 mqtt_demo_recv_task_init();
1210 }
1211
1212 #else
1213 rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
1214 int i = 0;
1215 if(c->first_sub == 0)
1216 {
1217 c->first_sub = 1;
1218 for(i = 0; i < MAX_RECV_TASK; i++)
1219 {
1220 if(mbtk_mqttRecvTaskHandle[i] != NULL)
1221 {
1222 ECOMM_TRACE(UNILOG_MQTT, MQTTSubscribeWithResults_0, P_INFO, 0, "mbtk_mqttRecvTaskHandle %d has create",i);
1223 continue;
1224 }
1225 else
1226 {
1227 c->recv_task_num = i;
1228 mbtk_mqtt_demo_recv_task_init(i,c);
1229 break;
1230 }
1231 }
1232 if(i >= MAX_RECV_TASK)
1233 {
1234
1235 ECOMM_TRACE(UNILOG_MQTT, MQTTSubscribeWithResults_1, P_INFO, 0, "create mqtt recv task error");
1236 rc = FAILURE;
1237 }
1238 }
1239
1240 #endif
1241 }//mbtk change, end
1242 }
1243 }
1244 else
1245 rc = FAILURE;
1246
1247exit:
1248 if (rc == FAILURE)
1249 ;//MQTTCloseSession(c);
1250#if defined(MQTT_TASK)
1251 MutexUnlock(&c->mutex);
1252#endif
1253 return rc;
1254}
1255
1256int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
1257 messageHandler messageHandler)
1258{
1259 MQTTSubackData data;
1260 return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data);
1261}
1262
1263int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
1264{
1265 int rc = FAILURE;
1266 Timer timer;
1267 MQTTString topic = MQTTString_initializer;
1268 topic.cstring = (char *)topicFilter;
1269 int len = 0;
1270
1271#if defined(MQTT_TASK)
1272 MutexLock(&c->mutex);
1273#endif
1274 if (!c->isconnected)
1275 goto exit;
1276
1277 TimerInit(&timer);
1278 TimerCountdownMS(&timer, c->command_timeout_ms);
1279
1280 if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
1281 goto exit;
1282 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
1283 goto exit; // there was a problem
1284
1285 if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
1286 {
1287 unsigned short mypacketid; // should be the same as the packetid above
1288 if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
1289 {
1290 /* remove the subscription message handler associated with this topic, if there is one */
1291 MQTTSetMessageHandler(c, topicFilter, NULL);
1292 }
1293 }
1294 else
1295 rc = FAILURE;
1296
1297exit:
1298 if (rc == FAILURE)
1299 ;//MQTTCloseSession(c);
1300#if defined(MQTT_TASK)
1301 MutexUnlock(&c->mutex);
1302#endif
1303 return rc;
1304}
1305
1306int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
1307{
1308 int rc = FAILURE;
1309 Timer timer;
1310 MQTTString topic = MQTTString_initializer;
1311 topic.cstring = (char *)topicName;
1312 int len = 0;
1313
1314#if defined(MQTT_TASK)
1315 MutexLock(&c->mutex);
1316#endif
1317 if (!c->isconnected)
1318 goto exit;
1319
1320 TimerInit(&timer);
1321 TimerCountdownMS(&timer, c->command_timeout_ms);
1322
1323 if (message->qos == QOS1 || message->qos == QOS2)
1324 message->id = getNextPacketId(c);
1325
1326 len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
1327 topic, (unsigned char*)message->payload, message->payloadlen);
1328 if (len <= 0)
1329 goto exit;
1330 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
1331 goto exit; // there was a problem
1332
1333 if (message->qos == QOS1)
1334 {
1335 if (waitfor(c, PUBACK, &timer) == PUBACK)
1336 {
1337 unsigned short mypacketid;
1338 unsigned char dup, type;
1339 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
1340 rc = FAILURE;
1341 }
1342 else
1343 rc = FAILURE;
1344 }
1345 else if (message->qos == QOS2)
1346 {
1347 if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
1348 {
1349 unsigned short mypacketid;
1350 unsigned char dup, type;
1351 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
1352 rc = FAILURE;
1353 }
1354 else
1355 rc = FAILURE;
1356 }
1357
1358exit:
1359 if (rc == FAILURE)
1360 ;//MQTTCloseSession(c);
1361#if defined(MQTT_TASK)
1362 MutexUnlock(&c->mutex);
1363#endif
1364 return rc;
1365}
1366
1367int MQTTDisconnect(MQTTClient* c)
1368{
1369 int rc = FAILURE;
1370 Timer timer; // we might wait for incomplete incoming publishes to complete
1371 int len = 0;
1372
1373#if defined(MQTT_TASK)
1374 MutexLock(&c->mutex);
1375#endif
1376 TimerInit(&timer);
1377 TimerCountdownMS(&timer, c->command_timeout_ms);
1378
1379 len = MQTTSerialize_disconnect(c->buf, c->buf_size);
1380 if (len > 0)
1381 rc = sendPacket(c, len, &timer); // send the disconnect packet
1382 MQTTCloseSession(c);
1383
1384#if defined(MQTT_TASK)
1385 MutexUnlock(&c->mutex);
1386#endif
1387 return rc;
1388}
1389
1390int MQTTInit(MQTTClient* c, Network* n, unsigned char* sendBuf, unsigned char* readBuf)
1391{
1392 NetworkInit(n);
1393 MQTTClientInit(c, n, 40000, (unsigned char *)sendBuf, 1000, (unsigned char *)readBuf, 1000);
1394
1395 return 0;
1396}
1397
1398int MQTTCreate(MQTTClient* c, Network* n, char* clientID, char* username, char* password, char *serverAddr, int port, MQTTPacket_connectData* connData)
1399{
1400 MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
1401 int clientLen = 0;
1402 int usernameLen = 0;
1403 int passwordLen = 0;
1404
1405 if(connData != NULL)
1406 {
1407 memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));
1408 }
1409 else
1410 {
1411 connectData.MQTTVersion = 4;
1412 connectData.keepAliveInterval = 120;
1413 }
1414
1415 if(clientID != NULL)
1416 {
1417 clientLen = strlen(clientID);
1418 connectData.clientID.cstring = malloc(clientLen+1);
1419 memset(connectData.clientID.cstring, 0, (clientLen+1));
1420 memcpy(connectData.clientID.cstring, clientID, clientLen);
1421 }
1422
1423 if(username != NULL)
1424 {
1425 usernameLen = strlen(username);
1426 connectData.username.cstring = malloc(usernameLen+1);;
1427 memset(connectData.username.cstring, 0, (usernameLen+1));
1428 memcpy(connectData.username.cstring, username, usernameLen);
1429 }
1430
1431 if(password != NULL)
1432 {
1433 passwordLen = strlen(password);
1434 connectData.password.cstring = malloc(passwordLen+1);
1435 memset(connectData.password.cstring, 0, (passwordLen+1));
1436 memcpy(connectData.password.cstring, password, passwordLen);
1437 }
1438
1439 {
1440 if ((NetworkConnectTimeout(n, serverAddr, port, 5000, 5000)) != 0)
1441 {
1442 c->keepAliveInterval = connectData.keepAliveInterval;
1443 c->ping_outstanding = 1;
1444 return 1;
1445 }
1446 else
1447 {
1448 if ((MQTTConnect(c, &connectData)) != 0)
1449 {
1450 c->ping_outstanding = 1;
1451 return 1;
1452 }
1453 else
1454 {
1455 #if defined(MQTT_TASK)
1456 if ((MQTTStartTask(c)) != pdPASS)
1457 {
1458 return 1;
1459 }
1460 else
1461 {
1462 return 0;
1463 }
1464 #endif
1465 }
1466 }
1467 }
1468 return 1;
1469}
1470
1471#ifdef FEATURE_CUCC_DM_ENABLE
1472
1473int MQTTCuccCycle(MQTTClient* c, Timer* timer)
1474{
1475 int rc = SUCCESS;
1476
1477 int packet_type = readPacket(c, timer); /* read the socket, see what work is due */
1478 //ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0001, P_SIG, 1, ".....mqttRecvTask..packet_type=%d....",packet_type);
1479 //ECPLAT_PRINTF(UNILOG_DM1, cycle0, P_SIG, ".....autoReg..packet_type=%d ",packet_type);
1480
1481 switch (packet_type)
1482 {
1483 default:
1484 /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
1485 rc = packet_type;
1486 break;
1487 case 0: /* timed out reading packet */
1488 break;
1489 case CONNACK:
1490 case PUBACK:
1491 case SUBACK:
1492 case UNSUBACK:
1493 if(packet_type == SUBACK)
1494 {
1495 //rc = packet_type;
1496 }
1497 break;
1498 case PUBLISH:
1499 {
1500 break;
1501 }
1502 case PUBREC:
1503 case PUBREL:
1504 {
1505 break;
1506 }
1507
1508 case PUBCOMP:
1509 break;
1510 case PINGRESP:
1511 c->ping_outstanding = 0;
1512 break;
1513 }
1514
1515 if (rc == SUCCESS)
1516 rc = packet_type;
1517 else if (c->isconnected)
1518 ;//MQTTCloseSession(c);
1519 return rc;
1520}
1521
1522int MQTTCuccWaitfor(MQTTClient* c, int packet_type, Timer* timer)
1523{
1524 int rc = FAILURE;
1525
1526 do
1527 {
1528 if (TimerIsExpired(timer))
1529 break; // we timed out
1530 rc = MQTTCuccCycle(c, timer);
1531 }
1532 while (rc != packet_type && rc >= 0);
1533
1534 return rc;
1535}
1536
1537int MQTTCuccCreate(MQTTClient* c, Network* n, char* clientID, char* username, char* password, char *serverAddr, int port, MQTTPacket_connectData* connData)
1538{
1539 MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
1540 int clientLen = 0;
1541 int usernameLen = 0;
1542 int passwordLen = 0;
1543
1544 if(connData != NULL)
1545 {
1546 memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));
1547 }
1548 else
1549 {
1550 connectData.MQTTVersion = 4;
1551 connectData.keepAliveInterval = 120;
1552 }
1553
1554 if(clientID != NULL)
1555 {
1556 clientLen = strlen(clientID);
1557 connectData.clientID.cstring = malloc(clientLen+1);
1558 memset(connectData.clientID.cstring, 0, (clientLen+1));
1559 memcpy(connectData.clientID.cstring, clientID, clientLen);
1560 }
1561
1562 if(username != NULL)
1563 {
1564 usernameLen = strlen(username);
1565 connectData.username.cstring = malloc(usernameLen+1);;
1566 memset(connectData.username.cstring, 0, (usernameLen+1));
1567 memcpy(connectData.username.cstring, username, usernameLen);
1568 }
1569
1570 if(password != NULL)
1571 {
1572 passwordLen = strlen(password);
1573 connectData.password.cstring = malloc(passwordLen+1);
1574 memset(connectData.password.cstring, 0, (passwordLen+1));
1575 memcpy(connectData.password.cstring, password, passwordLen);
1576 }
1577
1578 {
1579 if ((NetworkConnectTimeout(n, serverAddr, port, 5000, 5000)) != 0)
1580 {
1581 c->keepAliveInterval = connectData.keepAliveInterval;
1582 c->ping_outstanding = 1;
1583 return 1;
1584 }
1585 else
1586 {
1587 if ((MQTTConnect(c, &connectData)) != 0)
1588 {
1589 //c->ping_outstanding = 1;
1590 //return 1;
1591 }
1592 else
1593 {
1594 #if defined(MQTT_TASK)
1595 if ((MQTTStartTask(c)) != pdPASS)
1596 {
1597 return 1;
1598 }
1599 else
1600 {
1601 return 0;
1602 }
1603 #endif
1604 }
1605 }
1606 }
1607 return 0;
1608}
1609
1610int MQTTCuccConnect(MQTTClient* c, char* clientID, char* username, char* password, MQTTPacket_connectData* connData)
1611{
1612 MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
1613 int clientLen = 0;
1614 int usernameLen = 0;
1615 int passwordLen = 0;
1616
1617 if(connData != NULL)
1618 {
1619 memcpy(&connectData, connData, sizeof(MQTTPacket_connectData));
1620 }
1621 else
1622 {
1623 connectData.MQTTVersion = 4;
1624 connectData.keepAliveInterval = 120;
1625 }
1626
1627 if(clientID != NULL)
1628 {
1629 clientLen = strlen(clientID);
1630 connectData.clientID.cstring = malloc(clientLen+1);
1631 memset(connectData.clientID.cstring, 0, (clientLen+1));
1632 memcpy(connectData.clientID.cstring, clientID, clientLen);
1633 }
1634
1635 if(username != NULL)
1636 {
1637 usernameLen = strlen(username);
1638 connectData.username.cstring = malloc(usernameLen+1);;
1639 memset(connectData.username.cstring, 0, (usernameLen+1));
1640 memcpy(connectData.username.cstring, username, usernameLen);
1641 }
1642
1643 if(password != NULL)
1644 {
1645 passwordLen = strlen(password);
1646 connectData.password.cstring = malloc(passwordLen+1);
1647 memset(connectData.password.cstring, 0, (passwordLen+1));
1648 memcpy(connectData.password.cstring, password, passwordLen);
1649 }
1650
1651 if ((MQTTConnect(c, &connectData)) != 0)
1652 {
1653 c->ping_outstanding = 1;
1654 return 1;
1655 }
1656
1657 return 0;
1658}
1659
1660int MQTTCuccPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
1661{
1662 int rc = FAILURE;
1663 Timer timer;
1664 MQTTString topic = MQTTString_initializer;
1665 topic.cstring = (char *)topicName;
1666 int len = 0;
1667
1668#if defined(MQTT_TASK)
1669 MutexLock(&c->mutex);
1670#endif
1671 if (!c->isconnected)
1672 goto exit;
1673
1674 TimerInit(&timer);
1675 TimerCountdownMS(&timer, c->command_timeout_ms);
1676
1677 if (message->qos == QOS1 || message->qos == QOS2)
1678 message->id = getNextPacketId(c);
1679
1680 len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
1681 topic, (unsigned char*)message->payload, message->payloadlen);
1682 if (len <= 0)
1683 goto exit;
1684 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
1685 goto exit; // there was a problem
1686
1687 if (message->qos == QOS1)
1688 {
1689 if (waitfor(c, PUBACK, &timer) == PUBACK)
1690 {
1691 unsigned short mypacketid;
1692 unsigned char dup, type;
1693 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
1694 rc = FAILURE;
1695 }
1696 else
1697 rc = FAILURE;
1698 }
1699 else if (message->qos == QOS2)
1700 {
1701 if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
1702 {
1703 unsigned short mypacketid;
1704 unsigned char dup, type;
1705 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
1706 rc = FAILURE;
1707 }
1708 else
1709 rc = FAILURE;
1710 }
1711
1712exit:
1713 if (rc == FAILURE)
1714 ;//MQTTCloseSession(c);
1715#if defined(MQTT_TASK)
1716 MutexUnlock(&c->mutex);
1717#endif
1718 return rc;
1719}
1720
1721int MQTTCuccSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
1722{
1723 int rc = FAILURE;
1724 Timer timer;
1725 int len = 0;
1726 int mqttQos = (int)qos;
1727 //MQTTSubackData data;
1728 MQTTString topic = MQTTString_initializer;
1729 topic.cstring = (char *)topicFilter;
1730
1731#if defined(MQTT_TASK)
1732 MutexLock(&c->mutex);
1733#endif
1734 if (!c->isconnected)
1735 goto exit;
1736
1737 TimerInit(&timer);
1738 TimerCountdownMS(&timer, 40000);
1739
1740 len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&mqttQos);
1741 if (len <= 0)
1742 goto exit;
1743 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
1744 goto exit; // there was a problem
1745
1746 if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
1747 {
1748 int count = 0;
1749 unsigned short mypacketid;
1750 //data.grantedQoS = QOS0;
1751 mqttQos = QOS0;
1752 if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&mqttQos, c->readbuf, c->readbuf_size) == 1)
1753 {
1754 if (mqttQos == 0x80)
1755 {
1756 rc = FAILURE;
1757 }
1758 else
1759 {
1760 rc = SUCCESS;
1761 }
1762 }
1763 }
1764 else
1765 {
1766 rc = FAILURE;
1767 }
1768
1769exit:
1770 if (rc == FAILURE)
1771 ;//MQTTCloseSession(c);
1772#if defined(MQTT_TASK)
1773 MutexUnlock(&c->mutex);
1774#endif
1775 return rc;
1776}
1777
1778
1779int MQTTCuccDisconnect(MQTTClient* c)
1780{
1781 int rc = FAILURE;
1782 Timer timer; // we might wait for incomplete incoming publishes to complete
1783 int len = 0;
1784
1785#if defined(MQTT_TASK)
1786 MutexLock(&c->mutex);
1787#endif
1788 TimerInit(&timer);
1789 TimerCountdownMS(&timer, c->command_timeout_ms);
1790
1791 len = MQTTSerialize_disconnect(c->buf, c->buf_size);
1792 if (len > 0)
1793 rc = sendPacket(c, len, &timer); // send the disconnect packet
1794 MQTTCloseSession(c);
1795
1796#if defined(MQTT_TASK)
1797 MutexUnlock(&c->mutex);
1798#endif
1799 return rc;
1800}
1801
1802int MQTTCuccWaitForRecv(MQTTClient* c, int packet_type, unsigned int timerOut, MQTTString *topicName, char *outPayload)
1803{
1804 int rc = FAILURE;
1805 Timer timer;
1806 MQTTMessage msg;
1807 int intQoS;
1808
1809 memset(&msg, 0, sizeof(MQTTMessage));
1810 TimerInit(&timer);
1811 TimerCountdownMS(&timer, timerOut);
1812
1813 do
1814 {
1815 if (TimerIsExpired(&timer))
1816 break; // we timed out
1817 rc = cycle(c, &timer);
1818 }
1819 while (rc != packet_type && rc >= 0);
1820
1821 if(rc == PUBLISH)
1822 {
1823 if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, topicName,
1824 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) == 1)
1825 {
1826 memcpy(outPayload, (unsigned char*)msg.payload, msg.payloadlen);
1827 rc = SUCCESS;
1828 }
1829 }
1830
1831 return rc;
1832}
1833
1834
1835#endif
1836
1837#define MQTT_DEMO_EXAMPLE_1_ONENET
1838void mqtt_demo_onenet(void)
1839{
1840 int len = 0;
1841 MQTTMessage message;
1842 MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
1843
1844 connectData.MQTTVersion = 4;
1845 connectData.clientID.cstring = "34392813";
1846 connectData.username.cstring = "122343";
1847 connectData.password.cstring = "test001";
1848 connectData.keepAliveInterval = 120;
1849 //ECOMM_TRACE(UNILOG_MQTT, mqtt_hh00, P_SIG, 0, "mqtt_demo........");
1850 ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet0, P_INFO, "mqtt_demo........");
1851 mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);
1852 mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);
1853 memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
1854 memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
1855
1856 NetworkInit(&mqttNetwork);
1857 MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, 1000, (unsigned char *)mqttReadbuf, 1000);
1858
1859 if((NetworkSetConnTimeout(&mqttNetwork, 5000, 5000)) != 0)
1860 {
1861 mqttClient.keepAliveInterval = connectData.keepAliveInterval;
1862 mqttClient.ping_outstanding = 1;
1863 }
1864 else
1865 {
1866 if ((NetworkConnect(&mqttNetwork, "183.230.40.39", 6002)) != 0)
1867 {
1868 mqttClient.keepAliveInterval = connectData.keepAliveInterval;
1869 mqttClient.ping_outstanding = 1;
1870 }
1871 else
1872 {
1873 ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet1, P_INFO, "mqtt_demo socket connect ok");
1874 if ((MQTTConnect(&mqttClient, &connectData)) != 0)
1875 {
1876 mqttClient.ping_outstanding = 1;
1877 }
1878 else
1879 {
1880 ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet2, P_INFO, "mqtt_demo connect fail");
1881 mqttClient.ping_outstanding = 0;
1882 }
1883 }
1884 if(mqttClient.ping_outstanding == 0)
1885 {
1886 if ((MQTTStartRECVTask()) != SUCCESS)
1887 ;
1888 }
1889
1890 }
1891 while(1)
1892 {
1893 sprintf(mqtt_payload,"{\"ec_smart_sensor_data\":%d}", ec_sensor_temp);
1894 len = strlen(mqtt_payload);
1895 ec_data_len = len;
1896 unsigned char *ptr = mqttJsonbuff;
1897 sprintf((char *)mqttJsonbuff,"%c%c%c%s", ec_data_type, ec_data_type,ec_data_type, mqtt_payload);
1898 message.payload = mqttJsonbuff;
1899 message.payloadlen = strlen((char *)mqttJsonbuff);
1900 writeChar(&ptr, ec_data_type);
1901 writeInt(&ptr, ec_data_len);
1902 ECPLAT_PRINTF(UNILOG_MQTT, mqtt_demo_onenet3, P_INFO, "mqtt_demo send data");
1903
1904 MQTTPublish(&mqttClient, "$dp", &message);
1905
1906 osDelay(10000);
1907 }
1908 /* do not return */
1909}
1910
1911#define MQTT_DEMO_EXAMPLE_2_ALI
1912#ifdef FEATURE_MQTT_TLS_ENABLE
1913void mqtt_demo_ali(void)
1914{
1915 //char *pub_topic;
1916 int len = 0;
1917 MQTTMessage message;
1918 MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
1919 char hmac_source[256] = {0};
1920 char *ali_clientID = NULL;
1921 char *ali_username = NULL;
1922 char *ali_signature = NULL;
1923
1924 /* eigencomm ec_smoke a1xFDTv3InR sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg 8964 ---"eigencomm|securemode=3,signmethod=hmacsha1,timestamp=8964|" "deviceGrape&a1fsx061r0x" */
1925 /* eigencomm deviceGrape a1fsx061r0x WLar6NunAcCJ0aZHbaNw4eQwdsYVKyC9 8964 ---"eigencomm|securemode=3,signmethod=hmacsha1,timestamp=8964|" "deviceGrape&a1fsx061r0x" */
1926 /* clientID deviceName productKey deviceSecret*/
1927 ali_clientID = malloc(128);
1928 ali_username = malloc(64);
1929 ali_signature = malloc(96);
1930
1931 memset(ali_clientID, 0, 128);
1932 memset(ali_username, 0, 64);
1933 memset(ali_signature, 0, 96);
1934
1935 snprintf(hmac_source, sizeof(hmac_source), "clientId%s" "deviceName%s" "productKey%s" "timestamp%s", "eigencomm", "ec_smoke", "a1xFDTv3InR", "8964");
1936
1937 mqttAliHmacSha1((unsigned char *)hmac_source, strlen(hmac_source), (unsigned char *)ali_signature,(unsigned char *)"sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg", strlen("sWZtNMYkODMxvauyaSiGeJrVEp9jZ4Tg"));
1938
1939 sprintf(ali_clientID,"%s|securemode=3,signmethod=hmacsha1,timestamp=8964|", "eigencomm");
1940 sprintf(ali_username,"%s&%s","ec_smoke","a1xFDTv3InR");
1941 connectData.clientID.cstring = ali_clientID;
1942 connectData.username.cstring = ali_username;
1943 connectData.password.cstring = ali_signature;
1944
1945 connectData.MQTTVersion = 4;
1946 connectData.keepAliveInterval = 120;
1947 //ECOMM_TRACE(UNILOG_MQTT, mqtt_hh001, P_SIG, 0, "mqtt_demo........");
1948 mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);
1949 mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);
1950 memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
1951 memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
1952
1953 NetworkInit(&mqttNetwork);
1954 MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, 1000, (unsigned char *)mqttReadbuf, 1000);
1955
1956 if((NetworkSetConnTimeout(&mqttNetwork, 5000, 5000)) != 0)
1957 {
1958 mqttClient.keepAliveInterval = connectData.keepAliveInterval;
1959 mqttClient.ping_outstanding = 1;
1960 }
1961 else
1962 {
1963 if ((NetworkConnect(&mqttNetwork, "a1xFDTv3InR.iot-as-mqtt.cn-shanghai.aliyuncs.com", 1883)) != 0)
1964 {
1965 mqttClient.keepAliveInterval = connectData.keepAliveInterval;
1966 mqttClient.ping_outstanding = 1;
1967 }
1968 else
1969 {
1970 //ECOMM_TRACE(UNILOG_MQTT, mqtt_hh012, P_SIG, 0, "mqtt_demo....1....");
1971 if ((MQTTConnect(&mqttClient, &connectData)) != 0)
1972 {
1973 mqttClient.ping_outstanding = 1;
1974 }
1975 else
1976 {
1977 //ECOMM_TRACE(UNILOG_MQTT, mqtt_hh023, P_SIG, 0, "mqtt_demo....2....");
1978 mqttClient.ping_outstanding = 0;
1979 }
1980 }
1981 if(mqttClient.ping_outstanding == 0)
1982 {
1983 if ((MQTTStartRECVTask()) != SUCCESS)
1984 ;
1985 }
1986
1987 }
1988 while(1)
1989 {
1990 memset(mqtt_payload, 0, sizeof(mqtt_payload));
1991 memcpy(mqtt_payload, "update", strlen("update"));
1992 len = strlen(mqtt_payload);
1993 message.payload = mqtt_payload;
1994 message.payloadlen = len;
1995 //ECOMM_TRACE(UNILOG_MQTT, mqtt_hh034, P_SIG, 0, "mqtt_demo....3....");
1996
1997 MQTTPublish(&mqttClient, "a1xFDTv3InR/ec_smoke/user/get", &message);
1998
1999 osDelay(10000);
2000 }
2001 /* do not return */
2002}
2003#endif
2004#define MQTT_DEMO_EXAMPLE_3_APP
2005void mqtt_demo_send_task(void *argument)
2006{
2007 int ret = FAILURE;
2008 int msgType = 0xff;
2009 mqttSendMsg mqttMsg;
2010 mqttDataMsg mqttMessage;
2011 int socket_stat = -1;
2012 int socket_err = -1;
2013 MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
2014
2015 connectData.MQTTVersion = 4;
2016 connectData.clientID.cstring = "34392813";
2017 connectData.username.cstring = "122343";
2018 connectData.password.cstring = "test001";
2019 connectData.keepAliveInterval = 120;
2020
2021 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_0, P_SIG, 0, "mqttSendTask........");
2022 mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);
2023 mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);
2024 memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
2025 memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
2026
2027 NetworkInit(&mqttNetwork);
2028 MQTTClientInit(&mqttClient, &mqttNetwork, 40000, (unsigned char *)mqttSendbuf, MQTT_SEND_BUFF_LEN, (unsigned char *)mqttReadbuf, MQTT_RECV_BUFF_LEN);
2029
2030 if((NetworkSetConnTimeout(&mqttNetwork, 10000, 10000)) != 0)
2031 {
2032 mqttClient.keepAliveInterval = connectData.keepAliveInterval;
2033 mqttClient.ping_outstanding = 1;
2034 }
2035 else
2036 {
2037 if ((NetworkConnect(&mqttNetwork, "183.230.40.39", 6002)) != 0)
2038 {
2039 mqttClient.keepAliveInterval = connectData.keepAliveInterval;
2040 mqttClient.ping_outstanding = 1;
2041 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_1, P_SIG, 0, "mqttSendTask..tcp connect fail....");
2042 }
2043 else
2044 {
2045 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_2, P_SIG, 0, "mqttSendTask..tcp connect ok....");
2046 if ((MQTTConnect(&mqttClient, &connectData)) != 0)
2047 {
2048 mqttClient.ping_outstanding = 1;
2049 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_3, P_SIG, 0, "mqttSendTask..mqtt connect fail....");
2050 }
2051 else
2052 {
2053 mqttClient.keepAliveInterval = connectData.keepAliveInterval;
2054 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4, P_SIG, 0, "mqttSendTask..mqtt connect ok....");
2055 mqtt_send_task_status_flag = 1;
2056 mqttClient.ping_outstanding = 0;
2057 }
2058 }
2059 }
2060
2061 /* sub topic*/
2062 //MQTTSubscribe(&mqttClient, topic_data, 0, NULL);
2063
2064 /*start recv task*/
2065 if(mqttClient.ping_outstanding == 0)
2066 {
2067 mqtt_demo_recv_task_init();
2068 }
2069 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4000, P_SIG, 0, "mqttSendTask..start recv msg....");
2070
2071 while(1)
2072 {
2073 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4001, P_SIG, 1, "mqttSendTask..recv msg hand=0x%x....",mqttSendMsgHandle);
2074 /* recv msg (block mode) */
2075 osMessageQueueGet(mqttSendMsgHandle, &mqttMsg, 0, osWaitForever);
2076 msgType = mqttMsg.cmdType;
2077 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_4002, P_SIG, 1, "mqttSendTask..recv msg=%d....",msgType);
2078
2079 switch(msgType)
2080 {
2081 case MQTT_DEMO_MSG_PUBLISH:
2082 /* send packet */
2083 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_500, P_INFO, 0, ".....start send mqtt publish packet.......");
2084 MutexLock(&mqttMutex1);
2085 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_5, P_INFO, 0, ".....start send mqtt publish packet.......");
2086 memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
2087 ret = MQTTPublish(&mqttClient, mqttMsg.topic, &mqttMsg.message);
2088
2089 if(mqttMsg.topic != NULL)
2090 {
2091 free(mqttMsg.topic);
2092 mqttMsg.topic = NULL;
2093 }
2094 if(mqttMsg.message.payload != NULL)
2095 {
2096 free(mqttMsg.message.payload);
2097 mqttMsg.message.payload = NULL;
2098 }
2099
2100 /* send result to at task */
2101 if(ret == SUCCESS)
2102 {
2103 memset(&mqttMessage, 0, sizeof(mqttMessage));
2104 mqttMessage.cmdType = MQTT_DEMO_MSG_PUBLISH_ACK;
2105 mqttMessage.ret = SUCCESS;
2106 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_600, P_INFO, 0, ".....send mqtt publish packet ok.......");
2107
2108 osMessageQueuePut(appMqttMsgHandle, &mqttMessage, 0, MQTT_MSG_TIMEOUT);
2109 }
2110 else
2111 {
2112 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_6, P_INFO, 0, ".....send mqtt publish packet fail.......");
2113 socket_stat = sock_get_errno(mqttClient.ipstack->my_socket);
2114 socket_err = socket_error_is_fatal(socket_stat);
2115 if(socket_err == 1)
2116 {
2117 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_7, P_INFO, 0, ".....find need reconnect when publish packet.......");
2118 MQTTReConnect(&mqttClient, &connectData);
2119
2120 }
2121 else
2122 {
2123 memset(&mqttMessage, 0, sizeof(mqttMessage));
2124 mqttMessage.cmdType = MQTT_DEMO_MSG_PUBLISH_ACK;
2125 mqttMessage.ret = SUCCESS;
2126 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_600ii, P_INFO, 0, ".....send mqtt publish packet ok.......");
2127
2128 osMessageQueuePut(appMqttMsgHandle, &mqttMessage, 0, MQTT_MSG_TIMEOUT);
2129 }
2130 //memset(&mqttMessage, 0, sizeof(mqttMessage));
2131 //mqttMessage.cmdType = MQTT_MSG_RECONNECT;
2132 //mqttMessage.ret = FAILURE;
2133
2134 //xQueueSend(appMqttMsgHandle, &mqttMessage, MQTT_MSG_TIMEOUT);
2135 }
2136 MutexUnlock(&mqttMutex1);
2137 break;
2138#if 0
2139 case MQTT_DEMO_MSG_KEEPALIVE:
2140 /* send keepalive packet */
2141 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_8, P_INFO, 0, ".....start send mqtt keeplive packet.......");
2142 ret = keepalive(&mqttClient);
2143
2144 if(ret == SUCCESS) // send the ping packet
2145 {
2146 mqttClient->ping_outstanding = 1;
2147 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_9, P_INFO, 1, ".....mqtt keeplive send ok.......");
2148 }
2149 else
2150 {
2151 socket_stat = sock_get_errno(mqttNewContext->mqtt_client->ipstack->my_socket);
2152 socket_err = socket_error_is_fatal(socket_stat);
2153 if(socket_err == 1)
2154 {
2155 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_10, P_INFO, 0, ".....find need reconnect when send keeplive packet.......");
2156 ret = MQTT_RECONNECT;
2157 }
2158 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_11, P_INFO, 1, ".....mqtt send keeplive Packet fail");
2159 }
2160 break;
2161#endif
2162 case MQTT_DEMO_MSG_RECONNECT:
2163 MutexLock(&mqttMutex1);
2164 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_12, P_INFO, 0, ".....find need reconnect when read packet.......");
2165 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_13, P_INFO, 0, "...start tcp disconnect ..");
2166 mqttClient.ipstack->disconnect(mqttClient.ipstack);
2167
2168 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_14, P_INFO, 0, "...start tcp connect ...");
2169 if ((NetworkSetConnTimeout(mqttClient.ipstack, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT)) != 0)
2170 {
2171 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_15, P_INFO, 0, "...tcp socket set timeout fail...");
2172 }
2173 else
2174 {
2175 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_16, P_INFO, 0, "...tcp connect ok...");
2176 mqttClient.isconnected = 0;
2177 if((NetworkConnect(mqttClient.ipstack, MQTT_SERVER_URI, MQTT_SERVER_PORT)) < 0)
2178 {
2179 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_17, P_INFO, 0, "...tcp reconnect fail!!!...\r\n");
2180 }
2181 else
2182 {
2183 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_18, P_INFO, 0, "...start mqtt connect ..");
2184
2185 if ((MQTTConnect(&mqttClient, &connectData)) != 0)
2186 {
2187 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_19, P_INFO, 0, "...mqtt reconnect fial!!!...");
2188 }
2189 else
2190 {
2191 //ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_20, P_INFO, 0, "...mqtt reconnect ok!!!...");
2192 }
2193 }
2194 }
2195 MutexUnlock(&mqttMutex1);
2196 break;
2197 }
2198 }
2199}
2200
2201void app_mqtt_demo_task(void *argument)
2202{
2203 int msgType = 1;
2204 int payloadLen = 0;
2205 mqttSendMsg mqttMsg;
2206 mqttDataMsg mqttMessage;
2207 int ret = FAILURE;
2208
2209 /*init driver*/
2210
2211 /*start mqtt send task*/
2212 ret = mqtt_demo_send_task_init();
2213 if(ret == FAILURE)
2214 {
2215 ;
2216 }
2217 while(1)
2218 {
2219 if(mqtt_send_task_status_flag == 1)
2220 {
2221 break;
2222 }
2223 osDelay(4000);
2224 }
2225
2226 /*state machine*/
2227 while(1)
2228 {
2229 /*read data*/
2230
2231 osDelay(4000);
2232
2233 /*send msg to mqtt send task, and wait for excute result*/
2234 //ECOMM_TRACE(UNILOG_MQTT, appMqttTask_0, P_SIG, 0, "appMqttTask...send start.....");
2235 memset(&mqttMsg, 0, sizeof(mqttSendMsg));
2236 mqttMsg.cmdType = MQTT_DEMO_MSG_PUBLISH;
2237
2238 mqttMsg.topic = malloc(128);
2239 memset(mqttMsg.topic, 0, 128);
2240 memcpy(mqttMsg.topic, "$dp", strlen("$dp"));
2241 mqttMsg.message.qos = QOS0;
2242 mqttMsg.message.retained = 0;
2243 mqttMsg.message.id = 0;
2244 mqttMsg.message.payload = malloc(32);
2245 memset(mqttMsg.message.payload, 0, 32);
2246 memcpy(mqttMsg.message.payload, "{\"data\":90}", strlen("{\"data\":90}"));
2247
2248 sprintf(mqttMsg.message.payload, "%c%c%c%s",msgType, msgType, msgType, "{\"data\":90}");
2249 payloadLen = strlen(mqttMsg.message.payload);
2250 unsigned char *ptr = (unsigned char *)mqttMsg.message.payload;
2251 writeChar(&ptr,3);
2252 writeInt(&ptr,strlen("{\"data\":90}"));
2253 mqttMsg.message.payloadlen = payloadLen;
2254
2255 osMessageQueuePut(mqttSendMsgHandle, &mqttMsg, 0, (2*MQTT_MSG_TIMEOUT));
2256
2257 osMessageQueueGet(appMqttMsgHandle, &mqttMessage, 0, cmsisMAX_DELAY);
2258 if(mqttMessage.cmdType == MQTT_DEMO_MSG_PUBLISH_ACK)
2259 {
2260 //ECOMM_TRACE(UNILOG_MQTT, appMqttTask_1, P_SIG, 0, "appMqttTask...send ok.....");
2261 }
2262 else
2263 {
2264 //ECOMM_TRACE(UNILOG_MQTT, appMqttTask_2, P_SIG, 0, "appMqttTask...send fail.....");
2265 }
2266 }
2267}
2268
2269int mqtt_demo_recv_task_init(void)
2270{
2271 osThreadAttr_t task_attr;
2272
2273 memset(&task_attr, 0, sizeof(task_attr));
2274 task_attr.name = "mqttRecv";
2275#ifdef MBTK_OPENCPU_SUPPORT
2276 task_attr.stack_size = MQTT_RECV_DEMO_TASK_STACK_SIZE;
2277#else
2278 task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
2279#endif
2280 #if defined FEATURE_LITEOS_ENABLE
2281 task_attr.priority = osPriorityBelowNormal4;
2282 #elif defined FEATURE_FREERTOS_ENABLE
2283 task_attr.priority = osPriorityBelowNormal7;
2284 #endif
2285
2286 mqttRecvTaskHandle = osThreadNew(MQTTRun, NULL,&task_attr);
2287 if(mqttRecvTaskHandle == NULL)
2288 {
2289 return FAILURE;
2290 }
2291
2292 return SUCCESS;
2293}
2294
2295int mqtt_demo_send_task_init(void)
2296{
2297 osThreadAttr_t task_attr;
2298
2299 memset(&task_attr, 0, sizeof(task_attr));
2300 task_attr.name = "mqttSend";
2301 task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
2302 #if defined FEATURE_LITEOS_ENABLE
2303 task_attr.priority = osPriorityBelowNormal3;
2304 #elif defined FEATURE_FREERTOS_ENABLE
2305 task_attr.priority = osPriorityBelowNormal6;
2306 #endif
2307 mqttSendTaskHandle = osThreadNew(mqtt_demo_send_task, NULL,&task_attr);
2308 if(mqttSendTaskHandle == NULL)
2309 {
2310 return FAILURE;
2311 }
2312
2313 return SUCCESS;
2314}
2315
2316int app_mqtt_demo_task_init(void)
2317{
2318 osThreadAttr_t task_attr;
2319
2320 if(mqttSendMsgHandle == NULL)
2321 {
2322 mqttSendMsgHandle = osMessageQueueNew(16, sizeof(mqttSendMsg), NULL);
2323 }
2324
2325 if(appMqttMsgHandle == NULL)
2326 {
2327 appMqttMsgHandle = osMessageQueueNew(16, sizeof(mqttDataMsg), NULL);
2328 }
2329
2330 if(mqttMutex1.sem == NULL)
2331 {
2332 MutexInit(&mqttMutex1);
2333 }
2334
2335 memset(&task_attr, 0, sizeof(task_attr));
2336 task_attr.name = "appTask";
2337 task_attr.stack_size = MQTT_DEMO_TASK_STACK_SIZE;
2338 #if defined FEATURE_LITEOS_ENABLE
2339 task_attr.priority = osPriorityBelowNormal2;
2340 #elif defined FEATURE_FREERTOS_ENABLE
2341 task_attr.priority = osPriorityBelowNormal5;
2342 #endif
2343 appMqttTaskHandle = osThreadNew(app_mqtt_demo_task, NULL,&task_attr);
2344 if(appMqttTaskHandle == NULL)
2345 {
2346 return FAILURE;
2347 }
2348
2349 return SUCCESS;
2350}
2351
2352