[RFC,1/6] ring: common functions for 'move head' ops

Message ID 20240815085339.1434-2-konstantin.v.ananyev@yandex.ru (mailing list archive)
State New
Delegated to: Thomas Monjalon
Headers
Series Stage-Ordered API and other extensions for ring library |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Konstantin Ananyev Aug. 15, 2024, 8:53 a.m. UTC
From: Konstantin Ananyev <konstantin.ananyev@huawei.com>

Note upfront: that change doesn't introduce any functional or
performance changes.
It is just a code-reordering for:
 - code deduplication
 - ability in future to re-use the same code to introduce new functionality

For each sync mode corresponding move_prod_head() and
move_cons_head() are nearly identical to each other,
the only differences are:
 - do we need to use a @capacity to calculate number of entries or not.
 - what we need to update (prod/cons) and what is used as
   read-only counterpart.
So instead of having 2 copies of nearly identical functions,
introduce a new common one that could be used by both functions:
move_prod_head() and move_cons_head().

As another positive thing - we can get rid of referencing whole rte_ring
structure in that new common sub-function.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
---
 lib/ring/rte_ring_c11_pvt.h      | 134 +++++--------------------------
 lib/ring/rte_ring_elem_pvt.h     |  66 +++++++++++++++
 lib/ring/rte_ring_generic_pvt.h  | 121 ++++------------------------
 lib/ring/rte_ring_hts_elem_pvt.h |  85 ++++++--------------
 lib/ring/rte_ring_rts_elem_pvt.h |  85 ++++++--------------
 5 files changed, 149 insertions(+), 342 deletions(-)
  

Patch

diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 629b2d9288..048933ddc6 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -28,41 +28,19 @@  __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
 	rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
 }
 
-/**
- * @internal This function updates the producer head for enqueue
- *
- * @param r
- *   A pointer to the ring structure
- * @param is_sp
- *   Indicates whether multi-producer path is needed or not
- * @param n
- *   The number of elements we will want to enqueue, i.e. how far should the
- *   head be moved
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
- * @param old_head
- *   Returns head value as it was before the move, i.e. where enqueue starts
- * @param new_head
- *   Returns the current/new head value i.e. where enqueue finishes
- * @param free_entries
- *   Returns the amount of free space in the ring BEFORE head was moved
- * @return
- *   Actual number of objects enqueued.
- *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
 static __rte_always_inline unsigned int
-__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
-		unsigned int n, enum rte_ring_queue_behavior behavior,
-		uint32_t *old_head, uint32_t *new_head,
-		uint32_t *free_entries)
+__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+		const struct rte_ring_headtail *s, uint32_t capacity,
+		unsigned int is_st, unsigned int n,
+		enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
 {
-	const uint32_t capacity = r->capacity;
-	uint32_t cons_tail;
-	unsigned int max = n;
+	uint32_t stail;
 	int success;
+	unsigned int max = n;
 
-	*old_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed);
+	*old_head = rte_atomic_load_explicit(&d->head,
+			rte_memory_order_relaxed);
 	do {
 		/* Reset n to the initial burst count */
 		n = max;
@@ -73,112 +51,36 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 		/* load-acquire synchronize with store-release of ht->tail
 		 * in update_tail.
 		 */
-		cons_tail = rte_atomic_load_explicit(&r->cons.tail,
+		stail = rte_atomic_load_explicit(&s->tail,
 					rte_memory_order_acquire);
 
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
-		 * *old_head > cons_tail). So 'free_entries' is always between 0
+		 * *old_head > s->tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = (capacity + cons_tail - *old_head);
+		*entries = (capacity + stail - *old_head);
 
 		/* check that we have enough room in ring */
-		if (unlikely(n > *free_entries))
+		if (unlikely(n > *entries))
 			n = (behavior == RTE_RING_QUEUE_FIXED) ?
-					0 : *free_entries;
+					0 : *entries;
 
 		if (n == 0)
 			return 0;
 
 		*new_head = *old_head + n;
