From patchwork Sat Nov 4 17:29:40 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Patchwork-Submitter: =?utf-8?q?Morten_Br=C3=B8rup?= X-Patchwork-Id: 133872 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 3345143287; Sat, 4 Nov 2023 18:29:44 +0100 (CET) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id B494F4029B; Sat, 4 Nov 2023 18:29:43 +0100 (CET) Received: from dkmailrelay1.smartsharesystems.com (smartserver.smartsharesystems.com [77.243.40.215]) by mails.dpdk.org (Postfix) with ESMTP id D84C640282 for ; Sat, 4 Nov 2023 18:29:42 +0100 (CET) Received: from smartserver.smartsharesystems.com (smartserver.smartsharesys.local [192.168.4.10]) by dkmailrelay1.smartsharesystems.com (Postfix) with ESMTP id AB539206C3 for ; Sat, 4 Nov 2023 18:29:42 +0100 (CET) Content-class: urn:content-classes:message MIME-Version: 1.0 Subject: [RFC] mempool: CPU cache aligning mempool driver accesses X-MimeOLE: Produced By Microsoft Exchange V6.5 Date: Sat, 4 Nov 2023 18:29:40 +0100 Message-ID: <98CBD80474FA8B44BF855DF32C47DC35E9EFD4@smartserver.smartshare.dk> X-MS-Has-Attach: X-MS-TNEF-Correlator: Thread-Topic: [RFC] mempool: CPU cache aligning mempool driver accesses Thread-Index: AdoPRH2dqaLCk8BNQ/aikLEUxLBH7w== From: =?iso-8859-1?q?Morten_Br=F8rup?= To: X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org I tried a little experiment, which gave a 25 % improvement in mempool perf tests for long bursts (n_get_bulk=32 n_put_bulk=32 n_keep=512 constant_n=0) on a Xeon E5-2620 v4 based system. This is the concept: If all accesses to the mempool driver goes through the mempool cache, we can ensure that these bulk load/stores are always CPU cache aligned, by using cache->size when loading/storing to the mempool driver. Furthermore, it is rumored that most applications use the default mempool cache size, so if the driver tests for that specific value, it can use rte_memcpy(src,dst,N) with N known at build time, allowing optimal performance for copying the array of objects. Unfortunately, I need to change the flush threshold from 1.5 to 2 to be able to always use cache->size when loading/storing to the mempool driver. What do you think? PS: If we can't get rid of the mempool cache size threshold factor, we really need to expose it through public APIs. A job for another day. Signed-off-by: Morten Brørup diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c index 7a7a9bf6db..b21033209b 100644 --- a/lib/mempool/rte_mempool.c +++ b/lib/mempool/rte_mempool.c @@ -48,7 +48,7 @@ static void mempool_event_callback_invoke(enum rte_mempool_event event, struct rte_mempool *mp); -#define CACHE_FLUSHTHRESH_MULTIPLIER 1.5 +#define CACHE_FLUSHTHRESH_MULTIPLIER 2 #define CALC_CACHE_FLUSHTHRESH(c) \ ((typeof(c))((c) * CACHE_FLUSHTHRESH_MULTIPLIER)) diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h index df87cd231e..76efeff59e 100644 --- a/lib/mempool/rte_mempool.h +++ b/lib/mempool/rte_mempool.h @@ -1014,7 +1014,7 @@ typedef void (rte_mempool_ctor_t)(struct rte_mempool *, void *); * If cache_size is non-zero, the rte_mempool library will try to * limit the accesses to the common lockless pool, by maintaining a * per-lcore object cache. This argument must be lower or equal to - * RTE_MEMPOOL_CACHE_MAX_SIZE and n / 1.5. It is advised to choose + * RTE_MEMPOOL_CACHE_MAX_SIZE and n / 2. It is advised to choose * cache_size to have "n modulo cache_size == 0": if this is * not the case, some elements will always stay in the pool and will * never be used. The access to the per-lcore table is of course @@ -1373,24 +1373,24 @@ rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table, RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n); /* The request itself is too big for the cache */ - if (unlikely(n > cache->flushthresh)) + if (unlikely(n > cache->size)) goto driver_enqueue_stats_incremented; /* * The cache follows the following algorithm: * 1. If the objects cannot be added to the cache without crossing - * the flush threshold, flush the cache to the backend. + * the flush threshold, flush a fixed amount of the cache to the backend. * 2. Add the objects to the cache. */ if (cache->len + n <= cache->flushthresh) { cache_objs = &cache->objs[cache->len]; - cache->len += n; } else { - cache_objs = &cache->objs[0]; - rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len); - cache->len = n; + cache->len -= cache->size; + cache_objs = &cache->objs[cache->len]; + rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->size); } + cache->len += n; /* Add the objects to the cache. */ rte_memcpy(cache_objs, obj_table, sizeof(void *) * n); @@ -1547,13 +1547,13 @@ rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table, return 0; } - /* if dequeue below would overflow mem allocated for cache */ - if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE)) + /* More remaining than the cache size */ + if (unlikely(remaining > cache->size)) goto driver_dequeue; - /* Fill the cache from the backend; fetch size + remaining objects. */ + /* Fill the cache from the backend; fetch size objects. */ ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, - cache->size + remaining); + cache->size); if (unlikely(ret < 0)) { /* * We are buffer constrained, and not able to allocate @@ -1565,11 +1565,11 @@ rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table, } /* Satisfy the remaining part of the request from the filled cache. */ - cache_objs = &cache->objs[cache->size + remaining]; + cache_objs = &cache->objs[cache->size]; for (index = 0; index < remaining; index++) *obj_table++ = *--cache_objs; - cache->len = cache->size; + cache->len = cache->size - remaining; RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1); RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n); diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h index 4b80f58980..2b10b76fc1 100644 --- a/lib/ring/rte_ring_elem_pvt.h +++ b/lib/ring/rte_ring_elem_pvt.h @@ -10,6 +10,9 @@ #ifndef _RTE_RING_ELEM_PVT_H_ #define _RTE_RING_ELEM_PVT_H_ +#include +#include + #if defined(RTE_TOOLCHAIN_GCC) && (GCC_VERSION >= 120000) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wstringop-overflow" @@ -24,6 +27,12 @@ __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t *ring = (uint32_t *)&r[1]; const uint32_t *obj = (const uint32_t *)obj_table; if (likely(idx + n <= size)) { +#ifdef RTE_ARCH_32 + if (n == RTE_MEMPOOL_CACHE_MAX_SIZE) { + rte_memcpy(&ring[idx], obj_table, RTE_MEMPOOL_CACHE_MAX_SIZE * sizeof(uint32_t)); + return; + } +#endif for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { ring[idx] = obj[i]; ring[idx + 1] = obj[i + 1]; @@ -69,6 +78,12 @@ __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head, uint64_t *ring = (uint64_t *)&r[1]; const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table; if (likely(idx + n <= size)) { +#ifdef RTE_ARCH_64 + if (n == RTE_MEMPOOL_CACHE_MAX_SIZE) { + rte_memcpy(&ring[idx], obj_table, RTE_MEMPOOL_CACHE_MAX_SIZE * sizeof(uint64_t)); + return; + } +#endif for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { ring[idx] = obj[i]; ring[idx + 1] = obj[i + 1]; @@ -158,6 +173,12 @@ __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t *ring = (uint32_t *)&r[1]; uint32_t *obj = (uint32_t *)obj_table; if (likely(idx + n <= size)) { +#ifdef RTE_ARCH_32 + if (n == RTE_MEMPOOL_CACHE_MAX_SIZE) { + rte_memcpy(obj_table, &ring[idx], RTE_MEMPOOL_CACHE_MAX_SIZE * sizeof(uint32_t)); + return; + } +#endif for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { obj[i] = ring[idx]; obj[i + 1] = ring[idx + 1]; @@ -203,6 +224,12 @@ __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t cons_head, uint64_t *ring = (uint64_t *)&r[1]; unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table; if (likely(idx + n <= size)) { +#ifdef RTE_ARCH_64 + if (n == RTE_MEMPOOL_CACHE_MAX_SIZE) { + rte_memcpy(obj_table, &ring[idx], RTE_MEMPOOL_CACHE_MAX_SIZE * sizeof(uint64_t)); + return; + } +#endif for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { obj[i] = ring[idx]; obj[i + 1] = ring[idx + 1];