[RFC,6/6] ring: minimize reads of the counterpart cache-line

Message ID 20240815085339.1434-7-konstantin.v.ananyev@yandex.ru (mailing list archive)
State New
Delegated to: Thomas Monjalon
Headers
Series Stage-Ordered API and other extensions for ring library |

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/loongarch-compilation success Compilation OK
ci/loongarch-unit-testing fail Unit Testing FAIL
ci/Intel-compilation fail Compilation issues
ci/intel-Testing success Testing PASS
ci/github-robot: build fail github build: failed
ci/intel-Functional success Functional PASS
ci/iol-mellanox-Performance success Performance Testing PASS
ci/iol-intel-Performance pending Performance Testing pending
ci/iol-intel-Functional success Functional Testing PASS
ci/iol-broadcom-Performance pending Performance Testing pending
ci/iol-broadcom-Functional success Functional Testing PASS
ci/iol-sample-apps-testing success Testing PASS
ci/iol-unit-arm64-testing fail Testing issues
ci/iol-compile-arm64-testing fail Testing issues
ci/iol-unit-amd64-testing fail Testing issues
ci/iol-compile-amd64-testing fail Testing issues

Commit Message

Konstantin Ananyev Aug. 15, 2024, 8:53 a.m. UTC
From: Konstantin Ananyev <konstantin.ananyev@huawei.com>

Note upfront: this change shouldn't affect rte_ring public API.
Though as layout of public structures have changed - it is an ABI
breakage.

This is an attempt to implement rte_ring optimization
that was suggested by Morten and discussed on this mailing
list a while ago.
The idea is to optimize MP/SP & MC/SC ring enqueue/dequeue ops
by storing along with the head its Cached Foreign Tail (CFT) value.
I.E.: for producer we cache consumer tail value and visa-versa.
To avoid races head and CFT values are read/written using atomic
64-bit ops.
In theory that might help by reducing number of times producer
needs to access consumer's cache-line and visa-versa.
In practice, I didn't see any impressive boost so far:
-  ring_per_autotest micro-bench - results are a mixed bag,
   Some are a bit better, some are worse.
 - [so]ring_stress_autotest  micro-benchs: ~10-15% improvement
 - l3fwd in wqorder/wqundorder (see previous patch for details):
   no real difference.

Though so far my testing scope was quite limited, I tried it only
on x86 machines. So can I ask all interested parties:
different platform vendors (ARM, PPC, etc.)
and people who do use rte_ring extensively to give it a try and come up
with the feedback.

If there would be no real performance improvements on
any platform we support, or some problems will be encountered -
I am ok to drop that patch.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
---
 drivers/net/mlx5/mlx5_hws_cnt.h   |  5 ++--
 drivers/net/ring/rte_eth_ring.c   |  2 +-
 lib/ring/rte_ring.c               |  6 ++--
 lib/ring/rte_ring_core.h          | 12 +++++++-
 lib/ring/rte_ring_generic_pvt.h   | 46 +++++++++++++++++++++----------
 lib/ring/rte_ring_peek_elem_pvt.h |  4 +--
 lib/ring/soring.c                 | 31 +++++++++++++++------
 lib/ring/soring.h                 |  4 +--
 8 files changed, 77 insertions(+), 33 deletions(-)
  

Patch

diff --git a/drivers/net/mlx5/mlx5_hws_cnt.h b/drivers/net/mlx5/mlx5_hws_cnt.h
index 996ac8dd9a..663146563c 100644
--- a/drivers/net/mlx5/mlx5_hws_cnt.h
+++ b/drivers/net/mlx5/mlx5_hws_cnt.h
@@ -388,11 +388,12 @@  __mlx5_hws_cnt_pool_enqueue_revert(struct rte_ring *r, unsigned int n,
 
 	MLX5_ASSERT(r->prod.sync_type == RTE_RING_SYNC_ST);
 	MLX5_ASSERT(r->cons.sync_type == RTE_RING_SYNC_ST);
-	current_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed);
+	current_head = rte_atomic_load_explicit(&r->prod.head.val.pos,
+			rte_memory_order_relaxed);
 	MLX5_ASSERT(n <= r->capacity);
 	MLX5_ASSERT(n <= rte_ring_count(r));
 	revert2head = current_head - n;
-	r->prod.head = revert2head; /* This ring should be SP. */
+	r->prod.head.val.pos = revert2head; /* This ring should be SP. */
 	__rte_ring_get_elem_addr(r, revert2head, sizeof(cnt_id_t), n,
 			&zcd->ptr1, &zcd->n1, &zcd->ptr2);
 	/* Update tail */
diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c
index 1346a0dba3..31009e90d2 100644
--- a/drivers/net/ring/rte_eth_ring.c
+++ b/drivers/net/ring/rte_eth_ring.c
@@ -325,7 +325,7 @@  eth_get_monitor_addr(void *rx_queue, struct rte_power_monitor_cond *pmc)
 	 */
 	pmc->addr = &rng->prod.head;
 	pmc->size = sizeof(rng->prod.head);
-	pmc->opaque[0] = rng->prod.head;
+	pmc->opaque[0] = rng->prod.head.val.pos;
 	pmc->fn = ring_monitor_callback;
 	return 0;
 }
diff --git a/lib/ring/rte_ring.c b/lib/ring/rte_ring.c
index aebb6d6728..cb2c39c7ad 100644
--- a/lib/ring/rte_ring.c
+++ b/lib/ring/rte_ring.c
@@ -102,7 +102,7 @@  reset_headtail(void *p)
 	switch (ht->sync_type) {
 	case RTE_RING_SYNC_MT:
 	case RTE_RING_SYNC_ST:
-		ht->head = 0;
+		ht->head.raw = 0;
 		ht->tail = 0;
 		break;
 	case RTE_RING_SYNC_MT_RTS:
@@ -373,9 +373,9 @@  rte_ring_dump(FILE *f, const struct rte_ring *r)
 	fprintf(f, "  size=%"PRIu32"\n", r->size);
 	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
 	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
-	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
+	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head.val.pos);
 	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
-	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head.val.pos);
 	fprintf(f, "  used=%u\n", rte_ring_count(r));
 	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
 }
