From patchwork Fri Sep 20 16:37:34 2024 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Patchwork-Submitter: =?utf-8?q?Morten_Br=C3=B8rup?= X-Patchwork-Id: 144368 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 54628459E0; Fri, 20 Sep 2024 18:37:40 +0200 (CEST) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id D1EB2433A0; Fri, 20 Sep 2024 18:37:39 +0200 (CEST) Received: from dkmailrelay1.smartsharesystems.com (smartserver.smartsharesystems.com [77.243.40.215]) by mails.dpdk.org (Postfix) with ESMTP id 5FA2842F3B for ; Fri, 20 Sep 2024 18:37:38 +0200 (CEST) Received: from smartserver.smartsharesystems.com (smartserver.smartsharesys.local [192.168.4.10]) by dkmailrelay1.smartsharesystems.com (Postfix) with ESMTP id 34CDF21B0A for ; Fri, 20 Sep 2024 18:37:38 +0200 (CEST) Received: from dkrd4.smartsharesys.local ([192.168.4.26]) by smartserver.smartsharesystems.com with Microsoft SMTPSVC(6.0.3790.4675); Fri, 20 Sep 2024 18:37:36 +0200 From: =?utf-8?q?Morten_Br=C3=B8rup?= To: dev@dpdk.org Cc: =?utf-8?q?Morten_Br=C3=B8rup?= Subject: [RFC PATCH v2] mempool: obey configured cache size Date: Fri, 20 Sep 2024 16:37:34 +0000 Message-ID: <20240920163734.841000-1-mb@smartsharesystems.com> X-Mailer: git-send-email 2.43.0 In-Reply-To: <20240920163203.840770-1-mb@smartsharesystems.com> References: <20240920163203.840770-1-mb@smartsharesystems.com> MIME-Version: 1.0 X-OriginalArrivalTime: 20 Sep 2024 16:37:36.0530 (UTC) FILETIME=[6673A720:01DB0B7B] X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Seeking feedback on the concept. I have not yet benchmarked performance. The mempool cache size is configurable, but it could hold 1.5 times the configured size. This was confusing for developers, and added complexity when configuring mempools with caches. This patch modifies the mempool cache to obey the configured size, and removes the cache flush threshold. Furthermore, the mempool caches are now completely flushed/filled to/from the backend, so backend accesses are CPU cache line aligned. Finallly, the mempool get and put functions are optimized to only inline the likely scenarios, and call a non-inline static helper function in other cases. Variuos drivers accessing the mempool directly have been updated accordingly. Signed-off-by: Morten Brørup --- v2: * Removed mempool perf test; not part of patch set. --- drivers/common/idpf/idpf_common_rxtx_avx512.c | 54 +-- drivers/mempool/dpaa/dpaa_mempool.c | 16 +- drivers/mempool/dpaa2/dpaa2_hw_mempool.c | 14 - drivers/net/i40e/i40e_rxtx_vec_avx512.c | 17 +- drivers/net/iavf/iavf_rxtx_vec_avx512.c | 27 +- drivers/net/ice/ice_rxtx_vec_avx512.c | 27 +- lib/mempool/mempool_trace.h | 1 - lib/mempool/rte_mempool.c | 12 +- lib/mempool/rte_mempool.h | 337 +++++++++++------- 9 files changed, 256 insertions(+), 249 deletions(-) diff --git a/drivers/common/idpf/idpf_common_rxtx_avx512.c b/drivers/common/idpf/idpf_common_rxtx_avx512.c index 3b5e124ec8..98535a48f3 100644 --- a/drivers/common/idpf/idpf_common_rxtx_avx512.c +++ b/drivers/common/idpf/idpf_common_rxtx_avx512.c @@ -1024,21 +1024,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq) rte_lcore_id()); void **cache_objs; - if (cache == NULL || cache->len == 0) - goto normal; - - cache_objs = &cache->objs[cache->len]; - - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) { - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n); + if (!cache || unlikely(n + cache->len > cache->size)) { + rte_mempool_generic_put(mp, (void *)txep, n, cache); goto done; } - /* The cache follows the following algorithm - * 1. Add the objects to the cache - * 2. Anything greater than the cache min value (if it crosses the - * cache flush threshold) is flushed to the ring. - */ + cache_objs = &cache->objs[cache->len]; + /* Add elements back into the cache */ uint32_t copied = 0; /* n is multiple of 32 */ @@ -1056,16 +1048,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq) } cache->len += n; - if (cache->len >= cache->flushthresh) { - rte_mempool_ops_enqueue_bulk(mp, - &cache->objs[cache->size], - cache->len - cache->size); - cache->len = cache->size; - } + /* Increment stat. */ + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); + goto done; } -normal: m = rte_pktmbuf_prefree_seg(txep[0].mbuf); if (likely(m != NULL)) { free[0] = m; @@ -1335,21 +1324,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq) rte_lcore_id()); void **cache_objs; - if (!cache || cache->len == 0) - goto normal; - - cache_objs = &cache->objs[cache->len]; - - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) { - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n); + if (!cache || unlikely(n + cache->len > cache->size)) { + rte_mempool_generic_put(mp, (void *)txep, n, cache); goto done; } - /* The cache follows the following algorithm - * 1. Add the objects to the cache - * 2. Anything greater than the cache min value (if it crosses the - * cache flush threshold) is flushed to the ring. - */ + cache_objs = &cache->objs[cache->len]; + /* Add elements back into the cache */ uint32_t copied = 0; /* n is multiple of 32 */ @@ -1367,16 +1348,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq) } cache->len += n; - if (cache->len >= cache->flushthresh) { - rte_mempool_ops_enqueue_bulk(mp, - &cache->objs[cache->size], - cache->len - cache->size); - cache->len = cache->size; - } + /* Increment stat. */ + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); + goto done; } -normal: m = rte_pktmbuf_prefree_seg(txep[0].mbuf); if (likely(m)) { free[0] = m; diff --git a/drivers/mempool/dpaa/dpaa_mempool.c b/drivers/mempool/dpaa/dpaa_mempool.c index 74bfcab509..3a936826c8 100644 --- a/drivers/mempool/dpaa/dpaa_mempool.c +++ b/drivers/mempool/dpaa/dpaa_mempool.c @@ -51,8 +51,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp) struct bman_pool_params params = { .flags = BMAN_POOL_FLAG_DYNAMIC_BPID }; - unsigned int lcore_id; - struct rte_mempool_cache *cache; MEMPOOL_INIT_FUNC_TRACE(); @@ -120,18 +118,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp) rte_memcpy(bp_info, (void *)&rte_dpaa_bpid_info[bpid], sizeof(struct dpaa_bp_info)); mp->pool_data = (void *)bp_info; - /* Update per core mempool cache threshold to optimal value which is - * number of buffers that can be released to HW buffer pool in - * a single API call. - */ - for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) { - cache = &mp->local_cache[lcore_id]; - DPAA_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d", - lcore_id, cache->flushthresh, - (uint32_t)(cache->size + DPAA_MBUF_MAX_ACQ_REL)); - if (cache->flushthresh) - cache->flushthresh = cache->size + DPAA_MBUF_MAX_ACQ_REL; - } DPAA_MEMPOOL_INFO("BMAN pool created for bpid =%d", bpid); return 0; @@ -234,7 +220,7 @@ dpaa_mbuf_alloc_bulk(struct rte_mempool *pool, DPAA_MEMPOOL_DPDEBUG("Request to alloc %d buffers in bpid = %d", count, bp_info->bpid); - if (unlikely(count >= (RTE_MEMPOOL_CACHE_MAX_SIZE * 2))) { + if (unlikely(count >= RTE_MEMPOOL_CACHE_MAX_SIZE)) { DPAA_MEMPOOL_ERR("Unable to allocate requested (%u) buffers", count); return -1; diff --git a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c index 42e17d984c..a44f3cf616 100644 --- a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c +++ b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c @@ -44,8 +44,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp) struct dpaa2_bp_info *bp_info; struct dpbp_attr dpbp_attr; uint32_t bpid; - unsigned int lcore_id; - struct rte_mempool_cache *cache; int ret; avail_dpbp = dpaa2_alloc_dpbp_dev(); @@ -134,18 +132,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp) DPAA2_MEMPOOL_DEBUG("BP List created for bpid =%d", dpbp_attr.bpid); h_bp_list = bp_list; - /* Update per core mempool cache threshold to optimal value which is - * number of buffers that can be released to HW buffer pool in - * a single API call. - */ - for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) { - cache = &mp->local_cache[lcore_id]; - DPAA2_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d", - lcore_id, cache->flushthresh, - (uint32_t)(cache->size + DPAA2_MBUF_MAX_ACQ_REL)); - if (cache->flushthresh) - cache->flushthresh = cache->size + DPAA2_MBUF_MAX_ACQ_REL; - } return 0; err3: diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c b/drivers/net/i40e/i40e_rxtx_vec_avx512.c index 0238b03f8a..712ab1726f 100644 --- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c +++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c @@ -783,18 +783,13 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq) struct rte_mempool_cache *cache = rte_mempool_default_cache(mp, rte_lcore_id()); - if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE) { + if (!cache || unlikely(n + cache->len > cache->size)) { rte_mempool_generic_put(mp, (void *)txep, n, cache); goto done; } cache_objs = &cache->objs[cache->len]; - /* The cache follows the following algorithm - * 1. Add the objects to the cache - * 2. Anything greater than the cache min value (if it - * crosses the cache flush threshold) is flushed to the ring. - */ /* Add elements back into the cache */ uint32_t copied = 0; /* n is multiple of 32 */ @@ -812,12 +807,10 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq) } cache->len += n; - if (cache->len >= cache->flushthresh) { - rte_mempool_ops_enqueue_bulk - (mp, &cache->objs[cache->size], - cache->len - cache->size); - cache->len = cache->size; - } + /* Increment stat. */ + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); + goto done; } diff --git a/drivers/net/iavf/iavf_rxtx_vec_avx512.c b/drivers/net/iavf/iavf_rxtx_vec_avx512.c index 3bb6f305df..307bb8556a 100644 --- a/drivers/net/iavf/iavf_rxtx_vec_avx512.c +++ b/drivers/net/iavf/iavf_rxtx_vec_avx512.c @@ -1873,21 +1873,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq) rte_lcore_id()); void **cache_objs; - if (!cache || cache->len == 0) - goto normal; - - cache_objs = &cache->objs[cache->len]; - - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) { - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n); + if (!cache || unlikely(n + cache->len > cache->size)) { + rte_mempool_generic_put(mp, (void *)txep, n, cache); goto done; } - /* The cache follows the following algorithm - * 1. Add the objects to the cache - * 2. Anything greater than the cache min value (if it crosses the - * cache flush threshold) is flushed to the ring. - */ + cache_objs = &cache->objs[cache->len]; + /* Add elements back into the cache */ uint32_t copied = 0; /* n is multiple of 32 */ @@ -1905,16 +1897,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq) } cache->len += n; - if (cache->len >= cache->flushthresh) { - rte_mempool_ops_enqueue_bulk(mp, - &cache->objs[cache->size], - cache->len - cache->size); - cache->len = cache->size; - } + /* Increment stat. */ + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); + goto done; } -normal: m = rte_pktmbuf_prefree_seg(txep[0].mbuf); if (likely(m)) { free[0] = m; diff --git a/drivers/net/ice/ice_rxtx_vec_avx512.c b/drivers/net/ice/ice_rxtx_vec_avx512.c index 04148e8ea2..4ea1db734e 100644 --- a/drivers/net/ice/ice_rxtx_vec_avx512.c +++ b/drivers/net/ice/ice_rxtx_vec_avx512.c @@ -888,21 +888,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq) struct rte_mempool_cache *cache = rte_mempool_default_cache(mp, rte_lcore_id()); - if (!cache || cache->len == 0) - goto normal; - - cache_objs = &cache->objs[cache->len]; - - if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) { - rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n); + if (!cache || unlikely(n + cache->len > cache->size)) { + rte_mempool_generic_put(mp, (void *)txep, n, cache); goto done; } - /* The cache follows the following algorithm - * 1. Add the objects to the cache - * 2. Anything greater than the cache min value (if it - * crosses the cache flush threshold) is flushed to the ring. - */ + cache_objs = &cache->objs[cache->len]; + /* Add elements back into the cache */ uint32_t copied = 0; /* n is multiple of 32 */ @@ -920,16 +912,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq) } cache->len += n; - if (cache->len >= cache->flushthresh) { - rte_mempool_ops_enqueue_bulk - (mp, &cache->objs[cache->size], - cache->len - cache->size); - cache->len = cache->size; - } + /* Increment stat. */ + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); + goto done; } -normal: m = rte_pktmbuf_prefree_seg(txep[0].mbuf); if (likely(m)) { free[0] = m; diff --git a/lib/mempool/mempool_trace.h b/lib/mempool/mempool_trace.h index dffef062e4..3c49b41a6d 100644 --- a/lib/mempool/mempool_trace.h +++ b/lib/mempool/mempool_trace.h @@ -112,7 +112,6 @@ RTE_TRACE_POINT( rte_trace_point_emit_i32(socket_id); rte_trace_point_emit_ptr(cache); rte_trace_point_emit_u32(cache->len); - rte_trace_point_emit_u32(cache->flushthresh); ) RTE_TRACE_POINT( diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c index d8e39e5c20..40fb13239a 100644 --- a/lib/mempool/rte_mempool.c +++ b/lib/mempool/rte_mempool.c @@ -50,11 +50,6 @@ static void mempool_event_callback_invoke(enum rte_mempool_event event, struct rte_mempool *mp); -/* Note: avoid using floating point since that compiler - * may not think that is constant. - */ -#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2) - #if defined(RTE_ARCH_X86) /* * return the greatest common divisor between a and b (fast algorithm) @@ -746,13 +741,12 @@ rte_mempool_free(struct rte_mempool *mp) static void mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size) { - /* Check that cache have enough space for flush threshold */ - RTE_BUILD_BUG_ON(CALC_CACHE_FLUSHTHRESH(RTE_MEMPOOL_CACHE_MAX_SIZE) > + /* Check that cache have enough space for size */ + RTE_BUILD_BUG_ON(RTE_MEMPOOL_CACHE_MAX_SIZE > RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs) / RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs[0])); cache->size = size; - cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size); cache->len = 0; } @@ -836,7 +830,7 @@ rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size, /* asked cache too big */ if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE || - CALC_CACHE_FLUSHTHRESH(cache_size) > n) { + cache_size > n) { rte_errno = EINVAL; return NULL; } diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h index 7bdc92b812..de1d49bac3 100644 --- a/lib/mempool/rte_mempool.h +++ b/lib/mempool/rte_mempool.h @@ -89,10 +89,8 @@ struct __rte_cache_aligned rte_mempool_debug_stats { */ struct __rte_cache_aligned rte_mempool_cache { uint32_t size; /**< Size of the cache */ - uint32_t flushthresh; /**< Threshold before we flush excess elements */ uint32_t len; /**< Current cache count */ #ifdef RTE_LIBRTE_MEMPOOL_STATS - uint32_t unused; /* * Alternative location for the most frequently updated mempool statistics (per-lcore), * providing faster update access when using a mempool cache. @@ -110,7 +108,7 @@ struct __rte_cache_aligned rte_mempool_cache { * Cache is allocated to this size to allow it to overflow in certain * cases to avoid needless emptying of cache. */ - alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 2]; + alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE]; }; /** @@ -1363,7 +1361,8 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache, } /** - * @internal Put several objects back in the mempool; used internally. + * @internal Put several objects back in the mempool; used internally when + * the number of objects exceeds the remaining space in the mempool cache. * @param mp * A pointer to the mempool structure. * @param obj_table @@ -1371,58 +1370,94 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache, * @param n * The number of objects to store back in the mempool, must be strictly * positive. + * Must be more than the remaining space in the mempool cache, i.e.: + * cache->len + n > cache->size * @param cache - * A pointer to a mempool cache structure. May be NULL if not needed. + * A pointer to a mempool cache structure. Not NULL. */ -static __rte_always_inline void -rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table, - unsigned int n, struct rte_mempool_cache *cache) +static __rte_noinline void +rte_mempool_do_generic_put_many(struct rte_mempool *mp, void * const *obj_table, + unsigned int n, struct rte_mempool_cache *cache) { - void **cache_objs; + __attribute__((assume(cache != NULL))); + __attribute__((assume(cache->size <= RTE_MEMPOOL_CACHE_MAX_SIZE))); + __attribute__((assume(cache->len <= RTE_MEMPOOL_CACHE_MAX_SIZE))); + __attribute__((assume(cache->len + n > cache->size))); - /* No cache provided */ - if (unlikely(cache == NULL)) - goto driver_enqueue; + void **cache_objs; + unsigned int len; + const uint32_t cache_size = cache->size; - /* increment stat now, adding in mempool always success */ + /* Increment stat now, adding in mempool always succeeds. */ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); - /* The request itself is too big for the cache */ - if (unlikely(n > cache->flushthresh)) - goto driver_enqueue_stats_incremented; - - /* - * The cache follows the following algorithm: - * 1. If the objects cannot be added to the cache without crossing - * the flush threshold, flush the cache to the backend. - * 2. Add the objects to the cache. - */ - - if (cache->len + n <= cache->flushthresh) { - cache_objs = &cache->objs[cache->len]; - cache->len += n; - } else { - cache_objs = &cache->objs[0]; - rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len); - cache->len = n; + /* Fill the cache with the first objects. */ + cache_objs = &cache->objs[cache->len]; + len = (cache_size - cache->len); + rte_memcpy(cache_objs, obj_table, sizeof(void *) * len); + obj_table += len; + n -= len; + + /* Flush the entire cache to the backend. */ + cache_objs = &cache->objs[0]; + rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache_size); + + if (unlikely(n > cache_size)) { + /* Push following objects, in entire cache sizes, directly to the backend. */ + len = n - n % cache_size; + rte_mempool_ops_enqueue_bulk(mp, obj_table, len); + obj_table += len; + n -= len; } - /* Add the objects to the cache. */ + /* Add the remaining objects to the cache. */ + cache->len = n; rte_memcpy(cache_objs, obj_table, sizeof(void *) * n); +} - return; - -driver_enqueue: - - /* increment stat now, adding in mempool always success */ - RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1); - RTE_MEMPOOL_STAT_ADD(mp, put_objs, n); - -driver_enqueue_stats_incremented: +/** + * @internal Put several objects back in the mempool; used internally. + * @param mp + * A pointer to the mempool structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to store back in the mempool, must be strictly + * positive. + * @param cache + * A pointer to a mempool cache structure. May be NULL if not needed. + */ +static __rte_always_inline void +rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table, + unsigned int n, struct rte_mempool_cache *cache) +{ + if (likely(cache != NULL)) { + __attribute__((assume(cache->size <= RTE_MEMPOOL_CACHE_MAX_SIZE))); + __attribute__((assume(cache->len <= RTE_MEMPOOL_CACHE_MAX_SIZE))); + + /* Enough remaining space in the cache? */ + if (likely(cache->len + n <= cache->size)) { + void **cache_objs; + + /* Increment stat now, adding in mempool always succeeds. */ + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1); + RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); + + /* Add the objects to the cache. */ + cache_objs = &cache->objs[cache->len]; + cache->len += n; + rte_memcpy(cache_objs, obj_table, sizeof(void *) * n); + } else + rte_mempool_do_generic_put_many(mp, obj_table, n, cache); + } else { + /* Increment stat now, adding in mempool always succeeds. */ + RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1); + RTE_MEMPOOL_STAT_ADD(mp, put_objs, n); - /* push objects to the backend */ - rte_mempool_ops_enqueue_bulk(mp, obj_table, n); + /* push objects to the backend */ + rte_mempool_ops_enqueue_bulk(mp, obj_table, n); + } } @@ -1490,135 +1525,193 @@ rte_mempool_put(struct rte_mempool *mp, void *obj) } /** - * @internal Get several objects from the mempool; used internally. + * @internal Get several objects from the mempool; used internally when + * the number of objects exceeds what is available in the mempool cache. * @param mp * A pointer to the mempool structure. * @param obj_table * A pointer to a table of void * pointers (objects). * @param n * The number of objects to get, must be strictly positive. + * Must be more than available in the mempool cache, i.e.: + * n > cache->len * @param cache - * A pointer to a mempool cache structure. May be NULL if not needed. + * A pointer to a mempool cache structure. Not NULL. * @return * - 0: Success. * - <0: Error; code of driver dequeue function. */ -static __rte_always_inline int -rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table, - unsigned int n, struct rte_mempool_cache *cache) +static __rte_noinline int +rte_mempool_do_generic_get_many(struct rte_mempool *mp, void **obj_table, + unsigned int n, struct rte_mempool_cache *cache) { + __attribute__((assume(cache != NULL))); + __attribute__((assume(cache->size <= RTE_MEMPOOL_CACHE_MAX_SIZE))); + __attribute__((assume(cache->len <= RTE_MEMPOOL_CACHE_MAX_SIZE))); + __attribute__((assume(n > cache->len))); + int ret; unsigned int remaining; uint32_t index, len; void **cache_objs; + const uint32_t cache_size = cache->size; - /* No cache provided */ - if (unlikely(cache == NULL)) { - remaining = n; - goto driver_dequeue; - } - - /* The cache is a stack, so copy will be in reverse order. */ + /* Serve the first part of the request from the cache to return hot objects first. */ cache_objs = &cache->objs[cache->len]; + len = cache->len; + remaining = n - len; + for (index = 0; index < len; index++) + *obj_table++ = *--cache_objs; - if (__rte_constant(n) && n <= cache->len) { + /* At this point, the cache is empty. */ + + /* More than can be served from a full cache? */ + if (unlikely(remaining >= cache_size)) { /* - * The request size is known at build time, and - * the entire request can be satisfied from the cache, - * so let the compiler unroll the fixed length copy loop. + * Serve the following part of the request directly from the backend + * in multipla of the cache size. */ - cache->len -= n; - for (index = 0; index < n; index++) - *obj_table++ = *--cache_objs; + len = remaining - remaining % cache_size; + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, len); + if (unlikely(ret < 0)) { + /* + * No further action is required to roll back the request, + * as objects in the cache are intact, and no objects have + * been dequeued from the backend. + */ - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1); + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n); - return 0; - } + return ret; + } - /* - * Use the cache as much as we have to return hot objects first. - * If the request size 'n' is known at build time, the above comparison - * ensures that n > cache->len here, so omit RTE_MIN(). - */ - len = __rte_constant(n) ? cache->len : RTE_MIN(n, cache->len); - cache->len -= len; - remaining = n - len; - for (index = 0; index < len; index++) - *obj_table++ = *--cache_objs; + remaining -= len; + obj_table += len; - /* - * If the request size 'n' is known at build time, the case - * where the entire request can be satisfied from the cache - * has already been handled above, so omit handling it here. - */ - if (!__rte_constant(n) && remaining == 0) { - /* The entire request is satisfied from the cache. */ + if (unlikely(remaining == 0)) { + cache->len = 0; - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); - return 0; + return 0; + } } - /* if dequeue below would overflow mem allocated for cache */ - if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE)) - goto driver_dequeue; - - /* Fill the cache from the backend; fetch size + remaining objects. */ - ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, - cache->size + remaining); + /* Fill the entire cache from the backend. */ + ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, cache_size); if (unlikely(ret < 0)) { /* - * We are buffer constrained, and not able to allocate - * cache + remaining. - * Do not fill the cache, just satisfy the remaining part of - * the request directly from the backend. + * Unable to fill the cache. + * Last resort: Try only the remaining part of the request, + * served directly from the backend. */ - goto driver_dequeue; - } + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining); + if (unlikely(ret == 0)) { + cache->len = 0; - /* Satisfy the remaining part of the request from the filled cache. */ - cache_objs = &cache->objs[cache->size + remaining]; - for (index = 0; index < remaining; index++) - *obj_table++ = *--cache_objs; + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); - cache->len = cache->size; + return 0; + } + /* Roll back. */ + if (cache->len + remaining == n) { + /* + * No further action is required to roll back the request, + * as objects in the cache are intact, and no objects have + * been dequeued from the backend. + */ + } else { + /* Update the state of the cache before putting back the objects. */ + cache->len = 0; + + len = n - remaining; + obj_table -= len; + rte_mempool_do_generic_put(mp, obj_table, len, cache); + } + + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1); + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n); + + return ret; + } + + /* Increment stat now, this always succeeds. */ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); + /* Serve the remaining part of the request from the filled cache. */ + cache_objs = &cache->objs[cache_size]; + for (index = 0; index < remaining; index++) + *obj_table++ = *--cache_objs; + + cache->len = cache_size - remaining; + return 0; +} -driver_dequeue: +/** + * @internal Get several objects from the mempool; used internally. + * @param mp + * A pointer to the mempool structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to get, must be strictly positive. + * @param cache + * A pointer to a mempool cache structure. May be NULL if not needed. + * @return + * - 0: Success. + * - <0: Error; code of driver dequeue function. + */ +static __rte_always_inline int +rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table, + unsigned int n, struct rte_mempool_cache *cache) +{ + if (likely(cache != NULL)) { + __attribute__((assume(cache->size <= RTE_MEMPOOL_CACHE_MAX_SIZE))); + __attribute__((assume(cache->len <= RTE_MEMPOOL_CACHE_MAX_SIZE))); + + /* Enough objects in the cache? */ + if (n <= cache->len) { + unsigned int index; + void **cache_objs; - /* Get remaining objects directly from the backend. */ - ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining); + /* Increment stat now, this always succeeds. */ + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); + RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); - if (ret < 0) { - if (likely(cache != NULL)) { - cache->len = n - remaining; /* - * No further action is required to roll the first part - * of the request back into the cache, as objects in - * the cache are intact. + * The cache is a stack, so copy will be in reverse order. + * If the request size is known at build time, + * the compiler will unroll the fixed length copy loop. */ - } - - RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1); - RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n); + cache_objs = &cache->objs[cache->len]; + cache->len -= n; + for (index = 0; index < n; index++) + *obj_table++ = *--cache_objs; + + return 0; + } else + return rte_mempool_do_generic_get_many(mp, obj_table, n, cache); } else { - if (likely(cache != NULL)) { - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); - RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); + int ret; + + /* Get the objects directly from the backend. */ + ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n); + if (unlikely(ret < 0)) { + RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1); + RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n); } else { RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1); RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n); } - } - return ret; + return ret; + } } /**