blob: d09098c93a071dd4ea3b1fdbf979724e73ddfa0b [file] [log] [blame]
/*
* main.c
*
* RTP server main source.
*
*/
/******************************************************************************
EDIT HISTORY FOR FILE
WHEN WHO WHAT,WHERE,WHY
-------- -------- -------------------------------------------------------
2024/11/30 LiuBin Initial version
******************************************************************************/
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include "mbtk_rtp_internal.h"
#include "mbtk_log.h"
#include "mbtk_utils.h"
#define SOCK_CLIENT_MAX 5
#define EPOLL_LISTEN_MAX 100
/*
 * Process-wide runtime state. This file uses its mutex/cond pair (main-thread
 * wake-up), epoll_fd, the unix_sock_cli / udp_recv_sock descriptors and the
 * ril_handle. It is zeroed with memset() at the start of main().
 */
rtp_info_t rtp_info;
/*
 * Current RTP configuration. The fields are updated at runtime by the IPC
 * commands handled in rtp_msg_process() and by the call-state callback, and
 * the structure is handed to the UDP / VoIP servers when they are started.
 */
static rtp_config_t rtp_confs =
{
.rtp_state_pre = RTP_STATE_DISABLE,
.rtp_state_cur = RTP_STATE_DISABLE,
.volume = 7, // The "volume" IPC command accepts 0-7.
.remote_ip = "198.18.38.15", // Default peer address; replaced by the "remote_ip" IPC command.
.server_port = RTP_UDP_SER_PORT_DEFAULT,
.client_port = RTP_UDP_CLI_PORT_DEFAULT,
.vlan = {0},
.sample_rate = MBTK_AUDIO_SAMPLE_RATE_8000, // The "sample_rate" IPC command accepts 8000 or 16000.
.channel = 1 // The "channel" IPC command only accepts 1.
};
/*
 * Start/stop entry points of the RTP UDP server and the VoIP forward server.
 * They are not defined in this file (presumably implemented in sibling
 * sources - confirm). rtp_start() treats a non-zero return value as failure.
 */
int rtp_udp_server_start(rtp_config_t *conf_info);
int rtp_udp_server_stop();
int rtp_voip_server_start(const rtp_config_t *conf_info);
int rtp_voip_server_stop();
/*
 * Park the calling thread on rtp_info.cond until another thread signals it.
 *
 * tag : label of the state being waited in; only used in the log output.
 *
 * NOTE(review): the wait has no predicate, so a signal sent before the
 * thread reaches pthread_cond_wait() is lost and a spurious wake-up is
 * indistinguishable from a real one.
 */
static void rtp_main_thread_wait(const char* tag)
{
    pthread_mutex_t *lock = &rtp_info.mutex;

    LOGD("main(%s) waitting...", tag);

    pthread_mutex_lock(lock);
    pthread_cond_wait(&rtp_info.cond, lock);
    pthread_mutex_unlock(lock);

    LOGD("main(%s) running...", tag);
}
/*
 * Wake one thread blocked in rtp_main_thread_wait(). The signal is sent
 * while holding rtp_info.mutex.
 */
static void rtp_main_thread_cond()
{
    pthread_mutex_t *lock = &rtp_info.mutex;

    pthread_mutex_lock(lock);
    pthread_cond_signal(&rtp_info.cond);
    pthread_mutex_unlock(lock);
}
/*
 * Return the argument text of `cmd` when it starts with `name`, or NULL when
 * it does not. `cmd` must be NUL-terminated at cmd[cmd_len].
 *
 * One separator character after the name is skipped (whatever it is, as the
 * original parser did). When nothing follows the name, an empty string is
 * returned, so the result never points past the terminator.
 */
static const char *rtp_cmd_arg(const char *cmd, size_t cmd_len, const char *name)
{
    size_t name_len = strlen(name);

    if (cmd_len < name_len || memcmp(cmd, name, name_len) != 0) {
        return NULL;
    }
    return (cmd_len > name_len) ? cmd + name_len + 1 : cmd + name_len;
}

