/***************************************************************************
 *                                  _   _ ____  _
 *  Project                     ___| | | |  _ \| |
 *                             / __| | | | |_) | |
 *                            | (__| |_| |  _ <| |___
 *                             \___|\___/|_| \_\_____|
 *
 * Copyright (C) 1998 - 2022, Daniel Stenberg, <daniel@haxx.se>, et al.
 *
 * This software is licensed as described in the file COPYING, which
 * you should have received as part of this distribution. The terms
 * are also available at https://curl.se/docs/copyright.html.
 *
 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
 * copies of the Software, and permit persons to whom the Software is
 * furnished to do so, under the terms of the COPYING file.
 *
 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
 * KIND, either express or implied.
 *
 * SPDX-License-Identifier: curl
 *
 ***************************************************************************/
24#include "server_setup.h"
25#include <stdlib.h>
26#include <string.h>
27#include "util.h"
28
/* Function
 *
 * Accepts a TCP connection on a custom port (IPv4 or IPv6). Speaks MQTT.
 *
 * Reads commands from FILE (set with --config). The commands control how to
 * act, and the settings are reset to defaults on each client TCP connect.
 *
 * Config file keywords (one "keyword value" pair per line, see getconfig()):
 *
 * version, PUBLISH-before-SUBACK, short-PUBLISH, error-CONNACK, Testnum,
 * excessive-remaining
 */
40
41/* based on sockfilt.c */
42
43#ifdef HAVE_SIGNAL_H
44#include <signal.h>
45#endif
46#ifdef HAVE_NETINET_IN_H
47#include <netinet/in.h>
48#endif
49#ifdef HAVE_NETINET_IN6_H
50#include <netinet/in6.h>
51#endif
52#ifdef HAVE_ARPA_INET_H
53#include <arpa/inet.h>
54#endif
55#ifdef HAVE_NETDB_H
56#include <netdb.h>
57#endif
58
59#define ENABLE_CURLX_PRINTF
60/* make the curlx header define all printf() functions to use the curlx_*
61 versions instead */
62#include "curlx.h" /* from the private lib dir */
63#include "getpart.h"
64#include "inet_pton.h"
65#include "util.h"
66#include "server_sockaddr.h"
67#include "warnless.h"
68
69/* include memdebug.h last */
70#include "memdebug.h"
71
/* On Winsock builds, replace whatever definitions of these four error
   constants are in effect with the numeric values noted as errno.h's.
   NOTE(review): presumably so comparisons against SOCKERRNO use one fixed
   set of values on Windows - confirm against the shared server code. */
#ifdef USE_WINSOCK
#undef  EINTR
#define EINTR    4 /* errno.h value */
#undef  EAGAIN
#define EAGAIN  11 /* errno.h value */
#undef  ENOMEM
#define ENOMEM  12 /* errno.h value */
#undef  EINVAL
#define EINVAL  22 /* errno.h value */
#endif
82
#define DEFAULT_PORT 1883 /* MQTT default port */

/* log file name used unless --logfile replaces it */
#ifndef DEFAULT_LOGFILE
#define DEFAULT_LOGFILE "log/mqttd.log"
#endif

/* config file name used unless --config replaces it */
#ifndef DEFAULT_CONFIG
#define DEFAULT_CONFIG "mqttd.config"
#endif

/* First byte of the fixed header for the packet types handled here. The
   SUBSCRIBE value carries 0x02 in its low nibble; mqttit() compares CONNECT
   and SUBSCRIBE against the whole byte and PUBLISH against the high nibble
   only. */
#define MQTT_MSG_CONNECT    0x10
#define MQTT_MSG_CONNACK    0x20
#define MQTT_MSG_PUBLISH    0x30
#define MQTT_MSG_PUBACK     0x40
#define MQTT_MSG_SUBSCRIBE  0x82
#define MQTT_MSG_SUBACK     0x90
#define MQTT_MSG_DISCONNECT 0xe0

/* packet and field sizes; the CONNACK and SUBACK values equal the sizes of
   the packets built in connack() and suback() */
#define MQTT_CONNACK_LEN 4
#define MQTT_SUBACK_LEN 5
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
#define MQTT_HEADER_LEN 5    /* max 5 bytes */
105
/* Behaviour switches for one client connection. getconfig() resets them and
   then fills them in from the config file. */
