blob: a7380aed85fcbe56495e6d15c7ccae7cfa70f8f8 [file] [log] [blame]
xjb04a4022021-11-25 15:01:52 +08001#include <stdio.h>
2#include <stdlib.h>
3#include <string.h>
4#include <unistd.h>
5#include "lynq_mqtt/lynq_mqtt.h"
6#include "liblog/lynq_deflog.h"
7
8#define HOST "58.246.1.50" // IP
9#define PORT 63743
10
11#define KEEP_ALIVE 60
12
13
/*
 * Session number printed in this file's log lines.
 * Written by lynq_mqtt_login(); starts at zero (static storage).
 */
static int mqtt_session;
15
16
17void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
18{
19 char *str_payload = NULL;
20 if(message->payloadlen){
21 LYVERBLOG("+[mqtt][message%d]: topic:%s---data:%s\n",mqtt_session,message->topic, (char *)message->payload);//lt mod @2021.7.13 for mqtt_session
22 str_payload = malloc(message->payloadlen);
23 memset(str_payload,0,message->payloadlen);
24 memcpy(str_payload,message->payload,message->payloadlen);
25printf("0x:");//lt add @2021.7.13 for debug
26for(int i = 0; i < (message->payloadlen); i++)
27{
28printf(" %02x",str_payload[i]);
29}
30printf("\n");
31 free(str_payload);
32 str_payload = NULL;
33 }else{
34 LYVERBLOG("+[mqtt][message%d]: %s (null)\n",mqtt_session,message->topic); //lt mod @2021.7.13 for mqtt_session
35 }
36}
37
38void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
39{
40 int i;
41 if(!result){
42 LYVERBLOG("+[mqtt][connect][session%d]: ok!!\n", mqtt_session);
43 }else{
44 LYVERBLOG("+[mqtt][connect][session%d]: error num = %d\n", mqtt_session,MQTT_CONNECT_FAIL);
45 }
46}
47
48void my_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
49{
50 int i;
51 if(!result){
52 LYVERBLOG("+[mqtt][disconnect][session%d]: ok!!\n", mqtt_session);
53 }else{
54 LYVERBLOG("+[mqtt][disconnect][session%d]: error num = %d\n", mqtt_session,MQTT_DISCONNECT_FAIL);
55 }
56}
57
58void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
59{
60 LYVERBLOG("+[mqtt][subscribe][session%d]: ok!!\n", mqtt_session);
61}
62
63void my_unsubscribe_callback(struct mosquitto *mosq, void *obj, int mid)//lt add @2021.7.13 for unsubscribe callback function
64{
65 LYVERBLOG("+[mqtt][unsubscribe][session%d]: ok!!\n", mqtt_session);
66}
67
68void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
69{
70 LYVERBLOG("+[mqtt][publish][session%d]: ok!!\n", mqtt_session);
71}
72
73struct mosquitto *mosq = NULL;
74
75void lynq_mosquitto_set_message_callback(void (*message_callback)(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message))
76{
77 mosquitto_message_callback_set(mosq, message_callback);
78}
79
80
81void lynq_mosquitto_set_connect_callback(void (*connect_callback)(struct mosquitto *mosq, void *userdata, int result))
82{
83 mosquitto_connect_callback_set(mosq, connect_callback);
84}
85void lynq_mosquitto_set_subscribe_callback(void (*subscribe_callback)(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos))
86{
87 mosquitto_subscribe_callback_set(mosq, subscribe_callback); //lt mod @2021.7.13 for deleate my_subscribe_callback add subscribe_callback
88}
89
90void lynq_mosquitto_set_unsubscribe_callback(void (*on_unsubscribe)(struct mosquitto *mosq, void *obj, int mid)) //lt add @2021.7.13 for set unsubscribe callback function
91{
92 mosquitto_unsubscribe_callback_set(mosq,on_unsubscribe);
93}
94
95void lynq_mosquitto_set_publish_callback(void (*on_publish)(struct mosquitto *mosq, void *obj, int mid))
96{
97 mosquitto_publish_callback_set(mosq,on_publish);
98}
99
100void lynq_mosquitto_set_disconnect_callback( void (*disconnect_callback)( struct mosquitto *mosq,void *obj, int rc))
101{
102 mosquitto_disconnect_callback_set(mosq,disconnect_callback);
103}
104int lynq_mqtt_connect_init(struct mqtt_set_parament mqtt_message)
105{
106
107 int ret;
108 //libmosquitto ¿â³õʼ»¯
109 mosquitto_lib_init();
110
111 if(mqtt_message.client_id != NULL)
112 {
113 mosq = mosquitto_new(mqtt_message.client_id,mqtt_message.clean_session,NULL);//´´½¨mosquitto¿Í»§¶Ë
114 }else{
115 mosq = mosquitto_new(NULL,true,NULL);//´´½¨mosquitto¿Í»§¶Ë
116 }
117
118 if(!mosq){
119 LYVERBLOG("+[mqtt][init][session%d]: error num = %d\n", mqtt_message.session,CREATE_LIENT_ERROR);
120 mosquitto_lib_cleanup();
121 return CREATE_LIENT_ERROR;
122 }
123
124
125 lynq_mosquitto_set_connect_callback(my_connect_callback);
126 lynq_mosquitto_set_message_callback(my_message_callback);
127 lynq_mosquitto_set_subscribe_callback(my_subscribe_callback);
128 lynq_mosquitto_set_unsubscribe_callback(my_unsubscribe_callback); //lt add @2021.7.13 for set unsubscribe callback
129 lynq_mosquitto_set_disconnect_callback(my_disconnect_callback);
130 lynq_mosquitto_set_publish_callback(my_publish_callback);
131
132 // ÉèÖÃÁ¬½ÓµÄÓû§ÃûÓëÃÜÂë:£¬
133 // ²ÎÊý£º¾ä±ú¡¢Óû§Ãû¡¢ÃÜÂë
134 ret = mosquitto_username_pw_set(mosq, mqtt_message.usrname, mqtt_message.pwd);
135 if(ret){
136// LYVERBLOG("+[mqtt][init]: set username and password error!\n");
137 LYVERBLOG("+[mqtt][init][session%d]: error num = %d\n", mqtt_message.session,USER_PWD_ERROR);
138 mosquitto_destroy(mosq);
139 mosquitto_lib_cleanup();
140 return USER_PWD_ERROR;
141 }
142
143 if(mqtt_message.set_will.will_flag == 1)
144 {
145 mosquitto_will_set( mosq,mqtt_message.set_will.will_topic,mqtt_message.set_will.will_payloadlen,mqtt_message.set_will.will_payload,mqtt_message.set_will.will_qos,mqtt_message.set_will.retain);
146 }
147 //Á¬½Ó·þÎñÆ÷
148 if(mosquitto_connect(mosq, mqtt_message.ip_addr, mqtt_message.port, mqtt_message.keep_time)){
149// LYVERBLOG("+[mqtt][init]: unable to connect!\n");
150 LYVERBLOG("+[mqtt][init][session%d]: error num = %d\n", mqtt_message.session,CONNECT_FAIL);
151 return CONNECT_FAIL;
152 }
153 //¿ªÆôÒ»¸öỊ̈߳¬ÔÚÏß³ÌÀﲻͣµÄµ÷Óà mosquitto_loop() À´´¦ÀíÍøÂçÐÅÏ¢
154 int loop = mosquitto_loop_start(mosq);
155 if(loop != MOSQ_ERR_SUCCESS)
156 {
157// LYVERBLOG("+[mqtt][init]: mosquitto loop error\n");
158 LYVERBLOG("+[mqtt][init][session%d]: error num = %d\n", mqtt_message.session,CREATE_LOOP_FAIL);
159 return CREATE_LOOP_FAIL;
160 }
161 return NO_ERROR;
162}
163
164void lynq_mosquitto_publish_message(char *pub_topic,int pub_buf_len,char *pub_buf,int qos)
165{
166 mosquitto_publish(mosq,NULL,pub_topic,pub_buf_len,pub_buf,qos,0);
167}
168
169void lynq_mosquitto_subscribe_message(char *sub_topic,int sub_qos)
170{
171 mosquitto_subscribe(mosq, NULL, sub_topic, sub_qos);
172}
173
174void lynq_mosquitto_unsubscribe_message(char *unsub_topic) //lt add @2021.7.13 for unsubscribe function
175{
176 mosquitto_unsubscribe(mosq,NULL,unsub_topic);
177}
178
179void lynq_mosquitto_disconnect(void)
180{
181 mosquitto_disconnect(mosq);
182}
183
184#if 1
185struct mqtt_set_parament mqtt_set_data;
186void factory_mqtt_test(void)
187{
188 char buff[10] = {"123456789"};
189 buff[10] = '\0';
190
191 mqtt_set_data.usrname = "admin";
192 mqtt_set_data.pwd = "password";
193 mqtt_set_data.ip_addr = HOST;
194 mqtt_set_data.port = PORT;
195 mqtt_set_data.keep_time = KEEP_ALIVE;
196
197
198 lynq_mqtt_connect_init(mqtt_set_data);
199
200 lynq_mosquitto_set_connect_callback(my_connect_callback);
201 lynq_mosquitto_set_message_callback(my_message_callback);
202 lynq_mosquitto_set_subscribe_callback(my_subscribe_callback);
203 lynq_mosquitto_set_disconnect_callback(my_disconnect_callback);
204 lynq_mosquitto_set_publish_callback(my_publish_callback);
205
206 lynq_mosquitto_subscribe_message("/S2C/DATA ",2);
207
208 for(int i = 1000; i > 1; i--)
209 {
210 if((i % 10) == 0)
211 {
212 /*·¢²¼ÏûÏ¢*/
213 lynq_mosquitto_publish_message("/S2C/DATA ",10,buff,0);
214 }
215 sleep(1);
216 memset(buff,0,10);
217 }
218}
219#endif
220
221
222int lynq_mqtt_login(struct mqtt_set_parament mqtt_message)
223{
224 mqtt_session = mqtt_message.session;
225 lynq_mqtt_connect_init(mqtt_message);
226}
227