[v3,9/9] ring: add C11 memory model for new sync modes

Message ID 20200403174235.23308-10-konstantin.ananyev@intel.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series New sync modes for ring |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/travis-robot success Travis build: passed
ci/Intel-compilation success Compilation OK

Commit Message

Ananyev, Konstantin April 3, 2020, 5:42 p.m. UTC
  Add C11 atomics-based implementation for RTS and HTS
head/tail update primitives.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
 lib/librte_ring/Makefile               |   4 +-
 lib/librte_ring/meson.build            |   2 +
 lib/librte_ring/rte_ring_hts.h         |   4 +
 lib/librte_ring/rte_ring_hts_c11_mem.h | 222 +++++++++++++++++++++++++
 lib/librte_ring/rte_ring_hts_elem.h    |   4 +
 lib/librte_ring/rte_ring_rts.h         |   4 +
 lib/librte_ring/rte_ring_rts_c11_mem.h | 198 ++++++++++++++++++++++
 lib/librte_ring/rte_ring_rts_elem.h    |   4 +
 8 files changed, 441 insertions(+), 1 deletion(-)
 create mode 100644 lib/librte_ring/rte_ring_hts_c11_mem.h
 create mode 100644 lib/librte_ring/rte_ring_rts_c11_mem.h
  

Comments

Honnappa Nagarahalli April 14, 2020, 4:28 a.m. UTC | #1
<snip>
Hi Konstantin,
	It would be good to blend this commit with the other commits. A few comments inline.

