[v9,2/6] lib/ring: apis to support configurable element size

Message ID 20200116052511.8557-3-honnappa.nagarahalli@arm.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series lib/ring: APIs to support custom element size |

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation success Compilation OK

Commit Message

Honnappa Nagarahalli Jan. 16, 2020, 5:25 a.m. UTC
  Current APIs assume ring elements to be pointers. However, in many
use cases, the size can be different. Add new APIs to support
configurable ring element sizes.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
---
 lib/librte_ring/Makefile             |    3 +-
 lib/librte_ring/meson.build          |    4 +
 lib/librte_ring/rte_ring.c           |   41 +-
 lib/librte_ring/rte_ring.h           |    1 +
 lib/librte_ring/rte_ring_elem.h      | 1003 ++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_version.map |    2 +
 6 files changed, 1045 insertions(+), 9 deletions(-)
 create mode 100644 lib/librte_ring/rte_ring_elem.h
  

Comments

Olivier Matz Jan. 17, 2020, 4:34 p.m. UTC | #1
Hi Honnappa,

On Wed, Jan 15, 2020 at 11:25:07PM -0600, Honnappa Nagarahalli wrote:
> Current APIs assume ring elements to be pointers. However, in many
> use cases, the size can be different. Add new APIs to support
> configurable ring element sizes.
> 
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> ---
>  lib/librte_ring/Makefile             |    3 +-
>  lib/librte_ring/meson.build          |    4 +
>  lib/librte_ring/rte_ring.c           |   41 +-
>  lib/librte_ring/rte_ring.h           |    1 +
>  lib/librte_ring/rte_ring_elem.h      | 1003 ++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_version.map |    2 +
>  6 files changed, 1045 insertions(+), 9 deletions(-)
>  create mode 100644 lib/librte_ring/rte_ring_elem.h
> 

[...]

> +static __rte_always_inline void
> +enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
> +		const void *obj_table, uint32_t n)
> +{
> +	unsigned int i;
> +	uint32_t *ring = (uint32_t *)&r[1];
> +	const uint32_t *obj = (const uint32_t *)obj_table;
> +	if (likely(idx + n < size)) {
> +		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> +			ring[idx] = obj[i];
> +			ring[idx + 1] = obj[i + 1];
> +			ring[idx + 2] = obj[i + 2];
> +			ring[idx + 3] = obj[i + 3];
> +			ring[idx + 4] = obj[i + 4];
> +			ring[idx + 5] = obj[i + 5];
> +			ring[idx + 6] = obj[i + 6];
> +			ring[idx + 7] = obj[i + 7];
> +		}
> +		switch (n & 0x7) {
> +		case 7:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 6:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 5:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 4:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 3:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 2:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 1:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		}
> +	} else {
> +		for (i = 0; idx < size; i++, idx++)
> +			ring[idx] = obj[i];
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			ring[idx] = obj[i];
> +	}
> +}
> +
> +static __rte_always_inline void
> +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
> +		const void *obj_table, uint32_t n)
> +{
> +	unsigned int i;
> +	const uint32_t size = r->size;
> +	uint32_t idx = prod_head & r->mask;
> +	uint64_t *ring = (uint64_t *)&r[1];
> +	const uint64_t *obj = (const uint64_t *)obj_table;
> +	if (likely(idx + n < size)) {
> +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> +			ring[idx] = obj[i];
> +			ring[idx + 1] = obj[i + 1];
> +			ring[idx + 2] = obj[i + 2];
> +			ring[idx + 3] = obj[i + 3];
> +		}
> +		switch (n & 0x3) {
> +		case 3:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 2:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 1:
> +			ring[idx++] = obj[i++];
> +		}
> +	} else {
> +		for (i = 0; idx < size; i++, idx++)
> +			ring[idx] = obj[i];
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			ring[idx] = obj[i];
> +	}
> +}
> +
> +static __rte_always_inline void
> +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> +		const void *obj_table, uint32_t n)
> +{
> +	unsigned int i;
> +	const uint32_t size = r->size;
> +	uint32_t idx = prod_head & r->mask;
> +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> +	if (likely(idx + n < size)) {
> +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> +			memcpy((void *)(ring + idx),
> +				(const void *)(obj + i), 32);
> +		switch (n & 0x1) {
> +		case 1:
> +			memcpy((void *)(ring + idx),
> +				(const void *)(obj + i), 16);
> +		}
> +	} else {
> +		for (i = 0; idx < size; i++, idx++)
> +			memcpy((void *)(ring + idx),
> +				(const void *)(obj + i), 16);
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			memcpy((void *)(ring + idx),
> +				(const void *)(obj + i), 16);
> +	}
> +}
> +
> +/* the actual enqueue of elements on the ring.
> + * Placed here since identical code needed in both
> + * single and multi producer enqueue functions.
> + */
> +static __rte_always_inline void
> +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table,
> +		uint32_t esize, uint32_t num)
> +{
> +	/* 8B and 16B copies implemented individually to retain
> +	 * the current performance.
> +	 */
> +	if (esize == 8)
> +		enqueue_elems_64(r, prod_head, obj_table, num);
> +	else if (esize == 16)
> +		enqueue_elems_128(r, prod_head, obj_table, num);
> +	else {
> +		uint32_t idx, scale, nr_idx, nr_num, nr_size;
> +
> +		/* Normalize to uint32_t */
> +		scale = esize / sizeof(uint32_t);
> +		nr_num = num * scale;
> +		idx = prod_head & r->mask;
> +		nr_idx = idx * scale;
> +		nr_size = r->size * scale;
> +		enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
> +	}
> +}

Following Konstantin's comment on v7, enqueue_elems_128() was modified to
ensure it won't crash if the object is unaligned. Are we sure that this
same problem cannot also occur with 64b copies on all supported
architectures? (I mean a 64b access that is only aligned on 32b)

Out of curiosity, would it make a big perf difference to only use
enqueue_elems_32()?
  
Honnappa Nagarahalli Jan. 17, 2020, 4:45 p.m. UTC | #2
<snip>

> 
> Hi Honnappa,
Thanks Olivier for your review, appreciate your feedback.