struct configurable {
  unsigned char version; /* initial version byte in the request must match
                            this (NOTE(review): set from the "version"
                            keyword, but no comparison against it is visible
                            in this file) */
  bool publish_before_suback; /* send the PUBLISH ahead of the SUBACK when
                                 answering a SUBSCRIBE */
  bool short_publish;         /* publish() sends two bytes less than the
                                 packet it built */
  bool excessive_remaining;   /* publish() writes a hard-coded, illegal
                                 four-byte remaining length */
  unsigned char error_connack; /* value stored in the last CONNACK byte */
  int testnum;                 /* test number handed to test2fopen() to find
                                  the reply data */
};
115
#define REQUEST_DUMP  "log/server.input" /* protocol dump mqttit() appends to */
#define CONFIG_VERSION 5                 /* default for config.version */

/* settings for the connection being served; rewritten by getconfig() */
static struct configurable config;

/* set with --logfile; has external linkage
   (NOTE(review): presumably read by the shared logging code - confirm) */
const char *serverlogfile = DEFAULT_LOGFILE;
/* set with --config; opened by getconfig() on every client connect */
static const char *configfile = DEFAULT_CONFIG;

#ifdef ENABLE_IPV6
static bool use_ipv6 = FALSE;            /* toggled by --ipv6 / --ipv4 */
#endif
static const char *ipv_inuse = "IPv4";   /* only used for the startup log */
static unsigned short port = DEFAULT_PORT; /* set with --port; sockdaemon()
                                              stores the actual port here when
                                              zero was requested */
129
130static void resetdefaults(void)
131{
132 logmsg("Reset to defaults");
133 config.version = CONFIG_VERSION;
134 config.publish_before_suback = FALSE;
135 config.short_publish = FALSE;
136 config.excessive_remaining = FALSE;
137 config.error_connack = 0;
138 config.testnum = 0;
139}
140
/* Parse 'value' as a base-10 unsigned number and return its lowest eight
   bits. Text that strtoul() cannot convert yields 0. */
static unsigned char byteval(char *value)
{
  return (unsigned char)(strtoul(value, NULL, 10) % 256UL);
}
146
147static void getconfig(void)
148{
149 FILE *fp = fopen(configfile, FOPEN_READTEXT);
150 resetdefaults();
151 if(fp) {
152 char buffer[512];
153 logmsg("parse config file");
154 while(fgets(buffer, sizeof(buffer), fp)) {
155 char key[32];
156 char value[32];
157 if(2 == sscanf(buffer, "%31s %31s", key, value)) {
158 if(!strcmp(key, "version")) {
159 config.version = byteval(value);
160 logmsg("version [%d] set", config.version);
161 }
162 else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
163 logmsg("PUBLISH-before-SUBACK set");
164 config.publish_before_suback = TRUE;
165 }
166 else if(!strcmp(key, "short-PUBLISH")) {
167 logmsg("short-PUBLISH set");
168 config.short_publish = TRUE;
169 }
170 else if(!strcmp(key, "error-CONNACK")) {
171 config.error_connack = byteval(value);
172 logmsg("error-CONNACK = %d", config.error_connack);
173 }
174 else if(!strcmp(key, "Testnum")) {
175 config.testnum = atoi(value);
176 logmsg("testnum = %d", config.testnum);
177 }
178 else if(!strcmp(key, "excessive-remaining")) {
179 logmsg("excessive-remaining set");
180 config.excessive_remaining = TRUE;
181 }
182 }
183 }
184 fclose(fp);
185 }
186 else {
187 logmsg("No config file '%s' to read", configfile);
188 }
189}
190
/*
 * Write 'len' bytes from 'buffer' to the server log as one quoted string of
 * two-digit lowercase hex values. Nothing is logged when 'len' is zero or
 * negative.
 *
 * Input that does not fit the local 12000 byte text buffer is cut off at a
 * whole byte: a pair is only formatted while there is room for both digits
 * and the terminating zero. The previous loop condition (left >= 0) let the
 * last pair be truncated to a single digit and then called the formatter
 * once more with no space left at all.
 */
static void loghex(unsigned char *buffer, ssize_t len)
{
  char data[12000];
  char *optr = data;
  size_t left = sizeof(data);
  ssize_t i;

  data[0] = 0;
  for(i = 0; (i < len) && (left >= 3); i++) {
    msnprintf(optr, left, "%02x", buffer[i]);
    optr += 2;
    left -= 2;
  }
  if(optr != data)
    /* at least one byte was converted */
    logmsg("'%s'", data);
}
209
/* Who sent a packet that gets written to the protocol dump. logprotocol()
   prints FROM_CLIENT as "client" and any other value as "server". */
