ring: empty and count optimizations

Message ID 20200513170812.38233-1-mb@smartsharesystems.com (mailing list archive)
State Superseded, archived
Headers
Series ring: empty and count optimizations |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/iol-intel-Performance success Performance Testing PASS
ci/iol-nxp-Performance success Performance Testing PASS
ci/travis-robot success Travis build: passed
ci/iol-mellanox-Performance success Performance Testing PASS
ci/Intel-compilation success Compilation OK
ci/iol-testing fail Testing issues

Commit Message

Morten Brørup May 13, 2020, 5:08 p.m. UTC
Testing if the ring is empty is as simple as comparing the producer and
consumer pointers.
In theory, this optimization reduces the number of potential cache misses
from 3 to 2 by not having to read r->mask in rte_ring_count().

It is not possible to enqueue more elements than the capacity of a ring,
so the capacity comparison is a safeguard for observer threads only.
Instead of completely removing the comparison, I have reorganized it to
resemble the other trigrahps in the ring library and added a likely().

The modification of these two functions were discussed in the RFC here:
https://mails.dpdk.org/archives/dev/2020-April/165752.html

Also fixed some existing code not passing checkpatch.

Signed-off-by: Morten Brørup <mb@smartsharesystems.com>
---
 lib/librte_ring/rte_ring.h | 36 +++++++++++++++++++-----------------
 1 file changed, 19 insertions(+), 17 deletions(-)
  

Comments

Ananyev, Konstantin May 14, 2020, 12:23 p.m. UTC | #1
Hi Morten,

