blob: 922a7e7e8c408e459c80a7187d0f8ec9f5247c71 [file] [log] [blame]
#include "mqtt_manager.h"
#include "common.h"
#include <log/log.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
/* Head of the list of MQTT nodes; initialised by mqtt_list_init() and
 * appended to by mqtt_act_handler(). */
static struct list_head mqtt_list;
/* Node selected by the last mqtt_list_locate() call: the first node whose
 * protocol and session match mqtt_cmd, or NULL when none matched. */
static MQTT_LIST_LINK_S* mqtt_slider;
/* Most recently parsed command; filled in by mqtt_param_verification() and
 * consumed by mqtt_act_handler() and mqtt_list_locate(). */
static struct mqtt_set_parament mqtt_cmd = {0};
/**
 * Allocates one zero-filled struct mqtt_set_parament on the heap.
 *
 * @return pointer to the new object, or NULL when calloc() fails.
 *         The caller releases it with free().
 */
struct mqtt_set_parament* lynq_mqtt_new()
{
    return calloc(1, sizeof(struct mqtt_set_parament));
}
void *mqtt_handler(void *list)
{
int error_code = 0;
MQTT_LIST_LINK_S* pos_list = (MQTT_LIST_LINK_S *)list;
while(1)
{
if (pos_list->data.modify_thread != 1 && pos_list->data.add_thread != 1)
continue;
pos_list->data.modify_thread = 0;
pos_list->data.add_thread = 0;
LYDBGLOG("[%s-%d] current id is %lu .\n",__FUNCTION__, __LINE__, pthread_self());
if (!strcmp(pos_list->data.action, "login")) {
struct mqtt_set_parament * mqtt_data = lynq_mqtt_new();
mqtt_data->session = pos_list->data.session;
mqtt_data->action = pos_list->data.action;
mqtt_data->usrname = pos_list->data.usrname;
mqtt_data->pwd = pos_list->data.pwd;
mqtt_data->ip_addr = pos_list->data.ip_addr;
mqtt_data->port = pos_list->data.port;
mqtt_data->keep_time = pos_list->data.keep_time;
mqtt_data->client_id = pos_list->data.client_id;
mqtt_data->clean_session = pos_list->data.clean_session;
mqtt_data->set_will.will_flag = pos_list->data.set_will.will_flag;
if(mqtt_data->set_will.will_flag == 1)
{
mqtt_data->set_will.will_topic = pos_list->data.set_will.will_topic;
mqtt_data->set_will.will_qos = pos_list->data.set_will.will_qos;
mqtt_data->set_will.retain = pos_list->data.set_will.retain;
mqtt_data->set_will.will_payloadlen = pos_list->data.set_will.will_payloadlen;
mqtt_data->set_will.will_payload = pos_list->data.set_will.will_payload;
}
printf("mqtt_data:%s---%d---%d\n",mqtt_data->ip_addr,mqtt_data->port,mqtt_data->keep_time);
lynq_mqtt_login(*mqtt_data);
free(mqtt_data);
mqtt_data = NULL;
// sleep(10);
// LYVERBLOG("[%s-%d] error_code = %d\n", __FUNCTION__, __LINE__, error_code);
// list_del((struct list_head* )pos_list);
}else if (!strcmp(pos_list->data.action, "subscribe")) {
struct mqtt_set_parament * mqtt_data = lynq_mqtt_new();
mqtt_data->session = pos_list->data.session;
mqtt_data->action = pos_list->data.action;
mqtt_data->subscribe_qos = pos_list->data.subscribe_qos;
mqtt_data->subscribe_topic = pos_list->data.subscribe_topic;
lynq_mosquitto_subscribe_message(mqtt_data->subscribe_topic,mqtt_data->subscribe_qos);
free(mqtt_data);
mqtt_data = NULL;
// LYVERBLOG("[%s-%d] error_code = %d\n", __FUNCTION__, __LINE__, error_code);
// list_del((struct list_head* )pos_list);
}else if (!strcmp(pos_list->data.action, "unsubscribe")) { //lt add @2021.7.13 for unsubscribe action
struct mqtt_set_parament * mqtt_data = lynq_mqtt_new();
mqtt_data->session = pos_list->data.session;
mqtt_data->action = pos_list->data.action;
mqtt_data->unsubscribe_topic = pos_list->data.unsubscribe_topic;
lynq_mosquitto_unsubscribe_message(mqtt_data->unsubscribe_topic);
free(mqtt_data);
mqtt_data = NULL;
// LYVERBLOG("[%s-%d] error_code = %d\n", __FUNCTION__, __LINE__, error_code);
// list_del((struct list_head* )pos_list);
}else if (!strcmp(pos_list->data.action, "publish")) {
struct mqtt_set_parament * mqtt_data = lynq_mqtt_new();
mqtt_data->session = pos_list->data.session;
mqtt_data->action = pos_list->data.action;
mqtt_data->publish_qos = pos_list->data.publish_qos;
mqtt_data->publish_topic = pos_list->data.publish_topic;
mqtt_data->publish_data = pos_list->data.publish_data;
lynq_mosquitto_publish_message(mqtt_data->publish_topic,strlen(mqtt_data->publish_data),mqtt_data->publish_data,mqtt_data->publish_qos);
free(mqtt_data);
mqtt_data = NULL;
// LYVERBLOG("[%s-%d] error_code = %d\n", __FUNCTION__, __LINE__, error_code);
// list_del((struct list_head* )pos_list);
}else if (!strcmp(pos_list->data.action, "disconnect")) {
struct mqtt_set_parament * mqtt_data = lynq_mqtt_new();
mqtt_data->session = pos_list->data.session;
mqtt_data->action = pos_list->data.action;
lynq_mosquitto_disconnect();
free(mqtt_data);
mqtt_data = NULL;
// list_del((struct list_head* )pos_list);
}
}
}
int mqtt_act_handler(thread_pool_t *pool)
{
MQTT_LIST_LINK_S* tmp_list = (struct mqtt_set_parament *)mqtt_slider;
struct list_head* slider = NULL;
MQTT_LIST_LINK_S* tmp = NULL;
// printf("mqtt_act_handler\n");
// printf("tmp_list1\n");
if (tmp_list == NULL ) {
printf("mqtt_cmd.action:%s\n",mqtt_cmd.action);
if((!strcmp(mqtt_cmd.action, "logindeploy")) || (!strcmp(mqtt_cmd.action, "willdeploy")) \
|| (!strcmp(mqtt_cmd.action, "login")) || (!strcmp(mqtt_cmd.action, "publish")) \
|| (!strcmp(mqtt_cmd.action, "subscribe")) || (!strcmp(mqtt_cmd.action, "unsubscribe"))) {//lt add @2021.7.13 for add action create a thread
if(tmp == NULL)
{
tmp = (MQTT_LIST_LINK_S*)malloc(sizeof(MQTT_LIST_LINK_S)); //lt add @2021.7.13 for tmp malloc
if(tmp == NULL)
{
return ERR_MALLOCVALID;
}
}
memset(tmp, 0, sizeof(MQTT_LIST_LINK_S));
tmp->data = mqtt_cmd;
tmp->data.add_thread = 1;
list_add_tail(&tmp->list, &mqtt_list);
mqtt_list_locate();
if (!threadpool_add(pool, mqtt_handler, (void *)mqtt_slider)) {
free(tmp); //lt add @2021.7.13 for tmp free
tmp = NULL;
LYVERBLOG("+[thhandle]: error num = %d\n", ERR_INVOKE);
return ERR_INVOKE;
}
free(tmp);//lt add @2021.7.13 for tmp free
tmp = NULL;
}
}
else {
// printf("no threadpool_add\n");
if((!strcmp(mqtt_cmd.action, "logindeploy"))) {
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL))
{
free(tmp_list->data.action);//lt add @2021.8.6 for release the linked list data
tmp_list->data.action = NULL;
free(tmp_list->data.protocol);
tmp_list->data.protocol = NULL;
free(tmp_list->data.usrname);
tmp_list->data.usrname = NULL;
free(tmp_list->data.pwd);
tmp_list->data.pwd = NULL;
free(tmp_list->data.ip_addr);
tmp_list->data.ip_addr = NULL;
free(tmp_list->data.client_id);
tmp_list->data.client_id = NULL;
}
tmp_list->data.protocol = mqtt_cmd.protocol;
tmp_list->data.action = mqtt_cmd.action;
tmp_list->data.usrname = mqtt_cmd.usrname;
tmp_list->data.pwd = mqtt_cmd.pwd;
tmp_list->data.ip_addr = mqtt_cmd.ip_addr;
tmp_list->data.port = mqtt_cmd.port;
tmp_list->data.keep_time = mqtt_cmd.keep_time;
tmp_list->data.client_id = mqtt_cmd.client_id;
tmp_list->data.clean_session = mqtt_cmd.clean_session;
tmp_list->data.modify_thread = 1;
}else if((!strcmp(mqtt_cmd.action, "willdeploy")))
{
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL))
{
free(tmp_list->data.action);
tmp_list->data.action = NULL;
free(tmp_list->data.protocol);
tmp_list->data.protocol = NULL;
free(tmp_list->data.set_will.will_topic);
tmp_list->data.set_will.will_topic = NULL;
free(tmp_list->data.set_will.will_payload);
tmp_list->data.set_will.will_payload = NULL;
}
tmp_list->data.protocol = mqtt_cmd.protocol;
tmp_list->data.action = mqtt_cmd.action;
tmp_list->data.set_will.will_flag = mqtt_cmd.set_will.will_flag;
tmp_list->data.set_will.will_topic = mqtt_cmd.set_will.will_topic;
tmp_list->data.set_will.will_qos = mqtt_cmd.set_will.will_qos;
tmp_list->data.set_will.retain = mqtt_cmd.set_will.retain;
tmp_list->data.set_will.will_payloadlen = mqtt_cmd.set_will.will_payloadlen;
tmp_list->data.set_will.will_payload = mqtt_cmd.set_will.will_payload;
tmp_list->data.modify_thread = 1;
}else if((!strcmp(mqtt_cmd.action, "publish")))
{
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL))
{
free(tmp_list->data.action);
tmp_list->data.action = NULL;
free(tmp_list->data.protocol);
tmp_list->data.protocol = NULL;
free(tmp_list->data.publish_topic);
tmp_list->data.publish_topic = NULL;
}
tmp_list->data.protocol = mqtt_cmd.protocol;
tmp_list->data.action = mqtt_cmd.action;
tmp_list->data.publish_qos = mqtt_cmd.publish_qos;
tmp_list->data.publish_topic = mqtt_cmd.publish_topic;
tmp_list->data.publish_data = mqtt_cmd.publish_data;
tmp_list->data.modify_thread = 1;
}else if((!strcmp(mqtt_cmd.action, "subscribe")))
{
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL))
{
free(tmp_list->data.action);
tmp_list->data.action = NULL;
free(tmp_list->data.protocol);
tmp_list->data.protocol = NULL;
free(tmp_list->data.subscribe_topic);
tmp_list->data.subscribe_topic = NULL;
}
tmp_list->data.protocol = mqtt_cmd.protocol;
tmp_list->data.action = mqtt_cmd.action;
tmp_list->data.subscribe_qos = mqtt_cmd.subscribe_qos;
tmp_list->data.subscribe_topic = mqtt_cmd.subscribe_topic;
tmp_list->data.modify_thread = 1;
}else if((!strcmp(mqtt_cmd.action, "unsubscribe"))) //lt add @2021.7.13 for unsubscribe action
{
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL))
{
free(tmp_list->data.action);
tmp_list->data.action = NULL;
free(tmp_list->data.protocol);
tmp_list->data.protocol = NULL;
free(tmp_list->data.unsubscribe_topic);
tmp_list->data.unsubscribe_topic = NULL;
}
tmp_list->data.protocol = mqtt_cmd.protocol;
tmp_list->data.action = mqtt_cmd.action;
tmp_list->data.unsubscribe_topic = mqtt_cmd.unsubscribe_topic;
tmp_list->data.modify_thread = 1;
}else if((!strcmp(mqtt_cmd.action, "login")))
{
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL))
{
free(tmp_list->data.action);
tmp_list->data.action = NULL;
free(tmp_list->data.protocol);
tmp_list->data.protocol = NULL;
}
tmp_list->data.protocol = mqtt_cmd.protocol;
tmp_list->data.action = mqtt_cmd.action;
tmp_list->data.modify_thread = 1;
}else if((!strcmp(mqtt_cmd.action, "disconnect")))
{
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL))
{
free(tmp_list->data.action);
tmp_list->data.action = NULL;
free(tmp_list->data.protocol);
tmp_list->data.protocol = NULL;
}
tmp_list->data.protocol = mqtt_cmd.protocol;
tmp_list->data.action = mqtt_cmd.action;
tmp_list->data.modify_thread = 1;
}else{
LYVERBLOG("[%s-%d] cmd error \n", __FUNCTION__, __LINE__);
return ERR_CMDVALID;
}
}
//lt delete @2021.7.13 for tmp free
return NO_ERROR;
}
/* Set to 1 once mqtt_list has been initialised. */
static int mqtt_init = 0;