-		if (is_sp) {
-			r->prod.head = *new_head;
+		if (is_st) {
+			d->head = *new_head;
 			success = 1;
 		} else
 			/* on failure, *old_head is updated */
-			success = rte_atomic_compare_exchange_strong_explicit(&r->prod.head,
-					old_head, *new_head,
+			success = rte_atomic_compare_exchange_strong_explicit(
+					&d->head, old_head, *new_head,
 					rte_memory_order_relaxed,
 					rte_memory_order_relaxed);
 	} while (unlikely(success == 0));
 	return n;
 }
 
-/**
- * @internal This function updates the consumer head for dequeue
- *
- * @param r
- *   A pointer to the ring structure
- * @param is_sc
- *   Indicates whether multi-consumer path is needed or not
- * @param n
- *   The number of elements we will want to dequeue, i.e. how far should the
- *   head be moved
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
- * @param old_head
- *   Returns head value as it was before the move, i.e. where dequeue starts
- * @param new_head
- *   Returns the current/new head value i.e. where dequeue finishes
- * @param entries
- *   Returns the number of entries in the ring BEFORE head was moved
- * @return
- *   - Actual number of objects dequeued.
- *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
-		unsigned int n, enum rte_ring_queue_behavior behavior,
-		uint32_t *old_head, uint32_t *new_head,
-		uint32_t *entries)
-{
-	unsigned int max = n;
-	uint32_t prod_tail;
-	int success;
-
-	/* move cons.head atomically */
-	*old_head = rte_atomic_load_explicit(&r->cons.head, rte_memory_order_relaxed);
-	do {
-		/* Restore n as it may change every loop */
-		n = max;
-
-		/* Ensure the head is read before tail */
-		rte_atomic_thread_fence(rte_memory_order_acquire);
-
-		/* this load-acquire synchronize with store-release of ht->tail
-		 * in update_tail.
-		 */
-		prod_tail = rte_atomic_load_explicit(&r->prod.tail,
-					rte_memory_order_acquire);
-
-		/* The subtraction is done between two unsigned 32bits value
-		 * (the result is always modulo 32 bits even if we have
-		 * cons_head > prod_tail). So 'entries' is always between 0
-		 * and size(ring)-1.
-		 */
-		*entries = (prod_tail - *old_head);
-
-		/* Set the actual entries for dequeue */
-		if (n > *entries)
-			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
-		if (unlikely(n == 0))
-			return 0;
-
-		*new_head = *old_head + n;
-		if (is_sc) {
-			r->cons.head = *new_head;
-			success = 1;
-		} else
-			/* on failure, *old_head will be updated */
-			success = rte_atomic_compare_exchange_strong_explicit(&r->cons.head,
-							old_head, *new_head,
-							rte_memory_order_relaxed,
-							rte_memory_order_relaxed);
-	} while (unlikely(success == 0));
-	return n;
-}
-
 #endif /* _RTE_RING_C11_PVT_H_ */
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 4b80f58980..3a83668a08 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -293,6 +293,72 @@  __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
 #include "rte_ring_generic_pvt.h"
 #endif
 
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head,
+		uint32_t *free_entries)
+{
+	return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
+			is_sp, n, behavior, old_head, new_head, free_entries);
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to dequeue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head,
+		uint32_t *entries)
+{
+	return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
+			is_sc, n, behavior, old_head, new_head, entries);
+}
+
 /**
  * @internal Enqueue several objects on the ring
  *
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index 457f41dab3..12f3595926 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -29,36 +29,13 @@  __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
 	ht->tail = new_val;
 }
 
-/**
- * @internal This function updates the producer head for enqueue
- *
- * @param r
- *   A pointer to the ring structure
- * @param is_sp
- *   Indicates whether multi-producer path is needed or not
- * @param n
- *   The number of elements we will want to enqueue, i.e. how far should the
- *   head be moved
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
- * @param old_head
- *   Returns head value as it was before the move, i.e. where enqueue starts
- * @param new_head
- *   Returns the current/new head value i.e. where enqueue finishes
- * @param free_entries
- *   Returns the amount of free space in the ring BEFORE head was moved
- * @return
- *   Actual number of objects enqueued.
- *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
 static __rte_always_inline unsigned int
-__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
-		unsigned int n, enum rte_ring_queue_behavior behavior,
-		uint32_t *old_head, uint32_t *new_head,
-		uint32_t *free_entries)
+__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+		const struct rte_ring_headtail *s, uint32_t capacity,
+		unsigned int is_st, unsigned int n,
+		enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
 {
-	const uint32_t capacity = r->capacity;
 	unsigned int max = n;
 	int success;
 
@@ -66,7 +43,7 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 		/* Reset n to the initial burst count */
 		n = max;
 