/*
 * Send the reply "<START>name:ret<END>" to the client on `fd`, where START
 * and END are MBTK_IND_START_FLAG / MBTK_IND_END_FLAG.
 */
static void rtp_rsp_send(int fd, const char *name, int ret)
{
    char rsp[100] = {0};

    snprintf(rsp, sizeof(rsp), "%c%s:%d%c", MBTK_IND_START_FLAG, name, ret, MBTK_IND_END_FLAG);
    mbtk_write(fd, rsp, strlen(rsp));
}

/*
 * Handle one command received from the local IPC client and send the reply.
 *
 * Supported commands (the argument follows one separator character):
 *   rtp_mode <0/1>          disable / enable RTP (always answers 0)
 *   volume <0-7>
 *   remote_ip <address>
 *   server_port <port>      1025..65534
 *   client_port <port>      1025..65534
 *   sample_rate <8000/16000>
 *   channel <1>
 * Every recognised command is answered with "<name>:<ret>", ret being 0 on
 * success and -1 when the argument is rejected. Unknown commands are only
 * logged.
 *
 * fd      : client socket the reply is written to.
 * msg     : received bytes; does not need to be NUL-terminated.
 * msg_len : number of valid bytes in msg.
 *
 * Numeric arguments are parsed with atoi(), so a missing or non-numeric
 * argument is read as 0.
 */
static void rtp_msg_process(int fd, const char *msg, int msg_len)
{
    char cmd[1024];
    size_t cmd_len;
    const char *arg;

    if (msg == NULL || msg_len <= 0) {
        return;
    }

    // Work on a bounded, NUL-terminated copy: the caller's buffer is filled
    // by read() and is not guaranteed to be terminated.
    cmd_len = ((size_t)msg_len < sizeof(cmd)) ? (size_t)msg_len : sizeof(cmd) - 1;
    memcpy(cmd, msg, cmd_len);
    cmd[cmd_len] = '\0';

    LOGD("CMD <%s> <len-%d>", cmd, msg_len);

    usleep(10 * 1000); // sleep 10ms

    if ((arg = rtp_cmd_arg(cmd, cmd_len, "rtp_mode")) != NULL) { // rtp_mode <0/1>
        int rtp_mode = atoi(arg);

        if (rtp_mode == 0) { // Disable RTP
            if (rtp_confs.rtp_state_cur == RTP_STATE_ENABLE) {
                rtp_confs.rtp_state_pre = rtp_confs.rtp_state_cur;
                rtp_confs.rtp_state_cur = RTP_STATE_DISABLE;
                rtp_main_thread_cond();
            } else if (rtp_confs.rtp_state_cur == RTP_STATE_VOIP_PROCESS) {
                // Only record the request here; the main thread is woken by
                // the call-state callback once the call is disconnected.
                rtp_confs.rtp_state_pre = rtp_confs.rtp_state_cur;
                rtp_confs.rtp_state_cur = RTP_STATE_DISABLE;
            }
        } else if (rtp_confs.rtp_state_cur == RTP_STATE_DISABLE) { // Enable RTP
            rtp_confs.rtp_state_pre = rtp_confs.rtp_state_cur;
            rtp_confs.rtp_state_cur = RTP_STATE_ENABLE;
            rtp_main_thread_cond();
        }
        rtp_rsp_send(fd, "rtp_mode", 0);
    } else if ((arg = rtp_cmd_arg(cmd, cmd_len, "volume")) != NULL) { // volume <0-7>
        int volume = atoi(arg);
        int ret = -1;

        if (volume >= 0 && volume <= 7) {
            rtp_confs.volume = volume;
            ret = 0;
        }
        rtp_rsp_send(fd, "volume", ret);
    } else if ((arg = rtp_cmd_arg(cmd, cmd_len, "remote_ip")) != NULL) { // remote_ip <address>
        size_t ip_len = strlen(arg);
        int ret = -1;

        // Reject addresses that do not fit instead of overflowing remote_ip.
        if (ip_len < sizeof(rtp_confs.remote_ip)) {
            memcpy(rtp_confs.remote_ip, arg, ip_len + 1);
            ret = 0;
        } else {
            LOGW("remote_ip too long : %d", (int)ip_len);
        }
        rtp_rsp_send(fd, "remote_ip", ret);
    } else if ((arg = rtp_cmd_arg(cmd, cmd_len, "server_port")) != NULL) { // server_port <port>
        int port = atoi(arg);
        int ret = -1;

        // Same range as client_port (the previous 0..7 check was the volume range).
        if (port > 1024 && port < 65535) {
            rtp_confs.server_port = port;
            ret = 0;
        }
        rtp_rsp_send(fd, "server_port", ret);
    } else if ((arg = rtp_cmd_arg(cmd, cmd_len, "client_port")) != NULL) { // client_port <port>
        int port = atoi(arg);
        int ret = -1;

        if (port > 1024 && port < 65535) {
            rtp_confs.client_port = port;
            ret = 0;
        }
        rtp_rsp_send(fd, "client_port", ret);
    } else if ((arg = rtp_cmd_arg(cmd, cmd_len, "sample_rate")) != NULL) { // sample_rate <8000/16000>
        int sample_rate = atoi(arg);
        int ret = 0;

        if (sample_rate == 8000) {
            rtp_confs.sample_rate = MBTK_AUDIO_SAMPLE_RATE_8000;
        } else if (sample_rate == 16000) {
            rtp_confs.sample_rate = MBTK_AUDIO_SAMPLE_RATE_16000;
        } else {
            ret = -1;
        }
        rtp_rsp_send(fd, "sample_rate", ret);
    } else if ((arg = rtp_cmd_arg(cmd, cmd_len, "channel")) != NULL) { // channel <1>
        int channel = atoi(arg);
        int ret = -1;

        if (channel == 1) {
            rtp_confs.channel = channel;
            ret = 0;
        }
        rtp_rsp_send(fd, "channel", ret);
    } else {
        LOGW("Unknown RTP msg : %s", cmd);
    }
}
/*
 * Register `fd` with the shared epoll instance for edge-triggered read
 * events.
 *
 * Returns the epoll_ctl() result, or -1 when rtp_info.epoll_fd is not a
 * positive descriptor.
 */