> 
> On Wed, Jan 15, 2020 at 11:25:07PM -0600, Honnappa Nagarahalli wrote:
> > Current APIs assume ring elements to be pointers. However, in many use
> > cases, the size can be different. Add new APIs to support configurable
> > ring element sizes.
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > ---
> >  lib/librte_ring/Makefile             |    3 +-
> >  lib/librte_ring/meson.build          |    4 +
> >  lib/librte_ring/rte_ring.c           |   41 +-
> >  lib/librte_ring/rte_ring.h           |    1 +
> >  lib/librte_ring/rte_ring_elem.h      | 1003 ++++++++++++++++++++++++++
> >  lib/librte_ring/rte_ring_version.map |    2 +
> >  6 files changed, 1045 insertions(+), 9 deletions(-)  create mode
> > 100644 lib/librte_ring/rte_ring_elem.h
> >
> 
> [...]
> 
> > +static __rte_always_inline void
> > +enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
> > +		const void *obj_table, uint32_t n)
> > +{
> > +	unsigned int i;
> > +	uint32_t *ring = (uint32_t *)&r[1];
> > +	const uint32_t *obj = (const uint32_t *)obj_table;
> > +	if (likely(idx + n < size)) {
> > +		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> > +			ring[idx] = obj[i];
> > +			ring[idx + 1] = obj[i + 1];
> > +			ring[idx + 2] = obj[i + 2];
> > +			ring[idx + 3] = obj[i + 3];
> > +			ring[idx + 4] = obj[i + 4];
> > +			ring[idx + 5] = obj[i + 5];
> > +			ring[idx + 6] = obj[i + 6];
> > +			ring[idx + 7] = obj[i + 7];
> > +		}
> > +		switch (n & 0x7) {
> > +		case 7:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 6:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 5:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 4:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 3:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 2:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 1:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		}
> > +	} else {
> > +		for (i = 0; idx < size; i++, idx++)
> > +			ring[idx] = obj[i];
> > +		/* Start at the beginning */
> > +		for (idx = 0; i < n; i++, idx++)
> > +			ring[idx] = obj[i];
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
> > +		const void *obj_table, uint32_t n)
> > +{
> > +	unsigned int i;
> > +	const uint32_t size = r->size;
> > +	uint32_t idx = prod_head & r->mask;
> > +	uint64_t *ring = (uint64_t *)&r[1];
> > +	const uint64_t *obj = (const uint64_t *)obj_table;
> > +	if (likely(idx + n < size)) {
> > +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> > +			ring[idx] = obj[i];
> > +			ring[idx + 1] = obj[i + 1];
> > +			ring[idx + 2] = obj[i + 2];
> > +			ring[idx + 3] = obj[i + 3];
> > +		}
> > +		switch (n & 0x3) {
> > +		case 3:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 2:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 1:
> > +			ring[idx++] = obj[i++];
> > +		}
> > +	} else {
> > +		for (i = 0; idx < size; i++, idx++)
> > +			ring[idx] = obj[i];
> > +		/* Start at the beginning */
> > +		for (idx = 0; i < n; i++, idx++)
> > +			ring[idx] = obj[i];
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> > +		const void *obj_table, uint32_t n)
> > +{
> > +	unsigned int i;
> > +	const uint32_t size = r->size;
> > +	uint32_t idx = prod_head & r->mask;
> > +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> > +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> > +	if (likely(idx + n < size)) {
> > +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> > +			memcpy((void *)(ring + idx),
> > +				(const void *)(obj + i), 32);
> > +		switch (n & 0x1) {
> > +		case 1:
> > +			memcpy((void *)(ring + idx),
> > +				(const void *)(obj + i), 16);
> > +		}
> > +	} else {
> > +		for (i = 0; idx < size; i++, idx++)
> > +			memcpy((void *)(ring + idx),
> > +				(const void *)(obj + i), 16);
> > +		/* Start at the beginning */
> > +		for (idx = 0; i < n; i++, idx++)
> > +			memcpy((void *)(ring + idx),
> > +				(const void *)(obj + i), 16);
> > +	}
> > +}
> > +
> > +/* the actual enqueue of elements on the ring.
> > + * Placed here since identical code needed in both
> > + * single and multi producer enqueue functions.
> > + */
> > +static __rte_always_inline void
> > +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void
> *obj_table,
> > +		uint32_t esize, uint32_t num)
> > +{
> > +	/* 8B and 16B copies implemented individually to retain
> > +	 * the current performance.
> > +	 */
> > +	if (esize == 8)
> > +		enqueue_elems_64(r, prod_head, obj_table, num);
> > +	else if (esize == 16)
> > +		enqueue_elems_128(r, prod_head, obj_table, num);
> > +	else {
> > +		uint32_t idx, scale, nr_idx, nr_num, nr_size;
> > +
> > +		/* Normalize to uint32_t */
> > +		scale = esize / sizeof(uint32_t);
> > +		nr_num = num * scale;
> > +		idx = prod_head & r->mask;
> > +		nr_idx = idx * scale;
> > +		nr_size = r->size * scale;
> > +		enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
> > +	}
> > +}
> 
> Following Konstatin's comment on v7, enqueue_elems_128() was modified to
> ensure it won't crash if the object is unaligned. Are we sure that this same
> problem cannot also occurs with 64b copies on all supported architectures? (I
> mean 64b access that is only aligned on 32b)
Konstantin mentioned that the 64b load/store instructions on x86 can handle unaligned access. On aarch64, the load/store (non-atomic, which will be used in this case) can handle unaligned access.

+ David Christensen to comment for PPC

> 
> Out of curiosity, would it make a big perf difference to only use
> enqueue_elems_32()?
Yes, this was having a significant impact on 128b elements. I did not try on 64b elements.
I will run the perf test with 32b copy for 64b element size and get back.
  