> Subject: [PATCH v3 9/9] ring: add C11 memory model for new sync modes
> 
> Add C11 atomics-based implementation for RTS and HTS head/tail update
> primitives.
> 
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> ---
>  lib/librte_ring/Makefile               |   4 +-
>  lib/librte_ring/meson.build            |   2 +
>  lib/librte_ring/rte_ring_hts.h         |   4 +
>  lib/librte_ring/rte_ring_hts_c11_mem.h | 222 +++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_hts_elem.h    |   4 +
>  lib/librte_ring/rte_ring_rts.h         |   4 +
>  lib/librte_ring/rte_ring_rts_c11_mem.h | 198 ++++++++++++++++++++++
>  lib/librte_ring/rte_ring_rts_elem.h    |   4 +
>  8 files changed, 441 insertions(+), 1 deletion(-)  create mode 100644
> lib/librte_ring/rte_ring_hts_c11_mem.h
>  create mode 100644 lib/librte_ring/rte_ring_rts_c11_mem.h
> 
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index
> 5f8662737..927d105bf 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -22,9 +22,11 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include :=
> rte_ring.h \
>  					rte_ring_hts.h \
>  					rte_ring_hts_elem.h \
>  					rte_ring_hts_generic.h \
> +					rte_ring_hts_c11_mem.h \
>  					rte_ring_peek.h \
>  					rte_ring_rts.h \
>  					rte_ring_rts_elem.h \
> -					rte_ring_rts_generic.h
> +					rte_ring_rts_generic.h \
> +					rte_ring_rts_c11_mem.h
> 
>  include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index
> f5f84dc6e..f2e37a8e4 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -7,10 +7,12 @@ headers = files('rte_ring.h',
>  		'rte_ring_c11_mem.h',
>  		'rte_ring_generic.h',
>  		'rte_ring_hts.h',
> +		'rte_ring_hts_c11_mem.h',
>  		'rte_ring_hts_elem.h',
>  		'rte_ring_hts_generic.h',
>  		'rte_ring_peek.h',
>  		'rte_ring_rts.h',
> +		'rte_ring_rts_c11_mem.h',
>  		'rte_ring_rts_elem.h',
>  		'rte_ring_rts_generic.h')
> 
> diff --git a/lib/librte_ring/rte_ring_hts.h b/lib/librte_ring/rte_ring_hts.h index
> 062d7be6c..ddaa47ff1 100644
> --- a/lib/librte_ring/rte_ring_hts.h
> +++ b/lib/librte_ring/rte_ring_hts.h
> @@ -29,7 +29,11 @@
>  extern "C" {
>  #endif
> 
> +#ifdef RTE_USE_C11_MEM_MODEL
> +#include <rte_ring_hts_c11_mem.h>
> +#else
>  #include <rte_ring_hts_generic.h>
> +#endif
> 
>  /**
>   * @internal Enqueue several objects on the HTS ring.
> diff --git a/lib/librte_ring/rte_ring_hts_c11_mem.h
> b/lib/librte_ring/rte_ring_hts_c11_mem.h
> new file mode 100644
> index 000000000..0218d0e7d
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_hts_c11_mem.h
> @@ -0,0 +1,222 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2020 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_HTS_C11_MEM_H_
> +#define _RTE_RING_HTS_C11_MEM_H_
> +
> +/**
> + * @file rte_ring_hts_c11_mem.h
> + * It is not recommended to include this file directly,
> + * include <rte_ring.h> instead.
> + * Contains internal helper functions for head/tail sync (HTS) ring mode.
> + * For more information please refer to <rte_ring_hts.h>.
> + */
> +
> +/**
> + * @internal get current tail value.
> + * Check that user didn't request to move tail above the head.
> + * In that situation:
> + * - return zero, that will cause abort any pending changes and
> + *   return head to its previous position.
> + * - throw an assert in debug mode.
> + */
> +static __rte_always_inline uint32_t
> +__rte_ring_hts_get_tail(struct rte_ring_hts_headtail *ht, uint32_t *tail,
> +	uint32_t num)
> +{
> +	uint32_t n;
> +	union rte_ring_ht_pos p;
> +
> +	p.raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_RELAXED);
> +	n = p.pos.head - p.pos.tail;
> +
> +	RTE_ASSERT(n >= num);
> +	num = (n >= num) ? num : 0;
> +
> +	*tail = p.pos.tail;
> +	return num;
> +}
> +
> +/**
> + * @internal set new values for head and tail as one atomic 64 bit operation.
> + * Should be used only in conjunction with __rte_ring_hts_get_tail.
> + */
> +static __rte_always_inline void
> +__rte_ring_hts_set_head_tail(struct rte_ring_hts_headtail *ht, uint32_t tail,
> +	uint32_t num, uint32_t enqueue)
> +{
> +	union rte_ring_ht_pos p;
> +
> +	RTE_SET_USED(enqueue);
> +
> +	p.pos.head = tail + num;
> +	p.pos.tail = p.pos.head;
> +
> +	__atomic_store_n(&ht->ht.raw, p.raw, __ATOMIC_RELEASE); }
> +
> +static __rte_always_inline void
> +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num,
> +	uint32_t enqueue)
> +{
> +	uint32_t tail;
> +
> +	num = __rte_ring_hts_get_tail(ht, &tail, num);
> +	__rte_ring_hts_set_head_tail(ht, tail, num, enqueue); }
> +
> +/**
> + * @internal waits till tail will become equal to head.
> + * Means no writer/reader is active for that ring.
> + * Suppose to work as serialization point.
> + */
> +static __rte_always_inline void
> +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
> +		union rte_ring_ht_pos *p)
> +{
> +	p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
> +
> +	while (p->pos.head != p->pos.tail) {
> +		rte_pause();
> +		p->raw = __atomic_load_n(&ht->ht.raw,
> __ATOMIC_ACQUIRE);
> +	}
> +}
> +
> +/**
> + * @internal This function updates the producer head for enqueue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sp
> + *   Indicates whether multi-producer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from
> ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where enqueue starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where enqueue finishes
> + * @param free_entries
> + *   Returns the amount of free space in the ring BEFORE head was moved
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
> +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +	uint32_t *free_entries)
> +{
> +	uint32_t n;
> +	union rte_ring_ht_pos np, op;
> +
> +	const uint32_t capacity = r->capacity;
> +
> +	do {
> +		/* Reset n to the initial burst count */
> +		n = num;
> +
> +		/* wait for tail to be equal to head, , acquire point */
> +		__rte_ring_hts_head_wait(&r->hts_prod, &op);
> +
> +		/*
> +		 *  The subtraction is done between two unsigned 32bits value
> +		 * (the result is always modulo 32 bits even if we have
> +		 * *old_head > cons_tail). So 'free_entries' is always between
> 0
> +		 * and capacity (which is < size).
> +		 */
> +		*free_entries = capacity + r->cons.tail - op.pos.head;
> +
> +		/* check that we have enough room in ring */
> +		if (unlikely(n > *free_entries))
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> +					0 : *free_entries;
> +
> +		if (n == 0)
> +			break;
> +
> +		np.pos.tail = op.pos.tail;
> +		np.pos.head = op.pos.head + n;
> +
> +	} while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw,
> +			&op.raw, np.raw,
> +			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
__ATOMIC_RELEASE can be __ATOMIC_RELAXED here. The RELEASE on the update done after the elements are written is enough.

