blob: b197a85e84e8dcf4f5bb9568843947b5419775f8 [file] [log] [blame]
liubin281ac462023-07-19 14:22:54 +08001/*******************************************************************************
2 * Copyright (c) 2014 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
15 *******************************************************************************/
16
17#include "MQTTClient.h"
18
19regnwl_info_t regnwl_info = {0};
20
21
22int core_json_value(const char *input, uint32_t input_len, const char *key, uint32_t key_len, char **value,
23 uint32_t *value_len)
24{
25 int idx = 0;
26
27 for (idx = 0; idx < input_len; idx++) {
28 if (idx + key_len >= input_len) {
29 return -1;
30 }
31 if ((memcmp(&input[idx], key, key_len) == 0) &&
32 ((idx > 0) && (input[idx - 1] == '"')) &&
33 ((idx + key_len < input_len) && (input[idx + key_len] == '"'))) {
34 idx += key_len;
35 /* shortest ":x, or ":x} or ":x] */
36 if ((idx + 2 >= input_len) ||
37 (input[idx + 1] != ':')) {
38 return -1;
39 }
40 idx += 2;
41 if (input[idx] == '"') {
42 *value = (char *)&input[++idx];
43 for (; idx < input_len; idx++) {
44 if ((input[idx] == '"')) {
45 *value_len = (uint32_t)(idx - (*value - input));
46 return SUCCESS;
47 }
48 }
49 } else if (input[idx] == '{' || input[idx] == '[') {
50 char start = input[idx];
51 char end = (start == '{') ? ('}') : (']');
52 uint8_t count = 0;
53 *value = (char *)&input[idx];
54 for (; idx < input_len; idx++) {
55 if ((input[idx] == start)) {
56 count++;
57 } else if ((input[idx] == end)) {
58 if (--count == 0) {
59 *value_len = (uint32_t)(idx - (*value - input) + 1);
60 return SUCCESS;
61 }
62 }
63 }
64 } else {
65 *value = (char *)&input[idx];
66 for (; idx < input_len; idx++) {
67 if ((input[idx] == ',' || input[idx] == ']' || input[idx] == '}')) {
68 *value_len = (uint32_t)(idx - (*value - input));
69 return SUCCESS;
70 }
71 }
72 }
73 }
74 }
75
76 return -1;
77}
78
79
80void mbtk_ali_auth_regnwl_sava(MQTTMessage msg)
81{
b.liu62240ee2024-11-07 17:52:45 +080082 printf("pub, payload: %d,%s\n", msg.payloadlen, (char*)msg.payload);
liubin281ac462023-07-19 14:22:54 +080083 int32_t res = SUCCESS;
84
85 char *client_key = "clientId",*deviceToken_key="deviceToken";
86 uint32_t clientId_value_len = 0,deviceToken_value_len = 0;
87
88 char *client_value=NULL,*deviceToken_value=NULL;
89 if ((res = core_json_value((char *)msg.payload, msg.payloadlen,
90 client_key, strlen(client_key), &client_value, &clientId_value_len)) == SUCCESS)
91 {
92 memset(regnwl_info.clientId,0,clientId_value_len+1);
93 memcpy(regnwl_info.clientId,client_value,clientId_value_len);
94 printf("client_value:%s,\n regnwl_info->clientId:%s\n", client_value, regnwl_info.clientId);
95 }
96
97 if ((res = core_json_value((char *)msg.payload, msg.payloadlen,
98 deviceToken_key, strlen(deviceToken_key), &deviceToken_value, &deviceToken_value_len)) == SUCCESS)
99 {
100 memset(regnwl_info.deviceToken,0,deviceToken_value_len+1);
101 memcpy(regnwl_info.deviceToken,deviceToken_value,deviceToken_value_len);
102 printf("deviceToken_value:%s,\n regnwl_info->deviceToken:%s\n", deviceToken_value, regnwl_info.deviceToken);
103 }
104
105}
106
107
108void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessgage) {
109 md->topicName = aTopicName;
110 md->message = aMessgage;
111}
112
113
114int getNextPacketId(Client *c) {
115 return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
116}
117
118
119int sendPacket(Client* c, int length, Timer* timer)
120{
b.liu62240ee2024-11-07 17:52:45 +0800121 int rc = FAILURE,
liubin281ac462023-07-19 14:22:54 +0800122 sent = 0;
b.liu62240ee2024-11-07 17:52:45 +0800123
liubin281ac462023-07-19 14:22:54 +0800124 while (sent < length && !expired(timer))
125 {
126 rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, left_ms(timer));
127 if (rc < 0) // there was an error writing the data
128 break;
129 sent += rc;
130 }
131 printf("rc ----%d,sent----%d,length----%d\n",rc,sent,length);
132 if (sent == length)
133 {
b.liu62240ee2024-11-07 17:52:45 +0800134 countdown(&c->ping_timer, c->keepAliveInterval); // record the fact that we have successfully sent the packet
liubin281ac462023-07-19 14:22:54 +0800135 rc = SUCCESS;
136 }
137 else
138 rc = FAILURE;
139 return rc;
140}
141
142
143void MQTTClient(Client* c, Network* network, unsigned int command_timeout_ms, unsigned char* buf, size_t buf_size, unsigned char* readbuf, size_t readbuf_size)
144{
145 int i;
146 c->ipstack = network;
b.liu62240ee2024-11-07 17:52:45 +0800147
liubin281ac462023-07-19 14:22:54 +0800148 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
149 c->messageHandlers[i].topicFilter = 0;
150 c->command_timeout_ms = command_timeout_ms;
151 c->buf = buf;
152 c->buf_size = buf_size;
153 c->readbuf = readbuf;
154 c->readbuf_size = readbuf_size;
155 c->isconnected = 0;
156 c->ping_outstanding = 0;
157 c->defaultMessageHandler = NULL;
158 InitTimer(&c->ping_timer);
159}
160
161
162int decodePacket(Client* c, int* value, int timeout)
163{
164 unsigned char i;
165 int multiplier = 1;
166 int len = 0;
167 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
168
169 *value = 0;
170 do
171 {
172 int rc = MQTTPACKET_READ_ERROR;
173
174 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
175 {
176 rc = MQTTPACKET_READ_ERROR; /* bad data */
177 goto exit;
178 }
179 rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
180 if (rc != 1)
181 goto exit;
182 *value += (i & 127) * multiplier;
183 multiplier *= 128;
184 } while ((i & 128) != 0);
185exit:
186 return len;
187}
188
189
b.liu62240ee2024-11-07 17:52:45 +0800190int readPacket(Client* c, Timer* timer)
liubin281ac462023-07-19 14:22:54 +0800191{
192 int rc = FAILURE;
193 MQTTHeader header = {0};
194 int len = 0;
195 int rem_len = 0;
196
197 /* 1. read the header byte. This has the packet type in it */
198 memset(c->readbuf,0x0,c->readbuf_size);
199 if ((rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, left_ms(timer))) != 1)
200 goto exit;
201
202 len = 1;
203 /* 2. read the remaining length. This is variable in itself */
204 decodePacket(c, &rem_len, left_ms(timer));
205 len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
206
207 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
208 if (rem_len > 0 && ((rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, left_ms(timer)) )!= rem_len))
209 goto exit;
210
211 header.byte = c->readbuf[0];
212 rc = header.bits.type;
213exit:
214 return rc;
215}
216
217
218// assume topic filter and name is in correct format
219// # can only be at end
220// + and # can only be next to separator
221char isTopicMatched(char* topicFilter, MQTTString* topicName)
222{
223 char* curf = topicFilter;
224 char* curn = topicName->lenstring.data;
225 char* curn_end = curn + topicName->lenstring.len;
b.liu62240ee2024-11-07 17:52:45 +0800226
liubin281ac462023-07-19 14:22:54 +0800227 while (*curf && curn < curn_end)
228 {
229 if (*curn == '/' && *curf != '/')
230 break;
231 if (*curf != '+' && *curf != '#' && *curf != *curn)
232 break;
233 if (*curf == '+')
234 { // skip until we meet the next separator, or end of string
235 char* nextpos = curn + 1;
236 while (nextpos < curn_end && *nextpos != '/')
237 nextpos = ++curn + 1;
238 }
239 else if (*curf == '#')
240 curn = curn_end - 1; // skip until end of string
241 curf++;
242 curn++;
243 };
b.liu62240ee2024-11-07 17:52:45 +0800244
liubin281ac462023-07-19 14:22:54 +0800245 return (curn == curn_end) && (*curf == '\0');
246}
247
248
249int deliverMessage(Client* c, MQTTString* topicName, MQTTMessage* message)
250{
251 int i;
252 int rc = FAILURE;
253
254 // we have to find the right message handler - indexed by topic
255 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
256 {
257 if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
258 isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
259 {
260 if (c->messageHandlers[i].fp != NULL)
261 {
262 MessageData md;
263 NewMessageData(&md, topicName, message);
264 c->messageHandlers[i].fp(&md);
265 rc = SUCCESS;
266 }
267 }
268 }
b.liu62240ee2024-11-07 17:52:45 +0800269
270 if (rc == FAILURE && c->defaultMessageHandler != NULL)
liubin281ac462023-07-19 14:22:54 +0800271 {
272 MessageData md;
273 NewMessageData(&md, topicName, message);
274 c->defaultMessageHandler(&md);
275 rc = SUCCESS;
b.liu62240ee2024-11-07 17:52:45 +0800276 }
277
liubin281ac462023-07-19 14:22:54 +0800278 return rc;
279}
280
281
282int keepalive(Client* c)
283{
284 int rc = SUCCESS;
285 if (c->keepAliveInterval == 0)
286 {
287 rc = SUCCESS;
288 goto exit;
289 }
b.liu62240ee2024-11-07 17:52:45 +0800290
liubin281ac462023-07-19 14:22:54 +0800291 if (expired(&c->ping_timer))
292 {
293 if (!c->ping_outstanding)
294 {
295 Timer timer;
296 InitTimer(&timer);
297 countdown_ms(&timer, 1000);
298 int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
299 printf("len %d\n",len);
300 if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
301 c->ping_outstanding = 1;
302 printf("sendPacket---------------------------------------------\n");
303 }
304 else
305 {
b.liu62240ee2024-11-07 17:52:45 +0800306 printf("fail_count:%d-------------\n",c->fail_count);
liubin281ac462023-07-19 14:22:54 +0800307 ++(c->fail_count);
308 if (c->fail_count >= MAX_FAIL_ALLOWED)
309 {
310 rc = DISCONNECTED;
311 goto exit;
312 }
313
314 }
315 countdown(&(c->ping_timer), c->keepAliveInterval);
316 }
317 //printf("keepalive rc is %d\n",rc);
318exit:
319 return rc;
320}
321
322
323int cycle(Client* c, Timer* timer)
324{
325 int len = 0,
326 rc = SUCCESS;
327 // read the socket, see what work is due
328 short packet_type = readPacket(c, timer);
329
330 if(packet_type <= 0)
331 {
332 //printf("[%s][%d]readPacket retrun socket close \n",__FUNCTION__, __LINE__);
333 //printf("[%s]....packet_type = %d\n", __FUNCTION__, packet_type);
334 rc = DISCONNECTED;
335 goto exit;
336 }
b.liu62240ee2024-11-07 17:52:45 +0800337
liubin281ac462023-07-19 14:22:54 +0800338 switch (packet_type)
339 {
340 case CONNACK:
341 case PUBACK:
342 case SUBACK:
343 break;
344 case PUBLISH:
345 {
346 memset(regnwl_info.deviceToken,0,sizeof(regnwl_info.deviceToken));
347 memset(regnwl_info.clientId,0,sizeof(regnwl_info.clientId));
348 MQTTString topicName;
349 MQTTMessage msg;
350 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
351 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
352 goto exit;
353 if(memcmp(topicName.lenstring.data,"/ext/regnwl",strlen("/ext/regnwl")) == 0)
354 {
355 mbtk_ali_auth_regnwl_sava(msg);
356 }
357 deliverMessage(c, &topicName, &msg);
358 if (msg.qos != QOS0)
359 {
360 if (msg.qos == QOS1)
361 len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
362 else if (msg.qos == QOS2)
363 len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
364 if (len <= 0)
365 rc = FAILURE;
366 else
367 rc = sendPacket(c, len, timer);
368 if (rc == FAILURE)
369 {
370 goto exit; // there was a problem
371 }
372
373 }
374 break;
375 }
376 case PUBREC:
377 {
378 unsigned short mypacketid;
379 unsigned char dup, type;
380 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
381 rc = FAILURE;
382 else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREL, 0, mypacketid)) <= 0)
383 rc = FAILURE;
384 else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
385 rc = FAILURE; // there was a problem
386 if (rc == FAILURE)
387 {
388 goto exit; // there was a problem
389 }
390
391 break;
392 }
393 case PUBCOMP:
394 break;
395 case PINGRESP:
396 c->ping_outstanding = 0;
397 c->fail_count = 0;
398 break;
399 }
400
401exit:
402 if (rc == SUCCESS)
403 rc = packet_type;
404
405;
406 return rc;
407}
408
409
410int MQTTYield(Client* c, int timeout_ms)
411{
412 int rc = SUCCESS;
413 Timer timer;
414
b.liu62240ee2024-11-07 17:52:45 +0800415 InitTimer(&timer);
liubin281ac462023-07-19 14:22:54 +0800416 countdown_ms(&timer, timeout_ms);
b.liu62240ee2024-11-07 17:52:45 +0800417// static int i = 0;
liubin281ac462023-07-19 14:22:54 +0800418 if (c->isconnected)
419 rc = keepalive(c);
420 if(rc < 0)
421 {
422 return rc;
423 }
424/*
425 while (!expired(&timer))
426 {
427 rc = cycle(c, &timer);
428 if (rc == DISCONNECTED)
429 {
430 printf("cycle DISCONNECTED \n");
431 break;
432 }
433 rc = SUCCESS;
434 }
435*/
436 do
437 {
b.liu62240ee2024-11-07 17:52:45 +0800438 if (expired(&timer))
liubin281ac462023-07-19 14:22:54 +0800439 break; // we timed out
440 }
b.liu62240ee2024-11-07 17:52:45 +0800441 while ((rc = cycle(c, &timer)) != PUBLISH);
liubin281ac462023-07-19 14:22:54 +0800442 rc = SUCCESS;
443 return rc;
444}
445
446
447// only used in single-threaded mode where one command at a time is in process
448int waitfor(Client* c, int packet_type, Timer* timer)
449{
450 int rc = FAILURE;
451 do
452 {
b.liu62240ee2024-11-07 17:52:45 +0800453 if (expired(timer))
liubin281ac462023-07-19 14:22:54 +0800454 break; // we timed out
455 }
b.liu62240ee2024-11-07 17:52:45 +0800456 while ((rc = cycle(c, timer)) != packet_type && (rc = cycle(c, timer)) != DISCONNECTED);
457
liubin281ac462023-07-19 14:22:54 +0800458 return rc;
459}
460
461
462int MQTTConnect(Client* c, MQTTPacket_connectData* options)
463{
464 Timer connect_timer;
465 int rc = FAILURE;
466 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
467 int len = 0;
468 InitTimer(&connect_timer);
469 countdown_ms(&connect_timer, c->command_timeout_ms);
470
471 if (c->isconnected) // don't send connect packet again if we are already connected
472 goto exit;
473
474 if (options == 0)
475 options = &default_options; // set default options if none were supplied
b.liu62240ee2024-11-07 17:52:45 +0800476
liubin281ac462023-07-19 14:22:54 +0800477 c->keepAliveInterval = options->keepAliveInterval;
478 countdown(&c->ping_timer, c->keepAliveInterval);
479 printf("[%s]c->keepAliveInterval = %d", __FUNCTION__,c->keepAliveInterval);
480 if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
481 goto exit;
482 if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet
483 goto exit; // there was a problem
b.liu62240ee2024-11-07 17:52:45 +0800484
liubin281ac462023-07-19 14:22:54 +0800485 // this will be a blocking call, wait for the connack
486 if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
487 {
488 unsigned char connack_rc = 255;
489 char sessionPresent = 0;
490 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, c->readbuf, c->readbuf_size) == 1)
491 rc = connack_rc;
492 else
493 rc = FAILURE;
494 }
495 else
496 rc = FAILURE;
b.liu62240ee2024-11-07 17:52:45 +0800497
liubin281ac462023-07-19 14:22:54 +0800498exit:
499 if (rc == SUCCESS)
500 c->isconnected = 1;
501 return rc;
502}
503
504
505int MQTTSubscribe(Client* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
b.liu62240ee2024-11-07 17:52:45 +0800506{
507 int rc = FAILURE;
liubin281ac462023-07-19 14:22:54 +0800508 Timer timer;
509 int len = 0;
510 MQTTString topic = MQTTString_initializer;
511 topic.cstring = (char *)topicFilter;
b.liu62240ee2024-11-07 17:52:45 +0800512
liubin281ac462023-07-19 14:22:54 +0800513 InitTimer(&timer);
514 countdown_ms(&timer, c->command_timeout_ms);
515
516 if (!c->isconnected)
517 goto exit;
b.liu62240ee2024-11-07 17:52:45 +0800518
liubin281ac462023-07-19 14:22:54 +0800519 len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
520 if (len <= 0)
521 goto exit;
522 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
523 goto exit; // there was a problem
b.liu62240ee2024-11-07 17:52:45 +0800524
525 if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
liubin281ac462023-07-19 14:22:54 +0800526 {
527 int count = 0, grantedQoS = -1;
528 unsigned short mypacketid;
529 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size) == 1)
b.liu62240ee2024-11-07 17:52:45 +0800530 rc = grantedQoS; // 0, 1, 2 or 0x80
liubin281ac462023-07-19 14:22:54 +0800531 if (rc != 0x80)
532 {
533 int i;
534 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
535 {
536 if (c->messageHandlers[i].topicFilter == 0)
537 {
538 c->messageHandlers[i].topicFilter = topicFilter;
539 c->messageHandlers[i].fp = messageHandler;
540 rc = 0;
541 break;
542 }
543 }
544 }
545 }
b.liu62240ee2024-11-07 17:52:45 +0800546 else
liubin281ac462023-07-19 14:22:54 +0800547 rc = FAILURE;
b.liu62240ee2024-11-07 17:52:45 +0800548
liubin281ac462023-07-19 14:22:54 +0800549exit:
550 return rc;
551}
552
553int messageHandlersFindIndex(Client* c, const char* topicFilter)
554{
555 int i;
556 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
557 {
558 if ((c->messageHandlers[i].topicFilter != NULL)
559 && (strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0))
560 {
561 return i;
562 }
563 }
564 return -1;
565}
566
567int MQTTUnsubscribe(Client* c, const char* topicFilter)
b.liu62240ee2024-11-07 17:52:45 +0800568{
liubin281ac462023-07-19 14:22:54 +0800569 int rc = FAILURE;
b.liu62240ee2024-11-07 17:52:45 +0800570 Timer timer;
liubin281ac462023-07-19 14:22:54 +0800571 MQTTString topic = MQTTString_initializer;
572 topic.cstring = (char *)topicFilter;
573 int len = 0;
574
575 InitTimer(&timer);
576 countdown_ms(&timer, c->command_timeout_ms);
b.liu62240ee2024-11-07 17:52:45 +0800577
liubin281ac462023-07-19 14:22:54 +0800578 if (!c->isconnected)
579 goto exit;
580 if(messageHandlersFindIndex(c,topicFilter) < 0)
581 {
582 printf("This topic is not subscribed and cannot be unsubscribed.\n");
583 goto exit;
584 }
585 if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
586 goto exit;
587 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
588 goto exit; // there was a problem
589 if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
590 {
591 unsigned short mypacketid; // should be the same as the packetid above
592 if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
593 {
594 int i;
595 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
596 {
597 if ((c->messageHandlers[i].topicFilter != NULL)
598 && (strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0))
599 {
600 c->messageHandlers[i].topicFilter=NULL;
b.liu62240ee2024-11-07 17:52:45 +0800601 rc = 0;
liubin281ac462023-07-19 14:22:54 +0800602 }
603 }
604 }
605 }
606 else
607 rc = FAILURE;
b.liu62240ee2024-11-07 17:52:45 +0800608
liubin281ac462023-07-19 14:22:54 +0800609exit:
610 return rc;
611}
612
613
614int MQTTPublish(Client* c, const char* topicName, MQTTMessage* message)
615{
616 int rc = FAILURE;
b.liu62240ee2024-11-07 17:52:45 +0800617 Timer timer;
liubin281ac462023-07-19 14:22:54 +0800618 MQTTString topic = MQTTString_initializer;
619 topic.cstring = (char *)topicName;
620 int len = 0;
621
622 InitTimer(&timer);
623 countdown_ms(&timer, c->command_timeout_ms);
b.liu62240ee2024-11-07 17:52:45 +0800624
liubin281ac462023-07-19 14:22:54 +0800625 if (!c->isconnected)
626 goto exit;
627 if (message->qos == QOS1 || message->qos == QOS2)
628 message->id = getNextPacketId(c);
b.liu62240ee2024-11-07 17:52:45 +0800629
630 len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
liubin281ac462023-07-19 14:22:54 +0800631 topic, (unsigned char*)message->payload, message->payloadlen);
632
633 if (len <= 0)
634 goto exit;
635 if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
636 goto exit; // there was a problem
637
638 if (message->qos == QOS1)
639 {
640 if (waitfor(c, PUBACK, &timer) == PUBACK)
641 {
642 unsigned short mypacketid;
643 unsigned char dup, type;
644 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
645 rc = FAILURE;
646 }
647 else
648 rc = FAILURE;
649 }
650 else if (message->qos == QOS2)
651 {
652 if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
653 {
654 unsigned short mypacketid;
655 unsigned char dup, type;
656 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
657 rc = FAILURE;
658 }
659 else
660 rc = FAILURE;
661 }
b.liu62240ee2024-11-07 17:52:45 +0800662
liubin281ac462023-07-19 14:22:54 +0800663exit:
664 return rc;
665}
666
667
668int MQTTDisconnect(Client* c ,Network * n)
b.liu62240ee2024-11-07 17:52:45 +0800669{
liubin281ac462023-07-19 14:22:54 +0800670 int rc = FAILURE;
671 Timer timer; // we might wait for incomplete incoming publishes to complete
672 int len = MQTTSerialize_disconnect(c->buf, c->buf_size);
673
674 InitTimer(&timer);
675 countdown_ms(&timer, c->command_timeout_ms);
676
677 if (len > 0)
678 rc = sendPacket(c, len, &timer); // send the disconnect packet
b.liu62240ee2024-11-07 17:52:45 +0800679
liubin281ac462023-07-19 14:22:54 +0800680 c->isconnected = 0;
681 n->disconnect(n);
682 return rc;
683}
684