> Testing if the ring is empty is as simple as comparing the producer and
> consumer pointers.
> In theory, this optimization reduces the number of potential cache misses
> from 3 to 2 by not having to read r->mask in rte_ring_count().
> 
> It is not possible to enqueue more elements than the capacity of a ring,
> so the capacity comparison is a safeguard for observer threads only.
> Instead of completely removing the comparison, I have reorganized it to
> resemble the other trigrahps in the ring library and added a likely().
> 
> The modification of these two functions were discussed in the RFC here:
> https://mails.dpdk.org/archives/dev/2020-April/165752.html
> 
> Also fixed some existing code not passing checkpatch.
> 
> Signed-off-by: Morten Brørup <mb@smartsharesystems.com>
> ---
>  lib/librte_ring/rte_ring.h | 36 +++++++++++++++++++-----------------
>  1 file changed, 19 insertions(+), 17 deletions(-)
> 
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index 86faede81..36438d9cd 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -55,7 +55,7 @@ extern "C" {
>   *   - The memory size needed for the ring on success.
>   *   - -EINVAL if count is not a power of 2.
>   */
> -ssize_t rte_ring_get_memsize(unsigned count);
> +ssize_t rte_ring_get_memsize(unsigned int count);

All these changes to replace 'unsigned' with insigned int' -
seems to be irrelevant to the patch subject, so can you
put them to a separate patch in the series. 
 
>  /**
>   * Initialize a ring structure.
> @@ -109,8 +109,8 @@ ssize_t rte_ring_get_memsize(unsigned count);
>   * @return
>   *   0 on success, or a negative value on error.
>   */
> -int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> -	unsigned flags);
> +int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
> +	unsigned int flags);
> 
>  /**
>   * Create a new ring named *name* in memory.
> @@ -169,8 +169,8 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
>   *    - EEXIST - a memzone with the same name already exists
>   *    - ENOMEM - no appropriate memory area found in which to create memzone
>   */
> -struct rte_ring *rte_ring_create(const char *name, unsigned count,
> -				 int socket_id, unsigned flags);
> +struct rte_ring *rte_ring_create(const char *name, unsigned int count,
> +				 int socket_id, unsigned int flags);
> 
>  /**
>   * De-allocate all memory used by the ring.
> @@ -199,7 +199,7 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
>  	uint32_t idx = prod_head & (r)->mask; \
>  	obj_type *ring = (obj_type *)ring_start; \
>  	if (likely(idx + n < size)) { \
> -		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
> +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { \
>  			ring[idx] = obj_table[i]; \
>  			ring[idx+1] = obj_table[i+1]; \
>  			ring[idx+2] = obj_table[i+2]; \
> @@ -230,7 +230,7 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
>  	const uint32_t size = (r)->size; \
>  	obj_type *ring = (obj_type *)ring_start; \
>  	if (likely(idx + n < size)) { \
> -		for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
> +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {\
>  			obj_table[i] = ring[idx]; \
>  			obj_table[i+1] = ring[idx+1]; \
>  			obj_table[i+2] = ring[idx+2]; \
> @@ -683,13 +683,13 @@ rte_ring_reset(struct rte_ring *r);
>   * @return
>   *   The number of entries in the ring.
>   */
> -static inline unsigned
> +static inline unsigned int
>  rte_ring_count(const struct rte_ring *r)
>  {
>  	uint32_t prod_tail = r->prod.tail;
>  	uint32_t cons_tail = r->cons.tail;
>  	uint32_t count = (prod_tail - cons_tail) & r->mask;
> -	return (count > r->capacity) ? r->capacity : count;
> +	return likely(count <= r->capacity) ? count : r->capacity;

Honestly, I don't see there is any point of that change:
I think it wouldn't change anything in terms of functionality
or performance. 

>  }
> 
>  /**
> @@ -700,7 +700,7 @@ rte_ring_count(const struct rte_ring *r)
>   * @return
>   *   The number of free entries in the ring.
>   */
> -static inline unsigned
> +static inline unsigned int
>  rte_ring_free_count(const struct rte_ring *r)
>  {
>  	return r->capacity - rte_ring_count(r);
> @@ -733,7 +733,9 @@ rte_ring_full(const struct rte_ring *r)
>  static inline int
>  rte_ring_empty(const struct rte_ring *r)
>  {
> -	return rte_ring_count(r) == 0;
> +	uint32_t prod_tail = r->prod.tail;
> +	uint32_t cons_tail = r->cons.tail;
> +	return cons_tail == prod_tail;
>  }
> 
>  /**
> @@ -860,7 +862,7 @@ struct rte_ring *rte_ring_lookup(const char *name);
>   * @return
>   *   - n: Actual number of objects enqueued.
>   */
> -static __rte_always_inline unsigned
> +static __rte_always_inline unsigned int
>  rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)
>  {
> @@ -883,7 +885,7 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>   * @return
>   *   - n: Actual number of objects enqueued.
>   */
> -static __rte_always_inline unsigned
> +static __rte_always_inline unsigned int
>  rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)
>  {
> @@ -910,7 +912,7 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>   * @return
>   *   - n: Actual number of objects enqueued.
>   */
> -static __rte_always_inline unsigned
> +static __rte_always_inline unsigned int
>  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  		      unsigned int n, unsigned int *free_space)
>  {
> @@ -954,7 +956,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>   * @return
>   *   - n: Actual number of objects dequeued, 0 if ring is empty
>   */
> -static __rte_always_inline unsigned
> +static __rte_always_inline unsigned int
>  rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> @@ -979,7 +981,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
>   * @return
>   *   - n: Actual number of objects dequeued, 0 if ring is empty
>   */
> -static __rte_always_inline unsigned
> +static __rte_always_inline unsigned int
>  rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> @@ -1006,7 +1008,7 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
>   * @return
>   *   - Number of objects dequeued
>   */
> -static __rte_always_inline unsigned
> +static __rte_always_inline unsigned int
>  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> --
> 2.17.1
  
Morten Brørup May 14, 2020, 1:45 p.m. UTC | #2
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Ananyev,
> Konstantin
> Sent: Thursday, May 14, 2020 2:24 PM
> 
> 
> Hi Morten,
> 
> > Testing if the ring is empty is as simple as comparing the producer
> and
> > consumer pointers.
> > In theory, this optimization reduces the number of potential cache
> misses
> > from 3 to 2 by not having to read r->mask in rte_ring_count().
> >
> > It is not possible to enqueue more elements than the capacity of a
> ring,
> > so the capacity comparison is a safeguard for observer threads only.
> > Instead of completely removing the comparison, I have reorganized it
> to
> > resemble the other trigrahps in the ring library and added a
> likely().
> >
> > The modification of these two functions were discussed in the RFC
> here:
> > https://mails.dpdk.org/archives/dev/2020-April/165752.html
> >
> > Also fixed some existing code not passing checkpatch.
> >
> > Signed-off-by: Morten Brørup <mb@smartsharesystems.com>
> > ---
> >  lib/librte_ring/rte_ring.h | 36 +++++++++++++++++++-----------------
> >  1 file changed, 19 insertions(+), 17 deletions(-)
> >
> > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > index 86faede81..36438d9cd 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -55,7 +55,7 @@ extern "C" {
> >   *   - The memory size needed for the ring on success.
> >   *   - -EINVAL if count is not a power of 2.
> >   */
> > -ssize_t rte_ring_get_memsize(unsigned count);
> > +ssize_t rte_ring_get_memsize(unsigned int count);
> 
> All these changes to replace 'unsigned' with insigned int' -
> seems to be irrelevant to the patch subject, so can you
> put them to a separate patch in the series.

Will do.

Perhaps you could find an intern in Intel to fix all these quirks in ancient DPDK code that checkpatch complains about in one big patch, so we don't run into this over and over again, when submitting patches. :-)

> 
> >  /**
> >   * Initialize a ring structure.
> > @@ -109,8 +109,8 @@ ssize_t rte_ring_get_memsize(unsigned count);
> >   * @return
> >   *   0 on success, or a negative value on error.
> >   */
> > -int rte_ring_init(struct rte_ring *r, const char *name, unsigned
> count,
> > -	unsigned flags);
> > +int rte_ring_init(struct rte_ring *r, const char *name, unsigned int
> count,
> > +	unsigned int flags);
> >
> >  /**
> >   * Create a new ring named *name* in memory.
> > @@ -169,8 +169,8 @@ int rte_ring_init(struct rte_ring *r, const char
> *name, unsigned count,
> >   *    - EEXIST - a memzone with the same name already exists
> >   *    - ENOMEM - no appropriate memory area found in which to create
> memzone
> >   */
> > -struct rte_ring *rte_ring_create(const char *name, unsigned count,
> > -				 int socket_id, unsigned flags);
> > +struct rte_ring *rte_ring_create(const char *name, unsigned int
> count,
> > +				 int socket_id, unsigned int flags);
> >
> >  /**
> >   * De-allocate all memory used by the ring.
> > @@ -199,7 +199,7 @@ void rte_ring_dump(FILE *f, const struct rte_ring
> *r);
> >  	uint32_t idx = prod_head & (r)->mask; \
> >  	obj_type *ring = (obj_type *)ring_start; \
> >  	if (likely(idx + n < size)) { \
> > -		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
> > +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { \
> >  			ring[idx] = obj_table[i]; \
> >  			ring[idx+1] = obj_table[i+1]; \
> >  			ring[idx+2] = obj_table[i+2]; \
> > @@ -230,7 +230,7 @@ void rte_ring_dump(FILE *f, const struct rte_ring
> *r);
> >  	const uint32_t size = (r)->size; \
> >  	obj_type *ring = (obj_type *)ring_start; \
> >  	if (likely(idx + n < size)) { \
> > -		for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
> > +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {\
> >  			obj_table[i] = ring[idx]; \
> >  			obj_table[i+1] = ring[idx+1]; \
> >  			obj_table[i+2] = ring[idx+2]; \
> > @@ -683,13 +683,13 @@ rte_ring_reset(struct rte_ring *r);
> >   * @return
> >   *   The number of entries in the ring.
> >   */
> > -static inline unsigned
> > +static inline unsigned int
> >  rte_ring_count(const struct rte_ring *r)
> >  {
> >  	uint32_t prod_tail = r->prod.tail;
> >  	uint32_t cons_tail = r->cons.tail;
> >  	uint32_t count = (prod_tail - cons_tail) & r->mask;
> > -	return (count > r->capacity) ? r->capacity : count;
> > +	return likely(count <= r->capacity) ? count : r->capacity;
> 
> Honestly, I don't see there is any point of that change:
> I think it wouldn't change anything in terms of functionality
> or performance.

Chapter 3.4.1 "Branch Prediction Optimization" in the Intel 64 and IA-32 Architectures Optimization Reference Manual recommends this kind of optimization as Assembly/Compiler Coding Rule 3, which is why I rearranged the trigraph. Essentially, there is a limit to the number of BTB (Branch Target Buffer) entries, so they should be conserved if possible.

In addition to that, I have added the likely() because I consider it nearly impossible that the count will exceed the capacity.

However, it's not the first time I see this kind of response to a suggested branch optimization on the DPDK mailing list. Everyone seem to think that branch prediction is infinite and always works. It may seem as if infinite on trivial applications, but BTB entries may be a scarce resource on complex applications. I assume Intel's recommendations are not just for the fun of it.

Konstantin, please note that I'm letting out my frustration about the general misconception about branch prediction here. You are doing a great job, so I feel bad about responding like this to you.

> 
> >  }
> >
> >  /**
> > @@ -700,7 +700,7 @@ rte_ring_count(const struct rte_ring *r)
> >   * @return
> >   *   The number of free entries in the ring.
> >   */
> > -static inline unsigned
> > +static inline unsigned int
> >  rte_ring_free_count(const struct rte_ring *r)
> >  {
> >  	return r->capacity - rte_ring_count(r);
> > @@ -733,7 +733,9 @@ rte_ring_full(const struct rte_ring *r)
> >  static inline int
> >  rte_ring_empty(const struct rte_ring *r)
> >  {
> > -	return rte_ring_count(r) == 0;
> > +	uint32_t prod_tail = r->prod.tail;
> > +	uint32_t cons_tail = r->cons.tail;
> > +	return cons_tail == prod_tail;
> >  }
> >
> >  /**
> > @@ -860,7 +862,7 @@ struct rte_ring *rte_ring_lookup(const char
> *name);
> >   * @return
> >   *   - n: Actual number of objects enqueued.
> >   */
> > -static __rte_always_inline unsigned
> > +static __rte_always_inline unsigned int
> >  rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const
> *obj_table,
> >  			 unsigned int n, unsigned int *free_space)
> >  {
> > @@ -883,7 +885,7 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r,
> void * const *obj_table,
> >   * @return
> >   *   - n: Actual number of objects enqueued.
> >   */
> > -static __rte_always_inline unsigned
> > +static __rte_always_inline unsigned int
> >  rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const
> *obj_table,
> >  			 unsigned int n, unsigned int *free_space)
> >  {
> > @@ -910,7 +912,7 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r,
> void * const *obj_table,
> >   * @return
> >   *   - n: Actual number of objects enqueued.
> >   */
> > -static __rte_always_inline unsigned
> > +static __rte_always_inline unsigned int
> >  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> >  		      unsigned int n, unsigned int *free_space)
> >  {
> > @@ -954,7 +956,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void *
> const *obj_table,
> >   * @return
> >   *   - n: Actual number of objects dequeued, 0 if ring is empty
> >   */
> > -static __rte_always_inline unsigned
> > +static __rte_always_inline unsigned int
> >  rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
> >  		unsigned int n, unsigned int *available)
> >  {
> > @@ -979,7 +981,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r,
> void **obj_table,
> >   * @return
> >   *   - n: Actual number of objects dequeued, 0 if ring is empty
> >   */
> > -static __rte_always_inline unsigned
> > +static __rte_always_inline unsigned int
> >  rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
> >  		unsigned int n, unsigned int *available)
> >  {
> > @@ -1006,7 +1008,7 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r,
> void **obj_table,
> >   * @return
> >   *   - Number of objects dequeued
> >   */
> > -static __rte_always_inline unsigned
> > +static __rte_always_inline unsigned int
> >  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
> >  		unsigned int n, unsigned int *available)
> >  {
> > --
> > 2.17.1
  
Ananyev, Konstantin May 14, 2020, 4:46 p.m. UTC | #3
>
> > > -static inline unsigned
> > > +static inline unsigned int
> > >  rte_ring_count(const struct rte_ring *r)
> > >  {
> > >  	uint32_t prod_tail = r->prod.tail;
> > >  	uint32_t cons_tail = r->cons.tail;
> > >  	uint32_t count = (prod_tail - cons_tail) & r->mask;
> > > -	return (count > r->capacity) ? r->capacity : count;
> > > +	return likely(count <= r->capacity) ? count : r->capacity;
> >
> > Honestly, I don't see there is any point of that change:
> > I think it wouldn't change anything in terms of functionality
> > or performance.
> 
> Chapter 3.4.1 "Branch Prediction Optimization" in the Intel 64 and IA-32 Architectures Optimization Reference Manual recommends this
> kind of optimization as Assembly/Compiler Coding Rule 3, which is why I rearranged the trigraph. Essentially, there is a limit to the number
> of BTB (Branch Target Buffer) entries, so they should be conserved if possible.
> 
> In addition to that, I have added the likely() because I consider it nearly impossible that the count will exceed the capacity.
> 
> However, it's not the first time I see this kind of response to a suggested branch optimization on the DPDK mailing list. Everyone seem to
> think that branch prediction is infinite and always works. It may seem as if infinite on trivial applications, but BTB entries may be a scarce
> resource on complex applications. I assume Intel's recommendations are not just for the fun of it.

I think it is better to leave such level of micro-optimizations to the compiler.
BTW, in that particular case, compiler most likely will generate a code
without any branches at all (at least for IA).
Let say on my box with gcc 7.3:

$ cat trc1.c
#include <stdint.h>
#include <rte_config.h>
#include <rte_ring.h>

uint32_t
fffx1(const struct rte_ring *r)
{
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
        uint32_t count = (prod_tail - cons_tail) & r->mask;
        return (count > r->capacity) ? r->capacity : count;
}

uint32_t
fffx2(const struct rte_ring *r)
{
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
        uint32_t count = (prod_tail - cons_tail) & r->mask;
        return likely(count <= r->capacity) ? count : r->capacity;
}
   
$ gcc -m64 -O3 -march=native -I${RTE_SDK}/x86_64-native-linuxapp-gcc/include -c trc1.c

$ objdump -d trc1.o

0000000000000000 <fffx1>:
   0:   8b 87 84 00 00 00       mov    0x84(%rdi),%eax
   6:   8b 97 04 01 00 00       mov    0x104(%rdi),%edx
   c:   29 d0                   sub    %edx,%eax
   e:   8b 57 38                mov    0x38(%rdi),%edx
  11:   23 47 34                and    0x34(%rdi),%eax
  14:   39 d0                   cmp    %edx,%eax
  16:   0f 47 c2                cmova  %edx,%eax
  19:   c3                      retq
  1a:   66 0f 1f 44 00 00       nopw   0x0(%rax,%rax,1)

0000000000000020 <fffx2>:
  20:   8b 87 84 00 00 00       mov    0x84(%rdi),%eax
  26:   8b 97 04 01 00 00       mov    0x104(%rdi),%edx
  2c:   29 d0                   sub    %edx,%eax
  2e:   8b 57 38                mov    0x38(%rdi),%edx
  31:   23 47 34                and    0x34(%rdi),%eax
  34:   39 d0                   cmp    %edx,%eax
  36:   0f 47 c2                cmova  %edx,%eax
  39:   c3                      retq

As you can see, there is no difference.

> 
> Konstantin, please note that I'm letting out my frustration about the general misconception about branch prediction here. You are doing a
> great job, so I feel bad about responding like this to you.

No worries, in fact I am glad to know that DPDK contributors
read IA optimization manual that thoughtfully 😊

Konstantin
  
Morten Brørup May 14, 2020, 6 p.m. UTC | #4
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Ananyev,
> Konstantin
> Sent: Thursday, May 14, 2020 6:47 PM
> >
> > > > -static inline unsigned
> > > > +static inline unsigned int
> > > >  rte_ring_count(const struct rte_ring *r)
> > > >  {
> > > >  	uint32_t prod_tail = r->prod.tail;
> > > >  	uint32_t cons_tail = r->cons.tail;
> > > >  	uint32_t count = (prod_tail - cons_tail) & r->mask;
> > > > -	return (count > r->capacity) ? r->capacity : count;
> > > > +	return likely(count <= r->capacity) ? count : r->capacity;
> > >
> > > Honestly, I don't see there is any point of that change:
> > > I think it wouldn't change anything in terms of functionality
> > > or performance.
> >
> > Chapter 3.4.1 "Branch Prediction Optimization" in the Intel 64 and
> IA-32 Architectures Optimization Reference Manual recommends this
> > kind of optimization as Assembly/Compiler Coding Rule 3, which is why
> I rearranged the trigraph. Essentially, there is a limit to the number
> > of BTB (Branch Target Buffer) entries, so they should be conserved if
> possible.
> >
> > In addition to that, I have added the likely() because I consider it
> nearly impossible that the count will exceed the capacity.
> >
> > However, it's not the first time I see this kind of response to a
> suggested branch optimization on the DPDK mailing list. Everyone seem
> to
> > think that branch prediction is infinite and always works. It may
> seem as if infinite on trivial applications, but BTB entries may be a
> scarce
> > resource on complex applications. I assume Intel's recommendations
> are not just for the fun of it.
> 
> I think it is better to leave such level of micro-optimizations to the
> compiler.
> BTW, in that particular case, compiler most likely will generate a code
> without any branches at all (at least for IA).
> Let say on my box with gcc 7.3:
> 
> $ cat trc1.c
> #include <stdint.h>
> #include <rte_config.h>
> #include <rte_ring.h>
> 
> uint32_t
> fffx1(const struct rte_ring *r)
> {
>         uint32_t prod_tail = r->prod.tail;
>         uint32_t cons_tail = r->cons.tail;
>         uint32_t count = (prod_tail - cons_tail) & r->mask;
>         return (count > r->capacity) ? r->capacity : count;
> }
> 
> uint32_t
> fffx2(const struct rte_ring *r)
> {
>         uint32_t prod_tail = r->prod.tail;
>         uint32_t cons_tail = r->cons.tail;
>         uint32_t count = (prod_tail - cons_tail) & r->mask;
>         return likely(count <= r->capacity) ? count : r->capacity;
> }
> 
> $ gcc -m64 -O3 -march=native -I${RTE_SDK}/x86_64-native-linuxapp-
> gcc/include -c trc1.c
> 
> $ objdump -d trc1.o
> 
> 0000000000000000 <fffx1>:
>    0:   8b 87 84 00 00 00       mov    0x84(%rdi),%eax
>    6:   8b 97 04 01 00 00       mov    0x104(%rdi),%edx
>    c:   29 d0                   sub    %edx,%eax
>    e:   8b 57 38                mov    0x38(%rdi),%edx
>   11:   23 47 34                and    0x34(%rdi),%eax
>   14:   39 d0                   cmp    %edx,%eax
>   16:   0f 47 c2                cmova  %edx,%eax
>   19:   c3                      retq
>   1a:   66 0f 1f 44 00 00       nopw   0x0(%rax,%rax,1)
> 
> 0000000000000020 <fffx2>:
>   20:   8b 87 84 00 00 00       mov    0x84(%rdi),%eax
>   26:   8b 97 04 01 00 00       mov    0x104(%rdi),%edx
>   2c:   29 d0                   sub    %edx,%eax
>   2e:   8b 57 38                mov    0x38(%rdi),%edx
>   31:   23 47 34                and    0x34(%rdi),%eax
>   34:   39 d0                   cmp    %edx,%eax
>   36:   0f 47 c2                cmova  %edx,%eax
>   39:   c3                      retq
> 
> As you can see, there is no difference.
> 

Thank you for the detailed feedback.

Reality trumps theory, so I will leave the count function as is. :-)


> >
> > Konstantin, please note that I'm letting out my frustration about the
> general misconception about branch prediction here. You are doing a
> > great job, so I feel bad about responding like this to you.
> 
> No worries, in fact I am glad to know that DPDK contributors
> read IA optimization manual that thoughtfully 😊
> 
> Konstantin
  

Patch

diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 86faede81..36438d9cd 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -55,7 +55,7 @@  extern "C" {
  *   - The memory size needed for the ring on success.
  *   - -EINVAL if count is not a power of 2.
  */
-ssize_t rte_ring_get_memsize(unsigned count);
+ssize_t rte_ring_get_memsize(unsigned int count);
 
 /**
  * Initialize a ring structure.
@@ -109,8 +109,8 @@  ssize_t rte_ring_get_memsize(unsigned count);
  * @return
  *   0 on success, or a negative value on error.
  */
-int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
-	unsigned flags);
+int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
+	unsigned int flags);
 
 /**
  * Create a new ring named *name* in memory.
@@ -169,8 +169,8 @@  int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
  *    - EEXIST - a memzone with the same name already exists
  *    - ENOMEM - no appropriate memory area found in which to create memzone
  */
-struct rte_ring *rte_ring_create(const char *name, unsigned count,
-				 int socket_id, unsigned flags);
+struct rte_ring *rte_ring_create(const char *name, unsigned int count,
+				 int socket_id, unsigned int flags);
 
 /**
  * De-allocate all memory used by the ring.
@@ -199,7 +199,7 @@  void rte_ring_dump(FILE *f, const struct rte_ring *r);
 	uint32_t idx = prod_head & (r)->mask; \
 	obj_type *ring = (obj_type *)ring_start; \
 	if (likely(idx + n < size)) { \
-		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
+		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { \
 			ring[idx] = obj_table[i]; \
 			ring[idx+1] = obj_table[i+1]; \
 			ring[idx+2] = obj_table[i+2]; \
@@ -230,7 +230,7 @@  void rte_ring_dump(FILE *f, const struct rte_ring *r);
 	const uint32_t size = (r)->size; \
 	obj_type *ring = (obj_type *)ring_start; \
 	if (likely(idx + n < size)) { \
-		for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
+		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {\
 			obj_table[i] = ring[idx]; \
 			obj_table[i+1] = ring[idx+1]; \
 			obj_table[i+2] = ring[idx+2]; \
@@ -683,13 +683,13 @@  rte_ring_reset(struct rte_ring *r);
  * @return
  *   The number of entries in the ring.
  */
-static inline unsigned
+static inline unsigned int
 rte_ring_count(const struct rte_ring *r)
 {
 	uint32_t prod_tail = r->prod.tail;
 	uint32_t cons_tail = r->cons.tail;
 	uint32_t count = (prod_tail - cons_tail) & r->mask;
-	return (count > r->capacity) ? r->capacity : count;
+	return likely(count <= r->capacity) ? count : r->capacity;
 }
 
 /**
@@ -700,7 +700,7 @@  rte_ring_count(const struct rte_ring *r)
  * @return
  *   The number of free entries in the ring.
  */
-static inline unsigned
+static inline unsigned int
 rte_ring_free_count(const struct rte_ring *r)
 {
 	return r->capacity - rte_ring_count(r);
@@ -733,7 +733,9 @@  rte_ring_full(const struct rte_ring *r)
 static inline int
 rte_ring_empty(const struct rte_ring *r)
 {
-	return rte_ring_count(r) == 0;
+	uint32_t prod_tail = r->prod.tail;
+	uint32_t cons_tail = r->cons.tail;
+	return cons_tail == prod_tail;
 }
 
 /**
@@ -860,7 +862,7 @@  struct rte_ring *rte_ring_lookup(const char *name);
  * @return
  *   - n: Actual number of objects enqueued.
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
@@ -883,7 +885,7 @@  rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  * @return
  *   - n: Actual number of objects enqueued.
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
@@ -910,7 +912,7 @@  rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  * @return
  *   - n: Actual number of objects enqueued.
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
@@ -954,7 +956,7 @@  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  * @return
  *   - n: Actual number of objects dequeued, 0 if ring is empty
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
@@ -979,7 +981,7 @@  rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
  * @return
  *   - n: Actual number of objects dequeued, 0 if ring is empty
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
@@ -1006,7 +1008,7 @@  rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
  * @return
  *   - Number of objects dequeued
  */
-static __rte_always_inline unsigned
+static __rte_always_inline unsigned int
 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {