[RFC,4/6] app/test: add unit tests for soring API

Message ID 20240815085339.1434-5-konstantin.v.ananyev@yandex.ru (mailing list archive)
State New
Delegated to: Thomas Monjalon
Headers
Series Stage-Ordered API and other extensions for ring library |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Konstantin Ananyev Aug. 15, 2024, 8:53 a.m. UTC
From: Konstantin Ananyev <konstantin.ananyev@huawei.com>

Add both functional and stress test-cases for soring API.
Stress test serves as both functional and performance test of soring
enqueue/dequeue/acquire/release operations under high contention
(for both over committed and non-over committed scenarios).

Signed-off-by: Eimear Morrissey <eimear.morrissey@huawei.com>
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
---
 app/test/meson.build               |   3 +
 app/test/test_soring.c             | 452 ++++++++++++++++
 app/test/test_soring_mt_stress.c   |  45 ++
 app/test/test_soring_stress.c      |  48 ++
 app/test/test_soring_stress.h      |  35 ++
 app/test/test_soring_stress_impl.h | 832 +++++++++++++++++++++++++++++
 6 files changed, 1415 insertions(+)
 create mode 100644 app/test/test_soring.c
 create mode 100644 app/test/test_soring_mt_stress.c
 create mode 100644 app/test/test_soring_stress.c
 create mode 100644 app/test/test_soring_stress.h
 create mode 100644 app/test/test_soring_stress_impl.h
  

Patch