int epoll_fd_add(int fd)
{
    struct epoll_event event;

    if (rtp_info.epoll_fd <= 0) {
        return -1;
    }

    memset(&event, 0, sizeof(event));
    event.events = EPOLLIN | EPOLLET;
    event.data.fd = fd;

    return epoll_ctl(rtp_info.epoll_fd, EPOLL_CTL_ADD, fd, &event);
}
/*
 * Remove `fd` from the shared epoll instance.
 *
 * Returns the epoll_ctl() result, or -1 when rtp_info.epoll_fd is not a
 * positive descriptor.
 */
int epoll_fd_del(int fd)
{
    struct epoll_event event;

    if (rtp_info.epoll_fd <= 0) {
        return -1;
    }

    memset(&event, 0, sizeof(event));
    event.events = EPOLLIN | EPOLLERR | EPOLLET;
    event.data.fd = fd;

    return epoll_ctl(rtp_info.epoll_fd, EPOLL_CTL_DEL, fd, &event);
}
/*
 * Stop monitoring `fd`, close it and clear the matching slot in rtp_info
 * (local IPC client or RTP UDP receive socket).
 */
static void rtp_ipc_fd_hangup(int fd)
{
    epoll_fd_del(fd);
    close(fd);

    if (rtp_info.unix_sock_cli.fd == fd) {
        rtp_info.unix_sock_cli.fd = -1;
        LOGD("Local unix socket client close.");
    } else if (rtp_info.udp_recv_sock.fd == fd) {
        rtp_info.udp_recv_sock.fd = -1;
        LOGD("RTP UDP socket client close.");
    } else {
        LOGE("Can not occur.");
    }
}

