From patchwork Mon Jan 28 18:14:03 2019
X-Patchwork-Submitter: "Eads, Gage"
X-Patchwork-Id: 50073
X-Patchwork-Delegate: thomas@monjalon.net
From: Gage Eads
To: dev@dpdk.org
Cc: olivier.matz@6wind.com, arybchenko@solarflare.com,
 bruce.richardson@intel.com, konstantin.ananyev@intel.com,
 stephen@networkplumber.org, jerinj@marvell.com, mczekaj@marvell.com,
 nd@arm.com, Ola.Liljedahl@arm.com
Date: Mon, 28 Jan 2019 12:14:03 -0600
Message-Id: <20190128181407.32739-2-gage.eads@intel.com>
X-Mailer: git-send-email 2.13.6
In-Reply-To: <20190128181407.32739-1-gage.eads@intel.com>
References: <20190118152326.22686-1-gage.eads@intel.com>
 <20190128181407.32739-1-gage.eads@intel.com>
Subject: [dpdk-dev] [PATCH v4 1/5] ring: add 64-bit headtail structure

64-bit head and tail index widths greatly increase the time it takes for
them to wrap around (at current CPU speeds, that won't happen within the
author's lifetime). This is fundamental to avoiding the ABA problem -- in
which a thread reads the same tail index in two accesses and wrongly
concludes that the ring was not modified in the intervening time -- in the
upcoming non-blocking ring implementation. Using a 64-bit index makes the
possibility of this occurring effectively zero.

This commit places the new producer and consumer structures in the same
location in struct rte_ring as their 32-bit counterparts. Since the 32-bit
versions are padded out to a cache line, there is space for the new
structure without affecting the layout of struct rte_ring. Thus, the ABI
is preserved.

Signed-off-by: Gage Eads
---
 lib/librte_ring/rte_ring.h         |  23 +++-
 lib/librte_ring/rte_ring_c11_mem.h | 153 +++++++++++++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_generic.h | 139 +++++++++++++++++++++++++++
 3 files changed, 312 insertions(+), 3 deletions(-)

diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index af5444a9f..00dfb5b85 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -1,6 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2010-2019 Intel Corporation
  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
  * All rights reserved.
  * Derived from FreeBSD's bufring.h
@@ -70,6 +70,15 @@ struct rte_ring_headtail {
 	uint32_t single;	/**< True if single prod/cons */
 };
 
+/* 64-bit version of rte_ring_headtail, for use by rings that need to avoid
+ * head/tail wrap-around.
+ */
+struct rte_ring_headtail_64 {
+	volatile uint64_t head;	/**< Prod/consumer head. */
+	volatile uint64_t tail;	/**< Prod/consumer tail. */
+	uint32_t single;	/**< True if single prod/cons */
+};
+
 /**
  * An RTE ring structure.
  *
@@ -97,11 +106,19 @@ struct rte_ring {
 	char pad0 __rte_cache_aligned; /**< empty cache line */
 
 	/** Ring producer status. */
-	struct rte_ring_headtail prod __rte_cache_aligned;
+	RTE_STD_C11
+	union {
+		struct rte_ring_headtail prod __rte_cache_aligned;
+		struct rte_ring_headtail_64 prod_64 __rte_cache_aligned;
+	};
 	char pad1 __rte_cache_aligned; /**< empty cache line */
 
 	/** Ring consumer status. */
-	struct rte_ring_headtail cons __rte_cache_aligned;
+	RTE_STD_C11
+	union {
+		struct rte_ring_headtail cons __rte_cache_aligned;
+		struct rte_ring_headtail_64 cons_64 __rte_cache_aligned;
+	};
 	char pad2 __rte_cache_aligned; /**< empty cache line */
 };
 
diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 0fb73a337..47acd4c7c 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -178,4 +178,157 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 	return n;
 }
 
+/**
+ * @internal This function updates the producer head for enqueue using
+ * 64-bit head/tail values.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head_64(struct rte_ring *r, unsigned int is_sp,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint64_t *old_head, uint64_t *new_head,
+		uint32_t *free_entries)
+{
+	const uint32_t capacity = r->capacity;
+	uint64_t cons_tail;
+	unsigned int max = n;
+	int success;
+
+	*old_head = __atomic_load_n(&r->prod_64.head, __ATOMIC_RELAXED);
+	do {
+		/* Reset n to the initial burst count */
+		n = max;
+
+		/* Ensure the head is read before tail */
+		__atomic_thread_fence(__ATOMIC_ACQUIRE);
+
+		/* load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		cons_tail = __atomic_load_n(&r->cons_64.tail,
+					__ATOMIC_ACQUIRE);
+
+		/* The subtraction is done between two unsigned 64bits value
+		 * (the result is always modulo 64 bits even if we have
+		 * *old_head > cons_tail). So 'free_entries' is always between 0
+		 * and capacity (which is < size).
+		 */
+		*free_entries = (capacity + cons_tail - *old_head);
+
+		/* check that we have enough room in ring */
+		if (unlikely(n > *free_entries))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : *free_entries;
+
+		if (n == 0)
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sp)
+			r->prod_64.head = *new_head, success = 1;
+		else
+			/* on failure, *old_head is updated */
+			success = __atomic_compare_exchange_n(&r->prod_64.head,
+					old_head, *new_head,
+					0, __ATOMIC_RELAXED,
+					__ATOMIC_RELAXED);
+	} while (unlikely(success == 0));
+	return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue using
+ * 64-bit head/tail values.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to dequeue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_64(struct rte_ring *r, unsigned int is_sc,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint64_t *old_head, uint64_t *new_head,
+		uint32_t *entries)
+{
+	unsigned int max = n;
+	uint64_t prod_tail;
+	int success;
+
+	/* move cons.head atomically */
+	*old_head = __atomic_load_n(&r->cons_64.head, __ATOMIC_RELAXED);
+	do {
+		/* Restore n as it may change every loop */
+		n = max;
+
+		/* Ensure the head is read before tail */
+		__atomic_thread_fence(__ATOMIC_ACQUIRE);
+
+		/* this load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		prod_tail = __atomic_load_n(&r->prod_64.tail,
+					__ATOMIC_ACQUIRE);
+
+		/* The subtraction is done between two unsigned 64bits value
+		 * (the result is always modulo 64 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = (prod_tail - *old_head);
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sc)
+			r->cons_64.head = *new_head, success = 1;
+		else
+			/* on failure, *old_head will be updated */
+			success = __atomic_compare_exchange_n(&r->cons_64.head,
+					old_head, *new_head,
+					0, __ATOMIC_RELAXED,
+					__ATOMIC_RELAXED);
+	} while (unlikely(success == 0));
+	return n;
+}
+
 #endif /* _RTE_RING_C11_MEM_H_ */
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index ea7dbe5b9..2158e092a 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -167,4 +167,143 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 	return n;
 }
 
+/**
+ * @internal This function updates the producer head for enqueue using
+ * 64-bit head/tail values.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head_64(struct rte_ring *r, unsigned int is_sp,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint64_t *old_head, uint64_t *new_head,
+		uint32_t *free_entries)
+{
+	const uint32_t capacity = r->capacity;
+	unsigned int max = n;
+	int success;
+
+	do {
+		/* Reset n to the initial burst count */
+		n = max;
+
+		*old_head = r->prod_64.head;
+
+		/* add rmb barrier to avoid load/load reorder in weak
+		 * memory model. It is noop on x86
+		 */
+		rte_smp_rmb();
+
+		/*
+		 * The subtraction is done between two unsigned 64bits value
+		 * (the result is always modulo 64 bits even if we have
+		 * *old_head > cons_tail). So 'free_entries' is always between 0
+		 * and capacity (which is < size).
+		 */
+		*free_entries = (capacity + r->cons_64.tail - *old_head);
+
+		/* check that we have enough room in ring */
+		if (unlikely(n > *free_entries))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : *free_entries;
+
+		if (n == 0)
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sp)
+			r->prod_64.head = *new_head, success = 1;
+		else
+			success = rte_atomic64_cmpset(&r->prod_64.head,
+					*old_head, *new_head);
+	} while (unlikely(success == 0));
+	return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue using
+ * 64-bit head/tail values.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to dequeue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_64(struct rte_ring *r, unsigned int is_sc,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint64_t *old_head, uint64_t *new_head,
+		uint32_t *entries)
+{
+	unsigned int max = n;
+	int success;
+
+	do {
+		/* Restore n as it may change every loop */
+		n = max;
+
+		*old_head = r->cons_64.head;
+
+		/* add rmb barrier to avoid load/load reorder in weak
+		 * memory model. It is noop on x86
+		 */
+		rte_smp_rmb();
+
+		/* The subtraction is done between two unsigned 64bits value
+		 * (the result is always modulo 64 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = (r->prod_64.tail - *old_head);
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sc)
+			r->cons_64.head = *new_head, success = 1;
+		else
+			success = rte_atomic64_cmpset(&r->cons_64.head,
+					*old_head, *new_head);
+	} while (unlikely(success == 0));
+	return n;
+}
+
 #endif /* _RTE_RING_GENERIC_H_ */
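
(Editor's note, not part of the patch.) The wrap-around argument in the commit
message can be checked with a minimal, self-contained sketch. It uses no
librte_ring code; the 2e9 operations-per-second rate and the sample
head/tail/capacity values below are hypothetical and chosen only to show the
arithmetic.

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	/* Hypothetical sustained rate of ring index increments per second. */
	const double ops_per_sec = 2e9;

	/* Time until an index returns to a previously seen value, which is
	 * the precondition for the ABA problem described above.
	 */
	printf("32-bit index revisits a value after ~%.1f seconds\n",
	       (double)UINT32_MAX / ops_per_sec);
	printf("64-bit index revisits a value after ~%.0f years\n",
	       (double)UINT64_MAX / ops_per_sec / (365.25 * 24 * 3600));

	/* The free-space expression used by the head-move functions relies on
	 * unsigned modular subtraction staying correct across a wrap. With
	 * 32-bit indexes a wrap actually happens; here the producer head has
	 * wrapped past zero, the consumer tail has not, and 16 slots are in
	 * use out of a capacity of 1024.
	 */
	uint32_t prod_head = 10;
	uint32_t cons_tail = UINT32_MAX - 5;
	uint32_t capacity = 1024;
	uint32_t free_entries = capacity + cons_tail - prod_head;

	printf("free_entries across wrap = %u\n", (unsigned)free_entries);
	return 0;
}

This prints roughly 2.1 seconds for the 32-bit case, about 292 years for the
64-bit case, and free_entries = 1008 (1024 minus the 16 in-use slots). The
64-bit functions in this patch keep the same subtraction, but because the
index practically never returns to an earlier value, the ABA window the
commit message describes is effectively closed.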