diff --git a/app/test/meson.build b/app/test/meson.build
index e29258e6ec..c290162e43 100644
--- a/app/test/meson.build
+++ b/app/test/meson.build
@@ -175,6 +175,9 @@  source_file_deps = {
     'test_security_proto.c' : ['cryptodev', 'security'],
     'test_seqlock.c': [],
     'test_service_cores.c': [],
+    'test_soring.c': [],
+    'test_soring_mt_stress.c': [],
+    'test_soring_stress.c': [],
     'test_spinlock.c': [],
     'test_stack.c': ['stack'],
     'test_stack_perf.c': ['stack'],
diff --git a/app/test/test_soring.c b/app/test/test_soring.c
new file mode 100644
index 0000000000..381979bc6f
--- /dev/null
+++ b/app/test/test_soring.c
@@ -0,0 +1,452 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_launch.h>
+#include <rte_cycles.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_malloc.h>
+#include <rte_random.h>
+#include <rte_errno.h>
+#include <rte_hexdump.h>
+
+#include <rte_soring.h>
+
+#include "test.h"
+
+#define MAX_ACQUIRED 20
+
+/*
+ * Assert that the unsigned value 'val' equals 'expected'. On mismatch the
+ * failure message shows the stringified 'val' expression and both values.
+ * Arguments are parenthesised so that any expression can be passed safely;
+ * each one is evaluated twice, so it must be free of side effects.
+ */
+#define SORING_TEST_ASSERT(val, expected) do { \
+	RTE_TEST_ASSERT((expected) == (val), \
+			"%s: expected %u got %u\n", #val, (expected), (val)); \
+} while (0)
+
+/*
+ * Fill in the soring parameters used throughout these tests.
+ * Only the seven fields listed below are written; anything else in *prm
+ * keeps whatever value the caller left there.
+ */
+static void
+set_soring_init_param(struct rte_soring_param *prm,
+		const char *name, uint32_t esize, uint32_t elems,
+		uint32_t stages, uint32_t stsize,
+		enum rte_ring_sync_type rst_prod,
+		enum rte_ring_sync_type rst_cons)
+{
+	/* producer/consumer synchronisation modes */
+	prm->cons_synt = rst_cons;
+	prm->prod_synt = rst_prod;
+
+	/* ring geometry */
+	prm->stsize = stsize;
+	prm->stages = stages;
+	prm->elems = elems;
+	prm->esize = esize;
+
+	prm->name = name;
+}
+
+/*
+ * Acquire exactly num_packets objects at the given stage and release them
+ * straight away without touching them.
+ * num_packets must not exceed MAX_ACQUIRED, the capacity of the on-stack
+ * buffer that receives the acquired objects.
+ * Returns 0 on success; a failed assertion is reported through the
+ * RTE_TEST_ASSERT machinery.
+ */
+static int
+move_forward_stage(struct rte_soring *sor,
+		uint32_t num_packets, uint32_t stage)
+{
+	uint32_t acquired;
+	uint32_t ftoken;
+	uint32_t *acquired_objs[MAX_ACQUIRED];
+
+	/* a bigger burst would overflow acquired_objs[] */
+	RTE_TEST_ASSERT(num_packets <= MAX_ACQUIRED,
+			"%s: cannot move %u objects at once, limit is %d\n",
+			__func__, num_packets, MAX_ACQUIRED);
+
+	acquired = rte_soring_acquire(sor, acquired_objs, NULL, stage,
+			num_packets, RTE_RING_QUEUE_FIXED, &ftoken, NULL);
+	SORING_TEST_ASSERT(acquired, num_packets);
+	rte_soring_release(sor, NULL, NULL, stage, num_packets,
+			ftoken);
+
+	return 0;
+}
+
+/*
+ * struct rte_soring_param param checking.
+ * Allocates memory sized for a valid parameter set, then verifies that
+ * rte_soring_init() rejects each invalid parameter combination in turn.
+ */
+static int
+test_soring_init(void)
+{
+	struct rte_soring *sor = NULL;
+	struct rte_soring_param prm;
+	int rc;
+	ssize_t sz;
+
+	memset(&prm, 0, sizeof(prm));
+
+	/* init memory */
+	set_soring_init_param(&prm, "alloc_memory", sizeof(uintptr_t),
+			4, 1, 4, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	/* keep the result signed: a negative value reports an error */
+	sz = rte_soring_get_memsize(&prm);
+	RTE_TEST_ASSERT(sz > 0, "failed to calculate soring size");
+	sor = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+	RTE_TEST_ASSERT_NOT_NULL(sor, "could not allocate memory for soring");
+
+	set_soring_init_param(&prm, "test_invalid_stages", sizeof(uintptr_t),
+			4, 0, 4, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	rc = rte_soring_init(sor, &prm);
+	RTE_TEST_ASSERT_FAIL(rc, "initted soring with invalid num stages");
+
+	set_soring_init_param(&prm, "test_invalid_esize", 0,
+			4, 1, 4, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	rc = rte_soring_init(sor, &prm);
+	RTE_TEST_ASSERT_FAIL(rc, "initted soring with 0 esize");
+
+	set_soring_init_param(&prm, "test_invalid_esize", 9,
+			4, 1, 4, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	rc = rte_soring_init(sor, &prm);
+	RTE_TEST_ASSERT_FAIL(rc, "initted soring with esize not multiple of 4");
+
+	set_soring_init_param(&prm, "test_invalid_rsize", sizeof(uintptr_t),
+			4, 1, 3, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	rc = rte_soring_init(sor, &prm);
+	RTE_TEST_ASSERT_FAIL(rc, "initted soring with rcsize not multiple of 4");
+
+	set_soring_init_param(&prm, "test_invalid_elems", sizeof(uintptr_t),
+			RTE_SORING_ELEM_MAX + 1, 1, 4, RTE_RING_SYNC_MT,
+			RTE_RING_SYNC_MT);
+	rc = rte_soring_init(sor, &prm);
+	RTE_TEST_ASSERT_FAIL(rc, "initted soring with invalid num elements");
+
+	rte_free(sor);
+	return 0;
+}
+
+/*
+ * Check rte_soring_get_memsize(): a valid parameter set must yield a
+ * positive size, while invalid ones must be rejected with -EINVAL.
+ */
+static int
+test_soring_get_memsize(void)
+{
+	struct rte_soring_param prm;
+	ssize_t sz;
+
+	/*
+	 * Start from a zeroed structure, as the other tests do, so that
+	 * no field left untouched by set_soring_init_param() is
+	 * indeterminate.
+	 */
+	memset(&prm, 0, sizeof(prm));
+
+	set_soring_init_param(&prm, "memsize", sizeof(uint32_t *),
+			10, 1, 0, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	sz = rte_soring_get_memsize(&prm);
+	RTE_TEST_ASSERT(sz > 0, "failed to calculate size");
+
+	set_soring_init_param(&prm, "memsize", sizeof(uint8_t),
+			16, UINT32_MAX, sizeof(uint32_t), RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	sz = rte_soring_get_memsize(&prm);
+	RTE_TEST_ASSERT_EQUAL(sz, -EINVAL, "calcuated size incorrect");
+
+	set_soring_init_param(&prm, "memsize", 0,
+			16, UINT32_MAX, sizeof(uint32_t), RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	sz = rte_soring_get_memsize(&prm);
+	RTE_TEST_ASSERT_EQUAL(sz, -EINVAL, "calculated size incorrect");
+
+	return 0;
+}
+
+/*
+ * Push a full ring of 32 objects through a large number of stages.
+ * Every stage acquires all objects, checks that both the object and its
+ * rc value carry the increments applied by the previous stages, bumps
+ * both by one and releases them. After the last stage the dequeued values
+ * must equal the enqueued ones plus the number of stages.
+ */
+static int
+test_soring_stages(void)
+{
+	/* number of stages every object has to travel through */
+	const uint32_t nb_stage = 10000;
+
+	struct rte_soring *sor = NULL;
+	struct rte_soring_param prm;
+	uint32_t objs[32];
+	uint32_t rcs[32];
+	uint32_t acquired_objs[32];
+	uint32_t acquired_rcs[32];
+	uint32_t dequeued_rcs[32];
+	uint32_t dequeued_objs[32];
+	ssize_t ssz;
+	int rc;
+	uint32_t stage, enqueued, dequeued, acquired;
+	uint32_t i, j, ftoken;
+
+	memset(&prm, 0, sizeof(prm));
+	set_soring_init_param(&prm, "stages", sizeof(uint32_t), 32,
+			nb_stage, sizeof(uint32_t), RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	/* signed type, otherwise a negative error code passes the check */
+	ssz = rte_soring_get_memsize(&prm);
+	RTE_TEST_ASSERT(ssz > 0, "parameter error calculating ring size");
+	sor = rte_zmalloc(NULL, ssz, RTE_CACHE_LINE_SIZE);
+	RTE_TEST_ASSERT_NOT_NULL(sor, "couldn't allocate memory for soring");
+	rc = rte_soring_init(sor, &prm);
+	RTE_TEST_ASSERT_SUCCESS(rc, "failed to init soring");
+
+	for (i = 0; i < 32; i++) {
+		rcs[i] = i;
+		objs[i] = i + 32;
+	}
+
+	enqueued = rte_soring_enqueue(sor, objs, rcs, 32,
+			RTE_RING_QUEUE_VARIABLE, NULL);
+	SORING_TEST_ASSERT(enqueued, 32);
+
+	for (stage = 0; stage < nb_stage; stage++) {
+		dequeued = rte_soring_dequeue(sor, dequeued_objs, NULL, 32,
+				RTE_RING_QUEUE_FIXED, NULL);
+		/* check none at end stage */
+		SORING_TEST_ASSERT(dequeued, 0);
+
+		acquired = rte_soring_acquire(sor, acquired_objs,
+				acquired_rcs, stage, 32, RTE_RING_QUEUE_FIXED,
+				&ftoken, NULL);
+		SORING_TEST_ASSERT(acquired, 32);
+
+		for (j = 0; j < 32; j++) {
+			SORING_TEST_ASSERT(acquired_rcs[j], j + stage);
+			SORING_TEST_ASSERT(acquired_objs[j], j + stage + 32);
+			/* modify both queue object and rc */
+			acquired_objs[j]++;
+			acquired_rcs[j]++;
+		}
+
+		rte_soring_release(sor, acquired_objs,
+				acquired_rcs, stage, 32,
+				ftoken);
+	}
+
+	dequeued = rte_soring_dequeue(sor, dequeued_objs, dequeued_rcs, 32,
+			RTE_RING_QUEUE_FIXED, NULL);
+	SORING_TEST_ASSERT(dequeued, 32);
+	for (i = 0; i < 32; i++) {
+		/* ensure we ended up with the expected values in order */
+		SORING_TEST_ASSERT(dequeued_rcs[i], i + nb_stage);
+		SORING_TEST_ASSERT(dequeued_objs[i], i + 32 + nb_stage);
+	}
+	rte_free(sor);
+	return 0;
+}
+
+/*
+ * Basic enqueue/dequeue checks on a single-stage soring of 10 pointers:
+ * fixed vs variable enqueue behaviour when the ring fills up, no dequeue
+ * before stage 0 has released the objects, and order preservation.
+ */
+static int
+test_soring_enqueue_dequeue(void)
+{
+	struct rte_soring *sor = NULL;
+	struct rte_soring_param prm;
+	int rc;
+	uint32_t i;
+	ssize_t sz;
+	uint32_t queue_objs[10];
+	uint32_t *queue_objs_p[10];
+	uint32_t free_space;
+	uint32_t enqueued, dequeued;
+	uint32_t *dequeued_objs[10];
+
+	memset(&prm, 0, sizeof(prm));
+	for (i = 0; i < 10; i++) {
+		queue_objs[i] = i + 1;
+		queue_objs_p[i] = &queue_objs[i];
+	}
+
+	/* init memory */
+	set_soring_init_param(&prm, "enqueue/dequeue", sizeof(uint32_t *),
+			10, 1, 0, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	/* keep the result signed: a negative value reports an error */
+	sz = rte_soring_get_memsize(&prm);
+	RTE_TEST_ASSERT(sz > 0, "failed to calculate soring size");
+	sor = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+	RTE_TEST_ASSERT_NOT_NULL(sor, "alloc failed for soring");
+	rc = rte_soring_init(sor, &prm);
+	RTE_TEST_ASSERT_SUCCESS(rc, "Failed to init soring");
+
+	free_space = 0;
+
+	enqueued = rte_soring_enqueue(sor, queue_objs_p, NULL, 5,
+			RTE_RING_QUEUE_VARIABLE, &free_space);
+
+	SORING_TEST_ASSERT(free_space, 5);
+	SORING_TEST_ASSERT(enqueued, 5);
+
+	/* fixed amount enqueue */
+	enqueued = rte_soring_enqueue(sor, queue_objs_p, NULL, 7,
+			RTE_RING_QUEUE_FIXED, &free_space);
+	SORING_TEST_ASSERT(free_space, 5);
+	SORING_TEST_ASSERT(enqueued, 0);
+
+	/* variable amount enqueue */
+	enqueued = rte_soring_enqueue(sor, queue_objs_p + 5, NULL, 7,
+			RTE_RING_QUEUE_VARIABLE, &free_space);
+	SORING_TEST_ASSERT(free_space, 0);
+	SORING_TEST_ASSERT(enqueued, 5);
+
+	/* test no dequeue while stage 0 has not completed */
+	dequeued = rte_soring_dequeue(sor, dequeued_objs, NULL, 10,
+			RTE_RING_QUEUE_FIXED, NULL);
+	SORING_TEST_ASSERT(dequeued, 0);
+	dequeued = rte_soring_dequeue(sor, dequeued_objs, NULL, 10,
+			RTE_RING_QUEUE_VARIABLE, NULL);
+	SORING_TEST_ASSERT(dequeued, 0);
+
+	/* a failed acquire inside the helper must fail this test too */
+	rc = move_forward_stage(sor, 8, 0);
+	RTE_TEST_ASSERT_SUCCESS(rc, "failed to move 8 objects past stage 0");
+
+	/* can only dequeue as many as have completed stage */
+	dequeued = rte_soring_dequeue(sor, dequeued_objs, NULL, 10,
+			RTE_RING_QUEUE_FIXED, NULL);
+	SORING_TEST_ASSERT(dequeued, 0);
+	dequeued = rte_soring_dequeue(sor, dequeued_objs, NULL, 10,
+			RTE_RING_QUEUE_VARIABLE, NULL);
+	SORING_TEST_ASSERT(dequeued, 8);
+	/* packets remain in order */
+	for (i = 0; i < dequeued; i++) {
+		RTE_TEST_ASSERT_EQUAL(dequeued_objs[i],
+				queue_objs_p[i], "dequeued != enqueued");
+	}
+
+	dequeued = rte_soring_dequeue(sor, dequeued_objs, NULL, 1,
+			RTE_RING_QUEUE_VARIABLE, NULL);
+	SORING_TEST_ASSERT(dequeued, 0);
+
+	rc = move_forward_stage(sor, 2, 0);
+	RTE_TEST_ASSERT_SUCCESS(rc, "failed to move 2 objects past stage 0");
+	dequeued = rte_soring_dequeue(sor, dequeued_objs, NULL, 2,
+			RTE_RING_QUEUE_VARIABLE, NULL);
+	SORING_TEST_ASSERT(dequeued, 2);
+	/* packets remain in order */
+	for (i = 0; i < dequeued; i++) {
+		RTE_TEST_ASSERT_EQUAL(dequeued_objs[i],
+				queue_objs_p[i + 8], "dequeued != enqueued");
+	}
+
+	rte_free(sor);
+	return 0;
+}
+
+/*
+ * Acquire/release checks on a two-stage soring with per-object rc values:
+ * rc values supplied at enqueue or at release must be visible to the next
+ * stage and to the final dequeue, and objects must keep their order.
+ */
+static int
+test_soring_acquire_release(void)
+{
+	struct rte_soring *sor = NULL;
+	struct rte_soring_param prm;
+	int rc, i;
+	size_t sz;
+	uint32_t queue_objs[10];
+	uint32_t rc_objs[10];
+	uint32_t acquired_objs[10];
+	uint32_t dequeued_objs[10];
+	uint32_t dequeued_rcs[10];
+	uint32_t s1_acquired_rcs[10];
+	uint32_t free_space, enqueued, ftoken, acquired, dequeued;
+
+	memset(&prm, 0, sizeof(prm));
+
+	for (i = 0; i < 10; i++) {
+		queue_objs[i] = i + 5;
+		rc_objs[i] = i + 10;
+	}
+
+	/* init memory */
+	set_soring_init_param(&prm, "test_acquire_release", sizeof(uint32_t),
+			20, 2, sizeof(uint32_t), RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+	sz = rte_soring_get_memsize(&prm);
+	sor = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+	if (sor == NULL) {
+		printf("%s: alloc(%zu) for FIFO with %u elems failed",
+			__func__, sz, prm.elems);
+		return -ENOMEM;
+	}
+
+	/* init ring */
+	rc = rte_soring_init(sor, &prm);
+	RTE_TEST_ASSERT_SUCCESS(rc, "failed to init soring");
+
+	/* enqueue with associated rc */
+	enqueued = rte_soring_enqueue(sor, queue_objs, rc_objs, 5,
+			RTE_RING_QUEUE_VARIABLE, &free_space);
+	SORING_TEST_ASSERT(enqueued, 5);
+	/* enqueue without associated rc */
+	enqueued = rte_soring_enqueue(sor, queue_objs + 5, NULL, 5,
+			RTE_RING_QUEUE_VARIABLE, &free_space);
+	SORING_TEST_ASSERT(enqueued, 5);
+
+	/* acquire the objects with rc's and ensure they are as expected */
+	acquired = rte_soring_acquire(sor, acquired_objs,
+			s1_acquired_rcs, 0, 5, RTE_RING_QUEUE_VARIABLE, &ftoken, NULL);
+	SORING_TEST_ASSERT(acquired, 5);
+	for (i = 0; i < 5; i++) {
+		RTE_TEST_ASSERT_EQUAL(s1_acquired_rcs[i], rc_objs[i],
+				"acquired rc[%d]: %u != enqueued rc: %u",
+				i, s1_acquired_rcs[i], rc_objs[i]);
+		RTE_TEST_ASSERT_EQUAL(acquired_objs[i], queue_objs[i],
+				"acquired obj[%d]: %u != enqueued obj %u",
+				i, acquired_objs[i], queue_objs[i]);
+	}
+	rte_soring_release(sor, NULL, NULL, 0, 5, ftoken);
+
+	/* acquire the objects without rc's and ensure they are as expected */
+	acquired = rte_soring_acquire(sor, acquired_objs,
+			s1_acquired_rcs, 0, 5, RTE_RING_QUEUE_VARIABLE,
+			&ftoken, NULL);
+	SORING_TEST_ASSERT(acquired, 5);
+	for (i = 0; i < 5; i++) {
+		/* as the rc area of memory is zero'd at init this is true
+		 * but this is a detail of implementation rather than
+		 * a guarantee.
+		 */
+		RTE_TEST_ASSERT_EQUAL(s1_acquired_rcs[i], 0,
+				"acquired rc not empty");
+		RTE_TEST_ASSERT_EQUAL(acquired_objs[i], queue_objs[i + 5],
+				"acquired obj[%d]: %u != enqueued obj %u",
+				i, acquired_objs[i], queue_objs[i + 5]);
+	}
+	/* release the objects, adding rc's */
+	rte_soring_release(sor, NULL, rc_objs + 5, 0, 5,
+			ftoken);
+
+	acquired = rte_soring_acquire(sor, acquired_objs,
+			s1_acquired_rcs, 1, 10, RTE_RING_QUEUE_VARIABLE,
+			&ftoken, NULL);
+	SORING_TEST_ASSERT(acquired, 10);
+	for (i = 0; i < 10; i++) {
+		/* ensure the associated rc's are the ones added at release */
+		RTE_TEST_ASSERT_EQUAL(s1_acquired_rcs[i], rc_objs[i],
+				"acquired rc[%d]: %u != enqueued rc: %u",
+				i, s1_acquired_rcs[i], rc_objs[i]);
+		RTE_TEST_ASSERT_EQUAL(acquired_objs[i], queue_objs[i],
+				"acquired obj[%d]: %u != enqueued obj %u",
+				i, acquired_objs[i], queue_objs[i]);
+	}
+	/* release the objects, with rc's set to NULL */
+	rte_soring_release(sor, NULL, NULL, 1, 10, ftoken);
+
+	dequeued = rte_soring_dequeue(sor, dequeued_objs, dequeued_rcs, 10,
+			RTE_RING_QUEUE_VARIABLE, NULL);
+	SORING_TEST_ASSERT(dequeued, 10);
+	for (i = 0; i < 10; i++) {
+		/* ensure the associated rc's are the ones added at release */
+		RTE_TEST_ASSERT_EQUAL(dequeued_rcs[i], rc_objs[i],
+				"dequeued rc[%d]: %u != enqueued rc: %u",
+				i, dequeued_rcs[i], rc_objs[i]);
+		/* verify what was dequeued, not the stale acquire buffer */
+		RTE_TEST_ASSERT_EQUAL(dequeued_objs[i], queue_objs[i],
+				"dequeued obj[%d]: %u != enqueued obj %u",
+				i, dequeued_objs[i], queue_objs[i]);
+	}
+
+	rte_free(sor);
+	return 0;
+}
+
+/*
+ * Entry point of the soring functional test-suite: runs every sub-test in
+ * a fixed order and stops at the first one that reports a negative result.
+ * Returns 0 when all sub-tests pass, -1 otherwise.
+ */
+static int
+test_soring(void)
+{
+	static int (* const subtest[])(void) = {
+		test_soring_init,		/* negative test cases */
+		test_soring_get_memsize,	/* memory calculations */
+		test_soring_enqueue_dequeue,	/* basic enqueue/dequeue */
+		test_soring_acquire_release,	/* acquire/release */
+		test_soring_stages,		/* large number of stages */
+	};
+	size_t idx;
+
+	for (idx = 0; idx != sizeof(subtest) / sizeof(subtest[0]); idx++) {
+		if (subtest[idx]() < 0)
+			return -1;
+	}
+
+	return 0;
+}
+
+REGISTER_FAST_TEST(soring_autotest, true, true, test_soring);
+
diff --git a/app/test/test_soring_mt_stress.c b/app/test/test_soring_mt_stress.c
new file mode 100644
index 0000000000..f18ea54444
--- /dev/null
+++ b/app/test/test_soring_mt_stress.c
@@ -0,0 +1,45 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#include "test_soring_stress_impl.h"
+
+/* Dequeue objects from the soring; rc values are not requested. */
+static inline uint32_t
+_st_ring_dequeue_bulk(struct rte_soring *r, void **obj, uint32_t n,
+	enum rte_ring_queue_behavior bhv, uint32_t *avail)
+{
+	uint32_t nb_deq;
+
+	nb_deq = rte_soring_dequeue(r, obj, NULL, n, bhv, avail);
+	return nb_deq;
+}
+
+/* Enqueue objects into the soring; no rc values are supplied. */
+static inline uint32_t
+_st_ring_enqueue_bulk(struct rte_soring *r, void * const *obj, uint32_t n,
+	enum rte_ring_queue_behavior bhv, uint32_t *free)
+{
+	uint32_t nb_enq;
+
+	nb_enq = rte_soring_enqueue(r, obj, NULL, n, bhv, free);
+	return nb_enq;
+}
+
+/*
+ * Acquire objects for the given stage; rc values are not requested.
+ * The token written to *token is what the matching release call expects.
+ */
+static inline uint32_t
+_st_ring_stage_acquire(struct rte_soring *r, uint32_t stage, void **obj,
+	uint32_t num, enum rte_ring_queue_behavior bhv, uint32_t *token,
+	uint32_t *avail)
+{
+	uint32_t nb_acq;
+
+	nb_acq = rte_soring_acquire(r, obj, NULL, stage, num, bhv,
+			token, avail);
+	return nb_acq;
+}
+
+/*
+ * Release num objects previously acquired for the given stage.
+ * The object array is deliberately not passed on: neither objects nor
+ * rc values are handed to rte_soring_release().
+ */
+static inline void
+_st_ring_stage_release(struct rte_soring *r, uint32_t stage, uint32_t token,
+	void * const *obj, uint32_t num)
+{
+	rte_soring_release(r, NULL, NULL, stage, num, token);
+	RTE_SET_USED(obj);
+}
+
+/* queue behaviour handed to the acquire and dequeue calls of the workers */
+static const enum rte_ring_queue_behavior ring_behavior =
+	RTE_RING_QUEUE_VARIABLE;
+
+/*
+ * Descriptor of the "MT" stress test-suite.
+ * 'tests' is not defined in this file; presumably it is the test-case
+ * table provided by test_soring_stress_impl.h - TODO confirm.
+ */
+const struct test test_soring_mt_stress = {
+	.name = "MT",
+	.nb_case = RTE_DIM(tests),
+	.cases = tests,
+};
diff --git a/app/test/test_soring_stress.c b/app/test/test_soring_stress.c
new file mode 100644
index 0000000000..334af6a29c
--- /dev/null
+++ b/app/test/test_soring_stress.c
@@ -0,0 +1,48 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#include "test_soring_stress.h"
+
+/*
+ * Run every test-case of the given suite, printing START and OK/FAILED
+ * lines for each one. A case passes when its function returns 0.
+ * Returns the number of test-cases that passed.
+ */
+static int
+run_test(const struct test *test)
+{
+	uint32_t idx, passed;
+
+	passed = 0;
+	for (idx = 0; idx != test->nb_case; idx++) {
+		const struct test_case *tc = &test->cases[idx];
+		int32_t res;
+
+		printf("TEST-CASE %s %s START\n", test->name, tc->name);
+
+		res = tc->func(tc->wfunc);
+		if (res == 0) {
+			passed++;
+			printf("TEST-CASE %s %s OK\n", test->name, tc->name);
+		} else {
+			printf("TEST-CASE %s %s FAILED\n",
+				test->name, tc->name);
+		}
+	}
+
+	return passed;
+}
+
+/*
+ * Run the soring MT stress suite and print a pass/fail summary.
+ * Returns 0 when every test-case passed, 1 otherwise.
+ */
+static int
+test_ring_stress(void)
+{
+	const uint32_t total = test_soring_mt_stress.nb_case;
+	const uint32_t passed = run_test(&test_soring_mt_stress);
+
+	printf("Number of tests:\t%u\nSuccess:\t%u\nFailed:\t%u\n",
+		total, passed, total - passed);
+	return (passed != total);
+}
+
+REGISTER_TEST_COMMAND(soring_stress_autotest, test_ring_stress);
diff --git a/app/test/test_soring_stress.h b/app/test/test_soring_stress.h
new file mode 100644
index 0000000000..6190e96117
--- /dev/null
+++ b/app/test/test_soring_stress.h
@@ -0,0 +1,35 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+
+#include <inttypes.h>
+#include <stddef.h>
+#include <stdalign.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <rte_soring.h>
+#include <rte_cycles.h>
+#include <rte_launch.h>
+#include <rte_pause.h>
+#include <rte_random.h>
+#include <rte_malloc.h>
+#include <rte_spinlock.h>
+
+#include "test.h"
+
+/* One stress test-case: a driver function plus the worker it launches. */
+struct test_case {
+	const char *name;		/* printed in the TEST-CASE report lines */
+	int (*func)(int (*)(void *));	/* driver, called with 'wfunc' */
+	int (*wfunc)(void *arg);	/* worker function handed to 'func' */
+};
+
+/* A named group of stress test-cases. */
+struct test {
+	const char *name;		/* suite name, printed with each case */
+	uint32_t nb_case;		/* number of entries in 'cases' */
+	const struct test_case *cases;	/* test-case table */
+};
+
+extern const struct test test_soring_mt_stress;
diff --git a/app/test/test_soring_stress_impl.h b/app/test/test_soring_stress_impl.h
new file mode 100644
index 0000000000..a64c9eaaea
--- /dev/null
+++ b/app/test/test_soring_stress_impl.h
@@ -0,0 +1,832 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include <stdalign.h>
+
+#include "test_soring_stress.h"
+
+/**
+ * Stress test for soring enqueue/dequeue/acquire/release operations.
+ * Depending on the role, performs at least one of the following patterns
+ * on each worker:
+ * - dequeue/read-write data from/to the dequeued objects/enqueue.
+ * - acquire/read-write data from/to the acquired objects/release.
+ * Serves as both functional and performance test of soring
+ * data-path API under high contention
+ * (for both over committed and non-over committed scenarios).
+ */
+
+/* NOTE(review): RING_NAME is not referenced in the visible code. */
+#define RING_NAME	"SORING_STRESS"
+/* base burst size; workers request 7/8 up to (but not including) 11/8 of it */
+#define BULK_NUM	32
+/* number of elements allocated for the ring under test */
+#define RING_SIZE	(2 * BULK_NUM * RTE_MAX_LCORE)
+
+/* capacity of the per-stage statistics and fill-pattern arrays */
+#define MAX_STAGES	16
+
+/* values of 'wrk_cmd' */
+enum {
+	WRK_CMD_STOP,
+	WRK_CMD_RUN,
+};
+
+/* command word polled by the workers: they run while it is WRK_CMD_RUN */
+static uint32_t wrk_cmd __rte_cache_aligned = WRK_CMD_STOP;
+
+/* test run-time in seconds */
+static const uint32_t run_time = 60;
+/* non-zero: dump per-lcore statistics in addition to the aggregate */
+static const uint32_t verbose;
+
+/* Statistics of one kind of operation (deq+enq or a single stage). */
+struct lcore_op_stat {
+	uint64_t nb_lcore;	/* lcores with that role (filled at aggregation) */
+	uint64_t nb_call;	/* number of operations performed */
+	uint64_t nb_obj;	/* number of objects processed */
+	uint64_t nb_cycle;	/* cycles accounted to the operation */
+	uint64_t max_cycle;	/* longest call, tracked in precise mode only */
+	uint64_t min_cycle;	/* shortest call; UINT64_MAX means not collected */
+};
+
+/* Per-lcore statistics and role description. */
+struct lcore_stat {
+	uint64_t nb_cycle;	/* cycles the worker loop was running */
+	struct lcore_op_stat deqenq;	/* dequeue + enqueue statistics */
+	uint32_t role_mask;	/* ROLE_DEQENQ and/or ROLE_STAGE(n) bits */
+	uint32_t nb_stage;	/* number of entries of 'stage' in use */
+	struct lcore_op_stat stage[MAX_STAGES];	/* acquire + release stats */
+};
+
+/* bits of lcore_stat.role_mask: bit 0 - deq/enq, bit (n + 1) - stage n */
+#define	ROLE_DEQENQ	RTE_BIT32(0)
+#define	ROLE_STAGE(n)	RTE_BIT32((n) + 1)
+
+/* Argument passed to each worker lcore. */
+struct lcore_arg {
+	struct rte_soring *rng;		/* ring under test */
+	struct lcore_stat stats;	/* role of the worker and its statistics */
+} __rte_cache_aligned;
+
+/*
+ * Object referenced by the ring entries: RTE_CACHE_LINE_SIZE bytes of
+ * 32-bit words that all hold the same fill pattern.
+ */
+struct ring_elem {
+	uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
+} __rte_cache_aligned;
+
+/*
+ * redefinable functions
+ *
+ * Only declared here: the source file that includes this header has to
+ * define 'ring_behavior' and the four _st_ring_* wrappers below.
+ */
+
+/* queue behaviour used by the workers for acquire and dequeue */
+static const enum rte_ring_queue_behavior ring_behavior;
+
+/* dequeue up to n objects into obj[] */
+static uint32_t
+_st_ring_dequeue_bulk(struct rte_soring *r, void **obj, uint32_t n,
+	enum rte_ring_queue_behavior bhv, uint32_t *avail);
+
+/* enqueue up to n objects from obj[] */
+static uint32_t
+_st_ring_enqueue_bulk(struct rte_soring *r, void * const *obj, uint32_t n,
+	enum rte_ring_queue_behavior bhv, uint32_t *free);
+
+/* acquire up to num objects for 'stage'; *token identifies the burst */
+static uint32_t
+_st_ring_stage_acquire(struct rte_soring *r, uint32_t stage, void **obj,
+	uint32_t num, enum rte_ring_queue_behavior bhv, uint32_t *token,
+	uint32_t *avail);
+
+/* release num objects acquired for 'stage' under 'token' */
+static void
+_st_ring_stage_release(struct rte_soring *r, uint32_t stage, uint32_t token,
+	void * const *obj, uint32_t num);
+
+/*
+ * Account 'call' operations that processed 'obj' objects in 'tm' cycles.
+ * The min/max cycle values are refreshed only when 'prcs' is non-zero.
+ */
+static void
+lcore_op_stat_update(struct lcore_op_stat *ls, uint64_t call, uint64_t obj,
+	uint64_t tm, int32_t prcs)
+{
+	ls->nb_cycle += tm;
+	ls->nb_obj += obj;
+	ls->nb_call += call;
+
+	if (prcs == 0)
+		return;
+
+	if (tm > ls->max_cycle)
+		ls->max_cycle = tm;
+	if (tm < ls->min_cycle)
+		ls->min_cycle = tm;
+}
+
+/*
+ * Add the same call/object/cycle figures to the lcore total, to the
+ * deq+enq counters and to the counters of every stage in use.
+ */
+static void
+lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
+	uint64_t tm, int32_t prcs)
+{
+	struct lcore_op_stat *st, *end;
+
+	ls->nb_cycle += tm;
+	lcore_op_stat_update(&ls->deqenq, call, obj, tm, prcs);
+
+	end = ls->stage + ls->nb_stage;
+	for (st = ls->stage; st != end; st++)
+		lcore_op_stat_update(st, call, obj, tm, prcs);
+}
+
+/*
+ * Merge the per-operation statistics 'ls' into the aggregate 'ms':
+ * counters are summed, max/min cycle values are widened.
+ */
+static void
+lcore_op_stat_aggr(struct lcore_op_stat *ms, const struct lcore_op_stat *ls)
+{
+	ms->nb_cycle += ls->nb_cycle;
+	ms->nb_obj += ls->nb_obj;
+	ms->nb_call += ls->nb_call;
+
+	if (ls->max_cycle > ms->max_cycle)
+		ms->max_cycle = ls->max_cycle;
+	if (ls->min_cycle < ms->min_cycle)
+		ms->min_cycle = ls->min_cycle;
+}
+
+/*
+ * Merge the statistics of one lcore ('ls') into the aggregate ('ms').
+ * The aggregate keeps the longest run-time seen and, for every role,
+ * counts the lcores that had that role enabled.
+ */
+static void
+lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
+{
+	uint32_t stg;
+
+	if (ls->nb_cycle > ms->nb_cycle)
+		ms->nb_cycle = ls->nb_cycle;
+
+	lcore_op_stat_aggr(&ms->deqenq, &ls->deqenq);
+	if ((ls->role_mask & ROLE_DEQENQ) != 0)
+		ms->deqenq.nb_lcore++;
+
+	for (stg = 0; stg != ms->nb_stage; stg++) {
+		lcore_op_stat_aggr(&ms->stage[stg], &ls->stage[stg]);
+		if ((ls->role_mask & ROLE_STAGE(stg)) != 0)
+			ms->stage[stg].nb_lcore++;
+	}
+}
+
+/* Return num / den, or 0 when nothing was recorded (den == 0). */
+static long double
+lcore_op_stat_ratio(uint64_t num, uint64_t den)
+{
+	return (den == 0) ? 0 : (long double)num / den;
+}
+
+/*
+ * Print the statistics of one operation kind under the caption 'cap'.
+ * 'st' is the number of timer cycles per microsecond.
+ * Averages of an operation that was never called or processed no objects
+ * are reported as 0.00 instead of dividing by zero.
+ */
+static void
+lcore_op_stat_dump(FILE *f, const struct lcore_op_stat *ls, const char *cap,
+	long double st)
+{
+	fprintf(f, "\t%s={\n", cap);
+
+	fprintf(f, "\t\tnb_lcore=%" PRIu64 ",\n", ls->nb_lcore);
+	fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->nb_call);
+	fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->nb_obj);
+	fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->nb_cycle);
+	fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
+		lcore_op_stat_ratio(ls->nb_obj, ls->nb_call));
+	fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
+		lcore_op_stat_ratio(ls->nb_cycle, ls->nb_obj));
+	fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
+		lcore_op_stat_ratio(ls->nb_cycle, ls->nb_call));
+
+	/* if min/max cycles per call stats was collected */
+	if (ls->min_cycle != UINT64_MAX) {
+		fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
+			ls->max_cycle,
+			(long double)ls->max_cycle / st);
+		fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
+			ls->min_cycle,
+			(long double)ls->min_cycle / st);
+	}
+
+	fprintf(f, "\t},\n");
+}
+
+/*
+ * Print the statistics of one lcore, or of the aggregate when 'lc' is
+ * UINT32_MAX: total cycles first, then the deq+enq block, then one block
+ * per stage in use.
+ */
+static void
+lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
+{
+	char caption[64];
+	uint32_t stg;
+	/* timer cycles per microsecond */
+	const long double cyc_per_us =
+		(long double)rte_get_timer_hz() / US_PER_S;
+
+	if (lc != UINT32_MAX)
+		fprintf(f, "%s(lcore=%u)={\n", __func__, lc);
+	else
+		fprintf(f, "%s(AGGREGATE)={\n", __func__);
+
+	fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
+		ls->nb_cycle, (long double)ls->nb_cycle / cyc_per_us);
+
+	lcore_op_stat_dump(f, &ls->deqenq, "DEQ+ENQ", cyc_per_us);
+
+	for (stg = 0; stg != ls->nb_stage; stg++) {
+		snprintf(caption, sizeof(caption), "%s#%u", "STAGE", stg);
+		lcore_op_stat_dump(f, &ls->stage[stg], caption, cyc_per_us);
+	}
+
+	fprintf(f, "};\n");
+}
+
+/* Set every 32-bit word of the element to 'fill'. */
+static void
+fill_ring_elm(struct ring_elem *elm, uint32_t fill)
+{
+	uint32_t *word = elm->cnt;
+	uint32_t * const end = elm->cnt + RTE_DIM(elm->cnt);
+
+	while (word != end)
+		*word++ = fill;
+}
+
+/*
+ * Verify that each of the 'num' elements still holds the 'check' pattern
+ * and overwrite it with the 'fill' pattern.
+ * On the first mismatch the expected and actual contents are dumped
+ * (under a lock, so reports of different lcores do not interleave) and
+ * -EINVAL is returned; elements after the mismatch are left untouched.
+ * Returns 0 when all elements matched.
+ */
+static int32_t
+check_updt_elem(struct ring_elem *elm[], uint32_t num,
+	const struct ring_elem *check, const struct ring_elem *fill)
+{
+	static rte_spinlock_t dump_lock;
+
+	uint32_t idx;
+
+	for (idx = 0; idx != num; idx++) {
+		struct ring_elem *cur = elm[idx];
+
+		if (memcmp(check, cur, sizeof(*check)) == 0) {
+			memcpy(cur, fill, sizeof(*cur));
+			continue;
+		}
+
+		rte_spinlock_lock(&dump_lock);
+		printf("%s(lc=%u, num=%u) failed at %u-th iter, "
+			"offending object: %p\n",
+			__func__, rte_lcore_id(), num, idx, cur);
+		rte_memdump(stdout, "expected", check, sizeof(*check));
+		rte_memdump(stdout, "result", cur, sizeof(*cur));
+		rte_spinlock_unlock(&dump_lock);
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+/*
+ * Validate the object count returned by a ring operation:
+ * with FIXED behaviour it has to equal 'exp', with VARIABLE behaviour it
+ * must not exceed 'exp'.
+ * Returns 0 when the count is acceptable, otherwise prints a report and
+ * returns -ENOSPC.
+ */
+static int
+check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
+	enum rte_ring_queue_behavior bhv, const char *fname, const char *opname)
+{
+	int fixed_mismatch, variable_excess;
+
+	fixed_mismatch = (bhv == RTE_RING_QUEUE_FIXED && exp != res);
+	variable_excess = (bhv == RTE_RING_QUEUE_VARIABLE && exp < res);
+
+	if (!fixed_mismatch && !variable_excess)
+		return 0;
+
+	printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
+		fname, lc, opname, exp, res);
+	return -ENOSPC;
+}
+
+/*
+ * Random burst size: 7/8 of BULK_NUM plus a random value below
+ * BULK_NUM / 2, i.e. in the half-open interval [7/8, 11/8) of BULK_NUM.
+ */
+static inline uint32_t
+rand_elem_num(void)
+{
+	return 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);
+}
+
+/*
+ * for each enabled stage do:
+ *   acquire burst of objects
+ *   read and check their contents
+ *   update and check their contents
+ *   release burst of objects
+ * done
+ *
+ * Objects acquired at stage i are expected to carry the pattern written
+ * by stage i - 1 (def_elm for stage 0). They are first overwritten with
+ * the lcore-local pattern loc_elm, then with the pattern of stage i.
+ * When prcs is non-zero, the acquire and release calls are timed.
+ * Returns 0 on success, or the negative error code of the first failed
+ * check.
+ */
+static int32_t
+test_worker_stages(struct lcore_arg *la, uint32_t lc, const char *fname,
+	struct ring_elem *obj[2 * BULK_NUM], enum rte_ring_queue_behavior bhv,
+	const struct ring_elem *def_elm, const struct ring_elem *loc_elm,
+	const struct ring_elem stg_elm[MAX_STAGES], int32_t prcs)
+{
+	int32_t rc;
+	uint32_t i, n, num, tkn;
+	uint64_t tm0, tm1;
+	const struct ring_elem *celm, *pelm;
+
+	num = rand_elem_num();
+
+	rc = 0;
+	tkn = 0;
+	/*
+	 * pelm - pattern expected on acquire, celm - pattern of this stage;
+	 * pelm advances on every iteration, skipped stages included.
+	 */
+	for (i = 0, pelm = def_elm; i != la->stats.nb_stage; pelm = celm, i++) {
+
+		celm = stg_elm + i;
+
+		/* given stage is not enabled on that lcore */
+		if ((la->stats.role_mask & ROLE_STAGE(i)) == 0)
+			continue;
+
+		/* reset all pointer values */
+		memset(obj, 0, sizeof(*obj) * num);
+
+		/* acquire num elems */
+		tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
+		n = _st_ring_stage_acquire(la->rng, i, (void **)obj, num,
+				bhv, &tkn, NULL);
+		tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;
+
+		/* check return value and objects */
+		rc = check_ring_op(num, n, lc, bhv, fname,
+			RTE_STR(_st_ring_stage_acquire));
+		if (rc == 0)
+			rc = check_updt_elem(obj, n, pelm, loc_elm);
+		if (rc != 0)
+			break;
+
+		/* release num elems */
+		rte_compiler_barrier();
+		rc = check_updt_elem(obj, n, loc_elm, celm);
+		if (rc != 0)
+			break;
+
+		/* nothing acquired - nothing to release */
+		if (n == 0)
+			tm1 = 0;
+		else {
+			tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
+			_st_ring_stage_release(la->rng, i, tkn,
+					(void **)obj, n);
+			tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;
+		}
+		lcore_op_stat_update(la->stats.stage + i, 1, n, tm0 + tm1,
+				prcs);
+	}
+
+	return rc;
+}
+
+/*
+ * Dequeue a random-sized burst, check that every object carries the
+ * pattern 'pelm', pass the objects through the lcore-local pattern
+ * 'loc_elm', restore the default pattern 'def_elm' and enqueue exactly
+ * the dequeued objects back.
+ * When prcs is non-zero, the dequeue and enqueue calls are timed.
+ * Returns 0 on success, or the negative error code of the first failed
+ * check.
+ */
+static int32_t
+test_worker_deqenq(struct lcore_arg *la, uint32_t lc, const char *fname,
+	struct ring_elem *obj[2 * BULK_NUM], enum rte_ring_queue_behavior bhv,
+	const struct ring_elem *def_elm, const struct ring_elem *loc_elm,
+	const struct ring_elem *pelm, int32_t prcs)
+{
+	int32_t rc;
+	uint32_t k, n, num;
+	uint64_t tm0, tm1;
+
+	num = rand_elem_num();
+
+	/* reset all pointer values */
+	memset(obj, 0, sizeof(*obj) * num);
+
+	/* dequeue num elems */
+	tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
+	n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, bhv, NULL);
+
+	tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;
+
+	/* check return value and objects */
+	rc = check_ring_op(num, n, lc, bhv, fname,
+			RTE_STR(_st_ring_dequeue_bulk));
+	if (rc == 0)
+		rc = check_updt_elem(obj, n, pelm, loc_elm);
+	if (rc != 0)
+		return rc;
+
+	/* enqueue n elems */
+	rte_compiler_barrier();
+	rc = check_updt_elem(obj, n, loc_elm, def_elm);
+	if (rc != 0)
+		return rc;
+
+	/* the enqueue is all-or-nothing: all n objects must be accepted */
+	tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
+	k = _st_ring_enqueue_bulk(la->rng, (void **)obj, n,
+			RTE_RING_QUEUE_FIXED, NULL);
+	tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;
+
+	/* check return value */
+	rc = check_ring_op(n, k, lc, RTE_RING_QUEUE_FIXED, fname,
+			RTE_STR(_st_ring_enqueue_bulk));
+	if (rc != 0)
+		return rc;
+
+	lcore_op_stat_update(&la->stats.deqenq, 1, n, tm0 + tm1, prcs);
+	return 0;
+}
+
+/*
+ * Body of a stress-test worker.
+ * Waits until the main lcore sets 'wrk_cmd' to WRK_CMD_RUN, then keeps
+ * running the stage and/or deq+enq patterns enabled in its role mask
+ * until the command changes or a check fails.
+ * 'arg' points to the struct lcore_arg of this lcore, 'fname' is used in
+ * failure reports, a non-zero 'prcs' selects per-call cycle measurement.
+ * Returns 0 on success, or the error code of the first failed check.
+ */
+static int
+test_worker(void *arg, const char *fname, int32_t prcs)
+{
+	int32_t rc;
+	uint32_t i, lc;
+	uint64_t cl;
+	struct lcore_arg *la;
+	struct ring_elem *obj[2 * BULK_NUM];
+	struct ring_elem *pelm, def_elm, loc_elm, stg_elm[MAX_STAGES];
+
+	la = arg;
+	lc = rte_lcore_id();
+
+	fill_ring_elm(&def_elm, UINT32_MAX);
+	fill_ring_elm(&loc_elm, lc);
+
+	for (i = 0; i != RTE_DIM(stg_elm); i++)
+		fill_ring_elm(stg_elm + i, (i + 1) << 24);
+
+	/* objects reaching the dequeue carry the pattern of the last stage */
+	pelm = stg_elm + la->stats.nb_stage - 1;
+
+	/* Acquire ordering is not required as the main is not
+	 * really releasing any data through 'wrk_cmd' to
+	 * the worker.
+	 */
+	while (__atomic_load_n(&wrk_cmd, __ATOMIC_RELAXED) != WRK_CMD_RUN)
+		rte_pause();
+
+	cl = rte_rdtsc_precise();
+
+	/* a worker with no role enabled runs no check: report success */
+	rc = 0;
+
+	do {
+		if ((la->stats.role_mask & ~ROLE_DEQENQ) != 0) {
+			rc = test_worker_stages(la, lc, fname, obj,
+				ring_behavior, &def_elm, &loc_elm, stg_elm,
+				prcs);
+			if (rc != 0)
+				break;
+		}
+
+		if ((la->stats.role_mask & ROLE_DEQENQ) != 0) {
+			rc = test_worker_deqenq(la, lc, fname, obj,
+				ring_behavior, &def_elm, &loc_elm, pelm, prcs);
+			if (rc != 0)
+				break;
+		}
+
+	} while (__atomic_load_n(&wrk_cmd, __ATOMIC_RELAXED) == WRK_CMD_RUN);
+
+	cl = rte_rdtsc_precise() - cl;
+	/* in average mode the whole run-time is charged to every operation */
+	if (prcs == 0)
+		lcore_stat_update(&la->stats, 0, 0, cl, 0);
+	la->stats.nb_cycle = cl;
+	return rc;
+}
+/* Worker entry point with per-call (precise) cycle measurement. */
+static int
+test_worker_prcs(void *arg)
+{
+	const int32_t precise = 1;
+
+	return test_worker(arg, __func__, precise);
+}
+
+/* Worker entry point that only collects whole-run (average) statistics. */
+static int
+test_worker_avg(void *arg)
+{
+	const int32_t precise = 0;
+
+	return test_worker(arg, __func__, precise);
+}
+
+/* Free the soring and the element storage allocated by mt1_init(). */
+static void
+mt1_fini(struct rte_soring *rng, void *data)
+{
+	rte_free(data);
+	rte_free(rng);
+}
+
+/*
+ * Allocate 'num' elements and a soring with 'nb_stages' stages, then
+ * enqueue a pointer to every element, each filled with the UINT32_MAX
+ * pattern.
+ * *data and *rng are set as soon as the respective allocation succeeds,
+ * so on failure the caller can pass both to mt1_fini() to free them.
+ * Returns 0 on success, negative error code otherwise.
+ */
+static int
+mt1_init(struct rte_soring **rng, void **data, uint32_t num,
+	enum rte_ring_sync_type prod_synt, enum rte_ring_sync_type cons_synt,
+	uint32_t nb_stages)
+{
+	int32_t rc;
+	size_t sz;
+	uint32_t i;
+	struct rte_soring *r;
+	struct ring_elem *elm;
+	void *p;
+	struct rte_soring_param prm;
+
+	*rng = NULL;
+	*data = NULL;
+
+	/* storage for the objects that the ring entries will point to */
+	sz = num * sizeof(*elm);
+	elm = rte_zmalloc(NULL, sz, alignof(typeof(*elm)));
+	if (elm == NULL) {
+		printf("%s: alloc(%zu) for %u elems data failed",
+			__func__, sz, num);
+		return -ENOMEM;
+	}
+
+	*data = elm;
+
+	/* alloc soring */
+	memset(&prm, 0, sizeof(prm));
+
+	/* ring entries are pointers; stsize stays 0 after the memset */
+	prm.name = __func__;
+	prm.elems = num;
+	prm.esize = sizeof(uintptr_t);
+	prm.stages = nb_stages;
+	prm.prod_synt = prod_synt;
+	prm.cons_synt = cons_synt;
+
+	/*
+	 * NOTE(review): the result is not checked for a negative error
+	 * value before being used as the allocation size.
+	 */
+	sz = rte_soring_get_memsize(&prm);
+	r = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+	if (r == NULL) {
+		printf("%s: alloc(%zu) for FIFO with %u elems failed",
+			__func__, sz, prm.elems);
+		return -ENOMEM;
+	}
+
+	*rng = r;
+
+	rc = rte_soring_init(r, &prm);
+	if (rc != 0) {
+		printf("%s: rte_soring_init(r=%p,elems=%u,stages=%u) failed, "
+			"error: %d(%s)\n",
+			__func__, r, prm.elems, prm.stages, rc, strerror(-rc));
+		return rc;
+	}
+
+	/* fill the ring one element at a time, stop at the first failure */
+	for (i = 0; i != num; i++) {
+		fill_ring_elm(elm + i, UINT32_MAX);
+		p = elm + i;
+		if (_st_ring_enqueue_bulk(r, &p, 1, RTE_RING_QUEUE_FIXED,
+				NULL) != 1)
+			break;
+	}
+
+	if (i != num) {
+		printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
+			__func__, r, num, i);
+		return -ENOSPC;
+	}
+
+	return 0;
+}
+
+/*
+ * Run one multi-threaded test case.
+ *
+ * Creates the soring via mt1_init(), launches 'test' on every worker
+ * lcore with the role mask taken from 'role_mask[lcore]', lets the
+ * workers run for 'run_time' seconds, stops them, aggregates and dumps
+ * their statistics, and finally releases everything via mt1_fini().
+ *
+ * Returns the mt1_init() error if setup fails, otherwise the bitwise OR
+ * of the return values of all workers.
+ */
+static int
+test_mt(int (*test)(void *), enum rte_ring_sync_type prod_synt,
+	enum rte_ring_sync_type cons_synt, uint32_t nb_stage,
+	const uint32_t role_mask[RTE_MAX_LCORE])
+{
+	/* starting value for every per-operation statistics slot */
+	static const struct lcore_op_stat init_stat = {
+		.min_cycle = UINT64_MAX,
+	};
+
+	struct lcore_arg arg[RTE_MAX_LCORE];
+	struct lcore_arg *la, *total;
+	struct rte_soring *r;
+	void *data;
+	uint32_t stg, lc, main_lc;
+	int32_t rc;
+
+	rc = mt1_init(&r, &data, RING_SIZE, prod_synt, cons_synt, nb_stage);
+	if (rc != 0) {
+		/* mt1_init() may have allocated part of the setup */
+		mt1_fini(r, data);
+		return rc;
+	}
+
+	memset(arg, 0, sizeof(arg));
+
+	/* prepare per-lcore arguments and launch on all workers */
+	RTE_LCORE_FOREACH_WORKER(lc) {
+		la = &arg[lc];
+		la->rng = r;
+		la->stats.deqenq = init_stat;
+		la->stats.nb_stage = nb_stage;
+		la->stats.role_mask = role_mask[lc];
+		for (stg = 0; stg < la->stats.nb_stage; stg++)
+			la->stats.stage[stg] = init_stat;
+		rte_eal_remote_launch(test, la, lc);
+	}
+
+	/* signal workers to start test */
+	__atomic_store_n(&wrk_cmd, WRK_CMD_RUN, __ATOMIC_RELEASE);
+
+	rte_delay_us(run_time * US_PER_S);
+
+	/* signal workers to stop test */
+	__atomic_store_n(&wrk_cmd, WRK_CMD_STOP, __ATOMIC_RELEASE);
+
+	/* the main lcore's slot is used as accumulator for overall stats */
+	main_lc = rte_lcore_id();
+	total = &arg[main_lc];
+	total->stats.deqenq = init_stat;
+	total->stats.nb_stage = nb_stage;
+	for (stg = 0; stg < total->stats.nb_stage; stg++)
+		total->stats.stage[stg] = init_stat;
+
+	/* wait for workers and collect stats. */
+	rc = 0;
+	RTE_LCORE_FOREACH_WORKER(lc) {
+		la = &arg[lc];
+		rc |= rte_eal_wait_lcore(lc);
+		lcore_stat_aggr(&total->stats, &la->stats);
+		if (verbose != 0)
+			lcore_stat_dump(stdout, lc, &la->stats);
+	}
+
+	lcore_stat_dump(stdout, UINT32_MAX, &total->stats);
+	mt1_fini(r, data);
+	return rc;
+}
+
+/*
+ * Symmetric layout: every worker lcore gets the same mask, enabling
+ * deq+enq and all stages (the low 'nb_stage + 2' bits are set).
+ * Entries of non-worker lcores are zeroed.
+ */
+static void
+role_mask_sym(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
+{
+	const uint32_t all_roles = RTE_BIT32(nb_stage + 2) - 1;
+	uint32_t wrk;
+
+	for (wrk = 0; wrk != RTE_MAX_LCORE; wrk++)
+		role_mask[wrk] = 0;
+
+	RTE_LCORE_FOREACH_WORKER(wrk) {
+		role_mask[wrk] = all_roles;
+	}
+}
+
+/*
+ * Split the workers into two (nearly) equal groups by launch order:
+ * - 1st, 3rd, 5th, ... worker: dequeue+enqueue only;
+ * - 2nd, 4th, 6th, ... worker: acquire/release for all stages
+ *   (mask bits 1 through 'nb_stage + 1').
+ * When there is just one worker it is given both roles.
+ */
+static void
+role_mask_even_odd(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
+{
+	const uint32_t deqenq = ROLE_DEQENQ;
+	const uint32_t stages = RTE_GENMASK32(nb_stage + 1, 1);
+	uint32_t lc, nb_wrk;
+
+	memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);
+
+	nb_wrk = 0;
+	RTE_LCORE_FOREACH_WORKER(lc) {
+		if ((nb_wrk & 1) == 0)
+			role_mask[lc] = deqenq;
+		else
+			role_mask[lc] = stages;
+		nb_wrk++;
+	}
+
+	/* a lone worker got deq+enq above, add the stages to it as well */
+	if (nb_wrk == 1) {
+		lc = rte_get_next_lcore(-1, 1, 0);
+		role_mask[lc] |= stages;
+	}
+}
+
+/*
+ * Divide all workers (nearly) evenly among all possible roles.
+ *
+ * There are 'nb_stage + 1' roles, one mask bit each (bits 0 through
+ * 'nb_stage'). Workers get exactly one role each, assigned round-robin
+ * in launch order. If there are fewer workers than roles, the roles
+ * left over are all added to the first worker.
+ * With no worker lcores at all, 'role_mask' is just zeroed.
+ */
+static void
+role_mask_div(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
+{
+	uint32_t i, lc;
+	const uint32_t nb_role = nb_stage + 1;
+
+	memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);
+
+	i = 0;
+	RTE_LCORE_FOREACH_WORKER(lc) {
+		role_mask[lc] = RTE_BIT32(i % nb_role);
+		i++;
+	}
+
+	/*
+	 * No workers: there is no lcore to give the roles to, and
+	 * rte_get_next_lcore() would return RTE_MAX_LCORE, which must not
+	 * be used as an index into role_mask[].
+	 */
+	if (i == 0)
+		return;
+
+	/* fewer workers than roles: first worker takes the remaining ones */
+	if (i < nb_role) {
+		lc = rte_get_next_lcore(-1, 1, 0);
+		for (; i != nb_role; i++)
+			role_mask[lc] |= RTE_BIT32(i);
+	}
+}
+
+/*
+ * The first worker does (ST) enqueue+dequeue only, every other worker
+ * does stage processing for all stages (mask bits 1 through
+ * 'nb_stage + 1'). When there is just one worker it is given both roles.
+ */
+static void
+role_mask_denq_st(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
+{
+	const uint32_t stages = RTE_GENMASK32(nb_stage + 1, 1);
+	uint32_t lc, nb_wrk;
+
+	memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);
+
+	nb_wrk = 0;
+	RTE_LCORE_FOREACH_WORKER(lc) {
+		role_mask[lc] = (nb_wrk == 0) ? ROLE_DEQENQ : stages;
+		nb_wrk++;
+	}
+
+	/* a lone worker got deq+enq above, add the stages to it as well */
+	if (nb_wrk == 1) {
+		lc = rte_get_next_lcore(-1, 1, 0);
+		role_mask[lc] |= stages;
+	}
+}
+
+
+/*
+ * 1 stage, MT sync for both producer and consumer, symmetric roles
+ * (see role_mask_sym()) on all workers.
+ */
+static int
+test_sym_mt1(int (*test)(void *))
+{
+	const uint32_t stages = 1;
+	const enum rte_ring_sync_type synt = RTE_RING_SYNC_MT;
+	uint32_t roles[RTE_MAX_LCORE];
+
+	role_mask_sym(stages, roles);
+
+	return test_mt(test, synt, synt, stages, roles);
+}
+
+/*
+ * 4 stages, MT sync for both producer and consumer, symmetric roles
+ * (see role_mask_sym()) on all workers.
+ */
+static int
+test_sym_mt4(int (*test)(void *))
+{
+	const uint32_t stages = 4;
+	const enum rte_ring_sync_type synt = RTE_RING_SYNC_MT;
+	uint32_t roles[RTE_MAX_LCORE];
+
+	role_mask_sym(stages, roles);
+
+	return test_mt(test, synt, synt, stages, roles);
+}
+
+/*
+ * 4 stages, MT_RTS sync for both producer and consumer, symmetric roles
+ * (see role_mask_sym()) on all workers.
+ */
+static int
+test_sym_mt_rts4(int (*test)(void *))
+{
+	const uint32_t stages = 4;
+	const enum rte_ring_sync_type synt = RTE_RING_SYNC_MT_RTS;
+	uint32_t roles[RTE_MAX_LCORE];
+
+	role_mask_sym(stages, roles);
+
+	return test_mt(test, synt, synt, stages, roles);
+}
+
+/*
+ * 4 stages, MT_HTS sync for both producer and consumer, symmetric roles
+ * (see role_mask_sym()) on all workers.
+ */
+static int
+test_sym_mt_hts4(int (*test)(void *))
+{
+	const uint32_t stages = 4;
+	const enum rte_ring_sync_type synt = RTE_RING_SYNC_MT_HTS;
+	uint32_t roles[RTE_MAX_LCORE];
+
+	role_mask_sym(stages, roles);
+
+	return test_mt(test, synt, synt, stages, roles);
+}
+
+/*
+ * 4 stages, ST sync for both producer and consumer: a single worker
+ * does deq+enq, the rest process stages (see role_mask_denq_st()).
+ */
+static int
+test_stdenq_stage4(int (*test)(void *))
+{
+	const uint32_t stages = 4;
+	const enum rte_ring_sync_type synt = RTE_RING_SYNC_ST;
+	uint32_t roles[RTE_MAX_LCORE];
+
+	role_mask_denq_st(stages, roles);
+
+	return test_mt(test, synt, synt, stages, roles);
+}
+
+
+/*
+ * 5 stages, MT sync for both producer and consumer, workers split into
+ * deq+enq and stage-processing groups (see role_mask_even_odd()).
+ */
+static int
+test_even_odd_mt5(int (*test)(void *))
+{
+	const uint32_t stages = 5;
+	const enum rte_ring_sync_type synt = RTE_RING_SYNC_MT;
+	uint32_t roles[RTE_MAX_LCORE];
+
+	role_mask_even_odd(stages, roles);
+
+	return test_mt(test, synt, synt, stages, roles);
+}
+
+/*
+ * 3 stages, MT sync for both producer and consumer, one role per
+ * worker, assigned round-robin (see role_mask_div()).
+ */
+static int
+test_div_mt3(int (*test)(void *))
+{
+	const uint32_t stages = 3;
+	const enum rte_ring_sync_type synt = RTE_RING_SYNC_MT;
+	uint32_t roles[RTE_MAX_LCORE];
+
+	role_mask_div(stages, roles);
+
+	return test_mt(test, synt, synt, stages, roles);
+}
+
+/*
+ * Table of stress test-cases.
+ * Each entry pairs a scenario (.func: picks number of stages, sync types
+ * and role layout) with a worker function (.wfunc), and is run once with
+ * test_worker_prcs ("-PRCS") and once with test_worker_avg ("-AVG").
+ * Names follow the pattern <deq/enq sync>_DEQENQ-<stage sync>_STG<n>[-<split>]-<PRCS|AVG>.
+ */
+static const struct test_case tests[] = {
+	{
+		.name = "MT_DEQENQ-MT_STG1-PRCS",
+		.func = test_sym_mt1,
+		.wfunc = test_worker_prcs,
+	},
+	{
+		.name = "MT_DEQENQ-MT_STG1-AVG",
+		.func = test_sym_mt1,
+		.wfunc = test_worker_avg,
+	},
+	{
+		.name = "ST_DEQENQ-MT_STG4-PRCS",
+		.func = test_stdenq_stage4,
+		.wfunc = test_worker_prcs,
+	},
+	{
+		.name = "ST_DEQENQ-MT_STG4-AVG",
+		.func = test_stdenq_stage4,
+		.wfunc = test_worker_avg,
+	},
+	{
+		.name = "MT_DEQENQ-MT_STG4-PRCS",
+		.func = test_sym_mt4,
+		.wfunc = test_worker_prcs,
+	},
+	{
+		.name = "MT_DEQENQ-MT_STG4-AVG",
+		.func = test_sym_mt4,
+		.wfunc = test_worker_avg,
+	},
+	{
+		.name = "MTRTS_DEQENQ-MT_STG4-PRCS",
+		.func = test_sym_mt_rts4,
+		.wfunc = test_worker_prcs,
+	},
+	{
+		.name = "MTRTS_DEQENQ-MT_STG4-AVG",
+		.func = test_sym_mt_rts4,
+		.wfunc = test_worker_avg,
+	},
+	{
+		.name = "MTHTS_DEQENQ-MT_STG4-PRCS",
+		.func = test_sym_mt_hts4,
+		.wfunc = test_worker_prcs,
+	},
+	{
+		.name = "MTHTS_DEQENQ-MT_STG4-AVG",
+		.func = test_sym_mt_hts4,
+		.wfunc = test_worker_avg,
+	},
+	{
+		.name = "MT_DEQENQ-MT_STG5-1:1-PRCS",
+		.func = test_even_odd_mt5,
+		.wfunc = test_worker_prcs,
+	},
+	{
+		.name = "MT_DEQENQ-MT_STG5-1:1-AVG",
+		.func = test_even_odd_mt5,
+		.wfunc = test_worker_avg,
+	},
+	{
+		.name = "MT_DEQENQ-MT_STG3-1:3-PRCS",
+		.func = test_div_mt3,
+		.wfunc = test_worker_prcs,
+	},
+	{
+		.name = "MT_DEQENQ-MT_STG3-1:3-AVG",
+		.func = test_div_mt3,
+		.wfunc = test_worker_avg,
+	},
+};