/*
 * Accept every connection pending on `listen_fd`. The listening socket is
 * registered edge-triggered, so the queue is drained until accept() fails.
 * Only one client is served at a time; additional connections are closed
 * immediately.
 */
static void rtp_ipc_accept_clients(int listen_fd)
{
    while (1) {
        int client_fd = accept(listen_fd, NULL, NULL);

        if (client_fd <= 0) {
            if (errno == EAGAIN) {
                LOGE("All client connect get.");
            } else {
                LOGE("accept() error[%d].", errno);
            }
            break;
        }

        if (rtp_info.unix_sock_cli.fd > 0) {
            LOGE("Client is full.");
            // Close the rejected connection (it used to be leaked) and keep
            // draining the accept queue.
            close(client_fd);
            continue;
        }

        if (epoll_fd_add(client_fd)) {
            LOGE("epoll_fd_add(%d) fail[%d].", client_fd, errno);
            close(client_fd);
            continue;
        }

        rtp_info.unix_sock_cli.fd = client_fd;
        rtp_info.unix_sock_cli.read_cb = NULL;
        LOGD("Start monitor client cmd : %d", client_fd);
    }
}

/*
 * IPC service thread.
 *
 * Creates the shared epoll instance (rtp_info.epoll_fd), registers the
 * listening unix socket and then loops forever dispatching events:
 *   - EPOLLHUP            : close the descriptor and clear its slot;
 *   - listen socket ready : accept the pending client(s);
 *   - IPC client ready    : read one command and run rtp_msg_process();
 *   - RTP UDP socket ready: invoke its read_cb, if one is set.
 *
 * arg : pointer to an int holding the listening socket descriptor.
 *
 * Returns NULL; this only happens when epoll_create() fails.
 */
static void* rtp_ipc_ser_pthread(void* arg)
{
    int sock_listen_fd = *((int*)arg);
    struct epoll_event epoll_events[EPOLL_LISTEN_MAX];

    rtp_info.epoll_fd = epoll_create(SOCK_CLIENT_MAX + 1);
    if (rtp_info.epoll_fd < 0) {
        LOGE("epoll_create() fail[%d].", errno);
        return NULL;
    }

    rtp_info.unix_sock_cli.fd = -1;
    rtp_info.unix_sock_cli.read_cb = NULL;

    if (epoll_fd_add(sock_listen_fd)) {
        LOGE("epoll_fd_add(%d) fail[%d].", sock_listen_fd, errno);
    }

    while (1) {
        int nready = epoll_wait(rtp_info.epoll_fd, epoll_events, EPOLL_LISTEN_MAX, -1);

        if (nready < 0) {
            // An interrupted wait is not an error; just retry.
            if (errno != EINTR) {
                LOGE("epoll_wait() fail[%d].", errno);
            }
            continue;
        }

        for (int i = 0; i < nready; i++) {
            int ev_fd = epoll_events[i].data.fd;
            unsigned int ev_mask = epoll_events[i].events;

            LOGV("fd[%d] event = %x", ev_fd, ev_mask);

            if (ev_mask & EPOLLHUP) { // Client Close.
                rtp_ipc_fd_hangup(ev_fd);
            } else if (!(ev_mask & EPOLLIN)) {
                LOGE("Unknown event : %x", ev_mask);
            } else if (ev_fd == sock_listen_fd) { // New clients connected.
                rtp_ipc_accept_clients(ev_fd);
            } else if (rtp_info.unix_sock_cli.fd == ev_fd) { // Client data arrive.
                char buff[1024] = {0};
                // Keep the last byte free so the command is always NUL-terminated.
                int len = read(ev_fd, buff, sizeof(buff) - 1);

                if (len > 0) {
                    rtp_msg_process(ev_fd, buff, len);
                }
            } else if (rtp_info.udp_recv_sock.fd == ev_fd) { // RTP UDP data reach.
                if (rtp_info.udp_recv_sock.read_cb) {
                    rtp_info.udp_recv_sock.read_cb(ev_fd);
                }
            } else {
                LOGE("Unknown socket : %d", ev_fd);
            }
        }
    }

    return NULL;
}
/*
 * Create the local (AF_LOCAL stream) IPC listening socket at
 * RTP_IPC_SOCK_PATH, switch it to non-blocking mode and start the detached
 * thread rtp_ipc_ser_pthread() that serves it.
 *
 * Returns 0 on success, -1 on failure (the socket is closed again).
 */
