/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 * Copyright (c) 2024 Arm Ltd
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#ifndef	_SYS_BUF_RING_H_
#define	_SYS_BUF_RING_H_

#include <sys/param.h>
#include <sys/kassert.h>
#include <machine/atomic.h>
#include <machine/cpu.h>

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

/*
 * We only apply the mask to the head and tail values when calculating the
 * index into br_ring to access. This means the upper bits can be used as
 * an epoch to reduce the chance the atomic_cmpset succeeds when it should
 * fail, e.g. when the head wraps while the CPU is in an interrupt. This
 * is a probabilistic fix as there is still a very unlikely chance the
 * value wraps back to the expected value.
 */
struct buf_ring {
	volatile uint32_t	br_prod_head;
	volatile uint32_t	br_prod_tail;
	int			br_prod_size;
	int			br_prod_mask;
	uint64_t		br_drops;
	volatile uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	volatile uint32_t	br_cons_tail;
	int			br_cons_size;
	int			br_cons_mask;
#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	struct mtx		*br_lock;
#endif
	void			*br_ring[0] __aligned(CACHE_LINE_SIZE);
};
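
/*
 * Illustrative example of the masking/epoch scheme described above (not
 * part of the API): with a ring of size 8 (mask 0x7), only the low bits
 * of a counter select a slot, while the remaining bits of the 32-bit
 * value act as the epoch:
 *
 *	head = 0x00000009  ->  slot = head & 0x7 = 1, epoch = head >> 3 = 1
 *	head = 0x00000011  ->  slot = head & 0x7 = 1, epoch = head >> 3 = 2
 *
 * Both heads map to br_ring[1], but an atomic_cmpset expecting
 * 0x00000009 will fail once the counter has advanced to 0x00000011,
 * whereas comparing masked indices alone could not tell them apart.
 */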

/*
 * Multi-producer safe lock-free ring buffer enqueue.
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, prod_idx;
	uint32_t cons_tail, mask;

	mask = br->br_prod_mask;
#ifdef DEBUG_BUFRING
	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek(), and then re-added via drbr_putback() and
	 * trigger a spurious panic.
	 */
	for (uint32_t i = br->br_cons_head; i != br->br_prod_head; i++)
		if (br->br_ring[i & mask] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
	critical_enter();
	do {
		/*
		 * br->br_prod_head needs to be read before br->br_cons_tail.
		 * If not, a dequeue and enqueue could occur between reading
		 * br_cons_tail and reading br_prod_head. This could give us
		 * values where br_cons_head == br_prod_tail (after masking).
		 *
		 * To work around this, use a load acquire. This is just to
		 * ensure ordering within this thread.
		 */
		prod_head = atomic_load_acq_32(&br->br_prod_head);
		prod_next = prod_head + 1;
		cons_tail = atomic_load_acq_32(&br->br_cons_tail);

		if ((int32_t)(cons_tail + br->br_prod_size - prod_next) < 1) {
			rmb();
			if (prod_head == br->br_prod_head &&
			    cons_tail == br->br_cons_tail) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
	prod_idx = prod_head & mask;
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_idx] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_idx] = buf;

	/*
	 * If there are other enqueues in progress that preceded us,
	 * we need to wait for them to complete.
	 */
	while (br->br_prod_tail != prod_head)
		cpu_spinwait();
	atomic_store_rel_32(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}
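
/*
 * Example producer usage (illustrative sketch, not part of this header's
 * API): callers must be prepared for ENOBUFS when the ring is full, in
 * which case the drop has already been counted in br_drops.
 *
 *	struct mbuf *m;
 *	...
 *	if (buf_ring_enqueue(br, m) == ENOBUFS)
 *		m_freem(m);	(ring full; caller owns the buffer)
 */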

/*
 * Multi-consumer safe dequeue.
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	critical_enter();
	mask = br->br_cons_mask;
	do {
		/*
		 * As with buf_ring_enqueue, ensure we read the head before
		 * the tail. If we read them in the wrong order we may
		 * think the buf_ring is full when it is empty.
		 */
		cons_head = atomic_load_acq_32(&br->br_cons_head);
		cons_next = cons_head + 1;
		prod_tail = atomic_load_acq_32(&br->br_prod_tail);

		if (cons_head == prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
	cons_idx = cons_head & mask;

	buf = br->br_ring[cons_idx];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#endif
	/*
	 * If there are other dequeues in progress that preceded us,
	 * we need to wait for them to complete.
	 */
	while (br->br_cons_tail != cons_head)
		cpu_spinwait();

	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}

/*
 * Single-consumer dequeue.
 * Use where dequeue is protected by a lock,
 * e.g. a network driver's tx queue lock.
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	mask = br->br_cons_mask;
	cons_head = br->br_cons_head;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = cons_head + 1;

	if (cons_head == prod_tail)
		return (NULL);

	cons_idx = cons_head & mask;
	br->br_cons_head = cons_next;
	buf = br->br_ring[cons_idx];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#ifdef _KERNEL
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	if (br->br_cons_tail != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    br->br_cons_tail, cons_head);
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	return (buf);
}
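
/*
 * Example single-consumer usage (illustrative sketch; "txr", its members
 * and drain_one_packet() are hypothetical driver state and helpers, not
 * part of this API):
 *
 *	struct mbuf *m;
 *
 *	mtx_lock(&txr->tx_mtx);
 *	while ((m = buf_ring_dequeue_sc(txr->br)) != NULL)
 *		drain_one_packet(txr, m);
 *	mtx_unlock(&txr->tx_mtx);
 */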

/*
 * Single-consumer advance after a peek.
 * Use where it is protected by a lock,
 * e.g. a network driver's tx queue lock.
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, prod_tail;
#ifdef DEBUG_BUFRING
	uint32_t mask;

	mask = br->br_cons_mask;
#endif
	cons_head = br->br_cons_head;
	prod_tail = br->br_prod_tail;

	cons_next = cons_head + 1;
	if (cons_head == prod_tail)
		return;
	br->br_cons_head = cons_next;
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head & mask] = NULL;
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
}

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring buffer is what is
 * being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other operation that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper);
 * if we have to do a multi-queue version we will need
 * the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	uint32_t mask;

	mask = br->br_cons_mask;
	KASSERT((br->br_cons_head & mask) != (br->br_prod_tail & mask),
	    ("Buf-Ring has none in putback"));
	br->br_ring[br->br_cons_head & mask] = new;
}

/*
 * Return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty.
 * Race-prone if not protected by a lock.
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = br->br_cons_head;

	if (cons_head == prod_tail)
		return (NULL);

	return (br->br_ring[cons_head & mask]);
}
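
/*
 * Example peek/putback/advance pattern, as described in the comment
 * above buf_ring_putback_sc() (illustrative sketch; "txr", "sc" and
 * try_transmit() are hypothetical driver state and helpers):
 *
 *	struct mbuf *m;
 *
 *	mtx_lock(&txr->tx_mtx);
 *	while ((m = buf_ring_peek(txr->br)) != NULL) {
 *		if (try_transmit(sc, m) != 0) {
 *			(hardware queue full: leave the mbuf in the ring)
 *			buf_ring_putback_sc(txr->br, m);
 *			break;
 *		}
 *		(transmitted: now consume the slot we peeked at)
 *		buf_ring_advance_sc(txr->br);
 *	}
 *	mtx_unlock(&txr->tx_mtx);
 */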

static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;
	void *ret;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = br->br_cons_head;

	if (cons_head == prod_tail)
		return (NULL);

	ret = br->br_ring[cons_head & mask];
#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	br->br_ring[cons_head & mask] = NULL;
#endif
	return (ret);
}

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (br->br_prod_head == br->br_cons_tail + br->br_cons_size - 1);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (br->br_cons_head == br->br_prod_tail);
}

static __inline int
buf_ring_count(struct buf_ring *br)
{

	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
	    & br->br_prod_mask);
}
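
/*
 * Illustrative arithmetic for buf_ring_count() (not part of the API):
 * with size 8 (mask 0x7), br_prod_tail = 10 and br_cons_tail = 7 the
 * ring holds (8 + 10 - 7) & 0x7 = 3 entries. Because the size is a
 * power of two, adding br_prod_size before masking does not change the
 * result modulo the ring size, so the count remains correct when the
 * 32-bit counters wrap.
 */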

#ifdef _KERNEL
struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
#else

#include <stdlib.h>

static inline struct buf_ring *
buf_ring_alloc(int count)
{
	struct buf_ring *br;

	KASSERT(powerof2(count), ("buf ring size must be a power of 2"));

	br = calloc(1, sizeof(struct buf_ring) + count * sizeof(void *));
	if (br == NULL)
		return (NULL);
	br->br_prod_size = br->br_cons_size = count;
	br->br_prod_mask = br->br_cons_mask = count - 1;
	br->br_prod_head = br->br_cons_head = 0;
	br->br_prod_tail = br->br_cons_tail = 0;
	return (br);
}

static inline void
buf_ring_free(struct buf_ring *br)
{
	free(br);
}
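
/*
 * Example userspace usage (illustrative sketch; single producer and
 * single consumer shown, error handling elided for brevity):
 *
 *	struct buf_ring *br = buf_ring_alloc(1024);
 *	int item = 42;
 *	int *p;
 *
 *	if (br != NULL && buf_ring_enqueue(br, &item) == 0)
 *		p = buf_ring_dequeue_sc(br);	(p now points at item)
 *	buf_ring_free(br);
 */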

#endif /* !_KERNEL */
#endif /* _SYS_BUF_RING_H_ */