typedef enum {
  FROM_CLIENT,
  FROM_SERVER
} mqttdir;
214
215static void logprotocol(mqttdir dir,
216 const char *prefix, size_t remlen,
217 FILE *output,
218 unsigned char *buffer, ssize_t len)
219{
220 char data[12000] = "";
221 ssize_t i;
222 unsigned char *ptr = buffer;
223 char *optr = data;
224 int left = sizeof(data);
225
226 for(i = 0; i<len && (left >= 0); i++) {
227 msnprintf(optr, left, "%02x", ptr[i]);
228 optr += 2;
229 left -= 2;
230 }
231 fprintf(output, "%s %s %zx %s\n",
232 dir == FROM_CLIENT? "client": "server",
233 prefix, remlen,
234 data);
235}
236
237
238/* return 0 on success */
239static int connack(FILE *dump, curl_socket_t fd)
240{
241 unsigned char packet[]={
242 MQTT_MSG_CONNACK, 0x02,
243 0x00, 0x00
244 };
245 ssize_t rc;
246
247 packet[3] = config.error_connack;
248
249 rc = swrite(fd, (char *)packet, sizeof(packet));
250 if(rc > 0) {
251 logmsg("WROTE %d bytes [CONNACK]", rc);
252 loghex(packet, rc);
253 logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
254 }
255 if(rc == sizeof(packet)) {
256 return 0;
257 }
258 return 1;
259}
260
261/* return 0 on success */
262static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
263{
264 unsigned char packet[]={
265 MQTT_MSG_SUBACK, 0x03,
266 0, 0, /* filled in below */
267 0x00
268 };
269 ssize_t rc;
270 packet[2] = (unsigned char)(packetid >> 8);
271 packet[3] = (unsigned char)(packetid & 0xff);
272
273 rc = swrite(fd, (char *)packet, sizeof(packet));
274 if(rc == sizeof(packet)) {
275 logmsg("WROTE %d bytes [SUBACK]", rc);
276 loghex(packet, rc);
277 logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
278 return 0;
279 }
280 return 1;
281}
282
#ifdef QOS
/*
 * Send a PUBACK packet for 'packetid' on 'fd'. Only built when QOS is
 * defined. A completely written packet is noted in the server log and
 * recorded in 'dump'.
 *
 * Compared to the earlier version, the logprotocol() call now passes the
 * packet name and remaining length it requires (it did not compile with QOS
 * defined), and the Remaining Length byte announces the two packet id bytes
 * that follow instead of zero.
 *
 * Returns 0 when the whole packet was written, 1 otherwise.
 */
static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
{
  unsigned char packet[]={
    MQTT_MSG_PUBACK, 0x02, /* remaining length: the packet id */
    0, 0 /* filled in below */
  };
  ssize_t rc;
  packet[2] = (unsigned char)(packetid >> 8);
  packet[3] = (unsigned char)(packetid & 0xff);

  rc = swrite(fd, (char *)packet, sizeof(packet));
  if(rc == (ssize_t)sizeof(packet)) {
    /* rc is an ssize_t: convert explicitly instead of passing it for %d */
    logmsg("WROTE %ld bytes [PUBACK]", (long)rc);
    loghex(packet, rc);
    logprotocol(FROM_SERVER, "PUBACK", 2, dump, packet, rc);
    return 0;
  }
  logmsg("Failed sending [PUBACK]");
  return 1;
}
#endif
306
307/* return 0 on success */
308static int disconnect(FILE *dump, curl_socket_t fd)
309{
310 unsigned char packet[]={
311 MQTT_MSG_DISCONNECT, 0x00,
312 };
313 ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
314 if(rc == sizeof(packet)) {
315 logmsg("WROTE %d bytes [DISCONNECT]", rc);
316 loghex(packet, rc);
317 logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
318 return 0;
319 }
320 logmsg("Failed sending [DISCONNECT]");
321 return 1;
322}
323
324
325
326/*
327 do
328
329 encodedByte = X MOD 128
330
331 X = X DIV 128
332
333 // if there are more data to encode, set the top bit of this byte
334
335 if ( X > 0 )
336
337 encodedByte = encodedByte OR 128
338
339 endif
340
341 'output' encodedByte
342
343 while ( X > 0 )
344
345*/
346
/*
 * Store 'packetlen' in 'remlength' using the MQTT variable-length scheme:
 * seven value bits per byte, least significant group first, top bit set on
 * every byte except the last.
 *
 * 'remlength' must have room for four bytes. All four may be used, which
 * covers values up to 268435455; the earlier version gave up as soon as the
 * fourth byte had been stored and so rejected every value above 2097151.
 *
 * Returns the number of bytes used (1 - 4), or 0 after logging a message
 * when the value needs more than four bytes.
 */
