[v5,1/9] test/ring: add contention stress test

Message ID 20200418163225.17635-2-konstantin.ananyev@intel.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Series New sync modes for ring

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/iol-intel-Performance success Performance Testing PASS
ci/iol-nxp-Performance success Performance Testing PASS
ci/iol-mellanox-Performance success Performance Testing PASS
ci/Intel-compilation fail Compilation issues
ci/iol-testing fail Testing issues

Commit Message

Ananyev, Konstantin April 18, 2020, 4:32 p.m. UTC
  Introduce stress test for ring enqueue/dequeue operations.
Performs the following pattern on each slave worker:
dequeue/read-write data from the dequeued objects/enqueue.
Serves as both functional and performance test of ring
enqueue/dequeue operations under high contention
(for both over committed and non-over committed scenarios).

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
 app/test/Makefile                |   2 +
 app/test/meson.build             |   2 +
 app/test/test_ring_mpmc_stress.c |  31 +++
 app/test/test_ring_stress.c      |  48 ++++
 app/test/test_ring_stress.h      |  35 +++
 app/test/test_ring_stress_impl.h | 396 +++++++++++++++++++++++++++++++
 6 files changed, 514 insertions(+)
 create mode 100644 app/test/test_ring_mpmc_stress.c
 create mode 100644 app/test/test_ring_stress.c
 create mode 100644 app/test/test_ring_stress.h
 create mode 100644 app/test/test_ring_stress_impl.h
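
For reference, the patch registers this test as the "ring_stress_autotest"
command (via REGISTER_TEST_COMMAND), so once built it can be invoked from the
test binary's interactive prompt. The binary path and EAL arguments below are
illustrative and depend on the build directory and target machine:

    ./build/app/test/dpdk-test --lcores=0-8
    RTE>> ring_stress_autotest

Each worker lcore then runs the dequeue/read-write/enqueue loop for the fixed
60-second run time, after which the master lcore prints aggregated per-test
statistics.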
  

Comments

Honnappa Nagarahalli April 19, 2020, 2:30 a.m. UTC | #1
<snip>

> 
> Introduce stress test for ring enqueue/dequeue operations.
> Performs the following pattern on each slave worker:
> dequeue/read-write data from the dequeued objects/enqueue.
> Serves as both functional and performance test of ring enqueue/dequeue
> operations under high contention (for both over committed and non-over
> committed scenarios).
> 
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
ci/intel-compilation fails for meson due to clang+32b. I believe it is solved by [1] (as you indicated). Can you make this patch dependent on [1]?
Otherwise,
Acked-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

[1] http://patches.dpdk.org/patch/68280/
<snip the quoted patch; it appears in full in the Patch section below>
  
David Marchand April 19, 2020, 8:03 a.m. UTC | #2
On Sun, Apr 19, 2020 at 4:31 AM Honnappa Nagarahalli
<Honnappa.Nagarahalli@arm.com> wrote:
> > Introduce stress test for ring enqueue/dequeue operations.
> > Performs the following pattern on each slave worker:
> > dequeue/read-write data from the dequeued objects/enqueue.
> > Serves as both functional and performance test of ring enqueue/dequeue
> > operations under high contention (for both over committed and non-over
> > committed scenarios).
> >
> > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> ci/intel-compilation fails for meson due to clang+32b. I believe it is solved by [1] (as you indicated). Can you make this patch dependent on [1]?
> Otherwise,
> Acked-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
>
> [1] http://patches.dpdk.org/patch/68280/

I will take this patch as part of the first series that makes it into
master (between ring or traces series).
  
Ananyev, Konstantin April 19, 2020, 11:47 a.m. UTC | #3
> 
> On Sun, Apr 19, 2020 at 4:31 AM Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com> wrote:
> > > Introduce stress test for ring enqueue/dequeue operations.
> > > Performs the following pattern on each slave worker:
> > > dequeue/read-write data from the dequeued objects/enqueue.
> > > Serves as both functional and performance test of ring enqueue/dequeue
> > > operations under high contention (for both over committed and non-over
> > > committed scenarios).
> > >
> > > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> > ci/intel-compilation fails for meson due to clang+32b. I believe it is solved by [1] (as you indicated). Can you make this patch dependent on
> [1]?
> > Otherwise,
> > Acked-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> >
> > [1] http://patches.dpdk.org/patch/68280/
> 
> I will take this patch as part of the first series that makes it into
> master (between ring or traces series).

Thanks David.
BTW, do we have a special keyword for that (Depends-on: ... or so)?
  

Patch

diff --git a/app/test/Makefile b/app/test/Makefile
index be53d33c3..a23a011df 100644
--- a/app/test/Makefile
+++ b/app/test/Makefile
@@ -77,7 +77,9 @@  SRCS-y += test_external_mem.c
 SRCS-y += test_rand_perf.c
 
 SRCS-y += test_ring.c
+SRCS-y += test_ring_mpmc_stress.c
 SRCS-y += test_ring_perf.c
+SRCS-y += test_ring_stress.c
 SRCS-y += test_pmd_perf.c
 
 ifeq ($(CONFIG_RTE_LIBRTE_TABLE),y)
diff --git a/app/test/meson.build b/app/test/meson.build
index 04b59cffa..8824f366c 100644
--- a/app/test/meson.build
+++ b/app/test/meson.build
@@ -100,7 +100,9 @@  test_sources = files('commands.c',
 	'test_rib.c',
 	'test_rib6.c',
 	'test_ring.c',
+	'test_ring_mpmc_stress.c',
 	'test_ring_perf.c',
+	'test_ring_stress.c',
 	'test_rwlock.c',
 	'test_sched.c',
 	'test_service_cores.c',
diff --git a/app/test/test_ring_mpmc_stress.c b/app/test/test_ring_mpmc_stress.c
new file mode 100644
index 000000000..1524b0248
--- /dev/null
+++ b/app/test/test_ring_mpmc_stress.c
@@ -0,0 +1,31 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include "test_ring_stress_impl.h"
+
+static inline uint32_t
+_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
+	uint32_t *avail)
+{
+	return rte_ring_mc_dequeue_bulk(r, obj, n, avail);
+}
+
+static inline uint32_t
+_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
+	uint32_t *free)
+{
+	return rte_ring_mp_enqueue_bulk(r, obj, n, free);
+}
+
+static int
+_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
+{
+	return rte_ring_init(r, name, num, 0);
+}
+
+const struct test test_ring_mpmc_stress = {
+	.name = "MP/MC",
+	.nb_case = RTE_DIM(tests),
+	.cases = tests,
+};
diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c
new file mode 100644
index 000000000..60706f799
--- /dev/null
+++ b/app/test/test_ring_stress.c
@@ -0,0 +1,48 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include "test_ring_stress.h"
+
+static int
+run_test(const struct test *test)
+{
+	int32_t rc;
+	uint32_t i, k;
+
+	for (i = 0, k = 0; i != test->nb_case; i++) {
+
+		printf("TEST-CASE %s %s START\n",
+			test->name, test->cases[i].name);
+
+		rc = test->cases[i].func(test->cases[i].wfunc);
+		k += (rc == 0);
+
+		if (rc != 0)
+			printf("TEST-CASE %s %s FAILED\n",
+				test->name, test->cases[i].name);
+		else
+			printf("TEST-CASE %s %s OK\n",
+				test->name, test->cases[i].name);
+	}
+
+	return k;
+}
+
+static int
+test_ring_stress(void)
+{
+	uint32_t n, k;
+
+	n = 0;
+	k = 0;
+
+	n += test_ring_mpmc_stress.nb_case;
+	k += run_test(&test_ring_mpmc_stress);
+
+	printf("Number of tests:\t%u\nSuccess:\t%u\nFailed:\t%u\n",
+		n, k, n - k);
+	return (k != n);
+}
+
+REGISTER_TEST_COMMAND(ring_stress_autotest, test_ring_stress);
diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h
new file mode 100644
index 000000000..60eac6216
--- /dev/null
+++ b/app/test/test_ring_stress.h
@@ -0,0 +1,35 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+
+#include <inttypes.h>
+#include <stddef.h>
+#include <stdalign.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <rte_ring.h>
+#include <rte_cycles.h>
+#include <rte_launch.h>
+#include <rte_pause.h>
+#include <rte_random.h>
+#include <rte_malloc.h>
+#include <rte_spinlock.h>
+
+#include "test.h"
+
+struct test_case {
+	const char *name;
+	int (*func)(int (*)(void *));
+	int (*wfunc)(void *arg);
+};
+
+struct test {
+	const char *name;
+	uint32_t nb_case;
+	const struct test_case *cases;
+};
+
+extern const struct test test_ring_mpmc_stress;
diff --git a/app/test/test_ring_stress_impl.h b/app/test/test_ring_stress_impl.h
new file mode 100644
index 000000000..222d62bc4
--- /dev/null
+++ b/app/test/test_ring_stress_impl.h
@@ -0,0 +1,396 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include "test_ring_stress.h"
+
+/**
+ * Stress test for ring enqueue/dequeue operations.
+ * Performs the following pattern on each slave worker:
+ * dequeue/read-write data from the dequeued objects/enqueue.
+ * Serves as both functional and performance test of ring
+ * enqueue/dequeue operations under high contention
+ * (for both over committed and non-over committed scenarios).
+ */
+
+#define RING_NAME	"RING_STRESS"
+#define BULK_NUM	32
+#define RING_SIZE	(2 * BULK_NUM * RTE_MAX_LCORE)
+
+enum {
+	WRK_CMD_STOP,
+	WRK_CMD_RUN,
+};
+
+static volatile uint32_t wrk_cmd __rte_cache_aligned;
+
+/* test run-time in seconds */
+static const uint32_t run_time = 60;
+static const uint32_t verbose;
+
+struct lcore_stat {
+	uint64_t nb_cycle;
+	struct {
+		uint64_t nb_call;
+		uint64_t nb_obj;
+		uint64_t nb_cycle;
+		uint64_t max_cycle;
+		uint64_t min_cycle;
+	} op;
+};
+
+struct lcore_arg {
+	struct rte_ring *rng;
+	struct lcore_stat stats;
+} __rte_cache_aligned;
+
+struct ring_elem {
+	uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
+} __rte_cache_aligned;
+
+/*
+ * redefinable functions
+ */
+static uint32_t
+_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
+	uint32_t *avail);
+
+static uint32_t
+_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
+	uint32_t *free);
+
+static int
+_st_ring_init(struct rte_ring *r, const char *name, uint32_t num);
+
+
+static void
+lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
+	uint64_t tm, int32_t prcs)
+{
+	ls->op.nb_call += call;
+	ls->op.nb_obj += obj;
+	ls->op.nb_cycle += tm;
+	if (prcs) {
+		ls->op.max_cycle = RTE_MAX(ls->op.max_cycle, tm);
+		ls->op.min_cycle = RTE_MIN(ls->op.min_cycle, tm);
+	}
+}
+
+static void
+lcore_op_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
+{
+
+	ms->op.nb_call += ls->op.nb_call;
+	ms->op.nb_obj += ls->op.nb_obj;
+	ms->op.nb_cycle += ls->op.nb_cycle;
+	ms->op.max_cycle = RTE_MAX(ms->op.max_cycle, ls->op.max_cycle);
+	ms->op.min_cycle = RTE_MIN(ms->op.min_cycle, ls->op.min_cycle);
+}
+
+static void
+lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
+{
+	ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle);
+	lcore_op_stat_aggr(ms, ls);
+}
+
+static void
+lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
+{
+	long double st;
+
+	st = (long double)rte_get_timer_hz() / US_PER_S;
+
+	if (lc == UINT32_MAX)
+		fprintf(f, "%s(AGGREGATE)={\n", __func__);
+	else
+		fprintf(f, "%s(lcore=%u)={\n", __func__, lc);
+
+	fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
+		ls->nb_cycle, (long double)ls->nb_cycle / st);
+
+	fprintf(f, "\tDEQ+ENQ={\n");
+
+	fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->op.nb_call);
+	fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->op.nb_obj);
+	fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->op.nb_cycle);
+	fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
+		(long double)ls->op.nb_obj / ls->op.nb_call);
+	fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
+		(long double)ls->op.nb_cycle / ls->op.nb_obj);
+	fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
+		(long double)ls->op.nb_cycle / ls->op.nb_call);
+
+	/* if min/max cycles per call stats was collected */
+	if (ls->op.min_cycle != UINT64_MAX) {
+		fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
+			ls->op.max_cycle,
+			(long double)ls->op.max_cycle / st);
+		fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
+			ls->op.min_cycle,
+			(long double)ls->op.min_cycle / st);
+	}
+
+	fprintf(f, "\t},\n");
+	fprintf(f, "};\n");
+}
+
+static void
+fill_ring_elm(struct ring_elem *elm, uint32_t fill)
+{
+	uint32_t i;
+
+	for (i = 0; i != RTE_DIM(elm->cnt); i++)
+		elm->cnt[i] = fill;
+}
+
+static int32_t
+check_updt_elem(struct ring_elem *elm[], uint32_t num,
+	const struct ring_elem *check, const struct ring_elem *fill)
+{
+	uint32_t i;
+
+	static rte_spinlock_t dump_lock;
+
+	for (i = 0; i != num; i++) {
+		if (memcmp(check, elm[i], sizeof(*check)) != 0) {
+			rte_spinlock_lock(&dump_lock);
+			printf("%s(lc=%u, num=%u) failed at %u-th iter, "
+				"offending object: %p\n",
+				__func__, rte_lcore_id(), num, i, elm[i]);
+			rte_memdump(stdout, "expected", check, sizeof(*check));
+			rte_memdump(stdout, "result", elm[i], sizeof(*elm[i]));
+			rte_spinlock_unlock(&dump_lock);
+			return -EINVAL;
+		}
+		memcpy(elm[i], fill, sizeof(*elm[i]));
+	}
+
+	return 0;
+}
+
+static int
+check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
+	const char *fname, const char *opname)
+{
+	if (exp != res) {
+		printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
+			fname, lc, opname, exp, res);
+		return -ENOSPC;
+	}
+	return 0;
+}
+
+static int
+test_worker(void *arg, const char *fname, int32_t prcs)
+{
+	int32_t rc;
+	uint32_t lc, n, num;
+	uint64_t cl, tm0, tm1;
+	struct lcore_arg *la;
+	struct ring_elem def_elm, loc_elm;
+	struct ring_elem *obj[2 * BULK_NUM];
+
+	la = arg;
+	lc = rte_lcore_id();
+
+	fill_ring_elm(&def_elm, UINT32_MAX);
+	fill_ring_elm(&loc_elm, lc);
+
+	while (wrk_cmd != WRK_CMD_RUN) {
+		rte_smp_rmb();
+		rte_pause();
+	}
+
+	cl = rte_rdtsc_precise();
+
+	do {
+		/* num in interval [7/8, 11/8) of BULK_NUM */
+		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);
+
+		/* reset all pointer values */
+		memset(obj, 0, sizeof(obj));
+
+		/* dequeue num elems */
+		tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
+		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
+		tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;
+
+		/* check return value and objects */
+		rc = check_ring_op(num, n, lc, fname,
+			RTE_STR(_st_ring_dequeue_bulk));
+		if (rc == 0)
+			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
+		if (rc != 0)
+			break;
+
+		/* enqueue num elems */
+		rte_compiler_barrier();
+		rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
+		if (rc != 0)
+			break;
+
+		tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
+		n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
+		tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;
+
+		/* check return value */
+		rc = check_ring_op(num, n, lc, fname,
+			RTE_STR(_st_ring_enqueue_bulk));
+		if (rc != 0)
+			break;
+
+		lcore_stat_update(&la->stats, 1, num, tm0 + tm1, prcs);
+
+	} while (wrk_cmd == WRK_CMD_RUN);
+
+	cl = rte_rdtsc_precise() - cl;
+	if (prcs == 0)
+		lcore_stat_update(&la->stats, 0, 0, cl, 0);
+	la->stats.nb_cycle = cl;
+	return rc;
+}
+static int
+test_worker_prcs(void *arg)
+{
+	return test_worker(arg, __func__, 1);
+}
+
+static int
+test_worker_avg(void *arg)
+{
+	return test_worker(arg, __func__, 0);
+}
+
+static void
+mt1_fini(struct rte_ring *rng, void *data)
+{
+	rte_free(rng);
+	rte_free(data);
+}
+
+static int
+mt1_init(struct rte_ring **rng, void **data, uint32_t num)
+{
+	int32_t rc;
+	size_t sz;
+	uint32_t i, nr;
+	struct rte_ring *r;
+	struct ring_elem *elm;
+	void *p;
+
+	*rng = NULL;
+	*data = NULL;
+
+	sz = num * sizeof(*elm);
+	elm = rte_zmalloc(NULL, sz, __alignof__(*elm));
+	if (elm == NULL) {
+		printf("%s: alloc(%zu) for %u elems data failed\n",
+			__func__, sz, num);
+		return -ENOMEM;
+	}
+
+	*data = elm;
+
+	/* alloc ring */
+	nr = 2 * num;
+	sz = rte_ring_get_memsize(nr);
+	r = rte_zmalloc(NULL, sz, __alignof__(*r));
+	if (r == NULL) {
+		printf("%s: alloc(%zu) for FIFO with %u elems failed\n",
+			__func__, sz, nr);
+		return -ENOMEM;
+	}
+
+	*rng = r;
+
+	rc = _st_ring_init(r, RING_NAME, nr);
+	if (rc != 0) {
+		printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n",
+			__func__, r, nr, rc, strerror(-rc));
+		return rc;
+	}
+
+	for (i = 0; i != num; i++) {
+		fill_ring_elm(elm + i, UINT32_MAX);
+		p = elm + i;
+		if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
+			break;
+	}
+
+	if (i != num) {
+		printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
+			__func__, r, num, i);
+		return -ENOSPC;
+	}
+
+	return 0;
+}
+
+static int
+test_mt1(int (*test)(void *))
+{
+	int32_t rc;
+	uint32_t lc, mc;
+	struct rte_ring *r;
+	void *data;
+	struct lcore_arg arg[RTE_MAX_LCORE];
+
+	static const struct lcore_stat init_stat = {
+		.op.min_cycle = UINT64_MAX,
+	};
+
+	rc = mt1_init(&r, &data, RING_SIZE);
+	if (rc != 0) {
+		mt1_fini(r, data);
+		return rc;
+	}
+
+	memset(arg, 0, sizeof(arg));
+
+	/* launch on all slaves */
+	RTE_LCORE_FOREACH_SLAVE(lc) {
+		arg[lc].rng = r;
+		arg[lc].stats = init_stat;
+		rte_eal_remote_launch(test, &arg[lc], lc);
+	}
+
+	/* signal worker to start test */
+	wrk_cmd = WRK_CMD_RUN;
+	rte_smp_wmb();
+
+	usleep(run_time * US_PER_S);
+
+	/* signal workers to stop test */
+	wrk_cmd = WRK_CMD_STOP;
+	rte_smp_wmb();
+
+	/* wait for slaves and collect stats. */
+	mc = rte_lcore_id();
+	arg[mc].stats = init_stat;
+
+	rc = 0;
+	RTE_LCORE_FOREACH_SLAVE(lc) {
+		rc |= rte_eal_wait_lcore(lc);
+		lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
+		if (verbose != 0)
+			lcore_stat_dump(stdout, lc, &arg[lc].stats);
+	}
+
+	lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
+	mt1_fini(r, data);
+	return rc;
+}
+
+static const struct test_case tests[] = {
+	{
+		.name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS",
+		.func = test_mt1,
+		.wfunc = test_worker_prcs,
+	},
+	{
+		.name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG",
+		.func = test_mt1,
+		.wfunc = test_worker_avg,
+	},
+};
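
A note on the structure: test_ring_stress_impl.h only forward-declares
_st_ring_dequeue_bulk(), _st_ring_enqueue_bulk() and _st_ring_init() (the
"redefinable functions"), so each synchronisation mode is exercised by a thin
wrapper file that supplies those three functions and exports the shared
tests[] table, exactly as test_ring_mpmc_stress.c does above. As a sketch of
how the new sync modes from the rest of this series could plug in, an RTS
(relaxed tail sync) wrapper might look as follows; the rte_ring_*_rts_* calls
and the RING_F_MP_RTS_ENQ/RING_F_MC_RTS_DEQ flags are assumed from later
patches in the series, not provided by this one:

/* Hypothetical RTS-mode wrapper mirroring test_ring_mpmc_stress.c;
 * the RTS API used below is assumed from later patches in this series.
 */
#include "test_ring_stress_impl.h"
#include <rte_ring_elem.h>

static inline uint32_t
_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
	uint32_t *avail)
{
	/* multi-consumer dequeue with relaxed tail synchronisation */
	return rte_ring_mc_rts_dequeue_bulk(r, obj, n, avail);
}

static inline uint32_t
_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
	uint32_t *free)
{
	/* multi-producer enqueue with relaxed tail synchronisation */
	return rte_ring_mp_rts_enqueue_bulk(r, obj, n, free);
}

static int
_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
{
	/* request RTS behaviour for both producers and consumers */
	return rte_ring_init(r, name, num,
		RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ);
}

const struct test test_ring_rts_stress = {
	.name = "MP_RTS/MC_RTS",
	.nb_case = RTE_DIM(tests),
	.cases = tests,
};

Such a wrapper would also need an extern declaration in test_ring_stress.h
and a matching nb_case/run_test() entry in test_ring_stress(), mirroring how
test_ring_mpmc_stress is wired up.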