[v5,2/9] ring: prepare ring to allow new sync schemes

Message ID 20200418163225.17635-3-konstantin.ananyev@intel.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series New sync modes for ring |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail Compilation issues

Commit Message

Ananyev, Konstantin April 18, 2020, 4:32 p.m. UTC
  To make these preparations two main things are done:
- Change from *single* to *sync_type* to allow different
  synchronisation schemes to be applied.
  Mark *single* as deprecated in comments.
  Add new functions to allow user to query ring sync types.
  Replace direct access to *single* with appropriate function call.
- Move actual rte_ring and related structures definitions into a
  separate file: <rte_ring_core.h>. It allows to refer contents
  of <rte_ring_elem.h> from <rte_ring.h> without introducing a
  circular dependency.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
 app/test/test_pdump.c           |   6 +-
 lib/librte_pdump/rte_pdump.c    |   2 +-
 lib/librte_port/rte_port_ring.c |  12 +--
 lib/librte_ring/Makefile        |   1 +
 lib/librte_ring/meson.build     |   1 +
 lib/librte_ring/rte_ring.c      |   6 +-
 lib/librte_ring/rte_ring.h      | 170 ++++++++++++++------------------
 lib/librte_ring/rte_ring_core.h | 132 +++++++++++++++++++++++++
 lib/librte_ring/rte_ring_elem.h |  42 +++-----
 9 files changed, 234 insertions(+), 138 deletions(-)
 create mode 100644 lib/librte_ring/rte_ring_core.h
  

Comments

Honnappa Nagarahalli April 19, 2020, 2:31 a.m. UTC | #1
<snip>