> +
> +	*old_head = op.pos.head;
> +	return n;
> +}
> +
> +/**
> + * @internal This function updates the consumer head for dequeue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sc
> + *   Indicates whether multi-consumer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where dequeue starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where dequeue finishes
> + * @param entries
> + *   Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
> +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +	uint32_t *entries)
> +{
> +	uint32_t n;
> +	union rte_ring_ht_pos np, op;
> +
> +	/* move cons.head atomically */
> +	do {
> +		/* Restore n as it may change every loop */
> +		n = num;
> +
> +		/* wait for tail to be equal to head */
> +		__rte_ring_hts_head_wait(&r->hts_cons, &op);
> +
> +		/* The subtraction is done between two unsigned 32bits value
> +		 * (the result is always modulo 32 bits even if we have
> +		 * cons_head > prod_tail). So 'entries' is always between 0
> +		 * and size(ring)-1.
> +		 */
> +		*entries = r->prod.tail - op.pos.head;
> +
> +		/* Set the actual entries for dequeue */
> +		if (n > *entries)
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 :
> *entries;
> +
> +		if (unlikely(n == 0))
> +			break;
> +
> +		np.pos.tail = op.pos.tail;
> +		np.pos.head = op.pos.head + n;
> +
> +	} while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw,
> +			&op.raw, np.raw,
> +			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
Same here, RELEASE can be RELAXED.

> +
> +	*old_head = op.pos.head;
> +	return n;
> +}
> +
> +#endif /* _RTE_RING_HTS_C11_MEM_H_ */

<snip>

>  /**
>   * @internal Enqueue several objects on the RTS ring.
> diff --git a/lib/librte_ring/rte_ring_rts_c11_mem.h
> b/lib/librte_ring/rte_ring_rts_c11_mem.h
> new file mode 100644
> index 000000000..b72901497
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_rts_c11_mem.h
> @@ -0,0 +1,198 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_RTS_C11_MEM_H_
> +#define _RTE_RING_RTS_C11_MEM_H_
> +
> +/**
> + * @file rte_ring_rts_c11_mem.h
> + * It is not recommended to include this file directly,
> + * include <rte_ring.h> instead.
> + * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
> + * For more information please refer to <rte_ring_rts.h>.
> + */
> +
> +/**
> + * @internal This function updates tail values.
> + */
> +static __rte_always_inline void
> +__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht) {
> +	union rte_ring_ht_poscnt h, ot, nt;
> +
> +	/*
> +	 * If there are other enqueues/dequeues in progress that
> +	 * might preceded us, then don't update tail with new value.
> +	 */
> +
> +	ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
This can be RELAXED. This thread is reading the value that it updated earlier, so it should be able to see the value it updated.

> +
> +	do {
> +		/* on 32-bit systems we have to do atomic read here */
> +		h.raw = __atomic_load_n(&ht->head.raw,
> __ATOMIC_RELAXED);
> +
> +		nt.raw = ot.raw;
> +		if (++nt.val.cnt == h.val.cnt)
> +			nt.val.pos = h.val.pos;
> +
> +	} while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw,
> nt.raw,
> +			0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0); }
> +
> +/**
> + * @internal This function waits till head/tail distance wouldn't
> + * exceed pre-defined max value.
> + */
> +static __rte_always_inline void
> +__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
> +	union rte_ring_ht_poscnt *h)
> +{
> +	uint32_t max;
> +
> +	max = ht->htd_max;
> +	h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
> +
> +	while (h->val.pos - ht->tail.val.pos > max) {
> +		rte_pause();
> +		h->raw = __atomic_load_n(&ht->head.raw,
> __ATOMIC_ACQUIRE);
> +	}
> +}
> +
> +/**
> + * @internal This function updates the producer head for enqueue.
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sp
> + *   Indicates whether multi-producer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from
> ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where enqueue starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where enqueue finishes
> + * @param free_entries
> + *   Returns the amount of free space in the ring BEFORE head was moved
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline uint32_t
> +__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
> +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +	uint32_t *free_entries)
> +{
> +	uint32_t n;
> +	union rte_ring_ht_poscnt nh, oh;
> +
> +	const uint32_t capacity = r->capacity;
> +
> +	do {
> +		/* Reset n to the initial burst count */
> +		n = num;
> +
> +		/* read prod head (may spin on prod tail, acquire point) */
> +		__rte_ring_rts_head_wait(&r->rts_prod, &oh);
> +
> +		/*
> +		 *  The subtraction is done between two unsigned 32bits value
> +		 * (the result is always modulo 32 bits even if we have
> +		 * *old_head > cons_tail). So 'free_entries' is always between
> 0
> +		 * and capacity (which is < size).
> +		 */
> +		*free_entries = capacity + r->cons.tail - oh.val.pos;
> +
> +		/* check that we have enough room in ring */
> +		if (unlikely(n > *free_entries))
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> +					0 : *free_entries;
> +
> +		if (n == 0)
> +			break;
> +
> +		nh.val.pos = oh.val.pos + n;
> +		nh.val.cnt = oh.val.cnt + 1;
> +
> +	} while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
> +			&oh.raw, nh.raw,
> +			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
> +
> +	*old_head = oh.val.pos;
> +	return n;
> +}
> +
> +/**
> + * @internal This function updates the consumer head for dequeue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sc
> + *   Indicates whether multi-consumer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where dequeue starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where dequeue finishes
> + * @param entries
> + *   Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
> +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +	uint32_t *entries)
> +{
> +	uint32_t n;
> +	union rte_ring_ht_poscnt nh, oh;
> +
> +	/* move cons.head atomically */
> +	do {
> +		/* Restore n as it may change every loop */
> +		n = num;
> +
> +		/* read cons head (may spin on cons tail, acquire point) */
> +		__rte_ring_rts_head_wait(&r->rts_cons, &oh);
> +
> +		/* The subtraction is done between two unsigned 32bits value
> +		 * (the result is always modulo 32 bits even if we have
> +		 * cons_head > prod_tail). So 'entries' is always between 0
> +		 * and size(ring)-1.
> +		 */
> +		*entries = r->prod.tail - oh.val.pos;
> +
> +		/* Set the actual entries for dequeue */
> +		if (n > *entries)
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 :
> *entries;
> +
> +		if (unlikely(n == 0))
> +			break;
> +
> +		nh.val.pos = oh.val.pos + n;
> +		nh.val.cnt = oh.val.cnt + 1;
> +
> +	} while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
> +			&oh.raw, nh.raw,
> +			1, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
> +
> +	*old_head = oh.val.pos;
> +	return n;
> +}
> +
> +#endif /* _RTE_RING_RTS_C11_MEM_H_ */
> diff --git a/lib/librte_ring/rte_ring_rts_elem.h
> b/lib/librte_ring/rte_ring_rts_elem.h
> index 71a331b23..23d8aeec7 100644
> --- a/lib/librte_ring/rte_ring_rts_elem.h
> +++ b/lib/librte_ring/rte_ring_rts_elem.h
> @@ -24,7 +24,11 @@
>  extern "C" {
>  #endif
> 
> +#ifdef RTE_USE_C11_MEM_MODEL
> +#include <rte_ring_rts_c11_mem.h>
> +#else
>  #include <rte_ring_rts_generic.h>
> +#endif
> 
>  /**
>   * @internal Enqueue several objects on the RTS ring.
> --
> 2.17.1
  
Ananyev, Konstantin April 14, 2020, 6:29 p.m. UTC | #2
> <snip>
> 
> >  /**
> >   * @internal Enqueue several objects on the RTS ring.
> > diff --git a/lib/librte_ring/rte_ring_rts_c11_mem.h
> > b/lib/librte_ring/rte_ring_rts_c11_mem.h
> > new file mode 100644
> > index 000000000..b72901497
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_rts_c11_mem.h
> > @@ -0,0 +1,198 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2010-2017 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_RTS_C11_MEM_H_
> > +#define _RTE_RING_RTS_C11_MEM_H_
> > +
> > +/**
> > + * @file rte_ring_rts_c11_mem.h
> > + * It is not recommended to include this file directly,
> > + * include <rte_ring.h> instead.
> > + * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
> > + * For more information please refer to <rte_ring_rts.h>.
> > + */
> > +
> > +/**
> > + * @internal This function updates tail values.
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht) {
> > +	union rte_ring_ht_poscnt h, ot, nt;
> > +
> > +	/*
> > +	 * If there are other enqueues/dequeues in progress that
> > +	 * might preceded us, then don't update tail with new value.
> > +	 */
> > +
> > +	ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
> This can be RELAXED. This thread is reading the value that it updated earlier, so it should be able to see the value it updated.

It serves as a hoist barrier, to make sure that we read tail before head (see below).
 
> > +
> > +	do {
> > +		/* on 32-bit systems we have to do atomic read here */
> > +		h.raw = __atomic_load_n(&ht->head.raw,
> > __ATOMIC_RELAXED);
> > +
> > +		nt.raw = ot.raw;
> > +		if (++nt.val.cnt == h.val.cnt)
> > +			nt.val.pos = h.val.pos;
> > +
> > +	} while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw,
> > nt.raw,
> > +			0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0); }
> > +
> > +/**
> > + * @internal This function waits till head/tail distance wouldn't
> > + * exceed pre-defined max value.
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
> > +	union rte_ring_ht_poscnt *h)
> > +{
> > +	uint32_t max;
> > +
> > +	max = ht->htd_max;
> > +	h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
> > +
> > +	while (h->val.pos - ht->tail.val.pos > max) {
> > +		rte_pause();
> > +		h->raw = __atomic_load_n(&ht->head.raw,
> > __ATOMIC_ACQUIRE);
> > +	}
> > +}
> > +
  
Ananyev, Konstantin April 15, 2020, 8:28 p.m. UTC | #3
Hi Honnappa,

> > +
> > +/**
> > + * @internal This function updates the producer head for enqueue
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param is_sp
> > + *   Indicates whether multi-producer path is needed or not
> > + * @param n
> > + *   The number of elements we will want to enqueue, i.e. how far should the
> > + *   head be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from
> > ring
> > + * @param old_head
> > + *   Returns head value as it was before the move, i.e. where enqueue starts
> > + * @param new_head
> > + *   Returns the current/new head value i.e. where enqueue finishes
> > + * @param free_entries
> > + *   Returns the amount of free space in the ring BEFORE head was moved
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
> > +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> > +	uint32_t *free_entries)
> > +{
> > +	uint32_t n;
> > +	union rte_ring_ht_pos np, op;
> > +
> > +	const uint32_t capacity = r->capacity;
> > +
> > +	do {
> > +		/* Reset n to the initial burst count */
> > +		n = num;
> > +
> > +		/* wait for tail to be equal to head, , acquire point */
> > +		__rte_ring_hts_head_wait(&r->hts_prod, &op);
> > +
> > +		/*
> > +		 *  The subtraction is done between two unsigned 32bits value
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * *old_head > cons_tail). So 'free_entries' is always between
> > 0
> > +		 * and capacity (which is < size).
> > +		 */
> > +		*free_entries = capacity + r->cons.tail - op.pos.head;
> > +
> > +		/* check that we have enough room in ring */
> > +		if (unlikely(n > *free_entries))
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> > +					0 : *free_entries;
> > +
> > +		if (n == 0)
> > +			break;
> > +
> > +		np.pos.tail = op.pos.tail;
> > +		np.pos.head = op.pos.head + n;
> > +
> > +	} while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw,
> > +			&op.raw, np.raw,
> > +			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
> __ATOMIC_RELEASE can be __ATOMIC_RELAXED. The RELEASE while updating after the elements are written is enough.

