[v2,1/2] lib/distributor: fix deadlock issue for aarch64
diff mbox series

Message ID 20191012024352.23545-2-ruifeng.wang@arm.com
State Superseded
Headers show
Series
  • fix distributor unit test
Related show

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/iol-compilation success Compile Testing PASS
ci/iol-intel-Performance success Performance Testing PASS
ci/Intel-compilation success Compilation OK
ci/iol-mellanox-Performance success Performance Testing PASS

Commit Message

Ruifeng Wang (Arm Technology China) Oct. 12, 2019, 2:43 a.m. UTC
Distributor and worker threads rely on data structs in cache line
for synchronization. The shared data structs were not protected.
This caused deadlock issue on weaker memory ordering platforms as
aarch64.
Fix this issue by adding memory barriers to ensure synchronization
among cores.

Bugzilla ID: 342
Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
Cc: stable@dpdk.org

Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
---
 lib/librte_distributor/meson.build           |  5 ++
 lib/librte_distributor/rte_distributor.c     | 39 ++++++++++------
 lib/librte_distributor/rte_distributor_v20.c | 49 +++++++++++++-------
 3 files changed, 63 insertions(+), 30 deletions(-)

Comments

Honnappa Nagarahalli Oct. 13, 2019, 2:31 a.m. UTC | #1
Hi Ruifeng,
	Typically, we have followed the convention of adding comments whenever C11 atomic APIs are used. Can you please add comments indicating why acquire or release semantics are used?

