1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Kernel Connection Multiplexor
4 *
5 * Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
6 */
7
8#include <linux/bpf.h>
9#include <linux/errno.h>
10#include <linux/errqueue.h>
11#include <linux/file.h>
12#include <linux/in.h>
13#include <linux/kernel.h>
14#include <linux/module.h>
15#include <linux/net.h>
16#include <linux/netdevice.h>
17#include <linux/poll.h>
18#include <linux/rculist.h>
19#include <linux/skbuff.h>
20#include <linux/socket.h>
21#include <linux/uaccess.h>
22#include <linux/workqueue.h>
23#include <linux/syscalls.h>
24#include <linux/sched/signal.h>
25
26#include <net/kcm.h>
27#include <net/netns/generic.h>
28#include <net/sock.h>
29#include <uapi/linux/kcm.h>
30
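/* Pernet id for per-namespace KCM state, slab caches for mux and psock
 * objects, and the workqueue used for deferred transmit work.
 */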
31unsigned int kcm_net_id;
32
33static struct kmem_cache *kcm_psockp __read_mostly;
34static struct kmem_cache *kcm_muxp __read_mostly;
35static struct workqueue_struct *kcm_wq;
36
37static inline struct kcm_sock *kcm_sk(const struct sock *sk)
38{
39 return (struct kcm_sock *)sk;
40}
41
42static inline struct kcm_tx_msg *kcm_tx_msg(struct sk_buff *skb)
43{
44 return (struct kcm_tx_msg *)skb->cb;
45}
46
47static void report_csk_error(struct sock *csk, int err)
48{
49 csk->sk_err = EPIPE;
50 csk->sk_error_report(csk);
51}
52
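/* Mark a psock's transmit side as failed. Under the mux lock the psock is
 * taken off the available list (or, if a kcm has it reserved, that kcm's
 * tx_work is kicked when wakeup_kcm is set), then the error is reported on
 * the lower socket.
 */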
53static void kcm_abort_tx_psock(struct kcm_psock *psock, int err,
54 bool wakeup_kcm)
55{
56 struct sock *csk = psock->sk;
57 struct kcm_mux *mux = psock->mux;
58
59 /* Unrecoverable error in transmit */
60
61 spin_lock_bh(&mux->lock);
62
63 if (psock->tx_stopped) {
64 spin_unlock_bh(&mux->lock);
65 return;
66 }
67
68 psock->tx_stopped = 1;
69 KCM_STATS_INCR(psock->stats.tx_aborts);
70
71 if (!psock->tx_kcm) {
72 /* Take off psocks_avail list */
73 list_del(&psock->psock_avail_list);
74 } else if (wakeup_kcm) {
75 /* In this case psock is being aborted while outside of
76 * write_msgs and psock is reserved. Schedule tx_work
77 * to handle the failure there. Need to commit tx_stopped
78 * before queuing work.
79 */
80 smp_mb();
81
82 queue_work(kcm_wq, &psock->tx_kcm->tx_work);
83 }
84
85 spin_unlock_bh(&mux->lock);
86
87 /* Report error on lower socket */
88 report_csk_error(csk, err);
89}
90
91/* RX mux lock held. */
92static void kcm_update_rx_mux_stats(struct kcm_mux *mux,
93 struct kcm_psock *psock)
94{
95 STRP_STATS_ADD(mux->stats.rx_bytes,
96 psock->strp.stats.bytes -
97 psock->saved_rx_bytes);
98 mux->stats.rx_msgs +=
99 psock->strp.stats.msgs - psock->saved_rx_msgs;
100 psock->saved_rx_msgs = psock->strp.stats.msgs;
101 psock->saved_rx_bytes = psock->strp.stats.bytes;
102}
103
104static void kcm_update_tx_mux_stats(struct kcm_mux *mux,
105 struct kcm_psock *psock)
106{
107 KCM_STATS_ADD(mux->stats.tx_bytes,
108 psock->stats.tx_bytes - psock->saved_tx_bytes);
109 mux->stats.tx_msgs +=
110 psock->stats.tx_msgs - psock->saved_tx_msgs;
111 psock->saved_tx_msgs = psock->stats.tx_msgs;
112 psock->saved_tx_bytes = psock->stats.tx_bytes;
113}
114
115static int kcm_queue_rcv_skb(struct sock *sk, struct sk_buff *skb);
116
117/* KCM is ready to receive messages on its queue: either the KCM is new or
118 * has become unblocked after being blocked on a full socket buffer. Queue any
119 * pending ready messages on a psock. RX mux lock held.
120 */
121static void kcm_rcv_ready(struct kcm_sock *kcm)
122{
123 struct kcm_mux *mux = kcm->mux;
124 struct kcm_psock *psock;
125 struct sk_buff *skb;
126
127 if (unlikely(kcm->rx_wait || kcm->rx_psock || kcm->rx_disabled))
128 return;
129
130 while (unlikely((skb = __skb_dequeue(&mux->rx_hold_queue)))) {
131 if (kcm_queue_rcv_skb(&kcm->sk, skb)) {
132 /* Assuming buffer limit has been reached */
133 skb_queue_head(&mux->rx_hold_queue, skb);
134 WARN_ON(!sk_rmem_alloc_get(&kcm->sk));
135 return;
136 }
137 }
138
139 while (!list_empty(&mux->psocks_ready)) {
140 psock = list_first_entry(&mux->psocks_ready, struct kcm_psock,
141 psock_ready_list);
142
143 if (kcm_queue_rcv_skb(&kcm->sk, psock->ready_rx_msg)) {
144 /* Assuming buffer limit has been reached */
145 WARN_ON(!sk_rmem_alloc_get(&kcm->sk));
146 return;
147 }
148
149 /* Consumed the ready message on the psock. Schedule rx_work to
150 * get more messages.
151 */
152 list_del(&psock->psock_ready_list);
153 psock->ready_rx_msg = NULL;
154 /* Commit clearing of ready_rx_msg for queuing work */
155 smp_mb();
156
157 strp_unpause(&psock->strp);
158 strp_check_rcv(&psock->strp);
159 }
160
161 /* Buffer limit is okay now, add to ready list */
162 list_add_tail(&kcm->wait_rx_list,
163 &kcm->mux->kcm_rx_waiters);
164 /* paired with lockless reads in kcm_rfree() */
165 WRITE_ONCE(kcm->rx_wait, true);
166}
167
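/* skb destructor for messages queued on a KCM socket. Uncharges receive
 * memory and, once the socket drains below sk_rcvlowat while it is neither
 * waiting for data nor reserved by a psock, marks the KCM ready again.
 */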
168static void kcm_rfree(struct sk_buff *skb)
169{
170 struct sock *sk = skb->sk;
171 struct kcm_sock *kcm = kcm_sk(sk);
172 struct kcm_mux *mux = kcm->mux;
173 unsigned int len = skb->truesize;
174
175 sk_mem_uncharge(sk, len);
176 atomic_sub(len, &sk->sk_rmem_alloc);
177
178 /* For reading rx_wait and rx_psock without holding lock */
179 smp_mb__after_atomic();
180
181 if (!READ_ONCE(kcm->rx_wait) && !READ_ONCE(kcm->rx_psock) &&
182 sk_rmem_alloc_get(sk) < sk->sk_rcvlowat) {
183 spin_lock_bh(&mux->rx_lock);
184 kcm_rcv_ready(kcm);
185 spin_unlock_bh(&mux->rx_lock);
186 }
187}
188
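/* Queue one complete message on a KCM socket's receive queue, charging
 * receive memory and installing kcm_rfree as the skb destructor. Returns a
 * negative errno if the socket's buffer limits would be exceeded.
 */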
189static int kcm_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
190{
191 struct sk_buff_head *list = &sk->sk_receive_queue;
192
193 if (atomic_read(&sk->sk_rmem_alloc) >= sk->sk_rcvbuf)
194 return -ENOMEM;
195
196 if (!sk_rmem_schedule(sk, skb, skb->truesize))
197 return -ENOBUFS;
198
199 skb->dev = NULL;
200
201 skb_orphan(skb);
202 skb->sk = sk;
203 skb->destructor = kcm_rfree;
204 atomic_add(skb->truesize, &sk->sk_rmem_alloc);
205 sk_mem_charge(sk, skb->truesize);
206
207 skb_queue_tail(list, skb);
208
209 if (!sock_flag(sk, SOCK_DEAD))
210 sk->sk_data_ready(sk);
211
212 return 0;
213}
214
215/* Requeue received messages for a kcm socket to other kcm sockets. This is
216 * called when a kcm socket is receive disabled.
217 * RX mux lock held.
218 */
219static void requeue_rx_msgs(struct kcm_mux *mux, struct sk_buff_head *head)
220{
221 struct sk_buff *skb;
222 struct kcm_sock *kcm;
223
224 while ((skb = skb_dequeue(head))) {
225 /* Reset destructor to avoid calling kcm_rcv_ready */
226 skb->destructor = sock_rfree;
227 skb_orphan(skb);
228try_again:
229 if (list_empty(&mux->kcm_rx_waiters)) {
230 skb_queue_tail(&mux->rx_hold_queue, skb);
231 continue;
232 }
233
234 kcm = list_first_entry(&mux->kcm_rx_waiters,
235 struct kcm_sock, wait_rx_list);
236
237 if (kcm_queue_rcv_skb(&kcm->sk, skb)) {
238 /* Should mean socket buffer full */
239 list_del(&kcm->wait_rx_list);
240 /* paired with lockless reads in kcm_rfree() */
241 WRITE_ONCE(kcm->rx_wait, false);
242
243 /* Commit rx_wait to be read in kcm_rfree */
244 smp_wmb();
245
246 goto try_again;
247 }
248 }
249}
250
251/* Lower sock lock held */
252static struct kcm_sock *reserve_rx_kcm(struct kcm_psock *psock,
253 struct sk_buff *head)
254{
255 struct kcm_mux *mux = psock->mux;
256 struct kcm_sock *kcm;
257
258 WARN_ON(psock->ready_rx_msg);
259
260 if (psock->rx_kcm)
261 return psock->rx_kcm;
262
263 spin_lock_bh(&mux->rx_lock);
264
265 if (psock->rx_kcm) {
266 spin_unlock_bh(&mux->rx_lock);
267 return psock->rx_kcm;
268 }
269
270 kcm_update_rx_mux_stats(mux, psock);
271
272 if (list_empty(&mux->kcm_rx_waiters)) {
273 psock->ready_rx_msg = head;
274 strp_pause(&psock->strp);
275 list_add_tail(&psock->psock_ready_list,
276 &mux->psocks_ready);
277 spin_unlock_bh(&mux->rx_lock);
278 return NULL;
279 }
280
281 kcm = list_first_entry(&mux->kcm_rx_waiters,
282 struct kcm_sock, wait_rx_list);
283 list_del(&kcm->wait_rx_list);
284 /* paired with lockless reads in kcm_rfree() */
285 WRITE_ONCE(kcm->rx_wait, false);
286
287 psock->rx_kcm = kcm;
288 /* paired with lockless reads in kcm_rfree() */
289 WRITE_ONCE(kcm->rx_psock, psock);
290
291 spin_unlock_bh(&mux->rx_lock);
292
293 return kcm;
294}
295
296static void kcm_done(struct kcm_sock *kcm);
297
298static void kcm_done_work(struct work_struct *w)
299{
300 kcm_done(container_of(w, struct kcm_sock, done_work));
301}
302
303/* Lower sock held */
304static void unreserve_rx_kcm(struct kcm_psock *psock,
305 bool rcv_ready)
306{
307 struct kcm_sock *kcm = psock->rx_kcm;
308 struct kcm_mux *mux = psock->mux;
309
310 if (!kcm)
311 return;
312
313 spin_lock_bh(&mux->rx_lock);
314
315 psock->rx_kcm = NULL;
316 /* paired with lockless reads in kcm_rfree() */
317 WRITE_ONCE(kcm->rx_psock, NULL);
318
319 /* Commit kcm->rx_psock before sk_rmem_alloc_get to sync with
320 * kcm_rfree
321 */
322 smp_mb();
323
324 if (unlikely(kcm->done)) {
325 spin_unlock_bh(&mux->rx_lock);
326
327 /* Need to run kcm_done in a task since we need to acquire
328 * callback locks which may already be held here.
329 */
330 INIT_WORK(&kcm->done_work, kcm_done_work);
331 schedule_work(&kcm->done_work);
332 return;
333 }
334
335 if (unlikely(kcm->rx_disabled)) {
336 requeue_rx_msgs(mux, &kcm->sk.sk_receive_queue);
337 } else if (rcv_ready || unlikely(!sk_rmem_alloc_get(&kcm->sk))) {
338 /* Check for the degenerate race with rx_wait where all
339 * data was already dequeued (accounted for in kcm_rfree).
340 */
341 kcm_rcv_ready(kcm);
342 }
343 spin_unlock_bh(&mux->rx_lock);
344}
345
346/* Lower sock lock held */
347static void psock_data_ready(struct sock *sk)
348{
349 struct kcm_psock *psock;
350
351 read_lock_bh(&sk->sk_callback_lock);
352
353 psock = (struct kcm_psock *)sk->sk_user_data;
354 if (likely(psock))
355 strp_data_ready(&psock->strp);
356
357 read_unlock_bh(&sk->sk_callback_lock);
358}
359
360/* Called with lower sock held */
361static void kcm_rcv_strparser(struct strparser *strp, struct sk_buff *skb)
362{
363 struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
364 struct kcm_sock *kcm;
365
366try_queue:
367 kcm = reserve_rx_kcm(psock, skb);
368 if (!kcm) {
369 /* Unable to reserve a KCM, message is held in psock and strp
370 * is paused.
371 */
372 return;
373 }
374
375 if (kcm_queue_rcv_skb(&kcm->sk, skb)) {
376 /* Should mean socket buffer full */
377 unreserve_rx_kcm(psock, false);
378 goto try_queue;
379 }
380}
381
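/* strparser parse callback: run the attached BPF program on the skb; its
 * return value gives the length of the next message (or an error).
 */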
382static int kcm_parse_func_strparser(struct strparser *strp, struct sk_buff *skb)
383{
384 struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
385 struct bpf_prog *prog = psock->bpf_prog;
386 int res;
387
388 preempt_disable();
389 res = BPF_PROG_RUN(prog, skb);
390 preempt_enable();
391 return res;
392}
393
394static int kcm_read_sock_done(struct strparser *strp, int err)
395{
396 struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
397
398 unreserve_rx_kcm(psock, true);
399
400 return err;
401}
402
403static void psock_state_change(struct sock *sk)
404{
405 /* TCP only does an EPOLLIN for a half close. Do an EPOLLHUP here
406 * since the application will normally not poll with EPOLLIN
407 * on the TCP sockets.
408 */
409
410 report_csk_error(sk, EPIPE);
411}
412
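/* Write space became available on the lower socket. If a kcm currently has
 * this psock reserved for transmit, kick its tx_work to resume sending.
 */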
413static void psock_write_space(struct sock *sk)
414{
415 struct kcm_psock *psock;
416 struct kcm_mux *mux;
417 struct kcm_sock *kcm;
418
419 read_lock_bh(&sk->sk_callback_lock);
420
421 psock = (struct kcm_psock *)sk->sk_user_data;
422 if (unlikely(!psock))
423 goto out;
424 mux = psock->mux;
425
426 spin_lock_bh(&mux->lock);
427
428 /* Check if the psock is reserved, i.e. a kcm is waiting to send on it. */
429 kcm = psock->tx_kcm;
430 if (kcm && !unlikely(kcm->tx_stopped))
431 queue_work(kcm_wq, &kcm->tx_work);
432
433 spin_unlock_bh(&mux->lock);
434out:
435 read_unlock_bh(&sk->sk_callback_lock);
436}
437
438static void unreserve_psock(struct kcm_sock *kcm);
439
440/* kcm sock is locked. */
441static struct kcm_psock *reserve_psock(struct kcm_sock *kcm)
442{
443 struct kcm_mux *mux = kcm->mux;
444 struct kcm_psock *psock;
445
446 psock = kcm->tx_psock;
447
448 smp_rmb(); /* Must read tx_psock before tx_wait */
449
450 if (psock) {
451 WARN_ON(kcm->tx_wait);
452 if (unlikely(psock->tx_stopped))
453 unreserve_psock(kcm);
454 else
455 return kcm->tx_psock;
456 }
457
458 spin_lock_bh(&mux->lock);
459
460 /* Check again under lock to see if a psock was reserved for this
461 * kcm via psock_now_avail (called from unreserve_psock).
462 */
463 psock = kcm->tx_psock;
464 if (unlikely(psock)) {
465 WARN_ON(kcm->tx_wait);
466 spin_unlock_bh(&mux->lock);
467 return kcm->tx_psock;
468 }
469
470 if (!list_empty(&mux->psocks_avail)) {
471 psock = list_first_entry(&mux->psocks_avail,
472 struct kcm_psock,
473 psock_avail_list);
474 list_del(&psock->psock_avail_list);
475 if (kcm->tx_wait) {
476 list_del(&kcm->wait_psock_list);
477 kcm->tx_wait = false;
478 }
479 kcm->tx_psock = psock;
480 psock->tx_kcm = kcm;
481 KCM_STATS_INCR(psock->stats.reserved);
482 } else if (!kcm->tx_wait) {
483 list_add_tail(&kcm->wait_psock_list,
484 &mux->kcm_tx_waiters);
485 kcm->tx_wait = true;
486 }
487
488 spin_unlock_bh(&mux->lock);
489
490 return psock;
491}
492
493/* mux lock held */
494static void psock_now_avail(struct kcm_psock *psock)
495{
496 struct kcm_mux *mux = psock->mux;
497 struct kcm_sock *kcm;
498
499 if (list_empty(&mux->kcm_tx_waiters)) {
500 list_add_tail(&psock->psock_avail_list,
501 &mux->psocks_avail);
502 } else {
503 kcm = list_first_entry(&mux->kcm_tx_waiters,
504 struct kcm_sock,
505 wait_psock_list);
506 list_del(&kcm->wait_psock_list);
507 kcm->tx_wait = false;
508 psock->tx_kcm = kcm;
509
510 /* Commit before changing tx_psock since that is read in
511 * reserve_psock before queuing work.
512 */
513 smp_mb();
514
515 kcm->tx_psock = psock;
516 KCM_STATS_INCR(psock->stats.reserved);
517 queue_work(kcm_wq, &kcm->tx_work);
518 }
519}
520
521/* kcm sock is locked. */
522static void unreserve_psock(struct kcm_sock *kcm)
523{
524 struct kcm_psock *psock;
525 struct kcm_mux *mux = kcm->mux;
526
527 spin_lock_bh(&mux->lock);
528
529 psock = kcm->tx_psock;
530
531 if (WARN_ON(!psock)) {
532 spin_unlock_bh(&mux->lock);
533 return;
534 }
535
536 smp_rmb(); /* Read tx_psock before tx_wait */
537
538 kcm_update_tx_mux_stats(mux, psock);
539
540 WARN_ON(kcm->tx_wait);
541
542 kcm->tx_psock = NULL;
543 psock->tx_kcm = NULL;
544 KCM_STATS_INCR(psock->stats.unreserved);
545
546 if (unlikely(psock->tx_stopped)) {
547 if (psock->done) {
548 /* Deferred free */
549 list_del(&psock->psock_list);
550 mux->psocks_cnt--;
551 sock_put(psock->sk);
552 fput(psock->sk->sk_socket->file);
553 kmem_cache_free(kcm_psockp, psock);
554 }
555
556 /* Don't put back on available list */
557
558 spin_unlock_bh(&mux->lock);
559
560 return;
561 }
562
563 psock_now_avail(psock);
564
565 spin_unlock_bh(&mux->lock);
566}
567
568static void kcm_report_tx_retry(struct kcm_sock *kcm)
569{
570 struct kcm_mux *mux = kcm->mux;
571
572 spin_lock_bh(&mux->lock);
573 KCM_STATS_INCR(mux->stats.tx_retries);
574 spin_unlock_bh(&mux->lock);
575}
576
577/* Write any messages ready on the kcm socket. Called with kcm sock lock
578 * held. Return bytes actually sent or error.
579 */
580static int kcm_write_msgs(struct kcm_sock *kcm)
581{
582 struct sock *sk = &kcm->sk;
583 struct kcm_psock *psock;
584 struct sk_buff *skb, *head;
585 struct kcm_tx_msg *txm;
586 unsigned short fragidx, frag_offset;
587 unsigned int sent, total_sent = 0;
588 int ret = 0;
589
590 kcm->tx_wait_more = false;
591 psock = kcm->tx_psock;
592 if (unlikely(psock && psock->tx_stopped)) {
593 /* A reserved psock was aborted asynchronously. Unreserve
594 * it and we'll retry the message.
595 */
596 unreserve_psock(kcm);
597 kcm_report_tx_retry(kcm);
598 if (skb_queue_empty(&sk->sk_write_queue))
599 return 0;
600
601 kcm_tx_msg(skb_peek(&sk->sk_write_queue))->sent = 0;
602
603 } else if (skb_queue_empty(&sk->sk_write_queue)) {
604 return 0;
605 }
606
607 head = skb_peek(&sk->sk_write_queue);
608 txm = kcm_tx_msg(head);
609
610 if (txm->sent) {
611 /* Send of first skbuff in queue already in progress */
612 if (WARN_ON(!psock)) {
613 ret = -EINVAL;
614 goto out;
615 }
616 sent = txm->sent;
617 frag_offset = txm->frag_offset;
618 fragidx = txm->fragidx;
619 skb = txm->frag_skb;
620
621 goto do_frag;
622 }
623
624try_again:
625 psock = reserve_psock(kcm);
626 if (!psock)
627 goto out;
628
629 do {
630 skb = head;
631 txm = kcm_tx_msg(head);
632 sent = 0;
633
634do_frag_list:
635 if (WARN_ON(!skb_shinfo(skb)->nr_frags)) {
636 ret = -EINVAL;
637 goto out;
638 }
639
640 for (fragidx = 0; fragidx < skb_shinfo(skb)->nr_frags;
641 fragidx++) {
642 skb_frag_t *frag;
643
644 frag_offset = 0;
645do_frag:
646 frag = &skb_shinfo(skb)->frags[fragidx];
647 if (WARN_ON(!skb_frag_size(frag))) {
648 ret = -EINVAL;
649 goto out;
650 }
651
652 ret = kernel_sendpage(psock->sk->sk_socket,
653 skb_frag_page(frag),
654 skb_frag_off(frag) + frag_offset,
655 skb_frag_size(frag) - frag_offset,
656 MSG_DONTWAIT);
657 if (ret <= 0) {
658 if (ret == -EAGAIN) {
659 /* Save state to try again when there's
660 * write space on the socket
661 */
662 txm->sent = sent;
663 txm->frag_offset = frag_offset;
664 txm->fragidx = fragidx;
665 txm->frag_skb = skb;
666
667 ret = 0;
668 goto out;
669 }
670
671 /* Hard failure in sending message, abort this
672 * psock since it has lost framing
673 * synchronization and retry sending the
674 * message from the beginning.
675 */
676 kcm_abort_tx_psock(psock, ret ? -ret : EPIPE,
677 true);
678 unreserve_psock(kcm);
679
680 txm->sent = 0;
681 kcm_report_tx_retry(kcm);
682 ret = 0;
683
684 goto try_again;
685 }
686
687 sent += ret;
688 frag_offset += ret;
689 KCM_STATS_ADD(psock->stats.tx_bytes, ret);
690 if (frag_offset < skb_frag_size(frag)) {
691 /* Not finished with this frag */
692 goto do_frag;
693 }
694 }
695
696 if (skb == head) {
697 if (skb_has_frag_list(skb)) {
698 skb = skb_shinfo(skb)->frag_list;
699 goto do_frag_list;
700 }
701 } else if (skb->next) {
702 skb = skb->next;
703 goto do_frag_list;
704 }
705
706 /* Successfully sent the whole packet, account for it. */
707 skb_dequeue(&sk->sk_write_queue);
708 kfree_skb(head);
709 sk->sk_wmem_queued -= sent;
710 total_sent += sent;
711 KCM_STATS_INCR(psock->stats.tx_msgs);
712 } while ((head = skb_peek(&sk->sk_write_queue)));
713out:
714 if (!head) {
715 /* Done with all queued messages. */
716 WARN_ON(!skb_queue_empty(&sk->sk_write_queue));
717 unreserve_psock(kcm);
718 }
719
720 /* Check if write space is available */
721 sk->sk_write_space(sk);
722
723 return total_sent ? : ret;
724}
725
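/* Workqueue handler: retry kcm_write_msgs() with the kcm socket locked and
 * wake up writers blocked on SOCK_NOSPACE once space is available again.
 */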
726static void kcm_tx_work(struct work_struct *w)
727{
728 struct kcm_sock *kcm = container_of(w, struct kcm_sock, tx_work);
729 struct sock *sk = &kcm->sk;
730 int err;
731
732 lock_sock(sk);
733
734 /* Primarily for SOCK_DGRAM sockets, also handle asynchronous tx
735 * aborts
736 */
737 err = kcm_write_msgs(kcm);
738 if (err < 0) {
739 /* Hard failure in write, report error on KCM socket */
740 pr_warn("KCM: Hard failure on kcm_write_msgs %d\n", err);
741 report_csk_error(&kcm->sk, -err);
742 goto out;
743 }
744
745 /* Primarily for SOCK_SEQPACKET sockets */
746 if (likely(sk->sk_socket) &&
747 test_bit(SOCK_NOSPACE, &sk->sk_socket->flags)) {
748 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
749 sk->sk_write_space(sk);
750 }
751
752out:
753 release_sock(sk);
754}
755
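/* Flush any messages that were held back for batching (tx_wait_more). */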
756static void kcm_push(struct kcm_sock *kcm)
757{
758 if (kcm->tx_wait_more)
759 kcm_write_msgs(kcm);
760}
761
762static ssize_t kcm_sendpage(struct socket *sock, struct page *page,
763 int offset, size_t size, int flags)
764
765{
766 struct sock *sk = sock->sk;
767 struct kcm_sock *kcm = kcm_sk(sk);
768 struct sk_buff *skb = NULL, *head = NULL;
769 long timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
770 bool eor;
771 int err = 0;
772 int i;
773
774 if (flags & MSG_SENDPAGE_NOTLAST)
775 flags |= MSG_MORE;
776
777 /* No MSG_EOR from splice, only look at MSG_MORE */
778 eor = !(flags & MSG_MORE);
779
780 lock_sock(sk);
781
782 sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
783
784 err = -EPIPE;
785 if (sk->sk_err)
786 goto out_error;
787
788 if (kcm->seq_skb) {
789 /* Previously opened message */
790 head = kcm->seq_skb;
791 skb = kcm_tx_msg(head)->last_skb;
792 i = skb_shinfo(skb)->nr_frags;
793
794 if (skb_can_coalesce(skb, i, page, offset)) {
795 skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], size);
796 skb_shinfo(skb)->tx_flags |= SKBTX_SHARED_FRAG;
797 goto coalesced;
798 }
799
800 if (i >= MAX_SKB_FRAGS) {
801 struct sk_buff *tskb;
802
803 tskb = alloc_skb(0, sk->sk_allocation);
804 while (!tskb) {
805 kcm_push(kcm);
806 err = sk_stream_wait_memory(sk, &timeo);
807 if (err)
808 goto out_error;
809 }
810
811 if (head == skb)
812 skb_shinfo(head)->frag_list = tskb;
813 else
814 skb->next = tskb;
815
816 skb = tskb;
817 skb->ip_summed = CHECKSUM_UNNECESSARY;
818 i = 0;
819 }
820 } else {
821 /* Call the sk_stream functions to manage the sndbuf mem. */
822 if (!sk_stream_memory_free(sk)) {
823 kcm_push(kcm);
824 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
825 err = sk_stream_wait_memory(sk, &timeo);
826 if (err)
827 goto out_error;
828 }
829
830 head = alloc_skb(0, sk->sk_allocation);
831 while (!head) {
832 kcm_push(kcm);
833 err = sk_stream_wait_memory(sk, &timeo);
834 if (err)
835 goto out_error;
836 }
837
838 skb = head;
839 i = 0;
840 }
841
842 get_page(page);
843 skb_fill_page_desc(skb, i, page, offset, size);
844 skb_shinfo(skb)->tx_flags |= SKBTX_SHARED_FRAG;
845
846coalesced:
847 skb->len += size;
848 skb->data_len += size;
849 skb->truesize += size;
850 sk->sk_wmem_queued += size;
851 sk_mem_charge(sk, size);
852
853 if (head != skb) {
854 head->len += size;
855 head->data_len += size;
856 head->truesize += size;
857 }
858
859 if (eor) {
860 bool not_busy = skb_queue_empty(&sk->sk_write_queue);
861
862 /* Message complete, queue it on send buffer */
863 __skb_queue_tail(&sk->sk_write_queue, head);
864 kcm->seq_skb = NULL;
865 KCM_STATS_INCR(kcm->stats.tx_msgs);
866
867 if (flags & MSG_BATCH) {
868 kcm->tx_wait_more = true;
869 } else if (kcm->tx_wait_more || not_busy) {
870 err = kcm_write_msgs(kcm);
871 if (err < 0) {
872 /* We got a hard error in write_msgs but have
873 * already queued this message. Report an error
874 * in the socket, but don't affect return value
875 * from sendmsg
876 */
877 pr_warn("KCM: Hard failure on kcm_write_msgs\n");
878 report_csk_error(&kcm->sk, -err);
879 }
880 }
881 } else {
882 /* Message not complete, save state */
883 kcm->seq_skb = head;
884 kcm_tx_msg(head)->last_skb = skb;
885 }
886
887 KCM_STATS_ADD(kcm->stats.tx_bytes, size);
888
889 release_sock(sk);
890 return size;
891
892out_error:
893 kcm_push(kcm);
894
895 err = sk_stream_error(sk, flags, err);
896
897 /* make sure we wake any epoll edge trigger waiter */
898 if (unlikely(skb_queue_len(&sk->sk_write_queue) == 0 && err == -EAGAIN))
899 sk->sk_write_space(sk);
900
901 release_sock(sk);
902 return err;
903}
904
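/* Copy data from the msghdr into page frags on an skb chain; once the
 * message is complete (MSG_EOR for SOCK_SEQPACKET, absence of MSG_MORE for
 * SOCK_DGRAM) queue it on the write queue and try to transmit it.
 */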
905static int kcm_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
906{
907 struct sock *sk = sock->sk;
908 struct kcm_sock *kcm = kcm_sk(sk);
909 struct sk_buff *skb = NULL, *head = NULL;
910 size_t copy, copied = 0;
911 long timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
912 int eor = (sock->type == SOCK_DGRAM) ?
913 !(msg->msg_flags & MSG_MORE) : !!(msg->msg_flags & MSG_EOR);
914 int err = -EPIPE;
915
916 mutex_lock(&kcm->tx_mutex);
917 lock_sock(sk);
918
919 /* Per tcp_sendmsg this should be in poll */
920 sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
921
922 if (sk->sk_err)
923 goto out_error;
924
925 if (kcm->seq_skb) {
926 /* Previously opened message */
927 head = kcm->seq_skb;
928 skb = kcm_tx_msg(head)->last_skb;
929 goto start;
930 }
931
932 /* Call the sk_stream functions to manage the sndbuf mem. */
933 if (!sk_stream_memory_free(sk)) {
934 kcm_push(kcm);
935 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
936 err = sk_stream_wait_memory(sk, &timeo);
937 if (err)
938 goto out_error;
939 }
940
941 if (msg_data_left(msg)) {
942 /* New message, alloc head skb */
943 head = alloc_skb(0, sk->sk_allocation);
944 while (!head) {
945 kcm_push(kcm);
946 err = sk_stream_wait_memory(sk, &timeo);
947 if (err)
948 goto out_error;
949
950 head = alloc_skb(0, sk->sk_allocation);
951 }
952
953 skb = head;
954
955 /* Set ip_summed to CHECKSUM_UNNECESSARY to avoid calling
956 * csum_and_copy_from_iter from skb_do_copy_data_nocache.
957 */
958 skb->ip_summed = CHECKSUM_UNNECESSARY;
959 }
960
961start:
962 while (msg_data_left(msg)) {
963 bool merge = true;
964 int i = skb_shinfo(skb)->nr_frags;
965 struct page_frag *pfrag = sk_page_frag(sk);
966
967 if (!sk_page_frag_refill(sk, pfrag))
968 goto wait_for_memory;
969
970 if (!skb_can_coalesce(skb, i, pfrag->page,
971 pfrag->offset)) {
972 if (i == MAX_SKB_FRAGS) {
973 struct sk_buff *tskb;
974
975 tskb = alloc_skb(0, sk->sk_allocation);
976 if (!tskb)
977 goto wait_for_memory;
978
979 if (head == skb)
980 skb_shinfo(head)->frag_list = tskb;
981 else
982 skb->next = tskb;
983
984 skb = tskb;
985 skb->ip_summed = CHECKSUM_UNNECESSARY;
986 continue;
987 }
988 merge = false;
989 }
990
991 copy = min_t(int, msg_data_left(msg),
992 pfrag->size - pfrag->offset);
993
994 if (!sk_wmem_schedule(sk, copy))
995 goto wait_for_memory;
996
997 err = skb_copy_to_page_nocache(sk, &msg->msg_iter, skb,
998 pfrag->page,
999 pfrag->offset,
1000 copy);
1001 if (err)
1002 goto out_error;
1003
1004 /* Update the skb. */
1005 if (merge) {
1006 skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
1007 } else {
1008 skb_fill_page_desc(skb, i, pfrag->page,
1009 pfrag->offset, copy);
1010 get_page(pfrag->page);
1011 }
1012
1013 pfrag->offset += copy;
1014 copied += copy;
1015 if (head != skb) {
1016 head->len += copy;
1017 head->data_len += copy;
1018 }
1019
1020 continue;
1021
1022wait_for_memory:
1023 kcm_push(kcm);
1024 err = sk_stream_wait_memory(sk, &timeo);
1025 if (err)
1026 goto out_error;
1027 }
1028
1029 if (eor) {
1030 bool not_busy = skb_queue_empty(&sk->sk_write_queue);
1031
1032 if (head) {
1033 /* Message complete, queue it on send buffer */
1034 __skb_queue_tail(&sk->sk_write_queue, head);
1035 kcm->seq_skb = NULL;
1036 KCM_STATS_INCR(kcm->stats.tx_msgs);
1037 }
1038
1039 if (msg->msg_flags & MSG_BATCH) {
1040 kcm->tx_wait_more = true;
1041 } else if (kcm->tx_wait_more || not_busy) {
1042 err = kcm_write_msgs(kcm);
1043 if (err < 0) {
1044 /* We got a hard error in write_msgs but have
1045 * already queued this message. Report an error
1046 * in the socket, but don't affect return value
1047 * from sendmsg
1048 */
1049 pr_warn("KCM: Hard failure on kcm_write_msgs\n");
1050 report_csk_error(&kcm->sk, -err);
1051 }
1052 }
1053 } else {
1054 /* Message not complete, save state */
1055partial_message:
1056 if (head) {
1057 kcm->seq_skb = head;
1058 kcm_tx_msg(head)->last_skb = skb;
1059 }
1060 }
1061
1062 KCM_STATS_ADD(kcm->stats.tx_bytes, copied);
1063
1064 release_sock(sk);
1065 mutex_unlock(&kcm->tx_mutex);
1066 return copied;
1067
1068out_error:
1069 kcm_push(kcm);
1070
1071 if (sock->type == SOCK_SEQPACKET) {
1072 /* Wrote some bytes before encountering an
1073 * error, return partial success.
1074 */
1075 if (copied)
1076 goto partial_message;
1077 if (head != kcm->seq_skb)
1078 kfree_skb(head);
1079 } else {
1080 kfree_skb(head);
1081 kcm->seq_skb = NULL;
1082 }
1083
1084 err = sk_stream_error(sk, msg->msg_flags, err);
1085
1086 /* make sure we wake any epoll edge trigger waiter */
1087 if (unlikely(skb_queue_len(&sk->sk_write_queue) == 0 && err == -EAGAIN))
1088 sk->sk_write_space(sk);
1089
1090 release_sock(sk);
1091 mutex_unlock(&kcm->tx_mutex);
1092 return err;
1093}
1094
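/* Copy a parsed message from the receive queue to userspace. A SOCK_DGRAM
 * read that does not consume the whole message is flagged with MSG_TRUNC.
 */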
1095static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
1096 size_t len, int flags)
1097{
1098 int noblock = flags & MSG_DONTWAIT;
1099 struct sock *sk = sock->sk;
1100 struct kcm_sock *kcm = kcm_sk(sk);
1101 int err = 0;
1102 struct strp_msg *stm;
1103 int copied = 0;
1104 struct sk_buff *skb;
1105
1106 skb = skb_recv_datagram(sk, flags, noblock, &err);
1107 if (!skb)
1108 goto out;
1109
1110 /* Okay, have a message on the receive queue */
1111
1112 stm = strp_msg(skb);
1113
1114 if (len > stm->full_len)
1115 len = stm->full_len;
1116
1117 err = skb_copy_datagram_msg(skb, stm->offset, msg, len);
1118 if (err < 0)
1119 goto out;
1120
1121 copied = len;
1122 if (likely(!(flags & MSG_PEEK))) {
1123 KCM_STATS_ADD(kcm->stats.rx_bytes, copied);
1124 if (copied < stm->full_len) {
1125 if (sock->type == SOCK_DGRAM) {
1126 /* Truncated message */
1127 msg->msg_flags |= MSG_TRUNC;
1128 goto msg_finished;
1129 }
1130 stm->offset += copied;
1131 stm->full_len -= copied;
1132 } else {
1133msg_finished:
1134 /* Finished with message */
1135 msg->msg_flags |= MSG_EOR;
1136 KCM_STATS_INCR(kcm->stats.rx_msgs);
1137 }
1138 }
1139
1140out:
1141 skb_free_datagram(sk, skb);
1142 return copied ? : err;
1143}
1144
1145static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
1146 struct pipe_inode_info *pipe, size_t len,
1147 unsigned int flags)
1148{
1149 int noblock = flags & MSG_DONTWAIT;
1150 struct sock *sk = sock->sk;
1151 struct kcm_sock *kcm = kcm_sk(sk);
1152 struct strp_msg *stm;
1153 int err = 0;
1154 ssize_t copied;
1155 struct sk_buff *skb;
1156
1157 /* Only support splice for SOCK_SEQPACKET */
1158
1159 skb = skb_recv_datagram(sk, flags, noblock, &err);
1160 if (!skb)
1161 goto err_out;
1162
1163 /* Okay, have a message on the receive queue */
1164
1165 stm = strp_msg(skb);
1166
1167 if (len > stm->full_len)
1168 len = stm->full_len;
1169
1170 copied = skb_splice_bits(skb, sk, stm->offset, pipe, len, flags);
1171 if (copied < 0) {
1172 err = copied;
1173 goto err_out;
1174 }
1175
1176 KCM_STATS_ADD(kcm->stats.rx_bytes, copied);
1177
1178 stm->offset += copied;
1179 stm->full_len -= copied;
1180
1181 /* We have no way to return MSG_EOR. If all the bytes have been
1182 * read we still leave the message in the receive socket buffer.
1183 * A subsequent recvmsg needs to be done to return MSG_EOR and
1184 * finish reading the message.
1185 */
1186
1187 skb_free_datagram(sk, skb);
1188 return copied;
1189
1190err_out:
1191 skb_free_datagram(sk, skb);
1192 return err;
1193}
1194
1195/* kcm sock lock held */
1196static void kcm_recv_disable(struct kcm_sock *kcm)
1197{
1198 struct kcm_mux *mux = kcm->mux;
1199
1200 if (kcm->rx_disabled)
1201 return;
1202
1203 spin_lock_bh(&mux->rx_lock);
1204
1205 kcm->rx_disabled = 1;
1206
1207 /* If a psock is reserved we'll do cleanup in unreserve */
1208 if (!kcm->rx_psock) {
1209 if (kcm->rx_wait) {
1210 list_del(&kcm->wait_rx_list);
1211 /* paired with lockless reads in kcm_rfree() */
1212 WRITE_ONCE(kcm->rx_wait, false);
1213 }
1214
1215 requeue_rx_msgs(mux, &kcm->sk.sk_receive_queue);
1216 }
1217
1218 spin_unlock_bh(&mux->rx_lock);
1219}
1220
1221/* kcm sock lock held */
1222static void kcm_recv_enable(struct kcm_sock *kcm)
1223{
1224 struct kcm_mux *mux = kcm->mux;
1225
1226 if (!kcm->rx_disabled)
1227 return;
1228
1229 spin_lock_bh(&mux->rx_lock);
1230
1231 kcm->rx_disabled = 0;
1232 kcm_rcv_ready(kcm);
1233
1234 spin_unlock_bh(&mux->rx_lock);
1235}
1236
1237static int kcm_setsockopt(struct socket *sock, int level, int optname,
1238 char __user *optval, unsigned int optlen)
1239{
1240 struct kcm_sock *kcm = kcm_sk(sock->sk);
1241 int val, valbool;
1242 int err = 0;
1243
1244 if (level != SOL_KCM)
1245 return -ENOPROTOOPT;
1246
1247 if (optlen < sizeof(int))
1248 return -EINVAL;
1249
1250 if (get_user(val, (int __user *)optval))
1251 return -EINVAL;
1252
1253 valbool = val ? 1 : 0;
1254
1255 switch (optname) {
1256 case KCM_RECV_DISABLE:
1257 lock_sock(&kcm->sk);
1258 if (valbool)
1259 kcm_recv_disable(kcm);
1260 else
1261 kcm_recv_enable(kcm);
1262 release_sock(&kcm->sk);
1263 break;
1264 default:
1265 err = -ENOPROTOOPT;
1266 }
1267
1268 return err;
1269}
1270
1271static int kcm_getsockopt(struct socket *sock, int level, int optname,
1272 char __user *optval, int __user *optlen)
1273{
1274 struct kcm_sock *kcm = kcm_sk(sock->sk);
1275 int val, len;
1276
1277 if (level != SOL_KCM)
1278 return -ENOPROTOOPT;
1279
1280 if (get_user(len, optlen))
1281 return -EFAULT;
1282
1283 if (len < 0)
1284 return -EINVAL;
1285
1286 len = min_t(unsigned int, len, sizeof(int));
1287
1288 switch (optname) {
1289 case KCM_RECV_DISABLE:
1290 val = kcm->rx_disabled;
1291 break;
1292 default:
1293 return -ENOPROTOOPT;
1294 }
1295
1296 if (put_user(len, optlen))
1297 return -EFAULT;
1298 if (copy_to_user(optval, &val, len))
1299 return -EFAULT;
1300 return 0;
1301}
1302
1303static void init_kcm_sock(struct kcm_sock *kcm, struct kcm_mux *mux)
1304{
1305 struct kcm_sock *tkcm;
1306 struct list_head *head;
1307 int index = 0;
1308
1309 /* For SOCK_SEQPACKET sock type, datagram_poll checks the sk_state, so
1310 * we set sk_state, otherwise epoll_wait always returns right away with
1311 * EPOLLHUP
1312 */
1313 kcm->sk.sk_state = TCP_ESTABLISHED;
1314
1315 /* Add to mux's kcm sockets list */
1316 kcm->mux = mux;
1317 spin_lock_bh(&mux->lock);
1318
1319 head = &mux->kcm_socks;
1320 list_for_each_entry(tkcm, &mux->kcm_socks, kcm_sock_list) {
1321 if (tkcm->index != index)
1322 break;
1323 head = &tkcm->kcm_sock_list;
1324 index++;
1325 }
1326
1327 list_add(&kcm->kcm_sock_list, head);
1328 kcm->index = index;
1329
1330 mux->kcm_socks_cnt++;
1331 spin_unlock_bh(&mux->lock);
1332
1333 INIT_WORK(&kcm->tx_work, kcm_tx_work);
1334 mutex_init(&kcm->tx_mutex);
1335
1336 spin_lock_bh(&mux->rx_lock);
1337 kcm_rcv_ready(kcm);
1338 spin_unlock_bh(&mux->rx_lock);
1339}
1340
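/* Attach a connected TCP socket to the mux: allocate a psock, start a
 * strparser on the lower socket, take over its data_ready/write_space/
 * state_change callbacks, and add the psock to the mux's psock list.
 */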
1341static int kcm_attach(struct socket *sock, struct socket *csock,
1342 struct bpf_prog *prog)
1343{
1344 struct kcm_sock *kcm = kcm_sk(sock->sk);
1345 struct kcm_mux *mux = kcm->mux;
1346 struct sock *csk;
1347 struct kcm_psock *psock = NULL, *tpsock;
1348 struct list_head *head;
1349 int index = 0;
1350 static const struct strp_callbacks cb = {
1351 .rcv_msg = kcm_rcv_strparser,
1352 .parse_msg = kcm_parse_func_strparser,
1353 .read_sock_done = kcm_read_sock_done,
1354 };
1355 int err = 0;
1356
1357 csk = csock->sk;
1358 if (!csk)
1359 return -EINVAL;
1360
1361 lock_sock(csk);
1362
1363 /* Only allow TCP sockets to be attached for now */
1364 if ((csk->sk_family != AF_INET && csk->sk_family != AF_INET6) ||
1365 csk->sk_protocol != IPPROTO_TCP) {
1366 err = -EOPNOTSUPP;
1367 goto out;
1368 }
1369
1370 /* Don't allow listeners or closed sockets */
1371 if (csk->sk_state == TCP_LISTEN || csk->sk_state == TCP_CLOSE) {
1372 err = -EOPNOTSUPP;
1373 goto out;
1374 }
1375
1376 psock = kmem_cache_zalloc(kcm_psockp, GFP_KERNEL);
1377 if (!psock) {
1378 err = -ENOMEM;
1379 goto out;
1380 }
1381
1382 psock->mux = mux;
1383 psock->sk = csk;
1384 psock->bpf_prog = prog;
1385
1386 write_lock_bh(&csk->sk_callback_lock);
1387
1388 /* Check if sk_user_data is already in use by KCM or someone else.
1389 * Must be done under lock to prevent race conditions.
1390 */
1391 if (csk->sk_user_data) {
1392 write_unlock_bh(&csk->sk_callback_lock);
1393 kmem_cache_free(kcm_psockp, psock);
1394 err = -EALREADY;
1395 goto out;
1396 }
1397
1398 err = strp_init(&psock->strp, csk, &cb);
1399 if (err) {
1400 write_unlock_bh(&csk->sk_callback_lock);
1401 kmem_cache_free(kcm_psockp, psock);
1402 goto out;
1403 }
1404
1405 psock->save_data_ready = csk->sk_data_ready;
1406 psock->save_write_space = csk->sk_write_space;
1407 psock->save_state_change = csk->sk_state_change;
1408 csk->sk_user_data = psock;
1409 csk->sk_data_ready = psock_data_ready;
1410 csk->sk_write_space = psock_write_space;
1411 csk->sk_state_change = psock_state_change;
1412
1413 write_unlock_bh(&csk->sk_callback_lock);
1414
1415 sock_hold(csk);
1416
1417 /* Finished initialization, now add the psock to the MUX. */
1418 spin_lock_bh(&mux->lock);
1419 head = &mux->psocks;
1420 list_for_each_entry(tpsock, &mux->psocks, psock_list) {
1421 if (tpsock->index != index)
1422 break;
1423 head = &tpsock->psock_list;
1424 index++;
1425 }
1426
1427 list_add(&psock->psock_list, head);
1428 psock->index = index;
1429
1430 KCM_STATS_INCR(mux->stats.psock_attach);
1431 mux->psocks_cnt++;
1432 psock_now_avail(psock);
1433 spin_unlock_bh(&mux->lock);
1434
1435 /* Schedule RX work in case there are already bytes queued */
1436 strp_check_rcv(&psock->strp);
1437
1438out:
1439 release_sock(csk);
1440
1441 return err;
1442}
1443
1444static int kcm_attach_ioctl(struct socket *sock, struct kcm_attach *info)
1445{
1446 struct socket *csock;
1447 struct bpf_prog *prog;
1448 int err;
1449
1450 csock = sockfd_lookup(info->fd, &err);
1451 if (!csock)
1452 return -ENOENT;
1453
1454 prog = bpf_prog_get_type(info->bpf_fd, BPF_PROG_TYPE_SOCKET_FILTER);
1455 if (IS_ERR(prog)) {
1456 err = PTR_ERR(prog);
1457 goto out;
1458 }
1459
1460 err = kcm_attach(sock, csock, prog);
1461 if (err) {
1462 bpf_prog_put(prog);
1463 goto out;
1464 }
1465
1466 /* Keep reference on file also */
1467
1468 return 0;
1469out:
1470 fput(csock->file);
1471 return err;
1472}
1473
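/* Detach a psock from its lower TCP socket: restore the saved callbacks,
 * stop the strparser, drop any held ready message, and either free the
 * psock or defer the free if a kcm still has it reserved for transmit.
 */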
1474static void kcm_unattach(struct kcm_psock *psock)
1475{
1476 struct sock *csk = psock->sk;
1477 struct kcm_mux *mux = psock->mux;
1478
1479 lock_sock(csk);
1480
1481 /* Stop getting callbacks from TCP socket. After this there should
1482 * be no way to reserve a kcm for this psock.
1483 */
1484 write_lock_bh(&csk->sk_callback_lock);
1485 csk->sk_user_data = NULL;
1486 csk->sk_data_ready = psock->save_data_ready;
1487 csk->sk_write_space = psock->save_write_space;
1488 csk->sk_state_change = psock->save_state_change;
1489 strp_stop(&psock->strp);
1490
1491 if (WARN_ON(psock->rx_kcm)) {
1492 write_unlock_bh(&csk->sk_callback_lock);
1493 release_sock(csk);
1494 return;
1495 }
1496
1497 spin_lock_bh(&mux->rx_lock);
1498
1499 /* Stop receiver activities. After this point psock should not be
1500 * able to get onto ready list either through callbacks or work.
1501 */
1502 if (psock->ready_rx_msg) {
1503 list_del(&psock->psock_ready_list);
1504 kfree_skb(psock->ready_rx_msg);
1505 psock->ready_rx_msg = NULL;
1506 KCM_STATS_INCR(mux->stats.rx_ready_drops);
1507 }
1508
1509 spin_unlock_bh(&mux->rx_lock);
1510
1511 write_unlock_bh(&csk->sk_callback_lock);
1512
1513 /* Call strp_done without sock lock */
1514 release_sock(csk);
1515 strp_done(&psock->strp);
1516 lock_sock(csk);
1517
1518 bpf_prog_put(psock->bpf_prog);
1519
1520 spin_lock_bh(&mux->lock);
1521
1522 aggregate_psock_stats(&psock->stats, &mux->aggregate_psock_stats);
1523 save_strp_stats(&psock->strp, &mux->aggregate_strp_stats);
1524
1525 KCM_STATS_INCR(mux->stats.psock_unattach);
1526
1527 if (psock->tx_kcm) {
1528 /* psock was reserved. Just mark it finished and we will clean
1529 * up in the kcm paths; we need the kcm lock, which cannot be
1530 * acquired here.
1531 */
1532 KCM_STATS_INCR(mux->stats.psock_unattach_rsvd);
1533 spin_unlock_bh(&mux->lock);
1534
1535 /* We are unattaching a socket that is reserved. Abort the
1536 * socket since we may be out of sync in sending on it. We need
1537 * to do this without the mux lock.
1538 */
1539 kcm_abort_tx_psock(psock, EPIPE, false);
1540
1541 spin_lock_bh(&mux->lock);
1542 if (!psock->tx_kcm) {
1543 /* psock was unreserved in the window where the mux was unlocked */
1544 goto no_reserved;
1545 }
1546 psock->done = 1;
1547
1548 /* Commit done before queuing work to process it */
1549 smp_mb();
1550
1551 /* Queue tx work to make sure psock->done is handled */
1552 queue_work(kcm_wq, &psock->tx_kcm->tx_work);
1553 spin_unlock_bh(&mux->lock);
1554 } else {
1555no_reserved:
1556 if (!psock->tx_stopped)
1557 list_del(&psock->psock_avail_list);
1558 list_del(&psock->psock_list);
1559 mux->psocks_cnt--;
1560 spin_unlock_bh(&mux->lock);
1561
1562 sock_put(csk);
1563 fput(csk->sk_socket->file);
1564 kmem_cache_free(kcm_psockp, psock);
1565 }
1566
1567 release_sock(csk);
1568}
1569
1570static int kcm_unattach_ioctl(struct socket *sock, struct kcm_unattach *info)
1571{
1572 struct kcm_sock *kcm = kcm_sk(sock->sk);
1573 struct kcm_mux *mux = kcm->mux;
1574 struct kcm_psock *psock;
1575 struct socket *csock;
1576 struct sock *csk;
1577 int err;
1578
1579 csock = sockfd_lookup(info->fd, &err);
1580 if (!csock)
1581 return -ENOENT;
1582
1583 csk = csock->sk;
1584 if (!csk) {
1585 err = -EINVAL;
1586 goto out;
1587 }
1588
1589 err = -ENOENT;
1590
1591 spin_lock_bh(&mux->lock);
1592
1593 list_for_each_entry(psock, &mux->psocks, psock_list) {
1594 if (psock->sk != csk)
1595 continue;
1596
1597 /* Found the matching psock */
1598
1599 if (psock->unattaching || WARN_ON(psock->done)) {
1600 err = -EALREADY;
1601 break;
1602 }
1603
1604 psock->unattaching = 1;
1605
1606 spin_unlock_bh(&mux->lock);
1607
1608 /* Lower socket lock should already be held */
1609 kcm_unattach(psock);
1610
1611 err = 0;
1612 goto out;
1613 }
1614
1615 spin_unlock_bh(&mux->lock);
1616
1617out:
1618 fput(csock->file);
1619 return err;
1620}
1621
1622static struct proto kcm_proto = {
1623 .name = "KCM",
1624 .owner = THIS_MODULE,
1625 .obj_size = sizeof(struct kcm_sock),
1626};
1627
1628/* Clone a kcm socket. */
1629static struct file *kcm_clone(struct socket *osock)
1630{
1631 struct socket *newsock;
1632 struct sock *newsk;
1633
1634 newsock = sock_alloc();
1635 if (!newsock)
1636 return ERR_PTR(-ENFILE);
1637
1638 newsock->type = osock->type;
1639 newsock->ops = osock->ops;
1640
1641 __module_get(newsock->ops->owner);
1642
1643 newsk = sk_alloc(sock_net(osock->sk), PF_KCM, GFP_KERNEL,
1644 &kcm_proto, false);
1645 if (!newsk) {
1646 sock_release(newsock);
1647 return ERR_PTR(-ENOMEM);
1648 }
1649 sock_init_data(newsock, newsk);
1650 init_kcm_sock(kcm_sk(newsk), kcm_sk(osock->sk)->mux);
1651
1652 return sock_alloc_file(newsock, 0, osock->sk->sk_prot_creator->name);
1653}
1654
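/* Userspace usage sketch (hedged; the struct layouts come from
 * uapi/linux/kcm.h, and the names below are illustrative):
 *
 *   int kcmfd = socket(AF_KCM, SOCK_SEQPACKET, KCMPROTO_CONNECTED);
 *   struct kcm_attach attach = {
 *           .fd = tcp_fd,          // a connected TCP socket
 *           .bpf_fd = bpf_prog_fd, // SOCKET_FILTER prog returning msg length
 *   };
 *   ioctl(kcmfd, SIOCKCMATTACH, &attach);
 *
 * SIOCKCMUNATTACH takes a struct kcm_unattach naming the TCP fd, and
 * SIOCKCMCLONE returns a new KCM socket fd (via struct kcm_clone) on the
 * same mux.
 */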
1655static int kcm_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
1656{
1657 int err;
1658
1659 switch (cmd) {
1660 case SIOCKCMATTACH: {
1661 struct kcm_attach info;
1662
1663 if (copy_from_user(&info, (void __user *)arg, sizeof(info)))
1664 return -EFAULT;
1665
1666 err = kcm_attach_ioctl(sock, &info);
1667
1668 break;
1669 }
1670 case SIOCKCMUNATTACH: {
1671 struct kcm_unattach info;
1672
1673 if (copy_from_user(&info, (void __user *)arg, sizeof(info)))
1674 return -EFAULT;
1675
1676 err = kcm_unattach_ioctl(sock, &info);
1677
1678 break;
1679 }
1680 case SIOCKCMCLONE: {
1681 struct kcm_clone info;
1682 struct file *file;
1683
1684 info.fd = get_unused_fd_flags(0);
1685 if (unlikely(info.fd < 0))
1686 return info.fd;
1687
1688 file = kcm_clone(sock);
1689 if (IS_ERR(file)) {
1690 put_unused_fd(info.fd);
1691 return PTR_ERR(file);
1692 }
1693 if (copy_to_user((void __user *)arg, &info,
1694 sizeof(info))) {
1695 put_unused_fd(info.fd);
1696 fput(file);
1697 return -EFAULT;
1698 }
1699 fd_install(info.fd, file);
1700 err = 0;
1701 break;
1702 }
1703 default:
1704 err = -ENOIOCTLCMD;
1705 break;
1706 }
1707
1708 return err;
1709}
1710
1711static void free_mux(struct rcu_head *rcu)
1712{
1713 struct kcm_mux *mux = container_of(rcu,
1714 struct kcm_mux, rcu);
1715
1716 kmem_cache_free(kcm_muxp, mux);
1717}
1718
1719static void release_mux(struct kcm_mux *mux)
1720{
1721 struct kcm_net *knet = mux->knet;
1722 struct kcm_psock *psock, *tmp_psock;
1723
1724 /* Release psocks */
1725 list_for_each_entry_safe(psock, tmp_psock,
1726 &mux->psocks, psock_list) {
1727 if (!WARN_ON(psock->unattaching))
1728 kcm_unattach(psock);
1729 }
1730
1731 if (WARN_ON(mux->psocks_cnt))
1732 return;
1733
1734 __skb_queue_purge(&mux->rx_hold_queue);
1735
1736 mutex_lock(&knet->mutex);
1737 aggregate_mux_stats(&mux->stats, &knet->aggregate_mux_stats);
1738 aggregate_psock_stats(&mux->aggregate_psock_stats,
1739 &knet->aggregate_psock_stats);
1740 aggregate_strp_stats(&mux->aggregate_strp_stats,
1741 &knet->aggregate_strp_stats);
1742 list_del_rcu(&mux->kcm_mux_list);
1743 knet->count--;
1744 mutex_unlock(&knet->mutex);
1745
1746 call_rcu(&mux->rcu, free_mux);
1747}
1748
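/* Final teardown of a kcm socket: take it off the rx waiter list, requeue
 * any pending receive messages to other kcm sockets, detach from the mux,
 * and release the mux when the last kcm socket on it goes away.
 */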
1749static void kcm_done(struct kcm_sock *kcm)
1750{
1751 struct kcm_mux *mux = kcm->mux;
1752 struct sock *sk = &kcm->sk;
1753 int socks_cnt;
1754
1755 spin_lock_bh(&mux->rx_lock);
1756 if (kcm->rx_psock) {
1757 /* Cleanup in unreserve_rx_kcm */
1758 WARN_ON(kcm->done);
1759 kcm->rx_disabled = 1;
1760 kcm->done = 1;
1761 spin_unlock_bh(&mux->rx_lock);
1762 return;
1763 }
1764
1765 if (kcm->rx_wait) {
1766 list_del(&kcm->wait_rx_list);
1767 /* paired with lockless reads in kcm_rfree() */
1768 WRITE_ONCE(kcm->rx_wait, false);
1769 }
1770 /* Move any pending receive messages to other kcm sockets */
1771 requeue_rx_msgs(mux, &sk->sk_receive_queue);
1772
1773 spin_unlock_bh(&mux->rx_lock);
1774
1775 if (WARN_ON(sk_rmem_alloc_get(sk)))
1776 return;
1777
1778 /* Detach from MUX */
1779 spin_lock_bh(&mux->lock);
1780
1781 list_del(&kcm->kcm_sock_list);
1782 mux->kcm_socks_cnt--;
1783 socks_cnt = mux->kcm_socks_cnt;
1784
1785 spin_unlock_bh(&mux->lock);
1786
1787 if (!socks_cnt) {
1788 /* We are done with the mux now. */
1789 release_mux(mux);
1790 }
1791
1792 WARN_ON(kcm->rx_wait);
1793
1794 sock_put(&kcm->sk);
1795}
1796
1797/* Called by kcm_release to close a KCM socket.
1798 * If this is the last KCM socket on the MUX, destroy the MUX.
1799 */
1800static int kcm_release(struct socket *sock)
1801{
1802 struct sock *sk = sock->sk;
1803 struct kcm_sock *kcm;
1804 struct kcm_mux *mux;
1805 struct kcm_psock *psock;
1806
1807 if (!sk)
1808 return 0;
1809
1810 kcm = kcm_sk(sk);
1811 mux = kcm->mux;
1812
1813 lock_sock(sk);
1814 sock_orphan(sk);
1815 kfree_skb(kcm->seq_skb);
1816
1817 /* Purge queue under lock to avoid race condition with tx_work trying
1818 * to act when queue is nonempty. If tx_work runs after this point
1819 * it will just return.
1820 */
1821 __skb_queue_purge(&sk->sk_write_queue);
1822
1823 /* Set tx_stopped. This is checked when psock is bound to a kcm and we
1824 * get a writespace callback. This prevents further work being queued
1825 * from the callback (unbinding the psock occurs after canceling work).
1826 */
1827 kcm->tx_stopped = 1;
1828
1829 release_sock(sk);
1830
1831 spin_lock_bh(&mux->lock);
1832 if (kcm->tx_wait) {
1833 /* Take off the tx_wait list; after this point there should be no way
1834 * that a psock will be assigned to this kcm.
1835 */
1836 list_del(&kcm->wait_psock_list);
1837 kcm->tx_wait = false;
1838 }
1839 spin_unlock_bh(&mux->lock);
1840
1841 /* Cancel work. After this point there should be no outside references
1842 * to the kcm socket.
1843 */
1844 cancel_work_sync(&kcm->tx_work);
1845
1846 lock_sock(sk);
1847 psock = kcm->tx_psock;
1848 if (psock) {
1849 /* A psock was reserved, so we need to kill it since it
1850 * may already have some bytes queued from a message. We
1851 * need to do this after removing kcm from tx_wait list.
1852 */
1853 kcm_abort_tx_psock(psock, EPIPE, false);
1854 unreserve_psock(kcm);
1855 }
1856 release_sock(sk);
1857
1858 WARN_ON(kcm->tx_wait);
1859 WARN_ON(kcm->tx_psock);
1860
1861 sock->sk = NULL;
1862
1863 kcm_done(kcm);
1864
1865 return 0;
1866}
1867
1868static const struct proto_ops kcm_dgram_ops = {
1869 .family = PF_KCM,
1870 .owner = THIS_MODULE,
1871 .release = kcm_release,
1872 .bind = sock_no_bind,
1873 .connect = sock_no_connect,
1874 .socketpair = sock_no_socketpair,
1875 .accept = sock_no_accept,
1876 .getname = sock_no_getname,
1877 .poll = datagram_poll,
1878 .ioctl = kcm_ioctl,
1879 .listen = sock_no_listen,
1880 .shutdown = sock_no_shutdown,
1881 .setsockopt = kcm_setsockopt,
1882 .getsockopt = kcm_getsockopt,
1883 .sendmsg = kcm_sendmsg,
1884 .recvmsg = kcm_recvmsg,
1885 .mmap = sock_no_mmap,
1886 .sendpage = kcm_sendpage,
1887};
1888
1889static const struct proto_ops kcm_seqpacket_ops = {
1890 .family = PF_KCM,
1891 .owner = THIS_MODULE,
1892 .release = kcm_release,
1893 .bind = sock_no_bind,
1894 .connect = sock_no_connect,
1895 .socketpair = sock_no_socketpair,
1896 .accept = sock_no_accept,
1897 .getname = sock_no_getname,
1898 .poll = datagram_poll,
1899 .ioctl = kcm_ioctl,
1900 .listen = sock_no_listen,
1901 .shutdown = sock_no_shutdown,
1902 .setsockopt = kcm_setsockopt,
1903 .getsockopt = kcm_getsockopt,
1904 .sendmsg = kcm_sendmsg,
1905 .recvmsg = kcm_recvmsg,
1906 .mmap = sock_no_mmap,
1907 .sendpage = kcm_sendpage,
1908 .splice_read = kcm_splice_read,
1909};
1910
1911/* Create a KCM socket; each new socket gets its own mux (clones share it) */
1912static int kcm_create(struct net *net, struct socket *sock,
1913 int protocol, int kern)
1914{
1915 struct kcm_net *knet = net_generic(net, kcm_net_id);
1916 struct sock *sk;
1917 struct kcm_mux *mux;
1918
1919 switch (sock->type) {
1920 case SOCK_DGRAM:
1921 sock->ops = &kcm_dgram_ops;
1922 break;
1923 case SOCK_SEQPACKET:
1924 sock->ops = &kcm_seqpacket_ops;
1925 break;
1926 default:
1927 return -ESOCKTNOSUPPORT;
1928 }
1929
1930 if (protocol != KCMPROTO_CONNECTED)
1931 return -EPROTONOSUPPORT;
1932
1933 sk = sk_alloc(net, PF_KCM, GFP_KERNEL, &kcm_proto, kern);
1934 if (!sk)
1935 return -ENOMEM;
1936
1937 /* Allocate a kcm mux, shared between KCM sockets */
1938 mux = kmem_cache_zalloc(kcm_muxp, GFP_KERNEL);
1939 if (!mux) {
1940 sk_free(sk);
1941 return -ENOMEM;
1942 }
1943
1944 spin_lock_init(&mux->lock);
1945 spin_lock_init(&mux->rx_lock);
1946 INIT_LIST_HEAD(&mux->kcm_socks);
1947 INIT_LIST_HEAD(&mux->kcm_rx_waiters);
1948 INIT_LIST_HEAD(&mux->kcm_tx_waiters);
1949
1950 INIT_LIST_HEAD(&mux->psocks);
1951 INIT_LIST_HEAD(&mux->psocks_ready);
1952 INIT_LIST_HEAD(&mux->psocks_avail);
1953
1954 mux->knet = knet;
1955
1956 /* Add new MUX to list */
1957 mutex_lock(&knet->mutex);
1958 list_add_rcu(&mux->kcm_mux_list, &knet->mux_list);
1959 knet->count++;
1960 mutex_unlock(&knet->mutex);
1961
1962 skb_queue_head_init(&mux->rx_hold_queue);
1963
1964 /* Init KCM socket */
1965 sock_init_data(sock, sk);
1966 init_kcm_sock(kcm_sk(sk), mux);
1967
1968 return 0;
1969}
1970
1971static const struct net_proto_family kcm_family_ops = {
1972 .family = PF_KCM,
1973 .create = kcm_create,
1974 .owner = THIS_MODULE,
1975};
1976
1977static __net_init int kcm_init_net(struct net *net)
1978{
1979 struct kcm_net *knet = net_generic(net, kcm_net_id);
1980
1981 INIT_LIST_HEAD_RCU(&knet->mux_list);
1982 mutex_init(&knet->mutex);
1983
1984 return 0;
1985}
1986
1987static __net_exit void kcm_exit_net(struct net *net)
1988{
1989 struct kcm_net *knet = net_generic(net, kcm_net_id);
1990
1991 /* All KCM sockets should be closed at this point, which should mean
1992 * that all multiplexors and psocks have been destroyed.
1993 */
1994 WARN_ON(!list_empty(&knet->mux_list));
1995
1996 mutex_destroy(&knet->mutex);
1997}
1998
1999static struct pernet_operations kcm_net_ops = {
2000 .init = kcm_init_net,
2001 .exit = kcm_exit_net,
2002 .id = &kcm_net_id,
2003 .size = sizeof(struct kcm_net),
2004};
2005
2006static int __init kcm_init(void)
2007{
2008 int err = -ENOMEM;
2009
2010 kcm_muxp = kmem_cache_create("kcm_mux_cache",
2011 sizeof(struct kcm_mux), 0,
2012 SLAB_HWCACHE_ALIGN, NULL);
2013 if (!kcm_muxp)
2014 goto fail;
2015
2016 kcm_psockp = kmem_cache_create("kcm_psock_cache",
2017 sizeof(struct kcm_psock), 0,
2018 SLAB_HWCACHE_ALIGN, NULL);
2019 if (!kcm_psockp)
2020 goto fail;
2021
2022 kcm_wq = create_singlethread_workqueue("kkcmd");
2023 if (!kcm_wq)
2024 goto fail;
2025
2026 err = proto_register(&kcm_proto, 1);
2027 if (err)
2028 goto fail;
2029
2030 err = register_pernet_device(&kcm_net_ops);
2031 if (err)
2032 goto net_ops_fail;
2033
2034 err = sock_register(&kcm_family_ops);
2035 if (err)
2036 goto sock_register_fail;
2037
2038 err = kcm_proc_init();
2039 if (err)
2040 goto proc_init_fail;
2041
2042 return 0;
2043
2044proc_init_fail:
2045 sock_unregister(PF_KCM);
2046
2047sock_register_fail:
2048 unregister_pernet_device(&kcm_net_ops);
2049
2050net_ops_fail:
2051 proto_unregister(&kcm_proto);
2052
2053fail:
2054 kmem_cache_destroy(kcm_muxp);
2055 kmem_cache_destroy(kcm_psockp);
2056
2057 if (kcm_wq)
2058 destroy_workqueue(kcm_wq);
2059
2060 return err;
2061}
2062
2063static void __exit kcm_exit(void)
2064{
2065 kcm_proc_exit();
2066 sock_unregister(PF_KCM);
2067 unregister_pernet_device(&kcm_net_ops);
2068 proto_unregister(&kcm_proto);
2069 destroy_workqueue(kcm_wq);
2070
2071 kmem_cache_destroy(kcm_muxp);
2072 kmem_cache_destroy(kcm_psockp);
2073}
2074
2075module_init(kcm_init);
2076module_exit(kcm_exit);
2077
2078MODULE_LICENSE("GPL");
2079MODULE_ALIAS_NETPROTO(PF_KCM);