static int rtp_ipc_server_start(void)
{
    // Static storage: the service thread reads the descriptor through the
    // pointer given to pthread_create(), possibly after this function has
    // returned, so it must not live on this function's stack.
    static int sock_listen_fd = -1;
    struct sockaddr_un server_addr;
    pthread_attr_t thread_attr;
    pthread_t pid;
    int flags;
    int err;

    sock_listen_fd = socket(AF_LOCAL, SOCK_STREAM, 0);
    if (sock_listen_fd < 0) {
        LOGE("socket() fail[%d].", errno);
        return -1;
    }

    // Set O_NONBLOCK
    flags = fcntl(sock_listen_fd, F_GETFL, 0);
    if (flags < 0) {
        LOGE("Get flags error:%d", errno);
        goto error;
    }
    if (fcntl(sock_listen_fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        LOGE("Set flags error:%d", errno);
        goto error;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sun_family = AF_LOCAL;
    if (strlen(RTP_IPC_SOCK_PATH) >= sizeof(server_addr.sun_path)) {
        LOGE("Socket path too long.");
        goto error;
    }
    snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", RTP_IPC_SOCK_PATH);

    // Remove a stale socket file left by a previous run before binding.
    unlink(RTP_IPC_SOCK_PATH);
    if (bind(sock_listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr))) {
        LOGE("bind() fail[%d].", errno);
        goto error;
    }

    if (listen(sock_listen_fd, SOCK_CLIENT_MAX)) {
        LOGE("listen() fail[%d].", errno);
        goto error;
    }

    if (pthread_attr_init(&thread_attr)) {
        LOGE("pthread_attr_init() fail.");
        goto error;
    }
    if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED)) {
        LOGE("pthread_attr_setdetachstate() fail.");
        pthread_attr_destroy(&thread_attr);
        goto error;
    }
    err = pthread_create(&pid, &thread_attr, rtp_ipc_ser_pthread, &sock_listen_fd);
    pthread_attr_destroy(&thread_attr);
    if (err) {
        LOGE("pthread_create() fail.");
        goto error;
    }

    LOGD("RTP IPC service is running...");
    return 0;

error:
    close(sock_listen_fd);
    sock_listen_fd = -1;
    return -1;
}
#ifdef MBTK_SOURCE_VERSION_2
/*
 * RIL call-state callback (MBTK_SOURCE_VERSION_2 build).
 *
 * Updates rtp_confs.rtp_state_pre / rtp_state_cur from the reported call
 * state and wakes the main thread when a transition is recorded:
 *   - call disconnected while in VOIP_PROCESS -> back to ENABLE;
 *   - call disconnected while already DISABLE -> record VOIP_PROCESS as the
 *     previous state so the pending disable is carried out;
 *   - call active while in ENABLE             -> VOIP_PROCESS.
 *
 * data     : pointer to a mbtk_ril_call_state_info_t; ignored when NULL.
 * data_len : not used.
 */
