From patchwork Wed Oct 30 21:22:59 2024 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Patchwork-Submitter: Konstantin Ananyev X-Patchwork-Id: 147762 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 20C8A45B82; Wed, 30 Oct 2024 21:32:55 +0100 (CET) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 10B9843366; Wed, 30 Oct 2024 21:32:55 +0100 (CET) Received: from frasgout.his.huawei.com (frasgout.his.huawei.com [185.176.79.56]) by mails.dpdk.org (Postfix) with ESMTP id 994ED4335B for ; Wed, 30 Oct 2024 21:32:51 +0100 (CET) Received: from mail.maildlp.com (unknown [172.18.186.31]) by frasgout.his.huawei.com (SkyGuard) with ESMTP id 4XdzG748mdz6L74N; Thu, 31 Oct 2024 04:27:59 +0800 (CST) Received: from frapeml500007.china.huawei.com (unknown [7.182.85.172]) by mail.maildlp.com (Postfix) with ESMTPS id 2AC501404F4; Thu, 31 Oct 2024 04:32:51 +0800 (CST) Received: from localhost.localdomain (10.220.239.45) by frapeml500007.china.huawei.com (7.182.85.172) with Microsoft SMTP Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.1.2507.39; Wed, 30 Oct 2024 21:32:50 +0100 From: Konstantin Ananyev To: CC: , , , , , , , , Subject: [PATCH v7 2/7] ring: common functions for 'move head' ops Date: Wed, 30 Oct 2024 17:22:59 -0400 Message-ID: <20241030212304.104180-3-konstantin.ananyev@huawei.com> X-Mailer: git-send-email 2.35.3 In-Reply-To: <20241030212304.104180-1-konstantin.ananyev@huawei.com> References: <20241021174745.1843-1-konstantin.ananyev@huawei.com> <20241030212304.104180-1-konstantin.ananyev@huawei.com> MIME-Version: 1.0 X-Originating-IP: [10.220.239.45] X-ClientProxiedBy: frapeml100006.china.huawei.com (7.182.85.201) To frapeml500007.china.huawei.com (7.182.85.172) X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Note upfront: that change doesn't introduce any functional or performance changes. It is just a code-reordering for: - code deduplication - ability in future to re-use the same code to introduce new functionality For each sync mode corresponding move_prod_head() and move_cons_head() are nearly identical to each other, the only differences are: - do we need to use a @capacity to calculate number of entries or not. - what we need to update (prod/cons) and what is used as read-only counterpart. So instead of having 2 copies of nearly identical functions, introduce a new common one that could be used by both functions: move_prod_head() and move_cons_head(). As another positive thing - we can get rid of referencing whole rte_ring structure in that new common sub-function. Signed-off-by: Konstantin Ananyev Acked-by: Morten Brørup --- lib/ring/rte_ring_c11_pvt.h | 156 ++++++++++--------------------- lib/ring/rte_ring_elem_pvt.h | 66 +++++++++++++ lib/ring/rte_ring_generic_pvt.h | 143 +++++++++------------------- lib/ring/rte_ring_hts_elem_pvt.h | 107 ++++++++++----------- lib/ring/rte_ring_rts_elem_pvt.h | 107 ++++++++++----------- 5 files changed, 255 insertions(+), 324 deletions(-) diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h index 629b2d9288..b9388af0da 100644 --- a/lib/ring/rte_ring_c11_pvt.h +++ b/lib/ring/rte_ring_c11_pvt.h @@ -11,6 +11,17 @@ #ifndef _RTE_RING_C11_PVT_H_ #define _RTE_RING_C11_PVT_H_ +/** + * @file rte_ring_c11_pvt.h + * It is not recommended to include this file directly, + * include instead. + * Contains internal helper functions for MP/SP and MC/SC ring modes. + * For more information please refer to . + */ + +/** + * @internal This function updates tail values. + */ static __rte_always_inline void __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single, uint32_t enqueue) @@ -29,40 +40,45 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, } /** - * @internal This function updates the producer head for enqueue + * @internal This is a helper function that moves the producer/consumer head * - * @param r - * A pointer to the ring structure - * @param is_sp - * Indicates whether multi-producer path is needed or not + * @param d + * A pointer to the headtail structure with head value to be moved + * @param s + * A pointer to the counter-part headtail structure. Note that this + * function only reads tail value from it + * @param capacity + * Either ring capacity value (for producer), or zero (for consumer) + * @param is_st + * Indicates whether multi-thread safe path is needed or not * @param n - * The number of elements we will want to enqueue, i.e. how far should the - * head be moved + * The number of elements we want to move head value on * @param behavior - * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring - * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * RTE_RING_QUEUE_FIXED: Move on a fixed number of items + * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible * @param old_head - * Returns head value as it was before the move, i.e. where enqueue starts + * Returns head value as it was before the move * @param new_head - * Returns the current/new head value i.e. where enqueue finishes - * @param free_entries - * Returns the amount of free space in the ring BEFORE head was moved + * Returns the new head value + * @param entries + * Returns the number of ring entries available BEFORE head was moved * @return - * Actual number of objects enqueued. - * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + * Actual number of objects the head was moved on + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only */ static __rte_always_inline unsigned int -__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, - unsigned int n, enum rte_ring_queue_behavior behavior, - uint32_t *old_head, uint32_t *new_head, - uint32_t *free_entries) +__rte_ring_headtail_move_head(struct rte_ring_headtail *d, + const struct rte_ring_headtail *s, uint32_t capacity, + unsigned int is_st, unsigned int n, + enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { - const uint32_t capacity = r->capacity; - uint32_t cons_tail; - unsigned int max = n; + uint32_t stail; int success; + unsigned int max = n; - *old_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed); + *old_head = rte_atomic_load_explicit(&d->head, + rte_memory_order_relaxed); do { /* Reset n to the initial burst count */ n = max; @@ -73,112 +89,36 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, /* load-acquire synchronize with store-release of ht->tail * in update_tail. */ - cons_tail = rte_atomic_load_explicit(&r->cons.tail, + stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire); /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have - * *old_head > cons_tail). So 'free_entries' is always between 0 + * *old_head > s->tail). So 'entries' is always between 0 * and capacity (which is < size). */ - *free_entries = (capacity + cons_tail - *old_head); + *entries = (capacity + stail - *old_head); /* check that we have enough room in ring */ - if (unlikely(n > *free_entries)) + if (unlikely(n > *entries)) n = (behavior == RTE_RING_QUEUE_FIXED) ? - 0 : *free_entries; + 0 : *entries; if (n == 0) return 0; *new_head = *old_head + n; - if (is_sp) { - r->prod.head = *new_head; + if (is_st) { + d->head = *new_head; success = 1; } else /* on failure, *old_head is updated */ - success = rte_atomic_compare_exchange_strong_explicit(&r->prod.head, - old_head, *new_head, + success = rte_atomic_compare_exchange_strong_explicit( + &d->head, old_head, *new_head, rte_memory_order_relaxed, rte_memory_order_relaxed); } while (unlikely(success == 0)); return n; } -/** - * @internal This function updates the consumer head for dequeue - * - * @param r - * A pointer to the ring structure - * @param is_sc - * Indicates whether multi-consumer path is needed or not - * @param n - * The number of elements we will want to dequeue, i.e. how far should the - * head be moved - * @param behavior - * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring - * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring - * @param old_head - * Returns head value as it was before the move, i.e. where dequeue starts - * @param new_head - * Returns the current/new head value i.e. where dequeue finishes - * @param entries - * Returns the number of entries in the ring BEFORE head was moved - * @return - * - Actual number of objects dequeued. - * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. - */ -static __rte_always_inline unsigned int -__rte_ring_move_cons_head(struct rte_ring *r, int is_sc, - unsigned int n, enum rte_ring_queue_behavior behavior, - uint32_t *old_head, uint32_t *new_head, - uint32_t *entries) -{ - unsigned int max = n; - uint32_t prod_tail; - int success; - - /* move cons.head atomically */ - *old_head = rte_atomic_load_explicit(&r->cons.head, rte_memory_order_relaxed); - do { - /* Restore n as it may change every loop */ - n = max; - - /* Ensure the head is read before tail */ - rte_atomic_thread_fence(rte_memory_order_acquire); - - /* this load-acquire synchronize with store-release of ht->tail - * in update_tail. - */ - prod_tail = rte_atomic_load_explicit(&r->prod.tail, - rte_memory_order_acquire); - - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. - */ - *entries = (prod_tail - *old_head); - - /* Set the actual entries for dequeue */ - if (n > *entries) - n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; - - if (unlikely(n == 0)) - return 0; - - *new_head = *old_head + n; - if (is_sc) { - r->cons.head = *new_head; - success = 1; - } else - /* on failure, *old_head will be updated */ - success = rte_atomic_compare_exchange_strong_explicit(&r->cons.head, - old_head, *new_head, - rte_memory_order_relaxed, - rte_memory_order_relaxed); - } while (unlikely(success == 0)); - return n; -} - #endif /* _RTE_RING_C11_PVT_H_ */ diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h index 4b80f58980..3a83668a08 100644 --- a/lib/ring/rte_ring_elem_pvt.h +++ b/lib/ring/rte_ring_elem_pvt.h @@ -293,6 +293,72 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head, #include "rte_ring_generic_pvt.h" #endif +/** + * @internal This function updates the producer head for enqueue + * + * @param r + * A pointer to the ring structure + * @param is_sp + * Indicates whether multi-producer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where enqueue starts + * @param new_head + * Returns the current/new head value i.e. where enqueue finishes + * @param free_entries + * Returns the amount of free space in the ring BEFORE head was moved + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, + uint32_t *free_entries) +{ + return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity, + is_sp, n, behavior, old_head, new_head, free_entries); +} + +/** + * @internal This function updates the consumer head for dequeue + * + * @param r + * A pointer to the ring structure + * @param is_sc + * Indicates whether multi-consumer path is needed or not + * @param n + * The number of elements we will want to dequeue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where dequeue starts + * @param new_head + * Returns the current/new head value i.e. where dequeue finishes + * @param entries + * Returns the number of entries in the ring BEFORE head was moved + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, + uint32_t *entries) +{ + return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0, + is_sc, n, behavior, old_head, new_head, entries); +} + /** * @internal Enqueue several objects on the ring * diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h index 457f41dab3..affd2d5ba7 100644 --- a/lib/ring/rte_ring_generic_pvt.h +++ b/lib/ring/rte_ring_generic_pvt.h @@ -10,6 +10,17 @@ #ifndef _RTE_RING_GENERIC_PVT_H_ #define _RTE_RING_GENERIC_PVT_H_ +/** + * @file rte_ring_generic_pvt.h + * It is not recommended to include this file directly, + * include instead. + * Contains internal helper functions for MP/SP and MC/SC ring modes. + * For more information please refer to . + */ + +/** + * @internal This function updates tail values. + */ static __rte_always_inline void __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single, uint32_t enqueue) @@ -30,35 +41,39 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, } /** - * @internal This function updates the producer head for enqueue + * @internal This is a helper function that moves the producer/consumer head * - * @param r - * A pointer to the ring structure - * @param is_sp - * Indicates whether multi-producer path is needed or not + * @param d + * A pointer to the headtail structure with head value to be moved + * @param s + * A pointer to the counter-part headtail structure. Note that this + * function only reads tail value from it + * @param capacity + * Either ring capacity value (for producer), or zero (for consumer) + * @param is_st + * Indicates whether multi-thread safe path is needed or not * @param n - * The number of elements we will want to enqueue, i.e. how far should the - * head be moved + * The number of elements we want to move head value on * @param behavior - * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring - * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * RTE_RING_QUEUE_FIXED: Move on a fixed number of items + * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible * @param old_head - * Returns head value as it was before the move, i.e. where enqueue starts + * Returns head value as it was before the move * @param new_head - * Returns the current/new head value i.e. where enqueue finishes - * @param free_entries - * Returns the amount of free space in the ring BEFORE head was moved + * Returns the new head value + * @param entries + * Returns the number of ring entries available BEFORE head was moved * @return - * Actual number of objects enqueued. - * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + * Actual number of objects the head was moved on + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only */ static __rte_always_inline unsigned int -__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, - unsigned int n, enum rte_ring_queue_behavior behavior, - uint32_t *old_head, uint32_t *new_head, - uint32_t *free_entries) +__rte_ring_headtail_move_head(struct rte_ring_headtail *d, + const struct rte_ring_headtail *s, uint32_t capacity, + unsigned int is_st, unsigned int n, + enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { - const uint32_t capacity = r->capacity; unsigned int max = n; int success; @@ -66,7 +81,7 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, /* Reset n to the initial burst count */ n = max; - *old_head = r->prod.head; + *old_head = d->head; /* add rmb barrier to avoid load/load reorder in weak * memory model. It is noop on x86 @@ -76,97 +91,27 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, /* * The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have - * *old_head > cons_tail). So 'free_entries' is always between 0 + * *old_head > s->tail). So 'entries' is always between 0 * and capacity (which is < size). */ - *free_entries = (capacity + r->cons.tail - *old_head); + *entries = (capacity + s->tail - *old_head); /* check that we have enough room in ring */ - if (unlikely(n > *free_entries)) + if (unlikely(n > *entries)) n = (behavior == RTE_RING_QUEUE_FIXED) ? - 0 : *free_entries; + 0 : *entries; if (n == 0) return 0; *new_head = *old_head + n; - if (is_sp) { - r->prod.head = *new_head; + if (is_st) { + d->head = *new_head; success = 1; } else - success = rte_atomic32_cmpset((uint32_t *)(uintptr_t)&r->prod.head, - *old_head, *new_head); - } while (unlikely(success == 0)); - return n; -} - -/** - * @internal This function updates the consumer head for dequeue - * - * @param r - * A pointer to the ring structure - * @param is_sc - * Indicates whether multi-consumer path is needed or not - * @param n - * The number of elements we will want to dequeue, i.e. how far should the - * head be moved - * @param behavior - * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring - * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring - * @param old_head - * Returns head value as it was before the move, i.e. where dequeue starts - * @param new_head - * Returns the current/new head value i.e. where dequeue finishes - * @param entries - * Returns the number of entries in the ring BEFORE head was moved - * @return - * - Actual number of objects dequeued. - * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. - */ -static __rte_always_inline unsigned int -__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc, - unsigned int n, enum rte_ring_queue_behavior behavior, - uint32_t *old_head, uint32_t *new_head, - uint32_t *entries) -{ - unsigned int max = n; - int success; - - /* move cons.head atomically */ - do { - /* Restore n as it may change every loop */ - n = max; - - *old_head = r->cons.head; - - /* add rmb barrier to avoid load/load reorder in weak - * memory model. It is noop on x86 - */ - rte_smp_rmb(); - - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. - */ - *entries = (r->prod.tail - *old_head); - - /* Set the actual entries for dequeue */ - if (n > *entries) - n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; - - if (unlikely(n == 0)) - return 0; - - *new_head = *old_head + n; - if (is_sc) { - r->cons.head = *new_head; - rte_smp_rmb(); - success = 1; - } else { - success = rte_atomic32_cmpset((uint32_t *)(uintptr_t)&r->cons.head, + success = rte_atomic32_cmpset( + (uint32_t *)(uintptr_t)&d->head, *old_head, *new_head); - } } while (unlikely(success == 0)); return n; } diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h index 91f5eeccb9..e2b82dd1e6 100644 --- a/lib/ring/rte_ring_hts_elem_pvt.h +++ b/lib/ring/rte_ring_hts_elem_pvt.h @@ -51,19 +51,39 @@ __rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht, } /** - * @internal This function updates the producer head for enqueue + * @internal This is a helper function that moves the producer/consumer head + * + * @param d + * A pointer to the headtail structure with head value to be moved + * @param s + * A pointer to the counter-part headtail structure. Note that this + * function only reads tail value from it + * @param capacity + * Either ring capacity value (for producer), or zero (for consumer) + * Indicates whether multi-thread safe path is needed or not + * @param num + * The number of elements we want to move head value on + * @param behavior + * RTE_RING_QUEUE_FIXED: Move on a fixed number of items + * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible + * @param old_head + * Returns head value as it was before the move + * @param entries + * Returns the number of ring entries available BEFORE head was moved + * @return + * Actual number of objects the head was moved on + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only */ -static __rte_always_inline unsigned int -__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, +static __rte_always_inline uint32_t +__rte_ring_hts_move_head(struct rte_ring_hts_headtail *d, + const struct rte_ring_headtail *s, uint32_t capacity, unsigned int num, enum rte_ring_queue_behavior behavior, uint32_t *old_head, - uint32_t *free_entries) + uint32_t *entries) { uint32_t n; union __rte_ring_hts_pos np, op; - const uint32_t capacity = r->capacity; - - op.raw = rte_atomic_load_explicit(&r->hts_prod.ht.raw, rte_memory_order_acquire); + op.raw = rte_atomic_load_explicit(&d->ht.raw, rte_memory_order_acquire); do { /* Reset n to the initial burst count */ @@ -74,20 +94,20 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, * make sure that we read prod head/tail *before* * reading cons tail. */ - __rte_ring_hts_head_wait(&r->hts_prod, &op); + __rte_ring_hts_head_wait(d, &op); /* * The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have - * *old_head > cons_tail). So 'free_entries' is always between 0 + * *old_head > cons_tail). So 'entries' is always between 0 * and capacity (which is < size). */ - *free_entries = capacity + r->cons.tail - op.pos.head; + *entries = capacity + s->tail - op.pos.head; /* check that we have enough room in ring */ - if (unlikely(n > *free_entries)) + if (unlikely(n > *entries)) n = (behavior == RTE_RING_QUEUE_FIXED) ? - 0 : *free_entries; + 0 : *entries; if (n == 0) break; @@ -100,13 +120,25 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, * - OOO reads of cons tail value * - OOO copy of elems from the ring */ - } while (rte_atomic_compare_exchange_strong_explicit(&r->hts_prod.ht.raw, + } while (rte_atomic_compare_exchange_strong_explicit(&d->ht.raw, (uint64_t *)(uintptr_t)&op.raw, np.raw, - rte_memory_order_acquire, rte_memory_order_acquire) == 0); + rte_memory_order_acquire, + rte_memory_order_acquire) == 0); *old_head = op.pos.head; return n; } +/** + * @internal This function updates the producer head for enqueue + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + return __rte_ring_hts_move_head(&r->hts_prod, &r->cons, + r->capacity, num, behavior, old_head, free_entries); +} /** * @internal This function updates the consumer head for dequeue @@ -116,51 +148,8 @@ __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *entries) { - uint32_t n; - union __rte_ring_hts_pos np, op; - - op.raw = rte_atomic_load_explicit(&r->hts_cons.ht.raw, rte_memory_order_acquire); - - /* move cons.head atomically */ - do { - /* Restore n as it may change every loop */ - n = num; - - /* - * wait for tail to be equal to head, - * make sure that we read cons head/tail *before* - * reading prod tail. - */ - __rte_ring_hts_head_wait(&r->hts_cons, &op); - - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. - */ - *entries = r->prod.tail - op.pos.head; - - /* Set the actual entries for dequeue */ - if (n > *entries) - n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; - - if (unlikely(n == 0)) - break; - - np.pos.tail = op.pos.tail; - np.pos.head = op.pos.head + n; - - /* - * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: - * - OOO reads of prod tail value - * - OOO copy of elems from the ring - */ - } while (rte_atomic_compare_exchange_strong_explicit(&r->hts_cons.ht.raw, - (uint64_t *)(uintptr_t)&op.raw, np.raw, - rte_memory_order_acquire, rte_memory_order_acquire) == 0); - - *old_head = op.pos.head; - return n; + return __rte_ring_hts_move_head(&r->hts_cons, &r->prod, + 0, num, behavior, old_head, entries); } /** diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h index 122650346b..96825931f8 100644 --- a/lib/ring/rte_ring_rts_elem_pvt.h +++ b/lib/ring/rte_ring_rts_elem_pvt.h @@ -65,19 +65,40 @@ __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht, } /** - * @internal This function updates the producer head for enqueue. + * @internal This is a helper function that moves the producer/consumer head + * + * @param d + * A pointer to the headtail structure with head value to be moved + * @param s + * A pointer to the counter-part headtail structure. Note that this + * function only reads tail value from it + * @param capacity + * Either ring capacity value (for producer), or zero (for consumer) + * Indicates whether multi-thread safe path is needed or not + * @param num + * The number of elements we want to move head value on + * @param behavior + * RTE_RING_QUEUE_FIXED: Move on a fixed number of items + * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible + * @param old_head + * Returns head value as it was before the move + * @param entries + * Returns the number of ring entries available BEFORE head was moved + * @return + * Actual number of objects the head was moved on + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only */ static __rte_always_inline uint32_t -__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num, +__rte_ring_rts_move_head(struct rte_ring_rts_headtail *d, + const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num, enum rte_ring_queue_behavior behavior, uint32_t *old_head, - uint32_t *free_entries) + uint32_t *entries) { uint32_t n; union __rte_ring_rts_poscnt nh, oh; - const uint32_t capacity = r->capacity; - - oh.raw = rte_atomic_load_explicit(&r->rts_prod.head.raw, rte_memory_order_acquire); + oh.raw = rte_atomic_load_explicit(&d->head.raw, + rte_memory_order_acquire); do { /* Reset n to the initial burst count */ @@ -88,20 +109,20 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num, * make sure that we read prod head *before* * reading cons tail. */ - __rte_ring_rts_head_wait(&r->rts_prod, &oh); + __rte_ring_rts_head_wait(d, &oh); /* * The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have - * *old_head > cons_tail). So 'free_entries' is always between 0 + * *old_head > cons_tail). So 'entries' is always between 0 * and capacity (which is < size). */ - *free_entries = capacity + r->cons.tail - oh.val.pos; + *entries = capacity + s->tail - oh.val.pos; /* check that we have enough room in ring */ - if (unlikely(n > *free_entries)) + if (unlikely(n > *entries)) n = (behavior == RTE_RING_QUEUE_FIXED) ? - 0 : *free_entries; + 0 : *entries; if (n == 0) break; @@ -114,14 +135,27 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num, * - OOO reads of cons tail value * - OOO copy of elems to the ring */ - } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_prod.head.raw, + } while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw, (uint64_t *)(uintptr_t)&oh.raw, nh.raw, - rte_memory_order_acquire, rte_memory_order_acquire) == 0); + rte_memory_order_acquire, + rte_memory_order_acquire) == 0); *old_head = oh.val.pos; return n; } +/** + * @internal This function updates the producer head for enqueue. + */ +static __rte_always_inline uint32_t +__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + return __rte_ring_rts_move_head(&r->rts_prod, &r->cons, + r->capacity, num, behavior, old_head, free_entries); +} + /** * @internal This function updates the consumer head for dequeue */ @@ -130,51 +164,8 @@ __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *entries) { - uint32_t n; - union __rte_ring_rts_poscnt nh, oh; - - oh.raw = rte_atomic_load_explicit(&r->rts_cons.head.raw, rte_memory_order_acquire); - - /* move cons.head atomically */ - do { - /* Restore n as it may change every loop */ - n = num; - - /* - * wait for cons head/tail distance, - * make sure that we read cons head *before* - * reading prod tail. - */ - __rte_ring_rts_head_wait(&r->rts_cons, &oh); - - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. - */ - *entries = r->prod.tail - oh.val.pos; - - /* Set the actual entries for dequeue */ - if (n > *entries) - n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; - - if (unlikely(n == 0)) - break; - - nh.val.pos = oh.val.pos + n; - nh.val.cnt = oh.val.cnt + 1; - - /* - * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: - * - OOO reads of prod tail value - * - OOO copy of elems from the ring - */ - } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_cons.head.raw, - (uint64_t *)(uintptr_t)&oh.raw, nh.raw, - rte_memory_order_acquire, rte_memory_order_acquire) == 0); - - *old_head = oh.val.pos; - return n; + return __rte_ring_rts_move_head(&r->rts_cons, &r->prod, + 0, num, behavior, old_head, entries); } /**