> 
> To make these preparations two main things are done:
> - Change from *single* to *sync_type* to allow different
>   synchronisation schemes to be applied.
>   Mark *single* as deprecated in comments.
>   Add new functions to allow user to query ring sync types.
>   Replace direct access to *single* with appropriate function call.
> - Move actual rte_ring and related structures definitions into a
>   separate file: <rte_ring_core.h>. It allows to refer contents
>   of <rte_ring_elem.h> from <rte_ring.h> without introducing a
>   circular dependency.
> 
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
few nits inline, otherwise
Acked-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> ---
>  app/test/test_pdump.c           |   6 +-
>  lib/librte_pdump/rte_pdump.c    |   2 +-
>  lib/librte_port/rte_port_ring.c |  12 +--
>  lib/librte_ring/Makefile        |   1 +
>  lib/librte_ring/meson.build     |   1 +
>  lib/librte_ring/rte_ring.c      |   6 +-
>  lib/librte_ring/rte_ring.h      | 170 ++++++++++++++------------------
>  lib/librte_ring/rte_ring_core.h | 132 +++++++++++++++++++++++++
> lib/librte_ring/rte_ring_elem.h |  42 +++-----
>  9 files changed, 234 insertions(+), 138 deletions(-)  create mode 100644
> lib/librte_ring/rte_ring_core.h
> 
> diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c index
> ad183184c..6a1180bcb 100644
> --- a/app/test/test_pdump.c
> +++ b/app/test/test_pdump.c
> @@ -57,8 +57,7 @@ run_pdump_client_tests(void)
>  	if (ret < 0)
>  		return -1;
>  	mp->flags = 0x0000;
> -	ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
> -				      RING_F_SP_ENQ | RING_F_SC_DEQ);
> +	ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
>  	if (ring_client == NULL) {
>  		printf("rte_ring_create SR0 failed");
>  		return -1;
> @@ -71,9 +70,6 @@ run_pdump_client_tests(void)
>  	}
>  	rte_eth_dev_probing_finish(eth_dev);
> 
> -	ring_client->prod.single = 0;
> -	ring_client->cons.single = 0;
> -
>  	printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n");
> 
>  	for (itr = 0; itr < NUM_ITR; itr++) {
> diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c
> index 8a01ac510..f96709f95 100644
> --- a/lib/librte_pdump/rte_pdump.c
> +++ b/lib/librte_pdump/rte_pdump.c
> @@ -380,7 +380,7 @@ pdump_validate_ring_mp(struct rte_ring *ring, struct
> rte_mempool *mp)
>  		rte_errno = EINVAL;
>  		return -1;
>  	}
> -	if (ring->prod.single || ring->cons.single) {
> +	if (rte_ring_is_prod_single(ring) || rte_ring_is_cons_single(ring)) {
>  		PDUMP_LOG(ERR, "ring with either SP or SC settings"
>  		" is not valid for pdump, should have MP and MC settings\n");
>  		rte_errno = EINVAL;
> diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
> index 47fcdd06a..52b2d8e55 100644
> --- a/lib/librte_port/rte_port_ring.c
> +++ b/lib/librte_port/rte_port_ring.c
> @@ -44,8 +44,8 @@ rte_port_ring_reader_create_internal(void *params, int
> socket_id,
>  	/* Check input parameters */
>  	if ((conf == NULL) ||
>  		(conf->ring == NULL) ||
> -		(conf->ring->cons.single && is_multi) ||
> -		(!(conf->ring->cons.single) && !is_multi)) {
> +		(rte_ring_is_cons_single(conf->ring) && is_multi) ||
> +		(!rte_ring_is_cons_single(conf->ring) && !is_multi)) {
>  		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
>  		return NULL;
>  	}
> @@ -171,8 +171,8 @@ rte_port_ring_writer_create_internal(void *params,
> int socket_id,
>  	/* Check input parameters */
>  	if ((conf == NULL) ||
>  		(conf->ring == NULL) ||
> -		(conf->ring->prod.single && is_multi) ||
> -		(!(conf->ring->prod.single) && !is_multi) ||
> +		(rte_ring_is_prod_single(conf->ring) && is_multi) ||
> +		(!rte_ring_is_prod_single(conf->ring) && !is_multi) ||
>  		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
>  		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
>  		return NULL;
> @@ -440,8 +440,8 @@ rte_port_ring_writer_nodrop_create_internal(void
> *params, int socket_id,
>  	/* Check input parameters */
>  	if ((conf == NULL) ||
>  		(conf->ring == NULL) ||
> -		(conf->ring->prod.single && is_multi) ||
> -		(!(conf->ring->prod.single) && !is_multi) ||
> +		(rte_ring_is_prod_single(conf->ring) && is_multi) ||
> +		(!rte_ring_is_prod_single(conf->ring) && !is_multi) ||
>  		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
>  		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
>  		return NULL;
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index
> 28368e6d1..6572768c9 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -16,6 +16,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> 
>  # install includes
>  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> +					rte_ring_core.h \
>  					rte_ring_elem.h \
>  					rte_ring_generic.h \
>  					rte_ring_c11_mem.h
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index
> 05402e4f0..c656781da 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -3,6 +3,7 @@
> 
>  sources = files('rte_ring.c')
>  headers = files('rte_ring.h',
> +		'rte_ring_core.h',
>  		'rte_ring_elem.h',
>  		'rte_ring_c11_mem.h',
>  		'rte_ring_generic.h')
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> 77e5de099..fa5733907 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -106,8 +106,10 @@ rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>  	if (ret < 0 || ret >= (int)sizeof(r->name))
>  		return -ENAMETOOLONG;
>  	r->flags = flags;
> -	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> -	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> +	r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
> +		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> +	r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
> +		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> 
>  	if (flags & RING_F_EXACT_SZ) {
>  		r->size = rte_align32pow2(count + 1); diff --git
> a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> 18fc5d845..35ee4491c 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -36,91 +36,7 @@
>  extern "C" {
>  #endif
> 
> -#include <stdio.h>
> -#include <stdint.h>
> -#include <sys/queue.h>
> -#include <errno.h>
> -#include <rte_common.h>
> -#include <rte_config.h>
> -#include <rte_memory.h>
> -#include <rte_lcore.h>
> -#include <rte_atomic.h>
> -#include <rte_branch_prediction.h>
> -#include <rte_memzone.h>
> -#include <rte_pause.h>
> -
> -#define RTE_TAILQ_RING_NAME "RTE_RING"
> -
> -enum rte_ring_queue_behavior {
> -	RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items
> from a ring */
> -	RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items as possible
> from ring */
> -};
> -
> -#define RTE_RING_MZ_PREFIX "RG_"
> -/** The maximum length of a ring name. */ -#define RTE_RING_NAMESIZE
> (RTE_MEMZONE_NAMESIZE - \
> -			   sizeof(RTE_RING_MZ_PREFIX) + 1)
> -
> -/* structure to hold a pair of head/tail values and other metadata */ -struct
> rte_ring_headtail {
> -	volatile uint32_t head;  /**< Prod/consumer head. */
> -	volatile uint32_t tail;  /**< Prod/consumer tail. */
> -	uint32_t single;         /**< True if single prod/cons */
> -};
> -
> -/**
> - * An RTE ring structure.
> - *
> - * The producer and the consumer have a head and a tail index. The
> particularity
> - * of these index is that they are not between 0 and size(ring). These indexes
> - * are between 0 and 2^32, and we mask their value when we access the
> ring[]
> - * field. Thanks to this assumption, we can do subtractions between 2 index
> - * values in a modulo-32bit base: that's why the overflow of the indexes is
> not
> - * a problem.
> - */
> -struct rte_ring {
> -	/*
> -	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
> -	 * compatibility requirements, it could be changed to
> RTE_RING_NAMESIZE
> -	 * next time the ABI changes
> -	 */
> -	char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**<
> Name of the ring. */
> -	int flags;               /**< Flags supplied at creation. */
> -	const struct rte_memzone *memzone;
> -			/**< Memzone, if any, containing the rte_ring */
> -	uint32_t size;           /**< Size of ring. */
> -	uint32_t mask;           /**< Mask (size-1) of ring. */
> -	uint32_t capacity;       /**< Usable size of ring */
> -
> -	char pad0 __rte_cache_aligned; /**< empty cache line */
> -
> -	/** Ring producer status. */
> -	struct rte_ring_headtail prod __rte_cache_aligned;
> -	char pad1 __rte_cache_aligned; /**< empty cache line */
> -
> -	/** Ring consumer status. */
> -	struct rte_ring_headtail cons __rte_cache_aligned;
> -	char pad2 __rte_cache_aligned; /**< empty cache line */
> -};
> -
> -#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-
> producer". */ -#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is
> "single-consumer". */
These do not appear in the API document anymore

> -/**
> - * Ring is to hold exactly requested number of entries.
> - * Without this flag set, the ring size requested must be a power of 2, and the
> - * usable space will be that size - 1. With the flag, the requested size will
> - * be rounded up to the next power of two, but the usable space will be
> exactly
> - * that requested. Worst case, if a power-of-2 size is requested, half the
> - * ring space will be wasted.
> - */
> -#define RING_F_EXACT_SZ 0x0004
> -#define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
> -
> -/* @internal defines for passing to the enqueue dequeue worker functions */
> -#define __IS_SP 1 -#define __IS_MP 0 -#define __IS_SC 1 -#define __IS_MC 0
> +#include <rte_ring_core.h>
> 
>  /**
>   * Calculate the memory size needed for a ring @@ -420,7 +336,7 @@
> rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -			__IS_MP, free_space);
> +			RTE_RING_SYNC_MT, free_space);
>  }
> 
>  /**
> @@ -443,9 +359,13 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void *
> const *obj_table,
>  			 unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -			__IS_SP, free_space);
> +			RTE_RING_SYNC_ST, free_space);
>  }
> 
> +#ifdef ALLOW_EXPERIMENTAL_API
> +#include <rte_ring_elem.h>
> +#endif
> +
>  /**
>   * Enqueue several objects on a ring.
>   *
> @@ -470,7 +390,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void *
> const *obj_table,
>  		      unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -			r->prod.single, free_space);
> +			r->prod.sync_type, free_space);
>  }
> 
>  /**
> @@ -554,7 +474,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void
> **obj_table,
>  		unsigned int n, unsigned int *available)  {
>  	return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -			__IS_MC, available);
> +			RTE_RING_SYNC_MT, available);
>  }
> 
>  /**
> @@ -578,7 +498,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void
> **obj_table,
>  		unsigned int n, unsigned int *available)  {
>  	return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -			__IS_SC, available);
> +			RTE_RING_SYNC_ST, available);
>  }
> 
>  /**
> @@ -605,7 +525,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void
> **obj_table, unsigned int n,
>  		unsigned int *available)
>  {
>  	return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -				r->cons.single, available);
> +				r->cons.sync_type, available);
>  }
> 
>  /**
> @@ -777,6 +697,62 @@ rte_ring_get_capacity(const struct rte_ring *r)
>  	return r->capacity;
>  }
> 
> +/**
> + * Return sync type used by producer in the ring.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @return
> + *   Producer sync type value.
> + */
> +static inline enum rte_ring_sync_type
> +rte_ring_get_prod_sync_type(const struct rte_ring *r) {
> +	return r->prod.sync_type;
> +}
> +
> +/**
> + * Check is the ring for single producer.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @return
> + *   true if ring is SP, zero otherwise.
> + */
> +static inline int
> +rte_ring_is_prod_single(const struct rte_ring *r) {
> +	return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST); }
> +
> +/**
> + * Return sync type used by consumer in the ring.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @return
> + *   Consumer sync type value.
> + */
> +static inline enum rte_ring_sync_type
> +rte_ring_get_cons_sync_type(const struct rte_ring *r) {
> +	return r->cons.sync_type;
> +}
> +
> +/**
> + * Check is the ring for single consumer.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @return
> + *   true if ring is SC, zero otherwise.
> + */
> +static inline int
> +rte_ring_is_cons_single(const struct rte_ring *r) {
> +	return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST); }
> +
>  /**
>   * Dump the status of all rings on the console
>   *
> @@ -820,7 +796,7 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void *
> const *obj_table,
>  			 unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
> +			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT,
> free_space);
>  }
> 
>  /**
> @@ -843,7 +819,7 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void *
> const *obj_table,
>  			 unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
> +			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST,
> free_space);
>  }
> 
>  /**
> @@ -870,7 +846,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void *
> const *obj_table,
>  		      unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_VARIABLE,
> -			r->prod.single, free_space);
> +			r->prod.sync_type, free_space);
>  }
> 
>  /**
> @@ -898,7 +874,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void
> **obj_table,
>  		unsigned int n, unsigned int *available)  {
>  	return __rte_ring_do_dequeue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
> +			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT,
> available);
>  }
> 
>  /**
> @@ -923,7 +899,7 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void
> **obj_table,
>  		unsigned int n, unsigned int *available)  {
>  	return __rte_ring_do_dequeue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
> +			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST,
> available);
>  }
> 
>  /**
> @@ -951,7 +927,7 @@ rte_ring_dequeue_burst(struct rte_ring *r, void
> **obj_table,  {
>  	return __rte_ring_do_dequeue(r, obj_table, n,
>  				RTE_RING_QUEUE_VARIABLE,
> -				r->cons.single, available);
> +				r->cons.sync_type, available);
>  }
> 
>  #ifdef __cplusplus
> diff --git a/lib/librte_ring/rte_ring_core.h b/lib/librte_ring/rte_ring_core.h
I like this separation, thanks.

> new file mode 100644 index 000000000..d9cef763f
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_core.h
> @@ -0,0 +1,132 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2020 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_CORE_H_
> +#define _RTE_RING_CORE_H_
> +
> +/**
> + * @file
> + * This file contains definion of RTE ring structure itself,
                                         ^^^^^^^ definition
> + * init flags and some related macros.
> + * For majority of DPDK entities, it is not recommended to include
> + * this file directly, use include <rte_ring.h> or <rte_ring_elem.h>
> + * instead.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +#include <rte_debug.h>
> +
> +#define RTE_TAILQ_RING_NAME "RTE_RING"
> +
> +/** enqueue/dequeue behavior types */
> +enum rte_ring_queue_behavior {
> +	/** Enq/Deq a fixed number of items from a ring */
> +	RTE_RING_QUEUE_FIXED = 0,
> +	/** Enq/Deq as many items as possible from ring */
> +	RTE_RING_QUEUE_VARIABLE
> +};
> +
> +#define RTE_RING_MZ_PREFIX "RG_"
> +/** The maximum length of a ring name. */ #define RTE_RING_NAMESIZE
> +(RTE_MEMZONE_NAMESIZE - \
> +			   sizeof(RTE_RING_MZ_PREFIX) + 1)
> +
> +/** prod/cons sync types */
> +enum rte_ring_sync_type {
> +	RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
> +	RTE_RING_SYNC_ST,     /**< single thread only */
> +};
> +
> +/**
> + * structures to hold a pair of head/tail values and other metadata.
> + * Depending on sync_type format of that structure might be different,
> + * but offset for *sync_type* and *tail* values should remain the same.
> + */
> +struct rte_ring_headtail {
> +	volatile uint32_t head;      /**< prod/consumer head. */
> +	volatile uint32_t tail;      /**< prod/consumer tail. */
> +	RTE_STD_C11
> +	union {
> +		/** sync type of prod/cons */
> +		enum rte_ring_sync_type sync_type;
> +		/** deprecated -  True if single prod/cons */
> +		uint32_t single;
> +	};
> +};
> +
> +/**
> + * An RTE ring structure.
> + *
> + * The producer and the consumer have a head and a tail index. The
> +particularity
> + * of these index is that they are not between 0 and size(ring). These
> +indexes
> + * are between 0 and 2^32, and we mask their value when we access the
> +ring[]
> + * field. Thanks to this assumption, we can do subtractions between 2
> +index
> + * values in a modulo-32bit base: that's why the overflow of the
> +indexes is not
> + * a problem.
> + */
> +struct rte_ring {
> +	/*
> +	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
> +	 * compatibility requirements, it could be changed to
> RTE_RING_NAMESIZE
> +	 * next time the ABI changes
> +	 */
> +	char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned;
> +	/**< Name of the ring. */
> +	int flags;               /**< Flags supplied at creation. */
> +	const struct rte_memzone *memzone;
> +			/**< Memzone, if any, containing the rte_ring */
> +	uint32_t size;           /**< Size of ring. */
> +	uint32_t mask;           /**< Mask (size-1) of ring. */
> +	uint32_t capacity;       /**< Usable size of ring */
> +
> +	char pad0 __rte_cache_aligned; /**< empty cache line */
> +
> +	/** Ring producer status. */
> +	struct rte_ring_headtail prod __rte_cache_aligned;
> +	char pad1 __rte_cache_aligned; /**< empty cache line */
> +
> +	/** Ring consumer status. */
> +	struct rte_ring_headtail cons __rte_cache_aligned;
> +	char pad2 __rte_cache_aligned; /**< empty cache line */ };
> +
> +#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is
> +"single-producer". */ #define RING_F_SC_DEQ 0x0002 /**< The default
> +dequeue is "single-consumer". */
> +/**
> + * Ring is to hold exactly requested number of entries.
> + * Without this flag set, the ring size requested must be a power of 2,
> +and the
> + * usable space will be that size - 1. With the flag, the requested
> +size will
> + * be rounded up to the next power of two, but the usable space will be
> +exactly
> + * that requested. Worst case, if a power-of-2 size is requested, half
> +the
> + * ring space will be wasted.
> + */
> +#define RING_F_EXACT_SZ 0x0004
> +#define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_CORE_H_ */
> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> index 663addc73..7406c0b0f 100644
> --- a/lib/librte_ring/rte_ring_elem.h
> +++ b/lib/librte_ring/rte_ring_elem.h
> @@ -20,21 +20,7 @@
>  extern "C" {
>  #endif
> 
> -#include <stdio.h>
> -#include <stdint.h>
> -#include <string.h>
> -#include <sys/queue.h>
> -#include <errno.h>
> -#include <rte_common.h>
> -#include <rte_config.h>
> -#include <rte_memory.h>
> -#include <rte_lcore.h>
> -#include <rte_atomic.h>
> -#include <rte_branch_prediction.h>
> -#include <rte_memzone.h>
> -#include <rte_pause.h>
> -
> -#include "rte_ring.h"
> +#include <rte_ring_core.h>
> 
>  /**
>   * @warning
> @@ -510,7 +496,7 @@ rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r,
> const void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
> +			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT,
> free_space);
>  }
> 
>  /**
> @@ -539,7 +525,7 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r,
> const void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
> +			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST,
> free_space);
>  }
> 
>  /**
> @@ -570,7 +556,7 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const
> void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
> +			RTE_RING_QUEUE_FIXED, r->prod.sync_type,
> free_space);
>  }
> 
>  /**
> @@ -675,7 +661,7 @@ rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r,
> void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *available)  {
>  	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> -				RTE_RING_QUEUE_FIXED, __IS_MC, available);
> +				RTE_RING_QUEUE_FIXED,
> RTE_RING_SYNC_MT, available);
>  }
> 
>  /**
> @@ -703,7 +689,7 @@ rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r,
> void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *available)  {
>  	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_FIXED, __IS_SC, available);
> +			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST,
> available);
>  }
> 
>  /**
> @@ -734,7 +720,7 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void
> *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *available)  {
>  	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_FIXED, r->cons.single, available);
> +			RTE_RING_QUEUE_FIXED, r->cons.sync_type,
> available);
>  }
> 
>  /**
> @@ -842,7 +828,7 @@ rte_ring_mp_enqueue_burst_elem(struct rte_ring *r,
> const void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
> +			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT,
> free_space);
>  }
> 
>  /**
> @@ -871,7 +857,7 @@ rte_ring_sp_enqueue_burst_elem(struct rte_ring *r,
> const void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
> +			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST,
> free_space);
>  }
> 
>  /**
> @@ -902,7 +888,7 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r,
> const void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *free_space)  {
>  	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_VARIABLE, r->prod.single,
> free_space);
> +			RTE_RING_QUEUE_VARIABLE, r->prod.sync_type,
> free_space);
>  }
> 
>  /**
> @@ -934,7 +920,7 @@ rte_ring_mc_dequeue_burst_elem(struct rte_ring *r,
> void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *available)  {
>  	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
> +			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT,
> available);
>  }
> 
>  /**
> @@ -963,7 +949,7 @@ rte_ring_sc_dequeue_burst_elem(struct rte_ring *r,
> void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *available)  {
>  	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
> +			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST,
> available);
>  }
> 
>  /**
> @@ -995,9 +981,11 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r,
> void *obj_table,  {
>  	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
>  				RTE_RING_QUEUE_VARIABLE,
> -				r->cons.single, available);
> +				r->cons.sync_type, available);
>  }
> 
> +#include <rte_ring.h>
> +
>  #ifdef __cplusplus
>  }
>  #endif
> --
> 2.17.1
  

Patch

diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c
index ad183184c..6a1180bcb 100644
--- a/app/test/test_pdump.c
+++ b/app/test/test_pdump.c
@@ -57,8 +57,7 @@  run_pdump_client_tests(void)
 	if (ret < 0)
 		return -1;
 	mp->flags = 0x0000;
-	ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
-				      RING_F_SP_ENQ | RING_F_SC_DEQ);
+	ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
 	if (ring_client == NULL) {
 		printf("rte_ring_create SR0 failed");
 		return -1;
@@ -71,9 +70,6 @@  run_pdump_client_tests(void)
 	}
 	rte_eth_dev_probing_finish(eth_dev);
 
-	ring_client->prod.single = 0;
-	ring_client->cons.single = 0;
-
 	printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n");
 
 	for (itr = 0; itr < NUM_ITR; itr++) {
diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c
index 8a01ac510..f96709f95 100644
--- a/lib/librte_pdump/rte_pdump.c
+++ b/lib/librte_pdump/rte_pdump.c
@@ -380,7 +380,7 @@  pdump_validate_ring_mp(struct rte_ring *ring, struct rte_mempool *mp)
 		rte_errno = EINVAL;
 		return -1;
 	}
-	if (ring->prod.single || ring->cons.single) {
+	if (rte_ring_is_prod_single(ring) || rte_ring_is_cons_single(ring)) {
 		PDUMP_LOG(ERR, "ring with either SP or SC settings"
 		" is not valid for pdump, should have MP and MC settings\n");
 		rte_errno = EINVAL;
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index 47fcdd06a..52b2d8e55 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -44,8 +44,8 @@  rte_port_ring_reader_create_internal(void *params, int socket_id,
 	/* Check input parameters */
 	if ((conf == NULL) ||
 		(conf->ring == NULL) ||
-		(conf->ring->cons.single && is_multi) ||
-		(!(conf->ring->cons.single) && !is_multi)) {
+		(rte_ring_is_cons_single(conf->ring) && is_multi) ||
+		(!rte_ring_is_cons_single(conf->ring) && !is_multi)) {
 		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
 		return NULL;
 	}
@@ -171,8 +171,8 @@  rte_port_ring_writer_create_internal(void *params, int socket_id,
 	/* Check input parameters */
 	if ((conf == NULL) ||
 		(conf->ring == NULL) ||
-		(conf->ring->prod.single && is_multi) ||
-		(!(conf->ring->prod.single) && !is_multi) ||
+		(rte_ring_is_prod_single(conf->ring) && is_multi) ||
+		(!rte_ring_is_prod_single(conf->ring) && !is_multi) ||
 		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
 		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
 		return NULL;
@@ -440,8 +440,8 @@  rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
 	/* Check input parameters */
 	if ((conf == NULL) ||
 		(conf->ring == NULL) ||
-		(conf->ring->prod.single && is_multi) ||
-		(!(conf->ring->prod.single) && !is_multi) ||
+		(rte_ring_is_prod_single(conf->ring) && is_multi) ||
+		(!rte_ring_is_prod_single(conf->ring) && !is_multi) ||
 		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
 		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
 		return NULL;
diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 28368e6d1..6572768c9 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -16,6 +16,7 @@  SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
+					rte_ring_core.h \
 					rte_ring_elem.h \
 					rte_ring_generic.h \
 					rte_ring_c11_mem.h
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index 05402e4f0..c656781da 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -3,6 +3,7 @@ 
 
 sources = files('rte_ring.c')
 headers = files('rte_ring.h',
+		'rte_ring_core.h',
 		'rte_ring_elem.h',
 		'rte_ring_c11_mem.h',
 		'rte_ring_generic.h')
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index 77e5de099..fa5733907 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -106,8 +106,10 @@  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	if (ret < 0 || ret >= (int)sizeof(r->name))
 		return -ENAMETOOLONG;
 	r->flags = flags;
-	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
-	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+	r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
+		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
+	r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
+		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
 
 	if (flags & RING_F_EXACT_SZ) {
 		r->size = rte_align32pow2(count + 1);
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 18fc5d845..35ee4491c 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -36,91 +36,7 @@ 
 extern "C" {
 #endif
 
-#include <stdio.h>
-#include <stdint.h>
-#include <sys/queue.h>
-#include <errno.h>
-#include <rte_common.h>
-#include <rte_config.h>
-#include <rte_memory.h>
-#include <rte_lcore.h>
-#include <rte_atomic.h>
-#include <rte_branch_prediction.h>
-#include <rte_memzone.h>
-#include <rte_pause.h>
-
-#define RTE_TAILQ_RING_NAME "RTE_RING"
-
-enum rte_ring_queue_behavior {
-	RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
-	RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items as possible from ring */
-};
-
-#define RTE_RING_MZ_PREFIX "RG_"
-/** The maximum length of a ring name. */
-#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
-			   sizeof(RTE_RING_MZ_PREFIX) + 1)
-
-/* structure to hold a pair of head/tail values and other metadata */
-struct rte_ring_headtail {
-	volatile uint32_t head;  /**< Prod/consumer head. */
-	volatile uint32_t tail;  /**< Prod/consumer tail. */
-	uint32_t single;         /**< True if single prod/cons */
-};
-
-/**
- * An RTE ring structure.
- *
- * The producer and the consumer have a head and a tail index. The particularity
- * of these index is that they are not between 0 and size(ring). These indexes
- * are between 0 and 2^32, and we mask their value when we access the ring[]
- * field. Thanks to this assumption, we can do subtractions between 2 index
- * values in a modulo-32bit base: that's why the overflow of the indexes is not
- * a problem.
- */
-struct rte_ring {
-	/*
-	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
-	 * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
-	 * next time the ABI changes
-	 */
-	char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
-	int flags;               /**< Flags supplied at creation. */
-	const struct rte_memzone *memzone;
-			/**< Memzone, if any, containing the rte_ring */
-	uint32_t size;           /**< Size of ring. */
-	uint32_t mask;           /**< Mask (size-1) of ring. */
-	uint32_t capacity;       /**< Usable size of ring */
-
-	char pad0 __rte_cache_aligned; /**< empty cache line */
-
-	/** Ring producer status. */
-	struct rte_ring_headtail prod __rte_cache_aligned;
-	char pad1 __rte_cache_aligned; /**< empty cache line */
-
-	/** Ring consumer status. */
-	struct rte_ring_headtail cons __rte_cache_aligned;
-	char pad2 __rte_cache_aligned; /**< empty cache line */
-};
-
-#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
-#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
-/**
- * Ring is to hold exactly requested number of entries.
- * Without this flag set, the ring size requested must be a power of 2, and the
- * usable space will be that size - 1. With the flag, the requested size will
- * be rounded up to the next power of two, but the usable space will be exactly
- * that requested. Worst case, if a power-of-2 size is requested, half the
- * ring space will be wasted.
- */
-#define RING_F_EXACT_SZ 0x0004
-#define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
-
-/* @internal defines for passing to the enqueue dequeue worker functions */
-#define __IS_SP 1
-#define __IS_MP 0
-#define __IS_SC 1
-#define __IS_MC 0
+#include <rte_ring_core.h>
 
 /**
  * Calculate the memory size needed for a ring
@@ -420,7 +336,7 @@  rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_MP, free_space);
+			RTE_RING_SYNC_MT, free_space);
 }
 
 /**
@@ -443,9 +359,13 @@  rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_SP, free_space);
+			RTE_RING_SYNC_ST, free_space);
 }
 
+#ifdef ALLOW_EXPERIMENTAL_API
+#include <rte_ring_elem.h>
+#endif
+
 /**
  * Enqueue several objects on a ring.
  *
@@ -470,7 +390,7 @@  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			r->prod.single, free_space);
+			r->prod.sync_type, free_space);
 }
 
 /**
@@ -554,7 +474,7 @@  rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_MC, available);
+			RTE_RING_SYNC_MT, available);
 }
 
 /**
@@ -578,7 +498,7 @@  rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_SC, available);
+			RTE_RING_SYNC_ST, available);
 }
 
 /**
@@ -605,7 +525,7 @@  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
 		unsigned int *available)
 {
 	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-				r->cons.single, available);
+				r->cons.sync_type, available);
 }
 
 /**
@@ -777,6 +697,62 @@  rte_ring_get_capacity(const struct rte_ring *r)
 	return r->capacity;
 }
 
+/**
+ * Return sync type used by producer in the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   Producer sync type value.
+ */
+static inline enum rte_ring_sync_type
+rte_ring_get_prod_sync_type(const struct rte_ring *r)
+{
+	return r->prod.sync_type;
+}
+
+/**
+ * Check is the ring for single producer.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   true if ring is SP, zero otherwise.
+ */
+static inline int
+rte_ring_is_prod_single(const struct rte_ring *r)
+{
+	return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
+}
+
+/**
+ * Return sync type used by consumer in the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   Consumer sync type value.
+ */
+static inline enum rte_ring_sync_type
+rte_ring_get_cons_sync_type(const struct rte_ring *r)
+{
+	return r->cons.sync_type;
+}
+
+/**
+ * Check is the ring for single consumer.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   true if ring is SC, zero otherwise.
+ */
+static inline int
+rte_ring_is_cons_single(const struct rte_ring *r)
+{
+	return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
+}
+
 /**
  * Dump the status of all rings on the console
  *
@@ -820,7 +796,7 @@  rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
 }
 
 /**
@@ -843,7 +819,7 @@  rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
 }
 
 /**
@@ -870,7 +846,7 @@  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
-			r->prod.single, free_space);
+			r->prod.sync_type, free_space);
 }
 
 /**
@@ -898,7 +874,7 @@  rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
 }
 
 /**
@@ -923,7 +899,7 @@  rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
 }
 
 /**
@@ -951,7 +927,7 @@  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
 {
 	return __rte_ring_do_dequeue(r, obj_table, n,
 				RTE_RING_QUEUE_VARIABLE,
-				r->cons.single, available);
+				r->cons.sync_type, available);
 }
 
 #ifdef __cplusplus
diff --git a/lib/librte_ring/rte_ring_core.h b/lib/librte_ring/rte_ring_core.h
new file mode 100644
index 000000000..d9cef763f
--- /dev/null
+++ b/lib/librte_ring/rte_ring_core.h
@@ -0,0 +1,132 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2020 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_CORE_H_
+#define _RTE_RING_CORE_H_
+
+/**
+ * @file
+ * This file contains definion of RTE ring structure itself,
+ * init flags and some related macros.
+ * For majority of DPDK entities, it is not recommended to include
+ * this file directly, use include <rte_ring.h> or <rte_ring_elem.h>
+ * instead.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+#include <rte_debug.h>
+
+#define RTE_TAILQ_RING_NAME "RTE_RING"
+
+/** enqueue/dequeue behavior types */
+enum rte_ring_queue_behavior {
+	/** Enq/Deq a fixed number of items from a ring */
+	RTE_RING_QUEUE_FIXED = 0,
+	/** Enq/Deq as many items as possible from ring */
+	RTE_RING_QUEUE_VARIABLE
+};
+
+#define RTE_RING_MZ_PREFIX "RG_"
+/** The maximum length of a ring name. */
+#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+			   sizeof(RTE_RING_MZ_PREFIX) + 1)
+
+/** prod/cons sync types */
+enum rte_ring_sync_type {
+	RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
+	RTE_RING_SYNC_ST,     /**< single thread only */
+};
+
+/**
+ * structures to hold a pair of head/tail values and other metadata.
+ * Depending on sync_type format of that structure might be different,
+ * but offset for *sync_type* and *tail* values should remain the same.
+ */
+struct rte_ring_headtail {
+	volatile uint32_t head;      /**< prod/consumer head. */
+	volatile uint32_t tail;      /**< prod/consumer tail. */
+	RTE_STD_C11
+	union {
+		/** sync type of prod/cons */
+		enum rte_ring_sync_type sync_type;
+		/** deprecated -  True if single prod/cons */
+		uint32_t single;
+	};
+};
+
+/**
+ * An RTE ring structure.
+ *
+ * The producer and the consumer have a head and a tail index. The particularity
+ * of these index is that they are not between 0 and size(ring). These indexes
+ * are between 0 and 2^32, and we mask their value when we access the ring[]
+ * field. Thanks to this assumption, we can do subtractions between 2 index
+ * values in a modulo-32bit base: that's why the overflow of the indexes is not
+ * a problem.
+ */
+struct rte_ring {
+	/*
+	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
+	 * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
+	 * next time the ABI changes
+	 */
+	char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned;
+	/**< Name of the ring. */
+	int flags;               /**< Flags supplied at creation. */
+	const struct rte_memzone *memzone;
+			/**< Memzone, if any, containing the rte_ring */
+	uint32_t size;           /**< Size of ring. */
+	uint32_t mask;           /**< Mask (size-1) of ring. */
+	uint32_t capacity;       /**< Usable size of ring */
+
+	char pad0 __rte_cache_aligned; /**< empty cache line */
+
+	/** Ring producer status. */
+	struct rte_ring_headtail prod __rte_cache_aligned;
+	char pad1 __rte_cache_aligned; /**< empty cache line */
+
+	/** Ring consumer status. */
+	struct rte_ring_headtail cons __rte_cache_aligned;
+	char pad2 __rte_cache_aligned; /**< empty cache line */
+};
+
+#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
+#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
+/**
+ * Ring is to hold exactly requested number of entries.
+ * Without this flag set, the ring size requested must be a power of 2, and the
+ * usable space will be that size - 1. With the flag, the requested size will
+ * be rounded up to the next power of two, but the usable space will be exactly
+ * that requested. Worst case, if a power-of-2 size is requested, half the
+ * ring space will be wasted.
+ */
+#define RING_F_EXACT_SZ 0x0004
+#define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RING_CORE_H_ */
diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
index 663addc73..7406c0b0f 100644
--- a/lib/librte_ring/rte_ring_elem.h
+++ b/lib/librte_ring/rte_ring_elem.h
@@ -20,21 +20,7 @@ 
 extern "C" {
 #endif
 
-#include <stdio.h>
-#include <stdint.h>
-#include <string.h>
-#include <sys/queue.h>
-#include <errno.h>
-#include <rte_common.h>
-#include <rte_config.h>
-#include <rte_memory.h>
-#include <rte_lcore.h>
-#include <rte_atomic.h>
-#include <rte_branch_prediction.h>
-#include <rte_memzone.h>
-#include <rte_pause.h>
-
-#include "rte_ring.h"
+#include <rte_ring_core.h>
 
 /**
  * @warning
@@ -510,7 +496,7 @@  rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
+			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, free_space);
 }
 
 /**
@@ -539,7 +525,7 @@  rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
+			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, free_space);
 }
 
 /**
@@ -570,7 +556,7 @@  rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
+			RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space);
 }
 
 /**
@@ -675,7 +661,7 @@  rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
-				RTE_RING_QUEUE_FIXED, __IS_MC, available);
+				RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available);
 }
 
 /**
@@ -703,7 +689,7 @@  rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_FIXED, __IS_SC, available);
+			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, available);
 }
 
 /**
@@ -734,7 +720,7 @@  rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_FIXED, r->cons.single, available);
+			RTE_RING_QUEUE_FIXED, r->cons.sync_type, available);
 }
 
 /**
@@ -842,7 +828,7 @@  rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
 }
 
 /**
@@ -871,7 +857,7 @@  rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
 }
 
 /**
@@ -902,7 +888,7 @@  rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
+			RTE_RING_QUEUE_VARIABLE, r->prod.sync_type, free_space);
 }
 
 /**
@@ -934,7 +920,7 @@  rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
 }
 
 /**
@@ -963,7 +949,7 @@  rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
 }
 
 /**
@@ -995,9 +981,11 @@  rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 {
 	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
 				RTE_RING_QUEUE_VARIABLE,
-				r->cons.single, available);
+				r->cons.sync_type, available);
 }
 
+#include <rte_ring.h>
+
 #ifdef __cplusplus
 }
 #endif