static int encode_length(size_t packetlen,
                         unsigned char *remlength) /* 4 bytes */
{
  int bytes = 0;

  do {
    unsigned char encode;

    if(bytes >= 4) {
      /* still more to encode but the output buffer is full */
      logmsg("too large packet!");
      return 0;
    }

    encode = (unsigned char)(packetlen % 0x80);
    packetlen /= 0x80;
    if(packetlen)
      encode |= 0x80;

    remlength[bytes++] = encode;
  } while(packetlen);

  return bytes;
}
370
371
/*
 * Decode an MQTT variable-length number from 'buf'.
 *
 * Bytes are consumed until one without the top bit has been read or
 * 'buflen' bytes have been used, whichever comes first. Each byte adds its
 * low seven bits, least significant group first.
 *
 * The number of consumed bytes is stored in '*lenbytes' unless that pointer
 * is NULL. Returns the decoded value; 0 when 'buflen' is 0.
 */
static size_t decode_length(unsigned char *buf,
                            size_t buflen, size_t *lenbytes)
{
  size_t value = 0;
  size_t scale = 1;
  size_t used = 0;

  while(used < buflen) {
    unsigned char octet = buf[used++];

    value += (size_t)(octet & 0x7f) * scale;
    scale *= 0x80;

    if(!(octet & 0x80))
      break; /* no continuation bit: this was the last byte */
  }

  if(lenbytes)
    *lenbytes = used;

  return value;
}
391
392
393/* return 0 on success */
394static int publish(FILE *dump,
395 curl_socket_t fd, unsigned short packetid,
396 char *topic, char *payload, size_t payloadlen)
397{
398 size_t topiclen = strlen(topic);
399 unsigned char *packet;
400 size_t payloadindex;
401 ssize_t remaininglength = topiclen + 2 + payloadlen;
402 ssize_t packetlen;
403 ssize_t sendamount;
404 ssize_t rc;
405 unsigned char rembuffer[4];
406 int encodedlen;
407
408 if(config.excessive_remaining) {
409 /* manually set illegal remaining length */
410 rembuffer[0] = 0xff;
411 rembuffer[1] = 0xff;
412 rembuffer[2] = 0xff;
413 rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */
414 encodedlen = 4;
415 }
416 else
417 encodedlen = encode_length(remaininglength, rembuffer);
418
419 /* one packet type byte (possibly two more for packetid) */
420 packetlen = remaininglength + encodedlen + 1;
421 packet = malloc(packetlen);
422 if(!packet)
423 return 1;
424
425 packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */
426 memcpy(&packet[1], rembuffer, encodedlen);
427
428 (void)packetid;
429 /* packet_id if QoS is set */
430
431 packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
432 packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
433 memcpy(&packet[3 + encodedlen], topic, topiclen);
434
435 payloadindex = 3 + topiclen + encodedlen;
436 memcpy(&packet[payloadindex], payload, payloadlen);
437
438 sendamount = packetlen;
439 if(config.short_publish)
440 sendamount -= 2;
441
442 rc = swrite(fd, (char *)packet, sendamount);
443 if(rc > 0) {
444 logmsg("WROTE %d bytes [PUBLISH]", rc);
445 loghex(packet, rc);
446 logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
447 }
448 if(rc == packetlen)
449 return 0;
450 return 1;
451}
452
#define MAX_TOPIC_LENGTH 65535  /* the largest value a two-byte length field
                                   can hold */
#define MAX_CLIENT_ID_LENGTH 32 /* size of the client id buffer in mqttit(),
                                   terminating zero included */

/* topic name from the SUBSCRIBE being handled; mqttit() copies it here and
   zero-terminates it */