static void call_state_change_cb(const void* data, int data_len)
{
    mbtk_ril_call_state_info_t *info = (mbtk_ril_call_state_info_t*)data;

    if (!info) {
        return;
    }

    LOGD("call state change : call_id-%d, dir-%d, state-%d, num_type-%d,number-%s", info->call_id,
        info->dir, info->state, info->num_type, info->call_number);

    if (info->state == MBTK_RIL_CALL_STATE_DISCONNECT) {
        if (rtp_confs.rtp_state_cur == RTP_STATE_VOIP_PROCESS) {
            rtp_confs.rtp_state_pre = rtp_confs.rtp_state_cur;
            rtp_confs.rtp_state_cur = RTP_STATE_ENABLE;
            rtp_main_thread_cond();
            return;
        }
        if (rtp_confs.rtp_state_cur == RTP_STATE_DISABLE) {
            // RTP was disabled during the call; it has to be handled
            // separately once the call is hung up.
            rtp_confs.rtp_state_pre = RTP_STATE_VOIP_PROCESS;
            rtp_confs.rtp_state_cur = RTP_STATE_DISABLE;
            rtp_main_thread_cond();
        }
        return;
    }

    if (info->state == MBTK_RIL_CALL_STATE_ACTIVE
        && rtp_confs.rtp_state_cur == RTP_STATE_ENABLE) {
        rtp_confs.rtp_state_pre = rtp_confs.rtp_state_cur;
        rtp_confs.rtp_state_cur = RTP_STATE_VOIP_PROCESS;
        rtp_main_thread_cond();
    }
}
/*
 * Open or close the RIL connection (MBTK_SOURCE_VERSION_2 build).
 *
 * open : true  - open the RIL on MBTK_AT_PORT_DEF and register
 *                call_state_change_cb();
 *        false - close the RIL and forget the handle.
 *
 * Returns 0 on success or when already in the requested state, -1 when the
 * open/close call fails.
 */
static int ril_ser_switch(bool open)
{
    if (!open) {
        if (!rtp_info.ril_handle) {
            LOGW("RIL not open.");
            return 0;
        }
        if (MBTK_RIL_ERR_SUCCESS != mbtk_ril_close(MBTK_AT_PORT_DEF)) {
            LOGE("mbtk_ril_close(MBTK_AT_PORT_DEF) fail.");
            return -1;
        }
        rtp_info.ril_handle = NULL;
        return 0;
    }

    if (rtp_info.ril_handle) {
        LOGW("RIL has opened.");
        return 0;
    }

    rtp_info.ril_handle = mbtk_ril_open(MBTK_AT_PORT_DEF);
    if (rtp_info.ril_handle == NULL) {
        LOGE("mbtk_ril_open(MBTK_AT_PORT_DEF) fail.");
        return -1;
    }
    mbtk_call_state_change_cb_reg(call_state_change_cb);
    return 0;
}
#else
/*
 * Call-state callback (legacy build, MBTK_SOURCE_VERSION_2 not defined).
 *
 * Updates rtp_confs.rtp_state_pre / rtp_state_cur from the reported call
 * information and wakes the main thread when a transition is recorded:
 *   - MBTK_DISCONNECTED while in VOIP_PROCESS -> back to ENABLE;
 *   - MBTK_DISCONNECTED while already DISABLE -> record VOIP_PROCESS as the
 *     previous state so the pending disable is carried out;
 *   - MBTK_CLCC with state 0 while in ENABLE  -> VOIP_PROCESS.
 *
 * data     : pointer to a mbtk_call_info_t; ignored when NULL.
 * data_len : not used.
 */