-		*old_head = r->prod.head;
+		*old_head = d->head;
 
 		/* add rmb barrier to avoid load/load reorder in weak
 		 * memory model. It is noop on x86
@@ -76,97 +53,27 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 		/*
 		 *  The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
-		 * *old_head > cons_tail). So 'free_entries' is always between 0
+		 * *old_head > s->tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = (capacity + r->cons.tail - *old_head);
+		*entries = (capacity + s->tail - *old_head);
 
 		/* check that we have enough room in ring */
-		if (unlikely(n > *free_entries))
+		if (unlikely(n > *entries))
 			n = (behavior == RTE_RING_QUEUE_FIXED) ?
-					0 : *free_entries;
+					0 : *entries;
 
 		if (n == 0)
 			return 0;
 
 		*new_head = *old_head + n;
-		if (is_sp) {
-			r->prod.head = *new_head;
+		if (is_st) {
+			d->head = *new_head;
 			success = 1;
 		} else
-			success = rte_atomic32_cmpset((uint32_t *)(uintptr_t)&r->prod.head,
-					*old_head, *new_head);
-	} while (unlikely(success == 0));
-	return n;
-}
-
-/**
- * @internal This function updates the consumer head for dequeue
- *
- * @param r
- *   A pointer to the ring structure
- * @param is_sc
- *   Indicates whether multi-consumer path is needed or not
- * @param n
- *   The number of elements we will want to dequeue, i.e. how far should the
- *   head be moved
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
- * @param old_head
- *   Returns head value as it was before the move, i.e. where dequeue starts
- * @param new_head
- *   Returns the current/new head value i.e. where dequeue finishes
- * @param entries
- *   Returns the number of entries in the ring BEFORE head was moved
- * @return
- *   - Actual number of objects dequeued.
- *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
-		unsigned int n, enum rte_ring_queue_behavior behavior,
-		uint32_t *old_head, uint32_t *new_head,
-		uint32_t *entries)
-{
-	unsigned int max = n;
-	int success;
-
-	/* move cons.head atomically */
-	do {
-		/* Restore n as it may change every loop */
-		n = max;
-
-		*old_head = r->cons.head;
-
-		/* add rmb barrier to avoid load/load reorder in weak
-		 * memory model. It is noop on x86
-		 */
-		rte_smp_rmb();
-
-		/* The subtraction is done between two unsigned 32bits value
-		 * (the result is always modulo 32 bits even if we have
-		 * cons_head > prod_tail). So 'entries' is always between 0
-		 * and size(ring)-1.
-		 */
-		*entries = (r->prod.tail - *old_head);
-
-		/* Set the actual entries for dequeue */
-		if (n > *entries)
-			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
-		if (unlikely(n == 0))
-			return 0;
-
-		*new_head = *old_head + n;
-		if (is_sc) {
-			r->cons.head = *new_head;
-			rte_smp_rmb();
-			success = 1;
-		} else {
-			success = rte_atomic32_cmpset((uint32_t *)(uintptr_t)&r->cons.head,
+			success = rte_atomic32_cmpset(
+					(uint32_t *)(uintptr_t)&d->head,
 					*old_head, *new_head);
-		}
 	} while (unlikely(success == 0));
 	return n;
 }
diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h
index 91f5eeccb9..ed5f16879f 100644
--- a/lib/ring/rte_ring_hts_elem_pvt.h
+++ b/lib/ring/rte_ring_hts_elem_pvt.h
@@ -50,20 +50,16 @@  __rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
 	}
 }
 
-/**
- * @internal This function updates the producer head for enqueue
- */
-static __rte_always_inline unsigned int
-__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+static __rte_always_inline uint32_t
+__rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
+	const struct rte_ring_headtail *s, uint32_t capacity, unsigned int num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
-	uint32_t *free_entries)
+	uint32_t *entries)
 {
 	uint32_t n;
 	union __rte_ring_hts_pos np, op;
 
-	const uint32_t capacity = r->capacity;
-
-	op.raw = rte_atomic_load_explicit(&r->hts_prod.ht.raw, rte_memory_order_acquire);
+	op.raw = rte_atomic_load_explicit(&d->ht.raw, rte_memory_order_acquire);
 
 	do {
 		/* Reset n to the initial burst count */
@@ -74,7 +70,7 @@  __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 		 * make sure that we read prod head/tail *before*
 		 * reading cons tail.
 		 */
-		__rte_ring_hts_head_wait(&r->hts_prod, &op);
+		__rte_ring_hts_head_wait(d, &op);
 
 		/*
 		 *  The subtraction is done between two unsigned 32bits value
@@ -82,12 +78,12 @@  __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = capacity + r->cons.tail - op.pos.head;
+		*entries = capacity + s->tail - op.pos.head;
 
 		/* check that we have enough room in ring */
-		if (unlikely(n > *free_entries))
+		if (unlikely(n > *entries))
 			n = (behavior == RTE_RING_QUEUE_FIXED) ?
-					0 : *free_entries;
+					0 : *entries;
 
 		if (n == 0)
 			break;
@@ -100,13 +96,25 @@  __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 	 *  - OOO reads of cons tail value
 	 *  - OOO copy of elems from the ring
 	 */
-	} while (rte_atomic_compare_exchange_strong_explicit(&r->hts_prod.ht.raw,
+	} while (rte_atomic_compare_exchange_strong_explicit(&d->ht.raw,
 			(uint64_t *)(uintptr_t)&op.raw, np.raw,
-			rte_memory_order_acquire, rte_memory_order_acquire) == 0);
+			rte_memory_order_acquire,
+			rte_memory_order_acquire) == 0);
 
 	*old_head = op.pos.head;
 	return n;
 }
+/**
+ * @internal This function updates the producer head for enqueue
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *free_entries)
+{
+	return __rte_ring_hts_move_head(&r->hts_prod, &r->cons,
+			r->capacity, num, behavior, old_head, free_entries);
+}
 
 /**
  * @internal This function updates the consumer head for dequeue
@@ -116,51 +124,8 @@  __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *entries)
 {
-	uint32_t n;
-	union __rte_ring_hts_pos np, op;
-
-	op.raw = rte_atomic_load_explicit(&r->hts_cons.ht.raw, rte_memory_order_acquire);
-
-	/* move cons.head atomically */
-	do {
-		/* Restore n as it may change every loop */
-		n = num;
-
-		/*
-		 * wait for tail to be equal to head,
-		 * make sure that we read cons head/tail *before*
-		 * reading prod tail.
-		 */
-		__rte_ring_hts_head_wait(&r->hts_cons, &op);
-
-		/* The subtraction is done between two unsigned 32bits value
-		 * (the result is always modulo 32 bits even if we have
-		 * cons_head > prod_tail). So 'entries' is always between 0
-		 * and size(ring)-1.
-		 */
-		*entries = r->prod.tail - op.pos.head;
-
-		/* Set the actual entries for dequeue */
-		if (n > *entries)
-			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
-		if (unlikely(n == 0))
-			break;
-
-		np.pos.tail = op.pos.tail;
-		np.pos.head = op.pos.head + n;
-
-	/*
-	 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-	 *  - OOO reads of prod tail value
-	 *  - OOO copy of elems from the ring
-	 */
-	} while (rte_atomic_compare_exchange_strong_explicit(&r->hts_cons.ht.raw,
-			(uint64_t *)(uintptr_t)&op.raw, np.raw,
-			rte_memory_order_acquire, rte_memory_order_acquire) == 0);
-
-	*old_head = op.pos.head;
-	return n;
+	return __rte_ring_hts_move_head(&r->hts_cons, &r->prod,
+			0, num, behavior, old_head, entries);
 }
 
 /**
diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
index 122650346b..027409a3fa 100644
--- a/lib/ring/rte_ring_rts_elem_pvt.h
+++ b/lib/ring/rte_ring_rts_elem_pvt.h
@@ -64,20 +64,17 @@  __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
 	}
 }
 
-/**
- * @internal This function updates the producer head for enqueue.
- */
 static __rte_always_inline uint32_t