static char topic[MAX_TOPIC_LENGTH + 1];
457
458static int fixedheader(curl_socket_t fd,
459 unsigned char *bytep,
460 size_t *remaining_lengthp,
461 size_t *remaining_length_bytesp)
462{
463 /* get the fixed header */
464 unsigned char buffer[10];
465
466 /* get the first two bytes */
467 ssize_t rc = sread(fd, (char *)buffer, 2);
468 int i;
469 if(rc < 2) {
470 logmsg("READ %d bytes [SHORT!]", rc);
471 return 1; /* fail */
472 }
473 logmsg("READ %d bytes", rc);
474 loghex(buffer, rc);
475 *bytep = buffer[0];
476
477 /* if the length byte has the top bit set, get the next one too */
478 i = 1;
479 while(buffer[i] & 0x80) {
480 i++;
481 rc = sread(fd, (char *)&buffer[i], 1);
482 if(rc != 1) {
483 logmsg("Remaining Length broken");
484 return 1;
485 }
486 }
487 *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp);
488 logmsg("Remaining Length: %ld [%d bytes]", (long) *remaining_lengthp,
489 *remaining_length_bytesp);
490 return 0;
491}
492
493static curl_socket_t mqttit(curl_socket_t fd)
494{
495 size_t buff_size = 10*1024;
496 unsigned char *buffer = NULL;
497 ssize_t rc;
498 unsigned char byte;
499 unsigned short packet_id;
500 size_t payload_len;
501 size_t client_id_length;
502 unsigned int topic_len;
503 size_t remaining_length = 0;
504 size_t bytes = 0; /* remaining length field size in bytes */
505 char client_id[MAX_CLIENT_ID_LENGTH];
506 long testno;
507 FILE *stream = NULL;
508
509
510 static const char protocol[7] = {
511 0x00, 0x04, /* protocol length */
512 'M','Q','T','T', /* protocol name */
513 0x04 /* protocol level */
514 };
515 FILE *dump = fopen(REQUEST_DUMP, "ab");
516 if(!dump)
517 goto end;
518
519 getconfig();
520
521 testno = config.testnum;
522
523 if(testno)
524 logmsg("Found test number %ld", testno);
525
526 buffer = malloc(buff_size);
527 if(!buffer) {
528 logmsg("Out of memory, unable to allocate buffer");
529 goto end;
530 }
531
532 do {
533 unsigned char usr_flag = 0x80;
534 unsigned char passwd_flag = 0x40;
535 unsigned char conn_flags;
536 const size_t client_id_offset = 12;
537 size_t start_usr;
538 size_t start_passwd;
539
540 /* get the fixed header */
541 rc = fixedheader(fd, &byte, &remaining_length, &bytes);
542 if(rc)
543 break;
544
545 if(remaining_length >= buff_size) {
546 buff_size = remaining_length;
547 buffer = realloc(buffer, buff_size);
548 if(!buffer) {
549 logmsg("Failed realloc of size %lu", buff_size);
550 goto end;
551 }
552 }
553
554 if(remaining_length) {
555 /* reading variable header and payload into buffer */
556 rc = sread(fd, (char *)buffer, remaining_length);
557 if(rc > 0) {
558 logmsg("READ %d bytes", rc);
559 loghex(buffer, rc);
560 }
561 }
562
563 if(byte == MQTT_MSG_CONNECT) {
564 logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
565 dump, buffer, rc);
566
567 if(memcmp(protocol, buffer, sizeof(protocol))) {
568 logmsg("Protocol preamble mismatch");
569 goto end;
570 }
571 /* ignore the connect flag byte and two keepalive bytes */
572 payload_len = (buffer[10] << 8) | buffer[11];
573 /* first part of the payload is the client ID */
574 client_id_length = payload_len;
575
576 /* checking if user and password flags were set */
577 conn_flags = buffer[7];
578
579 start_usr = client_id_offset + payload_len;
580 if(usr_flag == (unsigned char)(conn_flags & usr_flag)) {
581 logmsg("User flag is present in CONN flag");
582 payload_len += (buffer[start_usr] << 8) | buffer[start_usr + 1];
583 payload_len += 2; /* MSB and LSB for user length */
584 }
585
586 start_passwd = client_id_offset + payload_len;
587 if(passwd_flag == (char)(conn_flags & passwd_flag)) {
588 logmsg("Password flag is present in CONN flags");
589 payload_len += (buffer[start_passwd] << 8) | buffer[start_passwd + 1];
590 payload_len += 2; /* MSB and LSB for password length */
591 }
592
593 /* check the length of the payload */
594 if((ssize_t)payload_len != (rc - 12)) {
595 logmsg("Payload length mismatch, expected %x got %x",
596 rc - 12, payload_len);
597 goto end;
598 }
599 /* check the length of the client ID */
600 else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) {
601 logmsg("Too large client id");
602 goto end;
603 }
604 memcpy(client_id, &buffer[12], client_id_length);
605 client_id[client_id_length] = 0;
606
607 logmsg("MQTT client connect accepted: %s", client_id);
608
609 /* The first packet sent from the Server to the Client MUST be a
610 CONNACK Packet */
611
612 if(connack(dump, fd)) {
613 logmsg("failed sending CONNACK");
614 goto end;
615 }
616 }
617 else if(byte == MQTT_MSG_SUBSCRIBE) {
618 int error;
619 char *data;
620 size_t datalen;
621 logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
622 dump, buffer, rc);
623 logmsg("Incoming SUBSCRIBE");
624
625 if(rc < 6) {
626 logmsg("Too small SUBSCRIBE");
627 goto end;
628 }
629
630 /* two bytes packet id */
631 packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);
632
633 /* two bytes topic length */
634 topic_len = (buffer[2] << 8) | buffer[3];
635 if(topic_len != (remaining_length - 5)) {
636 logmsg("Wrong topic length, got %d expected %d",
637 topic_len, remaining_length - 5);
638 goto end;
639 }
640 memcpy(topic, &buffer[4], topic_len);
641 topic[topic_len] = 0;
642
643 /* there's a QoS byte (two bits) after the topic */
644
645 logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
646 stream = test2fopen(testno);
647 error = getpart(&data, &datalen, "reply", "data", stream);
648 if(!error) {
649 if(!config.publish_before_suback) {
650 if(suback(dump, fd, packet_id)) {
651 logmsg("failed sending SUBACK");
652 goto end;
653 }
654 }
655 if(publish(dump, fd, packet_id, topic, data, datalen)) {
656 logmsg("PUBLISH failed");
657 goto end;
658 }
659 if(config.publish_before_suback) {
660 if(suback(dump, fd, packet_id)) {
661 logmsg("failed sending SUBACK");
662 goto end;
663 }
664 }
665 }
666 else {
667 char *def = (char *)"this is random payload yes yes it is";
668 publish(dump, fd, packet_id, topic, def, strlen(def));
669 }
670 disconnect(dump, fd);
671 }
672 else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
673 size_t topiclen;
674
675 logmsg("Incoming PUBLISH");
676 logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
677 dump, buffer, rc);
678
679 topiclen = (buffer[1 + bytes] << 8) | buffer[2 + bytes];
680 logmsg("Got %d bytes topic", topiclen);
681 /* TODO: verify topiclen */
682
683#ifdef QOS
684 /* TODO: handle packetid if there is one. Send puback if QoS > 0 */
685 puback(dump, fd, 0);
686#endif
687 /* expect a disconnect here */
688 /* get the request */
689 rc = sread(fd, (char *)&buffer[0], 2);
690
691 logmsg("READ %d bytes [DISCONNECT]", rc);
692 loghex(buffer, rc);
693 logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
694 goto end;
695 }
696 else {
697 /* not supported (yet) */
698 goto end;
699 }
700 } while(1);
701
702 end:
703 if(buffer)
704 free(buffer);
705 if(dump)
706 fclose(dump);
707 if(stream)
708 fclose(stream);
709 return CURL_SOCKET_BAD;
710}
711
712/*
713 sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
714
715 if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
716 accept()
717*/
718static bool incoming(curl_socket_t listenfd)
719{
720 fd_set fds_read;
721 fd_set fds_write;
722 fd_set fds_err;
723 int clients = 0; /* connected clients */
724
725 if(got_exit_signal) {
726 logmsg("signalled to die, exiting...");
727 return FALSE;
728 }
729
730#ifdef HAVE_GETPPID
731 /* As a last resort, quit if socks5 process becomes orphan. */
732 if(getppid() <= 1) {
733 logmsg("process becomes orphan, exiting");
734 return FALSE;
735 }
736#endif
737
738 do {
739 ssize_t rc;
740 int error = 0;
741 curl_socket_t sockfd = listenfd;
742 int maxfd = (int)sockfd;
743
744 FD_ZERO(&fds_read);
745 FD_ZERO(&fds_write);
746 FD_ZERO(&fds_err);
747
748 /* there's always a socket to wait for */
749 FD_SET(sockfd, &fds_read);
750
751 do {
752 /* select() blocking behavior call on blocking descriptors please */
753 rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
754 if(got_exit_signal) {
755 logmsg("signalled to die, exiting...");
756 return FALSE;
757 }
758 } while((rc == -1) && ((error = SOCKERRNO) == EINTR));
759
760 if(rc < 0) {
761 logmsg("select() failed with error: (%d) %s",
762 error, strerror(error));
763 return FALSE;
764 }
765
766 if(FD_ISSET(sockfd, &fds_read)) {
767 curl_socket_t newfd = accept(sockfd, NULL, NULL);
768 if(CURL_SOCKET_BAD == newfd) {
769 error = SOCKERRNO;
770 logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s",
771 sockfd, error, strerror(error));
772 }
773 else {
774 logmsg("====> Client connect, fd %d. Read config from %s",
775 newfd, configfile);
776 set_advisor_read_lock(SERVERLOGS_LOCK);
777 (void)mqttit(newfd); /* until done */
778 clear_advisor_read_lock(SERVERLOGS_LOCK);
779
780 logmsg("====> Client disconnect");
781 sclose(newfd);
782 }
783 }
784 } while(clients);
785
786 return TRUE;
787}
788
789static curl_socket_t sockdaemon(curl_socket_t sock,
790 unsigned short *listenport)
791{
792 /* passive daemon style */
793 srvr_sockaddr_union_t listener;
794 int flag;
795 int rc;
796 int totdelay = 0;
797 int maxretr = 10;
798 int delay = 20;
799 int attempt = 0;
800 int error = 0;
801
802 do {
803 attempt++;
804 flag = 1;
805 rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
806 (void *)&flag, sizeof(flag));
807 if(rc) {
808 error = SOCKERRNO;
809 logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s",
810 error, strerror(error));
811 if(maxretr) {
812 rc = wait_ms(delay);
813 if(rc) {
814 /* should not happen */
815 logmsg("wait_ms() failed with error: %d", rc);
816 sclose(sock);
817 return CURL_SOCKET_BAD;
818 }
819 if(got_exit_signal) {
820 logmsg("signalled to die, exiting...");
821 sclose(sock);
822 return CURL_SOCKET_BAD;
823 }
824 totdelay += delay;
825 delay *= 2; /* double the sleep for next attempt */
826 }
827 }
828 } while(rc && maxretr--);
829
830 if(rc) {
831 logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s",
832 attempt, totdelay, error, strerror(error));
833 logmsg("Continuing anyway...");
834 }
835
836 /* When the specified listener port is zero, it is actually a
837 request to let the system choose a non-zero available port. */
838
839#ifdef ENABLE_IPV6
840 if(!use_ipv6) {
841#endif
842 memset(&listener.sa4, 0, sizeof(listener.sa4));
843 listener.sa4.sin_family = AF_INET;
844 listener.sa4.sin_addr.s_addr = INADDR_ANY;
845 listener.sa4.sin_port = htons(*listenport);
846 rc = bind(sock, &listener.sa, sizeof(listener.sa4));
847#ifdef ENABLE_IPV6
848 }
849 else {
850 memset(&listener.sa6, 0, sizeof(listener.sa6));
851 listener.sa6.sin6_family = AF_INET6;
852 listener.sa6.sin6_addr = in6addr_any;
853 listener.sa6.sin6_port = htons(*listenport);
854 rc = bind(sock, &listener.sa, sizeof(listener.sa6));
855 }
856#endif /* ENABLE_IPV6 */
857 if(rc) {
858 error = SOCKERRNO;
859 logmsg("Error binding socket on port %hu: (%d) %s",
860 *listenport, error, strerror(error));
861 sclose(sock);
862 return CURL_SOCKET_BAD;
863 }
864
865 if(!*listenport) {
866 /* The system was supposed to choose a port number, figure out which
867 port we actually got and update the listener port value with it. */
868 curl_socklen_t la_size;
869 srvr_sockaddr_union_t localaddr;
870#ifdef ENABLE_IPV6
871 if(!use_ipv6)
872#endif
873 la_size = sizeof(localaddr.sa4);
874#ifdef ENABLE_IPV6
875 else
876 la_size = sizeof(localaddr.sa6);
877#endif
878 memset(&localaddr.sa, 0, (size_t)la_size);
879 if(getsockname(sock, &localaddr.sa, &la_size) < 0) {
880 error = SOCKERRNO;
881 logmsg("getsockname() failed with error: (%d) %s",
882 error, strerror(error));
883 sclose(sock);
884 return CURL_SOCKET_BAD;
885 }
886 switch(localaddr.sa.sa_family) {
887 case AF_INET:
888 *listenport = ntohs(localaddr.sa4.sin_port);
889 break;
890#ifdef ENABLE_IPV6
891 case AF_INET6:
892 *listenport = ntohs(localaddr.sa6.sin6_port);
893 break;
894#endif
895 default:
896 break;
897 }
898 if(!*listenport) {
899 /* Real failure, listener port shall not be zero beyond this point. */
900 logmsg("Apparently getsockname() succeeded, with listener port zero.");
901 logmsg("A valid reason for this failure is a binary built without");
902 logmsg("proper network library linkage. This might not be the only");
903 logmsg("reason, but double check it before anything else.");
904 sclose(sock);
905 return CURL_SOCKET_BAD;
906 }
907 }
908
909 /* start accepting connections */
910 rc = listen(sock, 5);
911 if(0 != rc) {
912 error = SOCKERRNO;
913 logmsg("listen(%d, 5) failed with error: (%d) %s",
914 sock, error, strerror(error));
915 sclose(sock);
916 return CURL_SOCKET_BAD;
917 }
918
919 return sock;
920}
921
922
923int main(int argc, char *argv[])
924{
925 curl_socket_t sock = CURL_SOCKET_BAD;
926 curl_socket_t msgsock = CURL_SOCKET_BAD;
927 int wrotepidfile = 0;
928 int wroteportfile = 0;
929 const char *pidname = ".mqttd.pid";
930 const char *portname = ".mqttd.port";
931 bool juggle_again;
932 int error;
933 int arg = 1;
934
935 while(argc>arg) {
936 if(!strcmp("--version", argv[arg])) {
937 printf("mqttd IPv4%s\n",
938#ifdef ENABLE_IPV6
939 "/IPv6"
940#else
941 ""
942#endif
943 );
944 return 0;
945 }
946 else if(!strcmp("--pidfile", argv[arg])) {
947 arg++;
948 if(argc>arg)
949 pidname = argv[arg++];
950 }
951 else if(!strcmp("--portfile", argv[arg])) {
952 arg++;
953 if(argc>arg)
954 portname = argv[arg++];
955 }
956 else if(!strcmp("--config", argv[arg])) {
957 arg++;
958 if(argc>arg)
959 configfile = argv[arg++];
960 }
961 else if(!strcmp("--logfile", argv[arg])) {
962 arg++;
963 if(argc>arg)
964 serverlogfile = argv[arg++];
965 }
966 else if(!strcmp("--ipv6", argv[arg])) {
967#ifdef ENABLE_IPV6
968 ipv_inuse = "IPv6";
969 use_ipv6 = TRUE;
970#endif
971 arg++;
972 }
973 else if(!strcmp("--ipv4", argv[arg])) {
974 /* for completeness, we support this option as well */
975#ifdef ENABLE_IPV6
976 ipv_inuse = "IPv4";
977 use_ipv6 = FALSE;
978#endif
979 arg++;
980 }
981 else if(!strcmp("--port", argv[arg])) {
982 arg++;
983 if(argc>arg) {
984 char *endptr;
985 unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
986 if((endptr != argv[arg] + strlen(argv[arg])) ||
987 ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
988 fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
989 argv[arg]);
990 return 0;
991 }
992 port = curlx_ultous(ulnum);
993 arg++;
994 }
995 }
996 else {
997 puts("Usage: mqttd [option]\n"
998 " --config [file]\n"
999 " --version\n"
1000 " --logfile [file]\n"
1001 " --pidfile [file]\n"
1002 " --portfile [file]\n"
1003 " --ipv4\n"
1004 " --ipv6\n"
1005 " --port [port]\n");
1006 return 0;
1007 }
1008 }
1009
1010#ifdef WIN32
1011 win32_init();
1012 atexit(win32_cleanup);
1013
1014 setmode(fileno(stdin), O_BINARY);
1015 setmode(fileno(stdout), O_BINARY);
1016 setmode(fileno(stderr), O_BINARY);
1017#endif
1018
1019 install_signal_handlers(FALSE);
1020
1021#ifdef ENABLE_IPV6
1022 if(!use_ipv6)
1023#endif
1024 sock = socket(AF_INET, SOCK_STREAM, 0);
1025#ifdef ENABLE_IPV6
1026 else
1027 sock = socket(AF_INET6, SOCK_STREAM, 0);
1028#endif
1029
1030 if(CURL_SOCKET_BAD == sock) {
1031 error = SOCKERRNO;
1032 logmsg("Error creating socket: (%d) %s",
1033 error, strerror(error));
1034 goto mqttd_cleanup;
1035 }
1036
1037 {
1038 /* passive daemon style */
1039 sock = sockdaemon(sock, &port);
1040 if(CURL_SOCKET_BAD == sock) {
1041 goto mqttd_cleanup;
1042 }
1043 msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
1044 }
1045
1046 logmsg("Running %s version", ipv_inuse);
1047 logmsg("Listening on port %hu", port);
1048
1049 wrotepidfile = write_pidfile(pidname);
1050 if(!wrotepidfile) {
1051 goto mqttd_cleanup;
1052 }
1053
1054 wroteportfile = write_portfile(portname, port);
1055 if(!wroteportfile) {
1056 goto mqttd_cleanup;
1057 }
1058
1059 do {
1060 juggle_again = incoming(sock);
1061 } while(juggle_again);
1062
1063mqttd_cleanup:
1064
1065 if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
1066 sclose(msgsock);
1067
1068 if(sock != CURL_SOCKET_BAD)
1069 sclose(sock);
1070
1071 if(wrotepidfile)
1072 unlink(pidname);
1073 if(wroteportfile)
1074 unlink(portname);
1075
1076 restore_signal_handlers(FALSE);
1077
1078 if(got_exit_signal) {
1079 logmsg("============> mqttd exits with signal (%d)", exit_signal);
1080 /*
1081 * To properly set the return status of the process we
1082 * must raise the same signal SIGINT or SIGTERM that we
1083 * caught and let the old handler take care of it.
1084 */
1085 raise(exit_signal);
1086 }
1087
1088 logmsg("============> mqttd quits");
1089 return 0;
1090}