I looked at it once again and I think RELAXED is probably not enough here
(the same goes for RELEASE).
It seems we have to use ACQUIRE here (and in other similar places)
to forbid the CPU from speculatively doing the actual object copy before the CAS.
Another alternative would probably be to use:
cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
*free_entries = capacity + cons_tail - op.pos.head;
instead of just
*free_entries = capacity + r->cons.tail - op.pos.head;
above.
But that would mean two acquire points inside the loop:
load(prod, ACQUIRE);
load(cons.tail, ACQUIRE);
Plus, to me, CAS(..., ACQUIRE, ACQUIRE) seems more logical here,
so I am leaning that way.

> 
> > +
> > +	*old_head = op.pos.head;
> > +	return n;
> > +}
  

Patch

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 5f8662737..927d105bf 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -22,9 +22,11 @@  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
 					rte_ring_hts.h \
 					rte_ring_hts_elem.h \
 					rte_ring_hts_generic.h \
+					rte_ring_hts_c11_mem.h \
 					rte_ring_peek.h \
 					rte_ring_rts.h \
 					rte_ring_rts_elem.h \
-					rte_ring_rts_generic.h
+					rte_ring_rts_generic.h \
+					rte_ring_rts_c11_mem.h
 
 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index f5f84dc6e..f2e37a8e4 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -7,10 +7,12 @@  headers = files('rte_ring.h',
 		'rte_ring_c11_mem.h',
 		'rte_ring_generic.h',
 		'rte_ring_hts.h',
+		'rte_ring_hts_c11_mem.h',
 		'rte_ring_hts_elem.h',
 		'rte_ring_hts_generic.h',
 		'rte_ring_peek.h',
 		'rte_ring_rts.h',
+		'rte_ring_rts_c11_mem.h',
 		'rte_ring_rts_elem.h',
 		'rte_ring_rts_generic.h')
 