-__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+__rte_ring_rts_move_head(struct rte_ring_rts_headtail *d,
+	const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
-	uint32_t *free_entries)
+	uint32_t *entries)
 {
 	uint32_t n;
 	union __rte_ring_rts_poscnt nh, oh;
 
-	const uint32_t capacity = r->capacity;
-
-	oh.raw = rte_atomic_load_explicit(&r->rts_prod.head.raw, rte_memory_order_acquire);
+	oh.raw = rte_atomic_load_explicit(&d->head.raw,
+			rte_memory_order_acquire);
 
 	do {
 		/* Reset n to the initial burst count */
@@ -88,7 +85,7 @@  __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 		 * make sure that we read prod head *before*
 		 * reading cons tail.
 		 */
-		__rte_ring_rts_head_wait(&r->rts_prod, &oh);
+		__rte_ring_rts_head_wait(d, &oh);
 
 		/*
 		 *  The subtraction is done between two unsigned 32bits value
@@ -96,12 +93,12 @@  __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = capacity + r->cons.tail - oh.val.pos;
+		*entries = capacity + s->tail - oh.val.pos;
 
 		/* check that we have enough room in ring */
-		if (unlikely(n > *free_entries))
+		if (unlikely(n > *entries))
 			n = (behavior == RTE_RING_QUEUE_FIXED) ?
-					0 : *free_entries;
+					0 : *entries;
 
 		if (n == 0)
 			break;
@@ -114,14 +111,27 @@  __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 	 *  - OOO reads of cons tail value
 	 *  - OOO copy of elems to the ring
 	 */
-	} while (rte_atomic_compare_exchange_strong_explicit(&r->rts_prod.head.raw,
+	} while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
 			(uint64_t *)(uintptr_t)&oh.raw, nh.raw,
-			rte_memory_order_acquire, rte_memory_order_acquire) == 0);
+			rte_memory_order_acquire,
+			rte_memory_order_acquire) == 0);
 
 	*old_head = oh.val.pos;
 	return n;
 }
 
+/**
+ * @internal This function updates the producer head for enqueue.
+ */
+static __rte_always_inline uint32_t
+__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *free_entries)
+{
+	return __rte_ring_rts_move_head(&r->rts_prod, &r->cons,
+			r->capacity, num, behavior, old_head, free_entries);
+}
+
 /**
  * @internal This function updates the consumer head for dequeue
  */
@@ -130,51 +140,8 @@  __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *entries)
 {
-	uint32_t n;
-	union __rte_ring_rts_poscnt nh, oh;
-
-	oh.raw = rte_atomic_load_explicit(&r->rts_cons.head.raw, rte_memory_order_acquire);
-
-	/* move cons.head atomically */
-	do {
-		/* Restore n as it may change every loop */
-		n = num;
-
-		/*
-		 * wait for cons head/tail distance,
-		 * make sure that we read cons head *before*
-		 * reading prod tail.
-		 */
-		__rte_ring_rts_head_wait(&r->rts_cons, &oh);
-
-		/* The subtraction is done between two unsigned 32bits value
-		 * (the result is always modulo 32 bits even if we have
-		 * cons_head > prod_tail). So 'entries' is always between 0
-		 * and size(ring)-1.
-		 */
-		*entries = r->prod.tail - oh.val.pos;
-
-		/* Set the actual entries for dequeue */
-		if (n > *entries)
-			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
-		if (unlikely(n == 0))
-			break;
-
-		nh.val.pos = oh.val.pos + n;
-		nh.val.cnt = oh.val.cnt + 1;
-
-	/*
-	 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-	 *  - OOO reads of prod tail value
-	 *  - OOO copy of elems from the ring
-	 */
-	} while (rte_atomic_compare_exchange_strong_explicit(&r->rts_cons.head.raw,
-			(uint64_t *)(uintptr_t)&oh.raw, nh.raw,
-			rte_memory_order_acquire, rte_memory_order_acquire) == 0);
-
-	*old_head = oh.val.pos;
-	return n;
+	return __rte_ring_rts_move_head(&r->rts_cons, &r->prod,
+			0, num, behavior, old_head, entries);
 }
 
 /**