> -----Original Message-----
> From: Ruifeng Wang <ruifeng.wang@arm.com>
> Sent: Friday, October 11, 2019 9:44 PM
> To: david.hunt@intel.com
> Cc: dev@dpdk.org; hkalra@marvell.com; Gavin Hu (Arm Technology China)
> <Gavin.Hu@arm.com>; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; nd <nd@arm.com>; Ruifeng Wang (Arm
> Technology China) <Ruifeng.Wang@arm.com>; stable@dpdk.org
> Subject: [PATCH v2 1/2] lib/distributor: fix deadlock issue for aarch64
> 
> Distributor and worker threads rely on data structs in cache line for
> synchronization. The shared data structs were not protected.
> This caused deadlock issue on weaker memory ordering platforms as aarch64.
> Fix this issue by adding memory barriers to ensure synchronization among
> cores.
> 
> Bugzilla ID: 342
> Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
> Cc: stable@dpdk.org
> 
> Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> ---
>  lib/librte_distributor/meson.build           |  5 ++
>  lib/librte_distributor/rte_distributor.c     | 39 ++++++++++------
>  lib/librte_distributor/rte_distributor_v20.c | 49 +++++++++++++-------
>  3 files changed, 63 insertions(+), 30 deletions(-)
> 
> diff --git a/lib/librte_distributor/meson.build
> b/lib/librte_distributor/meson.build
> index dba7e3b2a..26577dbc1 100644
> --- a/lib/librte_distributor/meson.build
> +++ b/lib/librte_distributor/meson.build
> @@ -9,3 +9,8 @@ else
>  endif
>  headers = files('rte_distributor.h')
>  deps += ['mbuf']
> +
> +# for clang 32-bit compiles we need libatomic for 64-bit atomic ops if
> +cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
> +	ext_deps += cc.find_library('atomic')
> +endif
> diff --git a/lib/librte_distributor/rte_distributor.c
> b/lib/librte_distributor/rte_distributor.c
> index 21eb1fb0a..b653146d0 100644
> --- a/lib/librte_distributor/rte_distributor.c
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -50,7 +50,8 @@ rte_distributor_request_pkt_v1705(struct
> rte_distributor *d,
> 
>  	retptr64 = &(buf->retptr64[0]);
>  	/* Spin while handshake bits are set (scheduler clears it) */
> -	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
> +	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
> +			& RTE_DISTRIB_GET_BUF)) {
>  		rte_pause();
>  		uint64_t t = rte_rdtsc()+100;
> 
> @@ -76,7 +77,8 @@ rte_distributor_request_pkt_v1705(struct
> rte_distributor *d,
>  	 * Finally, set the GET_BUF  to signal to distributor that cache
>  	 * line is ready for processing
>  	 */
> -	*retptr64 |= RTE_DISTRIB_GET_BUF;
> +	__atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
> +			__ATOMIC_RELEASE);
>  }
>  BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
> MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct rte_distributor
> *d, @@ -99,7 +101,8 @@ rte_distributor_poll_pkt_v1705(struct
> rte_distributor *d,
>  	}
> 
>  	/* If bit is set, return */
> -	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
> +	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
> +		& RTE_DISTRIB_GET_BUF)
>  		return -1;
> 
>  	/* since bufptr64 is signed, this should be an arithmetic shift */ @@ -
> 115,7 +118,8 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
>  	 * mbuf pointers, so toggle the bit so scheduler can start working
>  	 * on the next cacheline while we're working.
>  	 */
> -	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
> +	__atomic_store_n(&(buf->bufptr64[0]),
> +		buf->bufptr64[0] | RTE_DISTRIB_GET_BUF,
> __ATOMIC_RELEASE);
> 
>  	return count;
>  }
> @@ -174,6 +178,7 @@ rte_distributor_return_pkt_v1705(struct
> rte_distributor *d,
>  			return -EINVAL;
>  	}
> 
> +	__atomic_thread_fence(__ATOMIC_ACQUIRE);
>  	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
>  		/* Switch off the return bit first */
>  		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF; @@ -183,7
> +188,8 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
>  			RTE_DISTRIB_FLAG_BITS) |
> RTE_DISTRIB_RETURN_BUF;
> 
>  	/* set the GET_BUF but even if we got no returns */
> -	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
> +	__atomic_store_n(&(buf->retptr64[0]),
> +		buf->retptr64[0] | RTE_DISTRIB_GET_BUF,
> __ATOMIC_RELEASE);
> 
>  	return 0;
>  }
> @@ -273,7 +279,8 @@ handle_returns(struct rte_distributor *d, unsigned
> int wkr)
>  	unsigned int count = 0;
>  	unsigned int i;
> 
> -	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
> +	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
> +		& RTE_DISTRIB_GET_BUF) {
>  		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
>  			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
>  				oldbuf = ((uintptr_t)(buf->retptr64[i] >> @@
> -287,7 +294,7 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
>  		d->returns.start = ret_start;
>  		d->returns.count = ret_count;
>  		/* Clear for the worker to populate with more returns */
> -		buf->retptr64[0] = 0;
> +		__atomic_store_n(&(buf->retptr64[0]), 0,
> __ATOMIC_RELEASE);
>  	}
>  	return count;
>  }
> @@ -307,7 +314,8 @@ release(struct rte_distributor *d, unsigned int wkr)
>  	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
>  	unsigned int i;
> 
> -	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
> +	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
> __ATOMIC_ACQUIRE)
> +		& RTE_DISTRIB_GET_BUF))
>  		rte_pause();
> 
>  	handle_returns(d, wkr);
> @@ -328,7 +336,8 @@ release(struct rte_distributor *d, unsigned int wkr)
>  	d->backlog[wkr].count = 0;
> 
>  	/* Clear the GET bit */
> -	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
> +	__atomic_store_n(&(buf->bufptr64[0]),
> +		buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF,
> __ATOMIC_RELEASE);
>  	return  buf->count;
> 
>  }
> @@ -355,7 +364,8 @@ rte_distributor_process_v1705(struct rte_distributor
> *d,
>  	if (unlikely(num_mbufs == 0)) {
>  		/* Flush out all non-full cache-lines to workers. */
>  		for (wid = 0 ; wid < d->num_workers; wid++) {
> -			if (d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)
> {
> +			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
> +				__ATOMIC_ACQUIRE) &
> RTE_DISTRIB_GET_BUF) {
>  				release(d, wid);
>  				handle_returns(d, wid);
>  			}
> @@ -367,7 +377,8 @@ rte_distributor_process_v1705(struct rte_distributor
> *d,
>  		uint16_t matches[RTE_DIST_BURST_SIZE];
>  		unsigned int pkts;
> 
> -		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
> +		if (__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
> +			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)
>  			d->bufs[wkr].count = 0;
> 
>  		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE) @@ -
> 465,7 +476,8 @@ rte_distributor_process_v1705(struct rte_distributor *d,
> 
>  	/* Flush out all non-full cache-lines to workers. */
>  	for (wid = 0 ; wid < d->num_workers; wid++)
> -		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
> +		if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
> +			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF))
>  			release(d, wid);
> 
>  	return num_mbufs;
> @@ -574,7 +586,8 @@ rte_distributor_clear_returns_v1705(struct
> rte_distributor *d)
> 
>  	/* throw away returns, so workers can exit */
>  	for (wkr = 0; wkr < d->num_workers; wkr++)
> -		d->bufs[wkr].retptr64[0] = 0;
> +		__atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
> +				__ATOMIC_RELEASE);
>  }
>  BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
> MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct
> rte_distributor *d), diff --git a/lib/librte_distributor/rte_distributor_v20.c
> b/lib/librte_distributor/rte_distributor_v20.c
> index cdc0969a8..41411e3c1 100644
> --- a/lib/librte_distributor/rte_distributor_v20.c
> +++ b/lib/librte_distributor/rte_distributor_v20.c
> @@ -34,9 +34,10 @@ rte_distributor_request_pkt_v20(struct
> rte_distributor_v20 *d,
>  	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
>  	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
>  			| RTE_DISTRIB_GET_BUF;
> -	while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
> +	while (unlikely(__atomic_load_n(&(buf->bufptr64),
> __ATOMIC_ACQUIRE)
> +		& RTE_DISTRIB_FLAGS_MASK))
>  		rte_pause();
> -	buf->bufptr64 = req;
> +	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
>  }
>  VERSION_SYMBOL(rte_distributor_request_pkt, _v20, 2.0);
> 
> @@ -45,7 +46,8 @@ rte_distributor_poll_pkt_v20(struct rte_distributor_v20
> *d,
>  		unsigned worker_id)
>  {
>  	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
> -	if (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
> +	if (__atomic_load_n(&(buf->bufptr64), __ATOMIC_ACQUIRE)
> +		& RTE_DISTRIB_GET_BUF)
>  		return NULL;
> 
>  	/* since bufptr64 is signed, this should be an arithmetic shift */ @@ -
> 73,7 +75,7 @@ rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d,
>  	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
>  	uint64_t req = (((int64_t)(uintptr_t)oldpkt) <<
> RTE_DISTRIB_FLAG_BITS)
>  			| RTE_DISTRIB_RETURN_BUF;
> -	buf->bufptr64 = req;
> +	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
>  	return 0;
>  }
>  VERSION_SYMBOL(rte_distributor_return_pkt, _v20, 2.0); @@ -117,7 +119,7
> @@ handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int
> wkr)  {
>  	d->in_flight_tags[wkr] = 0;
>  	d->in_flight_bitmask &= ~(1UL << wkr);
> -	d->bufs[wkr].bufptr64 = 0;
> +	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
>  	if (unlikely(d->backlog[wkr].count != 0)) {
>  		/* On return of a packet, we need to move the
>  		 * queued packets for this core elsewhere.
> @@ -165,18 +167,23 @@ process_returns(struct rte_distributor_v20 *d)
>  		const int64_t data = d->bufs[wkr].bufptr64;
>  		uintptr_t oldbuf = 0;
> 
> -		if (data & RTE_DISTRIB_GET_BUF) {
> +		if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> +			& RTE_DISTRIB_GET_BUF) {
>  			flushed++;
>  			if (d->backlog[wkr].count)
> -				d->bufs[wkr].bufptr64 =
> -						backlog_pop(&d-
> >backlog[wkr]);
> +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> +					backlog_pop(&d->backlog[wkr]),
> +					__ATOMIC_RELEASE);
>  			else {
> -				d->bufs[wkr].bufptr64 =
> RTE_DISTRIB_GET_BUF;
> +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> +					RTE_DISTRIB_GET_BUF,
> +					__ATOMIC_RELEASE);
>  				d->in_flight_tags[wkr] = 0;
>  				d->in_flight_bitmask &= ~(1UL << wkr);
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> -		} else if (data & RTE_DISTRIB_RETURN_BUF) {
> +		} else if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> +			& RTE_DISTRIB_RETURN_BUF) {
>  			handle_worker_shutdown(d, wkr);
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
>  		}
> @@ -251,21 +258,26 @@ rte_distributor_process_v20(struct
> rte_distributor_v20 *d,
>  			}
>  		}
> 
> -		if ((data & RTE_DISTRIB_GET_BUF) &&
> +		if ((__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> +			& RTE_DISTRIB_GET_BUF) &&
>  				(d->backlog[wkr].count || next_mb)) {
> 
>  			if (d->backlog[wkr].count)
> -				d->bufs[wkr].bufptr64 =
> -						backlog_pop(&d-
> >backlog[wkr]);
> +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> +						backlog_pop(&d-
> >backlog[wkr]),
> +						__ATOMIC_RELEASE);
> 
>  			else {
> -				d->bufs[wkr].bufptr64 = next_value;
> +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> +						next_value,
> +						__ATOMIC_RELEASE);
>  				d->in_flight_tags[wkr] = new_tag;
>  				d->in_flight_bitmask |= (1UL << wkr);
>  				next_mb = NULL;
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> -		} else if (data & RTE_DISTRIB_RETURN_BUF) {
> +		} else if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> +			& RTE_DISTRIB_RETURN_BUF) {
>  			handle_worker_shutdown(d, wkr);
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
>  		}
> @@ -280,13 +292,16 @@ rte_distributor_process_v20(struct
> rte_distributor_v20 *d,
>  	 * if they are ready */
>  	for (wkr = 0; wkr < d->num_workers; wkr++)
>  		if (d->backlog[wkr].count &&
> -				(d->bufs[wkr].bufptr64 &
> RTE_DISTRIB_GET_BUF)) {
> +				(__atomic_load_n(&(d->bufs[wkr].bufptr64),
> +				__ATOMIC_ACQUIRE) &
> RTE_DISTRIB_GET_BUF)) {
> 
>  			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
>  					RTE_DISTRIB_FLAG_BITS;
>  			store_return(oldbuf, d, &ret_start, &ret_count);
> 
> -			d->bufs[wkr].bufptr64 = backlog_pop(&d-
> >backlog[wkr]);
> +			__atomic_store_n(&(d->bufs[wkr].bufptr64),
> +				backlog_pop(&d->backlog[wkr]),
> +				__ATOMIC_RELEASE);
>  		}
> 
>  	d->returns.start = ret_start;
> --
> 2.17.1
Ruifeng Wang (Arm Technology China) Oct. 14, 2019, 10 a.m. UTC | #2
> -----Original Message-----
> From: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
> Sent: Sunday, October 13, 2019 10:32
> To: Ruifeng Wang (Arm Technology China) <Ruifeng.Wang@arm.com>;
> david.hunt@intel.com
> Cc: dev@dpdk.org; hkalra@marvell.com; Gavin Hu (Arm Technology China)
> <Gavin.Hu@arm.com>; nd <nd@arm.com>; Ruifeng Wang (Arm Technology
> China) <Ruifeng.Wang@arm.com>; stable@dpdk.org; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; nd <nd@arm.com>
> Subject: RE: [PATCH v2 1/2] lib/distributor: fix deadlock issue for aarch64
> 
> Hi Ruifeng,
> 	Typically, we have followed the convention of adding comments
> whenever C11 atomic APIs are used. Can you please add comments
> indicating why acquire or release semantics are used?
> 
OK. Comments will be added to explain acquire/release semantics used.

> > -----Original Message-----
> > From: Ruifeng Wang <ruifeng.wang@arm.com>
> > Sent: Friday, October 11, 2019 9:44 PM
> > To: david.hunt@intel.com
> > Cc: dev@dpdk.org; hkalra@marvell.com; Gavin Hu (Arm Technology China)
> > <Gavin.Hu@arm.com>; Honnappa Nagarahalli
> > <Honnappa.Nagarahalli@arm.com>; nd <nd@arm.com>; Ruifeng Wang
> (Arm
> > Technology China) <Ruifeng.Wang@arm.com>; stable@dpdk.org
> > Subject: [PATCH v2 1/2] lib/distributor: fix deadlock issue for
> > aarch64
> >
> > Distributor and worker threads rely on data structs in cache line for
> > synchronization. The shared data structs were not protected.
> > This caused deadlock issue on weaker memory ordering platforms as
> aarch64.
> > Fix this issue by adding memory barriers to ensure synchronization
> > among cores.
> >
> > Bugzilla ID: 342
> > Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
> > Cc: stable@dpdk.org
> >
> > Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > ---
> >  lib/librte_distributor/meson.build           |  5 ++
> >  lib/librte_distributor/rte_distributor.c     | 39 ++++++++++------
> >  lib/librte_distributor/rte_distributor_v20.c | 49
> > +++++++++++++-------
> >  3 files changed, 63 insertions(+), 30 deletions(-)
> >
> > diff --git a/lib/librte_distributor/meson.build
> > b/lib/librte_distributor/meson.build
> > index dba7e3b2a..26577dbc1 100644
> > --- a/lib/librte_distributor/meson.build
> > +++ b/lib/librte_distributor/meson.build
> > @@ -9,3 +9,8 @@ else
> >  endif
> >  headers = files('rte_distributor.h')
> >  deps += ['mbuf']
> > +
> > +# for clang 32-bit compiles we need libatomic for 64-bit atomic ops
> > +if
> > +cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
> > +	ext_deps += cc.find_library('atomic') endif
> > diff --git a/lib/librte_distributor/rte_distributor.c
> > b/lib/librte_distributor/rte_distributor.c
> > index 21eb1fb0a..b653146d0 100644
> > --- a/lib/librte_distributor/rte_distributor.c
> > +++ b/lib/librte_distributor/rte_distributor.c
> > @@ -50,7 +50,8 @@ rte_distributor_request_pkt_v1705(struct
> > rte_distributor *d,
> >
> >  	retptr64 = &(buf->retptr64[0]);
> >  	/* Spin while handshake bits are set (scheduler clears it) */
> > -	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
> > +	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_GET_BUF)) {
> >  		rte_pause();
> >  		uint64_t t = rte_rdtsc()+100;
> >
> > @@ -76,7 +77,8 @@ rte_distributor_request_pkt_v1705(struct
> > rte_distributor *d,
> >  	 * Finally, set the GET_BUF  to signal to distributor that cache
> >  	 * line is ready for processing
> >  	 */
> > -	*retptr64 |= RTE_DISTRIB_GET_BUF;
> > +	__atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
> > +			__ATOMIC_RELEASE);
> >  }
> >  BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
> > MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct
> > rte_distributor *d, @@ -99,7 +101,8 @@
> > rte_distributor_poll_pkt_v1705(struct
> > rte_distributor *d,
> >  	}
> >
> >  	/* If bit is set, return */
> > -	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
> > +	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
> > +		& RTE_DISTRIB_GET_BUF)
> >  		return -1;
> >
> >  	/* since bufptr64 is signed, this should be an arithmetic shift */
> > @@ -
> > 115,7 +118,8 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
> >  	 * mbuf pointers, so toggle the bit so scheduler can start working
> >  	 * on the next cacheline while we're working.
> >  	 */
> > -	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
> > +	__atomic_store_n(&(buf->bufptr64[0]),
> > +		buf->bufptr64[0] | RTE_DISTRIB_GET_BUF,
> > __ATOMIC_RELEASE);
> >
> >  	return count;
> >  }
> > @@ -174,6 +178,7 @@ rte_distributor_return_pkt_v1705(struct
> > rte_distributor *d,
> >  			return -EINVAL;
> >  	}
> >
> > +	__atomic_thread_fence(__ATOMIC_ACQUIRE);
> >  	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
> >  		/* Switch off the return bit first */
> >  		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF; @@ -183,7
> > +188,8 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
> >  			RTE_DISTRIB_FLAG_BITS) |
> > RTE_DISTRIB_RETURN_BUF;
> >
> >  	/* set the GET_BUF but even if we got no returns */
> > -	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
> > +	__atomic_store_n(&(buf->retptr64[0]),
> > +		buf->retptr64[0] | RTE_DISTRIB_GET_BUF,
> > __ATOMIC_RELEASE);
> >
> >  	return 0;
> >  }
> > @@ -273,7 +279,8 @@ handle_returns(struct rte_distributor *d, unsigned
> > int wkr)
> >  	unsigned int count = 0;
> >  	unsigned int i;
> >
> > -	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
> > +	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
> > +		& RTE_DISTRIB_GET_BUF) {
> >  		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
> >  			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
> >  				oldbuf = ((uintptr_t)(buf->retptr64[i] >> @@
> > -287,7 +294,7 @@ handle_returns(struct rte_distributor *d, unsigned int
> wkr)
> >  		d->returns.start = ret_start;
> >  		d->returns.count = ret_count;
> >  		/* Clear for the worker to populate with more returns */
> > -		buf->retptr64[0] = 0;
> > +		__atomic_store_n(&(buf->retptr64[0]), 0,
> > __ATOMIC_RELEASE);
> >  	}
> >  	return count;
> >  }
> > @@ -307,7 +314,8 @@ release(struct rte_distributor *d, unsigned int wkr)
> >  	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
> >  	unsigned int i;
> >
> > -	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
> > +	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
> > __ATOMIC_ACQUIRE)
> > +		& RTE_DISTRIB_GET_BUF))
> >  		rte_pause();
> >
> >  	handle_returns(d, wkr);
> > @@ -328,7 +336,8 @@ release(struct rte_distributor *d, unsigned int wkr)
> >  	d->backlog[wkr].count = 0;
> >
> >  	/* Clear the GET bit */
> > -	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
> > +	__atomic_store_n(&(buf->bufptr64[0]),
> > +		buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF,
> > __ATOMIC_RELEASE);
> >  	return  buf->count;
> >
> >  }
> > @@ -355,7 +364,8 @@ rte_distributor_process_v1705(struct
> > rte_distributor *d,
> >  	if (unlikely(num_mbufs == 0)) {
> >  		/* Flush out all non-full cache-lines to workers. */
> >  		for (wid = 0 ; wid < d->num_workers; wid++) {
> > -			if (d->bufs[wid].bufptr64[0] &
> RTE_DISTRIB_GET_BUF)
> > {
> > +			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
> > +				__ATOMIC_ACQUIRE) &
> > RTE_DISTRIB_GET_BUF) {
> >  				release(d, wid);
> >  				handle_returns(d, wid);
> >  			}
> > @@ -367,7 +377,8 @@ rte_distributor_process_v1705(struct
> > rte_distributor *d,
> >  		uint16_t matches[RTE_DIST_BURST_SIZE];
> >  		unsigned int pkts;
> >
> > -		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
> > +		if (__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
> > +			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)
> >  			d->bufs[wkr].count = 0;
> >
> >  		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE) @@ -
> > 465,7 +476,8 @@ rte_distributor_process_v1705(struct rte_distributor
> > *d,
> >
> >  	/* Flush out all non-full cache-lines to workers. */
> >  	for (wid = 0 ; wid < d->num_workers; wid++)
> > -		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
> > +		if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
> > +			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF))
> >  			release(d, wid);
> >
> >  	return num_mbufs;
> > @@ -574,7 +586,8 @@ rte_distributor_clear_returns_v1705(struct
> > rte_distributor *d)
> >
> >  	/* throw away returns, so workers can exit */
> >  	for (wkr = 0; wkr < d->num_workers; wkr++)
> > -		d->bufs[wkr].retptr64[0] = 0;
> > +		__atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
> > +				__ATOMIC_RELEASE);
> >  }
> >  BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
> > MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct
> > rte_distributor *d), diff --git
> > a/lib/librte_distributor/rte_distributor_v20.c
> > b/lib/librte_distributor/rte_distributor_v20.c
> > index cdc0969a8..41411e3c1 100644
> > --- a/lib/librte_distributor/rte_distributor_v20.c
> > +++ b/lib/librte_distributor/rte_distributor_v20.c
> > @@ -34,9 +34,10 @@ rte_distributor_request_pkt_v20(struct
> > rte_distributor_v20 *d,
> >  	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
> >  	int64_t req = (((int64_t)(uintptr_t)oldpkt) <<
> RTE_DISTRIB_FLAG_BITS)
> >  			| RTE_DISTRIB_GET_BUF;
> > -	while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
> > +	while (unlikely(__atomic_load_n(&(buf->bufptr64),
> > __ATOMIC_ACQUIRE)
> > +		& RTE_DISTRIB_FLAGS_MASK))
> >  		rte_pause();
> > -	buf->bufptr64 = req;
> > +	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
> >  }
> >  VERSION_SYMBOL(rte_distributor_request_pkt, _v20, 2.0);
> >
> > @@ -45,7 +46,8 @@ rte_distributor_poll_pkt_v20(struct
> > rte_distributor_v20 *d,
> >  		unsigned worker_id)
> >  {
> >  	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
> > -	if (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
> > +	if (__atomic_load_n(&(buf->bufptr64), __ATOMIC_ACQUIRE)
> > +		& RTE_DISTRIB_GET_BUF)
> >  		return NULL;
> >
> >  	/* since bufptr64 is signed, this should be an arithmetic shift */
> > @@ -
> > 73,7 +75,7 @@ rte_distributor_return_pkt_v20(struct rte_distributor_v20
> *d,
> >  	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
> >  	uint64_t req = (((int64_t)(uintptr_t)oldpkt) <<
> > RTE_DISTRIB_FLAG_BITS)
> >  			| RTE_DISTRIB_RETURN_BUF;
> > -	buf->bufptr64 = req;
> > +	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
> >  	return 0;
> >  }
> >  VERSION_SYMBOL(rte_distributor_return_pkt, _v20, 2.0); @@ -117,7
> > +119,7 @@ handle_worker_shutdown(struct rte_distributor_v20 *d,
> > unsigned int
> > wkr)  {
> >  	d->in_flight_tags[wkr] = 0;
> >  	d->in_flight_bitmask &= ~(1UL << wkr);
> > -	d->bufs[wkr].bufptr64 = 0;
> > +	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0,
> __ATOMIC_RELEASE);
> >  	if (unlikely(d->backlog[wkr].count != 0)) {
> >  		/* On return of a packet, we need to move the
> >  		 * queued packets for this core elsewhere.
> > @@ -165,18 +167,23 @@ process_returns(struct rte_distributor_v20 *d)
> >  		const int64_t data = d->bufs[wkr].bufptr64;
> >  		uintptr_t oldbuf = 0;
> >
> > -		if (data & RTE_DISTRIB_GET_BUF) {
> > +		if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_GET_BUF) {
> >  			flushed++;
> >  			if (d->backlog[wkr].count)
> > -				d->bufs[wkr].bufptr64 =
> > -						backlog_pop(&d-
> > >backlog[wkr]);
> > +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +					backlog_pop(&d->backlog[wkr]),
> > +					__ATOMIC_RELEASE);
> >  			else {
> > -				d->bufs[wkr].bufptr64 =
> > RTE_DISTRIB_GET_BUF;
> > +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +					RTE_DISTRIB_GET_BUF,
> > +					__ATOMIC_RELEASE);
> >  				d->in_flight_tags[wkr] = 0;
> >  				d->in_flight_bitmask &= ~(1UL << wkr);
> >  			}
> >  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > -		} else if (data & RTE_DISTRIB_RETURN_BUF) {
> > +		} else if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_RETURN_BUF) {
> >  			handle_worker_shutdown(d, wkr);
> >  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> >  		}
> > @@ -251,21 +258,26 @@ rte_distributor_process_v20(struct
> > rte_distributor_v20 *d,
> >  			}
> >  		}
> >
> > -		if ((data & RTE_DISTRIB_GET_BUF) &&
> > +		if ((__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_GET_BUF) &&
> >  				(d->backlog[wkr].count || next_mb)) {
> >
> >  			if (d->backlog[wkr].count)
> > -				d->bufs[wkr].bufptr64 =
> > -						backlog_pop(&d-
> > >backlog[wkr]);
> > +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +						backlog_pop(&d-
> > >backlog[wkr]),
> > +						__ATOMIC_RELEASE);
> >
> >  			else {
> > -				d->bufs[wkr].bufptr64 = next_value;
> > +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +						next_value,
> > +						__ATOMIC_RELEASE);
> >  				d->in_flight_tags[wkr] = new_tag;
> >  				d->in_flight_bitmask |= (1UL << wkr);
> >  				next_mb = NULL;
> >  			}
> >  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > -		} else if (data & RTE_DISTRIB_RETURN_BUF) {
> > +		} else if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_RETURN_BUF) {
> >  			handle_worker_shutdown(d, wkr);
> >  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> >  		}
> > @@ -280,13 +292,16 @@ rte_distributor_process_v20(struct
> > rte_distributor_v20 *d,
> >  	 * if they are ready */
> >  	for (wkr = 0; wkr < d->num_workers; wkr++)
> >  		if (d->backlog[wkr].count &&
> > -				(d->bufs[wkr].bufptr64 &
> > RTE_DISTRIB_GET_BUF)) {
> > +				(__atomic_load_n(&(d->bufs[wkr].bufptr64),
> > +				__ATOMIC_ACQUIRE) &
> > RTE_DISTRIB_GET_BUF)) {
> >
> >  			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
> >  					RTE_DISTRIB_FLAG_BITS;
> >  			store_return(oldbuf, d, &ret_start, &ret_count);
> >
> > -			d->bufs[wkr].bufptr64 = backlog_pop(&d-
> > >backlog[wkr]);
> > +			__atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +				backlog_pop(&d->backlog[wkr]),
> > +				__ATOMIC_RELEASE);
> >  		}
> >
> >  	d->returns.start = ret_start;
> > --
> > 2.17.1

Patch
diff mbox series

diff --git a/lib/librte_distributor/meson.build b/lib/librte_distributor/meson.build
index dba7e3b2a..26577dbc1 100644
--- a/lib/librte_distributor/meson.build
+++ b/lib/librte_distributor/meson.build
@@ -9,3 +9,8 @@  else
 endif
 headers = files('rte_distributor.h')
 deps += ['mbuf']
+
+# for clang 32-bit compiles we need libatomic for 64-bit atomic ops
+if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
+	ext_deps += cc.find_library('atomic')
+endif
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 21eb1fb0a..b653146d0 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -50,7 +50,8 @@  rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 
 	retptr64 = &(buf->retptr64[0]);
 	/* Spin while handshake bits are set (scheduler clears it) */
-	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
+	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_GET_BUF)) {
 		rte_pause();
 		uint64_t t = rte_rdtsc()+100;
 
@@ -76,7 +77,8 @@  rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 	 * Finally, set the GET_BUF  to signal to distributor that cache
 	 * line is ready for processing
 	 */
-	*retptr64 |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
+			__ATOMIC_RELEASE);
 }
 BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
 MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct rte_distributor *d,