David Christensen Jan. 17, 2020, 6:10 p.m. UTC | #3
>>> +static __rte_always_inline void
>>> +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
>>> +		const void *obj_table, uint32_t n)
>>> +{
>>> +	unsigned int i;
>>> +	const uint32_t size = r->size;
>>> +	uint32_t idx = prod_head & r->mask;
>>> +	rte_int128_t *ring = (rte_int128_t *)&r[1];
>>> +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
>>> +	if (likely(idx + n < size)) {
>>> +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
>>> +			memcpy((void *)(ring + idx),
>>> +				(const void *)(obj + i), 32);
>>> +		switch (n & 0x1) {
>>> +		case 1:
>>> +			memcpy((void *)(ring + idx),
>>> +				(const void *)(obj + i), 16);
>>> +		}
>>> +	} else {
>>> +		for (i = 0; idx < size; i++, idx++)
>>> +			memcpy((void *)(ring + idx),
>>> +				(const void *)(obj + i), 16);
>>> +		/* Start at the beginning */
>>> +		for (idx = 0; i < n; i++, idx++)
>>> +			memcpy((void *)(ring + idx),
>>> +				(const void *)(obj + i), 16);
>>> +	}
>>> +}
>>> +
>>> +/* the actual enqueue of elements on the ring.
>>> + * Placed here since identical code needed in both
>>> + * single and multi producer enqueue functions.
>>> + */
>>> +static __rte_always_inline void
>>> +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void
>> *obj_table,
>>> +		uint32_t esize, uint32_t num)
>>> +{
>>> +	/* 8B and 16B copies implemented individually to retain
>>> +	 * the current performance.
>>> +	 */
>>> +	if (esize == 8)
>>> +		enqueue_elems_64(r, prod_head, obj_table, num);
>>> +	else if (esize == 16)
>>> +		enqueue_elems_128(r, prod_head, obj_table, num);
>>> +	else {
>>> +		uint32_t idx, scale, nr_idx, nr_num, nr_size;
>>> +
>>> +		/* Normalize to uint32_t */
>>> +		scale = esize / sizeof(uint32_t);
>>> +		nr_num = num * scale;
>>> +		idx = prod_head & r->mask;
>>> +		nr_idx = idx * scale;
>>> +		nr_size = r->size * scale;
>>> +		enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
>>> +	}
>>> +}
>>
>> Following Konstatin's comment on v7, enqueue_elems_128() was modified to
>> ensure it won't crash if the object is unaligned. Are we sure that this same
>> problem cannot also occurs with 64b copies on all supported architectures? (I
>> mean 64b access that is only aligned on 32b)
> Konstantin mentioned that the 64b load/store instructions on x86 can handle unaligned access. On aarch64, the load/store (non-atomic, which will be used in this case) can handle unaligned access.
> 
> + David Christensen to comment for PPC

The vectorized version of memcpy for Power can handle unaligned access 
as well.

Dave
  
Ananyev, Konstantin Jan. 18, 2020, 12:32 p.m. UTC | #4
> > On Wed, Jan 15, 2020 at 11:25:07PM -0600, Honnappa Nagarahalli wrote:
> > > Current APIs assume ring elements to be pointers. However, in many use
> > > cases, the size can be different. Add new APIs to support configurable
> > > ring element sizes.
> > >
> > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > > Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> > > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > > ---
> > >  lib/librte_ring/Makefile             |    3 +-
> > >  lib/librte_ring/meson.build          |    4 +
> > >  lib/librte_ring/rte_ring.c           |   41 +-
> > >  lib/librte_ring/rte_ring.h           |    1 +
> > >  lib/librte_ring/rte_ring_elem.h      | 1003 ++++++++++++++++++++++++++
> > >  lib/librte_ring/rte_ring_version.map |    2 +
> > >  6 files changed, 1045 insertions(+), 9 deletions(-)  create mode
> > > 100644 lib/librte_ring/rte_ring_elem.h
> > >
> >
> > [...]
> >
> > > +static __rte_always_inline void
> > > +enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
> > > +		const void *obj_table, uint32_t n)
> > > +{
> > > +	unsigned int i;
> > > +	uint32_t *ring = (uint32_t *)&r[1];
> > > +	const uint32_t *obj = (const uint32_t *)obj_table;
> > > +	if (likely(idx + n < size)) {
> > > +		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> > > +			ring[idx] = obj[i];
> > > +			ring[idx + 1] = obj[i + 1];
> > > +			ring[idx + 2] = obj[i + 2];
> > > +			ring[idx + 3] = obj[i + 3];
> > > +			ring[idx + 4] = obj[i + 4];
> > > +			ring[idx + 5] = obj[i + 5];
> > > +			ring[idx + 6] = obj[i + 6];
> > > +			ring[idx + 7] = obj[i + 7];
> > > +		}
> > > +		switch (n & 0x7) {
> > > +		case 7:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 6:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 5:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 4:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 3:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 2:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 1:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		}
> > > +	} else {
> > > +		for (i = 0; idx < size; i++, idx++)
> > > +			ring[idx] = obj[i];
> > > +		/* Start at the beginning */
> > > +		for (idx = 0; i < n; i++, idx++)
> > > +			ring[idx] = obj[i];
> > > +	}
> > > +}
> > > +
> > > +static __rte_always_inline void
> > > +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
> > > +		const void *obj_table, uint32_t n)
> > > +{
> > > +	unsigned int i;
> > > +	const uint32_t size = r->size;
> > > +	uint32_t idx = prod_head & r->mask;
> > > +	uint64_t *ring = (uint64_t *)&r[1];
> > > +	const uint64_t *obj = (const uint64_t *)obj_table;
> > > +	if (likely(idx + n < size)) {
> > > +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> > > +			ring[idx] = obj[i];
> > > +			ring[idx + 1] = obj[i + 1];
> > > +			ring[idx + 2] = obj[i + 2];
> > > +			ring[idx + 3] = obj[i + 3];
> > > +		}
> > > +		switch (n & 0x3) {
> > > +		case 3:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 2:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 1:
> > > +			ring[idx++] = obj[i++];
> > > +		}
> > > +	} else {
> > > +		for (i = 0; idx < size; i++, idx++)
> > > +			ring[idx] = obj[i];
> > > +		/* Start at the beginning */
> > > +		for (idx = 0; i < n; i++, idx++)
> > > +			ring[idx] = obj[i];
> > > +	}
> > > +}
> > > +
> > > +static __rte_always_inline void
> > > +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> > > +		const void *obj_table, uint32_t n)
> > > +{
> > > +	unsigned int i;
> > > +	const uint32_t size = r->size;
> > > +	uint32_t idx = prod_head & r->mask;
> > > +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> > > +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> > > +	if (likely(idx + n < size)) {
> > > +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> > > +			memcpy((void *)(ring + idx),
> > > +				(const void *)(obj + i), 32);
> > > +		switch (n & 0x1) {
> > > +		case 1:
> > > +			memcpy((void *)(ring + idx),
> > > +				(const void *)(obj + i), 16);
> > > +		}
> > > +	} else {
> > > +		for (i = 0; idx < size; i++, idx++)
> > > +			memcpy((void *)(ring + idx),
> > > +				(const void *)(obj + i), 16);
> > > +		/* Start at the beginning */
> > > +		for (idx = 0; i < n; i++, idx++)
> > > +			memcpy((void *)(ring + idx),
> > > +				(const void *)(obj + i), 16);
> > > +	}
> > > +}
> > > +
> > > +/* the actual enqueue of elements on the ring.
> > > + * Placed here since identical code needed in both
> > > + * single and multi producer enqueue functions.
> > > + */
> > > +static __rte_always_inline void
> > > +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void
> > *obj_table,
> > > +		uint32_t esize, uint32_t num)
> > > +{
> > > +	/* 8B and 16B copies implemented individually to retain
> > > +	 * the current performance.
> > > +	 */
> > > +	if (esize == 8)
> > > +		enqueue_elems_64(r, prod_head, obj_table, num);
> > > +	else if (esize == 16)
> > > +		enqueue_elems_128(r, prod_head, obj_table, num);
> > > +	else {
> > > +		uint32_t idx, scale, nr_idx, nr_num, nr_size;
> > > +
> > > +		/* Normalize to uint32_t */
> > > +		scale = esize / sizeof(uint32_t);
> > > +		nr_num = num * scale;
> > > +		idx = prod_head & r->mask;
> > > +		nr_idx = idx * scale;
> > > +		nr_size = r->size * scale;
> > > +		enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
> > > +	}
> > > +}
> >
> > Following Konstatin's comment on v7, enqueue_elems_128() was modified to
> > ensure it won't crash if the object is unaligned. Are we sure that this same
> > problem cannot also occurs with 64b copies on all supported architectures? (I
> > mean 64b access that is only aligned on 32b)
> Konstantin mentioned that the 64b load/store instructions on x86 can handle unaligned access.

