blob: a7380aed85fcbe56495e6d15c7ccae7cfa70f8f8 [file] [log] [blame]
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "lynq_mqtt/lynq_mqtt.h"
#include "liblog/lynq_deflog.h"
#define HOST "58.246.1.50" // IP
#define PORT 63743
#define KEEP_ALIVE 60
static int mqtt_session = 0;
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
char *str_payload = NULL;
if(message->payloadlen){
LYVERBLOG("+[mqtt][message%d]: topic:%s---data:%s\n",mqtt_session,message->topic, (char *)message->payload);//lt mod @2021.7.13 for mqtt_session
str_payload = malloc(message->payloadlen);
memset(str_payload,0,message->payloadlen);
memcpy(str_payload,message->payload,message->payloadlen);
printf("0x:");//lt add @2021.7.13 for debug
for(int i = 0; i < (message->payloadlen); i++)
{
printf(" %02x",str_payload[i]);
}
printf("\n");
free(str_payload);
str_payload = NULL;
}else{
LYVERBLOG("+[mqtt][message%d]: %s (null)\n",mqtt_session,message->topic); //lt mod @2021.7.13 for mqtt_session
}
}
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
int i;
if(!result){
LYVERBLOG("+[mqtt][connect][session%d]: ok!!\n", mqtt_session);
}else{
LYVERBLOG("+[mqtt][connect][session%d]: error num = %d\n", mqtt_session,MQTT_CONNECT_FAIL);
}
}
void my_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
{
int i;
if(!result){
LYVERBLOG("+[mqtt][disconnect][session%d]: ok!!\n", mqtt_session);
}else{
LYVERBLOG("+[mqtt][disconnect][session%d]: error num = %d\n", mqtt_session,MQTT_DISCONNECT_FAIL);
}
}
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
LYVERBLOG("+[mqtt][subscribe][session%d]: ok!!\n", mqtt_session);
}
void my_unsubscribe_callback(struct mosquitto *mosq, void *obj, int mid)//lt add @2021.7.13 for unsubscribe callback function
{
LYVERBLOG("+[mqtt][unsubscribe][session%d]: ok!!\n", mqtt_session);
}
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
LYVERBLOG("+[mqtt][publish][session%d]: ok!!\n", mqtt_session);
}
struct mosquitto *mosq = NULL;
void lynq_mosquitto_set_message_callback(void (*message_callback)(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message))
{
mosquitto_message_callback_set(mosq, message_callback);
}
void lynq_mosquitto_set_connect_callback(void (*connect_callback)(struct mosquitto *mosq, void *userdata, int result))
{
mosquitto_connect_callback_set(mosq, connect_callback);
}
void lynq_mosquitto_set_subscribe_callback(void (*subscribe_callback)(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos))
{
mosquitto_subscribe_callback_set(mosq, subscribe_callback); //lt mod @2021.7.13 for deleate my_subscribe_callback add subscribe_callback
}
void lynq_mosquitto_set_unsubscribe_callback(void (*on_unsubscribe)(struct mosquitto *mosq, void *obj, int mid)) //lt add @2021.7.13 for set unsubscribe callback function
{
mosquitto_unsubscribe_callback_set(mosq,on_unsubscribe);
}
void lynq_mosquitto_set_publish_callback(void (*on_publish)(struct mosquitto *mosq, void *obj, int mid))
{
mosquitto_publish_callback_set(mosq,on_publish);
}
void lynq_mosquitto_set_disconnect_callback( void (*disconnect_callback)( struct mosquitto *mosq,void *obj, int rc))
{
mosquitto_disconnect_callback_set(mosq,disconnect_callback);
}
int lynq_mqtt_connect_init(struct mqtt_set_parament mqtt_message)
{
int ret;
//libmosquitto ¿â³õʼ»¯
mosquitto_lib_init();
if(mqtt_message.client_id != NULL)
{
mosq = mosquitto_new(mqtt_message.client_id,mqtt_message.clean_session,NULL);//´´½¨mosquitto¿Í»§¶Ë
}else{
mosq = mosquitto_new(NULL,true,NULL);//´´½¨mosquitto¿Í»§¶Ë
}
if(!mosq){
LYVERBLOG("+[mqtt][init][session%d]: error num = %d\n", mqtt_message.session,CREATE_LIENT_ERROR);
mosquitto_lib_cleanup();
return CREATE_LIENT_ERROR;
}
lynq_mosquitto_set_connect_callback(my_connect_callback);
lynq_mosquitto_set_message_callback(my_message_callback);
lynq_mosquitto_set_subscribe_callback(my_subscribe_callback);
lynq_mosquitto_set_unsubscribe_callback(my_unsubscribe_callback); //lt add @2021.7.13 for set unsubscribe callback
lynq_mosquitto_set_disconnect_callback(my_disconnect_callback);
lynq_mosquitto_set_publish_callback(my_publish_callback);
// ÉèÖÃÁ¬½ÓµÄÓû§ÃûÓëÃÜÂë:£¬
// ²ÎÊý£º¾ä±ú¡¢Óû§Ãû¡¢ÃÜÂë
ret = mosquitto_username_pw_set(mosq, mqtt_message.usrname, mqtt_message.pwd);
if(ret){
// LYVERBLOG("+[mqtt][init]: set username and password error!\n");
LYVERBLOG("+[mqtt][init][session%d]: error num = %d\n", mqtt_message.session,USER_PWD_ERROR);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return USER_PWD_ERROR;
}
if(mqtt_message.set_will.will_flag == 1)
{
mosquitto_will_set( mosq,mqtt_message.set_will.will_topic,mqtt_message.set_will.will_payloadlen,mqtt_message.set_will.will_payload,mqtt_message.set_will.will_qos,mqtt_message.set_will.retain);
}
//Á¬½Ó·þÎñÆ÷
if(mosquitto_connect(mosq, mqtt_message.ip_addr, mqtt_message.port, mqtt_message.keep_time)){
// LYVERBLOG("+[mqtt][init]: unable to connect!\n");
LYVERBLOG("+[mqtt][init][session%d]: error num = %d\n", mqtt_message.session,CONNECT_FAIL);
return CONNECT_FAIL;
}
//¿ªÆôÒ»¸öỊ̈߳¬ÔÚÏß³ÌÀﲻͣµÄµ÷Óà mosquitto_loop() À´´¦ÀíÍøÂçÐÅÏ¢
int loop = mosquitto_loop_start(mosq);
if(loop != MOSQ_ERR_SUCCESS)
{
// LYVERBLOG("+[mqtt][init]: mosquitto loop error\n");
LYVERBLOG("+[mqtt][init][session%d]: error num = %d\n", mqtt_message.session,CREATE_LOOP_FAIL);
return CREATE_LOOP_FAIL;
}
return NO_ERROR;
}
void lynq_mosquitto_publish_message(char *pub_topic,int pub_buf_len,char *pub_buf,int qos)
{
mosquitto_publish(mosq,NULL,pub_topic,pub_buf_len,pub_buf,qos,0);
}
void lynq_mosquitto_subscribe_message(char *sub_topic,int sub_qos)
{
mosquitto_subscribe(mosq, NULL, sub_topic, sub_qos);
}
void lynq_mosquitto_unsubscribe_message(char *unsub_topic) //lt add @2021.7.13 for unsubscribe function
{
mosquitto_unsubscribe(mosq,NULL,unsub_topic);
}
void lynq_mosquitto_disconnect(void)
{
mosquitto_disconnect(mosq);
}
#if 1
struct mqtt_set_parament mqtt_set_data;
void factory_mqtt_test(void)
{
char buff[10] = {"123456789"};
buff[10] = '\0';
mqtt_set_data.usrname = "admin";
mqtt_set_data.pwd = "password";
mqtt_set_data.ip_addr = HOST;
mqtt_set_data.port = PORT;
mqtt_set_data.keep_time = KEEP_ALIVE;
lynq_mqtt_connect_init(mqtt_set_data);
lynq_mosquitto_set_connect_callback(my_connect_callback);
lynq_mosquitto_set_message_callback(my_message_callback);
lynq_mosquitto_set_subscribe_callback(my_subscribe_callback);
lynq_mosquitto_set_disconnect_callback(my_disconnect_callback);
lynq_mosquitto_set_publish_callback(my_publish_callback);
lynq_mosquitto_subscribe_message("/S2C/DATA ",2);
for(int i = 1000; i > 1; i--)
{
if((i % 10) == 0)
{
/*·¢²¼ÏûÏ¢*/
lynq_mosquitto_publish_message("/S2C/DATA ",10,buff,0);
}
sleep(1);
memset(buff,0,10);
}
}
#endif
int lynq_mqtt_login(struct mqtt_set_parament mqtt_message)
{
mqtt_session = mqtt_message.session;
lynq_mqtt_connect_init(mqtt_message);
}