@@ -99,7 +101,8 @@  rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 	}
 
 	/* If bit is set, return */
-	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
+	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF)
 		return -1;
 
 	/* since bufptr64 is signed, this should be an arithmetic shift */
@@ -115,7 +118,8 @@  rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 	 * mbuf pointers, so toggle the bit so scheduler can start working
 	 * on the next cacheline while we're working.
 	 */
-	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(&(buf->bufptr64[0]),
+		buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
 
 	return count;
 }
@@ -174,6 +178,7 @@  rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 			return -EINVAL;
 	}
 
+	__atomic_thread_fence(__ATOMIC_ACQUIRE);
 	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
 		/* Switch off the return bit first */
 		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
@@ -183,7 +188,8 @@  rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
 
 	/* set the GET_BUF but even if we got no returns */
-	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(&(buf->retptr64[0]),
+		buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
 
 	return 0;
 }
@@ -273,7 +279,8 @@  handle_returns(struct rte_distributor *d, unsigned int wkr)
 	unsigned int count = 0;
 	unsigned int i;
 
-	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
+	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF) {
 		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
 			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
 				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
@@ -287,7 +294,7 @@  handle_returns(struct rte_distributor *d, unsigned int wkr)
 		d->returns.start = ret_start;
 		d->returns.count = ret_count;
 		/* Clear for the worker to populate with more returns */
-		buf->retptr64[0] = 0;
+		__atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
 	}
 	return count;
 }
@@ -307,7 +314,8 @@  release(struct rte_distributor *d, unsigned int wkr)
 	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
 	unsigned int i;
 
-	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF))
 		rte_pause();
 
 	handle_returns(d, wkr);
@@ -328,7 +336,8 @@  release(struct rte_distributor *d, unsigned int wkr)
 	d->backlog[wkr].count = 0;
 
 	/* Clear the GET bit */
-	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(&(buf->bufptr64[0]),
+		buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
 	return  buf->count;
 
 }
@@ -355,7 +364,8 @@  rte_distributor_process_v1705(struct rte_distributor *d,
 	if (unlikely(num_mbufs == 0)) {
 		/* Flush out all non-full cache-lines to workers. */
 		for (wid = 0 ; wid < d->num_workers; wid++) {
-			if (d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF) {
+			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
+				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
 				release(d, wid);
 				handle_returns(d, wid);
 			}
@@ -367,7 +377,8 @@  rte_distributor_process_v1705(struct rte_distributor *d,
 		uint16_t matches[RTE_DIST_BURST_SIZE];
 		unsigned int pkts;
 
-		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
+		if (__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
+			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)
 			d->bufs[wkr].count = 0;
 
 		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
@@ -465,7 +476,8 @@  rte_distributor_process_v1705(struct rte_distributor *d,
 
 	/* Flush out all non-full cache-lines to workers. */
 	for (wid = 0 ; wid < d->num_workers; wid++)
-		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+		if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
+			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF))
 			release(d, wid);
 
 	return num_mbufs;
@@ -574,7 +586,8 @@  rte_distributor_clear_returns_v1705(struct rte_distributor *d)
 
 	/* throw away returns, so workers can exit */
 	for (wkr = 0; wkr < d->num_workers; wkr++)
-		d->bufs[wkr].retptr64[0] = 0;
+		__atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
+				__ATOMIC_RELEASE);
 }
 BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
 MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct rte_distributor *d),
