@@ -175,6 +175,9 @@ source_file_deps = {
'test_security_proto.c' : ['cryptodev', 'security'],
'test_seqlock.c': [],
'test_service_cores.c': [],
+ 'test_soring.c': [],
+ 'test_soring_mt_stress.c': [],
+ 'test_soring_stress.c': [],
'test_spinlock.c': [],
'test_stack.c': ['stack'],
'test_stack_perf.c': ['stack'],
new file mode 100644
@@ -0,0 +1,442 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_launch.h>
+#include <rte_cycles.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_malloc.h>
+#include <rte_random.h>
+#include <rte_errno.h>
+#include <rte_hexdump.h>
+
+#include <rte_soring.h>
+
+#include "test.h"
+
+#define MAX_ACQUIRED 20
+
+#define SORING_TEST_ASSERT(val, expected) do { \
+ RTE_TEST_ASSERT(expected == val, \
+ "%s: expected %u got %u\n", #val, expected, val); \
+} while (0)
+
+static void
+set_soring_init_param(struct rte_soring_param *prm,
+ const char *name, uint32_t esize, uint32_t elems,
+ uint32_t stages, uint32_t stsize,
+ enum rte_ring_sync_type rst_prod,
+ enum rte_ring_sync_type rst_cons)
+{
+ prm->name = name;
+ prm->elem_size = esize;
+ prm->elems = elems;
+ prm->stages = stages;
+ prm->meta_size = stsize;
+ prm->prod_synt = rst_prod;
+ prm->cons_synt = rst_cons;
+}
+
+static int
+move_forward_stage(struct rte_soring *sor,
+ uint32_t num_packets, uint32_t stage)
+{
+ uint32_t acquired;
+ uint32_t ftoken;
+ uint32_t *acquired_objs[MAX_ACQUIRED];
+
+ acquired = rte_soring_acquire_bulk(sor, acquired_objs, stage,
+ num_packets, &ftoken, NULL);
+ SORING_TEST_ASSERT(acquired, num_packets);
+ rte_soring_release(sor, NULL, stage, num_packets,
+ ftoken);
+
+ return 0;
+}
+
+/*
+ * struct rte_soring_param param checking.
+ */
+static int
+test_soring_init(void)
+{
+ struct rte_soring *sor = NULL;
+ struct rte_soring_param prm;
+ int rc;
+ size_t sz;
+ memset(&prm, 0, sizeof(prm));
+
+/*init memory*/
+ set_soring_init_param(&prm, "alloc_memory", sizeof(uintptr_t),
+ 4, 1, 4, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ sz = rte_soring_get_memsize(&prm);
+ sor = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+ RTE_TEST_ASSERT_NOT_NULL(sor, "could not allocate memory for soring");
+
+ set_soring_init_param(&prm, "test_invalid_stages", sizeof(uintptr_t),
+ 4, 0, 4, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ rc = rte_soring_init(sor, &prm);
+ RTE_TEST_ASSERT_FAIL(rc, "initted soring with invalid num stages");
+
+ set_soring_init_param(&prm, "test_invalid_esize", 0,
+ 4, 1, 4, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ rc = rte_soring_init(sor, &prm);
+ RTE_TEST_ASSERT_FAIL(rc, "initted soring with 0 esize");
+
+ set_soring_init_param(&prm, "test_invalid_esize", 9,
+ 4, 1, 4, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ rc = rte_soring_init(sor, &prm);
+ RTE_TEST_ASSERT_FAIL(rc, "initted soring with esize not multiple of 4");
+
+ set_soring_init_param(&prm, "test_invalid_rsize", sizeof(uintptr_t),
+ 4, 1, 3, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ rc = rte_soring_init(sor, &prm);
+ RTE_TEST_ASSERT_FAIL(rc, "initted soring with rcsize not multiple of 4");
+
+ set_soring_init_param(&prm, "test_invalid_elems", sizeof(uintptr_t),
+ RTE_SORING_ELEM_MAX + 1, 1, 4, RTE_RING_SYNC_MT,
+ RTE_RING_SYNC_MT);
+ rc = rte_soring_init(sor, &prm);
+ RTE_TEST_ASSERT_FAIL(rc, "initted soring with invalid num elements");
+
+ rte_free(sor);
+ return 0;
+}
+
+static int
+test_soring_get_memsize(void)
+{
+
+ struct rte_soring_param prm;
+ ssize_t sz;
+
+ set_soring_init_param(&prm, "memsize", sizeof(uint32_t *),
+ 10, 1, 0, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ sz = rte_soring_get_memsize(&prm);
+ RTE_TEST_ASSERT(sz > 0, "failed to calculate size");
+
+ set_soring_init_param(&prm, "memsize", sizeof(uint8_t),
+ 16, UINT32_MAX, sizeof(uint32_t), RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ sz = rte_soring_get_memsize(&prm);
+ RTE_TEST_ASSERT_EQUAL(sz, -EINVAL, "calcuated size incorrect");
+
+ set_soring_init_param(&prm, "memsize", 0,
+ 16, UINT32_MAX, sizeof(uint32_t), RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ sz = rte_soring_get_memsize(&prm);
+ RTE_TEST_ASSERT_EQUAL(sz, -EINVAL, "calculated size incorrect");
+
+ return 0;
+
+}
+
+static int
+test_soring_stages(void)
+{
+ struct rte_soring *sor = NULL;
+ struct rte_soring_param prm;
+ uint32_t objs[32];
+ uint32_t rcs[32];
+ uint32_t acquired_objs[32];
+ uint32_t acquired_rcs[32];
+ uint32_t dequeued_rcs[32];
+ uint32_t dequeued_objs[32];
+ size_t ssz;
+ uint32_t stage, enqueued, dequeued, acquired;
+ uint32_t i, ftoken;
+
+ memset(&prm, 0, sizeof(prm));
+ set_soring_init_param(&prm, "stages", sizeof(uint32_t), 32,
+ 10000, sizeof(uint32_t), RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ ssz = rte_soring_get_memsize(&prm);
+ RTE_TEST_ASSERT(ssz > 0, "parameter error calculating ring size");
+ sor = rte_zmalloc(NULL, ssz, RTE_CACHE_LINE_SIZE);
+ RTE_TEST_ASSERT_NOT_NULL(sor, "couldn't allocate memory for soring");
+ rte_soring_init(sor, &prm);
+
+ for (i = 0; i < 32; i++) {
+ rcs[i] = i;
+ objs[i] = i + 32;
+ }
+
+ enqueued = rte_soring_enqueux_burst(sor, objs, rcs, 32, NULL);
+ SORING_TEST_ASSERT(enqueued, 32);
+
+ for (stage = 0; stage < 10000; stage++) {
+ int j;
+ dequeued = rte_soring_dequeue_bulk(sor, dequeued_objs,
+ 32, NULL);
+ /* check none at end stage */
+ SORING_TEST_ASSERT(dequeued, 0);
+
+ acquired = rte_soring_acquirx_bulk(sor, acquired_objs,
+ acquired_rcs, stage, 32, &ftoken, NULL);
+ SORING_TEST_ASSERT(acquired, 32);
+
+ for (j = 0; j < 32; j++) {
+ SORING_TEST_ASSERT(acquired_rcs[j], j + stage);
+ SORING_TEST_ASSERT(acquired_objs[j], j + stage + 32);
+ /* modify both queue object and rc */
+ acquired_objs[j]++;
+ acquired_rcs[j]++;
+ }
+
+ rte_soring_releasx(sor, acquired_objs,
+ acquired_rcs, stage, 32,
+ ftoken);
+ }
+
+ dequeued = rte_soring_dequeux_bulk(sor, dequeued_objs, dequeued_rcs,
+ 32, NULL);
+ SORING_TEST_ASSERT(dequeued, 32);
+ for (i = 0; i < 32; i++) {
+ /* ensure we ended up with the expected values in order*/
+ SORING_TEST_ASSERT(dequeued_rcs[i], i + 10000);
+ SORING_TEST_ASSERT(dequeued_objs[i], i + 32 + 10000);
+ }
+ rte_free(sor);
+ return 0;
+}
+
+static int
+test_soring_enqueue_dequeue(void)
+{
+ struct rte_soring *sor = NULL;
+ struct rte_soring_param prm;
+ int rc;
+ uint32_t i;
+ size_t sz;
+ uint32_t queue_objs[10];
+ uint32_t *queue_objs_p[10];
+ uint32_t free_space;
+ uint32_t enqueued, dequeued;
+ uint32_t *dequeued_objs[10];
+
+ memset(&prm, 0, sizeof(prm));
+ for (i = 0; i < 10; i++) {
+ queue_objs[i] = i + 1;
+ queue_objs_p[i] = &queue_objs[i];
+ }
+
+/*init memory*/
+ set_soring_init_param(&prm, "enqueue/dequeue", sizeof(uint32_t *),
+ 10, 1, 0, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ sz = rte_soring_get_memsize(&prm);
+ sor = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+ RTE_TEST_ASSERT_NOT_NULL(sor, "alloc failed for soring");
+ rc = rte_soring_init(sor, &prm);
+ RTE_TEST_ASSERT_SUCCESS(rc, "Failed to init soring");
+
+ free_space = 0;
+
+ enqueued = rte_soring_enqueue_burst(sor, queue_objs_p, 5, &free_space);
+
+ SORING_TEST_ASSERT(free_space, 5);
+ SORING_TEST_ASSERT(enqueued, 5);
+
+ /* fixed amount enqueue */
+ enqueued = rte_soring_enqueue_bulk(sor, queue_objs_p, 7, &free_space);
+
+ SORING_TEST_ASSERT(free_space, 5);
+ SORING_TEST_ASSERT(enqueued, 0);
+
+ /* variable amount enqueue */
+ enqueued = rte_soring_enqueue_burst(sor, queue_objs_p + 5, 7,
+ &free_space);
+ SORING_TEST_ASSERT(free_space, 0);
+ SORING_TEST_ASSERT(enqueued, 5);
+
+ /* test no dequeue while stage 0 has not completed */
+ dequeued = rte_soring_dequeue_bulk(sor, dequeued_objs, 10, NULL);
+ SORING_TEST_ASSERT(dequeued, 0);
+ dequeued = rte_soring_dequeue_burst(sor, dequeued_objs, 10, NULL);
+ SORING_TEST_ASSERT(dequeued, 0);
+
+ move_forward_stage(sor, 8, 0);
+
+ /* can only dequeue as many as have completed stage*/
+ dequeued = rte_soring_dequeue_bulk(sor, dequeued_objs, 10, NULL);
+ SORING_TEST_ASSERT(dequeued, 0);
+ dequeued = rte_soring_dequeue_burst(sor, dequeued_objs, 10, NULL);
+ SORING_TEST_ASSERT(dequeued, 8);
+ /* packets remain in order */
+ for (i = 0; i < dequeued; i++) {
+ RTE_TEST_ASSERT_EQUAL(dequeued_objs[i],
+ queue_objs_p[i], "dequeued != enqueued");
+ }
+
+ dequeued = rte_soring_dequeue_burst(sor, dequeued_objs, 1, NULL);
+ SORING_TEST_ASSERT(dequeued, 0);
+
+ move_forward_stage(sor, 2, 0);
+ dequeued = rte_soring_dequeue_burst(sor, dequeued_objs, 2, NULL);
+ SORING_TEST_ASSERT(dequeued, 2);
+ /* packets remain in order */
+ for (i = 0; i < dequeued; i++) {
+ RTE_TEST_ASSERT_EQUAL(dequeued_objs[i],
+ queue_objs_p[i + 8], "dequeued != enqueued");
+ }
+
+ rte_soring_dump(stdout, sor);
+ rte_free(sor);
+ return 0;
+}
+
+static int
+test_soring_acquire_release(void)
+{
+
+ struct rte_soring *sor = NULL;
+ struct rte_soring_param prm;
+ int rc, i;
+ size_t sz;
+
+ memset(&prm, 0, sizeof(prm));
+ uint32_t queue_objs[10];
+ uint32_t rc_objs[10];
+ uint32_t acquired_objs[10];
+ uint32_t dequeued_objs[10];
+ uint32_t dequeued_rcs[10];
+ uint32_t s1_acquired_rcs[10];
+ uint32_t free_space, enqueued, ftoken, acquired, dequeued;
+
+ for (i = 0; i < 10; i++) {
+ queue_objs[i] = i + 5;
+ rc_objs[i] = i + 10;
+ }
+
+/*init memory*/
+ set_soring_init_param(&prm, "test_acquire_release", sizeof(uint32_t),
+ 20, 2, sizeof(uint32_t), RTE_RING_SYNC_MT, RTE_RING_SYNC_MT);
+ sz = rte_soring_get_memsize(&prm);
+ sor = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+ if (sor == NULL) {
+ printf("%s: alloc(%zu) for FIFO with %u elems failed",
+ __func__, sz, prm.elems);
+ return -ENOMEM;
+ }
+
+ /* init ring */
+ rc = rte_soring_init(sor, &prm);
+ RTE_TEST_ASSERT_SUCCESS(rc, "failed to init soring");
+
+ /* enqueue with associated rc */
+ enqueued = rte_soring_enqueux_burst(sor, queue_objs, rc_objs, 5,
+ &free_space);
+ SORING_TEST_ASSERT(enqueued, 5);
+ /* enqueue without associated rc */
+ enqueued = rte_soring_enqueue_burst(sor, queue_objs + 5, 5,
+ &free_space);
+ SORING_TEST_ASSERT(enqueued, 5);
+
+ /* acquire the objects with rc's and ensure they are as expected */
+ acquired = rte_soring_acquirx_burst(sor, acquired_objs,
+ s1_acquired_rcs, 0, 5, &ftoken, NULL);
+ SORING_TEST_ASSERT(acquired, 5);
+ for (i = 0; i < 5; i++) {
+ RTE_TEST_ASSERT_EQUAL(s1_acquired_rcs[i], rc_objs[i],
+ "acquired rc[%d]: %u != enqueued rc: %u",
+ i, s1_acquired_rcs[i], rc_objs[i]);
+ RTE_TEST_ASSERT_EQUAL(acquired_objs[i], queue_objs[i],
+ "acquired obj[%d]: %u != enqueued obj %u",
+ i, acquired_objs[i], queue_objs[i]);
+ }
+ rte_soring_release(sor, NULL, 0, 5, ftoken);
+
+ /* acquire the objects without rc's and ensure they are as expected */
+ acquired = rte_soring_acquirx_burst(sor, acquired_objs,
+ s1_acquired_rcs, 0, 5, &ftoken, NULL);
+ SORING_TEST_ASSERT(acquired, 5);
+ for (i = 0; i < 5; i++) {
+ /* as the rc area of memory is zero'd at init this is true
+ * but this is a detail of implementation rather than
+ * a guarantee.
+ */
+ RTE_TEST_ASSERT_EQUAL(s1_acquired_rcs[i], 0,
+ "acquired rc not empty");
+ RTE_TEST_ASSERT_EQUAL(acquired_objs[i], queue_objs[i + 5],
+ "acquired obj[%d]: %u != enqueued obj %u",
+ i, acquired_objs[i], queue_objs[i + 5]);
+ }
+ /*release the objects, adding rc's */
+ rte_soring_releasx(sor, NULL, rc_objs + 5, 0, 5,
+ ftoken);
+
+ acquired = rte_soring_acquirx_burst(sor, acquired_objs,
+ s1_acquired_rcs, 1, 10, &ftoken, NULL);
+ SORING_TEST_ASSERT(acquired, 10);
+ for (i = 0; i < 10; i++) {
+ /* ensure the associated rc's are the ones added at relase */
+ RTE_TEST_ASSERT_EQUAL(s1_acquired_rcs[i], rc_objs[i],
+ "acquired rc[%d]: %u != enqueued rc: %u",
+ i, s1_acquired_rcs[i], rc_objs[i]);
+ RTE_TEST_ASSERT_EQUAL(acquired_objs[i], queue_objs[i],
+ "acquired obj[%d]: %u != enqueued obj %u",
+ i, acquired_objs[i], queue_objs[i]);
+ }
+ /*release the objects, with rc's set to NULL */
+ rte_soring_release(sor, NULL, 1, 10, ftoken);
+
+ dequeued = rte_soring_dequeux_burst(sor, dequeued_objs, dequeued_rcs,
+ 10, NULL);
+ SORING_TEST_ASSERT(dequeued, 10);
+ for (i = 0; i < 10; i++) {
+ /* ensure the associated rc's are the ones added at relase */
+ RTE_TEST_ASSERT_EQUAL(dequeued_rcs[i], rc_objs[i],
+ "dequeued rc[%d]: %u != enqueued rc: %u",
+ i, dequeued_rcs[i], rc_objs[i]);
+ RTE_TEST_ASSERT_EQUAL(acquired_objs[i], queue_objs[i],
+ "acquired obj[%d]: %u != enqueued obj %u",
+ i, acquired_objs[i], queue_objs[i]);
+ }
+ rte_soring_dump(stdout, sor);
+ rte_free(sor);
+ return 0;
+}
+
+static int
+test_soring(void)
+{
+
+ /* Negative test cases */
+ if (test_soring_init() < 0)
+ goto test_fail;
+
+ /* Memory calculations */
+ if (test_soring_get_memsize() < 0)
+ goto test_fail;
+
+ /* Basic enqueue/dequeue operations */
+ if (test_soring_enqueue_dequeue() < 0)
+ goto test_fail;
+
+ /* Acquire/release */
+ if (test_soring_acquire_release() < 0)
+ goto test_fail;
+
+ /* Test large number of stages */
+ if (test_soring_stages() < 0)
+ goto test_fail;
+
+ return 0;
+
+test_fail:
+ return -1;
+}
+
+REGISTER_FAST_TEST(soring_autotest, true, true, test_soring);
+
new file mode 100644
@@ -0,0 +1,40 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#include "test_soring_stress_impl.h"
+
+static inline uint32_t
+_st_ring_dequeue_burst(struct rte_soring *r, void **obj, uint32_t n,
+ uint32_t *avail)
+{
+ return rte_soring_dequeue_burst(r, obj, n, avail);
+}
+
+static inline uint32_t
+_st_ring_enqueue_bulk(struct rte_soring *r, void * const *obj, uint32_t n,
+ uint32_t *free)
+{
+ return rte_soring_enqueue_bulk(r, obj, n, free);
+}
+
+static inline uint32_t
+_st_ring_acquire_burst(struct rte_soring *r, uint32_t stage, void **obj,
+ uint32_t num, uint32_t *token, uint32_t *avail)
+{
+ return rte_soring_acquire_burst(r, obj, stage, num, token, avail);
+}
+
+static inline void
+_st_ring_release(struct rte_soring *r, uint32_t stage, uint32_t token,
+ void * const *obj, uint32_t num)
+{
+ RTE_SET_USED(obj);
+ rte_soring_release(r, NULL, stage, num, token);
+}
+
+const struct test test_soring_mt_stress = {
+ .name = "MT",
+ .nb_case = RTE_DIM(tests),
+ .cases = tests,
+};
new file mode 100644
@@ -0,0 +1,48 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#include "test_soring_stress.h"
+
+static int
+run_test(const struct test *test)
+{
+ int32_t rc;
+ uint32_t i, k;
+
+ for (i = 0, k = 0; i != test->nb_case; i++) {
+
+ printf("TEST-CASE %s %s START\n",
+ test->name, test->cases[i].name);
+
+ rc = test->cases[i].func(test->cases[i].wfunc);
+ k += (rc == 0);
+
+ if (rc != 0)
+ printf("TEST-CASE %s %s FAILED\n",
+ test->name, test->cases[i].name);
+ else
+ printf("TEST-CASE %s %s OK\n",
+ test->name, test->cases[i].name);
+ }
+
+ return k;
+}
+
+static int
+test_ring_stress(void)
+{
+ uint32_t n, k;
+
+ n = 0;
+ k = 0;
+
+ n += test_soring_mt_stress.nb_case;
+ k += run_test(&test_soring_mt_stress);
+
+ printf("Number of tests:\t%u\nSuccess:\t%u\nFailed:\t%u\n",
+ n, k, n - k);
+ return (k != n);
+}
+
+REGISTER_TEST_COMMAND(soring_stress_autotest, test_ring_stress);
new file mode 100644
@@ -0,0 +1,35 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+
+#include <inttypes.h>
+#include <stddef.h>
+#include <stdalign.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <rte_soring.h>
+#include <rte_cycles.h>
+#include <rte_launch.h>
+#include <rte_pause.h>
+#include <rte_random.h>
+#include <rte_malloc.h>
+#include <rte_spinlock.h>
+
+#include "test.h"
+
+struct test_case {
+ const char *name;
+ int (*func)(int (*)(void *));
+ int (*wfunc)(void *arg);
+};
+
+struct test {
+ const char *name;
+ uint32_t nb_case;
+ const struct test_case *cases;
+};
+
+extern const struct test test_soring_mt_stress;
new file mode 100644
@@ -0,0 +1,827 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include <stdalign.h>
+
+#include "test_soring_stress.h"
+
+/**
+ * Stress test for soring enqueue/dequeue/acquire/release operations.
+ * Depending on the role, performs at least one of the following patterns
+ * on each worker:
+ * - dequeue/read-write data from/to the dequeued objects/enqueue.
+ * - acquire/read-write data from/to the acquired objects/release.
+ * Serves as both functional and performance test of soring
+ * data-path API under high contention
+ * (for both over committed and non-over committed scenarios).
+ */
+
+#define RING_NAME "SORING_STRESS"
+#define BULK_NUM 32
+#define RING_SIZE (2 * BULK_NUM * RTE_MAX_LCORE)
+
+#define MAX_STAGES 16
+
+enum {
+ WRK_CMD_STOP,
+ WRK_CMD_RUN,
+};
+
+static uint32_t wrk_cmd __rte_cache_aligned = WRK_CMD_STOP;
+
+/* test run-time in seconds */
+static const uint32_t run_time = 60;
+static const uint32_t verbose;
+
+struct lcore_op_stat {
+ uint64_t nb_lcore;
+ uint64_t nb_call;
+ uint64_t nb_obj;
+ uint64_t nb_cycle;
+ uint64_t max_cycle;
+ uint64_t min_cycle;
+};
+
+struct lcore_stat {
+ uint64_t nb_cycle;
+ struct lcore_op_stat deqenq;
+ uint32_t role_mask;
+ uint32_t nb_stage;
+ struct lcore_op_stat stage[MAX_STAGES];
+};
+
+#define ROLE_DEQENQ RTE_BIT32(0)
+#define ROLE_STAGE(n) RTE_BIT32(n + 1)
+
+struct lcore_arg {
+ struct rte_soring *rng;
+ struct lcore_stat stats;
+} __rte_cache_aligned;
+
+struct ring_elem {
+ uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
+} __rte_cache_aligned;
+
+/*
+ * redefinable functions
+ */
+
+static uint32_t
+_st_ring_dequeue_burst(struct rte_soring *r, void **obj, uint32_t n,
+ uint32_t *avail);
+
+static uint32_t
+_st_ring_enqueue_bulk(struct rte_soring *r, void * const *obj, uint32_t n,
+ uint32_t *free);
+
+static uint32_t
+_st_ring_acquire_burst(struct rte_soring *r, uint32_t stage, void **obj,
+ uint32_t num, uint32_t *token, uint32_t *avail);
+
+static void
+_st_ring_release(struct rte_soring *r, uint32_t stage, uint32_t token,
+ void * const *obj, uint32_t num);
+
+static void
+lcore_op_stat_update(struct lcore_op_stat *ls, uint64_t call, uint64_t obj,
+ uint64_t tm, int32_t prcs)
+{
+ ls->nb_call += call;
+ ls->nb_obj += obj;
+ ls->nb_cycle += tm;
+ if (prcs) {
+ ls->max_cycle = RTE_MAX(ls->max_cycle, tm);
+ ls->min_cycle = RTE_MIN(ls->min_cycle, tm);
+ }
+}
+
+static void
+lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
+ uint64_t tm, int32_t prcs)
+{
+ uint32_t i;
+
+ ls->nb_cycle += tm;
+ lcore_op_stat_update(&ls->deqenq, call, obj, tm, prcs);
+ for (i = 0; i != ls->nb_stage; i++)
+ lcore_op_stat_update(ls->stage + i, call, obj, tm, prcs);
+}
+
+static void
+lcore_op_stat_aggr(struct lcore_op_stat *ms, const struct lcore_op_stat *ls)
+{
+ ms->nb_call += ls->nb_call;
+ ms->nb_obj += ls->nb_obj;
+ ms->nb_cycle += ls->nb_cycle;
+ ms->max_cycle = RTE_MAX(ms->max_cycle, ls->max_cycle);
+ ms->min_cycle = RTE_MIN(ms->min_cycle, ls->min_cycle);
+}
+
+static void
+lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
+{
+ uint32_t i;
+
+ ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle);
+ lcore_op_stat_aggr(&ms->deqenq, &ls->deqenq);
+ ms->deqenq.nb_lcore += ((ls->role_mask & ROLE_DEQENQ) != 0);
+ for (i = 0; i != ms->nb_stage; i++) {
+ lcore_op_stat_aggr(ms->stage + i, ls->stage + i);
+ ms->stage[i].nb_lcore += ((ls->role_mask & ROLE_STAGE(i)) != 0);
+ }
+}
+
+static void
+lcore_op_stat_dump(FILE *f, const struct lcore_op_stat *ls, const char *cap,
+ long double st)
+{
+ fprintf(f, "\t%s={\n", cap);
+
+ fprintf(f, "\t\tnb_lcore=%" PRIu64 ",\n", ls->nb_lcore);
+ fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->nb_call);
+ fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->nb_obj);
+ fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->nb_cycle);
+ fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
+ (long double)ls->nb_obj / ls->nb_call);
+ fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
+ (long double)ls->nb_cycle / ls->nb_obj);
+ fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
+ (long double)ls->nb_cycle / ls->nb_call);
+
+ /* if min/max cycles per call stats was collected */
+ if (ls->min_cycle != UINT64_MAX) {
+ fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
+ ls->max_cycle,
+ (long double)ls->max_cycle / st);
+ fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
+ ls->min_cycle,
+ (long double)ls->min_cycle / st);
+ }
+
+ fprintf(f, "\t},\n");
+}
+
+static void
+lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
+{
+ uint32_t i;
+ long double st;
+ char cap[64];
+
+ st = (long double)rte_get_timer_hz() / US_PER_S;
+
+ if (lc == UINT32_MAX)
+ fprintf(f, "%s(AGGREGATE)={\n", __func__);
+ else
+ fprintf(f, "%s(lcore=%u)={\n", __func__, lc);
+
+ fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
+ ls->nb_cycle, (long double)ls->nb_cycle / st);
+
+ lcore_op_stat_dump(f, &ls->deqenq, "DEQ+ENQ", st);
+ for (i = 0; i != ls->nb_stage; i++) {
+ snprintf(cap, sizeof(cap), "%s#%u", "STAGE", i);
+ lcore_op_stat_dump(f, ls->stage + i, cap, st);
+ }
+
+ fprintf(f, "};\n");
+}
+
+static void
+fill_ring_elm(struct ring_elem *elm, uint32_t fill)
+{
+ uint32_t i;
+
+ for (i = 0; i != RTE_DIM(elm->cnt); i++)
+ elm->cnt[i] = fill;
+}
+
+static int32_t
+check_updt_elem(struct ring_elem *elm[], uint32_t num,
+ const struct ring_elem *check, const struct ring_elem *fill)
+{
+ uint32_t i;
+
+ static rte_spinlock_t dump_lock;
+
+ for (i = 0; i != num; i++) {
+ if (memcmp(check, elm[i], sizeof(*check)) != 0) {
+ rte_spinlock_lock(&dump_lock);
+ printf("%s(lc=%u, num=%u) failed at %u-th iter, "
+ "offending object: %p\n",
+ __func__, rte_lcore_id(), num, i, elm[i]);
+ rte_memdump(stdout, "expected", check, sizeof(*check));
+ rte_memdump(stdout, "result", elm[i], sizeof(*elm[i]));
+ rte_spinlock_unlock(&dump_lock);
+ return -EINVAL;
+ }
+ memcpy(elm[i], fill, sizeof(*elm[i]));
+ }
+
+ return 0;
+}
+
+static int
+check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
+ enum rte_ring_queue_behavior bhv, const char *fname, const char *opname)
+{
+ if (bhv == RTE_RING_QUEUE_FIXED && exp != res) {
+ printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
+ fname, lc, opname, exp, res);
+ return -ENOSPC;
+ }
+ if (bhv == RTE_RING_QUEUE_VARIABLE && exp < res) {
+ printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
+ fname, lc, opname, exp, res);
+ return -ENOSPC;
+ }
+ return 0;
+}
+
+/* num in interval [7/8, 11/8] of BULK_NUM */
+static inline uint32_t
+rand_elem_num(void)
+{
+ uint32_t num;
+
+ num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);
+ return num;
+}
+
+/*
+ * for each enabled stage do:
+ * acquire burst of objects
+ * read and check their contents
+ * update and check their contents
+ * release burst of objects
+ * done
+ */
+static int32_t
+test_worker_stages(struct lcore_arg *la, uint32_t lc, const char *fname,
+ struct ring_elem *obj[2 * BULK_NUM],
+ const struct ring_elem *def_elm, const struct ring_elem *loc_elm,
+ const struct ring_elem stg_elm[MAX_STAGES], int32_t prcs)
+{
+ int32_t rc;
+ uint32_t i, n, num, tkn;
+ uint64_t tm0, tm1;
+ const struct ring_elem *celm, *pelm;
+
+ num = rand_elem_num();
+
+ rc = 0;
+ tkn = 0;
+ for (i = 0, pelm = def_elm; i != la->stats.nb_stage; pelm = celm, i++) {
+
+ celm = stg_elm + i;
+
+ /* given stage is not enabled on that lcore */
+ if ((la->stats.role_mask & ROLE_STAGE(i)) == 0)
+ continue;
+
+ /* reset all pointer values */
+ memset(obj, 0, sizeof(*obj) * num);
+
+ /* acquire num elems */
+ tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
+ n = _st_ring_acquire_burst(la->rng, i, (void **)obj, num,
+ &tkn, NULL);
+ tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;
+
+ /* check return value and objects */
+ rc = check_ring_op(num, n, lc, RTE_RING_QUEUE_VARIABLE, fname,
+ RTE_STR(_st_ring_stage_acquire));
+ if (rc == 0)
+ rc = check_updt_elem(obj, n, pelm, loc_elm);
+ if (rc != 0)
+ break;
+
+ /* release num elems */
+ rte_compiler_barrier();
+ rc = check_updt_elem(obj, n, loc_elm, celm);
+ if (rc != 0)
+ break;
+
+ if (n == 0)
+ tm1 = 0;
+ else {
+ tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
+ _st_ring_release(la->rng, i, tkn,
+ (void **)obj, n);
+ tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;
+ }
+ lcore_op_stat_update(la->stats.stage + i, 1, n, tm0 + tm1,
+ prcs);
+ }
+
+ return rc;
+}
+
+static int32_t
+test_worker_deqenq(struct lcore_arg *la, uint32_t lc, const char *fname,
+ struct ring_elem *obj[2 * BULK_NUM],
+ const struct ring_elem *def_elm, const struct ring_elem *loc_elm,
+ const struct ring_elem *pelm, int32_t prcs)
+{
+ int32_t rc;
+ uint32_t k, n, num;
+ uint64_t tm0, tm1;
+
+ num = rand_elem_num();
+
+ /* reset all pointer values */
+ memset(obj, 0, sizeof(*obj) * num);
+
+ /* dequeue num elems */
+ tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
+ n = _st_ring_dequeue_burst(la->rng, (void **)obj, num, NULL);
+
+ tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;
+
+ /* check return value and objects */
+ rc = check_ring_op(num, n, lc, RTE_RING_QUEUE_VARIABLE, fname,
+ RTE_STR(_st_ring_dequeue_bulk));
+ if (rc == 0)
+ rc = check_updt_elem(obj, n, pelm, loc_elm);
+ if (rc != 0)
+ return rc;
+
+ /* enqueue n elems */
+ rte_compiler_barrier();
+ rc = check_updt_elem(obj, n, loc_elm, def_elm);
+ if (rc != 0)
+ return rc;
+
+ tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
+ k = _st_ring_enqueue_bulk(la->rng, (void **)obj, n, NULL);
+ tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;
+
+ /* check return value */
+ rc = check_ring_op(n, k, lc, RTE_RING_QUEUE_FIXED, fname,
+ RTE_STR(_st_ring_enqueue_bulk));
+ if (rc != 0)
+ return rc;
+
+ lcore_op_stat_update(&la->stats.deqenq, 1, n, tm0 + tm1, prcs);
+ return 0;
+}
+
+static int
+test_worker(void *arg, const char *fname, int32_t prcs)
+{
+ int32_t rc;
+ uint32_t i, lc;
+ uint64_t cl;
+ struct lcore_arg *la;
+ struct ring_elem *obj[2 * BULK_NUM];
+ struct ring_elem *pelm, def_elm, loc_elm, stg_elm[MAX_STAGES];
+
+ la = arg;
+ lc = rte_lcore_id();
+
+ fill_ring_elm(&def_elm, UINT32_MAX);
+ fill_ring_elm(&loc_elm, lc);
+
+ for (i = 0; i != RTE_DIM(stg_elm); i++)
+ fill_ring_elm(stg_elm + i, (i + 1) << 24);
+
+ pelm = stg_elm + la->stats.nb_stage - 1;
+
+ /* Acquire ordering is not required as the main is not
+ * really releasing any data through 'wrk_cmd' to
+ * the worker.
+ */
+ while (__atomic_load_n(&wrk_cmd, __ATOMIC_RELAXED) != WRK_CMD_RUN)
+ rte_pause();
+
+ cl = rte_rdtsc_precise();
+
+ do {
+ if ((la->stats.role_mask & ~ROLE_DEQENQ) != 0) {
+ rc = test_worker_stages(la, lc, fname, obj,
+ &def_elm, &loc_elm, stg_elm, prcs);
+ if (rc != 0)
+ break;
+ }
+
+ if ((la->stats.role_mask & ROLE_DEQENQ) != 0) {
+ rc = test_worker_deqenq(la, lc, fname, obj,
+ &def_elm, &loc_elm, pelm, prcs);
+ if (rc != 0)
+ break;
+ }
+
+ } while (__atomic_load_n(&wrk_cmd, __ATOMIC_RELAXED) == WRK_CMD_RUN);
+
+ cl = rte_rdtsc_precise() - cl;
+ if (prcs == 0)
+ lcore_stat_update(&la->stats, 0, 0, cl, 0);
+ la->stats.nb_cycle = cl;
+ return rc;
+}
+static int
+test_worker_prcs(void *arg)
+{
+ return test_worker(arg, __func__, 1);
+}
+
+static int
+test_worker_avg(void *arg)
+{
+ return test_worker(arg, __func__, 0);
+}
+
+static void
+mt1_fini(struct rte_soring *rng, void *data)
+{
+ rte_free(rng);
+ rte_free(data);
+}
+
+static int
+mt1_init(struct rte_soring **rng, void **data, uint32_t num,
+ enum rte_ring_sync_type prod_synt, enum rte_ring_sync_type cons_synt,
+ uint32_t nb_stages)
+{
+ int32_t rc;
+ size_t sz;
+ uint32_t i;
+ struct rte_soring *r;
+ struct ring_elem *elm;
+ void *p;
+ struct rte_soring_param prm;
+
+ *rng = NULL;
+ *data = NULL;
+
+ sz = num * sizeof(*elm);
+ elm = rte_zmalloc(NULL, sz, alignof(typeof(*elm)));
+ if (elm == NULL) {
+ printf("%s: alloc(%zu) for %u elems data failed",
+ __func__, sz, num);
+ return -ENOMEM;
+ }
+
+ *data = elm;
+
+ /* alloc soring */
+ memset(&prm, 0, sizeof(prm));
+
+ prm.name = __func__;
+ prm.elems = num;
+ prm.elem_size = sizeof(uintptr_t);
+ prm.stages = nb_stages;
+ prm.prod_synt = prod_synt;
+ prm.cons_synt = cons_synt;
+
+ sz = rte_soring_get_memsize(&prm);
+ r = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+ if (r == NULL) {
+ printf("%s: alloc(%zu) for FIFO with %u elems failed",
+ __func__, sz, prm.elems);
+ return -ENOMEM;
+ }
+
+ *rng = r;
+
+ rc = rte_soring_init(r, &prm);
+ if (rc != 0) {
+ printf("%s: rte_soring_init(r=%p,elems=%u,stages=%u) failed, "
+ "error: %d(%s)\n",
+ __func__, r, prm.elems, prm.stages, rc, strerror(-rc));
+ return rc;
+ }
+
+ for (i = 0; i != num; i++) {
+ fill_ring_elm(elm + i, UINT32_MAX);
+ p = elm + i;
+ if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
+ break;
+ }
+
+ if (i != num) {
+ printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
+ __func__, r, num, i);
+ return -ENOSPC;
+ }
+
+ return 0;
+}
+
+static int
+test_mt(int (*test)(void *), enum rte_ring_sync_type prod_synt,
+ enum rte_ring_sync_type cons_synt, uint32_t nb_stage,
+ const uint32_t role_mask[RTE_MAX_LCORE])
+{
+ int32_t rc;
+ uint32_t i, lc, mc;
+ struct rte_soring *r;
+ void *data;
+ struct lcore_arg arg[RTE_MAX_LCORE];
+
+ static const struct lcore_op_stat init_stat = {
+ .min_cycle = UINT64_MAX,
+ };
+
+ rc = mt1_init(&r, &data, RING_SIZE, prod_synt, cons_synt, nb_stage);
+
+ if (rc != 0) {
+ mt1_fini(r, data);
+ return rc;
+ }
+
+ memset(arg, 0, sizeof(arg));
+
+ /* launch on all workers */
+ RTE_LCORE_FOREACH_WORKER(lc) {
+ arg[lc].rng = r;
+ arg[lc].stats.deqenq = init_stat;
+ arg[lc].stats.nb_stage = nb_stage;
+ arg[lc].stats.role_mask = role_mask[lc];
+ for (i = 0; i != arg[lc].stats.nb_stage; i++)
+ arg[lc].stats.stage[i] = init_stat;
+ rte_eal_remote_launch(test, &arg[lc], lc);
+ }
+
+ /* signal workers to start test */
+ __atomic_store_n(&wrk_cmd, WRK_CMD_RUN, __ATOMIC_RELEASE);
+
+ rte_delay_us(run_time * US_PER_S);
+
+ /* signal workers to stop test */
+ __atomic_store_n(&wrk_cmd, WRK_CMD_STOP, __ATOMIC_RELEASE);
+
+ /* wait for workers and collect stats. */
+ mc = rte_lcore_id();
+ arg[mc].stats.deqenq = init_stat;
+ arg[mc].stats.nb_stage = nb_stage;
+ for (i = 0; i != arg[mc].stats.nb_stage; i++)
+ arg[mc].stats.stage[i] = init_stat;
+
+ rc = 0;
+ RTE_LCORE_FOREACH_WORKER(lc) {
+ rc |= rte_eal_wait_lcore(lc);
+ lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
+ if (verbose != 0)
+ lcore_stat_dump(stdout, lc, &arg[lc].stats);
+ }
+
+ lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
+ rte_soring_dump(stdout, r);
+ mt1_fini(r, data);
+ return rc;
+}
+
+/*
+ * launch all stages and deq+enq on all worker lcores
+ */
+static void
+role_mask_sym(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
+{
+ uint32_t lc;
+ const uint32_t msk = RTE_BIT32(nb_stage + 2) - 1;
+
+ memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);
+ RTE_LCORE_FOREACH_WORKER(lc)
+ role_mask[lc] = msk;
+}
+
+/*
+ * Divide all workers in two (nearly) equal groups:
+ * - workers from 'even' group do deque+enque
+ * - workers from 'odd' group do acquire/release (for all stages)
+ */
+static void
+role_mask_even_odd(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
+{
+ uint32_t i, lc;
+ const uint32_t msk[2] = {
+ [0] = ROLE_DEQENQ,
+ [1] = RTE_GENMASK32(nb_stage + 1, 1),
+ };
+
+ memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);
+
+ i = 0;
+ RTE_LCORE_FOREACH_WORKER(lc) {
+ role_mask[lc] = msk[i & 1];
+ i++;
+ }
+ if (i == 1) {
+ lc = rte_get_next_lcore(-1, 1, 0);
+ role_mask[lc] |= msk[i & 1];
+ }
+}
+
+/*
+ * Divide all workers (nearly) evenly among all possible stages
+ */
+static void
+role_mask_div(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
+{
+ uint32_t i, lc;
+ uint32_t msk[nb_stage + 1];
+
+ memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);
+
+ for (i = 0; i != RTE_DIM(msk); i++) {
+ msk[i] = RTE_BIT32(i);
+ };
+
+ i = 0;
+ RTE_LCORE_FOREACH_WORKER(lc) {
+ role_mask[lc] = msk[i % RTE_DIM(msk)];
+ i++;
+ }
+ if (i < RTE_DIM(msk)) {
+ lc = rte_get_next_lcore(-1, 1, 0);
+ for (; i != RTE_DIM(msk); i++)
+ role_mask[lc] |= msk[i % RTE_DIM(msk)];
+ }
+}
+
+/*
+ * one worker does ST enqueue+dequeue, while all others - stages processing.
+ */
+static void
+role_mask_denq_st(uint32_t nb_stage, uint32_t role_mask[RTE_MAX_LCORE])
+{
+ uint32_t i, lc;
+ const uint32_t msk[2] = {
+ [0] = ROLE_DEQENQ,
+ [1] = RTE_GENMASK32(nb_stage + 1, 1),
+ };
+
+ memset(role_mask, 0, sizeof(role_mask[0]) * RTE_MAX_LCORE);
+
+ i = 0;
+ RTE_LCORE_FOREACH_WORKER(lc) {
+ if (i == 0)
+ role_mask[lc] = msk[0];
+ else
+ role_mask[lc] = msk[1];
+ i++;
+ }
+ if (i == 1) {
+ lc = rte_get_next_lcore(-1, 1, 0);
+ role_mask[lc] |= msk[1];
+ }
+}
+
+
+static int
+test_sym_mt1(int (*test)(void *))
+{
+ uint32_t role_mask[RTE_MAX_LCORE];
+ const uint32_t nb_stage = 1;
+
+ role_mask_sym(nb_stage, role_mask);
+ return test_mt(test, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT,
+ nb_stage, role_mask);
+}
+
+static int
+test_sym_mt4(int (*test)(void *))
+{
+ uint32_t role_mask[RTE_MAX_LCORE];
+
+ const uint32_t nb_stage = 4;
+
+ role_mask_sym(nb_stage, role_mask);
+ return test_mt(test, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT,
+ nb_stage, role_mask);
+}
+
+static int
+test_sym_mt_rts4(int (*test)(void *))
+{
+ uint32_t role_mask[RTE_MAX_LCORE];
+
+ const uint32_t nb_stage = 4;
+
+ role_mask_sym(nb_stage, role_mask);
+ return test_mt(test, RTE_RING_SYNC_MT_RTS, RTE_RING_SYNC_MT_RTS,
+ nb_stage, role_mask);
+}
+
+static int
+test_sym_mt_hts4(int (*test)(void *))
+{
+ uint32_t role_mask[RTE_MAX_LCORE];
+
+ const uint32_t nb_stage = 4;
+
+ role_mask_sym(nb_stage, role_mask);
+ return test_mt(test, RTE_RING_SYNC_MT_HTS, RTE_RING_SYNC_MT_HTS,
+ nb_stage, role_mask);
+}
+
+static int
+test_stdenq_stage4(int (*test)(void *))
+{
+ uint32_t role_mask[RTE_MAX_LCORE];
+
+ const uint32_t nb_stage = 4;
+
+ role_mask_denq_st(nb_stage, role_mask);
+ return test_mt(test, RTE_RING_SYNC_ST, RTE_RING_SYNC_ST,
+ nb_stage, role_mask);
+}
+
+
+static int
+test_even_odd_mt5(int (*test)(void *))
+{
+ uint32_t role_mask[RTE_MAX_LCORE];
+
+ const uint32_t nb_stage = 5;
+
+ role_mask_even_odd(nb_stage, role_mask);
+ return test_mt(test, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT,
+ nb_stage, role_mask);
+}
+
+static int
+test_div_mt3(int (*test)(void *))
+{
+ uint32_t role_mask[RTE_MAX_LCORE];
+
+ const uint32_t nb_stage = 3;
+
+ role_mask_div(nb_stage, role_mask);
+ return test_mt(test, RTE_RING_SYNC_MT, RTE_RING_SYNC_MT,
+ nb_stage, role_mask);
+}
+
+static const struct test_case tests[] = {
+ {
+ .name = "MT_DEQENQ-MT_STG1-PRCS",
+ .func = test_sym_mt1,
+ .wfunc = test_worker_prcs,
+ },
+ {
+ .name = "MT_DEQENQ-MT_STG1-AVG",
+ .func = test_sym_mt1,
+ .wfunc = test_worker_avg,
+ },
+ {
+ .name = "ST_DEQENQ-MT_STG4-PRCS",
+ .func = test_stdenq_stage4,
+ .wfunc = test_worker_prcs,
+ },
+ {
+ .name = "ST_DEQENQ-MT_STG4-AVG",
+ .func = test_stdenq_stage4,
+ .wfunc = test_worker_avg,
+ },
+ {
+ .name = "MT_DEQENQ-MT_STG4-PRCS",
+ .func = test_sym_mt4,
+ .wfunc = test_worker_prcs,
+ },
+ {
+ .name = "MT_DEQENQ-MT_STG4-AVG",
+ .func = test_sym_mt4,
+ .wfunc = test_worker_avg,
+ },
+ {
+ .name = "MTRTS_DEQENQ-MT_STG4-PRCS",
+ .func = test_sym_mt_rts4,
+ .wfunc = test_worker_prcs,
+ },
+ {
+ .name = "MTRTS_DEQENQ-MT_STG4-AVG",
+ .func = test_sym_mt_rts4,
+ .wfunc = test_worker_avg,
+ },
+ {
+ .name = "MTHTS_DEQENQ-MT_STG4-PRCS",
+ .func = test_sym_mt_hts4,
+ .wfunc = test_worker_prcs,
+ },
+ {
+ .name = "MTHTS_DEQENQ-MT_STG4-AVG",
+ .func = test_sym_mt_hts4,
+ .wfunc = test_worker_avg,
+ },
+ {
+ .name = "MT_DEQENQ-MT_STG5-1:1-PRCS",
+ .func = test_even_odd_mt5,
+ .wfunc = test_worker_prcs,
+ },
+ {
+ .name = "MT_DEQENQ-MT_STG5-1:1-AVG",
+ .func = test_even_odd_mt5,
+ .wfunc = test_worker_avg,
+ },
+ {
+ .name = "MT_DEQENQ-MT_STG3-1:3-PRCS",
+ .func = test_div_mt3,
+ .wfunc = test_worker_prcs,
+ },
+ {
+ .name = "MT_DEQENQ_MT_STG3-1:3-AVG",
+ .func = test_div_mt3,
+ .wfunc = test_worker_avg,
+ },
+};