From patchwork Wed Jan 11 15:05:25 2017
X-Patchwork-Submitter: Bruce Richardson
X-Patchwork-Id: 19135
X-Patchwork-Delegate: ferruh.yigit@amd.com
From: Bruce Richardson
To: olivier.matz@6wind.com
Cc: dev@dpdk.org, Bruce Richardson
Date: Wed, 11 Jan 2017 15:05:25 +0000
Message-Id: <1484147125-5948-12-git-send-email-bruce.richardson@intel.com>
In-Reply-To: <1484147125-5948-1-git-send-email-bruce.richardson@intel.com>
References: <1484147125-5948-1-git-send-email-bruce.richardson@intel.com>
Subject: [dpdk-dev] [RFC PATCH 11/11] ring: reuse typed ring enqueue and dequeue functions

Reduce the code duplicated between rte_ring.h and rte_typed_ring.h by
having rte_ring reuse the enqueue and dequeue code from rte_typed_ring.

Signed-off-by: Bruce Richardson
---
 lib/librte_ring/rte_ring.h | 380 +--------------------------------------------
 1 file changed, 8 insertions(+), 372 deletions(-)

diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 4e74efd..25c4849 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -231,366 +231,10 @@ int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
  */
 void rte_ring_dump(FILE *f, const struct rte_ring *r);
 
-/**
- * @internal Enqueue several objects on the ring (multi-producers safe).
- *
- * This function uses a "compare and set" instruction to move the
- * producer index atomically.
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects).
- * @param n
- *   The number of objects to add in the ring from the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t prod_head, prod_next;
-	uint32_t cons_tail, free_entries;
-	const unsigned max = n;
-	int success;
-	unsigned i, rep = 0;
-	uint32_t mask = r->prod.mask;
-	int ret;
-
-	/* Avoid the unnecessary cmpset operation below, which is also
-	 * potentially harmful when n equals 0. */
-	if (n == 0)
-		return 0;
-
-	/* move prod.head atomically */
-	do {
-		/* Reset n to the initial burst count */
-		n = max;
-
-		prod_head = r->prod.head;
-		cons_tail = r->cons.tail;
-		/* The subtraction is done between two unsigned 32bits value
-		 * (the result is always modulo 32 bits even if we have
-		 * prod_head > cons_tail). So 'free_entries' is always between 0
-		 * and size(ring)-1. */
-		free_entries = (mask + cons_tail - prod_head);
-
-		/* check that we have enough room in ring */
-		if (unlikely(n > free_entries)) {
-			if (behavior == RTE_RING_QUEUE_FIXED) {
-				__RING_STAT_ADD(r, enq_fail, n);
-				return -ENOBUFS;
-			}
-			else {
-				/* No free entry available */
-				if (unlikely(free_entries == 0)) {
-					__RING_STAT_ADD(r, enq_fail, n);
-					return 0;
-				}
-
-				n = free_entries;
-			}
-		}
-
-		prod_next = prod_head + n;
-		success = rte_atomic32_cmpset(&r->prod.head, prod_head,
-					      prod_next);
-	} while (unlikely(success == 0));
-
-	/* write entries in ring */
-	ENQUEUE_PTRS();
-	rte_smp_wmb();
-
-	/* if we exceed the watermark */
-	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
-				(int)(n | RTE_RING_QUOT_EXCEED);
-		__RING_STAT_ADD(r, enq_quota, n);
-	}
-	else {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
-		__RING_STAT_ADD(r, enq_success, n);
-	}
-
-	/*
-	 * If there are other enqueues in progress that preceded us,
-	 * we need to wait for them to complete
-	 */
-	while (unlikely(r->prod.tail != prod_head)) {
-		rte_pause();
-
-		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
-		 * for other thread finish. It gives pre-empted thread a chance
-		 * to proceed and finish with ring dequeue operation. */
-		if (RTE_RING_PAUSE_REP_COUNT &&
-		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
-			rep = 0;
-			sched_yield();
-		}
-	}
-	r->prod.tail = prod_next;
-	return ret;
-}
-
-/**
- * @internal Enqueue several objects on a ring (NOT multi-producers safe).
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects).
- * @param n
- *   The number of objects to add in the ring from the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t prod_head, cons_tail;
-	uint32_t prod_next, free_entries;
-	unsigned i;
-	uint32_t mask = r->prod.mask;
-	int ret;
-
-	prod_head = r->prod.head;
-	cons_tail = r->cons.tail;
-	/* The subtraction is done between two unsigned 32bits value
-	 * (the result is always modulo 32 bits even if we have
-	 * prod_head > cons_tail). So 'free_entries' is always between 0
-	 * and size(ring)-1. */
-	free_entries = mask + cons_tail - prod_head;
-
-	/* check that we have enough room in ring */
-	if (unlikely(n > free_entries)) {
-		if (behavior == RTE_RING_QUEUE_FIXED) {
-			__RING_STAT_ADD(r, enq_fail, n);
-			return -ENOBUFS;
-		}
-		else {
-			/* No free entry available */
-			if (unlikely(free_entries == 0)) {
-				__RING_STAT_ADD(r, enq_fail, n);
-				return 0;
-			}
-
-			n = free_entries;
-		}
-	}
-
-	prod_next = prod_head + n;
-	r->prod.head = prod_next;
-
-	/* write entries in ring */
-	ENQUEUE_PTRS();
-	rte_smp_wmb();
-
-	/* if we exceed the watermark */
-	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
-			(int)(n | RTE_RING_QUOT_EXCEED);
-		__RING_STAT_ADD(r, enq_quota, n);
-	}
-	else {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
-		__RING_STAT_ADD(r, enq_success, n);
-	}
-
-	r->prod.tail = prod_next;
-	return ret;
-}
-
-/**
- * @internal Dequeue several objects from a ring (multi-consumers safe). When
- * the request objects are more than the available objects, only dequeue the
- * actual number of objects
- *
- * This function uses a "compare and set" instruction to move the
- * consumer index atomically.
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
- * @param n
- *   The number of objects to dequeue from the ring to the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
- */
-
-static inline int __attribute__((always_inline))
-__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t cons_head, prod_tail;
-	uint32_t cons_next, entries;
-	const unsigned max = n;
-	int success;
-	unsigned i, rep = 0;
-	uint32_t mask = r->prod.mask;
-
-	/* Avoid the unnecessary cmpset operation below, which is also
-	 * potentially harmful when n equals 0. */
-	if (n == 0)
-		return 0;
-
-	/* move cons.head atomically */
-	do {
-		/* Restore n as it may change every loop */
-		n = max;
-
-		cons_head = r->cons.head;
-		prod_tail = r->prod.tail;
-		/* The subtraction is done between two unsigned 32bits value
-		 * (the result is always modulo 32 bits even if we have
-		 * cons_head > prod_tail). So 'entries' is always between 0
-		 * and size(ring)-1. */
-		entries = (prod_tail - cons_head);
-
-		/* Set the actual entries for dequeue */
-		if (n > entries) {
-			if (behavior == RTE_RING_QUEUE_FIXED) {
-				__RING_STAT_ADD(r, deq_fail, n);
-				return -ENOENT;
-			}
-			else {
-				if (unlikely(entries == 0)){
-					__RING_STAT_ADD(r, deq_fail, n);
-					return 0;
-				}
-
-				n = entries;
-			}
-		}
-
-		cons_next = cons_head + n;
-		success = rte_atomic32_cmpset(&r->cons.head, cons_head,
-					      cons_next);
-	} while (unlikely(success == 0));
-
-	/* copy in table */
-	DEQUEUE_PTRS();
-	rte_smp_rmb();
-
-	/*
-	 * If there are other dequeues in progress that preceded us,
-	 * we need to wait for them to complete
-	 */
-	while (unlikely(r->cons.tail != cons_head)) {
-		rte_pause();
-
-		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
-		 * for other thread finish. It gives pre-empted thread a chance
-		 * to proceed and finish with ring dequeue operation. */
-		if (RTE_RING_PAUSE_REP_COUNT &&
-		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
-			rep = 0;
-			sched_yield();
-		}
-	}
-	__RING_STAT_ADD(r, deq_success, n);
-	r->cons.tail = cons_next;
-
-	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
-}
-
-/**
- * @internal Dequeue several objects from a ring (NOT multi-consumers safe).
- * When the request objects are more than the available objects, only dequeue
- * the actual number of objects
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
- * @param n
- *   The number of objects to dequeue from the ring to the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t cons_head, prod_tail;
-	uint32_t cons_next, entries;
-	unsigned i;
-	uint32_t mask = r->prod.mask;
-
-	cons_head = r->cons.head;
-	prod_tail = r->prod.tail;
-	/* The subtraction is done between two unsigned 32bits value
-	 * (the result is always modulo 32 bits even if we have
-	 * cons_head > prod_tail). So 'entries' is always between 0
-	 * and size(ring)-1. */
-	entries = prod_tail - cons_head;
-
-	if (n > entries) {
-		if (behavior == RTE_RING_QUEUE_FIXED) {
-			__RING_STAT_ADD(r, deq_fail, n);
-			return -ENOENT;
-		}
-		else {
-			if (unlikely(entries == 0)){
-				__RING_STAT_ADD(r, deq_fail, n);
-				return 0;
-			}
-
-			n = entries;
-		}
-	}
-
-	cons_next = cons_head + n;
-	r->cons.head = cons_next;
-
-	/* copy in table */
-	DEQUEUE_PTRS();
-	rte_smp_rmb();
-
-	__RING_STAT_ADD(r, deq_success, n);
-	r->cons.tail = cons_next;
-	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
-}
+#define __rte_ring_mp_do_enqueue rte_void___ring_mp_do_enqueue
+#define __rte_ring_sp_do_enqueue rte_void___ring_sp_do_enqueue
+#define __rte_ring_mc_do_dequeue rte_void___ring_mc_do_dequeue
+#define __rte_ring_sc_do_dequeue rte_void___ring_sc_do_dequeue
 
 /**
  * Enqueue several objects on the ring (multi-producers safe).
@@ -882,9 +526,7 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 static inline int
 rte_ring_full(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
+	return rte_void_ring_full(r);
 }
 
 /**
@@ -899,9 +541,7 @@ rte_ring_full(const struct rte_ring *r)
 static inline int
 rte_ring_empty(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return !!(cons_tail == prod_tail);
+	return rte_void_ring_empty(r);
 }
 
 /**
@@ -915,9 +555,7 @@ rte_ring_empty(const struct rte_ring *r)
 static inline unsigned
 rte_ring_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return (prod_tail - cons_tail) & r->prod.mask;
+	return rte_void_ring_count(r);
 }
 
 /**
@@ -931,9 +569,7 @@ rte_ring_count(const struct rte_ring *r)
 static inline unsigned
 rte_ring_free_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return (cons_tail - prod_tail - 1) & r->prod.mask;
+	return rte_void_ring_free_count(r);
 }
 
 /**
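
The deduplication pattern used for the internal helpers is worth spelling out. Below is a minimal, self-contained C sketch of the idea, not the DPDK sources: struct demo_ring, demo_void_ring_count() and __demo_ring_count are hypothetical stand-ins for the rte_typed_ring/rte_ring pair, and the ring layout is cut down to the bare minimum needed to compile.

#include <stdio.h>

/* Hypothetical stand-in for the ring structure; the real layout lives in
 * rte_typed_ring.h. head is the producer index, tail the consumer index,
 * and mask is the (power-of-two) ring size minus one. */
struct demo_ring {
	unsigned int head;
	unsigned int tail;
	unsigned int mask;
};

/* The "typed" implementation, analogous to the rte_void___ring_* helpers:
 * after the patch this is the only copy of the algorithm. */
static inline unsigned int
demo_void_ring_count(const struct demo_ring *r)
{
	/* unsigned subtraction wraps modulo 2^32, so this stays correct
	 * even once head has wrapped past tail */
	return (r->head - r->tail) & r->mask;
}

/* What rte_ring.h now does: rather than carrying a second copy of the
 * body, it re-exports the typed helper under the legacy internal name,
 * e.g. #define __rte_ring_mp_do_enqueue rte_void___ring_mp_do_enqueue */
#define __demo_ring_count demo_void_ring_count

int main(void)
{
	struct demo_ring r = { .head = 5, .tail = 2, .mask = 7 };

	/* Both names resolve to the same shared implementation. */
	printf("count via legacy name: %u\n", __demo_ring_count(&r));
	printf("count via typed name:  %u\n", demo_void_ring_count(&r));
	return 0;
}

Because the alias is resolved by the preprocessor, the legacy names cost nothing at run time, and only one copy of each enqueue/dequeue body remains to be maintained.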
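The public query functions use the patch's second mechanism: a static inline wrapper that keeps the old documented signature and simply delegates to the typed implementation, as rte_ring_full() now does with rte_void_ring_full(). A minimal sketch, again under hypothetical demo_* names rather than the real API:

#include <stdbool.h>
#include <stdio.h>

struct demo_ring {
	unsigned int head;	/* producer index */
	unsigned int tail;	/* consumer index */
	unsigned int mask;	/* ring size minus one */
};

/* Typed-ring query, analogous to rte_void_ring_full(): the ring is full
 * when no free slot remains between consumer and producer. */
static inline bool
demo_void_ring_full(const struct demo_ring *r)
{
	return ((r->tail - r->head - 1) & r->mask) == 0;
}

/* Public wrapper, analogous to the new rte_ring_full(): the signature
 * callers rely on is unchanged; only the body now delegates. */
static inline int
demo_ring_full(const struct demo_ring *r)
{
	return demo_void_ring_full(r);
}

int main(void)
{
	/* mask = 7 gives 7 usable slots; head is 7 ahead of tail -> full */
	struct demo_ring r = { .head = 7, .tail = 0, .mask = 7 };

	printf("ring full? %s\n", demo_ring_full(&r) ? "yes" : "no");
	return 0;
}

Since the wrapper is static inline, the compiler can expand it in place, so the generated code should match the open-coded version the patch removes while leaving existing callers untouched.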