#include "mqtt_manager.h" | |
#include "common.h" | |
#include <log/log.h> | |
#include <unistd.h> | |
#include <stdio.h> | |
#include <string.h> | |
static struct list_head mqtt_list; | |
static MQTT_LIST_LINK_S* mqtt_slider; | |
static struct mqtt_set_parament mqtt_cmd = {0}; | |
struct mqtt_set_parament* lynq_mqtt_new() | |
{ | |
struct mqtt_set_parament* mqtt = (struct mqtt_set_parament*)calloc(1, sizeof(struct mqtt_set_parament)); | |
return mqtt; | |
} | |
void *mqtt_handler(void *list) | |
{ | |
int error_code = 0; | |
MQTT_LIST_LINK_S* pos_list = (MQTT_LIST_LINK_S *)list; | |
while(1) | |
{ | |
if (pos_list->data.modify_thread != 1 && pos_list->data.add_thread != 1) | |
continue; | |
pos_list->data.modify_thread = 0; | |
pos_list->data.add_thread = 0; | |
LYDBGLOG("[%s-%d] current id is %lu .\n",__FUNCTION__, __LINE__, pthread_self()); | |
if (!strcmp(pos_list->data.action, "login")) { | |
struct mqtt_set_parament * mqtt_data = lynq_mqtt_new(); | |
mqtt_data->session = pos_list->data.session; | |
mqtt_data->action = pos_list->data.action; | |
mqtt_data->usrname = pos_list->data.usrname; | |
mqtt_data->pwd = pos_list->data.pwd; | |
mqtt_data->ip_addr = pos_list->data.ip_addr; | |
mqtt_data->port = pos_list->data.port; | |
mqtt_data->keep_time = pos_list->data.keep_time; | |
mqtt_data->client_id = pos_list->data.client_id; | |
mqtt_data->clean_session = pos_list->data.clean_session; | |
mqtt_data->set_will.will_flag = pos_list->data.set_will.will_flag; | |
if(mqtt_data->set_will.will_flag == 1) | |
{ | |
mqtt_data->set_will.will_topic = pos_list->data.set_will.will_topic; | |
mqtt_data->set_will.will_qos = pos_list->data.set_will.will_qos; | |
mqtt_data->set_will.retain = pos_list->data.set_will.retain; | |
mqtt_data->set_will.will_payloadlen = pos_list->data.set_will.will_payloadlen; | |
mqtt_data->set_will.will_payload = pos_list->data.set_will.will_payload; | |
} | |
printf("mqtt_data:%s---%d---%d\n",mqtt_data->ip_addr,mqtt_data->port,mqtt_data->keep_time); | |
lynq_mqtt_login(*mqtt_data); | |
free(mqtt_data); | |
mqtt_data = NULL; | |
// sleep(10); | |
// LYVERBLOG("[%s-%d] error_code = %d\n", __FUNCTION__, __LINE__, error_code); | |
// list_del((struct list_head* )pos_list); | |
}else if (!strcmp(pos_list->data.action, "subscribe")) { | |
struct mqtt_set_parament * mqtt_data = lynq_mqtt_new(); | |
mqtt_data->session = pos_list->data.session; | |
mqtt_data->action = pos_list->data.action; | |
mqtt_data->subscribe_qos = pos_list->data.subscribe_qos; | |
mqtt_data->subscribe_topic = pos_list->data.subscribe_topic; | |
lynq_mosquitto_subscribe_message(mqtt_data->subscribe_topic,mqtt_data->subscribe_qos); | |
free(mqtt_data); | |
mqtt_data = NULL; | |
// LYVERBLOG("[%s-%d] error_code = %d\n", __FUNCTION__, __LINE__, error_code); | |
// list_del((struct list_head* )pos_list); | |
}else if (!strcmp(pos_list->data.action, "unsubscribe")) { //lt add @2021.7.13 for unsubscribe action | |
struct mqtt_set_parament * mqtt_data = lynq_mqtt_new(); | |
mqtt_data->session = pos_list->data.session; | |
mqtt_data->action = pos_list->data.action; | |
mqtt_data->unsubscribe_topic = pos_list->data.unsubscribe_topic; | |
lynq_mosquitto_unsubscribe_message(mqtt_data->unsubscribe_topic); | |
free(mqtt_data); | |
mqtt_data = NULL; | |
// LYVERBLOG("[%s-%d] error_code = %d\n", __FUNCTION__, __LINE__, error_code); | |
// list_del((struct list_head* )pos_list); | |
}else if (!strcmp(pos_list->data.action, "publish")) { | |
struct mqtt_set_parament * mqtt_data = lynq_mqtt_new(); | |
mqtt_data->session = pos_list->data.session; | |
mqtt_data->action = pos_list->data.action; | |
mqtt_data->publish_qos = pos_list->data.publish_qos; | |
mqtt_data->publish_topic = pos_list->data.publish_topic; | |
mqtt_data->publish_data = pos_list->data.publish_data; | |
lynq_mosquitto_publish_message(mqtt_data->publish_topic,strlen(mqtt_data->publish_data),mqtt_data->publish_data,mqtt_data->publish_qos); | |
free(mqtt_data); | |
mqtt_data = NULL; | |
// LYVERBLOG("[%s-%d] error_code = %d\n", __FUNCTION__, __LINE__, error_code); | |
// list_del((struct list_head* )pos_list); | |
}else if (!strcmp(pos_list->data.action, "disconnect")) { | |
struct mqtt_set_parament * mqtt_data = lynq_mqtt_new(); | |
mqtt_data->session = pos_list->data.session; | |
mqtt_data->action = pos_list->data.action; | |
lynq_mosquitto_disconnect(); | |
free(mqtt_data); | |
mqtt_data = NULL; | |
// list_del((struct list_head* )pos_list); | |
} | |
} | |
} | |
int mqtt_act_handler(thread_pool_t *pool) | |
{ | |
MQTT_LIST_LINK_S* tmp_list = (struct mqtt_set_parament *)mqtt_slider; | |
struct list_head* slider = NULL; | |
MQTT_LIST_LINK_S* tmp = NULL; | |
// printf("mqtt_act_handler\n"); | |
// printf("tmp_list1\n"); | |
if (tmp_list == NULL ) { | |
printf("mqtt_cmd.action:%s\n",mqtt_cmd.action); | |
if((!strcmp(mqtt_cmd.action, "logindeploy")) || (!strcmp(mqtt_cmd.action, "willdeploy")) \ | |
|| (!strcmp(mqtt_cmd.action, "login")) || (!strcmp(mqtt_cmd.action, "publish")) \ | |
|| (!strcmp(mqtt_cmd.action, "subscribe")) || (!strcmp(mqtt_cmd.action, "unsubscribe"))) {//lt add @2021.7.13 for add action create a thread | |
if(tmp == NULL) | |
{ | |
tmp = (MQTT_LIST_LINK_S*)malloc(sizeof(MQTT_LIST_LINK_S)); //lt add @2021.7.13 for tmp malloc | |
if(tmp == NULL) | |
{ | |
return ERR_MALLOCVALID; | |
} | |
} | |
memset(tmp, 0, sizeof(MQTT_LIST_LINK_S)); | |
tmp->data = mqtt_cmd; | |
tmp->data.add_thread = 1; | |
list_add_tail(&tmp->list, &mqtt_list); | |
mqtt_list_locate(); | |
if (!threadpool_add(pool, mqtt_handler, (void *)mqtt_slider)) { | |
free(tmp); //lt add @2021.7.13 for tmp free | |
tmp = NULL; | |
LYVERBLOG("+[thhandle]: error num = %d\n", ERR_INVOKE); | |
return ERR_INVOKE; | |
} | |
free(tmp);//lt add @2021.7.13 for tmp free | |
tmp = NULL; | |
} | |
} | |
else { | |
// printf("no threadpool_add\n"); | |
if((!strcmp(mqtt_cmd.action, "logindeploy"))) { | |
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL)) | |
{ | |
free(tmp_list->data.action);//lt add @2021.8.6 for release the linked list data | |
tmp_list->data.action = NULL; | |
free(tmp_list->data.protocol); | |
tmp_list->data.protocol = NULL; | |
free(tmp_list->data.usrname); | |
tmp_list->data.usrname = NULL; | |
free(tmp_list->data.pwd); | |
tmp_list->data.pwd = NULL; | |
free(tmp_list->data.ip_addr); | |
tmp_list->data.ip_addr = NULL; | |
free(tmp_list->data.client_id); | |
tmp_list->data.client_id = NULL; | |
} | |
tmp_list->data.protocol = mqtt_cmd.protocol; | |
tmp_list->data.action = mqtt_cmd.action; | |
tmp_list->data.usrname = mqtt_cmd.usrname; | |
tmp_list->data.pwd = mqtt_cmd.pwd; | |
tmp_list->data.ip_addr = mqtt_cmd.ip_addr; | |
tmp_list->data.port = mqtt_cmd.port; | |
tmp_list->data.keep_time = mqtt_cmd.keep_time; | |
tmp_list->data.client_id = mqtt_cmd.client_id; | |
tmp_list->data.clean_session = mqtt_cmd.clean_session; | |
tmp_list->data.modify_thread = 1; | |
}else if((!strcmp(mqtt_cmd.action, "willdeploy"))) | |
{ | |
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL)) | |
{ | |
free(tmp_list->data.action); | |
tmp_list->data.action = NULL; | |
free(tmp_list->data.protocol); | |
tmp_list->data.protocol = NULL; | |
free(tmp_list->data.set_will.will_topic); | |
tmp_list->data.set_will.will_topic = NULL; | |
free(tmp_list->data.set_will.will_payload); | |
tmp_list->data.set_will.will_payload = NULL; | |
} | |
tmp_list->data.protocol = mqtt_cmd.protocol; | |
tmp_list->data.action = mqtt_cmd.action; | |
tmp_list->data.set_will.will_flag = mqtt_cmd.set_will.will_flag; | |
tmp_list->data.set_will.will_topic = mqtt_cmd.set_will.will_topic; | |
tmp_list->data.set_will.will_qos = mqtt_cmd.set_will.will_qos; | |
tmp_list->data.set_will.retain = mqtt_cmd.set_will.retain; | |
tmp_list->data.set_will.will_payloadlen = mqtt_cmd.set_will.will_payloadlen; | |
tmp_list->data.set_will.will_payload = mqtt_cmd.set_will.will_payload; | |
tmp_list->data.modify_thread = 1; | |
}else if((!strcmp(mqtt_cmd.action, "publish"))) | |
{ | |
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL)) | |
{ | |
free(tmp_list->data.action); | |
tmp_list->data.action = NULL; | |
free(tmp_list->data.protocol); | |
tmp_list->data.protocol = NULL; | |
free(tmp_list->data.publish_topic); | |
tmp_list->data.publish_topic = NULL; | |
} | |
tmp_list->data.protocol = mqtt_cmd.protocol; | |
tmp_list->data.action = mqtt_cmd.action; | |
tmp_list->data.publish_qos = mqtt_cmd.publish_qos; | |
tmp_list->data.publish_topic = mqtt_cmd.publish_topic; | |
tmp_list->data.publish_data = mqtt_cmd.publish_data; | |
tmp_list->data.modify_thread = 1; | |
}else if((!strcmp(mqtt_cmd.action, "subscribe"))) | |
{ | |
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL)) | |
{ | |
free(tmp_list->data.action); | |
tmp_list->data.action = NULL; | |
free(tmp_list->data.protocol); | |
tmp_list->data.protocol = NULL; | |
free(tmp_list->data.subscribe_topic); | |
tmp_list->data.subscribe_topic = NULL; | |
} | |
tmp_list->data.protocol = mqtt_cmd.protocol; | |
tmp_list->data.action = mqtt_cmd.action; | |
tmp_list->data.subscribe_qos = mqtt_cmd.subscribe_qos; | |
tmp_list->data.subscribe_topic = mqtt_cmd.subscribe_topic; | |
tmp_list->data.modify_thread = 1; | |
}else if((!strcmp(mqtt_cmd.action, "unsubscribe"))) //lt add @2021.7.13 for unsubscribe action | |
{ | |
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL)) | |
{ | |
free(tmp_list->data.action); | |
tmp_list->data.action = NULL; | |
free(tmp_list->data.protocol); | |
tmp_list->data.protocol = NULL; | |
free(tmp_list->data.unsubscribe_topic); | |
tmp_list->data.unsubscribe_topic = NULL; | |
} | |
tmp_list->data.protocol = mqtt_cmd.protocol; | |
tmp_list->data.action = mqtt_cmd.action; | |
tmp_list->data.unsubscribe_topic = mqtt_cmd.unsubscribe_topic; | |
tmp_list->data.modify_thread = 1; | |
}else if((!strcmp(mqtt_cmd.action, "login"))) | |
{ | |
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL)) | |
{ | |
free(tmp_list->data.action); | |
tmp_list->data.action = NULL; | |
free(tmp_list->data.protocol); | |
tmp_list->data.protocol = NULL; | |
} | |
tmp_list->data.protocol = mqtt_cmd.protocol; | |
tmp_list->data.action = mqtt_cmd.action; | |
tmp_list->data.modify_thread = 1; | |
}else if((!strcmp(mqtt_cmd.action, "disconnect"))) | |
{ | |
if((tmp_list->data.action != NULL) && (tmp_list->data.protocol != NULL)) | |
{ | |
free(tmp_list->data.action); | |
tmp_list->data.action = NULL; | |
free(tmp_list->data.protocol); | |
tmp_list->data.protocol = NULL; | |
} | |
tmp_list->data.protocol = mqtt_cmd.protocol; | |
tmp_list->data.action = mqtt_cmd.action; | |
tmp_list->data.modify_thread = 1; | |
}else{ | |
LYVERBLOG("[%s-%d] cmd error \n", __FUNCTION__, __LINE__); | |
return ERR_CMDVALID; | |
} | |
} | |
//lt delete @2021.7.13 for tmp free | |
return NO_ERROR; | |
} | |
static int mqtt_init = 0; | |
void mqtt_list_init() | |
{ | |
if (mqtt_init == 0) | |
{ | |
INIT_LIST_HEAD(&mqtt_list); | |
mqtt_init = 1; | |
} | |
} | |
int mqtt_param_verification(char result[][BUF_SIZE], int line) | |
{ | |
char *mqtt_result[20]; | |
if (line < 2) | |
{ | |
LYDBGLOG("[%s-%d] command error\n", __FUNCTION__, __LINE__); | |
LYVERBLOG("+[thhandle]: error num = %d\n", ERR_CMDVALID); | |
return ERR_CMDVALID; | |
} | |
for(int i = 0; i < line; i++) | |
{ | |
mqtt_result[i] = (char *)malloc((strlen(result[i]))+1); //lt add @2021.8.6 for malloc the linked list data | |
memset(mqtt_result[i], 0, (strlen(result[i]))+1); | |
memcpy(mqtt_result[i],result[i],strlen(result[i])); | |
} | |
mqtt_cmd.protocol = mqtt_result[0]; //module | |
if(!atoi(mqtt_result[2])) { //session | |
LYVERBLOG("+[thhandle]: error num = %d\n", ERR_SESSIONVALID); | |
return ERR_SESSIONVALID; | |
} | |
mqtt_cmd.session = atoi(mqtt_result[2]); | |
mqtt_cmd.action = mqtt_result[1]; | |
LYDBGLOG("[%s-%d] mqtt_param_verification:%s\n", __FUNCTION__, __LINE__,mqtt_result[1]); | |
if (!(strcmp(mqtt_result[1], "logindeploy"))) { | |
mqtt_cmd.usrname = mqtt_result[3]; | |
mqtt_cmd.pwd = mqtt_result[4]; | |
mqtt_cmd.ip_addr = mqtt_result[5]; | |
mqtt_cmd.port = atoi(mqtt_result[6]); | |
free(mqtt_result[6]); | |
mqtt_result[6] = NULL; | |
mqtt_cmd.keep_time = atoi(mqtt_result[7]); | |
free(mqtt_result[7]); | |
mqtt_result[7] = NULL; | |
mqtt_cmd.client_id = mqtt_result[8]; | |
mqtt_cmd.clean_session = mqtt_result[9]; | |
free(mqtt_result[9]); | |
mqtt_result[9] = NULL; | |
}else if (!(strcmp(mqtt_result[1], "willdeploy"))) { | |
mqtt_cmd.set_will.will_flag = atoi(mqtt_result[3]); | |
free(mqtt_result[3]); | |
mqtt_result[3] = NULL; | |
mqtt_cmd.set_will.will_topic = mqtt_result[4]; | |
mqtt_cmd.set_will.will_qos = atoi(mqtt_result[5]); | |
free(mqtt_result[5]); | |
mqtt_result[5] = NULL; | |
mqtt_cmd.set_will.retain= mqtt_result[6]; | |
free(mqtt_result[6]); | |
mqtt_result[6] = NULL; | |
mqtt_cmd.set_will.will_payloadlen = atoi(mqtt_result[7]); | |
free(mqtt_result[7]); | |
mqtt_result[7] = NULL; | |
mqtt_cmd.set_will.will_payload = mqtt_result[8];// | |
}else if (!(strcmp(mqtt_result[1], "login"))) { | |
}else if (!(strcmp(mqtt_result[1], "disconnect"))) { | |
}else if (!(strcmp(mqtt_result[1], "publish"))) { | |
mqtt_cmd.publish_qos = atoi(mqtt_result[3]); | |
free(mqtt_result[3]); | |
mqtt_result[3] = NULL; | |
mqtt_cmd.publish_topic = mqtt_result[4]; | |
mqtt_cmd.publish_data = mqtt_result[5]; | |
}else if (!(strcmp(mqtt_result[1], "unsubscribe"))) {//lt add @2021.7.13 for unsubscribe action | |
mqtt_cmd.unsubscribe_topic = mqtt_result[3]; | |
}else if (!(strcmp(mqtt_result[1], "subscribe"))) { | |
mqtt_cmd.subscribe_qos = atoi(mqtt_result[3]); | |
free(mqtt_result[3]); | |
mqtt_result[3] = NULL; | |
mqtt_cmd.subscribe_topic = mqtt_result[4]; | |
}else{ | |
LYVERBLOG("[%s-%d] cmd error \n", __FUNCTION__, __LINE__); | |
return ERR_CMDVALID; | |
} | |
return NO_ERROR; | |
} | |
int mqtt_list_locate() | |
{ | |
struct list_head* slider = NULL; | |
mqtt_slider = NULL; | |
list_for_each(slider, &mqtt_list) | |
{ | |
mqtt_slider = (MQTT_LIST_LINK_S*)slider; | |
LYDBGLOG("[%s-%d] =================================== \n", __func__, __LINE__); | |
LYDBGLOG("[%s-%d] action : %s \n", __FUNCTION__, __LINE__, mqtt_slider->data.action); | |
LYDBGLOG("[%s-%d] protocol : %s \n", __FUNCTION__, __LINE__, mqtt_slider->data.protocol); | |
LYDBGLOG("[%s-%d] port : %d \n", __FUNCTION__, __LINE__, mqtt_slider->data.port); | |
LYDBGLOG("[%s-%d] clean_session : %d \n", __FUNCTION__, __LINE__, mqtt_slider->data.clean_session); | |
LYDBGLOG("[%s-%d] usrname : %s \n", __FUNCTION__, __LINE__, mqtt_slider->data.usrname); | |
LYDBGLOG("[%s-%d] pwd : %s \n", __FUNCTION__, __LINE__, mqtt_slider->data.pwd); | |
LYDBGLOG("[%s-%d] ip_addr : %s \n", __FUNCTION__, __LINE__, mqtt_slider->data.ip_addr); | |
LYDBGLOG("[%s-%d] port : %d \n", __FUNCTION__, __LINE__, mqtt_slider->data.port); | |
LYDBGLOG("[%s-%d] keep_time : %d \n", __FUNCTION__, __LINE__, mqtt_slider->data.keep_time); | |
LYDBGLOG("[%s-%d] =================================== \n", __func__, __LINE__); | |
if ((!strcmp(mqtt_slider->data.protocol, mqtt_cmd.protocol)) && (mqtt_slider->data.session == mqtt_cmd.session)) | |
return NO_ERROR; | |
} | |
mqtt_slider = NULL; | |
return NO_ERROR; | |
} |