/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 * Copyright (c) 2024 Arm Ltd
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 */

#ifndef _SYS_BUF_RING_H_
#define	_SYS_BUF_RING_H_

#include <sys/param.h>
#include <sys/kassert.h>
#include <machine/atomic.h>
#include <machine/cpu.h>

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
#include <sys/lock.h>
#include <sys/mutex.h>
#endif
/*
 * We only apply the mask to the head and tail values when calculating the
 * index into br_ring to access. This means the upper bits can be used as
 * an epoch to reduce the chance the atomic_cmpset succeeds when it should
 * fail, e.g. when the head wraps while the CPU is in an interrupt. This
 * is a probabilistic fix as there is still a very unlikely chance the
 * value wraps back to the expected value.
 */
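/*
 * For example, with a ring of size 8 (mask 0x7) a head value of 0x1f
 * selects slot 0x1f & 0x7 == 7, while the untouched upper bits (0x18,
 * i.e. the third lap around the ring) act as the epoch described above.
 */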
struct buf_ring {
	volatile uint32_t	br_prod_head;
	volatile uint32_t	br_prod_tail;
	int			br_prod_size;
	int			br_prod_mask;
	uint64_t		br_drops;
	/*
	 * The consumer fields live on their own cache line to avoid
	 * false sharing with the producer fields above.
	 */
	volatile uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	volatile uint32_t	br_cons_tail;
	int			br_cons_size;
	int			br_cons_mask;
#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	struct mtx		*br_lock;
#endif
	void			*br_ring[0] __aligned(CACHE_LINE_SIZE);
};

/*
 * Multi-producer safe lock-free ring buffer enqueue.
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, prod_idx;
	uint32_t cons_tail, mask;

	mask = br->br_prod_mask;
#ifdef DEBUG_BUFRING
	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek(), and then re-added via drbr_putback(), which
	 * can trigger a spurious panic here.
	 */
	for (uint32_t i = br->br_cons_head; i != br->br_prod_head; i++)
		if (br->br_ring[i & mask] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
	critical_enter();
	do {
		/*
		 * br->br_prod_head needs to be read before br->br_cons_tail.
		 * If not, a dequeue and an enqueue could take place between
		 * reading br_cons_tail and reading br_prod_head. This
		 * could give us values where br_cons_head == br_prod_tail
		 * (after masking).
		 *
		 * To work around this use a load acquire. This is just to
		 * ensure ordering within this thread.
		 */
		prod_head = atomic_load_acq_32(&br->br_prod_head);
		prod_next = prod_head + 1;
		cons_tail = atomic_load_acq_32(&br->br_cons_tail);

		/*
		 * The ring is full when the new head would catch up with
		 * the consumer tail, i.e. prod_next - cons_tail would
		 * exceed br_prod_size - 1.
		 */
		if ((int32_t)(cons_tail + br->br_prod_size - prod_next) < 1) {
			rmb();
			/*
			 * Only report failure if both values were stable;
			 * otherwise retry with fresh values.
			 */
			if (prod_head == br->br_prod_head &&
			    cons_tail == br->br_cons_tail) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
	prod_idx = prod_head & mask;
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_idx] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_idx] = buf;

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (br->br_prod_tail != prod_head)
		cpu_spinwait();
	atomic_store_rel_32(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}
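
/*
 * Usage sketch: a kernel producer, e.g. a driver transmit path, handles
 * a full ring by dropping the buffer (m_freem() frees an mbuf chain;
 * the ring itself already counts the drop in br_drops):
 *
 *	if (buf_ring_enqueue(br, m) == ENOBUFS)
 *		m_freem(m);
 */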

/*
 * Multi-consumer safe dequeue.
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	critical_enter();
	mask = br->br_cons_mask;
	do {
		/*
		 * As with buf_ring_enqueue, ensure we read the head before
		 * the tail. If we read them in the wrong order we may
		 * think the buf_ring is full when it is empty.
		 */
		cons_head = atomic_load_acq_32(&br->br_cons_head);
		cons_next = cons_head + 1;
		prod_tail = atomic_load_acq_32(&br->br_prod_tail);

		if (cons_head == prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
	cons_idx = cons_head & mask;

	buf = br->br_ring[cons_idx];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#endif
	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (br->br_cons_tail != cons_head)
		cpu_spinwait();

	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}
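
/*
 * Usage sketch: multiple consumers may drain the ring concurrently; each
 * call hands back one buffer, or NULL once the ring is empty
 * (process_buf() is a hypothetical handler):
 *
 *	while ((buf = buf_ring_dequeue_mc(br)) != NULL)
 *		process_buf(buf);
 */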

/*
 * Single-consumer dequeue.
 * Use where dequeue is protected by a lock,
 * e.g. a network driver's tx queue lock.
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	mask = br->br_cons_mask;
	cons_head = br->br_cons_head;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = cons_head + 1;

	if (cons_head == prod_tail)
		return (NULL);

	cons_idx = cons_head & mask;
	br->br_cons_head = cons_next;
	buf = br->br_ring[cons_idx];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#ifdef _KERNEL
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	if (br->br_cons_tail != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    br->br_cons_tail, cons_head);
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	return (buf);
}
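
/*
 * Usage sketch: drain under the consumer lock (sc and drv_start_one()
 * are hypothetical driver pieces):
 *
 *	mtx_lock(&sc->tx_lock);
 *	while ((m = buf_ring_dequeue_sc(sc->br)) != NULL)
 *		drv_start_one(sc, m);
 *	mtx_unlock(&sc->tx_lock);
 */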

/*
 * Single-consumer advance after a peek.
 * Use where it is protected by a lock,
 * e.g. a network driver's tx queue lock.
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, prod_tail;
#ifdef DEBUG_BUFRING
	uint32_t mask;

	mask = br->br_cons_mask;
#endif
	cons_head = br->br_cons_head;
	prod_tail = br->br_prod_tail;

	cons_next = cons_head + 1;
	if (cons_head == prod_tail)
		return;
	br->br_cons_head = cons_next;
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head & mask] = NULL;
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
}

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring-buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other function that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper).
 * If we have to do a multi-queue version we will need
 * the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	uint32_t mask;

	mask = br->br_cons_mask;
	KASSERT((br->br_cons_head & mask) != (br->br_prod_tail & mask),
	    ("Buf-Ring has none in putback"));
	br->br_ring[br->br_cons_head & mask] = new;
}

/*
 * Return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty.
 * Race-prone if not protected by a lock.
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer peek");
#endif
	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = br->br_cons_head;

	if (cons_head == prod_tail)
		return (NULL);

	return (br->br_ring[cons_head & mask]);
}

static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;
	void *ret;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer peek");
#endif

	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = br->br_cons_head;

	if (cons_head == prod_tail)
		return (NULL);

	ret = br->br_ring[cons_head & mask];
#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	br->br_ring[cons_head & mask] = NULL;
#endif
	return (ret);
}
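
/*
 * Usage sketch: the peek/advance/putback pattern (cf. drbr_peek(),
 * drbr_advance() and drbr_putback()); drv_tx_one() and sc are
 * hypothetical driver pieces. The buffer stays logically in the ring
 * until advanced past, so a failed transmit can put it back:
 *
 *	while ((m = buf_ring_peek_clear_sc(sc->br)) != NULL) {
 *		if (drv_tx_one(sc, m) != 0) {
 *			buf_ring_putback_sc(sc->br, m);
 *			break;
 *		}
 *		buf_ring_advance_sc(sc->br);
 *	}
 */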

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (br->br_prod_head == br->br_cons_tail + br->br_cons_size - 1);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (br->br_cons_head == br->br_prod_tail);
}

static __inline int
buf_ring_count(struct buf_ring *br)
{

	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
	    & br->br_prod_mask);
}
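
/*
 * Note on capacity (derived from the full check in buf_ring_enqueue):
 * a ring of size N holds at most N - 1 entries, since one slot is kept
 * free to distinguish full from empty. For example, with size 8 and
 * prod_tail - cons_tail == 7, buf_ring_count() returns (8 + 7) & 7 == 7
 * and the next enqueue returns ENOBUFS.
 */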

#ifdef _KERNEL
struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
#else

#include <stdlib.h>

static inline struct buf_ring *
buf_ring_alloc(int count)
{
	struct buf_ring *br;

	KASSERT(powerof2(count), ("buf ring must be size power of 2"));

	br = calloc(1, sizeof(struct buf_ring) + count * sizeof(void *));
	if (br == NULL)
		return (NULL);
	br->br_prod_size = br->br_cons_size = count;
	br->br_prod_mask = br->br_cons_mask = count - 1;
	br->br_prod_head = br->br_cons_head = 0;
	br->br_prod_tail = br->br_cons_tail = 0;
	return (br);
}
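
/*
 * Usage sketch (userland build, assuming critical_enter() and
 * cpu_spinwait() are provided by the surrounding harness): count must
 * be a power of two; the producer/consumer calls match the kernel ones:
 *
 *	struct buf_ring *br = buf_ring_alloc(1024);
 *	if (br != NULL) {
 *		if (buf_ring_enqueue(br, item) == 0)
 *			item = buf_ring_dequeue_sc(br);
 *		buf_ring_free(br);
 *	}
 */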

static inline void
buf_ring_free(struct buf_ring *br)
{
	free(br);
}

#endif /* !_KERNEL */
#endif /* _SYS_BUF_RING_H_ */