Yep, I think we are ok here for IA and IA-32.

> On aarch64, the load/store (non-atomic,
> which will be used in this case) can handle unaligned access.
> 
> + David Christensen to comment for PPC

If we are in doubt here, it is probably worth adding new test case(s) to the UT?

> 
> >
> > Out of curiosity, would it make a big perf difference to only use
> > enqueue_elems_32()?
> Yes, this was having a significant impact on 128b elements. I did not try on 64b elements.
> I will run the perf test with 32b copy for 64b element size and get back.
  
Honnappa Nagarahalli Jan. 18, 2020, 3:01 p.m. UTC | #5
<snip>

> 
> > > On Wed, Jan 15, 2020 at 11:25:07PM -0600, Honnappa Nagarahalli wrote:
> > > > Current APIs assume ring elements to be pointers. However, in many
> > > > use cases, the size can be different. Add new APIs to support
> > > > configurable ring element sizes.
> > > >
> > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > > > Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> > > > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > > > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > > > ---
> > > >  lib/librte_ring/Makefile             |    3 +-
> > > >  lib/librte_ring/meson.build          |    4 +
> > > >  lib/librte_ring/rte_ring.c           |   41 +-
> > > >  lib/librte_ring/rte_ring.h           |    1 +
> > > >  lib/librte_ring/rte_ring_elem.h      | 1003
> ++++++++++++++++++++++++++
> > > >  lib/librte_ring/rte_ring_version.map |    2 +
> > > >  6 files changed, 1045 insertions(+), 9 deletions(-)  create mode
> > > > 100644 lib/librte_ring/rte_ring_elem.h
> > > >
> > >
> > > [...]
> > >
> > > > +static __rte_always_inline void
> > > > +enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
> > > > +		const void *obj_table, uint32_t n) {
> > > > +	unsigned int i;
> > > > +	uint32_t *ring = (uint32_t *)&r[1];
> > > > +	const uint32_t *obj = (const uint32_t *)obj_table;
> > > > +	if (likely(idx + n < size)) {
> > > > +		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> > > > +			ring[idx] = obj[i];
> > > > +			ring[idx + 1] = obj[i + 1];
> > > > +			ring[idx + 2] = obj[i + 2];
> > > > +			ring[idx + 3] = obj[i + 3];
> > > > +			ring[idx + 4] = obj[i + 4];
> > > > +			ring[idx + 5] = obj[i + 5];
> > > > +			ring[idx + 6] = obj[i + 6];
> > > > +			ring[idx + 7] = obj[i + 7];
> > > > +		}
> > > > +		switch (n & 0x7) {
> > > > +		case 7:
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > > +		case 6:
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > > +		case 5:
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > > +		case 4:
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > > +		case 3:
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > > +		case 2:
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > > +		case 1:
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > > +		}
> > > > +	} else {
> > > > +		for (i = 0; idx < size; i++, idx++)
> > > > +			ring[idx] = obj[i];
> > > > +		/* Start at the beginning */
> > > > +		for (idx = 0; i < n; i++, idx++)
> > > > +			ring[idx] = obj[i];
> > > > +	}
> > > > +}
> > > > +
> > > > +static __rte_always_inline void
> > > > +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
> > > > +		const void *obj_table, uint32_t n) {
> > > > +	unsigned int i;
> > > > +	const uint32_t size = r->size;
> > > > +	uint32_t idx = prod_head & r->mask;
> > > > +	uint64_t *ring = (uint64_t *)&r[1];
> > > > +	const uint64_t *obj = (const uint64_t *)obj_table;
> > > > +	if (likely(idx + n < size)) {
> > > > +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> > > > +			ring[idx] = obj[i];
> > > > +			ring[idx + 1] = obj[i + 1];
> > > > +			ring[idx + 2] = obj[i + 2];
> > > > +			ring[idx + 3] = obj[i + 3];
> > > > +		}
> > > > +		switch (n & 0x3) {
> > > > +		case 3:
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > > +		case 2:
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > > +		case 1:
> > > > +			ring[idx++] = obj[i++];
> > > > +		}
> > > > +	} else {
> > > > +		for (i = 0; idx < size; i++, idx++)
> > > > +			ring[idx] = obj[i];
> > > > +		/* Start at the beginning */
> > > > +		for (idx = 0; i < n; i++, idx++)
> > > > +			ring[idx] = obj[i];
> > > > +	}
> > > > +}
> > > > +
> > > > +static __rte_always_inline void
> > > > +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> > > > +		const void *obj_table, uint32_t n) {
> > > > +	unsigned int i;
> > > > +	const uint32_t size = r->size;
> > > > +	uint32_t idx = prod_head & r->mask;
> > > > +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> > > > +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> > > > +	if (likely(idx + n < size)) {
> > > > +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> > > > +			memcpy((void *)(ring + idx),
> > > > +				(const void *)(obj + i), 32);
> > > > +		switch (n & 0x1) {
> > > > +		case 1:
> > > > +			memcpy((void *)(ring + idx),
> > > > +				(const void *)(obj + i), 16);
> > > > +		}
> > > > +	} else {
> > > > +		for (i = 0; idx < size; i++, idx++)
> > > > +			memcpy((void *)(ring + idx),
> > > > +				(const void *)(obj + i), 16);
> > > > +		/* Start at the beginning */
> > > > +		for (idx = 0; i < n; i++, idx++)
> > > > +			memcpy((void *)(ring + idx),
> > > > +				(const void *)(obj + i), 16);
> > > > +	}
> > > > +}
> > > > +
> > > > +/* the actual enqueue of elements on the ring.
> > > > + * Placed here since identical code needed in both
> > > > + * single and multi producer enqueue functions.
> > > > + */
> > > > +static __rte_always_inline void
> > > > +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void
> > > *obj_table,
> > > > +		uint32_t esize, uint32_t num)
> > > > +{
> > > > +	/* 8B and 16B copies implemented individually to retain
> > > > +	 * the current performance.
> > > > +	 */
> > > > +	if (esize == 8)
> > > > +		enqueue_elems_64(r, prod_head, obj_table, num);
> > > > +	else if (esize == 16)
> > > > +		enqueue_elems_128(r, prod_head, obj_table, num);
> > > > +	else {
> > > > +		uint32_t idx, scale, nr_idx, nr_num, nr_size;
> > > > +
> > > > +		/* Normalize to uint32_t */
> > > > +		scale = esize / sizeof(uint32_t);
> > > > +		nr_num = num * scale;
> > > > +		idx = prod_head & r->mask;
> > > > +		nr_idx = idx * scale;
> > > > +		nr_size = r->size * scale;
> > > > +		enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
> > > > +	}
> > > > +}
> > >
> > > Following Konstatin's comment on v7, enqueue_elems_128() was
> > > modified to ensure it won't crash if the object is unaligned. Are we
> > > sure that this same problem cannot also occurs with 64b copies on
> > > all supported architectures? (I mean 64b access that is only aligned
> > > on 32b)
> > Konstantin mentioned that the 64b load/store instructions on x86 can
> handle unaligned access.
> 
> Yep, I think we are ok here for IA and IA-32.
> 
> > On aarch64, the load/store (non-atomic, which will be used in this
> > case) can handle unaligned access.
> >
> > + David Christensen to comment for PPC
> 
> If we are in doubt here, probably worth to add a new test-case(s) for UT?
I will modify one of the functional tests to use an unaligned address.

