1// SPDX-License-Identifier: GPL-2.0
2/*
3 * Generic ring buffer
4 *
5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6 */
7#include <linux/trace_events.h>
8#include <linux/ring_buffer.h>
9#include <linux/trace_clock.h>
10#include <linux/sched/clock.h>
11#include <linux/trace_seq.h>
12#include <linux/spinlock.h>
13#include <linux/irq_work.h>
14#include <linux/security.h>
15#include <linux/uaccess.h>
16#include <linux/hardirq.h>
17#include <linux/kthread.h> /* for self test */
18#include <linux/module.h>
19#include <linux/percpu.h>
20#include <linux/mutex.h>
21#include <linux/delay.h>
22#include <linux/slab.h>
23#include <linux/init.h>
24#include <linux/hash.h>
25#include <linux/list.h>
26#include <linux/cpu.h>
27#include <linux/oom.h>
28
29#include <asm/local.h>
30
31static void update_pages_handler(struct work_struct *work);
32
33/*
34 * The ring buffer header is special. We must manually keep it up to date.
35 */
36int ring_buffer_print_entry_header(struct trace_seq *s)
37{
38 trace_seq_puts(s, "# compressed entry header\n");
39 trace_seq_puts(s, "\ttype_len : 5 bits\n");
40 trace_seq_puts(s, "\ttime_delta : 27 bits\n");
41 trace_seq_puts(s, "\tarray : 32 bits\n");
42 trace_seq_putc(s, '\n');
43 trace_seq_printf(s, "\tpadding : type == %d\n",
44 RINGBUF_TYPE_PADDING);
45 trace_seq_printf(s, "\ttime_extend : type == %d\n",
46 RINGBUF_TYPE_TIME_EXTEND);
47 trace_seq_printf(s, "\ttime_stamp : type == %d\n",
48 RINGBUF_TYPE_TIME_STAMP);
49 trace_seq_printf(s, "\tdata max type_len == %d\n",
50 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
51
52 return !trace_seq_has_overflowed(s);
53}
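/*
 * Worked example (editor's note): with the layout printed above, a small
 * data event whose type_len is 2 announces a 2 * 4 == 8 byte payload
 * directly in the 5-bit field, and its 27-bit time_delta rides in the
 * same 32-bit word, so no extra header space is needed.  Payloads too
 * large for the 5-bit field (more than 28 four-byte words) set type_len
 * to 0 and store the length in the 32-bit array[0] slot instead.
 */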
54
55/*
56 * The ring buffer is made up of a list of pages. A separate list of pages is
57 * allocated for each CPU. A writer may only write to a buffer that is
58 * associated with the CPU it is currently executing on. A reader may read
59 * from any per cpu buffer.
60 *
61 * The reader is special. For each per cpu buffer, the reader has its own
62 * reader page. When a reader has read the entire reader page, this reader
63 * page is swapped with another page in the ring buffer.
64 *
65 * Now, as long as the writer is off the reader page, the reader can do
66 * whatever it wants with that page. The writer will never write to that
67 * page again (as long as it is out of the ring buffer).
68 *
69 * Here's some silly ASCII art.
70 *
71 * +------+
72 * |reader| RING BUFFER
73 * |page |
74 * +------+ +---+ +---+ +---+
75 * | |-->| |-->| |
76 * +---+ +---+ +---+
77 * ^ |
78 * | |
79 * +---------------+
80 *
81 *
82 * +------+
83 * |reader| RING BUFFER
84 * |page |------------------v
85 * +------+ +---+ +---+ +---+
86 * | |-->| |-->| |
87 * +---+ +---+ +---+
88 * ^ |
89 * | |
90 * +---------------+
91 *
92 *
93 * +------+
94 * |reader| RING BUFFER
95 * |page |------------------v
96 * +------+ +---+ +---+ +---+
97 * ^ | |-->| |-->| |
98 * | +---+ +---+ +---+
99 * | |
100 * | |
101 * +------------------------------+
102 *
103 *
104 * +------+
105 * |buffer| RING BUFFER
106 * |page |------------------v
107 * +------+ +---+ +---+ +---+
108 * ^ | | | |-->| |
109 * | New +---+ +---+ +---+
110 * | Reader------^ |
111 * | page |
112 * +------------------------------+
113 *
114 *
115 * After we make this swap, the reader can hand this page off to the splice
116 * code and be done with it. It can even allocate a new page if it needs to
117 * and swap that into the ring buffer.
118 *
119 * We will be using cmpxchg soon to make all this lockless.
120 *
121 */
122
123/* Used for individual buffers (after the counter) */
124#define RB_BUFFER_OFF (1 << 20)
125
126#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
127
128#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
129#define RB_ALIGNMENT 4U
130#define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
131#define RB_EVNT_MIN_SIZE 8U /* two 32bit words */
132
133#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
134# define RB_FORCE_8BYTE_ALIGNMENT 0
135# define RB_ARCH_ALIGNMENT RB_ALIGNMENT
136#else
137# define RB_FORCE_8BYTE_ALIGNMENT 1
138# define RB_ARCH_ALIGNMENT 8U
139#endif
140
141#define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT)
142
143/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
144#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
145
146enum {
147 RB_LEN_TIME_EXTEND = 8,
148 RB_LEN_TIME_STAMP = 8,
149};
150
151#define skip_time_extend(event) \
152 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
153
154#define extended_time(event) \
155 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND)
156
157static inline int rb_null_event(struct ring_buffer_event *event)
158{
159 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
160}
161
162static void rb_event_set_padding(struct ring_buffer_event *event)
163{
164 /* padding has a NULL time_delta */
165 event->type_len = RINGBUF_TYPE_PADDING;
166 event->time_delta = 0;
167}
168
169static unsigned
170rb_event_data_length(struct ring_buffer_event *event)
171{
172 unsigned length;
173
174 if (event->type_len)
175 length = event->type_len * RB_ALIGNMENT;
176 else
177 length = event->array[0];
178 return length + RB_EVNT_HDR_SIZE;
179}
180
181/*
182 * Return the length of the given event. Will return
183 * the length of the time extend if the event is a
184 * time extend.
185 */
186static inline unsigned
187rb_event_length(struct ring_buffer_event *event)
188{
189 switch (event->type_len) {
190 case RINGBUF_TYPE_PADDING:
191 if (rb_null_event(event))
192 /* undefined */
193 return -1;
194 return event->array[0] + RB_EVNT_HDR_SIZE;
195
196 case RINGBUF_TYPE_TIME_EXTEND:
197 return RB_LEN_TIME_EXTEND;
198
199 case RINGBUF_TYPE_TIME_STAMP:
200 return RB_LEN_TIME_STAMP;
201
202 case RINGBUF_TYPE_DATA:
203 return rb_event_data_length(event);
204 default:
205 BUG();
206 }
207 /* not hit */
208 return 0;
209}
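/*
 * Worked example (editor's sketch): a data event with type_len == 3
 * carries a 3 * RB_ALIGNMENT == 12 byte payload, so rb_event_length()
 * reports 12 + RB_EVNT_HDR_SIZE == 16 bytes consumed on the page.  A
 * time-extend event is always RB_LEN_TIME_EXTEND == 8 bytes, independent
 * of the data event that follows it.
 */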
210
211/*
212 * Return total length of time extend and data,
213 * or just the event length for all other events.
214 */
215static inline unsigned
216rb_event_ts_length(struct ring_buffer_event *event)
217{
218 unsigned len = 0;
219
220 if (extended_time(event)) {
221 /* time extends include the data event after it */
222 len = RB_LEN_TIME_EXTEND;
223 event = skip_time_extend(event);
224 }
225 return len + rb_event_length(event);
226}
227
228/**
229 * ring_buffer_event_length - return the length of the event
230 * @event: the event to get the length of
231 *
232 * Returns the size of the data load of a data event.
233 * If the event is something other than a data event, it
234 * returns the size of the event itself. With the exception
235 * of a TIME EXTEND, where it still returns the size of the
236 * data load of the data event after it.
237 */
238unsigned ring_buffer_event_length(struct ring_buffer_event *event)
239{
240 unsigned length;
241
242 if (extended_time(event))
243 event = skip_time_extend(event);
244
245 length = rb_event_length(event);
246 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
247 return length;
248 length -= RB_EVNT_HDR_SIZE;
249 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
250 length -= sizeof(event->array[0]);
251 return length;
252}
253EXPORT_SYMBOL_GPL(ring_buffer_event_length);
254
255/* inline for ring buffer fast paths */
256static __always_inline void *
257rb_event_data(struct ring_buffer_event *event)
258{
259 if (extended_time(event))
260 event = skip_time_extend(event);
261 BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
262 /* If length is in len field, then array[0] has the data */
263 if (event->type_len)
264 return (void *)&event->array[0];
265 /* Otherwise length is in array[0] and array[1] has the data */
266 return (void *)&event->array[1];
267}
268
269/**
270 * ring_buffer_event_data - return the data of the event
271 * @event: the event to get the data from
272 */
273void *ring_buffer_event_data(struct ring_buffer_event *event)
274{
275 return rb_event_data(event);
276}
277EXPORT_SYMBOL_GPL(ring_buffer_event_data);
278
279#define for_each_buffer_cpu(buffer, cpu) \
280 for_each_cpu(cpu, buffer->cpumask)
281
282#define TS_SHIFT 27
283#define TS_MASK ((1ULL << TS_SHIFT) - 1)
284#define TS_DELTA_TEST (~TS_MASK)
285
286/**
287 * ring_buffer_event_time_stamp - return the event's extended timestamp
288 * @event: the event to get the timestamp of
289 *
290 * Returns the extended timestamp associated with a data event.
291 * An extended time_stamp is a 64-bit timestamp represented
292 * internally in a special way that makes the best use of space
293 * contained within a ring buffer event. This function decodes
294 * it and maps it to a straight u64 value.
295 */
296u64 ring_buffer_event_time_stamp(struct ring_buffer_event *event)
297{
298 u64 ts;
299
300 ts = event->array[0];
301 ts <<= TS_SHIFT;
302 ts += event->time_delta;
303
304 return ts;
305}
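/*
 * Editor's sketch of the inverse mapping, to make the encoding above
 * concrete: the low TS_SHIFT bits of the timestamp live in time_delta
 * and the rest in array[0], so e.g. ts == 0x90000345 splits into
 * array[0] == 0x12 and time_delta == 0x345.  The helper name below is
 * hypothetical and only illustrates the split; the real write path
 * builds its time stamp events elsewhere.
 */
static inline void rb_example_split_time_stamp(struct ring_buffer_event *event,
                                               u64 ts)
{
        event->time_delta = ts & TS_MASK;       /* low 27 bits */
        event->array[0] = ts >> TS_SHIFT;       /* remaining bits, assumed to fit in 32 */
}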
306
307/* Flag when events were overwritten */
308#define RB_MISSED_EVENTS (1 << 31)
309/* Missed count stored at end */
310#define RB_MISSED_STORED (1 << 30)
311
312#define RB_MISSED_FLAGS (RB_MISSED_EVENTS|RB_MISSED_STORED)
313
314struct buffer_data_page {
315 u64 time_stamp; /* page time stamp */
316 local_t commit; /* write committed index */
317 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */
318};
319
320/*
321 * Note, the buffer_page list must be first. The buffer pages
322 * are allocated in cache lines, which means that each buffer
323 * page will be at the beginning of a cache line, and thus
324 * the least significant bits will be zero. We use this to
325 * add flags in the list struct pointers, to make the ring buffer
326 * lockless.
327 */
328struct buffer_page {
329 struct list_head list; /* list of buffer pages */
330 local_t write; /* index for next write */
331 unsigned read; /* index for next read */
332 local_t entries; /* entries on this page */
333 unsigned long real_end; /* real end of data */
334 struct buffer_data_page *page; /* Actual data page */
335};
336
337/*
338 * The buffer page counters, write and entries, must be reset
339 * atomically when crossing page boundaries. To synchronize this
340 * update, two counters are inserted into the number. One is
341 * the actual counter for the write position or count on the page.
342 *
343 * The other is a counter of updaters. Before an update happens
344 * the updater part of the counter is incremented. This will
345 * allow the updater to update the counter atomically.
346 *
347 * The counter is 20 bits, and the state data is 12.
348 */
349#define RB_WRITE_MASK 0xfffff
350#define RB_WRITE_INTCNT (1 << 20)
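/*
 * Editor's sketch: how the split above reads in practice.  With
 * RB_WRITE_MASK == 0xfffff, a raw "write" value of 0x300015 means two
 * things at once: the updater count in the high bits is 3
 * (0x300015 >> 20) and the write position on the page is 0x15.  The
 * hypothetical helper below extracts only the position, the same way
 * rb_page_write() does further down.
 */
static inline unsigned long rb_example_write_pos(unsigned long raw_write)
{
        return raw_write & RB_WRITE_MASK;       /* e.g. 0x300015 -> 0x15 */
}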
351
352static void rb_init_page(struct buffer_data_page *bpage)
353{
354 local_set(&bpage->commit, 0);
355}
356
357/*
358 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
359 * this issue out.
360 */
361static void free_buffer_page(struct buffer_page *bpage)
362{
363 free_page((unsigned long)bpage->page);
364 kfree(bpage);
365}
366
367/*
368 * We need to fit the time_stamp delta into 27 bits.
369 */
370static inline int test_time_stamp(u64 delta)
371{
372 if (delta & TS_DELTA_TEST)
373 return 1;
374 return 0;
375}
376
377#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
378
379/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
380#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
381
382int ring_buffer_print_page_header(struct trace_seq *s)
383{
384 struct buffer_data_page field;
385
386 trace_seq_printf(s, "\tfield: u64 timestamp;\t"
387 "offset:0;\tsize:%u;\tsigned:%u;\n",
388 (unsigned int)sizeof(field.time_stamp),
389 (unsigned int)is_signed_type(u64));
390
391 trace_seq_printf(s, "\tfield: local_t commit;\t"
392 "offset:%u;\tsize:%u;\tsigned:%u;\n",
393 (unsigned int)offsetof(typeof(field), commit),
394 (unsigned int)sizeof(field.commit),
395 (unsigned int)is_signed_type(long));
396
397 trace_seq_printf(s, "\tfield: int overwrite;\t"
398 "offset:%u;\tsize:%u;\tsigned:%u;\n",
399 (unsigned int)offsetof(typeof(field), commit),
400 1,
401 (unsigned int)is_signed_type(long));
402
403 trace_seq_printf(s, "\tfield: char data;\t"
404 "offset:%u;\tsize:%u;\tsigned:%u;\n",
405 (unsigned int)offsetof(typeof(field), data),
406 (unsigned int)BUF_PAGE_SIZE,
407 (unsigned int)is_signed_type(char));
408
409 return !trace_seq_has_overflowed(s);
410}
411
412struct rb_irq_work {
413 struct irq_work work;
414 wait_queue_head_t waiters;
415 wait_queue_head_t full_waiters;
416 bool waiters_pending;
417 bool full_waiters_pending;
418 bool wakeup_full;
419};
420
421/*
422 * Structure to hold event state and handle nested events.
423 */
424struct rb_event_info {
425 u64 ts;
426 u64 delta;
427 unsigned long length;
428 struct buffer_page *tail_page;
429 int add_timestamp;
430};
431
432/*
433 * Used for which event context the event is in.
434 * TRANSITION = 0
435 * NMI = 1
436 * IRQ = 2
437 * SOFTIRQ = 3
438 * NORMAL = 4
439 *
440 * See trace_recursive_lock() comment below for more details.
441 */
442enum {
443 RB_CTX_TRANSITION,
444 RB_CTX_NMI,
445 RB_CTX_IRQ,
446 RB_CTX_SOFTIRQ,
447 RB_CTX_NORMAL,
448 RB_CTX_MAX
449};
450
451/*
452 * head_page == tail_page && head == tail then buffer is empty.
453 */
454struct ring_buffer_per_cpu {
455 int cpu;
456 atomic_t record_disabled;
457 struct ring_buffer *buffer;
458 raw_spinlock_t reader_lock; /* serialize readers */
459 arch_spinlock_t lock;
460 struct lock_class_key lock_key;
461 struct buffer_data_page *free_page;
462 unsigned long nr_pages;
463 unsigned int current_context;
464 struct list_head *pages;
465 struct buffer_page *head_page; /* read from head */
466 struct buffer_page *tail_page; /* write to tail */
467 struct buffer_page *commit_page; /* committed pages */
468 struct buffer_page *reader_page;
469 unsigned long lost_events;
470 unsigned long last_overrun;
471 unsigned long nest;
472 local_t entries_bytes;
473 local_t entries;
474 local_t overrun;
475 local_t commit_overrun;
476 local_t dropped_events;
477 local_t committing;
478 local_t commits;
479 local_t pages_touched;
480 local_t pages_lost;
481 local_t pages_read;
482 long last_pages_touch;
483 size_t shortest_full;
484 unsigned long read;
485 unsigned long read_bytes;
486 u64 write_stamp;
487 u64 read_stamp;
488 /* pages removed since last reset */
489 unsigned long pages_removed;
490 /* ring buffer pages to update, > 0 to add, < 0 to remove */
491 long nr_pages_to_update;
492 struct list_head new_pages; /* new pages to add */
493 struct work_struct update_pages_work;
494 struct completion update_done;
495
496 struct rb_irq_work irq_work;
497};
498
499struct ring_buffer {
500 unsigned flags;
501 int cpus;
502 atomic_t record_disabled;
503 atomic_t resize_disabled;
504 cpumask_var_t cpumask;
505
506 struct lock_class_key *reader_lock_key;
507
508 struct mutex mutex;
509
510 struct ring_buffer_per_cpu **buffers;
511
512 struct hlist_node node;
513 u64 (*clock)(void);
514
515 struct rb_irq_work irq_work;
516 bool time_stamp_abs;
517};
518
519struct ring_buffer_iter {
520 struct ring_buffer_per_cpu *cpu_buffer;
521 unsigned long head;
522 struct buffer_page *head_page;
523 struct buffer_page *cache_reader_page;
524 unsigned long cache_read;
525 unsigned long cache_pages_removed;
526 u64 read_stamp;
527};
528
529/**
530 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
531 * @buffer: The ring_buffer to get the number of pages from
532 * @cpu: The cpu of the ring_buffer to get the number of pages from
533 *
534 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
535 */
536size_t ring_buffer_nr_pages(struct ring_buffer *buffer, int cpu)
537{
538 return buffer->buffers[cpu]->nr_pages;
539}
540
541/**
542 * ring_buffer_nr_dirty_pages - get the number of dirty pages in the ring buffer
543 * @buffer: The ring_buffer to get the number of pages from
544 * @cpu: The cpu of the ring_buffer to get the number of pages from
545 *
546 * Returns the number of pages that have content in the ring buffer.
547 */
548size_t ring_buffer_nr_dirty_pages(struct ring_buffer *buffer, int cpu)
549{
550 size_t read;
551 size_t lost;
552 size_t cnt;
553
554 read = local_read(&buffer->buffers[cpu]->pages_read);
555 lost = local_read(&buffer->buffers[cpu]->pages_lost);
556 cnt = local_read(&buffer->buffers[cpu]->pages_touched);
557
558 if (WARN_ON_ONCE(cnt < lost))
559 return 0;
560
561 cnt -= lost;
562
563 /* The reader can read an empty page, but not more than that */
564 if (cnt < read) {
565 WARN_ON_ONCE(read > cnt + 1);
566 return 0;
567 }
568
569 return cnt - read;
570}
571
572static __always_inline bool full_hit(struct ring_buffer *buffer, int cpu, int full)
573{
574 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
575 size_t nr_pages;
576 size_t dirty;
577
578 nr_pages = cpu_buffer->nr_pages;
579 if (!nr_pages || !full)
580 return true;
581
582 /*
583 * Add one as dirty will never equal nr_pages, as the sub-buffer
584 * that the writer is on is not counted as dirty.
585 * This is needed if "buffer_percent" is set to 100.
586 */
587 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;
588
589 return (dirty * 100) >= (full * nr_pages);
590}
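/*
 * Worked example (editor's note): with nr_pages == 8 and a waiter asking
 * for full == 50 (percent), three dirty pages give dirty == 3 + 1 == 4
 * and 4 * 100 >= 50 * 8 holds, so the waiter is considered satisfied.
 * With only two dirty pages, 3 * 100 < 400 and it keeps waiting.
 */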
591
592/*
593 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
594 *
595 * Schedules a delayed work to wake up any task that is blocked on the
596 * ring buffer waiters queue.
597 */
598static void rb_wake_up_waiters(struct irq_work *work)
599{
600 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
601
602 wake_up_all(&rbwork->waiters);
603 if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
604 /* Only cpu_buffer sets the above flags */
605 struct ring_buffer_per_cpu *cpu_buffer =
606 container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
607
608 /* Called from interrupt context */
609 raw_spin_lock(&cpu_buffer->reader_lock);
610 rbwork->wakeup_full = false;
611 rbwork->full_waiters_pending = false;
612
613 /* Waking up all waiters, they will reset the shortest full */
614 cpu_buffer->shortest_full = 0;
615 raw_spin_unlock(&cpu_buffer->reader_lock);
616
617 wake_up_all(&rbwork->full_waiters);
618 }
619}
620
621/**
622 * ring_buffer_wait - wait for input to the ring buffer
623 * @buffer: buffer to wait on
624 * @cpu: the cpu buffer to wait on
625 * @full: wait until a full page is available, if @cpu != RING_BUFFER_ALL_CPUS
626 *
627 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
628 * as data is added to any of the @buffer's cpu buffers. Otherwise
629 * it will wait for data to be added to a specific cpu buffer.
630 */
631int ring_buffer_wait(struct ring_buffer *buffer, int cpu, int full)
632{
633 struct ring_buffer_per_cpu *cpu_buffer;
634 DEFINE_WAIT(wait);
635 struct rb_irq_work *work;
636 int ret = 0;
637
638 /*
639 * Depending on what the caller is waiting for, either any
640 * data in any cpu buffer, or a specific buffer, put the
641 * caller on the appropriate wait queue.
642 */
643 if (cpu == RING_BUFFER_ALL_CPUS) {
644 work = &buffer->irq_work;
645 /* Full only makes sense on per cpu reads */
646 full = 0;
647 } else {
648 if (!cpumask_test_cpu(cpu, buffer->cpumask))
649 return -ENODEV;
650 cpu_buffer = buffer->buffers[cpu];
651 work = &cpu_buffer->irq_work;
652 }
653
654
655 while (true) {
656 if (full)
657 prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
658 else
659 prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
660
661 /*
662 * The events can happen in critical sections where
663 * checking a work queue can cause deadlocks.
664 * After adding a task to the queue, this flag is set
665 * only to notify events to try to wake up the queue
666 * using irq_work.
667 *
668 * We don't clear it even if the buffer is no longer
669 * empty. The flag only causes the next event to run
670 * irq_work to do the work queue wake up. The worst
671 * that can happen if we race with !trace_empty() is that
672 * an event will cause an irq_work to try to wake up
673 * an empty queue.
674 *
675 * There's no reason to protect this flag either, as
676 * the work queue and irq_work logic will do the necessary
677 * synchronization for the wake ups. The only thing
678 * that is necessary is that the wake up happens after
679 * a task has been queued. Spurious wake ups are OK.
680 */
681 if (full)
682 work->full_waiters_pending = true;
683 else
684 work->waiters_pending = true;
685
686 if (signal_pending(current)) {
687 ret = -EINTR;
688 break;
689 }
690
691 if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
692 break;
693
694 if (cpu != RING_BUFFER_ALL_CPUS &&
695 !ring_buffer_empty_cpu(buffer, cpu)) {
696 unsigned long flags;
697 bool pagebusy;
698 bool done;
699
700 if (!full)
701 break;
702
703 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
704 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
705 done = !pagebusy && full_hit(buffer, cpu, full);
706
707 if (!cpu_buffer->shortest_full ||
708 cpu_buffer->shortest_full > full)
709 cpu_buffer->shortest_full = full;
710 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
711 if (done)
712 break;
713 }
714
715 schedule();
716 }
717
718 if (full)
719 finish_wait(&work->full_waiters, &wait);
720 else
721 finish_wait(&work->waiters, &wait);
722
723 return ret;
724}
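/*
 * Usage sketch (editor's example): block until any per-cpu buffer has
 * data.  Passing RING_BUFFER_ALL_CPUS makes @full irrelevant, as noted
 * in the kernel-doc above; a per-cpu caller could instead pass a cpu
 * number and a page-fill percentage such as 50.  The wrapper name is
 * hypothetical.
 */
static inline int rb_example_wait_for_any_data(struct ring_buffer *buffer)
{
        return ring_buffer_wait(buffer, RING_BUFFER_ALL_CPUS, 0);
}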
725
726/**
727 * ring_buffer_poll_wait - poll on buffer input
728 * @buffer: buffer to wait on
729 * @cpu: the cpu buffer to wait on
730 * @filp: the file descriptor
731 * @poll_table: The poll descriptor
732 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
733 *
734 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
735 * as data is added to any of the @buffer's cpu buffers. Otherwise
736 * it will wait for data to be added to a specific cpu buffer.
737 *
738 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
739 * zero otherwise.
740 */
741__poll_t ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
742 struct file *filp, poll_table *poll_table, int full)
743{
744 struct ring_buffer_per_cpu *cpu_buffer;
745 struct rb_irq_work *rbwork;
746
747 if (cpu == RING_BUFFER_ALL_CPUS) {
748 rbwork = &buffer->irq_work;
749 full = 0;
750 } else {
751 if (!cpumask_test_cpu(cpu, buffer->cpumask))
752 return EPOLLERR;
753
754 cpu_buffer = buffer->buffers[cpu];
755 rbwork = &cpu_buffer->irq_work;
756 }
757
758 if (full) {
759 unsigned long flags;
760
761 poll_wait(filp, &rbwork->full_waiters, poll_table);
762
763 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
764 if (!cpu_buffer->shortest_full ||
765 cpu_buffer->shortest_full > full)
766 cpu_buffer->shortest_full = full;
767 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
768 if (full_hit(buffer, cpu, full))
769 return EPOLLIN | EPOLLRDNORM;
770 /*
771 * Only allow full_waiters_pending update to be seen after
772 * the shortest_full is set. If the writer sees the
773 * full_waiters_pending flag set, it will compare the
774 * amount in the ring buffer to shortest_full. If the amount
775 * in the ring buffer is greater than the shortest_full
776 * percent, it will call the irq_work handler to wake up
777 * this list. The irq_handler will reset shortest_full
778 * back to zero. That's done under the reader_lock, but
779 * the below smp_mb() makes sure that the update to
780 * full_waiters_pending doesn't leak up into the above.
781 */
782 smp_mb();
783 rbwork->full_waiters_pending = true;
784 return 0;
785 }
786
787 poll_wait(filp, &rbwork->waiters, poll_table);
788 rbwork->waiters_pending = true;
789
790 /*
791 * There's a tight race between setting the waiters_pending and
792 * checking if the ring buffer is empty. Once the waiters_pending bit
793 * is set, the next event will wake the task up, but we can get stuck
794 * if there's only a single event in.
795 *
796 * FIXME: Ideally, we need a memory barrier on the writer side as well,
797 * but adding a memory barrier to all events will cause too much of a
798 * performance hit in the fast path. We only need a memory barrier when
799 * the buffer goes from empty to having content. But as this race is
800 * extremely small, and it's not a problem if another event comes in, we
801 * will fix it later.
802 */
803 smp_mb();
804
805 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
806 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
807 return EPOLLIN | EPOLLRDNORM;
808 return 0;
809}
810
811/* buffer may be either ring_buffer or ring_buffer_per_cpu */
812#define RB_WARN_ON(b, cond) \
813 ({ \
814 int _____ret = unlikely(cond); \
815 if (_____ret) { \
816 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
817 struct ring_buffer_per_cpu *__b = \
818 (void *)b; \
819 atomic_inc(&__b->buffer->record_disabled); \
820 } else \
821 atomic_inc(&b->record_disabled); \
822 WARN_ON(1); \
823 } \
824 _____ret; \
825 })
826
827/* Up this if you want to test the TIME_EXTENTS and normalization */
828#define DEBUG_SHIFT 0
829
830static inline u64 rb_time_stamp(struct ring_buffer *buffer)
831{
832 /* shift to debug/test normalization and TIME_EXTENTS */
833 return buffer->clock() << DEBUG_SHIFT;
834}
835
836u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
837{
838 u64 time;
839
840 preempt_disable_notrace();
841 time = rb_time_stamp(buffer);
842 preempt_enable_notrace();
843
844 return time;
845}
846EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
847
848void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
849 int cpu, u64 *ts)
850{
851 /* Just stupid testing the normalize function and deltas */
852 *ts >>= DEBUG_SHIFT;
853}
854EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
855
856/*
857 * Making the ring buffer lockless makes things tricky.
858 * Writes only happen on the CPU that they are on, so they
859 * only need to worry about interrupts, but reads can
860 * happen on any CPU.
861 *
862 * The reader page is always off the ring buffer, but when the
863 * reader finishes with a page, it needs to swap its page with
864 * a new one from the buffer. The reader needs to take from
865 * the head (writes go to the tail). But if a writer is in overwrite
866 * mode and wraps, it must push the head page forward.
867 *
868 * Here lies the problem.
869 *
870 * The reader must be careful to replace only the head page, and
871 * not another one. As described at the top of the file in the
872 * ASCII art, the reader sets its old page to point to the next
873 * page after head. It then sets the page after head to point to
874 * the old reader page. But if the writer moves the head page
875 * during this operation, the reader could end up with the tail.
876 *
877 * We use cmpxchg to help prevent this race. We also do something
878 * special with the page before head. We set the LSB to 1.
879 *
880 * When the writer must push the page forward, it will clear the
881 * bit that points to the head page, move the head, and then set
882 * the bit that points to the new head page.
883 *
884 * We also don't want an interrupt coming in and moving the head
885 * page out from under another writer. Thus we use the second LSB
886 * to catch that too. The states are:
887 *
888 * head->list->prev->next bit 1 bit 0
889 * ------- -------
890 * Normal page 0 0
891 * Points to head page 0 1
892 * New head page 1 0
893 *
894 * Note we can not trust the prev pointer of the head page, because:
895 *
896 * +----+ +-----+ +-----+
897 * | |------>| T |---X--->| N |
898 * | |<------| | | |
899 * +----+ +-----+ +-----+
900 * ^ ^ |
901 * | +-----+ | |
902 * +----------| R |----------+ |
903 * | |<-----------+
904 * +-----+
905 *
906 * Key: ---X--> HEAD flag set in pointer
907 * T Tail page
908 * R Reader page
909 * N Next page
910 *
911 * (see __rb_reserve_next() to see where this happens)
912 *
913 * What the above shows is that the reader just swapped out
914 * the reader page with a page in the buffer, but before it
915 * could make the new header point back to the new page added
916 * it was preempted by a writer. The writer moved forward onto
917 * the new page added by the reader and is about to move forward
918 * again.
919 *
920 * You can see, it is legitimate for the previous pointer of
921 * the head (or any page) not to point back to itself. But only
922 * temporarily.
923 */
924
925#define RB_PAGE_NORMAL 0UL
926#define RB_PAGE_HEAD 1UL
927#define RB_PAGE_UPDATE 2UL
928
929
930#define RB_FLAG_MASK 3UL
931
932/* PAGE_MOVED is not part of the mask */
933#define RB_PAGE_MOVED 4UL
934
935/*
936 * rb_list_head - remove any bit
937 */
938static struct list_head *rb_list_head(struct list_head *list)
939{
940 unsigned long val = (unsigned long)list;
941
942 return (struct list_head *)(val & ~RB_FLAG_MASK);
943}
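/*
 * Editor's sketch: what a flagged ->next pointer looks like.  The same
 * word both links to the next buffer_page and, in its two low bits,
 * records whether that page is the head (RB_PAGE_HEAD) or is being
 * updated (RB_PAGE_UPDATE); rb_list_head() above strips the bits before
 * following the pointer.  The hypothetical helper below only tests the
 * HEAD flag.
 */
static inline int rb_example_next_is_head(struct list_head *next)
{
        return ((unsigned long)next & RB_FLAG_MASK) == RB_PAGE_HEAD;
}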
944
945/*
946 * rb_is_head_page - test if the given page is the head page
947 *
948 * Because the reader may move the head_page pointer, we cannot
949 * trust what the head page is (it may be pointing to
950 * the reader page). But if the next page is a header page,
951 * its flags will be non-zero.
952 */
953static inline int
954rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
955 struct buffer_page *page, struct list_head *list)
956{
957 unsigned long val;
958
959 val = (unsigned long)list->next;
960
961 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
962 return RB_PAGE_MOVED;
963
964 return val & RB_FLAG_MASK;
965}
966
967/*
968 * rb_is_reader_page
969 *
970 * The unique thing about the reader page is that, if the
971 * writer is ever on it, the previous pointer never points
972 * back to the reader page.
973 */
974static bool rb_is_reader_page(struct buffer_page *page)
975{
976 struct list_head *list = page->list.prev;
977
978 return rb_list_head(list->next) != &page->list;
979}
980
981/*
982 * rb_set_list_to_head - set a list_head to be pointing to head.
983 */
984static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
985 struct list_head *list)
986{
987 unsigned long *ptr;
988
989 ptr = (unsigned long *)&list->next;
990 *ptr |= RB_PAGE_HEAD;
991 *ptr &= ~RB_PAGE_UPDATE;
992}
993
994/*
995 * rb_head_page_activate - sets up head page
996 */
997static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
998{
999 struct buffer_page *head;
1000
1001 head = cpu_buffer->head_page;
1002 if (!head)
1003 return;
1004
1005 /*
1006 * Set the previous list pointer to have the HEAD flag.
1007 */
1008 rb_set_list_to_head(cpu_buffer, head->list.prev);
1009}
1010
1011static void rb_list_head_clear(struct list_head *list)
1012{
1013 unsigned long *ptr = (unsigned long *)&list->next;
1014
1015 *ptr &= ~RB_FLAG_MASK;
1016}
1017
1018/*
1019 * rb_head_page_deactivate - clears head page ptr (for free list)
1020 */
1021static void
1022rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
1023{
1024 struct list_head *hd;
1025
1026 /* Go through the whole list and clear any pointers found. */
1027 rb_list_head_clear(cpu_buffer->pages);
1028
1029 list_for_each(hd, cpu_buffer->pages)
1030 rb_list_head_clear(hd);
1031}
1032
1033static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
1034 struct buffer_page *head,
1035 struct buffer_page *prev,
1036 int old_flag, int new_flag)
1037{
1038 struct list_head *list;
1039 unsigned long val = (unsigned long)&head->list;
1040 unsigned long ret;
1041
1042 list = &prev->list;
1043
1044 val &= ~RB_FLAG_MASK;
1045
1046 ret = cmpxchg((unsigned long *)&list->next,
1047 val | old_flag, val | new_flag);
1048
1049 /* check if the reader took the page */
1050 if ((ret & ~RB_FLAG_MASK) != val)
1051 return RB_PAGE_MOVED;
1052
1053 return ret & RB_FLAG_MASK;
1054}
1055
1056static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
1057 struct buffer_page *head,
1058 struct buffer_page *prev,
1059 int old_flag)
1060{
1061 return rb_head_page_set(cpu_buffer, head, prev,
1062 old_flag, RB_PAGE_UPDATE);
1063}
1064
1065static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
1066 struct buffer_page *head,
1067 struct buffer_page *prev,
1068 int old_flag)
1069{
1070 return rb_head_page_set(cpu_buffer, head, prev,
1071 old_flag, RB_PAGE_HEAD);
1072}
1073
1074static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
1075 struct buffer_page *head,
1076 struct buffer_page *prev,
1077 int old_flag)
1078{
1079 return rb_head_page_set(cpu_buffer, head, prev,
1080 old_flag, RB_PAGE_NORMAL);
1081}
1082
1083static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
1084 struct buffer_page **bpage)
1085{
1086 struct list_head *p = rb_list_head((*bpage)->list.next);
1087
1088 *bpage = list_entry(p, struct buffer_page, list);
1089}
1090
1091static struct buffer_page *
1092rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
1093{
1094 struct buffer_page *head;
1095 struct buffer_page *page;
1096 struct list_head *list;
1097 int i;
1098
1099 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
1100 return NULL;
1101
1102 /* sanity check */
1103 list = cpu_buffer->pages;
1104 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
1105 return NULL;
1106
1107 page = head = cpu_buffer->head_page;
1108 /*
1109 * It is possible that the writer moves the head page behind
1110 * where we started, and we miss it in one loop.
1111 * A second loop should grab the head page, but we'll do
1112 * three loops just because I'm paranoid.
1113 */
1114 for (i = 0; i < 3; i++) {
1115 do {
1116 if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
1117 cpu_buffer->head_page = page;
1118 return page;
1119 }
1120 rb_inc_page(cpu_buffer, &page);
1121 } while (page != head);
1122 }
1123
1124 RB_WARN_ON(cpu_buffer, 1);
1125
1126 return NULL;
1127}
1128
1129static int rb_head_page_replace(struct buffer_page *old,
1130 struct buffer_page *new)
1131{
1132 unsigned long *ptr = (unsigned long *)&old->list.prev->next;
1133 unsigned long val;
1134 unsigned long ret;
1135
1136 val = *ptr & ~RB_FLAG_MASK;
1137 val |= RB_PAGE_HEAD;
1138
1139 ret = cmpxchg(ptr, val, (unsigned long)&new->list);
1140
1141 return ret == val;
1142}
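/*
 * Example scenario (editor's note): the reader computes val as
 * "&old->list | RB_PAGE_HEAD" and the cmpxchg succeeds only if the
 * previous page's ->next still holds exactly that value.  If a writer
 * has meanwhile pushed the head forward, or flipped the flag to
 * RB_PAGE_UPDATE, the compare fails, the function returns 0, and the
 * caller is expected to re-read the head page and retry.
 */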
1143
1144/*
1145 * rb_tail_page_update - move the tail page forward
1146 */
1147static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
1148 struct buffer_page *tail_page,
1149 struct buffer_page *next_page)
1150{
1151 unsigned long old_entries;
1152 unsigned long old_write;
1153
1154 /*
1155 * The tail page now needs to be moved forward.
1156 *
1157 * We need to reset the tail page, but without messing
1158 * with possible erasing of data brought in by interrupts
1159 * that have moved the tail page and are currently on it.
1160 *
1161 * We add a counter to the write field to denote this.
1162 */
1163 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
1164 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
1165
1166 local_inc(&cpu_buffer->pages_touched);
1167 /*
1168 * Just make sure we have seen our old_write and synchronize
1169 * with any interrupts that come in.
1170 */
1171 barrier();
1172
1173 /*
1174 * If the tail page is still the same as what we think
1175 * it is, then it is up to us to update the tail
1176 * pointer.
1177 */
1178 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
1179 /* Zero the write counter */
1180 unsigned long val = old_write & ~RB_WRITE_MASK;
1181 unsigned long eval = old_entries & ~RB_WRITE_MASK;
1182
1183 /*
1184 * This will only succeed if an interrupt did
1185 * not come in and change it; in that case, we
1186 * do not want to modify it.
1187 *
1188 * We add (void) to let the compiler know that we do not care
1189 * about the return value of these functions. We use the
1190 * cmpxchg to only update if an interrupt did not already
1191 * do it for us. If the cmpxchg fails, we don't care.
1192 */
1193 (void)local_cmpxchg(&next_page->write, old_write, val);
1194 (void)local_cmpxchg(&next_page->entries, old_entries, eval);
1195
1196 /*
1197 * No need to worry about races with clearing out the commit.
1198 * It can only increment when a commit takes place. But that
1199 * only happens in the outer most nested commit.
1200 */
1201 local_set(&next_page->page->commit, 0);
1202
1203 /* Again, either we update tail_page or an interrupt does */
1204 (void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
1205 }
1206}
1207
1208static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
1209 struct buffer_page *bpage)
1210{
1211 unsigned long val = (unsigned long)bpage;
1212
1213 if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
1214 return 1;
1215
1216 return 0;
1217}
1218
1219/**
1220 * rb_check_list - make sure a pointer to a list has the last bits zero
1221 */
1222static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
1223 struct list_head *list)
1224{
1225 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
1226 return 1;
1227 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
1228 return 1;
1229 return 0;
1230}
1231
1232/**
1233 * rb_check_pages - integrity check of buffer pages
1234 * @cpu_buffer: CPU buffer with pages to test
1235 *
1236 * As a safety measure we check to make sure the data pages have not
1237 * been corrupted.
1238 *
1239 * Callers of this function need to guarantee that the list of pages doesn't get
1240 * modified during the check. In particular, if it's possible that the function
1241 * is invoked with concurrent readers which can swap in a new reader page then
1242 * the caller should take cpu_buffer->reader_lock.
1243 */
1244static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
1245{
1246 struct list_head *head = cpu_buffer->pages;
1247 struct buffer_page *bpage, *tmp;
1248
1249 /* Reset the head page if it exists */
1250 if (cpu_buffer->head_page)
1251 rb_set_head_page(cpu_buffer);
1252
1253 rb_head_page_deactivate(cpu_buffer);
1254
1255 if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
1256 return -1;
1257 if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
1258 return -1;
1259
1260 if (rb_check_list(cpu_buffer, head))
1261 return -1;
1262
1263 list_for_each_entry_safe(bpage, tmp, head, list) {
1264 if (RB_WARN_ON(cpu_buffer,
1265 bpage->list.next->prev != &bpage->list))
1266 return -1;
1267 if (RB_WARN_ON(cpu_buffer,
1268 bpage->list.prev->next != &bpage->list))
1269 return -1;
1270 if (rb_check_list(cpu_buffer, &bpage->list))
1271 return -1;
1272 }
1273
1274 rb_head_page_activate(cpu_buffer);
1275
1276 return 0;
1277}
1278
1279static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu)
1280{
1281 struct buffer_page *bpage, *tmp;
1282 bool user_thread = current->mm != NULL;
1283 gfp_t mflags;
1284 long i;
1285
1286 /*
1287 * Check if the available memory is there first.
1288 * Note, si_mem_available() only gives us a rough estimate of available
1289 * memory. It may not be accurate. But we don't care, we just want
1290 * to prevent doing any allocation when it is obvious that it is
1291 * not going to succeed.
1292 */
1293 i = si_mem_available();
1294 if (i < nr_pages)
1295 return -ENOMEM;
1296
1297 /*
1298 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
1299 * gracefully without invoking oom-killer and the system is not
1300 * destabilized.
1301 */
1302 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
1303
1304 /*
1305 * If a user thread allocates too much and si_mem_available()
1306 * reports there's enough memory even though there is not,
1307 * make sure the OOM killer kills this thread. This can happen
1308 * even with RETRY_MAYFAIL because another task may be doing
1309 * an allocation after this task has taken all memory.
1310 * This is the task the OOM killer needs to take out during this
1311 * loop, even if it was triggered by an allocation somewhere else.
1312 */
1313 if (user_thread)
1314 set_current_oom_origin();
1315 for (i = 0; i < nr_pages; i++) {
1316 struct page *page;
1317
1318 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1319 mflags, cpu_to_node(cpu));
1320 if (!bpage)
1321 goto free_pages;
1322
1323 list_add(&bpage->list, pages);
1324
1325 page = alloc_pages_node(cpu_to_node(cpu), mflags, 0);
1326 if (!page)
1327 goto free_pages;
1328 bpage->page = page_address(page);
1329 rb_init_page(bpage->page);
1330
1331 if (user_thread && fatal_signal_pending(current))
1332 goto free_pages;
1333 }
1334 if (user_thread)
1335 clear_current_oom_origin();
1336
1337 return 0;
1338
1339free_pages:
1340 list_for_each_entry_safe(bpage, tmp, pages, list) {
1341 list_del_init(&bpage->list);
1342 free_buffer_page(bpage);
1343 }
1344 if (user_thread)
1345 clear_current_oom_origin();
1346
1347 return -ENOMEM;
1348}
1349
1350static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1351 unsigned long nr_pages)
1352{
1353 LIST_HEAD(pages);
1354
1355 WARN_ON(!nr_pages);
1356
1357 if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
1358 return -ENOMEM;
1359
1360 /*
1361 * The ring buffer page list is a circular list that does not
1362 * start and end with a list head. All page list items point to
1363 * other pages.
1364 */
1365 cpu_buffer->pages = pages.next;
1366 list_del(&pages);
1367
1368 cpu_buffer->nr_pages = nr_pages;
1369
1370 rb_check_pages(cpu_buffer);
1371
1372 return 0;
1373}
1374
1375static struct ring_buffer_per_cpu *
1376rb_allocate_cpu_buffer(struct ring_buffer *buffer, long nr_pages, int cpu)
1377{
1378 struct ring_buffer_per_cpu *cpu_buffer;
1379 struct buffer_page *bpage;
1380 struct page *page;
1381 int ret;
1382
1383 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1384 GFP_KERNEL, cpu_to_node(cpu));
1385 if (!cpu_buffer)
1386 return NULL;
1387
1388 cpu_buffer->cpu = cpu;
1389 cpu_buffer->buffer = buffer;
1390 raw_spin_lock_init(&cpu_buffer->reader_lock);
1391 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1392 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1393 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
1394 init_completion(&cpu_buffer->update_done);
1395 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
1396 init_waitqueue_head(&cpu_buffer->irq_work.waiters);
1397 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
1398
1399 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1400 GFP_KERNEL, cpu_to_node(cpu));
1401 if (!bpage)
1402 goto fail_free_buffer;
1403
1404 rb_check_bpage(cpu_buffer, bpage);
1405
1406 cpu_buffer->reader_page = bpage;
1407 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
1408 if (!page)
1409 goto fail_free_reader;
1410 bpage->page = page_address(page);
1411 rb_init_page(bpage->page);
1412
1413 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1414 INIT_LIST_HEAD(&cpu_buffer->new_pages);
1415
1416 ret = rb_allocate_pages(cpu_buffer, nr_pages);
1417 if (ret < 0)
1418 goto fail_free_reader;
1419
1420 cpu_buffer->head_page
1421 = list_entry(cpu_buffer->pages, struct buffer_page, list);
1422 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1423
1424 rb_head_page_activate(cpu_buffer);
1425
1426 return cpu_buffer;
1427
1428 fail_free_reader:
1429 free_buffer_page(cpu_buffer->reader_page);
1430
1431 fail_free_buffer:
1432 kfree(cpu_buffer);
1433 return NULL;
1434}
1435
1436static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1437{
1438 struct list_head *head = cpu_buffer->pages;
1439 struct buffer_page *bpage, *tmp;
1440
1441 irq_work_sync(&cpu_buffer->irq_work.work);
1442
1443 free_buffer_page(cpu_buffer->reader_page);
1444
1445 if (head) {
1446 rb_head_page_deactivate(cpu_buffer);
1447
1448 list_for_each_entry_safe(bpage, tmp, head, list) {
1449 list_del_init(&bpage->list);
1450 free_buffer_page(bpage);
1451 }
1452 bpage = list_entry(head, struct buffer_page, list);
1453 free_buffer_page(bpage);
1454 }
1455
1456 free_page((unsigned long)cpu_buffer->free_page);
1457
1458 kfree(cpu_buffer);
1459}
1460
1461/**
1462 * __ring_buffer_alloc - allocate a new ring_buffer
1463 * @size: the size in bytes per cpu that is needed.
1464 * @flags: attributes to set for the ring buffer.
1465 *
1466 * Currently the only flag that is available is the RB_FL_OVERWRITE
1467 * flag. This flag means that the buffer will overwrite old data
1468 * when the buffer wraps. If this flag is not set, the buffer will
1469 * drop data when the tail hits the head.
1470 */
1471struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1472 struct lock_class_key *key)
1473{
1474 struct ring_buffer *buffer;
1475 long nr_pages;
1476 int bsize;
1477 int cpu;
1478 int ret;
1479
1480 /* keep it in its own cache line */
1481 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1482 GFP_KERNEL);
1483 if (!buffer)
1484 return NULL;
1485
1486 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1487 goto fail_free_buffer;
1488
1489 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1490 buffer->flags = flags;
1491 buffer->clock = trace_clock_local;
1492 buffer->reader_lock_key = key;
1493
1494 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
1495 init_waitqueue_head(&buffer->irq_work.waiters);
1496
1497 /* need at least two pages */
1498 if (nr_pages < 2)
1499 nr_pages = 2;
1500
1501 buffer->cpus = nr_cpu_ids;
1502
1503 bsize = sizeof(void *) * nr_cpu_ids;
1504 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1505 GFP_KERNEL);
1506 if (!buffer->buffers)
1507 goto fail_free_cpumask;
1508
1509 cpu = raw_smp_processor_id();
1510 cpumask_set_cpu(cpu, buffer->cpumask);
1511 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1512 if (!buffer->buffers[cpu])
1513 goto fail_free_buffers;
1514
1515 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1516 if (ret < 0)
1517 goto fail_free_buffers;
1518
1519 mutex_init(&buffer->mutex);
1520
1521 return buffer;
1522
1523 fail_free_buffers:
1524 for_each_buffer_cpu(buffer, cpu) {
1525 if (buffer->buffers[cpu])
1526 rb_free_cpu_buffer(buffer->buffers[cpu]);
1527 }
1528 kfree(buffer->buffers);
1529
1530 fail_free_cpumask:
1531 free_cpumask_var(buffer->cpumask);
1532
1533 fail_free_buffer:
1534 kfree(buffer);
1535 return NULL;
1536}
1537EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
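/*
 * Usage sketch (editor's example): callers normally go through the
 * ring_buffer_alloc() wrapper in <linux/ring_buffer.h>, which supplies
 * the lock_class_key for this function.  Asking for roughly eight pages
 * per CPU in overwrite mode looks like this; the helper name is
 * hypothetical.
 */
static inline struct ring_buffer *rb_example_alloc_overwrite(void)
{
        return ring_buffer_alloc(8 * BUF_PAGE_SIZE, RB_FL_OVERWRITE);
}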
1538
1539/**
1540 * ring_buffer_free - free a ring buffer.
1541 * @buffer: the buffer to free.
1542 */
1543void
1544ring_buffer_free(struct ring_buffer *buffer)
1545{
1546 int cpu;
1547
1548 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1549
1550 irq_work_sync(&buffer->irq_work.work);
1551
1552 for_each_buffer_cpu(buffer, cpu)
1553 rb_free_cpu_buffer(buffer->buffers[cpu]);
1554
1555 kfree(buffer->buffers);
1556 free_cpumask_var(buffer->cpumask);
1557
1558 kfree(buffer);
1559}
1560EXPORT_SYMBOL_GPL(ring_buffer_free);
1561
1562void ring_buffer_set_clock(struct ring_buffer *buffer,
1563 u64 (*clock)(void))
1564{
1565 buffer->clock = clock;
1566}
1567
1568void ring_buffer_set_time_stamp_abs(struct ring_buffer *buffer, bool abs)
1569{
1570 buffer->time_stamp_abs = abs;
1571}
1572
1573bool ring_buffer_time_stamp_abs(struct ring_buffer *buffer)
1574{
1575 return buffer->time_stamp_abs;
1576}
1577
1578static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
1579
1580static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1581{
1582 return local_read(&bpage->entries) & RB_WRITE_MASK;
1583}
1584
1585static inline unsigned long rb_page_write(struct buffer_page *bpage)
1586{
1587 return local_read(&bpage->write) & RB_WRITE_MASK;
1588}
1589
1590static int
1591rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
1592{
1593 struct list_head *tail_page, *to_remove, *next_page;
1594 struct buffer_page *to_remove_page, *tmp_iter_page;
1595 struct buffer_page *last_page, *first_page;
1596 unsigned long nr_removed;
1597 unsigned long head_bit;
1598 int page_entries;
1599
1600 head_bit = 0;
1601
1602 raw_spin_lock_irq(&cpu_buffer->reader_lock);
1603 atomic_inc(&cpu_buffer->record_disabled);
1604 /*
1605 * We don't race with the readers since we have acquired the reader
1606 * lock. We also don't race with writers after disabling recording.
1607 * This makes it easy to figure out the first and the last page to be
1608 * removed from the list. We unlink all the pages in between including
1609 * the first and last pages. This is done in a busy loop so that we
1610 * lose the least number of traces.
1611 * The pages are freed after we restart recording and unlock readers.
1612 */
1613 tail_page = &cpu_buffer->tail_page->list;
1614
1615 /*
1616 * The tail page might be on the reader page; in that case, start
1617 * removing from the next page in the ring buffer.
1618 */
1619 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1620 tail_page = rb_list_head(tail_page->next);
1621 to_remove = tail_page;
1622
1623 /* start of pages to remove */
1624 first_page = list_entry(rb_list_head(to_remove->next),
1625 struct buffer_page, list);
1626
1627 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1628 to_remove = rb_list_head(to_remove)->next;
1629 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
1630 }
1631 /* Read iterators need to reset themselves when some pages removed */
1632 cpu_buffer->pages_removed += nr_removed;
1633
1634 next_page = rb_list_head(to_remove)->next;
1635
1636 /*
1637 * Now we remove all pages between tail_page and next_page.
1638 * Make sure that we have head_bit value preserved for the
1639 * next page
1640 */
1641 tail_page->next = (struct list_head *)((unsigned long)next_page |
1642 head_bit);
1643 next_page = rb_list_head(next_page);
1644 next_page->prev = tail_page;
1645
1646 /* make sure pages points to a valid page in the ring buffer */
1647 cpu_buffer->pages = next_page;
1648
1649 /* update head page */
1650 if (head_bit)
1651 cpu_buffer->head_page = list_entry(next_page,
1652 struct buffer_page, list);
1653
1654 /* pages are removed, resume tracing and then free the pages */
1655 atomic_dec(&cpu_buffer->record_disabled);
1656 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1657
1658 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1659
1660 /* last buffer page to remove */
1661 last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1662 list);
1663 tmp_iter_page = first_page;
1664
1665 do {
1666 cond_resched();
1667
1668 to_remove_page = tmp_iter_page;
1669 rb_inc_page(cpu_buffer, &tmp_iter_page);
1670
1671 /* update the counters */
1672 page_entries = rb_page_entries(to_remove_page);
1673 if (page_entries) {
1674 /*
1675 * If something was added to this page, it was full
1676 * since it is not the tail page. So we deduct the
1677 * bytes consumed in ring buffer from here.
1678 * Increment overrun to account for the lost events.
1679 */
1680 local_add(page_entries, &cpu_buffer->overrun);
1681 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
1682 local_inc(&cpu_buffer->pages_lost);
1683 }
1684
1685 /*
1686 * We have already removed references to this list item, just
1687 * free up the buffer_page and its page
1688 */
1689 free_buffer_page(to_remove_page);
1690 nr_removed--;
1691
1692 } while (to_remove_page != last_page);
1693
1694 RB_WARN_ON(cpu_buffer, nr_removed);
1695
1696 return nr_removed == 0;
1697}
1698
1699static int
1700rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
1701{
1702 struct list_head *pages = &cpu_buffer->new_pages;
1703 int retries, success;
1704
1705 raw_spin_lock_irq(&cpu_buffer->reader_lock);
1706 /*
1707 * We are holding the reader lock, so the reader page won't be swapped
1708 * in the ring buffer. Now we are racing with the writer trying to
1709 * move head page and the tail page.
1710 * We are going to adapt the reader page update process where:
1711 * 1. We first splice the start and end of list of new pages between
1712 * the head page and its previous page.
1713 * 2. We cmpxchg the prev_page->next to point from head page to the
1714 * start of new pages list.
1715 * 3. Finally, we update the head->prev to the end of new list.
1716 *
1717 * We will try this process 10 times, to make sure that we don't keep
1718 * spinning.
1719 */
1720 retries = 10;
1721 success = 0;
1722 while (retries--) {
1723 struct list_head *head_page, *prev_page, *r;
1724 struct list_head *last_page, *first_page;
1725 struct list_head *head_page_with_bit;
1726
1727 head_page = &rb_set_head_page(cpu_buffer)->list;
1728 if (!head_page)
1729 break;
1730 prev_page = head_page->prev;
1731
1732 first_page = pages->next;
1733 last_page = pages->prev;
1734
1735 head_page_with_bit = (struct list_head *)
1736 ((unsigned long)head_page | RB_PAGE_HEAD);
1737
1738 last_page->next = head_page_with_bit;
1739 first_page->prev = prev_page;
1740
1741 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
1742
1743 if (r == head_page_with_bit) {
1744 /*
1745 * yay, we replaced the page pointer to our new list,
1746 * now we just have to update the head page's prev
1747 * pointer to point to the end of the list
1748 */
1749 head_page->prev = last_page;
1750 success = 1;
1751 break;
1752 }
1753 }
1754
1755 if (success)
1756 INIT_LIST_HEAD(pages);
1757 /*
1758 * If we weren't successful in adding in new pages, warn and stop
1759 * tracing
1760 */
1761 RB_WARN_ON(cpu_buffer, !success);
1762 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1763
1764 /* free pages if they weren't inserted */
1765 if (!success) {
1766 struct buffer_page *bpage, *tmp;
1767 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1768 list) {
1769 list_del_init(&bpage->list);
1770 free_buffer_page(bpage);
1771 }
1772 }
1773 return success;
1774}
1775
1776static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
1777{
1778 int success;
1779
1780 if (cpu_buffer->nr_pages_to_update > 0)
1781 success = rb_insert_pages(cpu_buffer);
1782 else
1783 success = rb_remove_pages(cpu_buffer,
1784 -cpu_buffer->nr_pages_to_update);
1785
1786 if (success)
1787 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
1788}
1789
1790static void update_pages_handler(struct work_struct *work)
1791{
1792 struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
1793 struct ring_buffer_per_cpu, update_pages_work);
1794 rb_update_pages(cpu_buffer);
1795 complete(&cpu_buffer->update_done);
1796}
1797
1798/**
1799 * ring_buffer_resize - resize the ring buffer
1800 * @buffer: the buffer to resize.
1801 * @size: the new size.
1802 * @cpu_id: the cpu buffer to resize
1803 *
1804 * Minimum size is 2 * BUF_PAGE_SIZE.
1805 *
1806 * Returns 0 on success and < 0 on failure.
1807 */
1808int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
1809 int cpu_id)
1810{
1811 struct ring_buffer_per_cpu *cpu_buffer;
1812 unsigned long nr_pages;
1813 int cpu, err;
1814
1815 /*
1816 * Always succeed at resizing a non-existent buffer:
1817 */
1818 if (!buffer)
1819 return 0;
1820
1821 /* Make sure the requested buffer exists */
1822 if (cpu_id != RING_BUFFER_ALL_CPUS &&
1823 !cpumask_test_cpu(cpu_id, buffer->cpumask))
1824 return 0;
1825
1826 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1827
1828 /* we need a minimum of two pages */
1829 if (nr_pages < 2)
1830 nr_pages = 2;
1831
1832 size = nr_pages * BUF_PAGE_SIZE;
1833
1834 /*
1835 * Don't succeed if resizing is disabled, as a reader might be
1836 * manipulating the ring buffer and is expecting a sane state while
1837 * this is true.
1838 */
1839 if (atomic_read(&buffer->resize_disabled))
1840 return -EBUSY;
1841
1842 /* prevent another thread from changing buffer sizes */
1843 mutex_lock(&buffer->mutex);
1844
1845 if (cpu_id == RING_BUFFER_ALL_CPUS) {
1846 /* calculate the pages to update */
1847 for_each_buffer_cpu(buffer, cpu) {
1848 cpu_buffer = buffer->buffers[cpu];
1849
1850 cpu_buffer->nr_pages_to_update = nr_pages -
1851 cpu_buffer->nr_pages;
1852 /*
1853 * nothing more to do for removing pages or no update
1854 */
1855 if (cpu_buffer->nr_pages_to_update <= 0)
1856 continue;
1857 /*
1858 * to add pages, make sure all new pages can be
1859 * allocated without receiving ENOMEM
1860 */
1861 INIT_LIST_HEAD(&cpu_buffer->new_pages);
1862 if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
1863 &cpu_buffer->new_pages, cpu)) {
1864 /* not enough memory for new pages */
1865 err = -ENOMEM;
1866 goto out_err;
1867 }
1868
1869 cond_resched();
1870 }
1871
1872 get_online_cpus();
1873 /*
1874 * Fire off all the required work handlers
1875 * We can't schedule on offline CPUs, but it's not necessary
1876 * since we can change their buffer sizes without any race.
1877 */
1878 for_each_buffer_cpu(buffer, cpu) {
1879 cpu_buffer = buffer->buffers[cpu];
1880 if (!cpu_buffer->nr_pages_to_update)
1881 continue;
1882
1883 /* Can't run something on an offline CPU. */
1884 if (!cpu_online(cpu)) {
1885 rb_update_pages(cpu_buffer);
1886 cpu_buffer->nr_pages_to_update = 0;
1887 } else {
1888 schedule_work_on(cpu,
1889 &cpu_buffer->update_pages_work);
1890 }
1891 }
1892
1893 /* wait for all the updates to complete */
1894 for_each_buffer_cpu(buffer, cpu) {
1895 cpu_buffer = buffer->buffers[cpu];
1896 if (!cpu_buffer->nr_pages_to_update)
1897 continue;
1898
1899 if (cpu_online(cpu))
1900 wait_for_completion(&cpu_buffer->update_done);
1901 cpu_buffer->nr_pages_to_update = 0;
1902 }
1903
1904 put_online_cpus();
1905 } else {
1906 /* Make sure this CPU has been initialized */
1907 if (!cpumask_test_cpu(cpu_id, buffer->cpumask))
1908 goto out;
1909
1910 cpu_buffer = buffer->buffers[cpu_id];
1911
1912 if (nr_pages == cpu_buffer->nr_pages)
1913 goto out;
1914
1915 cpu_buffer->nr_pages_to_update = nr_pages -
1916 cpu_buffer->nr_pages;
1917
1918 INIT_LIST_HEAD(&cpu_buffer->new_pages);
1919 if (cpu_buffer->nr_pages_to_update > 0 &&
1920 __rb_allocate_pages(cpu_buffer->nr_pages_to_update,
1921 &cpu_buffer->new_pages, cpu_id)) {
1922 err = -ENOMEM;
1923 goto out_err;
1924 }
1925
1926 get_online_cpus();
1927
1928 /* Can't run something on an offline CPU. */
1929 if (!cpu_online(cpu_id))
1930 rb_update_pages(cpu_buffer);
1931 else {
1932 schedule_work_on(cpu_id,
1933 &cpu_buffer->update_pages_work);
1934 wait_for_completion(&cpu_buffer->update_done);
1935 }
1936
1937 cpu_buffer->nr_pages_to_update = 0;
1938 put_online_cpus();
1939 }
1940
1941 out:
1942 /*
1943 * The ring buffer resize can happen with the ring buffer
1944 * enabled, so that the update disturbs the tracing as little
1945 * as possible. But if the buffer is disabled, we do not need
1946 * to worry about that, and we can take the time to verify
1947 * that the buffer is not corrupt.
1948 */
1949 if (atomic_read(&buffer->record_disabled)) {
1950 atomic_inc(&buffer->record_disabled);
1951 /*
1952 * Even though the buffer was disabled, we must make sure
1953 * that it is truly disabled before calling rb_check_pages.
1954 * There could have been a race between checking
1955		 * record_disabled and incrementing it.
1956 */
1957 synchronize_rcu();
1958 for_each_buffer_cpu(buffer, cpu) {
1959 unsigned long flags;
1960
1961 cpu_buffer = buffer->buffers[cpu];
1962 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1963 rb_check_pages(cpu_buffer);
1964 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1965 }
1966 atomic_dec(&buffer->record_disabled);
1967 }
1968
1969 mutex_unlock(&buffer->mutex);
1970 return 0;
1971
1972 out_err:
1973 for_each_buffer_cpu(buffer, cpu) {
1974 struct buffer_page *bpage, *tmp;
1975
1976 cpu_buffer = buffer->buffers[cpu];
1977 cpu_buffer->nr_pages_to_update = 0;
1978
1979 if (list_empty(&cpu_buffer->new_pages))
1980 continue;
1981
1982 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1983 list) {
1984 list_del_init(&bpage->list);
1985 free_buffer_page(bpage);
1986 }
1987 }
1988 mutex_unlock(&buffer->mutex);
1989 return err;
1990}
1991EXPORT_SYMBOL_GPL(ring_buffer_resize);
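
/*
 * Usage sketch (illustrative only, not part of the original source):
 * "buffer" stands for a struct ring_buffer that was already allocated.
 * The requested size is rounded up to whole pages and clamped to the
 * two page minimum documented above.
 *
 *	int ret;
 *
 *	ret = ring_buffer_resize(buffer, 64 * BUF_PAGE_SIZE,
 *				 RING_BUFFER_ALL_CPUS);
 *	if (ret < 0)
 *		return ret;			(-ENOMEM or -EBUSY)
 *
 *	ret = ring_buffer_resize(buffer, 2 * BUF_PAGE_SIZE, 0);
 */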
1992
1993void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val)
1994{
1995 mutex_lock(&buffer->mutex);
1996 if (val)
1997 buffer->flags |= RB_FL_OVERWRITE;
1998 else
1999 buffer->flags &= ~RB_FL_OVERWRITE;
2000 mutex_unlock(&buffer->mutex);
2001}
2002EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
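
/*
 * Usage sketch (illustrative only): toggling overwrite mode on a
 * hypothetical "buffer". With RB_FL_OVERWRITE set, the oldest events are
 * overwritten when the buffer fills; with it cleared, new events are
 * dropped instead (see rb_move_tail() below).
 *
 *	ring_buffer_change_overwrite(buffer, 1);	set RB_FL_OVERWRITE
 *	ring_buffer_change_overwrite(buffer, 0);	clear it again
 */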
2003
2004static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
2005{
2006 return bpage->page->data + index;
2007}
2008
2009static __always_inline struct ring_buffer_event *
2010rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
2011{
2012 return __rb_page_index(cpu_buffer->reader_page,
2013 cpu_buffer->reader_page->read);
2014}
2015
2016static __always_inline struct ring_buffer_event *
2017rb_iter_head_event(struct ring_buffer_iter *iter)
2018{
2019 return __rb_page_index(iter->head_page, iter->head);
2020}
2021
2022static __always_inline unsigned rb_page_commit(struct buffer_page *bpage)
2023{
2024 return local_read(&bpage->page->commit);
2025}
2026
2027/* Size is determined by what has been committed */
2028static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
2029{
2030 return rb_page_commit(bpage);
2031}
2032
2033static __always_inline unsigned
2034rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
2035{
2036 return rb_page_commit(cpu_buffer->commit_page);
2037}
2038
2039static __always_inline unsigned
2040rb_event_index(struct ring_buffer_event *event)
2041{
2042 unsigned long addr = (unsigned long)event;
2043
2044 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
2045}
2046
2047static void rb_inc_iter(struct ring_buffer_iter *iter)
2048{
2049 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2050
2051 /*
2052 * The iterator could be on the reader page (it starts there).
2053	 * But the head could have moved since the reader was
2054	 * found. Check for this case and assign the iterator
2055	 * to the head page instead of the next page.
2056 */
2057 if (iter->head_page == cpu_buffer->reader_page)
2058 iter->head_page = rb_set_head_page(cpu_buffer);
2059 else
2060 rb_inc_page(cpu_buffer, &iter->head_page);
2061
2062 iter->read_stamp = iter->head_page->page->time_stamp;
2063 iter->head = 0;
2064}
2065
2066/*
2067 * rb_handle_head_page - writer hit the head page
2068 *
2069 * Returns: +1 to retry page
2070 * 0 to continue
2071 * -1 on error
2072 */
2073static int
2074rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
2075 struct buffer_page *tail_page,
2076 struct buffer_page *next_page)
2077{
2078 struct buffer_page *new_head;
2079 int entries;
2080 int type;
2081 int ret;
2082
2083 entries = rb_page_entries(next_page);
2084
2085 /*
2086 * The hard part is here. We need to move the head
2087 * forward, and protect against both readers on
2088 * other CPUs and writers coming in via interrupts.
2089 */
2090 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
2091 RB_PAGE_HEAD);
2092
2093 /*
2094 * type can be one of four:
2095 * NORMAL - an interrupt already moved it for us
2096 * HEAD - we are the first to get here.
2097 * UPDATE - we are the interrupt interrupting
2098 * a current move.
2099 * MOVED - a reader on another CPU moved the next
2100 * pointer to its reader page. Give up
2101 * and try again.
2102 */
2103
2104 switch (type) {
2105 case RB_PAGE_HEAD:
2106 /*
2107 * We changed the head to UPDATE, thus
2108 * it is our responsibility to update
2109 * the counters.
2110 */
2111 local_add(entries, &cpu_buffer->overrun);
2112 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
2113 local_inc(&cpu_buffer->pages_lost);
2114
2115 /*
2116 * The entries will be zeroed out when we move the
2117 * tail page.
2118 */
2119
2120 /* still more to do */
2121 break;
2122
2123 case RB_PAGE_UPDATE:
2124 /*
2125		 * This is an interrupt that interrupted the
2126 * previous update. Still more to do.
2127 */
2128 break;
2129 case RB_PAGE_NORMAL:
2130 /*
2131 * An interrupt came in before the update
2132 * and processed this for us.
2133 * Nothing left to do.
2134 */
2135 return 1;
2136 case RB_PAGE_MOVED:
2137 /*
2138 * The reader is on another CPU and just did
2139 * a swap with our next_page.
2140 * Try again.
2141 */
2142 return 1;
2143 default:
2144 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
2145 return -1;
2146 }
2147
2148 /*
2149 * Now that we are here, the old head pointer is
2150 * set to UPDATE. This will keep the reader from
2151 * swapping the head page with the reader page.
2152 * The reader (on another CPU) will spin till
2153 * we are finished.
2154 *
2155 * We just need to protect against interrupts
2156 * doing the job. We will set the next pointer
2157 * to HEAD. After that, we set the old pointer
2158 * to NORMAL, but only if it was HEAD before.
2159	 * Otherwise we are an interrupt, and only
2160	 * want the outermost commit to reset it.
2161 */
2162 new_head = next_page;
2163 rb_inc_page(cpu_buffer, &new_head);
2164
2165 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
2166 RB_PAGE_NORMAL);
2167
2168 /*
2169 * Valid returns are:
2170 * HEAD - an interrupt came in and already set it.
2171 * NORMAL - One of two things:
2172 * 1) We really set it.
2173 * 2) A bunch of interrupts came in and moved
2174 * the page forward again.
2175 */
2176 switch (ret) {
2177 case RB_PAGE_HEAD:
2178 case RB_PAGE_NORMAL:
2179 /* OK */
2180 break;
2181 default:
2182 RB_WARN_ON(cpu_buffer, 1);
2183 return -1;
2184 }
2185
2186 /*
2187 * It is possible that an interrupt came in,
2188 * set the head up, then more interrupts came in
2189 * and moved it again. When we get back here,
2190 * the page would have been set to NORMAL but we
2191 * just set it back to HEAD.
2192 *
2193 * How do you detect this? Well, if that happened
2194 * the tail page would have moved.
2195 */
2196 if (ret == RB_PAGE_NORMAL) {
2197 struct buffer_page *buffer_tail_page;
2198
2199 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
2200 /*
2201		 * If the tail had moved past next, then we need
2202 * to reset the pointer.
2203 */
2204 if (buffer_tail_page != tail_page &&
2205 buffer_tail_page != next_page)
2206 rb_head_page_set_normal(cpu_buffer, new_head,
2207 next_page,
2208 RB_PAGE_HEAD);
2209 }
2210
2211 /*
2212	 * If this was the outermost commit (the one that
2213 * changed the original pointer from HEAD to UPDATE),
2214 * then it is up to us to reset it to NORMAL.
2215 */
2216 if (type == RB_PAGE_HEAD) {
2217 ret = rb_head_page_set_normal(cpu_buffer, next_page,
2218 tail_page,
2219 RB_PAGE_UPDATE);
2220 if (RB_WARN_ON(cpu_buffer,
2221 ret != RB_PAGE_UPDATE))
2222 return -1;
2223 }
2224
2225 return 0;
2226}
2227
2228static inline void
2229rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
2230 unsigned long tail, struct rb_event_info *info)
2231{
2232 struct buffer_page *tail_page = info->tail_page;
2233 struct ring_buffer_event *event;
2234 unsigned long length = info->length;
2235
2236 /*
2237 * Only the event that crossed the page boundary
2238 * must fill the old tail_page with padding.
2239 */
2240 if (tail >= BUF_PAGE_SIZE) {
2241 /*
2242 * If the page was filled, then we still need
2243 * to update the real_end. Reset it to zero
2244 * and the reader will ignore it.
2245 */
2246 if (tail == BUF_PAGE_SIZE)
2247 tail_page->real_end = 0;
2248
2249 local_sub(length, &tail_page->write);
2250 return;
2251 }
2252
2253 event = __rb_page_index(tail_page, tail);
2254
2255 /* account for padding bytes */
2256 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);
2257
2258 /*
2259 * Save the original length to the meta data.
2260 * This will be used by the reader to add lost event
2261 * counter.
2262 */
2263 tail_page->real_end = tail;
2264
2265 /*
2266 * If this event is bigger than the minimum size, then
2267 * we need to be careful that we don't subtract the
2268 * write counter enough to allow another writer to slip
2269 * in on this page.
2270 * We put in a discarded commit instead, to make sure
2271 * that this space is not used again.
2272 *
2273 * If we are less than the minimum size, we don't need to
2274 * worry about it.
2275 */
2276 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
2277 /* No room for any events */
2278
2279 /* Mark the rest of the page with padding */
2280 rb_event_set_padding(event);
2281
2282 /* Make sure the padding is visible before the write update */
2283 smp_wmb();
2284
2285 /* Set the write back to the previous setting */
2286 local_sub(length, &tail_page->write);
2287 return;
2288 }
2289
2290 /* Put in a discarded event */
2291 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
2292 event->type_len = RINGBUF_TYPE_PADDING;
2293 /* time delta must be non zero */
2294 event->time_delta = 1;
2295
2296 /* Make sure the padding is visible before the tail_page->write update */
2297 smp_wmb();
2298
2299 /* Set write to end of buffer */
2300 length = (tail + length) - BUF_PAGE_SIZE;
2301 local_sub(length, &tail_page->write);
2302}
2303
2304static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
2305
2306/*
2307 * This is the slow path, force gcc not to inline it.
2308 */
2309static noinline struct ring_buffer_event *
2310rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2311 unsigned long tail, struct rb_event_info *info)
2312{
2313 struct buffer_page *tail_page = info->tail_page;
2314 struct buffer_page *commit_page = cpu_buffer->commit_page;
2315 struct ring_buffer *buffer = cpu_buffer->buffer;
2316 struct buffer_page *next_page;
2317 int ret;
2318
2319 next_page = tail_page;
2320
2321 rb_inc_page(cpu_buffer, &next_page);
2322
2323 /*
2324 * If for some reason, we had an interrupt storm that made
2325 * it all the way around the buffer, bail, and warn
2326 * about it.
2327 */
2328 if (unlikely(next_page == commit_page)) {
2329 local_inc(&cpu_buffer->commit_overrun);
2330 goto out_reset;
2331 }
2332
2333 /*
2334 * This is where the fun begins!
2335 *
2336 * We are fighting against races between a reader that
2337 * could be on another CPU trying to swap its reader
2338 * page with the buffer head.
2339 *
2340 * We are also fighting against interrupts coming in and
2341 * moving the head or tail on us as well.
2342 *
2343 * If the next page is the head page then we have filled
2344 * the buffer, unless the commit page is still on the
2345 * reader page.
2346 */
2347 if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
2348
2349 /*
2350 * If the commit is not on the reader page, then
2351		 * move the head page.
2352 */
2353 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2354 /*
2355 * If we are not in overwrite mode,
2356 * this is easy, just stop here.
2357 */
2358 if (!(buffer->flags & RB_FL_OVERWRITE)) {
2359 local_inc(&cpu_buffer->dropped_events);
2360 goto out_reset;
2361 }
2362
2363 ret = rb_handle_head_page(cpu_buffer,
2364 tail_page,
2365 next_page);
2366 if (ret < 0)
2367 goto out_reset;
2368 if (ret)
2369 goto out_again;
2370 } else {
2371 /*
2372 * We need to be careful here too. The
2373 * commit page could still be on the reader
2374 * page. We could have a small buffer, and
2375 * have filled up the buffer with events
2376 * from interrupts and such, and wrapped.
2377 *
2378			 * Note, if the tail page is also on the
2379 * reader_page, we let it move out.
2380 */
2381 if (unlikely((cpu_buffer->commit_page !=
2382 cpu_buffer->tail_page) &&
2383 (cpu_buffer->commit_page ==
2384 cpu_buffer->reader_page))) {
2385 local_inc(&cpu_buffer->commit_overrun);
2386 goto out_reset;
2387 }
2388 }
2389 }
2390
2391 rb_tail_page_update(cpu_buffer, tail_page, next_page);
2392
2393 out_again:
2394
2395 rb_reset_tail(cpu_buffer, tail, info);
2396
2397 /* Commit what we have for now. */
2398 rb_end_commit(cpu_buffer);
2399 /* rb_end_commit() decs committing */
2400 local_inc(&cpu_buffer->committing);
2401
2402 /* fail and let the caller try again */
2403 return ERR_PTR(-EAGAIN);
2404
2405 out_reset:
2406 /* reset write */
2407 rb_reset_tail(cpu_buffer, tail, info);
2408
2409 return NULL;
2410}
2411
2412/* Slow path, do not inline */
2413static noinline struct ring_buffer_event *
2414rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs)
2415{
2416 if (abs)
2417 event->type_len = RINGBUF_TYPE_TIME_STAMP;
2418 else
2419 event->type_len = RINGBUF_TYPE_TIME_EXTEND;
2420
2421 /* Not the first event on the page, or not delta? */
2422 if (abs || rb_event_index(event)) {
2423 event->time_delta = delta & TS_MASK;
2424 event->array[0] = delta >> TS_SHIFT;
2425 } else {
2426 /* nope, just zero it */
2427 event->time_delta = 0;
2428 event->array[0] = 0;
2429 }
2430
2431 return skip_time_extend(event);
2432}
2433
2434static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
2435 struct ring_buffer_event *event);
2436
2437/**
2438 * rb_update_event - update event type and data
2439 * @event: the event to update
2440 * @type: the type of event
2441 * @length: the size of the event field in the ring buffer
2442 *
2443 * Update the type and data fields of the event. The length
2444 * is the actual size that is written to the ring buffer,
2445 * and with this, we can determine what to place into the
2446 * data field.
2447 */
2448static void
2449rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2450 struct ring_buffer_event *event,
2451 struct rb_event_info *info)
2452{
2453 unsigned length = info->length;
2454 u64 delta = info->delta;
2455
2456 /* Only a commit updates the timestamp */
2457 if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
2458 delta = 0;
2459
2460 /*
2461 * If we need to add a timestamp, then we
2462 * add it to the start of the reserved space.
2463 */
2464 if (unlikely(info->add_timestamp)) {
2465 bool abs = ring_buffer_time_stamp_abs(cpu_buffer->buffer);
2466
2467 event = rb_add_time_stamp(event, abs ? info->delta : delta, abs);
2468 length -= RB_LEN_TIME_EXTEND;
2469 delta = 0;
2470 }
2471
2472 event->time_delta = delta;
2473 length -= RB_EVNT_HDR_SIZE;
2474 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
2475 event->type_len = 0;
2476 event->array[0] = length;
2477 } else
2478 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2479}
2480
2481static unsigned rb_calculate_event_length(unsigned length)
2482{
2483 struct ring_buffer_event event; /* Used only for sizeof array */
2484
2485	/* zero length can cause confusion */
2486 if (!length)
2487 length++;
2488
2489 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
2490 length += sizeof(event.array[0]);
2491
2492 length += RB_EVNT_HDR_SIZE;
2493 length = ALIGN(length, RB_ARCH_ALIGNMENT);
2494
2495 /*
2496 * In case the time delta is larger than the 27 bits for it
2497 * in the header, we need to add a timestamp. If another
2498 * event comes in when trying to discard this one to increase
2499 * the length, then the timestamp will be added in the allocated
2500 * space of this event. If length is bigger than the size needed
2501 * for the TIME_EXTEND, then padding has to be used. The events
2502 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
2503 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
2504 * As length is a multiple of 4, we only need to worry if it
2505 * is 12 (RB_LEN_TIME_EXTEND + 4).
2506 */
2507 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
2508 length += RB_ALIGNMENT;
2509
2510 return length;
2511}
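
/*
 * Worked example for rb_calculate_event_length() above, assuming a 4 byte
 * event header and 4 byte arch alignment (the values implied by the
 * comment); the exact constants are architecture dependent.
 *
 *	requested length 7
 *	+ RB_EVNT_HDR_SIZE (4)		-> 11
 *	ALIGN to 4			-> 12 == RB_LEN_TIME_EXTEND + 4
 *	+ RB_ALIGNMENT (4)		-> 16
 *
 * The final bump avoids the one size (12) that is neither exactly
 * RB_LEN_TIME_EXTEND nor big enough for a TIME_EXTEND plus the 8 byte
 * minimum padding, should the event later be discarded.
 */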
2512
2513#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2514static inline bool sched_clock_stable(void)
2515{
2516 return true;
2517}
2518#endif
2519
2520static inline int
2521rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2522 struct ring_buffer_event *event)
2523{
2524 unsigned long new_index, old_index;
2525 struct buffer_page *bpage;
2526 unsigned long index;
2527 unsigned long addr;
2528
2529 new_index = rb_event_index(event);
2530 old_index = new_index + rb_event_ts_length(event);
2531 addr = (unsigned long)event;
2532 addr &= PAGE_MASK;
2533
2534 bpage = READ_ONCE(cpu_buffer->tail_page);
2535
2536 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2537 unsigned long write_mask =
2538 local_read(&bpage->write) & ~RB_WRITE_MASK;
2539 unsigned long event_length = rb_event_length(event);
2540 /*
2541 * This is on the tail page. It is possible that
2542 * a write could come in and move the tail page
2543 * and write to the next page. That is fine
2544 * because we just shorten what is on this page.
2545 */
2546 old_index += write_mask;
2547 new_index += write_mask;
2548 index = local_cmpxchg(&bpage->write, old_index, new_index);
2549 if (index == old_index) {
2550 /* update counters */
2551 local_sub(event_length, &cpu_buffer->entries_bytes);
2552 return 1;
2553 }
2554 }
2555
2556 /* could not discard */
2557 return 0;
2558}
2559
2560static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2561{
2562 local_inc(&cpu_buffer->committing);
2563 local_inc(&cpu_buffer->commits);
2564}
2565
2566static __always_inline void
2567rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
2568{
2569 unsigned long max_count;
2570
2571 /*
2572 * We only race with interrupts and NMIs on this CPU.
2573 * If we own the commit event, then we can commit
2574 * all others that interrupted us, since the interruptions
2575 * are in stack format (they finish before they come
2576 * back to us). This allows us to do a simple loop to
2577 * assign the commit to the tail.
2578 */
2579 again:
2580 max_count = cpu_buffer->nr_pages * 100;
2581
2582 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
2583 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
2584 return;
2585 if (RB_WARN_ON(cpu_buffer,
2586 rb_is_reader_page(cpu_buffer->tail_page)))
2587 return;
2588 /*
2589 * No need for a memory barrier here, as the update
2590 * of the tail_page did it for this page.
2591 */
2592 local_set(&cpu_buffer->commit_page->page->commit,
2593 rb_page_write(cpu_buffer->commit_page));
2594 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
2595 /* Only update the write stamp if the page has an event */
2596 if (rb_page_write(cpu_buffer->commit_page))
2597 cpu_buffer->write_stamp =
2598 cpu_buffer->commit_page->page->time_stamp;
2599 /* add barrier to keep gcc from optimizing too much */
2600 barrier();
2601 }
2602 while (rb_commit_index(cpu_buffer) !=
2603 rb_page_write(cpu_buffer->commit_page)) {
2604
2605 /* Make sure the readers see the content of what is committed. */
2606 smp_wmb();
2607 local_set(&cpu_buffer->commit_page->page->commit,
2608 rb_page_write(cpu_buffer->commit_page));
2609 RB_WARN_ON(cpu_buffer,
2610 local_read(&cpu_buffer->commit_page->page->commit) &
2611 ~RB_WRITE_MASK);
2612 barrier();
2613 }
2614
2615 /* again, keep gcc from optimizing */
2616 barrier();
2617
2618 /*
2619 * If an interrupt came in just after the first while loop
2620 * and pushed the tail page forward, we will be left with
2621 * a dangling commit that will never go forward.
2622 */
2623 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
2624 goto again;
2625}
2626
2627static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
2628{
2629 unsigned long commits;
2630
2631 if (RB_WARN_ON(cpu_buffer,
2632 !local_read(&cpu_buffer->committing)))
2633 return;
2634
2635 again:
2636 commits = local_read(&cpu_buffer->commits);
2637 /* synchronize with interrupts */
2638 barrier();
2639 if (local_read(&cpu_buffer->committing) == 1)
2640 rb_set_commit_to_write(cpu_buffer);
2641
2642 local_dec(&cpu_buffer->committing);
2643
2644 /* synchronize with interrupts */
2645 barrier();
2646
2647 /*
2648 * Need to account for interrupts coming in between the
2649 * updating of the commit page and the clearing of the
2650 * committing counter.
2651 */
2652 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
2653 !local_read(&cpu_buffer->committing)) {
2654 local_inc(&cpu_buffer->committing);
2655 goto again;
2656 }
2657}
2658
2659static inline void rb_event_discard(struct ring_buffer_event *event)
2660{
2661 if (extended_time(event))
2662 event = skip_time_extend(event);
2663
2664 /* array[0] holds the actual length for the discarded event */
2665 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
2666 event->type_len = RINGBUF_TYPE_PADDING;
2667 /* time delta must be non zero */
2668 if (!event->time_delta)
2669 event->time_delta = 1;
2670}
2671
2672static __always_inline bool
2673rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
2674 struct ring_buffer_event *event)
2675{
2676 unsigned long addr = (unsigned long)event;
2677 unsigned long index;
2678
2679 index = rb_event_index(event);
2680 addr &= PAGE_MASK;
2681
2682 return cpu_buffer->commit_page->page == (void *)addr &&
2683 rb_commit_index(cpu_buffer) == index;
2684}
2685
2686static __always_inline void
2687rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2688 struct ring_buffer_event *event)
2689{
2690 u64 delta;
2691
2692 /*
2693	 * The first event in the commit queue updates the
2694 * time stamp.
2695 */
2696 if (rb_event_is_commit(cpu_buffer, event)) {
2697 /*
2698 * A commit event that is first on a page
2699 * updates the write timestamp with the page stamp
2700 */
2701 if (!rb_event_index(event))
2702 cpu_buffer->write_stamp =
2703 cpu_buffer->commit_page->page->time_stamp;
2704 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
2705 delta = ring_buffer_event_time_stamp(event);
2706 cpu_buffer->write_stamp += delta;
2707 } else if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
2708 delta = ring_buffer_event_time_stamp(event);
2709 cpu_buffer->write_stamp = delta;
2710 } else
2711 cpu_buffer->write_stamp += event->time_delta;
2712 }
2713}
2714
2715static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
2716 struct ring_buffer_event *event)
2717{
2718 local_inc(&cpu_buffer->entries);
2719 rb_update_write_stamp(cpu_buffer, event);
2720 rb_end_commit(cpu_buffer);
2721}
2722
2723static __always_inline void
2724rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
2725{
2726 if (buffer->irq_work.waiters_pending) {
2727 buffer->irq_work.waiters_pending = false;
2728		/* irq_work_queue() supplies its own memory barriers */
2729 irq_work_queue(&buffer->irq_work.work);
2730 }
2731
2732 if (cpu_buffer->irq_work.waiters_pending) {
2733 cpu_buffer->irq_work.waiters_pending = false;
2734		/* irq_work_queue() supplies its own memory barriers */
2735 irq_work_queue(&cpu_buffer->irq_work.work);
2736 }
2737
2738 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
2739 return;
2740
2741 if (cpu_buffer->reader_page == cpu_buffer->commit_page)
2742 return;
2743
2744 if (!cpu_buffer->irq_work.full_waiters_pending)
2745 return;
2746
2747 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
2748
2749 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
2750 return;
2751
2752 cpu_buffer->irq_work.wakeup_full = true;
2753 cpu_buffer->irq_work.full_waiters_pending = false;
2754	/* irq_work_queue() supplies its own memory barriers */
2755 irq_work_queue(&cpu_buffer->irq_work.work);
2756}
2757
2758/*
2759 * The lock and unlock are done within a preempt disable section.
2760 * The current_context per_cpu variable can only be modified
2761 * by the current task between lock and unlock. But it can
2762 * be modified more than once via an interrupt. To pass this
2763 * information from the lock to the unlock without having to
2764 * access the 'in_interrupt()' functions again (which do show
2765 * a bit of overhead in something as critical as function tracing),
2766 * we use a bitmask trick.
2767 *
2768 * bit 1 = NMI context
2769 * bit 2 = IRQ context
2770 * bit 3 = SoftIRQ context
2771 * bit 4 = normal context.
2772 *
2773 * This works because this is the order of contexts that can
2774 * preempt other contexts. A SoftIRQ never preempts an IRQ
2775 * context.
2776 *
2777 * When the context is determined, the corresponding bit is
2778 * checked and set (if it was set, then a recursion of that context
2779 * happened).
2780 *
2781 * On unlock, we need to clear this bit. To do so, just subtract
2782 * 1 from the current_context and AND it to itself.
2783 *
2784 * (binary)
2785 * 101 - 1 = 100
2786 * 101 & 100 = 100 (clearing bit zero)
2787 *
2788 * 1010 - 1 = 1001
2789 * 1010 & 1001 = 1000 (clearing bit 1)
2790 *
2791 * The least significant bit can be cleared this way, and it
2792 * just so happens that it is the same bit corresponding to
2793 * the current context.
2794 *
2795 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
2796 * is set when a recursion is detected at the current context, and if
2797 * the TRANSITION bit is already set, it will fail the recursion.
2798 * This is needed because there's a lag between the changing of
2799 * interrupt context and updating the preempt count. In this case,
2800 * a false positive will be found. To handle this, one extra recursion
2801 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
2802 * bit is already set, then it is considered a recursion and the function
2803 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
2804 *
2805 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
2806 * to be cleared, even if it wasn't the context that set it. That is,
2807 * if an interrupt comes in while NORMAL bit is set and the ring buffer
2808 * is called before preempt_count() is updated, since the check will
2809 * be on the NORMAL bit, the TRANSITION bit will then be set. If an
2810 * NMI then comes in, it will set the NMI bit, but when the NMI code
2811 * does the trace_recursive_unlock() it will clear the TRANSITION bit
2812 * and leave the NMI bit set. But this is fine, because the interrupt
2813 * code that set the TRANSITION bit will then clear the NMI bit when it
2814 * calls trace_recursive_unlock(). If another NMI comes in, it will
2815 * set the TRANSITION bit and continue.
2816 *
2817 * Note: The TRANSITION bit only handles a single transition between contexts.
2818 */
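
/*
 * A minimal worked example of the bit trick above, using the bit layout
 * from the comment (NMI in the lowest bit, normal context in the highest)
 * and nest == 0; the values are illustrative only:
 *
 *	val = 0b1000;		normal context bit already set
 *	val |= 0b0010;		an IRQ recurses in and sets its bit
 *				val == 0b1010
 *	val &= val - 1;		0b1010 & 0b1001 == 0b1000
 *
 * The unlock clears only the lowest set bit, which is exactly the bit of
 * the context being left.
 */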
2819
2820static __always_inline int
2821trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
2822{
2823 unsigned int val = cpu_buffer->current_context;
2824 unsigned long pc = preempt_count();
2825 int bit;
2826
2827 if (!(pc & (NMI_MASK | HARDIRQ_MASK | SOFTIRQ_OFFSET)))
2828 bit = RB_CTX_NORMAL;
2829 else
2830 bit = pc & NMI_MASK ? RB_CTX_NMI :
2831 pc & HARDIRQ_MASK ? RB_CTX_IRQ : RB_CTX_SOFTIRQ;
2832
2833 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
2834 /*
2835		 * It is possible that this was called while transitioning
2836		 * between interrupt contexts, and preempt_count() has not
2837 * been updated yet. In this case, use the TRANSITION bit.
2838 */
2839 bit = RB_CTX_TRANSITION;
2840 if (val & (1 << (bit + cpu_buffer->nest)))
2841 return 1;
2842 }
2843
2844 val |= (1 << (bit + cpu_buffer->nest));
2845 cpu_buffer->current_context = val;
2846
2847 return 0;
2848}
2849
2850static __always_inline void
2851trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
2852{
2853 cpu_buffer->current_context &=
2854 cpu_buffer->current_context - (1 << cpu_buffer->nest);
2855}
2856
2857/* The recursive locking above uses 5 bits */
2858#define NESTED_BITS 5
2859
2860/**
2861 * ring_buffer_nest_start - Allow tracing while nested
2862 * @buffer: The ring buffer to modify
2863 *
2864 * The ring buffer has a safety mechanism to prevent recursion.
2865 * But there may be a case where a trace needs to be done while
2866 * tracing something else. In this case, calling this function
2867 * will allow another ring_buffer_lock_reserve() to nest within the
2868 * currently active one.
2869 *
2870 * Call this function before calling another ring_buffer_lock_reserve() and
2871 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
2872 */
2873void ring_buffer_nest_start(struct ring_buffer *buffer)
2874{
2875 struct ring_buffer_per_cpu *cpu_buffer;
2876 int cpu;
2877
2878 /* Enabled by ring_buffer_nest_end() */
2879 preempt_disable_notrace();
2880 cpu = raw_smp_processor_id();
2881 cpu_buffer = buffer->buffers[cpu];
2882 /* This is the shift value for the above recursive locking */
2883 cpu_buffer->nest += NESTED_BITS;
2884}
2885
2886/**
2887 * ring_buffer_nest_end - End a nested tracing section
2888 * @buffer: The ring buffer to modify
2889 *
2890 * Must be called after ring_buffer_nest_start() and after the
2891 * ring_buffer_unlock_commit().
2892 */
2893void ring_buffer_nest_end(struct ring_buffer *buffer)
2894{
2895 struct ring_buffer_per_cpu *cpu_buffer;
2896 int cpu;
2897
2898 /* disabled by ring_buffer_nest_start() */
2899 cpu = raw_smp_processor_id();
2900 cpu_buffer = buffer->buffers[cpu];
2901 /* This is the shift value for the above recursive locking */
2902 cpu_buffer->nest -= NESTED_BITS;
2903 preempt_enable_notrace();
2904}
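
/*
 * Usage sketch (illustrative only) of the nesting API above: a second
 * event is reserved and committed while an outer reservation on the same
 * CPU is still open. "buffer", "len" and "len2" are placeholders for the
 * caller's own values.
 *
 *	outer = ring_buffer_lock_reserve(buffer, len);
 *
 *	ring_buffer_nest_start(buffer);
 *	inner = ring_buffer_lock_reserve(buffer, len2);
 *	if (inner)
 *		ring_buffer_unlock_commit(buffer, inner);
 *	ring_buffer_nest_end(buffer);
 *
 *	if (outer)
 *		ring_buffer_unlock_commit(buffer, outer);
 */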
2905
2906/**
2907 * ring_buffer_unlock_commit - commit a reserved event
2908 * @buffer: The buffer to commit to
2909 * @event: The event pointer to commit.
2910 *
2911 * This commits the data to the ring buffer, and releases any locks held.
2912 *
2913 * Must be paired with ring_buffer_lock_reserve.
2914 */
2915int ring_buffer_unlock_commit(struct ring_buffer *buffer,
2916 struct ring_buffer_event *event)
2917{
2918 struct ring_buffer_per_cpu *cpu_buffer;
2919 int cpu = raw_smp_processor_id();
2920
2921 cpu_buffer = buffer->buffers[cpu];
2922
2923 rb_commit(cpu_buffer, event);
2924
2925 rb_wakeups(buffer, cpu_buffer);
2926
2927 trace_recursive_unlock(cpu_buffer);
2928
2929 preempt_enable_notrace();
2930
2931 return 0;
2932}
2933EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
2934
2935static noinline void
2936rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2937 struct rb_event_info *info)
2938{
2939 WARN_ONCE(info->delta > (1ULL << 59),
2940 KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
2941 (unsigned long long)info->delta,
2942 (unsigned long long)info->ts,
2943 (unsigned long long)cpu_buffer->write_stamp,
2944 sched_clock_stable() ? "" :
2945 "If you just came from a suspend/resume,\n"
2946 "please switch to the trace global clock:\n"
2947 " echo global > /sys/kernel/debug/tracing/trace_clock\n"
2948 "or add trace_clock=global to the kernel command line\n");
2949 info->add_timestamp = 1;
2950}
2951
2952static struct ring_buffer_event *
2953__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
2954 struct rb_event_info *info)
2955{
2956 struct ring_buffer_event *event;
2957 struct buffer_page *tail_page;
2958 unsigned long tail, write;
2959
2960 /*
2961 * If the time delta since the last event is too big to
2962 * hold in the time field of the event, then we append a
2963 * TIME EXTEND event ahead of the data event.
2964 */
2965 if (unlikely(info->add_timestamp))
2966 info->length += RB_LEN_TIME_EXTEND;
2967
2968 /* Don't let the compiler play games with cpu_buffer->tail_page */
2969 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
2970 write = local_add_return(info->length, &tail_page->write);
2971
2972 /* set write to only the index of the write */
2973 write &= RB_WRITE_MASK;
2974 tail = write - info->length;
2975
2976 /*
2977 * If this is the first commit on the page, then it has the same
2978 * timestamp as the page itself.
2979 */
2980 if (!tail && !ring_buffer_time_stamp_abs(cpu_buffer->buffer))
2981 info->delta = 0;
2982
2983	/* See if we shot past the end of this buffer page */
2984 if (unlikely(write > BUF_PAGE_SIZE))
2985 return rb_move_tail(cpu_buffer, tail, info);
2986
2987 /* We reserved something on the buffer */
2988
2989 event = __rb_page_index(tail_page, tail);
2990 rb_update_event(cpu_buffer, event, info);
2991
2992 local_inc(&tail_page->entries);
2993
2994 /*
2995 * If this is the first commit on the page, then update
2996 * its timestamp.
2997 */
2998 if (!tail)
2999 tail_page->page->time_stamp = info->ts;
3000
3001 /* account for these added bytes */
3002 local_add(info->length, &cpu_buffer->entries_bytes);
3003
3004 return event;
3005}
3006
3007static __always_inline struct ring_buffer_event *
3008rb_reserve_next_event(struct ring_buffer *buffer,
3009 struct ring_buffer_per_cpu *cpu_buffer,
3010 unsigned long length)
3011{
3012 struct ring_buffer_event *event;
3013 struct rb_event_info info;
3014 int nr_loops = 0;
3015 u64 diff;
3016
3017 /* ring buffer does cmpxchg, make sure it is safe in NMI context */
3018 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) &&
3019 (unlikely(in_nmi()))) {
3020 return NULL;
3021 }
3022
3023 rb_start_commit(cpu_buffer);
3024
3025#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
3026 /*
3027 * Due to the ability to swap a cpu buffer from a buffer
3028 * it is possible it was swapped before we committed.
3029 * (committing stops a swap). We check for it here and
3030 * if it happened, we have to fail the write.
3031 */
3032 barrier();
3033 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
3034 local_dec(&cpu_buffer->committing);
3035 local_dec(&cpu_buffer->commits);
3036 return NULL;
3037 }
3038#endif
3039
3040 info.length = rb_calculate_event_length(length);
3041 again:
3042 info.add_timestamp = 0;
3043 info.delta = 0;
3044
3045 /*
3046 * We allow for interrupts to reenter here and do a trace.
3047 * If one does, it will cause this original code to loop
3048 * back here. Even with heavy interrupts happening, this
3049 * should only happen a few times in a row. If this happens
3050 * 1000 times in a row, there must be either an interrupt
3051 * storm or we have something buggy.
3052 * Bail!
3053 */
3054 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
3055 goto out_fail;
3056
3057 info.ts = rb_time_stamp(cpu_buffer->buffer);
3058 diff = info.ts - cpu_buffer->write_stamp;
3059
3060 /* make sure this diff is calculated here */
3061 barrier();
3062
3063 if (ring_buffer_time_stamp_abs(buffer)) {
3064 info.delta = info.ts;
3065 rb_handle_timestamp(cpu_buffer, &info);
3066 } else /* Did the write stamp get updated already? */
3067 if (likely(info.ts >= cpu_buffer->write_stamp)) {
3068 info.delta = diff;
3069 if (unlikely(test_time_stamp(info.delta)))
3070 rb_handle_timestamp(cpu_buffer, &info);
3071 }
3072
3073 event = __rb_reserve_next(cpu_buffer, &info);
3074
3075 if (unlikely(PTR_ERR(event) == -EAGAIN)) {
3076 if (info.add_timestamp)
3077 info.length -= RB_LEN_TIME_EXTEND;
3078 goto again;
3079 }
3080
3081 if (!event)
3082 goto out_fail;
3083
3084 return event;
3085
3086 out_fail:
3087 rb_end_commit(cpu_buffer);
3088 return NULL;
3089}
3090
3091/**
3092 * ring_buffer_lock_reserve - reserve a part of the buffer
3093 * @buffer: the ring buffer to reserve from
3094 * @length: the length of the data to reserve (excluding event header)
3095 *
3096 * Returns a reserved event on the ring buffer to copy directly to.
3097 * The user of this interface will need to get the body to write into
3098 * and can use the ring_buffer_event_data() interface.
3099 *
3100 * The length is the length of the data needed, not the event length
3101 * which also includes the event header.
3102 *
3103 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
3104 * If NULL is returned, then nothing has been allocated or locked.
3105 */
3106struct ring_buffer_event *
3107ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
3108{
3109 struct ring_buffer_per_cpu *cpu_buffer;
3110 struct ring_buffer_event *event;
3111 int cpu;
3112
3113 /* If we are tracing schedule, we don't want to recurse */
3114 preempt_disable_notrace();
3115
3116 if (unlikely(atomic_read(&buffer->record_disabled)))
3117 goto out;
3118
3119 cpu = raw_smp_processor_id();
3120
3121 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
3122 goto out;
3123
3124 cpu_buffer = buffer->buffers[cpu];
3125
3126 if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
3127 goto out;
3128
3129 if (unlikely(length > BUF_MAX_DATA_SIZE))
3130 goto out;
3131
3132 if (unlikely(trace_recursive_lock(cpu_buffer)))
3133 goto out;
3134
3135 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3136 if (!event)
3137 goto out_unlock;
3138
3139 return event;
3140
3141 out_unlock:
3142 trace_recursive_unlock(cpu_buffer);
3143 out:
3144 preempt_enable_notrace();
3145 return NULL;
3146}
3147EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
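
/*
 * Usage sketch (illustrative only), following the kernel-doc above: a
 * caller reserves room for a u32, fills it in through
 * ring_buffer_event_data(), and commits. A NULL return means nothing was
 * reserved and no commit must follow.
 *
 *	struct ring_buffer_event *event;
 *	u32 *body;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*body));
 *	if (event) {
 *		body = ring_buffer_event_data(event);
 *		*body = 42;
 *		ring_buffer_unlock_commit(buffer, event);
 *	}
 */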
3148
3149/*
3150 * Decrement the entries to the page that an event is on.
3151 * The event does not even need to exist, only the pointer
3152 * to the page it is on. This may only be called before the commit
3153 * takes place.
3154 */
3155static inline void
3156rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
3157 struct ring_buffer_event *event)
3158{
3159 unsigned long addr = (unsigned long)event;
3160 struct buffer_page *bpage = cpu_buffer->commit_page;
3161 struct buffer_page *start;
3162
3163 addr &= PAGE_MASK;
3164
3165 /* Do the likely case first */
3166 if (likely(bpage->page == (void *)addr)) {
3167 local_dec(&bpage->entries);
3168 return;
3169 }
3170
3171 /*
3172 * Because the commit page may be on the reader page we
3173 * start with the next page and check the end loop there.
3174 */
3175 rb_inc_page(cpu_buffer, &bpage);
3176 start = bpage;
3177 do {
3178 if (bpage->page == (void *)addr) {
3179 local_dec(&bpage->entries);
3180 return;
3181 }
3182 rb_inc_page(cpu_buffer, &bpage);
3183 } while (bpage != start);
3184
3185 /* commit not part of this buffer?? */
3186 RB_WARN_ON(cpu_buffer, 1);
3187}
3188
3189/**
3190 * ring_buffer_commit_discard - discard an event that has not been committed
3191 * @buffer: the ring buffer
3192 * @event: non committed event to discard
3193 *
3194 * Sometimes an event that is in the ring buffer needs to be ignored.
3195 * This function lets the user discard an event in the ring buffer
3196 * and then that event will not be read later.
3197 *
3198 * This function only works if it is called before the item has been
3199 * committed. It will try to free the event from the ring buffer
3200 * if another event has not been added behind it.
3201 *
3202 * If another event has been added behind it, it will set the event
3203 * up as discarded, and perform the commit.
3204 *
3205 * If this function is called, do not call ring_buffer_unlock_commit on
3206 * the event.
3207 */
3208void ring_buffer_discard_commit(struct ring_buffer *buffer,
3209 struct ring_buffer_event *event)
3210{
3211 struct ring_buffer_per_cpu *cpu_buffer;
3212 int cpu;
3213
3214 /* The event is discarded regardless */
3215 rb_event_discard(event);
3216
3217 cpu = smp_processor_id();
3218 cpu_buffer = buffer->buffers[cpu];
3219
3220 /*
3221 * This must only be called if the event has not been
3222 * committed yet. Thus we can assume that preemption
3223 * is still disabled.
3224 */
3225 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
3226
3227 rb_decrement_entry(cpu_buffer, event);
3228 if (rb_try_to_discard(cpu_buffer, event))
3229 goto out;
3230
3231 /*
3232	 * The commit is still visible to the reader, so we
3233 * must still update the timestamp.
3234 */
3235 rb_update_write_stamp(cpu_buffer, event);
3236 out:
3237 rb_end_commit(cpu_buffer);
3238
3239 trace_recursive_unlock(cpu_buffer);
3240
3241 preempt_enable_notrace();
3242
3243}
3244EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
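
/*
 * Usage sketch (illustrative only): reserving an event and then deciding
 * not to keep it. Note that after ring_buffer_discard_commit() the event
 * must not also be passed to ring_buffer_unlock_commit(). "build_event()"
 * is a made-up helper standing in for whatever decides the event is
 * unwanted.
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(u64));
 *	if (event) {
 *		if (build_event(ring_buffer_event_data(event)) < 0)
 *			ring_buffer_discard_commit(buffer, event);
 *		else
 *			ring_buffer_unlock_commit(buffer, event);
 *	}
 */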
3245
3246/**
3247 * ring_buffer_write - write data to the buffer without reserving
3248 * @buffer: The ring buffer to write to.
3249 * @length: The length of the data being written (excluding the event header)
3250 * @data: The data to write to the buffer.
3251 *
3252 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
3253 * one function. If you already have the data to write to the buffer, it
3254 * may be easier to simply call this function.
3255 *
3256 * Note, like ring_buffer_lock_reserve, the length is the length of the data
3257 * and not the length of the event which would hold the header.
3258 */
3259int ring_buffer_write(struct ring_buffer *buffer,
3260 unsigned long length,
3261 void *data)
3262{
3263 struct ring_buffer_per_cpu *cpu_buffer;
3264 struct ring_buffer_event *event;
3265 void *body;
3266 int ret = -EBUSY;
3267 int cpu;
3268
3269 preempt_disable_notrace();
3270
3271 if (atomic_read(&buffer->record_disabled))
3272 goto out;
3273
3274 cpu = raw_smp_processor_id();
3275
3276 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3277 goto out;
3278
3279 cpu_buffer = buffer->buffers[cpu];
3280
3281 if (atomic_read(&cpu_buffer->record_disabled))
3282 goto out;
3283
3284 if (length > BUF_MAX_DATA_SIZE)
3285 goto out;
3286
3287 if (unlikely(trace_recursive_lock(cpu_buffer)))
3288 goto out;
3289
3290 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3291 if (!event)
3292 goto out_unlock;
3293
3294 body = rb_event_data(event);
3295
3296 memcpy(body, data, length);
3297
3298 rb_commit(cpu_buffer, event);
3299
3300 rb_wakeups(buffer, cpu_buffer);
3301
3302 ret = 0;
3303
3304 out_unlock:
3305 trace_recursive_unlock(cpu_buffer);
3306
3307 out:
3308 preempt_enable_notrace();
3309
3310 return ret;
3311}
3312EXPORT_SYMBOL_GPL(ring_buffer_write);
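
/*
 * Usage sketch (illustrative only): writing an already prepared record in
 * a single call instead of the reserve/commit pair. "struct my_record" is
 * a hypothetical payload type.
 *
 *	struct my_record rec = { .value = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(rec), &rec))
 *		pr_debug("ring buffer write dropped\n");
 */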
3313
3314static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
3315{
3316 struct buffer_page *reader = cpu_buffer->reader_page;
3317 struct buffer_page *head = rb_set_head_page(cpu_buffer);
3318 struct buffer_page *commit = cpu_buffer->commit_page;
3319
3320 /* In case of error, head will be NULL */
3321 if (unlikely(!head))
3322 return true;
3323
3324 /* Reader should exhaust content in reader page */
3325 if (reader->read != rb_page_commit(reader))
3326 return false;
3327
3328 /*
3329 * If writers are committing on the reader page, knowing all
3330 * committed content has been read, the ring buffer is empty.
3331 */
3332 if (commit == reader)
3333 return true;
3334
3335 /*
3336 * If writers are committing on a page other than reader page
3337 * and head page, there should always be content to read.
3338 */
3339 if (commit != head)
3340 return false;
3341
3342 /*
3343	 * Writers are committing on the head page. We only need to
3344	 * care about whether there is committed data, and the reader will
3345	 * swap the reader page with the head page when it needs to read.
3346 */
3347 return rb_page_commit(commit) == 0;
3348}
3349
3350/**
3351 * ring_buffer_record_disable - stop all writes into the buffer
3352 * @buffer: The ring buffer to stop writes to.
3353 *
3354 * This prevents all writes to the buffer. Any attempt to write
3355 * to the buffer after this will fail and return NULL.
3356 *
3357 * The caller should call synchronize_rcu() after this.
3358 */
3359void ring_buffer_record_disable(struct ring_buffer *buffer)
3360{
3361 atomic_inc(&buffer->record_disabled);
3362}
3363EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
3364
3365/**
3366 * ring_buffer_record_enable - enable writes to the buffer
3367 * @buffer: The ring buffer to enable writes
3368 *
3369 * Note, multiple disables will need the same number of enables
3370 * to truly enable the writing (much like preempt_disable).
3371 */
3372void ring_buffer_record_enable(struct ring_buffer *buffer)
3373{
3374 atomic_dec(&buffer->record_disabled);
3375}
3376EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
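
/*
 * Usage sketch (illustrative only), following the kernel-doc above: the
 * disable/enable calls are counted, and synchronize_rcu() is used to wait
 * for writers that were already inside the buffer when it was disabled.
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_rcu();
 *	...		read or inspect the buffer here
 *	ring_buffer_record_enable(buffer);
 */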
3377
3378/**
3379 * ring_buffer_record_off - stop all writes into the buffer
3380 * @buffer: The ring buffer to stop writes to.
3381 *
3382 * This prevents all writes to the buffer. Any attempt to write
3383 * to the buffer after this will fail and return NULL.
3384 *
3385 * This is different from ring_buffer_record_disable(), as
3386 * it works like an on/off switch, whereas the disable() version
3387 * must be paired with an enable().
3388 */
3389void ring_buffer_record_off(struct ring_buffer *buffer)
3390{
3391 unsigned int rd;
3392 unsigned int new_rd;
3393
3394 do {
3395 rd = atomic_read(&buffer->record_disabled);
3396 new_rd = rd | RB_BUFFER_OFF;
3397 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3398}
3399EXPORT_SYMBOL_GPL(ring_buffer_record_off);
3400
3401/**
3402 * ring_buffer_record_on - restart writes into the buffer
3403 * @buffer: The ring buffer to start writes to.
3404 *
3405 * This enables all writes to the buffer that was disabled by
3406 * ring_buffer_record_off().
3407 *
3408 * This is different from ring_buffer_record_enable(), as
3409 * it works like an on/off switch, whereas the enable() version
3410 * must be paired with a disable().
3411 */
3412void ring_buffer_record_on(struct ring_buffer *buffer)
3413{
3414 unsigned int rd;
3415 unsigned int new_rd;
3416
3417 do {
3418 rd = atomic_read(&buffer->record_disabled);
3419 new_rd = rd & ~RB_BUFFER_OFF;
3420 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3421}
3422EXPORT_SYMBOL_GPL(ring_buffer_record_on);
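
/*
 * Usage sketch (illustrative only): unlike the counted disable/enable
 * pair above, record_off/record_on act as a plain switch, so repeated
 * calls do not need to be balanced.
 *
 *	ring_buffer_record_off(buffer);
 *	ring_buffer_record_off(buffer);		still simply "off"
 *	ring_buffer_record_on(buffer);		writes are allowed again
 */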
3423
3424/**
3425 * ring_buffer_record_is_on - return true if the ring buffer can write
3426 * @buffer: The ring buffer to see if write is enabled
3427 *
3428 * Returns true if the ring buffer is in a state that it accepts writes.
3429 */
3430bool ring_buffer_record_is_on(struct ring_buffer *buffer)
3431{
3432 return !atomic_read(&buffer->record_disabled);
3433}
3434
3435/**
3436 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
3437 * @buffer: The ring buffer to see if write is set enabled
3438 *
3439 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
3440 * Note that this does NOT mean it is in a writable state.
3441 *
3442 * It may return true when the ring buffer has been disabled by
3443 * ring_buffer_record_disable(), as that is a temporary disabling of
3444 * the ring buffer.
3445 */
3446bool ring_buffer_record_is_set_on(struct ring_buffer *buffer)
3447{
3448 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
3449}
3450
3451/**
3452 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
3453 * @buffer: The ring buffer to stop writes to.
3454 * @cpu: The CPU buffer to stop
3455 *
3456 * This prevents all writes to the buffer. Any attempt to write
3457 * to the buffer after this will fail and return NULL.
3458 *
3459 * The caller should call synchronize_rcu() after this.
3460 */
3461void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
3462{
3463 struct ring_buffer_per_cpu *cpu_buffer;
3464
3465 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3466 return;
3467
3468 cpu_buffer = buffer->buffers[cpu];
3469 atomic_inc(&cpu_buffer->record_disabled);
3470}
3471EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
3472
3473/**
3474 * ring_buffer_record_enable_cpu - enable writes to the buffer
3475 * @buffer: The ring buffer to enable writes
3476 * @cpu: The CPU to enable.
3477 *
3478 * Note, multiple disables will need the same number of enables
3479 * to truly enable the writing (much like preempt_disable).
3480 */
3481void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
3482{
3483 struct ring_buffer_per_cpu *cpu_buffer;
3484
3485 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3486 return;
3487
3488 cpu_buffer = buffer->buffers[cpu];
3489 atomic_dec(&cpu_buffer->record_disabled);
3490}
3491EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
3492
3493/*
3494 * The total entries in the ring buffer is the running counter
3495 * of entries entered into the ring buffer, minus the sum of
3496 * the entries read from the ring buffer and the number of
3497 * entries that were overwritten.
3498 */
3499static inline unsigned long
3500rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
3501{
3502 return local_read(&cpu_buffer->entries) -
3503 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
3504}
3505
3506/**
3507 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
3508 * @buffer: The ring buffer
3509 * @cpu: The per CPU buffer to read from.
3510 */
3511u64 ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu)
3512{
3513 unsigned long flags;
3514 struct ring_buffer_per_cpu *cpu_buffer;
3515 struct buffer_page *bpage;
3516 u64 ret = 0;
3517
3518 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3519 return 0;
3520
3521 cpu_buffer = buffer->buffers[cpu];
3522 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3523 /*
3524	 * If the tail is on the reader_page, the oldest time stamp is on the
3525	 * reader page.
3526 */
3527 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
3528 bpage = cpu_buffer->reader_page;
3529 else
3530 bpage = rb_set_head_page(cpu_buffer);
3531 if (bpage)
3532 ret = bpage->page->time_stamp;
3533 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3534
3535 return ret;
3536}
3537EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
3538
3539/**
3540 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer
3541 * @buffer: The ring buffer
3542 * @cpu: The per CPU buffer to read from.
3543 */
3544unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu)
3545{
3546 struct ring_buffer_per_cpu *cpu_buffer;
3547 unsigned long ret;
3548
3549 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3550 return 0;
3551
3552 cpu_buffer = buffer->buffers[cpu];
3553 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
3554
3555 return ret;
3556}
3557EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
3558
3559/**
3560 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
3561 * @buffer: The ring buffer
3562 * @cpu: The per CPU buffer to get the entries from.
3563 */
3564unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
3565{
3566 struct ring_buffer_per_cpu *cpu_buffer;
3567
3568 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3569 return 0;
3570
3571 cpu_buffer = buffer->buffers[cpu];
3572
3573 return rb_num_of_entries(cpu_buffer);
3574}
3575EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
3576
3577/**
3578 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
3579 * buffer wrapping around (only if RB_FL_OVERWRITE is on).
3580 * @buffer: The ring buffer
3581 * @cpu: The per CPU buffer to get the number of overruns from
3582 */
3583unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
3584{
3585 struct ring_buffer_per_cpu *cpu_buffer;
3586 unsigned long ret;
3587
3588 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3589 return 0;
3590
3591 cpu_buffer = buffer->buffers[cpu];
3592 ret = local_read(&cpu_buffer->overrun);
3593
3594 return ret;
3595}
3596EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
3597
3598/**
3599 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
3600 * commits failing due to the buffer wrapping around while there are uncommitted
3601 * events, such as during an interrupt storm.
3602 * @buffer: The ring buffer
3603 * @cpu: The per CPU buffer to get the number of overruns from
3604 */
3605unsigned long
3606ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
3607{
3608 struct ring_buffer_per_cpu *cpu_buffer;
3609 unsigned long ret;
3610
3611 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3612 return 0;
3613
3614 cpu_buffer = buffer->buffers[cpu];
3615 ret = local_read(&cpu_buffer->commit_overrun);
3616
3617 return ret;
3618}
3619EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
3620
3621/**
3622 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
3623 * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
3624 * @buffer: The ring buffer
3625 * @cpu: The per CPU buffer to get the number of overruns from
3626 */
3627unsigned long
3628ring_buffer_dropped_events_cpu(struct ring_buffer *buffer, int cpu)
3629{
3630 struct ring_buffer_per_cpu *cpu_buffer;
3631 unsigned long ret;
3632
3633 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3634 return 0;
3635
3636 cpu_buffer = buffer->buffers[cpu];
3637 ret = local_read(&cpu_buffer->dropped_events);
3638
3639 return ret;
3640}
3641EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
3642
3643/**
3644 * ring_buffer_read_events_cpu - get the number of events successfully read
3645 * @buffer: The ring buffer
3646 * @cpu: The per CPU buffer to get the number of events read
3647 */
3648unsigned long
3649ring_buffer_read_events_cpu(struct ring_buffer *buffer, int cpu)
3650{
3651 struct ring_buffer_per_cpu *cpu_buffer;
3652
3653 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3654 return 0;
3655
3656 cpu_buffer = buffer->buffers[cpu];
3657 return cpu_buffer->read;
3658}
3659EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
3660
3661/**
3662 * ring_buffer_entries - get the number of entries in a buffer
3663 * @buffer: The ring buffer
3664 *
3665 * Returns the total number of entries in the ring buffer
3666 * (all CPU entries)
3667 */
3668unsigned long ring_buffer_entries(struct ring_buffer *buffer)
3669{
3670 struct ring_buffer_per_cpu *cpu_buffer;
3671 unsigned long entries = 0;
3672 int cpu;
3673
3674 /* if you care about this being correct, lock the buffer */
3675 for_each_buffer_cpu(buffer, cpu) {
3676 cpu_buffer = buffer->buffers[cpu];
3677 entries += rb_num_of_entries(cpu_buffer);
3678 }
3679
3680 return entries;
3681}
3682EXPORT_SYMBOL_GPL(ring_buffer_entries);
3683
3684/**
3685 * ring_buffer_overruns - get the number of overruns in buffer
3686 * @buffer: The ring buffer
3687 *
3688 * Returns the total number of overruns in the ring buffer
3689 * (all CPU entries)
3690 */
3691unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
3692{
3693 struct ring_buffer_per_cpu *cpu_buffer;
3694 unsigned long overruns = 0;
3695 int cpu;
3696
3697 /* if you care about this being correct, lock the buffer */
3698 for_each_buffer_cpu(buffer, cpu) {
3699 cpu_buffer = buffer->buffers[cpu];
3700 overruns += local_read(&cpu_buffer->overrun);
3701 }
3702
3703 return overruns;
3704}
3705EXPORT_SYMBOL_GPL(ring_buffer_overruns);
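
/*
 * Usage sketch (illustrative only): a rough health check comparing how
 * many events are currently in the buffer against how many were lost to
 * overwrites. Neither count is taken atomically, as noted above.
 *
 *	pr_info("ring buffer: %lu entries, %lu overruns\n",
 *		ring_buffer_entries(buffer),
 *		ring_buffer_overruns(buffer));
 */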
3706
3707static void rb_iter_reset(struct ring_buffer_iter *iter)
3708{
3709 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3710
3711	/* Iterator usage is expected to have recording disabled */
3712 iter->head_page = cpu_buffer->reader_page;
3713 iter->head = cpu_buffer->reader_page->read;
3714
3715 iter->cache_reader_page = iter->head_page;
3716 iter->cache_read = cpu_buffer->read;
3717 iter->cache_pages_removed = cpu_buffer->pages_removed;
3718
3719 if (iter->head)
3720 iter->read_stamp = cpu_buffer->read_stamp;
3721 else
3722 iter->read_stamp = iter->head_page->page->time_stamp;
3723}
3724
3725/**
3726 * ring_buffer_iter_reset - reset an iterator
3727 * @iter: The iterator to reset
3728 *
3729 * Resets the iterator, so that it will start from the beginning
3730 * again.
3731 */
3732void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
3733{
3734 struct ring_buffer_per_cpu *cpu_buffer;
3735 unsigned long flags;
3736
3737 if (!iter)
3738 return;
3739
3740 cpu_buffer = iter->cpu_buffer;
3741
3742 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3743 rb_iter_reset(iter);
3744 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3745}
3746EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
3747
3748/**
3749 * ring_buffer_iter_empty - check if an iterator has no more to read
3750 * @iter: The iterator to check
3751 */
3752int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
3753{
3754 struct ring_buffer_per_cpu *cpu_buffer;
3755 struct buffer_page *reader;
3756 struct buffer_page *head_page;
3757 struct buffer_page *commit_page;
3758 unsigned commit;
3759
3760 cpu_buffer = iter->cpu_buffer;
3761
3762 /* Remember, trace recording is off when iterator is in use */
3763 reader = cpu_buffer->reader_page;
3764 head_page = cpu_buffer->head_page;
3765 commit_page = cpu_buffer->commit_page;
3766 commit = rb_page_commit(commit_page);
3767
3768 return ((iter->head_page == commit_page && iter->head == commit) ||
3769 (iter->head_page == reader && commit_page == head_page &&
3770 head_page->read == commit &&
3771 iter->head == rb_page_commit(cpu_buffer->reader_page)));
3772}
3773EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
3774
3775static void
3776rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
3777 struct ring_buffer_event *event)
3778{
3779 u64 delta;
3780
3781 switch (event->type_len) {
3782 case RINGBUF_TYPE_PADDING:
3783 return;
3784
3785 case RINGBUF_TYPE_TIME_EXTEND:
3786 delta = ring_buffer_event_time_stamp(event);
3787 cpu_buffer->read_stamp += delta;
3788 return;
3789
3790 case RINGBUF_TYPE_TIME_STAMP:
3791 delta = ring_buffer_event_time_stamp(event);
3792 cpu_buffer->read_stamp = delta;
3793 return;
3794
3795 case RINGBUF_TYPE_DATA:
3796 cpu_buffer->read_stamp += event->time_delta;
3797 return;
3798
3799 default:
3800 BUG();
3801 }
3802 return;
3803}
3804
3805static void
3806rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
3807 struct ring_buffer_event *event)
3808{
3809 u64 delta;
3810
3811 switch (event->type_len) {
3812 case RINGBUF_TYPE_PADDING:
3813 return;
3814
3815 case RINGBUF_TYPE_TIME_EXTEND:
3816 delta = ring_buffer_event_time_stamp(event);
3817 iter->read_stamp += delta;
3818 return;
3819
3820 case RINGBUF_TYPE_TIME_STAMP:
3821 delta = ring_buffer_event_time_stamp(event);
3822 iter->read_stamp = delta;
3823 return;
3824
3825 case RINGBUF_TYPE_DATA:
3826 iter->read_stamp += event->time_delta;
3827 return;
3828
3829 default:
3830 BUG();
3831 }
3832 return;
3833}
3834
3835static struct buffer_page *
3836rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
3837{
3838 struct buffer_page *reader = NULL;
3839 unsigned long overwrite;
3840 unsigned long flags;
3841 int nr_loops = 0;
3842 int ret;
3843
3844 local_irq_save(flags);
3845 arch_spin_lock(&cpu_buffer->lock);
3846
3847 again:
3848 /*
3849 * This should normally only loop twice. But because the
3850 * start of the reader inserts an empty page, it causes
3851 * a case where we will loop three times. There should be no
3852 * reason to loop four times (that I know of).
3853 */
3854 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
3855 reader = NULL;
3856 goto out;
3857 }
3858
3859 reader = cpu_buffer->reader_page;
3860
3861 /* If there's more to read, return this page */
3862 if (cpu_buffer->reader_page->read < rb_page_size(reader))
3863 goto out;
3864
3865 /* Never should we have an index greater than the size */
3866 if (RB_WARN_ON(cpu_buffer,
3867 cpu_buffer->reader_page->read > rb_page_size(reader)))
3868 goto out;
3869
3870 /* check if we caught up to the tail */
3871 reader = NULL;
3872 if (cpu_buffer->commit_page == cpu_buffer->reader_page)
3873 goto out;
3874
3875 /* Don't bother swapping if the ring buffer is empty */
3876 if (rb_num_of_entries(cpu_buffer) == 0)
3877 goto out;
3878
3879 /*
3880 * Reset the reader page to size zero.
3881 */
3882 local_set(&cpu_buffer->reader_page->write, 0);
3883 local_set(&cpu_buffer->reader_page->entries, 0);
3884 local_set(&cpu_buffer->reader_page->page->commit, 0);
3885 cpu_buffer->reader_page->real_end = 0;
3886
3887 spin:
3888 /*
3889 * Splice the empty reader page into the list around the head.
3890 */
3891 reader = rb_set_head_page(cpu_buffer);
3892 if (!reader)
3893 goto out;
3894 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
3895 cpu_buffer->reader_page->list.prev = reader->list.prev;
3896
3897 /*
3898 * cpu_buffer->pages just needs to point to the buffer, it
3899 * has no specific buffer page to point to. Let's move it out
3900 * of our way so we don't accidentally swap it.
3901 */
3902 cpu_buffer->pages = reader->list.prev;
3903
3904 /* The reader page will be pointing to the new head */
3905 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
3906
3907 /*
3908 * We want to make sure we read the overruns after we set up our
3909 * pointers to the next object. The writer side does a
3910 * cmpxchg to cross pages which acts as the mb on the writer
3911 * side. Note, the reader will constantly fail the swap
3912 * while the writer is updating the pointers, so this
3913 * guarantees that the overwrite recorded here is the one we
3914 * want to compare with the last_overrun.
3915 */
3916 smp_mb();
3917 overwrite = local_read(&(cpu_buffer->overrun));
3918
3919 /*
3920 * Here's the tricky part.
3921 *
3922 * We need to move the pointer past the header page.
3923 * But we can only do that if a writer is not currently
3924 * moving it. The page before the header page has the
3925 * flag bit '1' set if it is pointing to the page we want.
3926 * but if the writer is in the process of moving it
3927 * than it will be '2' or already moved '0'.
3928 */
3929
3930 ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
3931
3932 /*
3933 * If we did not convert it, then we must try again.
3934 */
3935 if (!ret)
3936 goto spin;
3937
3938 /*
3939 * Yay! We succeeded in replacing the page.
3940 *
3941 * Now make the new head point back to the reader page.
3942 */
3943 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
3944 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
3945
3946 local_inc(&cpu_buffer->pages_read);
3947
3948 /* Finally update the reader page to the new head */
3949 cpu_buffer->reader_page = reader;
3950 cpu_buffer->reader_page->read = 0;
3951
3952 if (overwrite != cpu_buffer->last_overrun) {
3953 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
3954 cpu_buffer->last_overrun = overwrite;
3955 }
3956
3957 goto again;
3958
3959 out:
3960 /* Update the read_stamp on the first event */
3961 if (reader && reader->read == 0)
3962 cpu_buffer->read_stamp = reader->page->time_stamp;
3963
3964 arch_spin_unlock(&cpu_buffer->lock);
3965 local_irq_restore(flags);
3966
3967 /*
3968 * The writer has preemption disabled, so wait for it. But not forever:
3969 * 1 second is pretty much "forever" here.
3970 */
3971#define USECS_WAIT 1000000
3972 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) {
3973 /* If the write is past the end of page, a writer is still updating it */
3974 if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE))
3975 break;
3976
3977 udelay(1);
3978
3979 /* Get the latest version of the reader write value */
3980 smp_rmb();
3981 }
3982
3983 /* The writer is not moving forward? Something is wrong */
3984 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT))
3985 reader = NULL;
3986
3987 /*
3988 * Make sure we see any padding after the write update
3989 * (see rb_reset_tail()).
3990 *
3991 * In addition, a writer may be writing on the reader page
3992 * if the page has not been fully filled, so the read barrier
3993 * is also needed to make sure we see the content of what is
3994 * committed by the writer (see rb_set_commit_to_write()).
3995 */
3996 smp_rmb();
3997
3998
3999 return reader;
4000}
4001
4002static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
4003{
4004 struct ring_buffer_event *event;
4005 struct buffer_page *reader;
4006 unsigned length;
4007
4008 reader = rb_get_reader_page(cpu_buffer);
4009
4010 /* This function should not be called when buffer is empty */
4011 if (RB_WARN_ON(cpu_buffer, !reader))
4012 return;
4013
4014 event = rb_reader_event(cpu_buffer);
4015
4016 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
4017 cpu_buffer->read++;
4018
4019 rb_update_read_stamp(cpu_buffer, event);
4020
4021 length = rb_event_length(event);
4022 cpu_buffer->reader_page->read += length;
4023}
4024
4025static void rb_advance_iter(struct ring_buffer_iter *iter)
4026{
4027 struct ring_buffer_per_cpu *cpu_buffer;
4028 struct ring_buffer_event *event;
4029 unsigned length;
4030
4031 cpu_buffer = iter->cpu_buffer;
4032
4033 /*
4034 * Check if we are at the end of the buffer.
4035 */
4036 if (iter->head >= rb_page_size(iter->head_page)) {
4037 /* discarded commits can make the page empty */
4038 if (iter->head_page == cpu_buffer->commit_page)
4039 return;
4040 rb_inc_iter(iter);
4041 return;
4042 }
4043
4044 event = rb_iter_head_event(iter);
4045
4046 length = rb_event_length(event);
4047
4048 /*
4049 * This should not be called to advance the header if we are
4050 * at the tail of the buffer.
4051 */
4052 if (RB_WARN_ON(cpu_buffer,
4053 (iter->head_page == cpu_buffer->commit_page) &&
4054 (iter->head + length > rb_commit_index(cpu_buffer))))
4055 return;
4056
4057 rb_update_iter_read_stamp(iter, event);
4058
4059 iter->head += length;
4060
4061 /* check for end of page padding */
4062 if ((iter->head >= rb_page_size(iter->head_page)) &&
4063 (iter->head_page != cpu_buffer->commit_page))
4064 rb_inc_iter(iter);
4065}
4066
4067static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
4068{
4069 return cpu_buffer->lost_events;
4070}
4071
4072static struct ring_buffer_event *
4073rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
4074 unsigned long *lost_events)
4075{
4076 struct ring_buffer_event *event;
4077 struct buffer_page *reader;
4078 int nr_loops = 0;
4079
4080 if (ts)
4081 *ts = 0;
4082 again:
4083 /*
4084 * We repeat when a time extend is encountered.
4085 * Since the time extend is always attached to a data event,
4086 * we should never loop more than once.
4087 * (We never hit the following condition more than twice).
4088 */
4089 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
4090 return NULL;
4091
4092 reader = rb_get_reader_page(cpu_buffer);
4093 if (!reader)
4094 return NULL;
4095
4096 event = rb_reader_event(cpu_buffer);
4097
4098 switch (event->type_len) {
4099 case RINGBUF_TYPE_PADDING:
4100 if (rb_null_event(event))
4101 RB_WARN_ON(cpu_buffer, 1);
4102 /*
4103 * Because the writer could be discarding every
4104 * event it creates (which would probably be bad)
4105 * if we were to go back to "again" then we may never
4106 * catch up, and will trigger the warn on, or lock
4107 * the box. Return the padding, and we will release
4108 * the current locks, and try again.
4109 */
4110 return event;
4111
4112 case RINGBUF_TYPE_TIME_EXTEND:
4113 /* Internal data, OK to advance */
4114 rb_advance_reader(cpu_buffer);
4115 goto again;
4116
4117 case RINGBUF_TYPE_TIME_STAMP:
4118 if (ts) {
4119 *ts = ring_buffer_event_time_stamp(event);
4120 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4121 cpu_buffer->cpu, ts);
4122 }
4123 /* Internal data, OK to advance */
4124 rb_advance_reader(cpu_buffer);
4125 goto again;
4126
4127 case RINGBUF_TYPE_DATA:
4128 if (ts && !(*ts)) {
4129 *ts = cpu_buffer->read_stamp + event->time_delta;
4130 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4131 cpu_buffer->cpu, ts);
4132 }
4133 if (lost_events)
4134 *lost_events = rb_lost_events(cpu_buffer);
4135 return event;
4136
4137 default:
4138 BUG();
4139 }
4140
4141 return NULL;
4142}
4143EXPORT_SYMBOL_GPL(ring_buffer_peek);
4144
4145static struct ring_buffer_event *
4146rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4147{
4148 struct ring_buffer *buffer;
4149 struct ring_buffer_per_cpu *cpu_buffer;
4150 struct ring_buffer_event *event;
4151 int nr_loops = 0;
4152
4153 if (ts)
4154 *ts = 0;
4155
4156 cpu_buffer = iter->cpu_buffer;
4157 buffer = cpu_buffer->buffer;
4158
4159 /*
4160 * Check if someone performed a consuming read to the buffer
4161 * or removed some pages from the buffer. In these cases,
4162 * iterator was invalidated and we need to reset it.
4163 */
4164 if (unlikely(iter->cache_read != cpu_buffer->read ||
4165 iter->cache_reader_page != cpu_buffer->reader_page ||
4166 iter->cache_pages_removed != cpu_buffer->pages_removed))
4167 rb_iter_reset(iter);
4168
4169 again:
4170 if (ring_buffer_iter_empty(iter))
4171 return NULL;
4172
4173 /*
4174 * We repeat when a time extend is encountered or we hit
4175 * the end of the page. Since the time extend is always attached
4176 * to a data event, we should never loop more than three times.
4177 * Once for going to next page, once on time extend, and
4178 * finally once to get the event.
4179 * (We never hit the following condition more than thrice).
4180 */
4181 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3))
4182 return NULL;
4183
4184 if (rb_per_cpu_empty(cpu_buffer))
4185 return NULL;
4186
4187 if (iter->head >= rb_page_size(iter->head_page)) {
4188 rb_inc_iter(iter);
4189 goto again;
4190 }
4191
4192 event = rb_iter_head_event(iter);
4193
4194 switch (event->type_len) {
4195 case RINGBUF_TYPE_PADDING:
4196 if (rb_null_event(event)) {
4197 rb_inc_iter(iter);
4198 goto again;
4199 }
4200 rb_advance_iter(iter);
4201 return event;
4202
4203 case RINGBUF_TYPE_TIME_EXTEND:
4204 /* Internal data, OK to advance */
4205 rb_advance_iter(iter);
4206 goto again;
4207
4208 case RINGBUF_TYPE_TIME_STAMP:
4209 if (ts) {
4210 *ts = ring_buffer_event_time_stamp(event);
4211 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4212 cpu_buffer->cpu, ts);
4213 }
4214 /* Internal data, OK to advance */
4215 rb_advance_iter(iter);
4216 goto again;
4217
4218 case RINGBUF_TYPE_DATA:
4219 if (ts && !(*ts)) {
4220 *ts = iter->read_stamp + event->time_delta;
4221 ring_buffer_normalize_time_stamp(buffer,
4222 cpu_buffer->cpu, ts);
4223 }
4224 return event;
4225
4226 default:
4227 BUG();
4228 }
4229
4230 return NULL;
4231}
4232EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
4233
4234static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
4235{
4236 if (likely(!in_nmi())) {
4237 raw_spin_lock(&cpu_buffer->reader_lock);
4238 return true;
4239 }
4240
4241 /*
4242 * If an NMI die dumps out the content of the ring buffer
4243 * trylock must be used to prevent a deadlock if the NMI
4244 * preempted a task that holds the ring buffer locks. If
4245 * we get the lock then all is fine, if not, then continue
4246 * to do the read, but this can corrupt the ring buffer,
4247 * so it must be permanently disabled from future writes.
4248 * Reading from NMI is a oneshot deal.
4249 */
4250 if (raw_spin_trylock(&cpu_buffer->reader_lock))
4251 return true;
4252
4253 /* Continue without locking, but disable the ring buffer */
4254 atomic_inc(&cpu_buffer->record_disabled);
4255 return false;
4256}
4257
4258static inline void
4259rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
4260{
4261 if (likely(locked))
4262 raw_spin_unlock(&cpu_buffer->reader_lock);
4263 return;
4264}
4265
4266/**
4267 * ring_buffer_peek - peek at the next event to be read
4268 * @buffer: The ring buffer to read
4269 * @cpu: The CPU to peek at
4270 * @ts: The timestamp counter of this event.
4271 * @lost_events: a variable to store if events were lost (may be NULL)
4272 *
4273 * This will return the event that will be read next, but does
4274 * not consume the data.
4275 */
4276struct ring_buffer_event *
4277ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
4278 unsigned long *lost_events)
4279{
4280 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4281 struct ring_buffer_event *event;
4282 unsigned long flags;
4283 bool dolock;
4284
4285 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4286 return NULL;
4287
4288 again:
4289 local_irq_save(flags);
4290 dolock = rb_reader_lock(cpu_buffer);
4291 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4292 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4293 rb_advance_reader(cpu_buffer);
4294 rb_reader_unlock(cpu_buffer, dolock);
4295 local_irq_restore(flags);
4296
4297 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4298 goto again;
4299
4300 return event;
4301}
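/*
 * Illustrative sketch (not part of this file's API): peeking lets a caller
 * inspect the next event without consuming it, for example to decide based
 * on its timestamp whether to consume it now.  rb_should_consume() and the
 * surrounding context are hypothetical.
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, NULL);
 *	if (event && rb_should_consume(ts))
 *		event = ring_buffer_consume(buffer, cpu, &ts, NULL);
 */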
4302
4303/**
4304 * ring_buffer_iter_peek - peek at the next event to be read
4305 * @iter: The ring buffer iterator
4306 * @ts: The timestamp counter of this event.
4307 *
4308 * This will return the event that will be read next, but does
4309 * not increment the iterator.
4310 */
4311struct ring_buffer_event *
4312ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4313{
4314 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4315 struct ring_buffer_event *event;
4316 unsigned long flags;
4317
4318 again:
4319 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4320 event = rb_iter_peek(iter, ts);
4321 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4322
4323 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4324 goto again;
4325
4326 return event;
4327}
4328
4329/**
4330 * ring_buffer_consume - return an event and consume it
4331 * @buffer: The ring buffer to get the next event from
4332 * @cpu: the cpu to read the buffer from
4333 * @ts: a variable to store the timestamp (may be NULL)
4334 * @lost_events: a variable to store if events were lost (may be NULL)
4335 *
4336 * Returns the next event in the ring buffer, and that event is consumed.
4337 * Meaning that sequential reads will keep returning a different event,
4338 * and eventually empty the ring buffer if the producer is slower.
4339 */
4340struct ring_buffer_event *
4341ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
4342 unsigned long *lost_events)
4343{
4344 struct ring_buffer_per_cpu *cpu_buffer;
4345 struct ring_buffer_event *event = NULL;
4346 unsigned long flags;
4347 bool dolock;
4348
4349 again:
4350 /* might be called in atomic */
4351 preempt_disable();
4352
4353 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4354 goto out;
4355
4356 cpu_buffer = buffer->buffers[cpu];
4357 local_irq_save(flags);
4358 dolock = rb_reader_lock(cpu_buffer);
4359
4360 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4361 if (event) {
4362 cpu_buffer->lost_events = 0;
4363 rb_advance_reader(cpu_buffer);
4364 }
4365
4366 rb_reader_unlock(cpu_buffer, dolock);
4367 local_irq_restore(flags);
4368
4369 out:
4370 preempt_enable();
4371
4372 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4373 goto again;
4374
4375 return event;
4376}
4377EXPORT_SYMBOL_GPL(ring_buffer_consume);
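/*
 * A minimal consuming-read loop, for illustration only.  It drains one CPU
 * buffer until it is empty; process_event() is a hypothetical callback and
 * the caller is expected to handle its own sleeping and waking.
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
 *		if (lost)
 *			pr_info("cpu%d: lost %lu events\n", cpu, lost);
 *		process_event(ring_buffer_event_data(event),
 *			      ring_buffer_event_length(event), ts);
 *	}
 */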
4378
4379/**
4380 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
4381 * @buffer: The ring buffer to read from
4382 * @cpu: The cpu buffer to iterate over
4383 * @flags: gfp flags to use for memory allocation
4384 *
4385 * This performs the initial preparations necessary to iterate
4386 * through the buffer. Memory is allocated, buffer recording
4387 * is disabled, and the iterator pointer is returned to the caller.
4388 *
4389 * Disabling buffer recording prevents the reading from being
4390 * corrupted. This is not a consuming read, so a producer is not
4391 * expected.
4392 *
4393 * After a sequence of ring_buffer_read_prepare calls, the user is
4394 * expected to make at least one call to ring_buffer_read_prepare_sync.
4395 * Afterwards, ring_buffer_read_start is invoked to get things going
4396 * for real.
4397 *
4398 * This overall must be paired with ring_buffer_read_finish.
4399 */
4400struct ring_buffer_iter *
4401ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu, gfp_t flags)
4402{
4403 struct ring_buffer_per_cpu *cpu_buffer;
4404 struct ring_buffer_iter *iter;
4405
4406 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4407 return NULL;
4408
4409 iter = kmalloc(sizeof(*iter), flags);
4410 if (!iter)
4411 return NULL;
4412
4413 cpu_buffer = buffer->buffers[cpu];
4414
4415 iter->cpu_buffer = cpu_buffer;
4416
4417 atomic_inc(&buffer->resize_disabled);
4418 atomic_inc(&cpu_buffer->record_disabled);
4419
4420 return iter;
4421}
4422EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
4423
4424/**
4425 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
4426 *
4427 * All previously invoked ring_buffer_read_prepare calls to prepare
4428 * iterators will be synchronized. Afterwards, ring_buffer_read_start
4429 * calls on those iterators are allowed.
4430 */
4431void
4432ring_buffer_read_prepare_sync(void)
4433{
4434 synchronize_rcu();
4435}
4436EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
4437
4438/**
4439 * ring_buffer_read_start - start a non consuming read of the buffer
4440 * @iter: The iterator returned by ring_buffer_read_prepare
4441 *
4442 * This finalizes the startup of an iteration through the buffer.
4443 * The iterator comes from a call to ring_buffer_read_prepare and
4444 * an intervening ring_buffer_read_prepare_sync must have been
4445 * performed.
4446 *
4447 * Must be paired with ring_buffer_read_finish.
4448 */
4449void
4450ring_buffer_read_start(struct ring_buffer_iter *iter)
4451{
4452 struct ring_buffer_per_cpu *cpu_buffer;
4453 unsigned long flags;
4454
4455 if (!iter)
4456 return;
4457
4458 cpu_buffer = iter->cpu_buffer;
4459
4460 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4461 arch_spin_lock(&cpu_buffer->lock);
4462 rb_iter_reset(iter);
4463 arch_spin_unlock(&cpu_buffer->lock);
4464 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4465}
4466EXPORT_SYMBOL_GPL(ring_buffer_read_start);
4467
4468/**
4469 * ring_buffer_read_finish - finish reading the iterator of the buffer
4470 * @iter: The iterator retrieved by ring_buffer_read_prepare
4471 *
4472 * This re-enables the recording to the buffer, and frees the
4473 * iterator.
4474 */
4475void
4476ring_buffer_read_finish(struct ring_buffer_iter *iter)
4477{
4478 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4479 unsigned long flags;
4480
4481 /*
4482 * Ring buffer is disabled from recording, here's a good place
4483 * to check the integrity of the ring buffer.
4484 * Must prevent readers from trying to read, as the check
4485 * clears the HEAD page and readers require it.
4486 */
4487 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4488 rb_check_pages(cpu_buffer);
4489 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4490
4491 atomic_dec(&cpu_buffer->record_disabled);
4492 atomic_dec(&cpu_buffer->buffer->resize_disabled);
4493 kfree(iter);
4494}
4495EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
4496
4497/**
4498 * ring_buffer_iter_advance - advance the iterator to the next location
4499 * @iter: The ring buffer iterator
4500 *
4501 * Move the location of the iterator such that the next read will
4502 * be the next location of the iterator.
4503 */
4504void ring_buffer_iter_advance(struct ring_buffer_iter *iter)
4505{
4506 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4507 unsigned long flags;
4508
4509 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4510
4511 rb_advance_iter(iter);
4512
4513 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4514}
4515EXPORT_SYMBOL_GPL(ring_buffer_iter_advance);
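/*
 * Illustrative sketch tying together the non-consuming read calls above
 * (prepare, prepare_sync, start, peek/advance, finish).  Error handling is
 * minimal and visit_event() is a hypothetical callback of this example.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	if (!iter)
 *		return -ENOMEM;
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		visit_event(ring_buffer_event_data(event), ts);
 *		ring_buffer_iter_advance(iter);
 *	}
 *
 *	ring_buffer_read_finish(iter);
 */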
4516
4517/**
4518 * ring_buffer_size - return the size of the ring buffer (in bytes)
4519 * @buffer: The ring buffer.
4520 */
4521unsigned long ring_buffer_size(struct ring_buffer *buffer, int cpu)
4522{
4523 /*
4524 * Earlier, this method returned
4525 * BUF_PAGE_SIZE * buffer->nr_pages
4526 * Since the nr_pages field is now removed, we have converted this to
4527 * return the per cpu buffer value.
4528 */
4529 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4530 return 0;
4531
4532 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
4533}
4534EXPORT_SYMBOL_GPL(ring_buffer_size);
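/*
 * For illustration only: since ring_buffer_size() is now per CPU, a caller
 * that wants the old "whole buffer" number has to sum the per-CPU sizes
 * itself.  The variable names and the use of for_each_online_cpu() are
 * assumptions of this sketch.
 *
 *	unsigned long total = 0;
 *	int cpu;
 *
 *	for_each_online_cpu(cpu)
 *		total += ring_buffer_size(buffer, cpu);
 */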
4535
4536static void rb_clear_buffer_page(struct buffer_page *page)
4537{
4538 local_set(&page->write, 0);
4539 local_set(&page->entries, 0);
4540 rb_init_page(page->page);
4541 page->read = 0;
4542}
4543
4544static void
4545rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
4546{
4547 struct buffer_page *page;
4548
4549 rb_head_page_deactivate(cpu_buffer);
4550
4551 cpu_buffer->head_page
4552 = list_entry(cpu_buffer->pages, struct buffer_page, list);
4553 rb_clear_buffer_page(cpu_buffer->head_page);
4554 list_for_each_entry(page, cpu_buffer->pages, list) {
4555 rb_clear_buffer_page(page);
4556 }
4557
4558 cpu_buffer->tail_page = cpu_buffer->head_page;
4559 cpu_buffer->commit_page = cpu_buffer->head_page;
4560
4561 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
4562 INIT_LIST_HEAD(&cpu_buffer->new_pages);
4563 rb_clear_buffer_page(cpu_buffer->reader_page);
4564
4565 local_set(&cpu_buffer->entries_bytes, 0);
4566 local_set(&cpu_buffer->overrun, 0);
4567 local_set(&cpu_buffer->commit_overrun, 0);
4568 local_set(&cpu_buffer->dropped_events, 0);
4569 local_set(&cpu_buffer->entries, 0);
4570 local_set(&cpu_buffer->committing, 0);
4571 local_set(&cpu_buffer->commits, 0);
4572 local_set(&cpu_buffer->pages_touched, 0);
4573 local_set(&cpu_buffer->pages_lost, 0);
4574 local_set(&cpu_buffer->pages_read, 0);
4575 cpu_buffer->last_pages_touch = 0;
4576 cpu_buffer->shortest_full = 0;
4577 cpu_buffer->read = 0;
4578 cpu_buffer->read_bytes = 0;
4579
4580 cpu_buffer->write_stamp = 0;
4581 cpu_buffer->read_stamp = 0;
4582
4583 cpu_buffer->lost_events = 0;
4584 cpu_buffer->last_overrun = 0;
4585
4586 rb_head_page_activate(cpu_buffer);
4587 cpu_buffer->pages_removed = 0;
4588}
4589
4590/**
4591 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
4592 * @buffer: The ring buffer to reset a per cpu buffer of
4593 * @cpu: The CPU buffer to be reset
4594 */
4595void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
4596{
4597 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4598 unsigned long flags;
4599
4600 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4601 return;
4602 /* prevent another thread from changing buffer sizes */
4603 mutex_lock(&buffer->mutex);
4604
4605 atomic_inc(&buffer->resize_disabled);
4606 atomic_inc(&cpu_buffer->record_disabled);
4607
4608 /* Make sure all commits have finished */
4609 synchronize_rcu();
4610
4611 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4612
4613 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
4614 goto out;
4615
4616 arch_spin_lock(&cpu_buffer->lock);
4617
4618 rb_reset_cpu(cpu_buffer);
4619
4620 arch_spin_unlock(&cpu_buffer->lock);
4621
4622 out:
4623 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4624
4625 atomic_dec(&cpu_buffer->record_disabled);
4626 atomic_dec(&buffer->resize_disabled);
4627
4628 mutex_unlock(&buffer->mutex);
4629}
4630EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
4631
4632/**
4633 * ring_buffer_reset - reset a ring buffer
4634 * @buffer: The ring buffer to reset all cpu buffers
4635 */
4636void ring_buffer_reset(struct ring_buffer *buffer)
4637{
4638 int cpu;
4639
4640 for_each_buffer_cpu(buffer, cpu)
4641 ring_buffer_reset_cpu(buffer, cpu);
4642}
4643EXPORT_SYMBOL_GPL(ring_buffer_reset);
4644
4645/**
4646 * ring_buffer_empty - is the ring buffer empty?
4647 * @buffer: The ring buffer to test
4648 */
4649bool ring_buffer_empty(struct ring_buffer *buffer)
4650{
4651 struct ring_buffer_per_cpu *cpu_buffer;
4652 unsigned long flags;
4653 bool dolock;
4654 int cpu;
4655 int ret;
4656
4657 /* yes this is racy, but if you don't like the race, lock the buffer */
4658 for_each_buffer_cpu(buffer, cpu) {
4659 cpu_buffer = buffer->buffers[cpu];
4660 local_irq_save(flags);
4661 dolock = rb_reader_lock(cpu_buffer);
4662 ret = rb_per_cpu_empty(cpu_buffer);
4663 rb_reader_unlock(cpu_buffer, dolock);
4664 local_irq_restore(flags);
4665
4666 if (!ret)
4667 return false;
4668 }
4669
4670 return true;
4671}
4672EXPORT_SYMBOL_GPL(ring_buffer_empty);
4673
4674/**
4675 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
4676 * @buffer: The ring buffer
4677 * @cpu: The CPU buffer to test
4678 */
4679bool ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
4680{
4681 struct ring_buffer_per_cpu *cpu_buffer;
4682 unsigned long flags;
4683 bool dolock;
4684 int ret;
4685
4686 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4687 return true;
4688
4689 cpu_buffer = buffer->buffers[cpu];
4690 local_irq_save(flags);
4691 dolock = rb_reader_lock(cpu_buffer);
4692 ret = rb_per_cpu_empty(cpu_buffer);
4693 rb_reader_unlock(cpu_buffer, dolock);
4694 local_irq_restore(flags);
4695
4696 return ret;
4697}
4698EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
4699
4700#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
4701/**
4702 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
4703 * @buffer_a: One buffer to swap with
4704 * @buffer_b: The other buffer to swap with
 * @cpu: The CPU buffer to swap
4705 *
4706 * This function is useful for tracers that want to take a "snapshot"
4707 * of a CPU buffer and have another backup buffer lying around.
4708 * It is expected that the tracer handles the cpu buffer not being
4709 * used at the moment.
4710 */
4711int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
4712 struct ring_buffer *buffer_b, int cpu)
4713{
4714 struct ring_buffer_per_cpu *cpu_buffer_a;
4715 struct ring_buffer_per_cpu *cpu_buffer_b;
4716 int ret = -EINVAL;
4717
4718 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
4719 !cpumask_test_cpu(cpu, buffer_b->cpumask))
4720 goto out;
4721
4722 cpu_buffer_a = buffer_a->buffers[cpu];
4723 cpu_buffer_b = buffer_b->buffers[cpu];
4724
4725 /* At least make sure the two buffers are somewhat the same */
4726 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
4727 goto out;
4728
4729 ret = -EAGAIN;
4730
4731 if (atomic_read(&buffer_a->record_disabled))
4732 goto out;
4733
4734 if (atomic_read(&buffer_b->record_disabled))
4735 goto out;
4736
4737 if (atomic_read(&cpu_buffer_a->record_disabled))
4738 goto out;
4739
4740 if (atomic_read(&cpu_buffer_b->record_disabled))
4741 goto out;
4742
4743 /*
4744 * We can't do a synchronize_rcu here because this
4745 * function can be called in atomic context.
4746 * Normally this will be called from the same CPU as cpu.
4747 * If not it's up to the caller to protect this.
4748 */
4749 atomic_inc(&cpu_buffer_a->record_disabled);
4750 atomic_inc(&cpu_buffer_b->record_disabled);
4751
4752 ret = -EBUSY;
4753 if (local_read(&cpu_buffer_a->committing))
4754 goto out_dec;
4755 if (local_read(&cpu_buffer_b->committing))
4756 goto out_dec;
4757
4758 buffer_a->buffers[cpu] = cpu_buffer_b;
4759 buffer_b->buffers[cpu] = cpu_buffer_a;
4760
4761 cpu_buffer_b->buffer = buffer_a;
4762 cpu_buffer_a->buffer = buffer_b;
4763
4764 ret = 0;
4765
4766out_dec:
4767 atomic_dec(&cpu_buffer_a->record_disabled);
4768 atomic_dec(&cpu_buffer_b->record_disabled);
4769out:
4770 return ret;
4771}
4772EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
4773#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
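/*
 * Illustrative "snapshot" sketch for ring_buffer_swap_cpu(), usable only
 * when CONFIG_RING_BUFFER_ALLOW_SWAP is enabled.  A tracer keeps a spare
 * buffer of the same size (snapshot_buffer and process_event() below are
 * assumptions of this example), swaps the live CPU buffer into it, and then
 * reads the spare at leisure while recording continues in the live buffer.
 *
 *	ret = ring_buffer_swap_cpu(snapshot_buffer, live_buffer, cpu);
 *	if (ret < 0)
 *		return ret;
 *
 * Events recorded on @cpu up to the swap now live in snapshot_buffer and
 * can be drained there while new events keep going into live_buffer:
 *
 *	while ((event = ring_buffer_consume(snapshot_buffer, cpu, NULL, NULL)))
 *		process_event(ring_buffer_event_data(event));
 */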
4774
4775/**
4776 * ring_buffer_alloc_read_page - allocate a page to read from buffer
4777 * @buffer: the buffer to allocate for.
4778 * @cpu: the cpu buffer to allocate.
4779 *
4780 * This function is used in conjunction with ring_buffer_read_page.
4781 * When reading a full page from the ring buffer, these functions
4782 * can be used to speed up the process. The calling function should
4783 * allocate a few pages first with this function. Then when it
4784 * needs to get pages from the ring buffer, it passes the result
4785 * of this function into ring_buffer_read_page, which will swap
4786 * the page that was allocated, with the read page of the buffer.
4787 *
4788 * Returns:
4789 * The page allocated, or ERR_PTR
4790 */
4791void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)
4792{
4793 struct ring_buffer_per_cpu *cpu_buffer;
4794 struct buffer_data_page *bpage = NULL;
4795 unsigned long flags;
4796 struct page *page;
4797
4798 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4799 return ERR_PTR(-ENODEV);
4800
4801 cpu_buffer = buffer->buffers[cpu];
4802 local_irq_save(flags);
4803 arch_spin_lock(&cpu_buffer->lock);
4804
4805 if (cpu_buffer->free_page) {
4806 bpage = cpu_buffer->free_page;
4807 cpu_buffer->free_page = NULL;
4808 }
4809
4810 arch_spin_unlock(&cpu_buffer->lock);
4811 local_irq_restore(flags);
4812
4813 if (bpage)
4814 goto out;
4815
4816 page = alloc_pages_node(cpu_to_node(cpu),
4817 GFP_KERNEL | __GFP_NORETRY, 0);
4818 if (!page)
4819 return ERR_PTR(-ENOMEM);
4820
4821 bpage = page_address(page);
4822
4823 out:
4824 rb_init_page(bpage);
4825
4826 return bpage;
4827}
4828EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
4829
4830/**
4831 * ring_buffer_free_read_page - free an allocated read page
4832 * @buffer: the buffer the page was allocated for
4833 * @cpu: the cpu buffer the page came from
4834 * @data: the page to free
4835 *
4836 * Free a page allocated from ring_buffer_alloc_read_page.
4837 */
4838void ring_buffer_free_read_page(struct ring_buffer *buffer, int cpu, void *data)
4839{
4840 struct ring_buffer_per_cpu *cpu_buffer;
4841 struct buffer_data_page *bpage = data;
4842 struct page *page = virt_to_page(bpage);
4843 unsigned long flags;
4844
4845 if (!buffer || !buffer->buffers || !buffer->buffers[cpu])
4846 return;
4847
4848 cpu_buffer = buffer->buffers[cpu];
4849
4850 /* If the page is still in use someplace else, we can't reuse it */
4851 if (page_ref_count(page) > 1)
4852 goto out;
4853
4854 local_irq_save(flags);
4855 arch_spin_lock(&cpu_buffer->lock);
4856
4857 if (!cpu_buffer->free_page) {
4858 cpu_buffer->free_page = bpage;
4859 bpage = NULL;
4860 }
4861
4862 arch_spin_unlock(&cpu_buffer->lock);
4863 local_irq_restore(flags);
4864
4865 out:
4866 free_page((unsigned long)bpage);
4867}
4868EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
4869
4870/**
4871 * ring_buffer_read_page - extract a page from the ring buffer
4872 * @buffer: buffer to extract from
4873 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
4874 * @len: amount to extract
4875 * @cpu: the cpu of the buffer to extract
4876 * @full: should the extraction only happen when the page is full.
4877 *
4878 * This function will pull out a page from the ring buffer and consume it.
4879 * @data_page must be the address of the variable that was returned
4880 * from ring_buffer_alloc_read_page. This is because the page might be used
4881 * to swap with a page in the ring buffer.
4882 *
4883 * for example:
4884 * rpage = ring_buffer_alloc_read_page(buffer, cpu);
4885 * if (IS_ERR(rpage))
4886 * return PTR_ERR(rpage);
4887 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
4888 * if (ret >= 0)
4889 * process_page(rpage, ret);
4890 *
4891 * When @full is set, the function will not return data unless
4892 * the writer is off the reader page.
4893 *
4894 * Note: it is up to the calling functions to handle sleeps and wakeups.
4895 * The ring buffer can be used anywhere in the kernel and can not
4896 * blindly call wake_up. The layer that uses the ring buffer must be
4897 * responsible for that.
4898 *
4899 * Returns:
4900 * >=0 if data has been transferred, returns the offset of consumed data.
4901 * <0 if no data has been transferred.
4902 */
4903int ring_buffer_read_page(struct ring_buffer *buffer,
4904 void **data_page, size_t len, int cpu, int full)
4905{
4906 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4907 struct ring_buffer_event *event;
4908 struct buffer_data_page *bpage;
4909 struct buffer_page *reader;
4910 unsigned long missed_events;
4911 unsigned long flags;
4912 unsigned int commit;
4913 unsigned int read;
4914 u64 save_timestamp;
4915 int ret = -1;
4916
4917 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4918 goto out;
4919
4920 /*
4921 * If len is not big enough to hold the page header, then
4922 * we can not copy anything.
4923 */
4924 if (len <= BUF_PAGE_HDR_SIZE)
4925 goto out;
4926
4927 len -= BUF_PAGE_HDR_SIZE;
4928
4929 if (!data_page)
4930 goto out;
4931
4932 bpage = *data_page;
4933 if (!bpage)
4934 goto out;
4935
4936 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4937
4938 reader = rb_get_reader_page(cpu_buffer);
4939 if (!reader)
4940 goto out_unlock;
4941
4942 event = rb_reader_event(cpu_buffer);
4943
4944 read = reader->read;
4945 commit = rb_page_commit(reader);
4946
4947 /* Check if any events were dropped */
4948 missed_events = cpu_buffer->lost_events;
4949
4950 /*
4951 * If this page has been partially read or
4952 * if len is not big enough to read the rest of the page or
4953 * a writer is still on the page, then
4954 * we must copy the data from the page to the buffer.
4955 * Otherwise, we can simply swap the page with the one passed in.
4956 */
4957 if (read || (len < (commit - read)) ||
4958 cpu_buffer->reader_page == cpu_buffer->commit_page) {
4959 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
4960 unsigned int rpos = read;
4961 unsigned int pos = 0;
4962 unsigned int size;
4963
4964 /*
4965 * If a full page is expected, this can still be returned
4966 * if there's been a previous partial read and the
4967 * rest of the page can be read and the commit page is off
4968 * the reader page.
4969 */
4970 if (full &&
4971 (!read || (len < (commit - read)) ||
4972 cpu_buffer->reader_page == cpu_buffer->commit_page))
4973 goto out_unlock;
4974
4975 if (len > (commit - read))
4976 len = (commit - read);
4977
4978 /* Always keep the time extend and data together */
4979 size = rb_event_ts_length(event);
4980
4981 if (len < size)
4982 goto out_unlock;
4983
4984 /* save the current timestamp, since the user will need it */
4985 save_timestamp = cpu_buffer->read_stamp;
4986
4987 /* Need to copy one event at a time */
4988 do {
4989 /* We need the size of one event, because
4990 * rb_advance_reader only advances by one event,
4991 * whereas rb_event_ts_length may include the size of
4992 * one or two events.
4993 * We have already ensured there's enough space if this
4994 * is a time extend. */
4995 size = rb_event_length(event);
4996 memcpy(bpage->data + pos, rpage->data + rpos, size);
4997
4998 len -= size;
4999
5000 rb_advance_reader(cpu_buffer);
5001 rpos = reader->read;
5002 pos += size;
5003
5004 if (rpos >= commit)
5005 break;
5006
5007 event = rb_reader_event(cpu_buffer);
5008 /* Always keep the time extend and data together */
5009 size = rb_event_ts_length(event);
5010 } while (len >= size);
5011
5012 /* update bpage */
5013 local_set(&bpage->commit, pos);
5014 bpage->time_stamp = save_timestamp;
5015
5016 /* we copied everything to the beginning */
5017 read = 0;
5018 } else {
5019 /* update the entry counter */
5020 cpu_buffer->read += rb_page_entries(reader);
5021 cpu_buffer->read_bytes += BUF_PAGE_SIZE;
5022
5023 /* swap the pages */
5024 rb_init_page(bpage);
5025 bpage = reader->page;
5026 reader->page = *data_page;
5027 local_set(&reader->write, 0);
5028 local_set(&reader->entries, 0);
5029 reader->read = 0;
5030 *data_page = bpage;
5031
5032 /*
5033 * Use the real_end for the data size,
5034 * This gives us a chance to store the lost events
5035 * on the page.
5036 */
5037 if (reader->real_end)
5038 local_set(&bpage->commit, reader->real_end);
5039 }
5040 ret = read;
5041
5042 cpu_buffer->lost_events = 0;
5043
5044 commit = local_read(&bpage->commit);
5045 /*
5046 * Set a flag in the commit field if we lost events
5047 */
5048 if (missed_events) {
5049 /* If there is room at the end of the page to save the
5050 * missed events, then record it there.
5051 */
5052 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
5053 memcpy(&bpage->data[commit], &missed_events,
5054 sizeof(missed_events));
5055 local_add(RB_MISSED_STORED, &bpage->commit);
5056 commit += sizeof(missed_events);
5057 }
5058 local_add(RB_MISSED_EVENTS, &bpage->commit);
5059 }
5060
5061 /*
5062 * This page may be off to user land. Zero it out here.
5063 */
5064 if (commit < BUF_PAGE_SIZE)
5065 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
5066
5067 out_unlock:
5068 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5069
5070 out:
5071 return ret;
5072}
5073EXPORT_SYMBOL_GPL(ring_buffer_read_page);
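/*
 * A slightly fuller sketch of the allocate/read/free cycle described above,
 * for illustration only; write_to_user() stands in for whatever the caller
 * does with the page and is not a real helper, and PAGE_SIZE is simply a
 * convenient upper bound for @len.
 *
 *	void *rpage;
 *	int ret;
 *
 *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
 *	if (IS_ERR(rpage))
 *		return PTR_ERR(rpage);
 *
 *	ret = ring_buffer_read_page(buffer, &rpage, PAGE_SIZE, cpu, 0);
 *	if (ret >= 0)
 *		write_to_user(rpage, ret);
 *
 *	ring_buffer_free_read_page(buffer, cpu, rpage);
 */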
5074
5075/*
5076 * We only allocate new buffers, never free them if the CPU goes down.
5077 * If we were to free the buffer, then the user would lose any trace that was in
5078 * the buffer.
5079 */
5080int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
5081{
5082 struct ring_buffer *buffer;
5083 long nr_pages_same;
5084 int cpu_i;
5085 unsigned long nr_pages;
5086
5087 buffer = container_of(node, struct ring_buffer, node);
5088 if (cpumask_test_cpu(cpu, buffer->cpumask))
5089 return 0;
5090
5091 nr_pages = 0;
5092 nr_pages_same = 1;
5093 /* check if all cpu sizes are same */
5094 for_each_buffer_cpu(buffer, cpu_i) {
5095 /* fill in the size from first enabled cpu */
5096 if (nr_pages == 0)
5097 nr_pages = buffer->buffers[cpu_i]->nr_pages;
5098 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
5099 nr_pages_same = 0;
5100 break;
5101 }
5102 }
5103 /* allocate minimum pages, user can later expand it */
5104 if (!nr_pages_same)
5105 nr_pages = 2;
5106 buffer->buffers[cpu] =
5107 rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
5108 if (!buffer->buffers[cpu]) {
5109 WARN(1, "failed to allocate ring buffer on CPU %u\n",
5110 cpu);
5111 return -ENOMEM;
5112 }
5113 smp_wmb();
5114 cpumask_set_cpu(cpu, buffer->cpumask);
5115 return 0;
5116}
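/*
 * For context, and as an assumption of this comment rather than code in this
 * file: trace_rb_cpu_prepare() is meant to be registered as a multi-instance
 * CPU hotplug "prepare" callback, so that a ring buffer gains a per-CPU
 * buffer when a new CPU comes online.  Registration looks roughly like:
 *
 *	ret = cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE,
 *				      "trace/RB:prepare",
 *				      trace_rb_cpu_prepare, NULL);
 *
 * and each allocated ring buffer is then added as an instance with
 * cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node).
 */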
5117
5118#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
5119/*
5120 * This is a basic integrity check of the ring buffer.
5121 * Late in the boot cycle this test will run when configured in.
5122 * It will kick off a thread per CPU that will go into a loop
5123 * writing to the per cpu ring buffer various sizes of data.
5124 * Some of the data will be large items, some small.
5125 *
5126 * Another thread is created that goes into a spin, sending out
5127 * IPIs to the other CPUs to also write into the ring buffer.
5128 * This is to test the nesting ability of the buffer.
5129 *
5130 * Basic stats are recorded and reported. If something in the
5131 * ring buffer should happen that's not expected, a big warning
5132 * is displayed and all ring buffers are disabled.
5133 */
5134static struct task_struct *rb_threads[NR_CPUS] __initdata;
5135
5136struct rb_test_data {
5137 struct ring_buffer *buffer;
5138 unsigned long events;
5139 unsigned long bytes_written;
5140 unsigned long bytes_alloc;
5141 unsigned long bytes_dropped;
5142 unsigned long events_nested;
5143 unsigned long bytes_written_nested;
5144 unsigned long bytes_alloc_nested;
5145 unsigned long bytes_dropped_nested;
5146 int min_size_nested;
5147 int max_size_nested;
5148 int max_size;
5149 int min_size;
5150 int cpu;
5151 int cnt;
5152};
5153
5154static struct rb_test_data rb_data[NR_CPUS] __initdata;
5155
5156/* 1 meg per cpu */
5157#define RB_TEST_BUFFER_SIZE 1048576
5158
5159static char rb_string[] __initdata =
5160 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
5161 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
5162 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
5163
5164static bool rb_test_started __initdata;
5165
5166struct rb_item {
5167 int size;
5168 char str[];
5169};
5170
5171static __init int rb_write_something(struct rb_test_data *data, bool nested)
5172{
5173 struct ring_buffer_event *event;
5174 struct rb_item *item;
5175 bool started;
5176 int event_len;
5177 int size;
5178 int len;
5179 int cnt;
5180
5181 /* Have nested writes different than what is written */
5182 cnt = data->cnt + (nested ? 27 : 0);
5183
5184 /* Multiply cnt by ~e, to make some unique increment */
5185 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
5186
5187 len = size + sizeof(struct rb_item);
5188
5189 started = rb_test_started;
5190 /* read rb_test_started before checking buffer enabled */
5191 smp_rmb();
5192
5193 event = ring_buffer_lock_reserve(data->buffer, len);
5194 if (!event) {
5195 /* Ignore dropped events before test starts. */
5196 if (started) {
5197 if (nested)
5198 data->bytes_dropped_nested += len;
5199 else
5200 data->bytes_dropped += len;
5201 }
5202 return len;
5203 }
5204
5205 event_len = ring_buffer_event_length(event);
5206
5207 if (RB_WARN_ON(data->buffer, event_len < len))
5208 goto out;
5209
5210 item = ring_buffer_event_data(event);
5211 item->size = size;
5212 memcpy(item->str, rb_string, size);
5213
5214 if (nested) {
5215 data->bytes_alloc_nested += event_len;
5216 data->bytes_written_nested += len;
5217 data->events_nested++;
5218 if (!data->min_size_nested || len < data->min_size_nested)
5219 data->min_size_nested = len;
5220 if (len > data->max_size_nested)
5221 data->max_size_nested = len;
5222 } else {
5223 data->bytes_alloc += event_len;
5224 data->bytes_written += len;
5225 data->events++;
5226 if (!data->min_size || len < data->min_size)
5227 data->min_size = len;
5228 if (len > data->max_size)
5229 data->max_size = len;
5230 }
5231
5232 out:
5233 ring_buffer_unlock_commit(data->buffer, event);
5234
5235 return 0;
5236}
5237
5238static __init int rb_test(void *arg)
5239{
5240 struct rb_test_data *data = arg;
5241
5242 while (!kthread_should_stop()) {
5243 rb_write_something(data, false);
5244 data->cnt++;
5245
5246 set_current_state(TASK_INTERRUPTIBLE);
5247 /* Now sleep between a min of 100-300us and a max of 1ms */
5248 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
5249 }
5250
5251 return 0;
5252}
5253
5254static __init void rb_ipi(void *ignore)
5255{
5256 struct rb_test_data *data;
5257 int cpu = smp_processor_id();
5258
5259 data = &rb_data[cpu];
5260 rb_write_something(data, true);
5261}
5262
5263static __init int rb_hammer_test(void *arg)
5264{
5265 while (!kthread_should_stop()) {
5266
5267 /* Send an IPI to all cpus to write data! */
5268 smp_call_function(rb_ipi, NULL, 1);
5269 /* No sleep, but for non preempt, let others run */
5270 schedule();
5271 }
5272
5273 return 0;
5274}
5275
5276static __init int test_ringbuffer(void)
5277{
5278 struct task_struct *rb_hammer;
5279 struct ring_buffer *buffer;
5280 int cpu;
5281 int ret = 0;
5282
5283 if (security_locked_down(LOCKDOWN_TRACEFS)) {
5284 pr_warning("Lockdown is enabled, skipping ring buffer tests\n");
5285 return 0;
5286 }
5287
5288 pr_info("Running ring buffer tests...\n");
5289
5290 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
5291 if (WARN_ON(!buffer))
5292 return 0;
5293
5294 /* Disable buffer so that threads can't write to it yet */
5295 ring_buffer_record_off(buffer);
5296
5297 for_each_online_cpu(cpu) {
5298 rb_data[cpu].buffer = buffer;
5299 rb_data[cpu].cpu = cpu;
5300 rb_data[cpu].cnt = cpu;
5301 rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu],
5302 "rbtester/%d", cpu);
5303 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
5304 pr_cont("FAILED\n");
5305 ret = PTR_ERR(rb_threads[cpu]);
5306 goto out_free;
5307 }
5308
5309 kthread_bind(rb_threads[cpu], cpu);
5310 wake_up_process(rb_threads[cpu]);
5311 }
5312
5313 /* Now create the rb hammer! */
5314 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
5315 if (WARN_ON(IS_ERR(rb_hammer))) {
5316 pr_cont("FAILED\n");
5317 ret = PTR_ERR(rb_hammer);
5318 goto out_free;
5319 }
5320
5321 ring_buffer_record_on(buffer);
5322 /*
5323 * Show buffer is enabled before setting rb_test_started.
5324 * Yes there's a small race window where events could be
5325 * dropped and the thread won't catch it. But when a ring
5326 * buffer gets enabled, there will always be some kind of
5327 * delay before other CPUs see it. Thus, we don't care about
5328 * those dropped events. We care about events dropped after
5329 * the threads see that the buffer is active.
5330 */
5331 smp_wmb();
5332 rb_test_started = true;
5333
5334 set_current_state(TASK_INTERRUPTIBLE);
5335 /* Just run for 10 seconds */
5336 schedule_timeout(10 * HZ);
5337
5338 kthread_stop(rb_hammer);
5339
5340 out_free:
5341 for_each_online_cpu(cpu) {
5342 if (!rb_threads[cpu])
5343 break;
5344 kthread_stop(rb_threads[cpu]);
5345 }
5346 if (ret) {
5347 ring_buffer_free(buffer);
5348 return ret;
5349 }
5350
5351 /* Report! */
5352 pr_info("finished\n");
5353 for_each_online_cpu(cpu) {
5354 struct ring_buffer_event *event;
5355 struct rb_test_data *data = &rb_data[cpu];
5356 struct rb_item *item;
5357 unsigned long total_events;
5358 unsigned long total_dropped;
5359 unsigned long total_written;
5360 unsigned long total_alloc;
5361 unsigned long total_read = 0;
5362 unsigned long total_size = 0;
5363 unsigned long total_len = 0;
5364 unsigned long total_lost = 0;
5365 unsigned long lost;
5366 int big_event_size;
5367 int small_event_size;
5368
5369 ret = -1;
5370
5371 total_events = data->events + data->events_nested;
5372 total_written = data->bytes_written + data->bytes_written_nested;
5373 total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
5374 total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
5375
5376 big_event_size = data->max_size + data->max_size_nested;
5377 small_event_size = data->min_size + data->min_size_nested;
5378
5379 pr_info("CPU %d:\n", cpu);
5380 pr_info(" events: %ld\n", total_events);
5381 pr_info(" dropped bytes: %ld\n", total_dropped);
5382 pr_info(" alloced bytes: %ld\n", total_alloc);
5383 pr_info(" written bytes: %ld\n", total_written);
5384 pr_info(" biggest event: %d\n", big_event_size);
5385 pr_info(" smallest event: %d\n", small_event_size);
5386
5387 if (RB_WARN_ON(buffer, total_dropped))
5388 break;
5389
5390 ret = 0;
5391
5392 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
5393 total_lost += lost;
5394 item = ring_buffer_event_data(event);
5395 total_len += ring_buffer_event_length(event);
5396 total_size += item->size + sizeof(struct rb_item);
5397 if (memcmp(&item->str[0], rb_string, item->size) != 0) {
5398 pr_info("FAILED!\n");
5399 pr_info("buffer had: %.*s\n", item->size, item->str);
5400 pr_info("expected: %.*s\n", item->size, rb_string);
5401 RB_WARN_ON(buffer, 1);
5402 ret = -1;
5403 break;
5404 }
5405 total_read++;
5406 }
5407 if (ret)
5408 break;
5409
5410 ret = -1;
5411
5412 pr_info(" read events: %ld\n", total_read);
5413 pr_info(" lost events: %ld\n", total_lost);
5414 pr_info(" total events: %ld\n", total_lost + total_read);
5415 pr_info(" recorded len bytes: %ld\n", total_len);
5416 pr_info(" recorded size bytes: %ld\n", total_size);
5417 if (total_lost)
5418 pr_info(" With dropped events, record len and size may not match\n"
5419 " alloced and written from above\n");
5420 if (!total_lost) {
5421 if (RB_WARN_ON(buffer, total_len != total_alloc ||
5422 total_size != total_written))
5423 break;
5424 }
5425 if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
5426 break;
5427
5428 ret = 0;
5429 }
5430 if (!ret)
5431 pr_info("Ring buffer PASSED!\n");
5432
5433 ring_buffer_free(buffer);
5434 return 0;
5435}
5436
5437late_initcall(test_ringbuffer);
5438#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */