[v3,1/9] test/ring: add contention stress test

Message ID 20200403174235.23308-2-konstantin.ananyev@intel.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Series: New sync modes for ring

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/iol-intel-Performance success Performance Testing PASS
ci/iol-mellanox-Performance success Performance Testing PASS
ci/Intel-compilation success Compilation OK
ci/iol-testing success Testing PASS

Commit Message

Ananyev, Konstantin April 3, 2020, 5:42 p.m. UTC
  Introduce new test-case to measure ring performance under contention
(multiple producers/consumers).
Starts dequeue/enqueue loop on all available slave lcores.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
 app/test/Makefile                |   2 +
 app/test/meson.build             |   2 +
 app/test/test_ring_mpmc_stress.c |  31 +++
 app/test/test_ring_stress.c      |  48 ++++
 app/test/test_ring_stress.h      |  35 +++
 app/test/test_ring_stress_impl.h | 444 +++++++++++++++++++++++++++++++
 6 files changed, 562 insertions(+)
 create mode 100644 app/test/test_ring_mpmc_stress.c
 create mode 100644 app/test/test_ring_stress.c
 create mode 100644 app/test/test_ring_stress.h
 create mode 100644 app/test/test_ring_stress_impl.h
  

Comments

Honnappa Nagarahalli April 8, 2020, 4:59 a.m. UTC | #1
<snip>

> Subject: [PATCH v3 1/9] test/ring: add contention stress test
Minor, would 'add stress test for overcommitted use case' sound better?

> 
> Introduce new test-case to measure ring perfomance under contention
Minor, 'over committed' seems to be the word commonly used in the references you provided. Does it make sense to use that?

> (miltiple producers/consumers).
    ^^^^^^^ multiple

> Starts dequeue/enqueue loop on all available slave lcores.
> 
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> ---
>  app/test/Makefile                |   2 +
>  app/test/meson.build             |   2 +
>  app/test/test_ring_mpmc_stress.c |  31 +++
>  app/test/test_ring_stress.c      |  48 ++++
>  app/test/test_ring_stress.h      |  35 +++
>  app/test/test_ring_stress_impl.h | 444 +++++++++++++++++++++++++++++++
Would be good to change the file names to indicate that these tests are for the over-committed use case/configuration.
These are performance tests, so better to have 'perf' or 'performance' in their names.

>  6 files changed, 562 insertions(+)
>  create mode 100644 app/test/test_ring_mpmc_stress.c
>  create mode 100644 app/test/test_ring_stress.c
>  create mode 100644 app/test/test_ring_stress.h
>  create mode 100644 app/test/test_ring_stress_impl.h
> 
> diff --git a/app/test/Makefile b/app/test/Makefile
> index 1f080d162..4eefaa887 100644
> --- a/app/test/Makefile
> +++ b/app/test/Makefile
> @@ -77,7 +77,9 @@ SRCS-y += test_external_mem.c
>  SRCS-y += test_rand_perf.c
> 
>  SRCS-y += test_ring.c
> +SRCS-y += test_ring_mpmc_stress.c
>  SRCS-y += test_ring_perf.c
> +SRCS-y += test_ring_stress.c
>  SRCS-y += test_pmd_perf.c
> 
>  ifeq ($(CONFIG_RTE_LIBRTE_TABLE),y)
> diff --git a/app/test/meson.build b/app/test/meson.build
> index 351d29cb6..827b04886 100644
> --- a/app/test/meson.build
> +++ b/app/test/meson.build
> @@ -100,7 +100,9 @@ test_sources = files('commands.c',
>  	'test_rib.c',
>  	'test_rib6.c',
>  	'test_ring.c',
> +	'test_ring_mpmc_stress.c',
>  	'test_ring_perf.c',
> +	'test_ring_stress.c',
>  	'test_rwlock.c',
>  	'test_sched.c',
>  	'test_service_cores.c',
> diff --git a/app/test/test_ring_mpmc_stress.c b/app/test/test_ring_mpmc_stress.c
> new file mode 100644
> index 000000000..1524b0248
> --- /dev/null
> +++ b/app/test/test_ring_mpmc_stress.c
> @@ -0,0 +1,31 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2020 Intel Corporation
> + */
> +
> +#include "test_ring_stress_impl.h"
> +
> +static inline uint32_t
> +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
> +	uint32_t *avail)
> +{
> +	return rte_ring_mc_dequeue_bulk(r, obj, n, avail);
> +}
> +
> +static inline uint32_t
> +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
> +	uint32_t *free)
> +{
> +	return rte_ring_mp_enqueue_bulk(r, obj, n, free);
> +}
> +
> +static int
> +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
> +{
> +	return rte_ring_init(r, name, num, 0);
> +}
> +
> +const struct test test_ring_mpmc_stress = {
> +	.name = "MP/MC",
> +	.nb_case = RTE_DIM(tests),
> +	.cases = tests,
> +};
> diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c
> new file mode 100644
> index 000000000..60706f799
> --- /dev/null
> +++ b/app/test/test_ring_stress.c
> @@ -0,0 +1,48 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2020 Intel Corporation
> + */
> +
> +#include "test_ring_stress.h"
> +
> +static int
> +run_test(const struct test *test)
> +{
> +	int32_t rc;
> +	uint32_t i, k;
> +
> +	for (i = 0, k = 0; i != test->nb_case; i++) {
> +
> +		printf("TEST-CASE %s %s START\n",
> +			test->name, test->cases[i].name);
> +
> +		rc = test->cases[i].func(test->cases[i].wfunc);
> +		k += (rc == 0);
> +
> +		if (rc != 0)
> +			printf("TEST-CASE %s %s FAILED\n",
> +				test->name, test->cases[i].name);
> +		else
> +			printf("TEST-CASE %s %s OK\n",
> +				test->name, test->cases[i].name);
> +	}
> +
> +	return k;
> +}
> +
> +static int
> +test_ring_stress(void)
> +{
> +	uint32_t n, k;
> +
> +	n = 0;
> +	k = 0;
> +
> +	n += test_ring_mpmc_stress.nb_case;
> +	k += run_test(&test_ring_mpmc_stress);
> +
> +	printf("Number of tests:\t%u\nSuccess:\t%u\nFailed:\t%u\n",
> +		n, k, n - k);
> +	return (k != n);
> +}
> +
> +REGISTER_TEST_COMMAND(ring_stress_autotest, test_ring_stress);
> diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h
> new file mode 100644
> index 000000000..60eac6216
> --- /dev/null
> +++ b/app/test/test_ring_stress.h
> @@ -0,0 +1,35 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2020 Intel Corporation
> + */
> +
> +
> +#include <inttypes.h>
> +#include <stddef.h>
> +#include <stdalign.h>
> +#include <string.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +
> +#include <rte_ring.h>
> +#include <rte_cycles.h>
> +#include <rte_launch.h>
> +#include <rte_pause.h>
> +#include <rte_random.h>
> +#include <rte_malloc.h>
> +#include <rte_spinlock.h>
> +
> +#include "test.h"
> +
> +struct test_case {
> +	const char *name;
> +	int (*func)(int (*)(void *));
> +	int (*wfunc)(void *arg);
> +};
> +
> +struct test {
> +	const char *name;
> +	uint32_t nb_case;
> +	const struct test_case *cases;
> +};
> +
> +extern const struct test test_ring_mpmc_stress;
> diff --git a/app/test/test_ring_stress_impl.h b/app/test/test_ring_stress_impl.h
> new file mode 100644
> index 000000000..11476d28c
> --- /dev/null
> +++ b/app/test/test_ring_stress_impl.h
> @@ -0,0 +1,444 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2020 Intel Corporation
> + */
> +
> +#include "test_ring_stress.h"
> +
> +/*
> + * Measures performance of ring enqueue/dequeue under high contention
> + */
> +
> +#define RING_NAME	"RING_STRESS"
> +#define BULK_NUM	32
> +#define RING_SIZE	(2 * BULK_NUM * RTE_MAX_LCORE)
> +
> +enum {
> +	WRK_CMD_STOP,
> +	WRK_CMD_RUN,
> +};
> +
> +static volatile uint32_t wrk_cmd __rte_cache_aligned;
> +
> +/* test run-time in seconds */
> +static const uint32_t run_time = 60;
> +static const uint32_t verbose;
> +
> +struct lcore_stat {
> +	uint64_t nb_cycle;
> +	struct {
> +		uint64_t nb_call;
> +		uint64_t nb_obj;
> +		uint64_t nb_cycle;
> +		uint64_t max_cycle;
> +		uint64_t min_cycle;
> +	} op;
> +};
> +
> +struct lcore_arg {
> +	struct rte_ring *rng;
> +	struct lcore_stat stats;
> +} __rte_cache_aligned;
> +
> +struct ring_elem {
> +	uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
> +} __rte_cache_aligned;
> +
> +/*
> + * redefinable functions
> + */
> +static uint32_t
> +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
> +	uint32_t *avail);
> +
> +static uint32_t
> +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
> +	uint32_t *free);
> +
> +static int
> +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num);
> +
> +
> +static void
> +lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
> +	uint64_t tm, int32_t prcs)
> +{
> +	ls->op.nb_call += call;
> +	ls->op.nb_obj += obj;
> +	ls->op.nb_cycle += tm;
> +	if (prcs) {
> +		ls->op.max_cycle = RTE_MAX(ls->op.max_cycle, tm);
> +		ls->op.min_cycle = RTE_MIN(ls->op.min_cycle, tm);
> +	}
> +}
> +
> +static void
> +lcore_op_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
> +{
> +
> +	ms->op.nb_call += ls->op.nb_call;
> +	ms->op.nb_obj += ls->op.nb_obj;
> +	ms->op.nb_cycle += ls->op.nb_cycle;
> +	ms->op.max_cycle = RTE_MAX(ms->op.max_cycle, ls->op.max_cycle);
> +	ms->op.min_cycle = RTE_MIN(ms->op.min_cycle, ls->op.min_cycle);
> +}
> +
> +static void
> +lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
> +{
> +	ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle);
> +	lcore_op_stat_aggr(ms, ls);
> +}
> +
> +static void
> +lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
> +{
> +	long double st;
> +
> +	st = (long double)rte_get_timer_hz() / US_PER_S;
> +
> +	if (lc == UINT32_MAX)
> +		fprintf(f, "%s(AGGREGATE)={\n", __func__);
> +	else
> +		fprintf(f, "%s(lc=%u)={\n", __func__, lc);
> +
> +	fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
> +		ls->nb_cycle, (long double)ls->nb_cycle / st);
> +
> +	fprintf(f, "\tDEQ+ENQ={\n");
> +
> +	fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->op.nb_call);
> +	fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->op.nb_obj);
> +	fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->op.nb_cycle);
> +	fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
> +		(long double)ls->op.nb_obj / ls->op.nb_call);
> +	fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
> +		(long double)ls->op.nb_cycle / ls->op.nb_obj);
> +	fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
> +		(long double)ls->op.nb_cycle / ls->op.nb_call);
> +
> +	/* if min/max cycles per call stats was collected */
> +	if (ls->op.min_cycle != UINT64_MAX) {
> +		fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
> +			ls->op.max_cycle,
> +			(long double)ls->op.max_cycle / st);
> +		fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
> +			ls->op.min_cycle,
> +			(long double)ls->op.min_cycle / st);
> +	}
> +
> +	fprintf(f, "\t},\n");
> +	fprintf(f, "};\n");
> +}
> +
> +static void
> +fill_ring_elm(struct ring_elem *elm, uint32_t fill)
> +{
> +	uint32_t i;
> +
> +	for (i = 0; i != RTE_DIM(elm->cnt); i++)
> +		elm->cnt[i] = fill;
> +}
> +
> +static int32_t
> +check_updt_elem(struct ring_elem *elm[], uint32_t num,
> +	const struct ring_elem *check, const struct ring_elem *fill)
> +{
> +	uint32_t i;
> +
> +	static rte_spinlock_t dump_lock;
> +
> +	for (i = 0; i != num; i++) {
> +		if (memcmp(check, elm[i], sizeof(*check)) != 0) {
> +			rte_spinlock_lock(&dump_lock);
> +			printf("%s(lc=%u, num=%u) failed at %u-th iter, "
> +				"offending object: %p\n",
> +				__func__, rte_lcore_id(), num, i, elm[i]);
> +			rte_memdump(stdout, "expected", check, sizeof(*check));
> +			rte_memdump(stdout, "result", elm[i], sizeof(elm[i]));
> +			rte_spinlock_unlock(&dump_lock);
> +			return -EINVAL;
> +		}
> +		memcpy(elm[i], fill, sizeof(*elm[i]));
> +	}
> +
> +	return 0;
> +}
> +
> +static int
> +check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
minor, lcore instead of lc would be better

> +	const char *fname, const char *opname)
> +{
> +	if (exp != res) {
> +		printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
Suggest using lcore in the printf

> +			fname, lc, opname, exp, res);
> +		return -ENOSPC;
> +	}
> +	return 0;
> +}
> +
> +static int
> +test_worker_prcs(void *arg)
> +{
> +	int32_t rc;
> +	uint32_t lc, n, num;
minor, lcore instead of lc would be better

> +	uint64_t cl, tm0, tm1;
> +	struct lcore_arg *la;
> +	struct ring_elem def_elm, loc_elm;
> +	struct ring_elem *obj[2 * BULK_NUM];
> +
> +	la = arg;
> +	lc = rte_lcore_id();
> +
> +	fill_ring_elm(&def_elm, UINT32_MAX);
> +	fill_ring_elm(&loc_elm, lc);
> +
> +	while (wrk_cmd != WRK_CMD_RUN) {
> +		rte_smp_rmb();
> +		rte_pause();
> +	}
> +
> +	cl = rte_rdtsc_precise();
> +
> +	do {
> +		/* num in interval [7/8, 11/8] of BULK_NUM */
> +		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);
> +
> +		/* reset all pointer values */
> +		memset(obj, 0, sizeof(obj));
> +
> +		/* dequeue num elems */
> +		tm0 = rte_rdtsc_precise();
> +		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
> +		tm0 = rte_rdtsc_precise() - tm0;
> +
> +		/* check return value and objects */
> +		rc = check_ring_op(num, n, lc, __func__,
> +			RTE_STR(_st_ring_dequeue_bulk));
> +		if (rc == 0)
> +			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
> +		if (rc != 0)
> +			break;
Since this seems like a performance test, should we skip validating the objects?
Did these tests run on Travis CI? I believe Travis CI has trouble running stress/performance tests if they take too much time.
The RTS and HTS tests should be added to functional tests.

> +
> +		/* enqueue num elems */
> +		rte_compiler_barrier();
> +		rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
> +		if (rc != 0)
> +			break;
> +
> +		tm1 = rte_rdtsc_precise();
> +		n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
> +		tm1 = rte_rdtsc_precise() - tm1;
> +
> +		/* check return value */
> +		rc = check_ring_op(num, n, lc, __func__,
> +			RTE_STR(_st_ring_enqueue_bulk));
> +		if (rc != 0)
> +			break;
> +
> +		lcore_stat_update(&la->stats, 1, num, tm0 + tm1, 1);
> +
> +	} while (wrk_cmd == WRK_CMD_RUN);
> +
> +	la->stats.nb_cycle = rte_rdtsc_precise() - cl;
> +	return rc;
> +}
> +
> +static int
> +test_worker_avg(void *arg)
> +{
> +	int32_t rc;
> +	uint32_t lc, n, num;
> +	uint64_t cl;
> +	struct lcore_arg *la;
> +	struct ring_elem def_elm, loc_elm;
> +	struct ring_elem *obj[2 * BULK_NUM];
> +
> +	la = arg;
> +	lc = rte_lcore_id();
> +
> +	fill_ring_elm(&def_elm, UINT32_MAX);
> +	fill_ring_elm(&loc_elm, lc);
> +
> +	while (wrk_cmd != WRK_CMD_RUN) {
> +		rte_smp_rmb();
> +		rte_pause();
> +	}
> +
> +	cl = rte_rdtsc_precise();
> +
> +	do {
> +		/* num in interval [7/8, 11/8] of BULK_NUM */
> +		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);
> +
> +		/* reset all pointer values */
> +		memset(obj, 0, sizeof(obj));
> +
> +		/* dequeue num elems */
> +		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
> +
> +		/* check return value and objects */
> +		rc = check_ring_op(num, n, lc, __func__,
> +			RTE_STR(_st_ring_dequeue_bulk));
> +		if (rc == 0)
> +			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
> +		if (rc != 0)
> +			break;
> +
> +		/* enqueue num elems */
> +		rte_compiler_barrier();
> +		rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
> +		if (rc != 0)
> +			break;
> +
> +		n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
> +
> +		/* check return value */
> +		rc = check_ring_op(num, n, lc, __func__,
> +			RTE_STR(_st_ring_enqueue_bulk));
> +		if (rc != 0)
> +			break;
> +
> +		lcore_stat_update(&la->stats, 1, num, 0, 0);
> +
> +	} while (wrk_cmd == WRK_CMD_RUN);
> +
> +	/* final stats update */
> +	cl = rte_rdtsc_precise() - cl;
> +	lcore_stat_update(&la->stats, 0, 0, cl, 0);
> +	la->stats.nb_cycle = cl;
> +
> +	return rc;
> +}
Just wondering about the need for 2 tests which run the same functionality. The difference is the way in which numbers are collected.
Does 'test_worker_avg' add any value? IMO, we can remove 'test_worker_avg'.

> +
> +static void
> +mt1_fini(struct rte_ring *rng, void *data)
> +{
> +	rte_free(rng);
> +	rte_free(data);
> +}
> +
> +static int
> +mt1_init(struct rte_ring **rng, void **data, uint32_t num)
> +{
> +	int32_t rc;
> +	size_t sz;
> +	uint32_t i, nr;
> +	struct rte_ring *r;
> +	struct ring_elem *elm;
> +	void *p;
> +
> +	*rng = NULL;
> +	*data = NULL;
> +
> +	sz = num * sizeof(*elm);
> +	elm = rte_zmalloc(NULL, sz, __alignof__(*elm));
> +	if (elm == NULL) {
> +		printf("%s: alloc(%zu) for %u elems data failed",
> +			__func__, sz, num);
> +		return -ENOMEM;
> +	}
> +
> +	*data = elm;
> +
> +	/* alloc ring */
> +	nr = 2 * num;
> +	sz = rte_ring_get_memsize(nr);
> +	r = rte_zmalloc(NULL, sz, __alignof__(*r));
> +	if (r == NULL) {
> +		printf("%s: alloc(%zu) for FIFO with %u elems failed",
> +			__func__, sz, nr);
> +		return -ENOMEM;
> +	}
> +
> +	*rng = r;
> +
> +	rc = _st_ring_init(r, RING_NAME, nr);
> +	if (rc != 0) {
> +		printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n",
> +			__func__, r, nr, rc, strerror(-rc));
> +		return rc;
> +	}
> +
> +	for (i = 0; i != num; i++) {
> +		fill_ring_elm(elm + i, UINT32_MAX);
> +		p = elm + i;
> +		if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
> +			break;
> +	}
> +
> +	if (i != num) {
> +		printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
> +			__func__, r, num, i);
> +		return -ENOSPC;
> +	}
> +
> +	return 0;
> +}
> +
> +static int
> +test_mt1(int (*test)(void *))
> +{
> +	int32_t rc;
> +	uint32_t lc, mc;
> +	struct rte_ring *r;
> +	void *data;
> +	struct lcore_arg arg[RTE_MAX_LCORE];
> +
> +	static const struct lcore_stat init_stat = {
> +		.op.min_cycle = UINT64_MAX,
> +	};
> +
> +	rc = mt1_init(&r, &data, RING_SIZE);
> +	if (rc != 0) {
> +		mt1_fini(r, data);
> +		return rc;
> +	}
> +
> +	memset(arg, 0, sizeof(arg));
> +
> +	/* launch on all slaves */
> +	RTE_LCORE_FOREACH_SLAVE(lc) {
> +		arg[lc].rng = r;
> +		arg[lc].stats = init_stat;
> +		rte_eal_remote_launch(test, &arg[lc], lc);
> +	}
> +
> +	/* signal worker to start test */
> +	wrk_cmd = WRK_CMD_RUN;
> +	rte_smp_wmb();
> +
> +	usleep(run_time * US_PER_S);
> +
> +	/* signal worker to start test */
> +	wrk_cmd = WRK_CMD_STOP;
> +	rte_smp_wmb();
> +
> +	/* wait for slaves and collect stats. */
> +	mc = rte_lcore_id();
> +	arg[mc].stats = init_stat;
> +
> +	rc = 0;
> +	RTE_LCORE_FOREACH_SLAVE(lc) {
> +		rc |= rte_eal_wait_lcore(lc);
> +		lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
> +		if (verbose != 0)
> +			lcore_stat_dump(stdout, lc, &arg[lc].stats);
> +	}
> +
> +	lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
> +	mt1_fini(r, data);
> +	return rc;
> +}
> +
> +static const struct test_case tests[] = {
> +	{
> +		.name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS",
> +		.func = test_mt1,
> +		.wfunc = test_worker_prcs,
> +	},
> +	{
> +		.name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG",
> +		.func = test_mt1,
> +		.wfunc = test_worker_avg,
> +	},
> +};
> --
> 2.17.1
  
Ananyev, Konstantin April 9, 2020, 12:36 p.m. UTC | #2
> <snip>
> 
> > Subject: [PATCH v3 1/9] test/ring: add contention stress test
> Minor, would 'add stress test for overcommitted use case' sound better?

I'd like to point out that this test-case can be used as a contention stress-test
(many threads do enqueue/dequeue to/from the same ring) for both
over-committed and non-over-committed scenarios...
Will probably try to add a few extra explanations in v4.
 
> >
> > Introduce new test-case to measure ring perfomance under contention
> Minor, 'over committed' seems to be the word commonly used in the references you provided. Does it make sense to use that?
> 
> > (miltiple producers/consumers).
>     ^^^^^^^ multiple

ack.

<snip>

> Just wondering about the need for 2 tests which run the same functionality. The difference is the way in which numbers are collected.
> Does 'test_worker_avg' add any value? IMO, we can remove 'test_worker_avg'.

Yeah, they are quite similar.
I added the _average_ version for two reasons:
1. In the precise version I call rte_rdtsc_precise() straight before/after
    each enqueue/dequeue op.
    At least on IA, rte_rdtsc_precise() implies an mb().
    This extra sync point might hide some sync problems in the ring
    enqueue/dequeue itself.
    So having a separate test without such extra sync points
    gives extra confidence that these tests would catch ring sync problems, if any.
2. People usually don't do enqueue/dequeue on its own.
    One common pattern is: dequeue / read-write data in the dequeued
    objects / enqueue.
    So this test measures cycles for dequeue/enqueue plus some reads/writes
    to the objects from the ring.
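
To illustrate the difference, a minimal sketch (not the patch code itself;
'r', 'obj', 'num' and 'stop' are placeholders for the real test state):

	#include <stdint.h>

	#include <rte_cycles.h>
	#include <rte_ring.h>

	/* precise: every ring op is bracketed by rte_rdtsc_precise(),
	 * which on IA also acts as a full barrier */
	static uint64_t
	measure_precise(struct rte_ring *r, void **obj, uint32_t num)
	{
		uint64_t tm;

		tm = rte_rdtsc_precise();
		rte_ring_mc_dequeue_bulk(r, obj, num, NULL);
		return rte_rdtsc_precise() - tm;
	}

	/* avg: one timestamp pair around the whole loop, so the ring ops
	 * run back-to-back with no extra sync points in between */
	static uint64_t
	measure_avg(struct rte_ring *r, void **obj, uint32_t num,
		const volatile uint32_t *stop)
	{
		uint64_t tm;

		tm = rte_rdtsc_precise();
		while (*stop == 0) {
			rte_ring_mc_dequeue_bulk(r, obj, num, NULL);
			rte_ring_mp_enqueue_bulk(r, obj, num, NULL);
		}
		return rte_rdtsc_precise() - tm;
	}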
 
Ananyev, Konstantin April 9, 2020, 1 p.m. UTC | #3
> > > +static int
> > > +test_worker_prcs(void *arg)
<snip>
> > > +		/* check return value and objects */
> > > +		rc = check_ring_op(num, n, lc, __func__,
> > > +			RTE_STR(_st_ring_dequeue_bulk));
> > > +		if (rc == 0)
> > > +			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
> > > +		if (rc != 0)
> > > +			break;
> > Since this seems like a performance test, should we skip validating the objects?

I think it is good to have the test doing validation too.
It shouldn't affect measurements, but it brings extra confidence
that our ring implementation works properly and doesn't introduce
any races.

> > Did these tests run on Travis CI?

AFAIK no, but people can still run them manually.

> > I believe Travis CI has trouble running stress/performance tests if they take too much time.
> > The RTS and HTS tests should be added to functional tests.

Ok, I'll try to add some extra functional tests in v4.
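
For instance (a hypothetical sketch; it relies on the
RING_F_MP_RTS_ENQ/RING_F_MC_RTS_DEQ flags that later patches in this
series introduce), the functional tests could create the ring in RTS
mode and reuse the existing checks:

	/* hypothetical: RTS-mode ring for the generic functional tests */
	struct rte_ring *r;

	r = rte_ring_create("rts_func_test", 1024, SOCKET_ID_ANY,
		RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ);
	if (r == NULL)
		return -ENOMEM;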
  
Honnappa Nagarahalli April 10, 2020, 4:59 p.m. UTC | #4
<snip>

> >
> > > Subject: [PATCH v3 1/9] test/ring: add contention stress test
> > Minor, would 'add stress test for overcommitted use case' sound better?
> 
> I'd like to point out that this test-case can be used as a contention stress-test
> (many threads do enqueue/dequeue to/from the same ring) for both
> over-committed and non-over-committed scenarios...
> Will probably try to add a few extra explanations in v4.
The test cases for the non-over-committed case are already available in the function 'run_on_all_cores'. There we run enqueue/dequeue on all the available cores.

> 
> > >
> > > Introduce new test-case to measure ring perfomance under contention
> > Minor, 'over committed' seems to be the word commonly used in the
> > references you provided. Does it make sense to use that?
> >
> > > (miltiple producers/consumers).
> >     ^^^^^^^ multiple
> 
> ack.
> 
<snip>

> > Just wondering about the need for 2 tests which run the same functionality.
> > The difference is the way in which numbers are collected.
> > Does 'test_worker_avg' add any value? IMO, we can remove 'test_worker_avg'.
> 
> Yeah, they are quite similar.
> I added the _average_ version for two reasons:
> 1. In the precise version I call rte_rdtsc_precise() straight before/after
>     each enqueue/dequeue op.
>     At least on IA, rte_rdtsc_precise() implies an mb().
>     This extra sync point might hide some sync problems in the ring
>     enqueue/dequeue itself.
>     So having a separate test without such extra sync points
>     gives extra confidence that these tests would catch ring sync
>     problems, if any.
The functional tests should be captured separately as we need to run them on Travis CI (please see the comment above).
Do we need the precise version? I think the average version captures the data well already.
Consider merging the two functions into a single one with a parameter to select precise or average cycle collection. Since such a parameter would be a constant, the compiler will remove the unwanted code.
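
One possible shape of such a merge (a sketch only, not part of this patch;
the test_worker() helper and its 'fname' parameter are illustrative, and it
reuses the helpers defined in test_ring_stress_impl.h below):

static __rte_always_inline int
test_worker(void *arg, const char *fname, const int32_t prcs)
{
	int32_t rc;
	uint32_t lc, n, num;
	uint64_t cl, tm0, tm1;
	struct lcore_arg *la;
	struct ring_elem def_elm, loc_elm;
	struct ring_elem *obj[2 * BULK_NUM];

	la = arg;
	lc = rte_lcore_id();
	tm0 = 0;
	tm1 = 0;

	fill_ring_elm(&def_elm, UINT32_MAX);
	fill_ring_elm(&loc_elm, lc);

	while (wrk_cmd != WRK_CMD_RUN) {
		rte_smp_rmb();
		rte_pause();
	}

	cl = rte_rdtsc_precise();

	do {
		/* num in interval [7/8, 11/8] of BULK_NUM */
		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);

		/* reset all pointer values */
		memset(obj, 0, sizeof(obj));

		/*
		 * 'prcs' is a compile-time constant at each call site,
		 * so the dead branches below disappear in the AVG variant.
		 */
		if (prcs)
			tm0 = rte_rdtsc_precise();
		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
		if (prcs)
			tm0 = rte_rdtsc_precise() - tm0;

		/* check return value and objects */
		rc = check_ring_op(num, n, lc, fname,
			RTE_STR(_st_ring_dequeue_bulk));
		if (rc == 0)
			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
		if (rc != 0)
			break;

		/* enqueue num elems */
		rte_compiler_barrier();
		rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
		if (rc != 0)
			break;

		if (prcs)
			tm1 = rte_rdtsc_precise();
		n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
		if (prcs)
			tm1 = rte_rdtsc_precise() - tm1;

		/* check return value */
		rc = check_ring_op(num, n, lc, fname,
			RTE_STR(_st_ring_enqueue_bulk));
		if (rc != 0)
			break;

		lcore_stat_update(&la->stats, 1, num, tm0 + tm1, prcs);

	} while (wrk_cmd == WRK_CMD_RUN);

	/* final stats update */
	cl = rte_rdtsc_precise() - cl;
	if (prcs == 0)
		lcore_stat_update(&la->stats, 0, 0, cl, 0);
	la->stats.nb_cycle = cl;
	return rc;
}

static int
test_worker_prcs(void *arg)
{
	return test_worker(arg, __func__, 1);
}

static int
test_worker_avg(void *arg)
{
	return test_worker(arg, __func__, 0);
}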

> 2. People usually don't do enqueue/dequeue on their own.
>     One common pattern is: dequeue, read/write data in the dequeued
>     objects, enqueue.
>     So this test measures cycles for dequeue/enqueue plus some reads/writes
>     to the objects from the ring.
Users could be doing a lot of things after the ring operations; we do not know all that can happen in the application.
My concern here is that the numbers include cycles spent on things other than ring operations. For example, we cannot compare the average numbers with the precise numbers.
If we move the validation of the results to functional tests, we should be good.
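
For reference, the usage pattern described in point 2 above, which the AVG
numbers approximate, is roughly the following (illustrative only; 'process'
stands in for arbitrary per-object application work):

static inline uint32_t
deq_process_enq(struct rte_ring *r, void *objs[], uint32_t num,
	void (*process)(void *))
{
	uint32_t i, n;

	/* dequeue a burst, touch each dequeued object, enqueue them back */
	n = rte_ring_dequeue_bulk(r, objs, num, NULL);
	for (i = 0; i != n; i++)
		process(objs[i]);
	return rte_ring_enqueue_bulk(r, objs, n, NULL);
}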

> 
> > > +
> > > +static void
> > > +mt1_fini(struct rte_ring *rng, void *data) {
> > > +	rte_free(rng);
> > > +	rte_free(data);
> > > +}
> > > +
> > > +static int
> > > +mt1_init(struct rte_ring **rng, void **data, uint32_t num) {
> > > +	int32_t rc;
> > > +	size_t sz;
> > > +	uint32_t i, nr;
> > > +	struct rte_ring *r;
> > > +	struct ring_elem *elm;
> > > +	void *p;
> > > +
> > > +	*rng = NULL;
> > > +	*data = NULL;
> > > +
> > > +	sz = num * sizeof(*elm);
> > > +	elm = rte_zmalloc(NULL, sz, __alignof__(*elm));
> > > +	if (elm == NULL) {
> > > +		printf("%s: alloc(%zu) for %u elems data failed",
> > > +			__func__, sz, num);
> > > +		return -ENOMEM;
> > > +	}
> > > +
> > > +	*data = elm;
> > > +
> > > +	/* alloc ring */
> > > +	nr = 2 * num;
> > > +	sz = rte_ring_get_memsize(nr);
> > > +	r = rte_zmalloc(NULL, sz, __alignof__(*r));
> > > +	if (r == NULL) {
> > > +		printf("%s: alloc(%zu) for FIFO with %u elems failed",
> > > +			__func__, sz, nr);
> > > +		return -ENOMEM;
> > > +	}
> > > +
> > > +	*rng = r;
> > > +
> > > +	rc = _st_ring_init(r, RING_NAME, nr);
> > > +	if (rc != 0) {
> > > +		printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n",
> > > +			__func__, r, nr, rc, strerror(-rc));
> > > +		return rc;
> > > +	}
> > > +
> > > +	for (i = 0; i != num; i++) {
> > > +		fill_ring_elm(elm + i, UINT32_MAX);
> > > +		p = elm + i;
> > > +		if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
> > > +			break;
> > > +	}
> > > +
> > > +	if (i != num) {
> > > +		printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
> > > +			__func__, r, num, i);
> > > +		return -ENOSPC;
> > > +	}
> > > +
> > > +	return 0;
> > > +}
> > > +
> > > +static int
> > > +test_mt1(int (*test)(void *))
> > > +{
> > > +	int32_t rc;
> > > +	uint32_t lc, mc;
> > > +	struct rte_ring *r;
> > > +	void *data;
> > > +	struct lcore_arg arg[RTE_MAX_LCORE];
> > > +
> > > +	static const struct lcore_stat init_stat = {
> > > +		.op.min_cycle = UINT64_MAX,
> > > +	};
> > > +
> > > +	rc = mt1_init(&r, &data, RING_SIZE);
> > > +	if (rc != 0) {
> > > +		mt1_fini(r, data);
> > > +		return rc;
> > > +	}
> > > +
> > > +	memset(arg, 0, sizeof(arg));
> > > +
> > > +	/* launch on all slaves */
> > > +	RTE_LCORE_FOREACH_SLAVE(lc) {
> > > +		arg[lc].rng = r;
> > > +		arg[lc].stats = init_stat;
> > > +		rte_eal_remote_launch(test, &arg[lc], lc);
> > > +	}
> > > +
> > > +	/* signal worker to start test */
> > > +	wrk_cmd = WRK_CMD_RUN;
> > > +	rte_smp_wmb();
> > > +
> > > +	usleep(run_time * US_PER_S);
> > > +
> > > +	/* signal worker to stop test */
> > > +	wrk_cmd = WRK_CMD_STOP;
> > > +	rte_smp_wmb();
> > > +
> > > +	/* wait for slaves and collect stats. */
> > > +	mc = rte_lcore_id();
> > > +	arg[mc].stats = init_stat;
> > > +
> > > +	rc = 0;
> > > +	RTE_LCORE_FOREACH_SLAVE(lc) {
> > > +		rc |= rte_eal_wait_lcore(lc);
> > > +		lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
> > > +		if (verbose != 0)
> > > +			lcore_stat_dump(stdout, lc, &arg[lc].stats);
> > > +	}
> > > +
> > > +	lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
> > > +	mt1_fini(r, data);
> > > +	return rc;
> > > +}
> > > +
> > > +static const struct test_case tests[] = {
> > > +	{
> > > +		.name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS",
> > > +		.func = test_mt1,
> > > +		.wfunc = test_worker_prcs,
> > > +	},
> > > +	{
> > > +		.name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG",
> > > +		.func = test_mt1,
> > > +		.wfunc = test_worker_avg,
> > > +	},
> > > +};
> > > --
> > > 2.17.1
  
Honnappa Nagarahalli April 10, 2020, 6:01 p.m. UTC | #5
<snip>

> 
> > > > +static int
> > > > +test_worker_prcs(void *arg)
> > > > +{
> > > > +	int32_t rc;
> > > > +	uint32_t lc, n, num;
> > > Minor, 'lcore' instead of 'lc' would be better
> > >
> > > > +	uint64_t cl, tm0, tm1;
> > > > +	struct lcore_arg *la;
> > > > +	struct ring_elem def_elm, loc_elm;
> > > > +	struct ring_elem *obj[2 * BULK_NUM];
> > > > +
> > > > +	la = arg;
> > > > +	lc = rte_lcore_id();
> > > > +
> > > > +	fill_ring_elm(&def_elm, UINT32_MAX);
> > > > +	fill_ring_elm(&loc_elm, lc);
> > > > +
> > > > +	while (wrk_cmd != WRK_CMD_RUN) {
> > > > +		rte_smp_rmb();
> > > > +		rte_pause();
> > > > +	}
> > > > +
> > > > +	cl = rte_rdtsc_precise();
> > > > +
> > > > +	do {
> > > > +		/* num in interval [7/8, 11/8] of BULK_NUM */
> > > > +		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);
> > > > +
> > > > +		/* reset all pointer values */
> > > > +		memset(obj, 0, sizeof(obj));
> > > > +
> > > > +		/* dequeue num elems */
> > > > +		tm0 = rte_rdtsc_precise();
> > > > +		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
> > > > +		tm0 = rte_rdtsc_precise() - tm0;
> > > > +
> > > > +		/* check return value and objects */
> > > > +		rc = check_ring_op(num, n, lc, __func__,
> > > > +			RTE_STR(_st_ring_dequeue_bulk));
> > > > +		if (rc == 0)
> > > > +			rc = check_updt_elem(obj, num, &def_elm,
> > > > &loc_elm);
> > > > +		if (rc != 0)
> > > > +			break;
> > > Since this seems like a performance test, should we skip validating the
> > > objects?
> 
> I think it is good to have the test doing validation too.
> It shouldn't affect measurements, but brings extra confidence that our
> ring implementation works properly and doesn't introduce any races.
Ok, I am fine here as the cycles for validation are not counted in the cycles for the ring APIs.
IMO, this test is enough and we do not need the average cycles test.

> 
> > > Did these tests run on Travis CI?
> 
> AFAIK, no, but people can still run it manually.
> 
> > > I believe Travis CI has trouble running stress/performance tests if they
> > > take too much time.
> > > The RTS and HTS tests should be added to functional tests.
> 
> Ok, I'll try to add some extra functional tests in v4.
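
A minimal functional check of that sort might look like the following sketch
(hypothetical; the function name is illustrative, and the ring is assumed to
be empty with room for at least BULK_NUM objects):

static int
test_ring_func_smoke(struct rte_ring *r)
{
	uint32_t i;
	void *in[BULK_NUM], *out[BULK_NUM];

	for (i = 0; i != BULK_NUM; i++)
		in[i] = (void *)(uintptr_t)(i + 1);

	if (rte_ring_enqueue_bulk(r, in, BULK_NUM, NULL) != BULK_NUM)
		return -1;
	if (rte_ring_dequeue_bulk(r, out, BULK_NUM, NULL) != BULK_NUM)
		return -1;

	/* single-threaded FIFO: order must be preserved */
	return memcmp(in, out, sizeof(in)) != 0 ? -1 : 0;
}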
  

Patch

diff --git a/app/test/Makefile b/app/test/Makefile
index 1f080d162..4eefaa887 100644
--- a/app/test/Makefile
+++ b/app/test/Makefile
@@ -77,7 +77,9 @@  SRCS-y += test_external_mem.c
 SRCS-y += test_rand_perf.c
 
 SRCS-y += test_ring.c
+SRCS-y += test_ring_mpmc_stress.c
 SRCS-y += test_ring_perf.c
+SRCS-y += test_ring_stress.c
 SRCS-y += test_pmd_perf.c
 
 ifeq ($(CONFIG_RTE_LIBRTE_TABLE),y)
diff --git a/app/test/meson.build b/app/test/meson.build
index 351d29cb6..827b04886 100644
--- a/app/test/meson.build
+++ b/app/test/meson.build
@@ -100,7 +100,9 @@  test_sources = files('commands.c',
 	'test_rib.c',
 	'test_rib6.c',
 	'test_ring.c',
+	'test_ring_mpmc_stress.c',
 	'test_ring_perf.c',
+	'test_ring_stress.c',
 	'test_rwlock.c',
 	'test_sched.c',
 	'test_service_cores.c',
diff --git a/app/test/test_ring_mpmc_stress.c b/app/test/test_ring_mpmc_stress.c
new file mode 100644
index 000000000..1524b0248
--- /dev/null
+++ b/app/test/test_ring_mpmc_stress.c
@@ -0,0 +1,31 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include "test_ring_stress_impl.h"
+
+static inline uint32_t
+_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
+	uint32_t *avail)
+{
+	return rte_ring_mc_dequeue_bulk(r, obj, n, avail);
+}
+
+static inline uint32_t
+_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
+	uint32_t *free)
+{
+	return rte_ring_mp_enqueue_bulk(r, obj, n, free);
+}
+
+static int
+_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
+{
+	return rte_ring_init(r, name, num, 0);
+}
+
+const struct test test_ring_mpmc_stress = {
+	.name = "MP/MC",
+	.nb_case = RTE_DIM(tests),
+	.cases = tests,
+};
diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c
new file mode 100644
index 000000000..60706f799
--- /dev/null
+++ b/app/test/test_ring_stress.c
@@ -0,0 +1,48 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include "test_ring_stress.h"
+
+static int
+run_test(const struct test *test)
+{
+	int32_t rc;
+	uint32_t i, k;
+
+	for (i = 0, k = 0; i != test->nb_case; i++) {
+
+		printf("TEST-CASE %s %s START\n",
+			test->name, test->cases[i].name);
+
+		rc = test->cases[i].func(test->cases[i].wfunc);
+		k += (rc == 0);
+
+		if (rc != 0)
+			printf("TEST-CASE %s %s FAILED\n",
+				test->name, test->cases[i].name);
+		else
+			printf("TEST-CASE %s %s OK\n",
+				test->name, test->cases[i].name);
+	}
+
+	return k;
+}
+
+static int
+test_ring_stress(void)
+{
+	uint32_t n, k;
+
+	n = 0;
+	k = 0;
+
+	n += test_ring_mpmc_stress.nb_case;
+	k += run_test(&test_ring_mpmc_stress);
+
+	printf("Number of tests:\t%u\nSuccess:\t%u\nFailed:\t%u\n",
+		n, k, n - k);
+	return (k != n);
+}
+
+REGISTER_TEST_COMMAND(ring_stress_autotest, test_ring_stress);
diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h
new file mode 100644
index 000000000..60eac6216
--- /dev/null
+++ b/app/test/test_ring_stress.h
@@ -0,0 +1,35 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+
+#include <inttypes.h>
+#include <stddef.h>
+#include <stdalign.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <rte_ring.h>
+#include <rte_cycles.h>
+#include <rte_launch.h>
+#include <rte_pause.h>
+#include <rte_random.h>
+#include <rte_malloc.h>
+#include <rte_spinlock.h>
+
+#include "test.h"
+
+struct test_case {
+	const char *name;
+	int (*func)(int (*)(void *));
+	int (*wfunc)(void *arg);
+};
+
+struct test {
+	const char *name;
+	uint32_t nb_case;
+	const struct test_case *cases;
+};
+
+extern const struct test test_ring_mpmc_stress;
diff --git a/app/test/test_ring_stress_impl.h b/app/test/test_ring_stress_impl.h
new file mode 100644
index 000000000..11476d28c
--- /dev/null
+++ b/app/test/test_ring_stress_impl.h
@@ -0,0 +1,444 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include "test_ring_stress.h"
+
+/*
+ * Measures performance of ring enqueue/dequeue under high contention
+ */
+
+#define RING_NAME	"RING_STRESS"
+#define BULK_NUM	32
+#define RING_SIZE	(2 * BULK_NUM * RTE_MAX_LCORE)
+
+enum {
+	WRK_CMD_STOP,
+	WRK_CMD_RUN,
+};
+
+static volatile uint32_t wrk_cmd __rte_cache_aligned;
+
+/* test run-time in seconds */
+static const uint32_t run_time = 60;
+static const uint32_t verbose;
+
+struct lcore_stat {
+	uint64_t nb_cycle;
+	struct {
+		uint64_t nb_call;
+		uint64_t nb_obj;
+		uint64_t nb_cycle;
+		uint64_t max_cycle;
+		uint64_t min_cycle;
+	} op;
+};
+
+struct lcore_arg {
+	struct rte_ring *rng;
+	struct lcore_stat stats;
+} __rte_cache_aligned;
+
+struct ring_elem {
+	uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
+} __rte_cache_aligned;
+
+/*
+ * redefinable functions
+ */
+static uint32_t
+_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
+	uint32_t *avail);
+
+static uint32_t
+_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
+	uint32_t *free);
+
+static int
+_st_ring_init(struct rte_ring *r, const char *name, uint32_t num);
+
+
+static void
+lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
+	uint64_t tm, int32_t prcs)
+{
+	ls->op.nb_call += call;
+	ls->op.nb_obj += obj;
+	ls->op.nb_cycle += tm;
+	if (prcs) {
+		ls->op.max_cycle = RTE_MAX(ls->op.max_cycle, tm);
+		ls->op.min_cycle = RTE_MIN(ls->op.min_cycle, tm);
+	}
+}
+
+static void
+lcore_op_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
+{
+
+	ms->op.nb_call += ls->op.nb_call;
+	ms->op.nb_obj += ls->op.nb_obj;
+	ms->op.nb_cycle += ls->op.nb_cycle;
+	ms->op.max_cycle = RTE_MAX(ms->op.max_cycle, ls->op.max_cycle);
+	ms->op.min_cycle = RTE_MIN(ms->op.min_cycle, ls->op.min_cycle);
+}
+
+static void
+lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
+{
+	ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle);
+	lcore_op_stat_aggr(ms, ls);
+}
+
+static void
+lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
+{
+	long double st;
+
+	st = (long double)rte_get_timer_hz() / US_PER_S;
+
+	if (lc == UINT32_MAX)
+		fprintf(f, "%s(AGGREGATE)={\n", __func__);
+	else
+		fprintf(f, "%s(lc=%u)={\n", __func__, lc);
+
+	fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
+		ls->nb_cycle, (long double)ls->nb_cycle / st);
+
+	fprintf(f, "\tDEQ+ENQ={\n");
+
+	fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->op.nb_call);
+	fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->op.nb_obj);
+	fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->op.nb_cycle);
+	fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
+		(long double)ls->op.nb_obj / ls->op.nb_call);
+	fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
+		(long double)ls->op.nb_cycle / ls->op.nb_obj);
+	fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
+		(long double)ls->op.nb_cycle / ls->op.nb_call);
+
+	/* if min/max cycles per call stats were collected */
+	if (ls->op.min_cycle != UINT64_MAX) {
+		fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
+			ls->op.max_cycle,
+			(long double)ls->op.max_cycle / st);
+		fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
+			ls->op.min_cycle,
+			(long double)ls->op.min_cycle / st);
+	}
+
+	fprintf(f, "\t},\n");
+	fprintf(f, "};\n");
+}
+
+static void
+fill_ring_elm(struct ring_elem *elm, uint32_t fill)
+{
+	uint32_t i;
+
+	for (i = 0; i != RTE_DIM(elm->cnt); i++)
+		elm->cnt[i] = fill;
+}
+
+static int32_t
+check_updt_elem(struct ring_elem *elm[], uint32_t num,
+	const struct ring_elem *check, const struct ring_elem *fill)
+{
+	uint32_t i;
+
+	static rte_spinlock_t dump_lock;
+
+	for (i = 0; i != num; i++) {
+		if (memcmp(check, elm[i], sizeof(*check)) != 0) {
+			rte_spinlock_lock(&dump_lock);
+			printf("%s(lc=%u, num=%u) failed at %u-th iter, "
+				"offending object: %p\n",
+				__func__, rte_lcore_id(), num, i, elm[i]);
+			rte_memdump(stdout, "expected", check, sizeof(*check));
+			rte_memdump(stdout, "result", elm[i], sizeof(elm[i]));
+			rte_spinlock_unlock(&dump_lock);
+			return -EINVAL;
+		}
+		memcpy(elm[i], fill, sizeof(*elm[i]));
+	}
+
+	return 0;
+}
+
+static int
+check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
+	const char *fname, const char *opname)
+{
+	if (exp != res) {
+		printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
+			fname, lc, opname, exp, res);
+		return -ENOSPC;
+	}
+	return 0;
+}
+
+static int
+test_worker_prcs(void *arg)
+{
+	int32_t rc;
+	uint32_t lc, n, num;
+	uint64_t cl, tm0, tm1;
+	struct lcore_arg *la;
+	struct ring_elem def_elm, loc_elm;
+	struct ring_elem *obj[2 * BULK_NUM];
+
+	la = arg;
+	lc = rte_lcore_id();
+
+	fill_ring_elm(&def_elm, UINT32_MAX);
+	fill_ring_elm(&loc_elm, lc);
+
+	while (wrk_cmd != WRK_CMD_RUN) {
+		rte_smp_rmb();
+		rte_pause();
+	}
+
+	cl = rte_rdtsc_precise();
+
+	do {
+		/* num in interval [7/8, 11/8] of BULK_NUM */
+		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);
+
+		/* reset all pointer values */
+		memset(obj, 0, sizeof(obj));
+
+		/* dequeue num elems */
+		tm0 = rte_rdtsc_precise();
+		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
+		tm0 = rte_rdtsc_precise() - tm0;
+
+		/* check return value and objects */
+		rc = check_ring_op(num, n, lc, __func__,
+			RTE_STR(_st_ring_dequeue_bulk));
+		if (rc == 0)
+			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
+		if (rc != 0)
+			break;
+
+		/* enqueue num elems */
+		rte_compiler_barrier();
+		rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
+		if (rc != 0)
+			break;
+
+		tm1 = rte_rdtsc_precise();
+		n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
+		tm1 = rte_rdtsc_precise() - tm1;
+
+		/* check return value */
+		rc = check_ring_op(num, n, lc, __func__,
+			RTE_STR(_st_ring_enqueue_bulk));
+		if (rc != 0)
+			break;
+
+		lcore_stat_update(&la->stats, 1, num, tm0 + tm1, 1);
+
+	} while (wrk_cmd == WRK_CMD_RUN);
+
+	la->stats.nb_cycle = rte_rdtsc_precise() - cl;
+	return rc;
+}
+
+static int
+test_worker_avg(void *arg)
+{
+	int32_t rc;
+	uint32_t lc, n, num;
+	uint64_t cl;
+	struct lcore_arg *la;
+	struct ring_elem def_elm, loc_elm;
+	struct ring_elem *obj[2 * BULK_NUM];
+
+	la = arg;
+	lc = rte_lcore_id();
+
+	fill_ring_elm(&def_elm, UINT32_MAX);
+	fill_ring_elm(&loc_elm, lc);
+
+	while (wrk_cmd != WRK_CMD_RUN) {
+		rte_smp_rmb();
+		rte_pause();
+	}
+
+	cl = rte_rdtsc_precise();
+
+	do {
+		/* num in interval [7/8, 11/8] of BULK_NUM */
+		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);
+
+		/* reset all pointer values */
+		memset(obj, 0, sizeof(obj));
+
+		/* dequeue num elems */
+		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
+
+		/* check return value and objects */
+		rc = check_ring_op(num, n, lc, __func__,
+			RTE_STR(_st_ring_dequeue_bulk));
+		if (rc == 0)
+			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
+		if (rc != 0)
+			break;
+
+		/* enqueue num elems */
+		rte_compiler_barrier();
+		rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
+		if (rc != 0)
+			break;
+
+		n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
+
+		/* check return value */
+		rc = check_ring_op(num, n, lc, __func__,
+			RTE_STR(_st_ring_enqueue_bulk));
+		if (rc != 0)
+			break;
+
+		lcore_stat_update(&la->stats, 1, num, 0, 0);
+
+	} while (wrk_cmd == WRK_CMD_RUN);
+
+	/* final stats update */
+	cl = rte_rdtsc_precise() - cl;
+	lcore_stat_update(&la->stats, 0, 0, cl, 0);
+	la->stats.nb_cycle = cl;
+
+	return rc;
+}
+
+static void
+mt1_fini(struct rte_ring *rng, void *data)
+{
+	rte_free(rng);
+	rte_free(data);
+}
+
+static int
+mt1_init(struct rte_ring **rng, void **data, uint32_t num)
+{
+	int32_t rc;
+	size_t sz;
+	uint32_t i, nr;
+	struct rte_ring *r;
+	struct ring_elem *elm;
+	void *p;
+
+	*rng = NULL;
+	*data = NULL;
+
+	sz = num * sizeof(*elm);
+	elm = rte_zmalloc(NULL, sz, __alignof__(*elm));
+	if (elm == NULL) {
+		printf("%s: alloc(%zu) for %u elems data failed",
+			__func__, sz, num);
+		return -ENOMEM;
+	}
+
+	*data = elm;
+
+	/* alloc ring */
+	nr = 2 * num;
+	sz = rte_ring_get_memsize(nr);
+	r = rte_zmalloc(NULL, sz, __alignof__(*r));
+	if (r == NULL) {
+		printf("%s: alloc(%zu) for FIFO with %u elems failed",
+			__func__, sz, nr);
+		return -ENOMEM;
+	}
+
+	*rng = r;
+
+	rc = _st_ring_init(r, RING_NAME, nr);
+	if (rc != 0) {
+		printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n",
+			__func__, r, nr, rc, strerror(-rc));
+		return rc;
+	}
+
+	for (i = 0; i != num; i++) {
+		fill_ring_elm(elm + i, UINT32_MAX);
+		p = elm + i;
+		if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
+			break;
+	}
+
+	if (i != num) {
+		printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
+			__func__, r, num, i);
+		return -ENOSPC;
+	}
+
+	return 0;
+}
+
+static int
+test_mt1(int (*test)(void *))
+{
+	int32_t rc;
+	uint32_t lc, mc;
+	struct rte_ring *r;
+	void *data;
+	struct lcore_arg arg[RTE_MAX_LCORE];
+
+	static const struct lcore_stat init_stat = {
+		.op.min_cycle = UINT64_MAX,
+	};
+
+	rc = mt1_init(&r, &data, RING_SIZE);
+	if (rc != 0) {
+		mt1_fini(r, data);
+		return rc;
+	}
+
+	memset(arg, 0, sizeof(arg));
+
+	/* launch on all slaves */
+	RTE_LCORE_FOREACH_SLAVE(lc) {
+		arg[lc].rng = r;
+		arg[lc].stats = init_stat;
+		rte_eal_remote_launch(test, &arg[lc], lc);
+	}
+
+	/* signal worker to start test */
+	wrk_cmd = WRK_CMD_RUN;
+	rte_smp_wmb();
+
+	usleep(run_time * US_PER_S);
+
+	/* signal worker to stop test */
+	wrk_cmd = WRK_CMD_STOP;
+	rte_smp_wmb();
+
+	/* wait for slaves and collect stats. */
+	mc = rte_lcore_id();
+	arg[mc].stats = init_stat;
+
+	rc = 0;
+	RTE_LCORE_FOREACH_SLAVE(lc) {
+		rc |= rte_eal_wait_lcore(lc);
+		lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
+		if (verbose != 0)
+			lcore_stat_dump(stdout, lc, &arg[lc].stats);
+	}
+
+	lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
+	mt1_fini(r, data);
+	return rc;
+}
+
+static const struct test_case tests[] = {
+	{
+		.name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS",
+		.func = test_mt1,
+		.wfunc = test_worker_prcs,
+	},
+	{
+		.name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG",
+		.func = test_mt1,
+		.wfunc = test_worker_avg,
+	},
+};