> 
> >
> > >
> > > Out of curiosity, would it make a big perf difference to only use
> > > enqueue_elems_32()?
> > Yes, this was having a significant impact on 128b elements. I did not try on
> 64b elements.
> > I will run the perf test with 32b copy for 64b element size and get back.
Following is the data. Mostly, the 64b element copy performs better.

8B with 64b element copy                                                                                           8B with 32b element copy
RTE>>ring_perf_autotest                                                                                            RTE>>ring_perf_autotest
		
### Testing single element enq/deq ###	                                                                 ### Testing single element enq/deq ###
elem APIs: element size 8B: SP/SC: single: 10.02                                                    elem APIs: element size 8B: SP/SC: single: 11.21
elem APIs: element size 8B: MP/MC: single: 51.21                                                elem APIs: element size 8B: MP/MC: single: 46.22
	
### Testing burst enq/deq ###                                                                                  ### Testing burst enq/deq ###
elem APIs: element size 8B: SP/SC: burst (size: 8): 39.30                                      elem APIs: element size 8B: SP/SC: burst (size: 8): 62.01
elem APIs: element size 8B: SP/SC: burst (size: 32): 92.73                                   elem APIs: element size 8B: SP/SC: burst (size: 32): 189.15
elem APIs: element size 8B: MP/MC: burst (size: 8): 76.92                                  elem APIs: element size 8B: MP/MC: burst (size: 8): 109.28
elem APIs: element size 8B: MP/MC: burst (size: 32): 134.84                             elem APIs: element size 8B: MP/MC: burst (size: 32): 229.98
		
### Testing bulk enq/deq ###                                                                                   ### Testing bulk enq/deq ###
elem APIs: element size 8B: SP/SC: bulk (size: 8): 32.56                                       elem APIs: element size 8B: SP/SC: bulk (size: 8): 62.01
elem APIs: element size 8B: SP/SC: bulk (size: 32): 93.53                                     elem APIs: element size 8B: SP/SC: bulk (size: 32): 189.78
elem APIs: element size 8B: MP/MC: bulk (size: 8): 76.76	                                   elem APIs: element size 8B: MP/MC: bulk (size: 8): 109.16
elem APIs: element size 8B: MP/MC: bulk (size: 32): 134.74                               elem APIs: element size 8B: MP/MC: bulk (size: 32): 229.87

### Testing empty bulk deq ###                                                                               ### Testing empty bulk deq ###
elem APIs: element size 8B: SP/SC: bulk (size: 8): 5.00                                         elem APIs: element size 8B: SP/SC: bulk (size: 8): 3.00
elem APIs: element size 8B: MP/MC: bulk (size: 8): 6.00                                     elem APIs: element size 8B: MP/MC: bulk (size: 8): 6.00

### Testing using two physical cores ###                                                               ### Testing using two physical cores ###
elem APIs: element size 8B: SP/SC: bulk (size: 8): 26.33                                      elem APIs: element size 8B: SP/SC: bulk (size: 8): 47.03
elem APIs: element size 8B: MP/MC: bulk (size: 8): 83.22	                                   elem APIs: element size 8B: MP/MC: bulk (size: 8): 83.12
elem APIs: element size 8B: SP/SC: bulk (size: 32): 12.48                                    elem APIs: element size 8B: SP/SC: bulk (size: 32): 20.73
elem APIs: element size 8B: MP/MC: bulk (size: 32): 24.53                                elem APIs: element size 8B: MP/MC: bulk (size: 32): 23.18

