blob: 11166fa5c152972519f553c0ff0fddd4b084f1c8 [file] [log] [blame]
b.liue9582032025-04-17 19:18:16 +08001#include <stdlib.h>
2#include <string.h>
3#include <errno.h>
4#include <pthread.h>
5#include <sys/time.h>
6#include "threadqueue.h"
7
8
/*
 * Base threshold for the per-queue pool of recycled msglist nodes.
 * release_msglist() adds a fraction of the current queue length to this
 * value (and to ten times this value) when deciding whether to keep a node
 * in the pool or hand it back to free().
 */
#define MSGPOOL_SIZE 256
10
/*
 * Singly linked list node used both for queued messages and for the pool of
 * recycled nodes kept in struct threadqueue.
 */
struct msglist {
 /* The message carried by this node; this file uses its data and msgtype fields. */
 struct threadmsg msg;
 /* Next node on whichever list (message queue or free pool) this node is on. */
 struct msglist *next;
};
15
16static inline struct msglist *get_msglist(struct threadqueue *queue)
17{
18struct msglist *tmp;
19
20 if(queue->msgpool != NULL) {
21 tmp = queue->msgpool;
22 queue->msgpool = tmp->next;
23 queue->msgpool_length--;
24 } else {
25 tmp = malloc(sizeof *tmp);
26 }
27
28 return tmp;
29}
30
31static inline void release_msglist(struct threadqueue *queue,struct msglist *node)
32{
33
34 if(queue->msgpool_length > ( queue->length/8 + MSGPOOL_SIZE)) {
35 free(node);
36 } else {
37 node->msg.data = NULL;
38 node->msg.msgtype = 0;
39 node->next = queue->msgpool;
40 queue->msgpool = node;
41 queue->msgpool_length++;
42 }
43 if(queue->msgpool_length > (queue->length/4 + MSGPOOL_SIZE*10)) {
44 struct msglist *tmp = queue->msgpool;
45 queue->msgpool = tmp->next;
46 free(tmp);
47 queue->msgpool_length--;
48 }
49}
50
51int thread_queue_init(struct threadqueue *queue)
52{
53 int ret = 0;
54 if (queue == NULL)
55 return EINVAL;
56
57 memset(queue, 0, sizeof(struct threadqueue));
58 ret = pthread_cond_init(&queue->cond, NULL);
59 if (ret != 0)
60 return ret;
61
62 ret = pthread_mutex_init(&queue->mutex, NULL);
63 if (ret != 0) {
64 pthread_cond_destroy(&queue->cond);
65 return ret;
66 }
67
68 return 0;
69}
70
71int thread_queue_add(struct threadqueue *queue, void *data, long msgtype)
72{
73 struct msglist *newmsg;
74
75 pthread_mutex_lock(&queue->mutex);
76 newmsg = get_msglist(queue);
77 if (newmsg == NULL) {
78 pthread_mutex_unlock(&queue->mutex);
79 return ENOMEM;
80 }
81 newmsg->msg.data = data;
82 newmsg->msg.msgtype = msgtype;
83
84 newmsg->next = NULL;
85 if (queue->last == NULL) {
86 queue->last = newmsg;
87 queue->first = newmsg;
88 } else {
89 queue->last->next = newmsg;
90 queue->last = newmsg;
91 }
92
93 if(queue->length == 0)
94 pthread_cond_broadcast(&queue->cond);
95 queue->length++;
96 pthread_mutex_unlock(&queue->mutex);
97
98 return 0;
99}
100
101int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg)
102{
103 struct msglist *firstrec;
104 int ret = 0;
105 struct timespec abstimeout;
106
107 if (queue == NULL || msg == NULL) {
108 return EINVAL;
109 }
110 if (timeout) {
111 struct timeval now;
112
113 gettimeofday(&now, NULL);
114 abstimeout.tv_sec = now.tv_sec + timeout->tv_sec;
115 abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec;
116 if (abstimeout.tv_nsec >= 1000000000) {
117 abstimeout.tv_sec++;
118 abstimeout.tv_nsec -= 1000000000;
119 }
120 }
121
122 pthread_mutex_lock(&queue->mutex);
123
124 /* Will wait until awakened by a signal or broadcast */
125 while (queue->first == NULL && ret != ETIMEDOUT) { //Need to loop to handle spurious wakeups
126 if (timeout)
127 ret = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout);
128 else
129 pthread_cond_wait(&queue->cond, &queue->mutex);
130 }
131 if (ret == ETIMEDOUT) {
132 pthread_mutex_unlock(&queue->mutex);
133 return ret;
134 }
135
136 firstrec = queue->first;
137 queue->first = queue->first->next;
138 queue->length--;
139
140 if (queue->first == NULL) {
141 queue->last = NULL; // we know this since we hold the lock
142 queue->length = 0;
143 }
144
145 msg->data = firstrec->msg.data;
146 msg->msgtype = firstrec->msg.msgtype;
147 msg->qlength = queue->length;
148
149 release_msglist(queue,firstrec);
150 pthread_mutex_unlock(&queue->mutex);
151
152 return 0;
153}
154
155//maybe caller should supply a callback for cleaning the elements ?
156int thread_queue_cleanup(struct threadqueue *queue, int freedata)
157{
158 struct msglist *rec;
159 struct msglist *next;
160 struct msglist *recs[2];
161 int ret,i;
162
163 if (queue == NULL)
164 return EINVAL;
165
166 pthread_mutex_lock(&queue->mutex);
167 recs[0] = queue->first;
168 recs[1] = queue->msgpool;
169 for(i = 0; i < 2 ; i++) {
170 rec = recs[i];
171 while (rec) {
172 next = rec->next;
173 if (freedata)
174 free(rec->msg.data);
175 free(rec);
176 rec = next;
177 }
178 }
179
180 pthread_mutex_unlock(&queue->mutex);
181 ret = pthread_mutex_destroy(&queue->mutex);
182 pthread_cond_destroy(&queue->cond);
183
184 return ret;
185}
186
187long thread_queue_length(struct threadqueue *queue)
188{
189 long counter;
190 /* get the length properly */
191 pthread_mutex_lock(&queue->mutex);
192 counter = queue->length;
193 pthread_mutex_unlock(&queue->mutex);
194
195 return counter;
196}