static void call_state_change_cb(const void* data, int data_len)
{
    if (data) {
        mbtk_call_info_t *info = (mbtk_call_info_t*)data;
        const int hung_up = (info->call_wait == MBTK_DISCONNECTED);
        const int call_started = (info->call_wait == MBTK_CLCC && info->state == 0);

        LOGD("call state change : call_wait-%d, state-%d", info->call_wait, info->state);

        if (hung_up) {
            if (rtp_confs.rtp_state_cur == RTP_STATE_VOIP_PROCESS) {
                rtp_confs.rtp_state_pre = rtp_confs.rtp_state_cur;
                rtp_confs.rtp_state_cur = RTP_STATE_ENABLE;
                rtp_main_thread_cond();
            } else if (rtp_confs.rtp_state_cur == RTP_STATE_DISABLE) {
                // RTP was disabled during the call; it has to be handled
                // separately once the call is hung up.
                rtp_confs.rtp_state_pre = RTP_STATE_VOIP_PROCESS;
                rtp_confs.rtp_state_cur = RTP_STATE_DISABLE;
                rtp_main_thread_cond();
            }
        } else if (call_started) {
            if (rtp_confs.rtp_state_cur == RTP_STATE_ENABLE) {
                rtp_confs.rtp_state_pre = rtp_confs.rtp_state_cur;
                rtp_confs.rtp_state_cur = RTP_STATE_VOIP_PROCESS;
                rtp_main_thread_cond();
            }
        }
    }
    // Disabled debug dump of the raw call information, kept for reference.
#if 0
    mbtk_call_info_t *reg = (mbtk_call_info_t *)data;
    switch (reg->call_wait)
    {
        case MBTK_CLCC:
            printf("\r\nRING : %d, %d, %d, %d, %d, %s, %d\r\n", reg->dir1, reg->dir, reg->state, reg->mode, reg->mpty, reg->phone_number, reg->type);
            break;
        case MBTK_DISCONNECTED:
            printf("\r\nRING : call dis connected!\r\n");
            break;
        case MBTK_CPAS:
            printf("\r\nCALL : Call state = %d\r\n", reg->pas);
            /*
            MBTK_CALL_RADY, //MT allows commands from TA/TE
            MBTK_CALL_UNAVAILABLE, //MT does not allow commands from TA/TE
            MBTK_CALL_UNKNOWN, //MT is not guaranteed to respond to instructions
            MBTK_CALL_RINGING, //MT is ready for commands from TA/TE, but the ringer is active
            MBTK_CALL_PROGRESS, //MT is ready for commands from TA/TE, but a call is in progress
            MBTK_CALL_ASLEEP, //MT is unable to process commands from TA/TE because it is in a low functionality state
            MBTK_CALL_ACTIVE,
            */
            switch (reg->pas)
            {
                case MBTK_CALL_RADY:
                    printf("CALL: call READY\r\n");
                    break;
                case MBTK_CALL_UNAVAILABLE:
                    printf("CALL: call unavaliable\r\n");
                    break;
                case MBTK_CALL_UNKNOWN:
                    printf("CALL: call unknown\r\n");
                    break;
                case MBTK_CALL_RINGING:
                    printf("CALL: call ringing\r\n");
                    break;
                case MBTK_CALL_PROGRESS:
                    printf("CALL: call progress\r\n");
                    break;
                case MBTK_CALL_ASLEEP:
                    printf("CALL: call asleep\r\n");
                    break;
                case MBTK_CALL_ACTIVE:
                    printf("CALL: call active\r\n");
                    break;
                default:
                    printf("\r\n");
                    break;
            }
            break;
        default:
            printf("\r\nRING : None call_wait = %d\r\n", reg->call_wait);
            break;
    }
#endif
}
/*
 * Open or close the RIL connection (legacy build, MBTK_SOURCE_VERSION_2 not
 * defined).
 *
 * open : true  - obtain an info handle and register call_state_change_cb()
 *                on it;
 *        false - free the handle and forget it.
 *
 * Returns 0 on success or when already in the requested state, -1 when
 * obtaining or freeing the handle fails.
 */
static int ril_ser_switch(bool open)
{
    if (!open) {
        if (!rtp_info.ril_handle) {
            LOGW("RIL not open.");
            return 0;
        }
        if (mbtk_info_handle_free(&rtp_info.ril_handle)) {
            LOGE("mbtk_info_handle_free() fail.");
            return -1;
        }
        rtp_info.ril_handle = NULL;
        return 0;
    }

    if (rtp_info.ril_handle) {
        LOGW("RIL has opened.");
        return 0;
    }

    rtp_info.ril_handle = mbtk_info_handle_get();
    if (rtp_info.ril_handle == NULL) {
        LOGE("mbtk_info_handle_get() fail.");
        return -1;
    }
    mbtk_call_state_change_cb_reg(rtp_info.ril_handle, call_state_change_cb);
    return 0;
}
#endif
/*
 * Carry out the transition state_pre -> state_cur, then block until another
 * thread signals the next state change.
 *
 *   -> DISABLE      (from ENABLE or VOIP_PROCESS): close the RIL, stop the
 *                   VoIP and UDP servers.
 *   -> ENABLE       from VOIP_PROCESS: stop the VoIP server;
 *                   from DISABLE: open the RIL and start the UDP server.
 *   -> VOIP_PROCESS from ENABLE: start the VoIP server.
 * Other combinations are only logged. Failures of the individual steps are
 * logged and do not abort the transition.
 *
 * Returns 0 after being woken up, -1 when state_cur is not a known state
 * (in which case the function does not wait).
 */
