[v3,4/4] eventdev: relax smp barriers with C11 atomics
Commit Message
The impl_opaque field is shared between the timer arm and cancel
operations. Meanwhile, the state flag acts as a guard variable to
make sure the update of impl_opaque is synchronized. The original
code uses rte_smp barriers to achieve that. This patch uses C11
atomics with explicit one-way memory ordering instead of the full
barriers rte_smp_wmb()/rmb() to avoid unnecessary barriers on aarch64.
Since compilers generate the same instructions for volatile and
non-volatile variables with the C11 __atomic built-ins, the volatile
keyword is kept in front of the state enum to avoid an ABI break.
Signed-off-by: Phil Yang <phil.yang@arm.com>
Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
Acked-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
---
v3:
Fix ABI issue: revert to 'volatile enum rte_event_timer_state state'.
v2:
1. Removed implementation-specific opaque data cleanup code.
2. Replaced thread fence with atomic ACQUIRE/RELEASE ordering on state access.
lib/librte_eventdev/rte_event_timer_adapter.c | 55 ++++++++++++++++++---------
1 file changed, 37 insertions(+), 18 deletions(-)
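
For background, the change implements the standard release/acquire
message-passing pattern: the writer publishes impl_opaque and then
release-stores the state, while the reader acquire-loads the state
before touching impl_opaque. A minimal sketch of the idiom follows
(illustrative names only, not the adapter's actual code):

#include <stdint.h>

enum state { NOT_ARMED, ARMED };

struct evtim {
	uint64_t impl_opaque;      /* payload guarded by 'state' */
	volatile enum state state; /* guard variable */
};

/* Writer: publish the payload, then release-store the guard. RELEASE
 * ordering keeps the impl_opaque write from being reordered after the
 * state update, replacing a full rte_smp_wmb(). */
static void arm(struct evtim *t, uint64_t opaque)
{
	t->impl_opaque = opaque;
	__atomic_store_n(&t->state, ARMED, __ATOMIC_RELEASE);
}

/* Reader: acquire-load the guard before reading the payload,
 * replacing a full rte_smp_rmb(). */
static int cancel(struct evtim *t, uint64_t *opaque)
{
	if (__atomic_load_n(&t->state, __ATOMIC_ACQUIRE) != ARMED)
		return -1;
	*opaque = t->impl_opaque; /* ordered after the acquire load */
	__atomic_store_n(&t->state, NOT_ARMED, __ATOMIC_RELEASE);
	return 0;
}

On aarch64 the release store and acquire load compile to one-way
barrier instructions (stlr/ldar), which is where the saving over the
dmb barriers emitted by rte_smp_wmb()/rmb() comes from.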
Comments
On Tue, Jul 7, 2020 at 4:45 PM Phil Yang <phil.yang@arm.com> wrote:
>
> The impl_opaque field is shared between the timer arm and cancel
> operations. Meanwhile, the state flag acts as a guard variable to
> make sure the update of impl_opaque is synchronized. The original
> code uses rte_smp barriers to achieve that. This patch uses C11
> atomics with explicit one-way memory ordering instead of the full
> barriers rte_smp_wmb()/rmb() to avoid unnecessary barriers on aarch64.
>
> Since compilers generate the same instructions for volatile and
> non-volatile variables with the C11 __atomic built-ins, the volatile
> keyword is kept in front of the state enum to avoid an ABI break.
>
> Signed-off-by: Phil Yang <phil.yang@arm.com>
> Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> Acked-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
Could you fix the following:
WARNING:TYPO_SPELLING: 'opague' may be misspelled - perhaps 'opaque'?
#184: FILE: lib/librte_eventdev/rte_event_timer_adapter.c:1161:
+ * specific opague data under the correct state.
total: 0 errors, 1 warnings, 124 lines checked
> ---
> v3:
> Fix ABI issue: revert to 'volatile enum rte_event_timer_state state'.
>
> v2:
> 1. Removed implementation-specific opaque data cleanup code.
> 2. Replaced thread fence with atomic ACQUIRE/RELEASE ordering on state access.
>
> lib/librte_eventdev/rte_event_timer_adapter.c | 55 ++++++++++++++++++---------
> 1 file changed, 37 insertions(+), 18 deletions(-)
>
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
> index d75415c..eb2c93a 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.c
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
> @@ -629,7 +629,8 @@ swtim_callback(struct rte_timer *tim)
> sw->expired_timers[sw->n_expired_timers++] = tim;
> sw->stats.evtim_exp_count++;
>
> - evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
> + __atomic_store_n(&evtim->state, RTE_EVENT_TIMER_NOT_ARMED,
> + __ATOMIC_RELEASE);
> }
>
> if (event_buffer_batch_ready(&sw->buffer)) {
> @@ -1020,6 +1021,7 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> int n_lcores;
> /* Timer list for this lcore is not in use. */
> uint16_t exp_state = 0;
> + enum rte_event_timer_state n_state;
>
> #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
> /* Check that the service is running. */
> @@ -1060,30 +1062,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> }
>
> for (i = 0; i < nb_evtims; i++) {
> - /* Don't modify the event timer state in these cases */
> - if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
> + n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
> + if (n_state == RTE_EVENT_TIMER_ARMED) {
> rte_errno = EALREADY;
> break;
> - } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
> - evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
> + } else if (!(n_state == RTE_EVENT_TIMER_NOT_ARMED ||
> + n_state == RTE_EVENT_TIMER_CANCELED)) {
> rte_errno = EINVAL;
> break;
> }
>
> ret = check_timeout(evtims[i], adapter);
> if (unlikely(ret == -1)) {
> - evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
> + __atomic_store_n(&evtims[i]->state,
> + RTE_EVENT_TIMER_ERROR_TOOLATE,
> + __ATOMIC_RELAXED);
> rte_errno = EINVAL;
> break;
> } else if (unlikely(ret == -2)) {
> - evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
> + __atomic_store_n(&evtims[i]->state,
> + RTE_EVENT_TIMER_ERROR_TOOEARLY,
> + __ATOMIC_RELAXED);
> rte_errno = EINVAL;
> break;
> }
>
> if (unlikely(check_destination_event_queue(evtims[i],
> adapter) < 0)) {
> - evtims[i]->state = RTE_EVENT_TIMER_ERROR;
> + __atomic_store_n(&evtims[i]->state,
> + RTE_EVENT_TIMER_ERROR,
> + __ATOMIC_RELAXED);
> rte_errno = EINVAL;
> break;
> }
> @@ -1099,13 +1107,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> SINGLE, lcore_id, NULL, evtims[i]);
> if (ret < 0) {
> /* tim was in RUNNING or CONFIG state */
> - evtims[i]->state = RTE_EVENT_TIMER_ERROR;
> + __atomic_store_n(&evtims[i]->state,
> + RTE_EVENT_TIMER_ERROR,
> + __ATOMIC_RELEASE);
> break;
> }
>
> - rte_smp_wmb();
> EVTIM_LOG_DBG("armed an event timer");
> - evtims[i]->state = RTE_EVENT_TIMER_ARMED;
> +			/* RELEASE ordering guarantees the adapter-specific value
> +			 * changes are observed before the update of state.
> +			 */
> + __atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_ARMED,
> + __ATOMIC_RELEASE);
> }
>
> if (i < nb_evtims)
> @@ -1132,6 +1145,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
> struct rte_timer *timp;
> uint64_t opaque;
> struct swtim *sw = swtim_pmd_priv(adapter);
> + enum rte_event_timer_state n_state;
>
> #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
> /* Check that the service is running. */
> @@ -1143,16 +1157,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
>
> for (i = 0; i < nb_evtims; i++) {
> /* Don't modify the event timer state in these cases */
> - if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
> + /* ACQUIRE ordering guarantees the access of implementation
> + * specific opague data under the correct state.
> + */
> + n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
> + if (n_state == RTE_EVENT_TIMER_CANCELED) {
> rte_errno = EALREADY;
> break;
> - } else if (evtims[i]->state != RTE_EVENT_TIMER_ARMED) {
> + } else if (n_state != RTE_EVENT_TIMER_ARMED) {
> rte_errno = EINVAL;
> break;
> }
>
> - rte_smp_rmb();
> -
> opaque = evtims[i]->impl_opaque[0];
> timp = (struct rte_timer *)(uintptr_t)opaque;
> RTE_ASSERT(timp != NULL);
> @@ -1166,9 +1182,12 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
>
> rte_mempool_put(sw->tim_pool, (void **)timp);
>
> - evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
> -
> - rte_smp_wmb();
> +			/* The RELEASE ordering here pairs with the ACQUIRE
> +			 * ordering on the load of state, to make sure the
> +			 * updated data is observed between threads.
> +			 */
> + __atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_CANCELED,
> + __ATOMIC_RELEASE);
> }
>
> return i;
> --
> 2.7.4
>
> -----Original Message-----
> From: Jerin Jacob <jerinjacobk@gmail.com>
> Subject: Re: [dpdk-dev] [PATCH v3 4/4] eventdev: relax smp barriers with
> C11 atomics
>
> Could you fix the following:
>
> WARNING:TYPO_SPELLING: 'opague' may be misspelled - perhaps 'opaque'?
> #184: FILE: lib/librte_eventdev/rte_event_timer_adapter.c:1161:
> + * specific opague data under the correct state.
Done.
Thanks,
Phil
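
On the volatile point in the commit message: the GCC/Clang __atomic
built-ins accept volatile-qualified operands directly, so keeping the
qualifier on the state field costs nothing in code generation. A tiny
illustration (hypothetical type, not DPDK code):

struct timer_like {
	volatile int state;
};

int load_state(struct timer_like *t)
{
	/* Generates the same instruction as for a non-volatile field;
	 * on aarch64 this acquire load is a single ldar. */
	return __atomic_load_n(&t->state, __ATOMIC_ACQUIRE);
}

Dropping the qualifier would change the declared type of a field in a
public struct, which ABI checkers such as libabigail report even though
the memory layout is unchanged.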