### Testing using two NUMA nodes ###                                                               ### Testing using two NUMA nodes ###
elem APIs: element size 8B: SP/SC: bulk (size: 8): 54.35                                     elem APIs: element size 8B: SP/SC: bulk (size: 8): 92.40
elem APIs: element size 8B: MP/MC: bulk (size: 8): 197.59                               elem APIs: element size 8B: MP/MC: bulk (size: 8): 198.98
elem APIs: element size 8B: SP/SC: bulk (size: 32): 36.54                                  elem APIs: element size 8B: SP/SC: bulk (size: 32): 57.22
elem APIs: element size 8B: MP/MC: bulk (size: 32): 72.57                              elem APIs: element size 8B: MP/MC: bulk (size: 32): 65.65
  

Patch

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 22454b084..917c560ad 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -6,7 +6,7 @@  include $(RTE_SDK)/mk/rte.vars.mk
 # library name
 LIB = librte_ring.a
 
-CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -DALLOW_EXPERIMENTAL_API
 LDLIBS += -lrte_eal
 
 EXPORT_MAP := rte_ring_version.map
@@ -16,6 +16,7 @@  SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
+					rte_ring_elem.h \
 					rte_ring_generic.h \
 					rte_ring_c11_mem.h
 
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index ca8a435e9..f2f3ccc88 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -3,5 +3,9 @@ 
 
 sources = files('rte_ring.c')
 headers = files('rte_ring.h',
+		'rte_ring_elem.h',
 		'rte_ring_c11_mem.h',
 		'rte_ring_generic.h')
+
+# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
+allow_experimental_apis = true
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index d9b308036..3e15dc398 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -33,6 +33,7 @@ 
 #include <rte_tailq.h>
 
 #include "rte_ring.h"
+#include "rte_ring_elem.h"
 
 TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
 
@@ -46,23 +47,38 @@  EAL_REGISTER_TAILQ(rte_ring_tailq)
 
 /* return the size of memory occupied by a ring */
 ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_elem(unsigned int esize, unsigned int count)
 {
 	ssize_t sz;
 
+	/* Check if element size is a multiple of 4B */
+	if (esize % 4 != 0) {
+		RTE_LOG(ERR, RING, "element size is not a multiple of 4\n");
+
+		return -EINVAL;
+	}
+
 	/* count must be a power of 2 */
 	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
 		RTE_LOG(ERR, RING,
-			"Requested size is invalid, must be power of 2, and "
-			"do not exceed the size limit %u\n", RTE_RING_SZ_MASK);
+			"Requested number of elements is invalid, must be power of 2, and not exceed %u\n",
+			RTE_RING_SZ_MASK);
+
 		return -EINVAL;
 	}
 
-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	sz = sizeof(struct rte_ring) + (ssize_t)count * esize;
 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 	return sz;
 }
 
+/* return the size of memory occupied by a ring of pointer-sized elements */
+ssize_t
+rte_ring_get_memsize(unsigned count)
+{
+	return rte_ring_get_memsize_elem(sizeof(void *), count);
+}
+
 void
 rte_ring_reset(struct rte_ring *r)
 {
@@ -114,10 +130,10 @@  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	return 0;
 }
 
-/* create the ring */
+/* create the ring for a given element size */
 struct rte_ring *
