From patchwork Thu Sep 30 17:27:35 2021
X-Patchwork-Submitter: Dharmik Thakkar <dharmik.thakkar@arm.com>
X-Patchwork-Id: 100168
X-Patchwork-Delegate: thomas@monjalon.net
From: Dharmik Thakkar <dharmik.thakkar@arm.com>
To: Olivier Matz, Andrew Rybchenko
Cc: dev@dpdk.org, nd@arm.com, honnappa.nagarahalli@arm.com,
 ruifeng.wang@arm.com, Dharmik Thakkar <dharmik.thakkar@arm.com>
Date: Thu, 30 Sep 2021 12:27:35 -0500
Message-Id: <20210930172735.2675627-1-dharmik.thakkar@arm.com>
X-Mailer: git-send-email 2.25.1
Subject: [dpdk-dev] [RFC] mempool: implement index-based per core cache

The current mempool per-core cache implementation is pointer based; on
most architectures each pointer consumes 64 bits. Replace it with an
index-based implementation, wherein each buffer is addressed by
(pool base address + index). This reduces the memory requirements.

L3Fwd performance testing shows a minor improvement in cache
performance and no change in throughput.

Micro-benchmarking the patch using mempool_perf_test shows a
significant improvement in the majority of the test cases.

The future plan involves replacing the global pool's pointer-based
implementation with an index-based implementation as well.

Signed-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
---
 drivers/mempool/ring/rte_mempool_ring.c |  2 +-
 lib/mempool/rte_mempool.c               |  8 +++
 lib/mempool/rte_mempool.h               | 74 ++++++++++++++++++++++---
 3 files changed, 74 insertions(+), 10 deletions(-)

diff --git a/drivers/mempool/ring/rte_mempool_ring.c b/drivers/mempool/ring/rte_mempool_ring.c
index b1f09ff28f4d..e55913e47f21 100644
--- a/drivers/mempool/ring/rte_mempool_ring.c
+++ b/drivers/mempool/ring/rte_mempool_ring.c
@@ -101,7 +101,7 @@ ring_alloc(struct rte_mempool *mp, uint32_t rg_flags)
 		return -rte_errno;
 
 	mp->pool_data = r;
-
+	mp->local_cache_base_addr = &r[1];
 	return 0;
 }

diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c
index 59a588425bd6..424bdb19c323 100644
--- a/lib/mempool/rte_mempool.c
+++ b/lib/mempool/rte_mempool.c
@@ -480,6 +480,7 @@ rte_mempool_populate_default(struct rte_mempool *mp)
 	int ret;
 	bool need_iova_contig_obj;
 	size_t max_alloc_size = SIZE_MAX;
+	unsigned lcore_id;
 
 	ret = mempool_ops_alloc_once(mp);
 	if (ret != 0)
@@ -600,6 +601,13 @@ rte_mempool_populate_default(struct rte_mempool *mp)
 		}
 	}
 
+	/* Init all default caches. */
+	if (mp->cache_size != 0) {
+		for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
+			mp->local_cache[lcore_id].local_cache_base_value =
+				*(void **)mp->local_cache_base_addr;
+	}
+
 	rte_mempool_trace_populate_default(mp);
 
 	return mp->size;
diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
index 4235d6f0bf2b..545405c0d3ce 100644
--- a/lib/mempool/rte_mempool.h
+++ b/lib/mempool/rte_mempool.h
@@ -51,6 +51,8 @@
 #include <rte_memcpy.h>
 #include <rte_common.h>
 
+#include <arm_neon.h>
+
 #include "rte_mempool_trace_fp.h"
 
 #ifdef __cplusplus
@@ -91,11 +93,12 @@ struct rte_mempool_cache {
 	uint32_t size;	      /**< Size of the cache */
 	uint32_t flushthresh; /**< Threshold before we flush excess elements */
 	uint32_t len;	      /**< Current cache count */
+	void *local_cache_base_value; /**< Base value to calculate indices */
 	/*
 	 * Cache is allocated to this size to allow it to overflow in certain
 	 * cases to avoid needless emptying of cache.
 	 */
-	void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */
+	uint32_t objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */
 } __rte_cache_aligned;
 
 /**
@@ -172,7 +175,6 @@ struct rte_mempool_objtlr {
  * A list of memory where objects are stored
  */
 STAILQ_HEAD(rte_mempool_memhdr_list, rte_mempool_memhdr);
-
 /**
  * Callback used to free a memory chunk
  */
@@ -244,6 +246,7 @@ struct rte_mempool {
 	int32_t ops_index;
 
 	struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */
+	void *local_cache_base_addr; /**< Reference to the base value */
 
 	uint32_t populated_size;         /**< Number of populated objects. */
 	struct rte_mempool_objhdr_list elt_list; /**< List of objects in pool */
@@ -1269,7 +1272,15 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
 	if (cache == NULL || cache->len == 0)
 		return;
 	rte_mempool_trace_cache_flush(cache, mp);
-	rte_mempool_ops_enqueue_bulk(mp, cache->objs, cache->len);
+
+	unsigned int i;
+	unsigned int cache_len = cache->len;
+	void *obj_table[RTE_MEMPOOL_CACHE_MAX_SIZE * 3];
+	void *base_value = cache->local_cache_base_value;
+	uint32_t *cache_objs = cache->objs;
+	for (i = 0; i < cache_len; i++)
+		obj_table[i] = (void *) RTE_PTR_ADD(base_value, cache_objs[i]);
+	rte_mempool_ops_enqueue_bulk(mp, obj_table, cache->len);
 	cache->len = 0;
 }
 
@@ -1289,7 +1300,9 @@ static __rte_always_inline void
 __mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
 		      unsigned int n, struct rte_mempool_cache *cache)
 {
-	void **cache_objs;
+	uint32_t *cache_objs;
+	void *base_value;
+	uint32_t i;
 
 	/* increment stat now, adding in mempool always success */
 	__MEMPOOL_STAT_ADD(mp, put_bulk, 1);
@@ -1301,6 +1314,12 @@ __mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
 
 	cache_objs = &cache->objs[cache->len];
 
+	base_value = cache->local_cache_base_value;
+
+	uint64x2_t v_obj_table;
+	uint64x2_t v_base_value = vdupq_n_u64((uint64_t)base_value);
+	uint32x2_t v_cache_objs;
+
 	/*
 	 * The cache follows the following algorithm
 	 *   1. Add the objects to the cache
@@ -1309,12 +1328,26 @@ __mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
 	 */
 
 	/* Add elements back into the cache */
-	rte_memcpy(&cache_objs[0], obj_table, sizeof(void *) * n);
+
+#if defined __ARM_NEON
+	for (i = 0; i < (n & ~0x1); i+=2) {
+		v_obj_table = vld1q_u64((const uint64_t *)&obj_table[i]);
+		v_cache_objs = vqmovn_u64(vsubq_u64(v_obj_table, v_base_value));
+		vst1_u32(cache_objs + i, v_cache_objs);
+	}
+	if (n & 0x1) {
+		cache_objs[i] = (uint32_t) RTE_PTR_DIFF(obj_table[i], base_value);
+	}
+#else
+	for (i = 0; i < n; i++) {
+		cache_objs[i] = (uint32_t) RTE_PTR_DIFF(obj_table[i], base_value);
+	}
+#endif
 
 	cache->len += n;
 
 	if (cache->len >= cache->flushthresh) {
-		rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache->size],
+		rte_mempool_ops_enqueue_bulk(mp, obj_table + cache->len - cache->size,
 				cache->len - cache->size);
 		cache->len = cache->size;
 	}
@@ -1415,23 +1448,26 @@ __mempool_generic_get(struct rte_mempool *mp, void **obj_table,
 		      unsigned int n, struct rte_mempool_cache *cache)
 {
 	int ret;
+	uint32_t i;
 	uint32_t index, len;
-	void **cache_objs;
+	uint32_t *cache_objs;
 
 	/* No cache provided or cannot be satisfied from cache */
 	if (unlikely(cache == NULL || n >= cache->size))
 		goto ring_dequeue;
 
+	void *base_value = cache->local_cache_base_value;
 	cache_objs = cache->objs;
 
 	/* Can this be satisfied from the cache? */
 	if (cache->len < n) {
 		/* No. Backfill the cache first, and then fill from it */
 		uint32_t req = n + (cache->size - cache->len);
+		void *temp_objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */
 
 		/* How many do we require i.e. number to fill the cache + the request */
 		ret = rte_mempool_ops_dequeue_bulk(mp,
-			&cache->objs[cache->len], req);
+			temp_objs, req);
 		if (unlikely(ret < 0)) {
 			/*
 			 * In the off chance that we are buffer constrained,
@@ -1442,12 +1478,32 @@ __mempool_generic_get(struct rte_mempool *mp, void **obj_table,
 			goto ring_dequeue;
 		}
 
+		len = cache->len;
+		for (i = 0; i < req; ++i, ++len) {
+			cache_objs[len] = (uint32_t) RTE_PTR_DIFF(temp_objs[i], base_value);
+		}
+
 		cache->len += req;
 	}
 
+	uint64x2_t v_obj_table;
+	uint64x2_t v_cache_objs;
+	uint64x2_t v_base_value = vdupq_n_u64((uint64_t)base_value);
+
 	/* Now fill in the response ... */
+#if defined __ARM_NEON
+	for (index = 0, len = cache->len - 1; index < (n & ~0x1); index+=2,
+						len-=2, obj_table+=2) {
+		v_cache_objs = vmovl_u32(vld1_u32(cache_objs + len - 1));
+		v_obj_table = vaddq_u64(v_cache_objs, v_base_value);
+		vst1q_u64((uint64_t *)obj_table, v_obj_table);
+	}
+	if (n & 0x1)
+		*obj_table = (void *) RTE_PTR_ADD(base_value, cache_objs[len]);
+#else
 	for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
-		*obj_table = cache_objs[len];
+		*obj_table = (void *) RTE_PTR_ADD(base_value, cache_objs[len]);
+#endif
 
 	cache->len -= n;
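
For reference, the scheme above stores a 32-bit byte offset from a per-pool
base address in the cache instead of a full 64-bit pointer, halving the
objs[] array on 64-bit targets. A minimal standalone sketch of that
conversion (scalar only; the helper names obj_to_index/index_to_obj are
illustrative, not DPDK APIs, and it assumes every object lies within 4 GB
of the pool base, as the patch does):

	#include <stdint.h>

	/* Convert an object pointer to a 32-bit offset from the pool base. */
	static inline uint32_t
	obj_to_index(const void *base, const void *obj)
	{
		return (uint32_t)((uintptr_t)obj - (uintptr_t)base);
	}

	/* Recover the full object pointer by adding the offset to the base. */
	static inline void *
	index_to_obj(void *base, uint32_t idx)
	{
		return (void *)((uintptr_t)base + idx);
	}

On a put, the cache would record obj_to_index(base, obj); on a get, it
would return index_to_obj(base, idx). The NEON paths in the patch do the
same subtraction/addition two objects at a time, narrowing to u32 on put
(vqmovn_u64) and widening back to u64 on get (vmovl_u32).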