diff --git a/lib/ring/rte_ring_core.h b/lib/ring/rte_ring_core.h
index 270869d214..b88a1bc352 100644
--- a/lib/ring/rte_ring_core.h
+++ b/lib/ring/rte_ring_core.h
@@ -66,8 +66,17 @@  enum rte_ring_sync_type {
  * Depending on sync_type format of that structure might be different,
  * but offset for *sync_type* and *tail* values should remain the same.
  */
+union __rte_ring_head_cft {
+	/** raw 8B value to read/write *cnt* and *pos* as one atomic op */
+	alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
+	struct {
+		uint32_t pos; /**< head position */
+		uint32_t cft; /**< cached foreign tail value*/
+	} val;
+};
+
 struct rte_ring_headtail {
-	volatile RTE_ATOMIC(uint32_t) head;      /**< prod/consumer head. */
+	uint32_t __unused;
 	volatile RTE_ATOMIC(uint32_t) tail;      /**< prod/consumer tail. */
 	union {
 		/** sync type of prod/cons */
@@ -75,6 +84,7 @@  struct rte_ring_headtail {
 		/** deprecated -  True if single prod/cons */
 		uint32_t single;
 	};
+	union __rte_ring_head_cft head;
 };
 
 union __rte_ring_rts_poscnt {
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index 12f3595926..e70f4ff32c 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -38,17 +38,18 @@  __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 {
 	unsigned int max = n;
 	int success;
+	uint32_t tail;
+	union __rte_ring_head_cft nh, oh;
+
+	oh.raw = rte_atomic_load_explicit(&d->head.raw,
+			rte_memory_order_acquire);
 
 	do {
 		/* Reset n to the initial burst count */
 		n = max;
 
-		*old_head = d->head;
-
-		/* add rmb barrier to avoid load/load reorder in weak
-		 * memory model. It is noop on x86
-		 */
-		rte_smp_rmb();
+		*old_head = oh.val.pos;
+		tail = oh.val.cft;
 
 		/*
 		 *  The subtraction is done between two unsigned 32bits value
@@ -56,24 +57,41 @@  __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 		 * *old_head > s->tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*entries = (capacity + s->tail - *old_head);
+		*entries = (capacity + tail - *old_head);
 
-		/* check that we have enough room in ring */
-		if (unlikely(n > *entries))
-			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+		/* attempt #1: check that we have enough room with
+		 * cached-foreign-tail value.
+		 * Note that actual tail value can go forward till we cached
+		 * it, in that case we might have to update our cached value.
+		 */
+		if (unlikely(n > *entries)) {
+
+			tail = rte_atomic_load_explicit(&s->tail,
+				rte_memory_order_relaxed);
+			*entries = (capacity + tail - *old_head);
+
+			/* attempt #2: check that we have enough room in ring */
+			if (unlikely(n > *entries))
+				n = (behavior == RTE_RING_QUEUE_FIXED) ?
 					0 : *entries;
+		}
 
 		if (n == 0)
 			return 0;
 
 		*new_head = *old_head + n;
+		nh.val.pos = *new_head;
+		nh.val.cft = tail;
+
 		if (is_st) {
-			d->head = *new_head;
+			d->head.raw = nh.raw;
 			success = 1;
 		} else
-			success = rte_atomic32_cmpset(
-					(uint32_t *)(uintptr_t)&d->head,
-					*old_head, *new_head);
+			success = rte_atomic_compare_exchange_strong_explicit(
+				&d->head.raw, (uint64_t *)(uintptr_t)&oh.raw,
+				nh.raw, rte_memory_order_acquire,
+				rte_memory_order_acquire);
+
 	} while (unlikely(success == 0));
 	return n;
 }
diff --git a/lib/ring/rte_ring_peek_elem_pvt.h b/lib/ring/rte_ring_peek_elem_pvt.h
index b5f0822b7e..e4dd0ae094 100644
--- a/lib/ring/rte_ring_peek_elem_pvt.h
+++ b/lib/ring/rte_ring_peek_elem_pvt.h
@@ -33,7 +33,7 @@  __rte_ring_st_get_tail(struct rte_ring_headtail *ht, uint32_t *tail,
 {
 	uint32_t h, n, t;
 
-	h = ht->head;
+	h = ht->head.val.pos;
 	t = ht->tail;
 	n = h - t;
 
@@ -58,7 +58,7 @@  __rte_ring_st_set_head_tail(struct rte_ring_headtail *ht, uint32_t tail,
 	RTE_SET_USED(enqueue);
 
 	pos = tail + num;
-	ht->head = pos;
+	ht->head.val.pos = pos;
 	rte_atomic_store_explicit(&ht->tail, pos, rte_memory_order_release);
 }
 
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index 929bde9697..baa449c872 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -90,7 +90,8 @@  __rte_soring_stage_finalize(struct soring_stage_headtail *sht,
 	 * already finished.
 	 */
 
-	head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
+	head = rte_atomic_load_explicit(&sht->head.val.pos,
+			rte_memory_order_relaxed);
 	n = RTE_MIN(head - ot.pos, maxn);
 	for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
 
@@ -213,22 +214,36 @@  __rte_soring_stage_move_head(struct soring_stage_headtail *d,
 	uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
 {
 	uint32_t n, tail;
+	union __rte_ring_head_cft nh, oh;
 
-	*old_head = rte_atomic_load_explicit(&d->head,
+	oh.raw = rte_atomic_load_explicit(&d->head.raw,
 			rte_memory_order_acquire);
 
 	do {
 		n = num;
-		tail = rte_atomic_load_explicit(&s->tail,
-				rte_memory_order_relaxed);
+		*old_head = oh.val.pos;
+		tail = oh.val.cft;
 		*avail = capacity + tail - *old_head;
-		if (n > *avail)
-			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
+
+		if (n > *avail) {
+			tail = rte_atomic_load_explicit(&s->tail,
+				rte_memory_order_relaxed);
+			*avail = capacity + tail - *old_head;
+
+			if (n > *avail)
+				n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : *avail;
+		}
+
 		if (n == 0)
 			return 0;
+
 		*new_head = *old_head + n;
-	} while (rte_atomic_compare_exchange_strong_explicit(&d->head,
-			old_head, *new_head, rte_memory_order_acquire,
+		nh.val.pos = *new_head;
+		nh.val.cft = tail;
+
+	} while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
+			&oh.raw, nh.raw, rte_memory_order_acquire,
 			rte_memory_order_acquire) == 0);
 
 	return n;
diff --git a/lib/ring/soring.h b/lib/ring/soring.h
index 3a3f6efa76..0fb333aa71 100644
--- a/lib/ring/soring.h
+++ b/lib/ring/soring.h
@@ -60,8 +60,8 @@  union soring_stage_tail {
 
 struct soring_stage_headtail {
 	volatile union soring_stage_tail tail;
-	enum rte_ring_sync_type unused;  /**< unused */
-	volatile RTE_ATOMIC(uint32_t) head;
+	enum rte_ring_sync_type __unused;  /**< unused */
+	union __rte_ring_head_cft head;
 };
 
 /**