diff --git a/lib/librte_distributor/rte_distributor_v20.c b/lib/librte_distributor/rte_distributor_v20.c
index cdc0969a8..41411e3c1 100644
--- a/lib/librte_distributor/rte_distributor_v20.c
+++ b/lib/librte_distributor/rte_distributor_v20.c
@@ -34,9 +34,10 @@  rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d,
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
 	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
 			| RTE_DISTRIB_GET_BUF;
-	while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
+	while (unlikely(__atomic_load_n(&(buf->bufptr64), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_FLAGS_MASK))
 		rte_pause();
-	buf->bufptr64 = req;
+	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
 }
 VERSION_SYMBOL(rte_distributor_request_pkt, _v20, 2.0);
 
@@ -45,7 +46,8 @@  rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d,
 		unsigned worker_id)
 {
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
-	if (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+	if (__atomic_load_n(&(buf->bufptr64), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF)
 		return NULL;
 
 	/* since bufptr64 is signed, this should be an arithmetic shift */
@@ -73,7 +75,7 @@  rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d,
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
 	uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
 			| RTE_DISTRIB_RETURN_BUF;
-	buf->bufptr64 = req;
+	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
 	return 0;
 }
 VERSION_SYMBOL(rte_distributor_return_pkt, _v20, 2.0);
@@ -117,7 +119,7 @@  handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int wkr)
 {
 	d->in_flight_tags[wkr] = 0;
 	d->in_flight_bitmask &= ~(1UL << wkr);
-	d->bufs[wkr].bufptr64 = 0;
+	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
 	if (unlikely(d->backlog[wkr].count != 0)) {
 		/* On return of a packet, we need to move the
 		 * queued packets for this core elsewhere.
@@ -165,18 +167,23 @@  process_returns(struct rte_distributor_v20 *d)
 		const int64_t data = d->bufs[wkr].bufptr64;
 		uintptr_t oldbuf = 0;
 
-		if (data & RTE_DISTRIB_GET_BUF) {
+		if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_GET_BUF) {
 			flushed++;
 			if (d->backlog[wkr].count)
-				d->bufs[wkr].bufptr64 =
-						backlog_pop(&d->backlog[wkr]);
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					backlog_pop(&d->backlog[wkr]),
+					__ATOMIC_RELEASE);
 			else {
-				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					RTE_DISTRIB_GET_BUF,
+					__ATOMIC_RELEASE);
 				d->in_flight_tags[wkr] = 0;
 				d->in_flight_bitmask &= ~(1UL << wkr);
 			}
 			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
-		} else if (data & RTE_DISTRIB_RETURN_BUF) {
+		} else if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_RETURN_BUF) {
 			handle_worker_shutdown(d, wkr);
 			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
 		}
@@ -251,21 +258,26 @@  rte_distributor_process_v20(struct rte_distributor_v20 *d,
 			}
 		}
 
-		if ((data & RTE_DISTRIB_GET_BUF) &&
+		if ((__atomic_load_n(&data, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_GET_BUF) &&
 				(d->backlog[wkr].count || next_mb)) {
 
 			if (d->backlog[wkr].count)
-				d->bufs[wkr].bufptr64 =
-						backlog_pop(&d->backlog[wkr]);
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+						backlog_pop(&d->backlog[wkr]),
+						__ATOMIC_RELEASE);
 
 			else {
-				d->bufs[wkr].bufptr64 = next_value;
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+						next_value,
+						__ATOMIC_RELEASE);
 				d->in_flight_tags[wkr] = new_tag;
 				d->in_flight_bitmask |= (1UL << wkr);
 				next_mb = NULL;
 			}
 			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
-		} else if (data & RTE_DISTRIB_RETURN_BUF) {
+		} else if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_RETURN_BUF) {
 			handle_worker_shutdown(d, wkr);
 			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
 		}
@@ -280,13 +292,16 @@  rte_distributor_process_v20(struct rte_distributor_v20 *d,
 	 * if they are ready */
 	for (wkr = 0; wkr < d->num_workers; wkr++)
 		if (d->backlog[wkr].count &&
-				(d->bufs[wkr].bufptr64 & RTE_DISTRIB_GET_BUF)) {
+				(__atomic_load_n(&(d->bufs[wkr].bufptr64),
+				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
 
 			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
 					RTE_DISTRIB_FLAG_BITS;
 			store_return(oldbuf, d, &ret_start, &ret_count);
 
-			d->bufs[wkr].bufptr64 = backlog_pop(&d->backlog[wkr]);
+			__atomic_store_n(&(d->bufs[wkr].bufptr64),
+				backlog_pop(&d->backlog[wkr]),
+				__ATOMIC_RELEASE);
 		}
 
 	d->returns.start = ret_start;