@@ -11,6 +11,17 @@
#ifndef _RTE_RING_C11_PVT_H_
#define _RTE_RING_C11_PVT_H_
+/**
+ * @file rte_ring_c11_pvt.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for MP/SP and MC/SC ring modes.
+ * For more information please refer to <rte_ring.h>.
+ */
+
+/**
+ * @internal This function updates tail values.
+ */
static __rte_always_inline void
__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
uint32_t new_val, uint32_t single, uint32_t enqueue)
@@ -29,40 +40,45 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
}
/**
- * @internal This function updates the producer head for enqueue
+ * @internal This is a helper function that moves the producer/consumer head
*
- * @param r
- * A pointer to the ring structure
- * @param is_sp
- * Indicates whether multi-producer path is needed or not
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param is_st
+ * Indicates whether multi-thread safe path is needed or not
* @param n
- * The number of elements we will want to enqueue, i.e. how far should the
- * head be moved
+ * The number of elements we want to move head value on
* @param behavior
- * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
* @param old_head
- * Returns head value as it was before the move, i.e. where enqueue starts
+ * Returns head value as it was before the move
* @param new_head
- * Returns the current/new head value i.e. where enqueue finishes
- * @param free_entries
- * Returns the amount of free space in the ring BEFORE head was moved
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
* @return
- * Actual number of objects enqueued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
- unsigned int n, enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head,
- uint32_t *free_entries)
+__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int is_st, unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
- const uint32_t capacity = r->capacity;
- uint32_t cons_tail;
- unsigned int max = n;
+ uint32_t stail;
int success;
+ unsigned int max = n;
- *old_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed);
+ *old_head = rte_atomic_load_explicit(&d->head,
+ rte_memory_order_relaxed);
do {
/* Reset n to the initial burst count */
n = max;
@@ -73,112 +89,36 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
/* load-acquire synchronize with store-release of ht->tail
* in update_tail.
*/
- cons_tail = rte_atomic_load_explicit(&r->cons.tail,
+ stail = rte_atomic_load_explicit(&s->tail,
rte_memory_order_acquire);
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
- * *old_head > cons_tail). So 'free_entries' is always between 0
+ * *old_head > s->tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *free_entries = (capacity + cons_tail - *old_head);
+ *entries = (capacity + stail - *old_head);
/* check that we have enough room in ring */
- if (unlikely(n > *free_entries))
+ if (unlikely(n > *entries))
n = (behavior == RTE_RING_QUEUE_FIXED) ?
- 0 : *free_entries;
+ 0 : *entries;
if (n == 0)
return 0;
*new_head = *old_head + n;
- if (is_sp) {
- r->prod.head = *new_head;
+ if (is_st) {
+ d->head = *new_head;
success = 1;
} else
/* on failure, *old_head is updated */
- success = rte_atomic_compare_exchange_strong_explicit(&r->prod.head,
- old_head, *new_head,
+ success = rte_atomic_compare_exchange_strong_explicit(
+ &d->head, old_head, *new_head,
rte_memory_order_relaxed,
rte_memory_order_relaxed);
} while (unlikely(success == 0));
return n;
}
-/**
- * @internal This function updates the consumer head for dequeue
- *
- * @param r
- * A pointer to the ring structure
- * @param is_sc
- * Indicates whether multi-consumer path is needed or not
- * @param n
- * The number of elements we will want to dequeue, i.e. how far should the
- * head be moved
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
- * @param old_head
- * Returns head value as it was before the move, i.e. where dequeue starts
- * @param new_head
- * Returns the current/new head value i.e. where dequeue finishes
- * @param entries
- * Returns the number of entries in the ring BEFORE head was moved
- * @return
- * - Actual number of objects dequeued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
- unsigned int n, enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head,
- uint32_t *entries)
-{
- unsigned int max = n;
- uint32_t prod_tail;
- int success;
-
- /* move cons.head atomically */
- *old_head = rte_atomic_load_explicit(&r->cons.head, rte_memory_order_relaxed);
- do {
- /* Restore n as it may change every loop */
- n = max;
-
- /* Ensure the head is read before tail */
- rte_atomic_thread_fence(rte_memory_order_acquire);
-
- /* this load-acquire synchronize with store-release of ht->tail
- * in update_tail.
- */
- prod_tail = rte_atomic_load_explicit(&r->prod.tail,
- rte_memory_order_acquire);
-
- /* The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * cons_head > prod_tail). So 'entries' is always between 0
- * and size(ring)-1.
- */
- *entries = (prod_tail - *old_head);
-
- /* Set the actual entries for dequeue */
- if (n > *entries)
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (unlikely(n == 0))
- return 0;
-
- *new_head = *old_head + n;
- if (is_sc) {
- r->cons.head = *new_head;
- success = 1;
- } else
- /* on failure, *old_head will be updated */
- success = rte_atomic_compare_exchange_strong_explicit(&r->cons.head,
- old_head, *new_head,
- rte_memory_order_relaxed,
- rte_memory_order_relaxed);
- } while (unlikely(success == 0));
- return n;
-}
-
#endif /* _RTE_RING_C11_PVT_H_ */
@@ -293,6 +293,72 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
#include "rte_ring_generic_pvt.h"
#endif
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ * A pointer to the ring structure
+ * @param is_sp
+ * Indicates whether multi-producer path is needed or not
+ * @param n
+ * The number of elements we will want to enqueue, i.e. how far should the
+ * head be moved
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ * Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ * Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ * Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head,
+ uint32_t *free_entries)
+{
+ return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
+ is_sp, n, behavior, old_head, new_head, free_entries);
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ * A pointer to the ring structure
+ * @param is_sc
+ * Indicates whether multi-consumer path is needed or not
+ * @param n
+ * The number of elements we will want to dequeue, i.e. how far should the
+ * head be moved
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ * Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ * Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ * Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head,
+ uint32_t *entries)
+{
+ return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
+ is_sc, n, behavior, old_head, new_head, entries);
+}
+
/**
* @internal Enqueue several objects on the ring
*
@@ -10,6 +10,17 @@
#ifndef _RTE_RING_GENERIC_PVT_H_
#define _RTE_RING_GENERIC_PVT_H_
+/**
+ * @file rte_ring_generic_pvt.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for MP/SP and MC/SC ring modes.
+ * For more information please refer to <rte_ring.h>.
+ */
+
+/**
+ * @internal This function updates tail values.
+ */
static __rte_always_inline void
__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
uint32_t new_val, uint32_t single, uint32_t enqueue)
@@ -30,35 +41,39 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
}
/**
- * @internal This function updates the producer head for enqueue
+ * @internal This is a helper function that moves the producer/consumer head
*
- * @param r
- * A pointer to the ring structure
- * @param is_sp
- * Indicates whether multi-producer path is needed or not
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param is_st
+ * Indicates whether multi-thread safe path is needed or not
* @param n
- * The number of elements we will want to enqueue, i.e. how far should the
- * head be moved
+ * The number of elements we want to move head value on
* @param behavior
- * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
* @param old_head
- * Returns head value as it was before the move, i.e. where enqueue starts
+ * Returns head value as it was before the move
* @param new_head
- * Returns the current/new head value i.e. where enqueue finishes
- * @param free_entries
- * Returns the amount of free space in the ring BEFORE head was moved
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
* @return
- * Actual number of objects enqueued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
- unsigned int n, enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head,
- uint32_t *free_entries)
+__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int is_st, unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
- const uint32_t capacity = r->capacity;
unsigned int max = n;
int success;
@@ -66,7 +81,7 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
/* Reset n to the initial burst count */
n = max;
- *old_head = r->prod.head;
+ *old_head = d->head;
/* add rmb barrier to avoid load/load reorder in weak
* memory model. It is noop on x86
@@ -76,97 +91,27 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
/*
* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
- * *old_head > cons_tail). So 'free_entries' is always between 0
+ * *old_head > s->tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *free_entries = (capacity + r->cons.tail - *old_head);
+ *entries = (capacity + s->tail - *old_head);
/* check that we have enough room in ring */
- if (unlikely(n > *free_entries))
+ if (unlikely(n > *entries))
n = (behavior == RTE_RING_QUEUE_FIXED) ?
- 0 : *free_entries;
+ 0 : *entries;
if (n == 0)
return 0;
*new_head = *old_head + n;
- if (is_sp) {
- r->prod.head = *new_head;
+ if (is_st) {
+ d->head = *new_head;
success = 1;
} else
- success = rte_atomic32_cmpset((uint32_t *)(uintptr_t)&r->prod.head,
- *old_head, *new_head);
- } while (unlikely(success == 0));
- return n;
-}
-
-/**
- * @internal This function updates the consumer head for dequeue
- *
- * @param r
- * A pointer to the ring structure
- * @param is_sc
- * Indicates whether multi-consumer path is needed or not
- * @param n
- * The number of elements we will want to dequeue, i.e. how far should the
- * head be moved
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
- * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
- * @param old_head
- * Returns head value as it was before the move, i.e. where dequeue starts
- * @param new_head
- * Returns the current/new head value i.e. where dequeue finishes
- * @param entries
- * Returns the number of entries in the ring BEFORE head was moved
- * @return
- * - Actual number of objects dequeued.
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
- unsigned int n, enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head,
- uint32_t *entries)
-{
- unsigned int max = n;
- int success;
-
- /* move cons.head atomically */
- do {
- /* Restore n as it may change every loop */
- n = max;
-
- *old_head = r->cons.head;
-
- /* add rmb barrier to avoid load/load reorder in weak
- * memory model. It is noop on x86
- */
- rte_smp_rmb();
-
- /* The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * cons_head > prod_tail). So 'entries' is always between 0
- * and size(ring)-1.
- */
- *entries = (r->prod.tail - *old_head);
-
- /* Set the actual entries for dequeue */
- if (n > *entries)
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (unlikely(n == 0))
- return 0;
-
- *new_head = *old_head + n;
- if (is_sc) {
- r->cons.head = *new_head;
- rte_smp_rmb();
- success = 1;
- } else {
- success = rte_atomic32_cmpset((uint32_t *)(uintptr_t)&r->cons.head,
+ success = rte_atomic32_cmpset(
+ (uint32_t *)(uintptr_t)&d->head,
*old_head, *new_head);
- }
} while (unlikely(success == 0));
return n;
}
@@ -51,19 +51,39 @@ __rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
}
/**
- * @internal This function updates the producer head for enqueue
+ * @internal This is a helper function that moves the producer/consumer head
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * Indicates whether multi-thread safe path is needed or not
+ * @param num
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
-static __rte_always_inline unsigned int
-__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+static __rte_always_inline uint32_t
+__rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity, unsigned int num,
enum rte_ring_queue_behavior behavior, uint32_t *old_head,
- uint32_t *free_entries)
+ uint32_t *entries)
{
uint32_t n;
union __rte_ring_hts_pos np, op;
- const uint32_t capacity = r->capacity;
-
- op.raw = rte_atomic_load_explicit(&r->hts_prod.ht.raw, rte_memory_order_acquire);
+ op.raw = rte_atomic_load_explicit(&d->ht.raw, rte_memory_order_acquire);
do {
/* Reset n to the initial burst count */
@@ -74,20 +94,20 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
* make sure that we read prod head/tail *before*
* reading cons tail.
*/
- __rte_ring_hts_head_wait(&r->hts_prod, &op);
+ __rte_ring_hts_head_wait(d, &op);
/*
* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
- * *old_head > cons_tail). So 'free_entries' is always between 0
+ * *old_head > cons_tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *free_entries = capacity + r->cons.tail - op.pos.head;
+ *entries = capacity + s->tail - op.pos.head;
/* check that we have enough room in ring */
- if (unlikely(n > *free_entries))
+ if (unlikely(n > *entries))
n = (behavior == RTE_RING_QUEUE_FIXED) ?
- 0 : *free_entries;
+ 0 : *entries;
if (n == 0)
break;
@@ -100,13 +120,25 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
* - OOO reads of cons tail value
* - OOO copy of elems from the ring
*/
- } while (rte_atomic_compare_exchange_strong_explicit(&r->hts_prod.ht.raw,
+ } while (rte_atomic_compare_exchange_strong_explicit(&d->ht.raw,
(uint64_t *)(uintptr_t)&op.raw, np.raw,
- rte_memory_order_acquire, rte_memory_order_acquire) == 0);
+ rte_memory_order_acquire,
+ rte_memory_order_acquire) == 0);
*old_head = op.pos.head;
return n;
}
+/**
+ * @internal This function updates the producer head for enqueue
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+ enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+ uint32_t *free_entries)
+{
+ return __rte_ring_hts_move_head(&r->hts_prod, &r->cons,
+ r->capacity, num, behavior, old_head, free_entries);
+}
/**
* @internal This function updates the consumer head for dequeue
@@ -116,51 +148,8 @@ __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
enum rte_ring_queue_behavior behavior, uint32_t *old_head,
uint32_t *entries)
{
- uint32_t n;
- union __rte_ring_hts_pos np, op;
-
- op.raw = rte_atomic_load_explicit(&r->hts_cons.ht.raw, rte_memory_order_acquire);
-
- /* move cons.head atomically */
- do {
- /* Restore n as it may change every loop */
- n = num;
-
- /*
- * wait for tail to be equal to head,
- * make sure that we read cons head/tail *before*
- * reading prod tail.
- */
- __rte_ring_hts_head_wait(&r->hts_cons, &op);
-
- /* The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * cons_head > prod_tail). So 'entries' is always between 0
- * and size(ring)-1.
- */
- *entries = r->prod.tail - op.pos.head;
-
- /* Set the actual entries for dequeue */
- if (n > *entries)
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (unlikely(n == 0))
- break;
-
- np.pos.tail = op.pos.tail;
- np.pos.head = op.pos.head + n;
-
- /*
- * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
- * - OOO reads of prod tail value
- * - OOO copy of elems from the ring
- */
- } while (rte_atomic_compare_exchange_strong_explicit(&r->hts_cons.ht.raw,
- (uint64_t *)(uintptr_t)&op.raw, np.raw,
- rte_memory_order_acquire, rte_memory_order_acquire) == 0);
-
- *old_head = op.pos.head;
- return n;
+ return __rte_ring_hts_move_head(&r->hts_cons, &r->prod,
+ 0, num, behavior, old_head, entries);
}
/**
@@ -65,19 +65,40 @@ __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
}
/**
- * @internal This function updates the producer head for enqueue.
+ * @internal This is a helper function that moves the producer/consumer head
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * Indicates whether multi-thread safe path is needed or not
+ * @param num
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline uint32_t
-__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+__rte_ring_rts_move_head(struct rte_ring_rts_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
enum rte_ring_queue_behavior behavior, uint32_t *old_head,
- uint32_t *free_entries)
+ uint32_t *entries)
{
uint32_t n;
union __rte_ring_rts_poscnt nh, oh;
- const uint32_t capacity = r->capacity;
-
- oh.raw = rte_atomic_load_explicit(&r->rts_prod.head.raw, rte_memory_order_acquire);
+ oh.raw = rte_atomic_load_explicit(&d->head.raw,
+ rte_memory_order_acquire);
do {
/* Reset n to the initial burst count */
@@ -88,20 +109,20 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
* make sure that we read prod head *before*
* reading cons tail.
*/
- __rte_ring_rts_head_wait(&r->rts_prod, &oh);
+ __rte_ring_rts_head_wait(d, &oh);
/*
* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
- * *old_head > cons_tail). So 'free_entries' is always between 0
+ * *old_head > cons_tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *free_entries = capacity + r->cons.tail - oh.val.pos;
+ *entries = capacity + s->tail - oh.val.pos;
/* check that we have enough room in ring */
- if (unlikely(n > *free_entries))
+ if (unlikely(n > *entries))
n = (behavior == RTE_RING_QUEUE_FIXED) ?
- 0 : *free_entries;
+ 0 : *entries;
if (n == 0)
break;
@@ -114,14 +135,27 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
* - OOO reads of cons tail value
* - OOO copy of elems to the ring
*/
- } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_prod.head.raw,
+ } while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
(uint64_t *)(uintptr_t)&oh.raw, nh.raw,
- rte_memory_order_acquire, rte_memory_order_acquire) == 0);
+ rte_memory_order_acquire,
+ rte_memory_order_acquire) == 0);
*old_head = oh.val.pos;
return n;
}
+/**
+ * @internal This function updates the producer head for enqueue.
+ */
+static __rte_always_inline uint32_t
+__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+ enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+ uint32_t *free_entries)
+{
+ return __rte_ring_rts_move_head(&r->rts_prod, &r->cons,
+ r->capacity, num, behavior, old_head, free_entries);
+}
+
/**
* @internal This function updates the consumer head for dequeue
*/
@@ -130,51 +164,8 @@ __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
enum rte_ring_queue_behavior behavior, uint32_t *old_head,
uint32_t *entries)
{
- uint32_t n;
- union __rte_ring_rts_poscnt nh, oh;
-
- oh.raw = rte_atomic_load_explicit(&r->rts_cons.head.raw, rte_memory_order_acquire);
-
- /* move cons.head atomically */
- do {
- /* Restore n as it may change every loop */
- n = num;
-
- /*
- * wait for cons head/tail distance,
- * make sure that we read cons head *before*
- * reading prod tail.
- */
- __rte_ring_rts_head_wait(&r->rts_cons, &oh);
-
- /* The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * cons_head > prod_tail). So 'entries' is always between 0
- * and size(ring)-1.
- */
- *entries = r->prod.tail - oh.val.pos;
-
- /* Set the actual entries for dequeue */
- if (n > *entries)
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (unlikely(n == 0))
- break;
-
- nh.val.pos = oh.val.pos + n;
- nh.val.cnt = oh.val.cnt + 1;
-
- /*
- * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
- * - OOO reads of prod tail value
- * - OOO copy of elems from the ring
- */
- } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_cons.head.raw,
- (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
- rte_memory_order_acquire, rte_memory_order_acquire) == 0);
-
- *old_head = oh.val.pos;
- return n;
+ return __rte_ring_rts_move_head(&r->rts_cons, &r->prod,
+ 0, num, behavior, old_head, entries);
}
/**