/**
 * Initialises the mqtt_list head the first time it is called; later calls
 * do nothing.
 */
void mqtt_list_init()
{
    if (mqtt_init != 0)
        return;

    INIT_LIST_HEAD(&mqtt_list);
    mqtt_init = 1;
}
int mqtt_param_verification(char result[][BUF_SIZE], int line)
{
char *mqtt_result[20];
if (line < 2)
{
LYDBGLOG("[%s-%d] command error\n", __FUNCTION__, __LINE__);
LYVERBLOG("+[thhandle]: error num = %d\n", ERR_CMDVALID);
return ERR_CMDVALID;
}
for(int i = 0; i < line; i++)
{
mqtt_result[i] = (char *)malloc((strlen(result[i]))+1); //lt add @2021.8.6 for malloc the linked list data
memset(mqtt_result[i], 0, (strlen(result[i]))+1);
memcpy(mqtt_result[i],result[i],strlen(result[i]));
}
mqtt_cmd.protocol = mqtt_result[0]; //module
if(!atoi(mqtt_result[2])) { //session
LYVERBLOG("+[thhandle]: error num = %d\n", ERR_SESSIONVALID);
return ERR_SESSIONVALID;
}
mqtt_cmd.session = atoi(mqtt_result[2]);
mqtt_cmd.action = mqtt_result[1];
LYDBGLOG("[%s-%d] mqtt_param_verification:%s\n", __FUNCTION__, __LINE__,mqtt_result[1]);
if (!(strcmp(mqtt_result[1], "logindeploy"))) {
mqtt_cmd.usrname = mqtt_result[3];
mqtt_cmd.pwd = mqtt_result[4];
mqtt_cmd.ip_addr = mqtt_result[5];
mqtt_cmd.port = atoi(mqtt_result[6]);
free(mqtt_result[6]);
mqtt_result[6] = NULL;
mqtt_cmd.keep_time = atoi(mqtt_result[7]);
free(mqtt_result[7]);
mqtt_result[7] = NULL;
mqtt_cmd.client_id = mqtt_result[8];
mqtt_cmd.clean_session = mqtt_result[9];
free(mqtt_result[9]);
mqtt_result[9] = NULL;
}else if (!(strcmp(mqtt_result[1], "willdeploy"))) {
mqtt_cmd.set_will.will_flag = atoi(mqtt_result[3]);
free(mqtt_result[3]);
mqtt_result[3] = NULL;
mqtt_cmd.set_will.will_topic = mqtt_result[4];
mqtt_cmd.set_will.will_qos = atoi(mqtt_result[5]);
free(mqtt_result[5]);
mqtt_result[5] = NULL;
mqtt_cmd.set_will.retain= mqtt_result[6];
free(mqtt_result[6]);
mqtt_result[6] = NULL;
mqtt_cmd.set_will.will_payloadlen = atoi(mqtt_result[7]);
free(mqtt_result[7]);
mqtt_result[7] = NULL;
mqtt_cmd.set_will.will_payload = mqtt_result[8];//
}else if (!(strcmp(mqtt_result[1], "login"))) {
}else if (!(strcmp(mqtt_result[1], "disconnect"))) {
}else if (!(strcmp(mqtt_result[1], "publish"))) {
mqtt_cmd.publish_qos = atoi(mqtt_result[3]);
free(mqtt_result[3]);
mqtt_result[3] = NULL;
mqtt_cmd.publish_topic = mqtt_result[4];
mqtt_cmd.publish_data = mqtt_result[5];
}else if (!(strcmp(mqtt_result[1], "unsubscribe"))) {//lt add @2021.7.13 for unsubscribe action
mqtt_cmd.unsubscribe_topic = mqtt_result[3];
}else if (!(strcmp(mqtt_result[1], "subscribe"))) {
mqtt_cmd.subscribe_qos = atoi(mqtt_result[3]);
free(mqtt_result[3]);
mqtt_result[3] = NULL;
mqtt_cmd.subscribe_topic = mqtt_result[4];
}else{
LYVERBLOG("[%s-%d] cmd error \n", __FUNCTION__, __LINE__);
return ERR_CMDVALID;
}
return NO_ERROR;
}
/**
 * Points mqtt_slider at the first node of mqtt_list whose protocol string and
 * session both match mqtt_cmd, logging every node it visits on the way.
 * mqtt_slider is left NULL when no node matches.
 *
 * @return always NO_ERROR; callers tell "found" from "not found" by
 *         checking mqtt_slider.
 */
int mqtt_list_locate()
{
    struct list_head *cursor = NULL;

    mqtt_slider = NULL;
    list_for_each(cursor, &mqtt_list)
    {
        /* The cast assumes the list link is the first member of the node. */
        MQTT_LIST_LINK_S *entry = (MQTT_LIST_LINK_S *)cursor;

        mqtt_slider = entry;
        LYDBGLOG("[%s-%d] =================================== \n", __func__, __LINE__);
        LYDBGLOG("[%s-%d] action : %s \n", __FUNCTION__, __LINE__, entry->data.action);
        LYDBGLOG("[%s-%d] protocol : %s \n", __FUNCTION__, __LINE__, entry->data.protocol);
        LYDBGLOG("[%s-%d] port : %d \n", __FUNCTION__, __LINE__, entry->data.port);
        LYDBGLOG("[%s-%d] clean_session : %d \n", __FUNCTION__, __LINE__, entry->data.clean_session);
        LYDBGLOG("[%s-%d] usrname : %s \n", __FUNCTION__, __LINE__, entry->data.usrname);
        LYDBGLOG("[%s-%d] pwd : %s \n", __FUNCTION__, __LINE__, entry->data.pwd);
        LYDBGLOG("[%s-%d] ip_addr : %s \n", __FUNCTION__, __LINE__, entry->data.ip_addr);
        LYDBGLOG("[%s-%d] port : %d \n", __FUNCTION__, __LINE__, entry->data.port);
        LYDBGLOG("[%s-%d] keep_time : %d \n", __FUNCTION__, __LINE__, entry->data.keep_time);
        LYDBGLOG("[%s-%d] =================================== \n", __func__, __LINE__);

        /* Protocol is compared first; session only when the protocol matches. */
        if (strcmp(entry->data.protocol, mqtt_cmd.protocol) == 0)
        {
            if (entry->data.session == mqtt_cmd.session)
                return NO_ERROR;
        }
    }

    /* Ran off the end of the list: nothing matched. */
    mqtt_slider = NULL;
    return NO_ERROR;
}