static int rtp_start(rtp_state_enum state_pre, rtp_state_enum state_cur)
{
    const char *tag = NULL;

    LOGD("RTP start, state : %d -> %d", state_pre, state_cur);

    if (state_cur == RTP_STATE_DISABLE) {
        if (state_pre == RTP_STATE_VOIP_PROCESS || state_pre == RTP_STATE_ENABLE) {
            // Close ril server.
            if (ril_ser_switch(FALSE)) {
                LOGE("ril_ser_switch(FALSE) fail.");
            }
            // Close RTP UDP forward server.
            if (rtp_voip_server_stop()) {
                LOGE("rtp_voip_server_stop() fail.");
            }
            if (rtp_udp_server_stop()) {
                LOGE("rtp_udp_server_stop() fail.");
            }
        } else {
            LOGW("Can not occur[Except for the first time].");
        }
        tag = "RTP_STATE_DISABLE";
    } else if (state_cur == RTP_STATE_ENABLE) {
        if (state_pre == RTP_STATE_VOIP_PROCESS) {
            // Close RTP UDP forward server.
            if (rtp_voip_server_stop()) {
                // The message used to name rtp_udp_server_stop() by mistake.
                LOGE("rtp_voip_server_stop() fail.");
            }
        } else if (state_pre == RTP_STATE_DISABLE) {
            // Open ril server.
            if (ril_ser_switch(TRUE)) {
                LOGE("ril_ser_switch(TRUE) fail.");
            }
            if (rtp_udp_server_start(&rtp_confs)) {
                LOGE("rtp_udp_server_start() fail.");
            }
        } else {
            LOGW("Can not occur.");
        }
        tag = "RTP_STATE_ENABLE";
    } else if (state_cur == RTP_STATE_VOIP_PROCESS) {
        if (state_pre == RTP_STATE_ENABLE) {
            // Open RTP UDP forward server.
            if (rtp_voip_server_start(&rtp_confs)) {
                LOGE("rtp_voip_server_start() fail.");
            }
        } else {
            LOGW("Can not occur.");
        }
        tag = "RTP_STATE_VOIP_PROCESS";
    } else {
        LOGE("Unknown state : %d", state_cur);
        return -1;
    }

    // Wait for state change.
    rtp_main_thread_wait(tag);
    return 0;
}
/*
 * Daemon entry point.
 *
 * Initialises logging and the shared rtp_info state, starts the IPC service
 * and then runs the state machine: every rtp_start() call performs one
 * transition and returns when the next one has been requested.
 *
 * Returns -1 when the IPC service cannot be started or when rtp_start()
 * reports a failure (the loop has no other way to end).
 */
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    mbtk_log_init("radio", "MBTK_RTP");
    LOGD("mbtk_rtpd start.");

    memset(&rtp_info, 0, sizeof(rtp_info_t));
    pthread_mutex_init(&rtp_info.mutex, NULL);
    pthread_cond_init(&rtp_info.cond, NULL);

    // Start server to monitor client messages.
    if (rtp_ipc_server_start()) {
        LOGE("rtp_ipc_server_start() fail.");
        return -1;
    }

    while (!rtp_start(rtp_confs.rtp_state_pre, rtp_confs.rtp_state_cur)) {
        LOGD("RTP will restart with state %d -> %d", rtp_confs.rtp_state_pre, rtp_confs.rtp_state_cur);
    }

    // Reaching this point means rtp_start() failed: report it in the exit
    // status as well instead of exiting with success.
    LOGE("RTP exit. rtp_start() fail.");
    return -1;
}