diff --git a/lib/librte_ring/rte_ring_hts.h b/lib/librte_ring/rte_ring_hts.h
index 062d7be6c..ddaa47ff1 100644
--- a/lib/librte_ring/rte_ring_hts.h
+++ b/lib/librte_ring/rte_ring_hts.h
@@ -29,7 +29,11 @@ 
 extern "C" {
 #endif
 
+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_hts_c11_mem.h>
+#else
 #include <rte_ring_hts_generic.h>
+#endif
 
 /**
  * @internal Enqueue several objects on the HTS ring.
diff --git a/lib/librte_ring/rte_ring_hts_c11_mem.h b/lib/librte_ring/rte_ring_hts_c11_mem.h
new file mode 100644
index 000000000..0218d0e7d
--- /dev/null
+++ b/lib/librte_ring/rte_ring_hts_c11_mem.h
@@ -0,0 +1,222 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2020 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_HTS_C11_MEM_H_
+#define _RTE_RING_HTS_C11_MEM_H_
+
+/**
+ * @file rte_ring_hts_c11_mem.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for head/tail sync (HTS) ring mode.
+ * For more information please refer to <rte_ring_hts.h>.
+ */
+
+/**
+ * @internal get current tail value.
+ * Check that user didn't request to move tail above the head.
+ * In that situation:
+ * - return zero, that will cause abort any pending changes and
+ *   return head to its previous position.
+ * - throw an assert in debug mode.
+ */
+static __rte_always_inline uint32_t
+__rte_ring_hts_get_tail(struct rte_ring_hts_headtail *ht, uint32_t *tail,
+	uint32_t num)
+{
+	uint32_t n;
+	union rte_ring_ht_pos p;
+
+	p.raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_RELAXED);
+	n = p.pos.head - p.pos.tail;
+
+	RTE_ASSERT(n >= num);
+	num = (n >= num) ? num : 0;
+
+	*tail = p.pos.tail;
+	return num;
+}
+
+/**
+ * @internal set new values for head and tail as one atomic 64 bit operation.
+ * Should be used only in conjunction with __rte_ring_hts_get_tail.
+ */
+static __rte_always_inline void
+__rte_ring_hts_set_head_tail(struct rte_ring_hts_headtail *ht, uint32_t tail,
+	uint32_t num, uint32_t enqueue)
+{
+	union rte_ring_ht_pos p;
+
+	RTE_SET_USED(enqueue);
+
+	p.pos.head = tail + num;
+	p.pos.tail = p.pos.head;
+
+	__atomic_store_n(&ht->ht.raw, p.raw, __ATOMIC_RELEASE);
+}
+
+static __rte_always_inline void
+__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num,
+	uint32_t enqueue)
+{
+	uint32_t tail;
+
+	num = __rte_ring_hts_get_tail(ht, &tail, num);
+	__rte_ring_hts_set_head_tail(ht, tail, num, enqueue);
+}
+
+/**
+ * @internal waits till tail will become equal to head.
+ * Means no writer/reader is active for that ring.
+ * Suppose to work as serialization point.
+ */
+static __rte_always_inline void
+__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
+		union rte_ring_ht_pos *p)
+{
+	p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
+
+	while (p->pos.head != p->pos.tail) {
+		rte_pause();
+		p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
+	}
+}
+
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *free_entries)
+{
+	uint32_t n;
+	union rte_ring_ht_pos np, op;
+
+	const uint32_t capacity = r->capacity;
+
+	do {
+		/* Reset n to the initial burst count */
+		n = num;
+
+		/* wait for tail to be equal to head, , acquire point */
+		__rte_ring_hts_head_wait(&r->hts_prod, &op);
+
+		/*
+		 *  The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * *old_head > cons_tail). So 'free_entries' is always between 0
+		 * and capacity (which is < size).
+		 */
+		*free_entries = capacity + r->cons.tail - op.pos.head;
+
+		/* check that we have enough room in ring */
+		if (unlikely(n > *free_entries))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : *free_entries;
+
+		if (n == 0)
+			break;
+
+		np.pos.tail = op.pos.tail;
+		np.pos.head = op.pos.head + n;
+
+	} while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw,
+			&op.raw, np.raw,
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+	*old_head = op.pos.head;
+	return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *entries)
+{
+	uint32_t n;
+	union rte_ring_ht_pos np, op;
+
+	/* move cons.head atomically */
+	do {
+		/* Restore n as it may change every loop */
+		n = num;
+
+		/* wait for tail to be equal to head */
+		__rte_ring_hts_head_wait(&r->hts_cons, &op);
+
+		/* The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = r->prod.tail - op.pos.head;
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			break;
+
+		np.pos.tail = op.pos.tail;
+		np.pos.head = op.pos.head + n;
+
+	} while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw,
+			&op.raw, np.raw,
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+	*old_head = op.pos.head;
+	return n;
+}
+
+#endif /* _RTE_RING_HTS_C11_MEM_H_ */
diff --git a/lib/librte_ring/rte_ring_hts_elem.h b/lib/librte_ring/rte_ring_hts_elem.h
index 34f0d121d..1e9a49c7a 100644
--- a/lib/librte_ring/rte_ring_hts_elem.h
+++ b/lib/librte_ring/rte_ring_hts_elem.h
@@ -24,7 +24,11 @@ 
 extern "C" {
 #endif
 
+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_hts_c11_mem.h>
+#else
 #include <rte_ring_hts_generic.h>
+#endif
 
 /**
  * @internal Enqueue several objects on the HTS ring.
diff --git a/lib/librte_ring/rte_ring_rts.h b/lib/librte_ring/rte_ring_rts.h
index 18404fe48..28b2d25f5 100644
--- a/lib/librte_ring/rte_ring_rts.h
+++ b/lib/librte_ring/rte_ring_rts.h
@@ -55,7 +55,11 @@ 
 extern "C" {
 #endif
 
+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_rts_c11_mem.h>
+#else
 #include <rte_ring_rts_generic.h>
+#endif
 
 /**
  * @internal Enqueue several objects on the RTS ring.
diff --git a/lib/librte_ring/rte_ring_rts_c11_mem.h b/lib/librte_ring/rte_ring_rts_c11_mem.h
new file mode 100644
index 000000000..b72901497
--- /dev/null
+++ b/lib/librte_ring/rte_ring_rts_c11_mem.h
@@ -0,0 +1,198 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_RTS_C11_MEM_H_
+#define _RTE_RING_RTS_C11_MEM_H_
+
+/**
+ * @file rte_ring_rts_c11_mem.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
+ * For more information please refer to <rte_ring_rts.h>.
+ */
+
+/**
+ * @internal Increment tail.cnt; set tail.pos = head.pos when cnt == head.cnt.
+ */
+static __rte_always_inline void
+__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
+{
+	union rte_ring_ht_poscnt h, ot, nt;
+
+	/*
+	 * If other enqueues/dequeues that might have preceded us are still
+	 * in progress, only the tail counter moves, tail position is kept.
+	 */
+
+	ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
+
+	do {
+		/* on 32-bit systems we have to do atomic read here */
+		h.raw = __atomic_load_n(&ht->head.raw, __ATOMIC_RELAXED);
+
+		nt.raw = ot.raw; /* ot is reloaded by a failed CAS */
+		if (++nt.val.cnt == h.val.cnt)
+			nt.val.pos = h.val.pos;
+
+	} while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw, nt.raw,
+			0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0);
+}
+
+/**
+ * @internal Spin (rte_pause) until head.pos - tail.pos no longer exceeds
+ * ht->htd_max; the head value read last (acquire load) is left in *h.
+ */
+static __rte_always_inline void
+__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
+	union rte_ring_ht_poscnt *h)
+{
+	uint32_t max;
+
+	max = ht->htd_max;
+	h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
+
+	while (h->val.pos - ht->tail.val.pos > max) {
+		rte_pause();
+		h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
+	}
+}
+
+/**
+ * @internal This function updates the producer head for enqueue.
+ *
+ * The new head position is the old one plus the returned count and the head
+ * counter is incremented by one; both are written by one compare-exchange.
+ * NOTE(review): that CAS succeeds with RELEASE order - confirm the intent.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param num
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or num only.
+ */
+static __rte_always_inline uint32_t
+__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *free_entries)
+{
+	uint32_t n;
+	union rte_ring_ht_poscnt nh, oh;
+
+	const uint32_t capacity = r->capacity;
+
+	do {
+		/* Reset n to the initial burst count */
+		n = num;
+
+		/* read prod head (may spin on prod tail, acquire point) */
+		__rte_ring_rts_head_wait(&r->rts_prod, &oh);
+
+		/*
+		 * The subtraction is done between two unsigned 32-bit values
+		 * (the result is always modulo 32 bits even if we have
+		 * *old_head > cons_tail). So 'free_entries' is always between 0
+		 * and capacity (which is < size).
+		 */
+		*free_entries = capacity + r->cons.tail - oh.val.pos;
+
+		/* not enough room: take none (FIXED) or whatever is free */
+		if (unlikely(n > *free_entries))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : *free_entries;
+
+		if (n == 0)
+			break;
+
+		nh.val.pos = oh.val.pos + n;
+		nh.val.cnt = oh.val.cnt + 1;
+
+	} while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
+			&oh.raw, nh.raw,
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+	*old_head = oh.val.pos;
+	return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue.
+ *
+ * The new head position is the old one plus the returned count and the head
+ * counter is incremented by one; both are written by one strong
+ * compare-exchange, retried after a new head wait whenever it fails.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param num
+ *   The number of elements we will want to dequeue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or num only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *entries)
+{
+	uint32_t n;
+	union rte_ring_ht_poscnt nh, oh;
+
+	/* move cons.head atomically, retrying until the CAS below succeeds */
+	do {
+		/* Restore n, it may have been clamped on the previous pass */
+		n = num;
+
+		/* read cons head (may spin on cons tail, acquire point) */
+		__rte_ring_rts_head_wait(&r->rts_cons, &oh);
+
+		/* The subtraction is done between two unsigned 32-bit values
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = r->prod.tail - oh.val.pos;
+
+		/* Too few entries: take none (FIXED) or all that are there */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			break; /* nothing to dequeue, head is left as is */
+
+		nh.val.pos = oh.val.pos + n;
+		nh.val.cnt = oh.val.cnt + 1;
+
+	} while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
+			&oh.raw, nh.raw,
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+	*old_head = oh.val.pos;
+	return n;
+}
+
+#endif /* _RTE_RING_RTS_C11_MEM_H_ */
diff --git a/lib/librte_ring/rte_ring_rts_elem.h b/lib/librte_ring/rte_ring_rts_elem.h
index 71a331b23..23d8aeec7 100644
--- a/lib/librte_ring/rte_ring_rts_elem.h
+++ b/lib/librte_ring/rte_ring_rts_elem.h
@@ -24,7 +24,11 @@ 
 extern "C" {
 #endif
 
+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_rts_c11_mem.h>
+#else
 #include <rte_ring_rts_generic.h>
+#endif
 
 /**
  * @internal Enqueue several objects on the RTS ring.