-rte_ring_create(const char *name, unsigned count, int socket_id,
-		unsigned flags)
+rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count,
+		int socket_id, unsigned int flags)
 {
 	char mz_name[RTE_MEMZONE_NAMESIZE];
 	struct rte_ring *r;
@@ -135,7 +151,7 @@  rte_ring_create(const char *name, unsigned count, int socket_id,
 	if (flags & RING_F_EXACT_SZ)
 		count = rte_align32pow2(count + 1);
 
-	ring_size = rte_ring_get_memsize(count);
+	ring_size = rte_ring_get_memsize_elem(esize, count);
 	if (ring_size < 0) {
 		rte_errno = ring_size;
 		return NULL;
@@ -182,6 +198,15 @@  rte_ring_create(const char *name, unsigned count, int socket_id,
 	return r;
 }
 
+/* create the ring, with elements the size of a pointer (sizeof(void *)) */
+struct rte_ring *
+rte_ring_create(const char *name, unsigned count, int socket_id,
+		unsigned flags)
+{
+	return rte_ring_create_elem(name, sizeof(void *), count, socket_id,
+		flags);
+}
+
 /* free the ring */
 void
 rte_ring_free(struct rte_ring *r)
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 2a9f768a1..18fc5d845 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -216,6 +216,7 @@  int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
  */
 struct rte_ring *rte_ring_create(const char *name, unsigned count,
 				 int socket_id, unsigned flags);
+
 /**
  * De-allocate all memory used by the ring.
  *
diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
new file mode 100644
index 000000000..15d79bf2a
--- /dev/null
+++ b/lib/librte_ring/rte_ring_elem.h
@@ -0,0 +1,1003 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2019 Arm Limited
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_ELEM_H_
+#define _RTE_RING_ELEM_H_
+
+/**
+ * @file
+ * RTE Ring with user defined element size
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "rte_ring.h"
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Calculate the memory size needed for a ring with given element size
+ *
+ * This function returns the number of bytes needed for a ring, given
+ * the number of elements in it and the size of the element. This value
+ * is the sum of the size of the structure rte_ring and the size of the
+ * memory needed for storing the elements. The value is aligned to a cache
+ * line size.
+ *
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @return
+ *   - The memory size needed for the ring on success.
+ *   - -EINVAL - esize is not a multiple of 4, or count is not a power
+ *		 of 2 or exceeds the ring size limit (RTE_RING_SZ_MASK).
+ */
+__rte_experimental
+ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Create a new ring named *name* that stores elements with given size.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_ring_init() to initialize an empty ring.
+ *
+ * The new ring size is set to *count*, which must be a power of
+ * two. Water marking is disabled by default. The real usable ring size
+ * is *count-1* instead of *count* to differentiate a full ring from an
+ * empty ring.
+ *
+ * The ring is added in RTE_TAILQ_RING list.
+ *
+ * @param name
+ *   The name of the ring.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   An OR of the following:
+ *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   On success, the pointer to the new allocated ring. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - E_RTE_SECONDARY - function was called from a secondary process instance
+ *    - EINVAL - esize is not a multiple of 4 or count provided is not a
+ *		 power of 2.
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+__rte_experimental
+struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
+			unsigned int count, int socket_id, unsigned int flags);
+
+static __rte_always_inline void
+enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
+		const void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	uint32_t *ring = (uint32_t *)&r[1]; /* storage right after header */
+	const uint32_t *obj = (const uint32_t *)obj_table;
+	if (likely(idx + n < size)) { /* no wrap-around: copy in one run */
+		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
+			ring[idx] = obj[i];
+			ring[idx + 1] = obj[i + 1];
+			ring[idx + 2] = obj[i + 2];
+			ring[idx + 3] = obj[i + 3];
+			ring[idx + 4] = obj[i + 4];
+			ring[idx + 5] = obj[i + 5];
+			ring[idx + 6] = obj[i + 6];
+			ring[idx + 7] = obj[i + 7];
+		}
+		switch (n & 0x7) { /* copy the 0-7 words left over */
+		case 7:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 6:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 5:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 4:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 3:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 2:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 1:
+			ring[idx++] = obj[i++];
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			ring[idx] = obj[i];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			ring[idx] = obj[i];
+	}
+}
+
+static __rte_always_inline void
+enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
+		const void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	const uint32_t size = r->size;
+	uint32_t idx = prod_head & r->mask; /* slot index inside the ring */
+	uint64_t *ring = (uint64_t *)&r[1]; /* storage right after header */
+	const uint64_t *obj = (const uint64_t *)obj_table;
+	if (likely(idx + n < size)) { /* no wrap-around: copy in one run */
+		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
+			ring[idx] = obj[i];
+			ring[idx + 1] = obj[i + 1];
+			ring[idx + 2] = obj[i + 2];
+			ring[idx + 3] = obj[i + 3];
+		}
+		switch (n & 0x3) { /* copy the 0-3 elements left over */
+		case 3:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 2:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 1:
+			ring[idx++] = obj[i++];
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			ring[idx] = obj[i];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			ring[idx] = obj[i];
+	}
+}
+
+static __rte_always_inline void
+enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
+		const void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	const uint32_t size = r->size;
+	uint32_t idx = prod_head & r->mask; /* slot index inside the ring */
+	rte_int128_t *ring = (rte_int128_t *)&r[1];
+	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
+	if (likely(idx + n < size)) { /* no wrap-around: copy in one run */
+		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
+			memcpy((void *)(ring + idx),
+				(const void *)(obj + i), 32); /* 2 x 16B */
+		switch (n & 0x1) { /* odd count: one element left over */
+		case 1:
+			memcpy((void *)(ring + idx),
+				(const void *)(obj + i), 16);
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			memcpy((void *)(ring + idx),
+				(const void *)(obj + i), 16);
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			memcpy((void *)(ring + idx),
+				(const void *)(obj + i), 16);
+	}
+}
+
+/* The actual enqueue of elements on the ring.
+ * Placed here since identical code is needed in both the
+ * single and multi producer enqueue functions.
+ */
+static __rte_always_inline void
+enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table,
+		uint32_t esize, uint32_t num)
+{
+	/* 8B and 16B copies implemented individually to retain
+	 * the current performance.
+	 */
+	if (esize == 8)
+		enqueue_elems_64(r, prod_head, obj_table, num);
+	else if (esize == 16)
+		enqueue_elems_128(r, prod_head, obj_table, num);
+	else {
+		uint32_t idx, scale, nr_idx, nr_num, nr_size;
+
+		/* Normalize sizes and indices to units of uint32_t words */
+		scale = esize / sizeof(uint32_t);
+		nr_num = num * scale;
+		idx = prod_head & r->mask;
+		nr_idx = idx * scale;
+		nr_size = r->size * scale;
+		enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
+	}
+}
+
+static __rte_always_inline void
+dequeue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
+		void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	uint32_t *ring = (uint32_t *)&r[1]; /* storage right after header */
+	uint32_t *obj = (uint32_t *)obj_table;
+	if (likely(idx + n < size)) { /* no wrap-around: copy in one run */
+		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
+			obj[i] = ring[idx];
+			obj[i + 1] = ring[idx + 1];
+			obj[i + 2] = ring[idx + 2];
+			obj[i + 3] = ring[idx + 3];
+			obj[i + 4] = ring[idx + 4];
+			obj[i + 5] = ring[idx + 5];
+			obj[i + 6] = ring[idx + 6];
+			obj[i + 7] = ring[idx + 7];
+		}
+		switch (n & 0x7) { /* copy the 0-7 words left over */
+		case 7:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 6:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 5:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 4:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 3:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 2:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 1:
+			obj[i++] = ring[idx++];
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			obj[i] = ring[idx];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			obj[i] = ring[idx];
+	}
+}
+
+static __rte_always_inline void
+dequeue_elems_64(struct rte_ring *r, uint32_t cons_head,
+		void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	const uint32_t size = r->size;
+	uint32_t idx = cons_head & r->mask; /* slot index inside the ring */
+	uint64_t *ring = (uint64_t *)&r[1]; /* storage right after header */
+	uint64_t *obj = (uint64_t *)obj_table;
+	if (likely(idx + n < size)) { /* no wrap-around: copy in one run */
+		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
+			obj[i] = ring[idx];
+			obj[i + 1] = ring[idx + 1];
+			obj[i + 2] = ring[idx + 2];
+			obj[i + 3] = ring[idx + 3];
+		}
+		switch (n & 0x3) { /* copy the 0-3 elements left over */
+		case 3:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 2:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 1:
+			obj[i++] = ring[idx++];
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			obj[i] = ring[idx];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			obj[i] = ring[idx];
+	}
+}
+
+static __rte_always_inline void
+dequeue_elems_128(struct rte_ring *r, uint32_t cons_head,
+		void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	const uint32_t size = r->size;
+	uint32_t idx = cons_head & r->mask; /* slot index inside the ring */
+	rte_int128_t *ring = (rte_int128_t *)&r[1];
+	rte_int128_t *obj = (rte_int128_t *)obj_table;
+	if (likely(idx + n < size)) { /* no wrap-around: copy in one run */
+		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
+			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
+		switch (n & 0x1) { /* odd count: one element left over */
+		case 1:
+			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
+	}
+}
+
+/* The actual dequeue of elements from the ring.
+ * Placed here since identical code is needed in both the
+ * single and multi consumer dequeue functions.
+ */
+static __rte_always_inline void
+dequeue_elems(struct rte_ring *r, uint32_t cons_head, void *obj_table,
+		uint32_t esize, uint32_t num)
+{
+	/* 8B and 16B copies implemented individually to retain
+	 * the current performance.
+	 */
+	if (esize == 8)
+		dequeue_elems_64(r, cons_head, obj_table, num);
+	else if (esize == 16)
+		dequeue_elems_128(r, cons_head, obj_table, num);
+	else {
+		uint32_t idx, scale, nr_idx, nr_num, nr_size;
+
+		/* Normalize sizes and indices to units of uint32_t words */
+		scale = esize / sizeof(uint32_t);
+		nr_num = num * scale;
+		idx = cons_head & r->mask;
+		nr_idx = idx * scale;
+		nr_size = r->size * scale;
+		dequeue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
+	}
+}
+
+/* Between two loads, the CPU may reorder accesses on weakly ordered
+ * memory models (powerpc/arm).
+ * There are 2 choices for the users:
+ * 1. use the rmb() memory barrier
+ * 2. use one-direction load_acquire/store_release barriers, enabled by
+ * CONFIG_RTE_USE_C11_MEM_MODEL=y
+ * The choice depends on performance test results.
+ * By default, the common functions come from rte_ring_generic.h
+ */
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_ring_c11_mem.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
+/**
+ * @internal Enqueue several objects on the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects (each of esize bytes).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ *   if non-NULL, returns the amount of space after the enqueue has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n,
+		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
+		unsigned int *free_space)
+{
+	uint32_t prod_head, prod_next;
+	uint32_t free_entries;
+
+	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+			&prod_head, &prod_next, &free_entries);
+	if (n == 0)
+		goto end;
+
+	enqueue_elems(r, prod_head, obj_table, esize, n);
+
+	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+	return n;
+}
+
+/**
+ * @internal Dequeue several objects from the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects (each of esize bytes) to be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ *   if non-NULL, returns the number of remaining ring entries after dequeue
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n,
+		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
+		unsigned int *available)
+{
+	uint32_t cons_head, cons_next;
+	uint32_t entries;
+
+	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
+			&cons_head, &cons_next, &entries);
+	if (n == 0)
+		goto end;
+
+	dequeue_elems(r, cons_head, obj_table, esize, n);
+
+	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+
+end:
+	if (available != NULL)
+		*available = entries - n;
+	return n;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects (each of esize bytes).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring
+ *
+ * @warning This API is NOT multi-producers safe
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects (each of esize bytes).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects (each of esize bytes).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
+}
+
+/**
+ * Enqueue one object on a ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added. The object is only read.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success; object enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_ring_mp_enqueue_elem(struct rte_ring *r, const void *obj, unsigned int esize)
+{
+	return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+								-ENOBUFS;
+}
+
+/**
+ * Enqueue one object on a ring
+ *
+ * @warning This API is NOT multi-producers safe
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added. The object is only read.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success; object enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_ring_sp_enqueue_elem(struct rte_ring *r, const void *obj, unsigned int esize)
+{
+	return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+								-ENOBUFS;
+}
+
+/**
+ * Enqueue one object on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added. The object is only read.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success; object enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_ring_enqueue_elem(struct rte_ring *r, const void *obj, unsigned int esize)
+{
+	return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+								-ENOBUFS;
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects (each of esize bytes) to be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+				RTE_RING_QUEUE_FIXED, __IS_MC, available);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects (each of esize bytes) to be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table,
+ *   must be strictly positive.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, __IS_SC, available);
+}
+
+/**
+ * Dequeue several objects from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects (each of esize bytes) to be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, r->cons.single, available);
+}
+
+/**
+ * Dequeue one object from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to the object (esize bytes) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success; object dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
+				unsigned int esize)
+{
+	return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL)  ? 0 :
+								-ENOENT;
+}
+
+/**
+ * Dequeue one object from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to the object (esize bytes) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success; object dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
+				unsigned int esize)
+{
+	return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
+								-ENOENT;
+}
+
+/**
+ * Dequeue one object from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to the object (esize bytes) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success, object dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
+{
+	return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
+								-ENOENT;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects (each of esize bytes).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring
+ *
+ * @warning This API is NOT multi-producers safe
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects (each of esize bytes).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects, each of them esize bytes long.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe). When more
+ * objects are requested than are available, only the available objects
+ * are dequeued.
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects, esize bytes each, that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe). When more
+ * objects are requested than are available, only the available objects
+ * are dequeued.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects, esize bytes each, that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty.
+ */
+static __rte_always_inline unsigned int
+rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+}
+
+/**
+ * Dequeue up to n objects from a ring.
+ *
+ * Depending on the default behaviour selected through the flags at ring
+ * creation time, the multi-consumers or the single-consumer variant of
+ * the dequeue is performed.
+ *
+ * @param r
+ *   The ring to dequeue from.
+ * @param obj_table
+ *   Table that receives the dequeued objects, esize bytes per object.
+ * @param esize
+ *   Size of one ring element, in bytes; has to be a multiple of 4.
+ *   It has to match the element size given when the ring was created,
+ *   the results are undefined otherwise.
+ * @param n
+ *   Maximum number of objects to move from the ring into obj_table.
+ * @param available
+ *   When not NULL, receives the number of entries left in the ring
+ *   once the dequeue has completed.
+ * @return
+ *   - Number of objects dequeued
+ */
+static __rte_always_inline unsigned int
+rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	/* r->cons.single selects the single- or multi-consumer path. */
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, r->cons.single, available);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RING_ELEM_H_ */
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index 89d84bcf4..7a5328dd5 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -15,6 +15,8 @@  DPDK_20.0 {
 EXPERIMENTAL {
 	global:
 
+	rte_ring_create_elem;
+	rte_ring_get_memsize_elem;
 	rte_ring_reset;
 
 };