[RFC,v16] mempool: fix mempool cache size

Message ID 20241002143550.2582893-1-mb@smartsharesystems.com (mailing list archive)
State Superseded
Delegated to: Thomas Monjalon
Series [RFC,v16] mempool: fix mempool cache size

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/loongarch-compilation success Compilation OK
ci/loongarch-unit-testing success Unit Testing PASS
ci/Intel-compilation success Compilation OK
ci/intel-Testing success Testing PASS
ci/iol-intel-Performance success Performance Testing PASS
ci/intel-Functional success Functional PASS
ci/iol-mellanox-Performance success Performance Testing PASS
ci/iol-marvell-Functional success Functional Testing PASS
ci/iol-broadcom-Performance success Performance Testing PASS
ci/iol-intel-Functional success Functional Testing PASS
ci/iol-sample-apps-testing success Testing PASS
ci/iol-unit-amd64-testing success Testing PASS
ci/iol-compile-amd64-testing success Testing PASS
ci/iol-unit-arm64-testing success Testing PASS
ci/iol-compile-arm64-testing success Testing PASS

Commit Message

Morten Brørup Oct. 2, 2024, 2:35 p.m. UTC
This patch refactors the mempool cache to fix two bugs:
1. When a mempool is created with a per-lcore cache size of N objects, the
per-lcore caches were actually created with a size of 1.5 * N objects.
2. The mempool cache field names did not reflect their purpose;
the "flushthresh" field effectively held the cache size, and the "size"
field held the number of objects left in the cache after a get operation
had refilled it from the backend.
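
For reference, the old sizing logic, condensed from the lines this patch
removes (shown here only as an illustration, not as part of the patch):

    /* Old code, removed by this patch. */
    #define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)

    cache->size = size;                                /* the requested N */
    cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size); /* flush only above 1.5 * N */

    /* Old put path: objects accumulated in the cache until the flush
     * threshold was crossed, so each per-lcore cache could hold up to
     * 1.5 * N objects. */
    if (cache->len + n <= cache->flushthresh) {
        cache_objs = &cache->objs[cache->len];
        cache->len += n;
    } else {
        cache_objs = &cache->objs[0];
        rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
        cache->len = n;
    }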

The first item in particular could be fatal:
When the mempool caches associated with other lcores hold more objects than
the mempool's configured cache size, a right-sized mempool may unexpectedly
run out of objects, causing the application to fail.
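
For example (hypothetical numbers): with 8 lcores and cache_size = 512, an
application would reasonably size its mempool for 8 * 512 = 4096 objects
parked in caches plus the objects in flight. With the actual 1.5 factor, the
caches of the other lcores could legitimately hold up to 8 * 768 = 6144
objects, i.e. 2048 more than budgeted, so an allocation that ought to
succeed could find the backend empty.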

Furthermore, this patch introduces some optimizations.
(Work in progress. Details to follow later. Submitting to get CI
performance data.)

Various drivers accessing the mempool directly have been updated
accordingly.
These drivers did not update mempool statistics when accessing the mempool
directly, so that is fixed too.

Note: Performance not yet benchmarked.

Signed-off-by: Morten Brørup <mb@smartsharesystems.com>
---
v16:
* Fix bug in rte_mempool_do_generic_put() regarding criteria for flush.
v15:
* Changed back cache bypass limit from n >= RTE_MEMPOOL_CACHE_MAX_SIZE to
  n > RTE_MEMPOOL_CACHE_MAX_SIZE.
* Removed cache size limit from serving via cache.
v14:
* Change rte_mempool_do_generic_put() back from add-then-flush to
  flush-then-add.
  Keep the target cache fill level of ca. 1/2 size of the cache.
v13:
* Target a cache fill level of ca. 1/2 of the cache size when flushing and
  refilling, based on an assumption of equal probability of get and put,
  instead of assuming a higher probability of put being followed by
  another put, and get being followed by another get.
  (A worked example of the resulting flush computation follows after this
  changelog.)
* Reduce the amount of changes to the drivers.
v12:
* Do not init mempool caches with size zero; they don't exist.
  Bug introduced in v10.
v11:
* Removed rte_mempool_do_generic_get_split().
v10:
* Initialize mempool caches, even if their size is zero.
  This fixes a compiler warning about out-of-bounds access.
v9:
* Removed factor 1.5 from description of cache_size parameter to
  rte_mempool_create().
* Refactored rte_mempool_do_generic_put() to eliminate some gotos.
  No functional change.
* Removed check for n >= RTE_MEMPOOL_CACHE_MAX_SIZE in
  rte_mempool_do_generic_get(); it caused the function to fail when the
  request could not be served from the backend alone, but could be served
  from the cache and the backend combined.
* Refactored rte_mempool_do_generic_get_split() to make it shorter.
* When getting objects directly from the backend, use burst size aligned
  with either CPU cache line size or mempool cache size.
v8:
* Rewrote rte_mempool_do_generic_put() to get rid of transaction
  splitting. Use a method similar to the existing put method with fill
  followed by flush if overfilled.
  This also made rte_mempool_do_generic_put_split() obsolete.
* When flushing the cache as much as we can, use burst size aligned with
  either CPU cache line size or mempool cache size.
v7:
* Increased max mempool cache size from 512 to 1024 objects.
  Mainly for CI performance test purposes.
  Originally, the max mempool cache size was 768 objects, and the mempool
  cache structure used a fixed-size array of 1024 objects.
v6:
* Fix v5 incomplete implementation of passing large requests directly to
  the backend.
* Use memcpy instead of rte_memcpy where compiler complains about it.
* Added const to some function parameters.
v5:
* Moved helper functions back into the header file, for improved
  performance.
* Pass large requests directly to the backend. This also simplifies the
  code.
v4:
* Updated subject to reflect that misleading names are considered bugs.
* Rewrote patch description to provide more details about the bugs fixed.
  (Mattias Rönnblom)
* Moved helper functions, not to be inlined, to mempool C file.
  (Mattias Rönnblom)
* Pass requests for n >= RTE_MEMPOOL_CACHE_MAX_SIZE objects known at build
  time directly to backend driver, to avoid calling the helper functions.
  This also fixes the compiler warnings about out of bounds array access.
v3:
* Removed __attribute__(assume).
v2:
* Removed mempool perf test; not part of patch set.
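
Worked example of the ca. 1/2 fill target mentioned in v13/v14 (hypothetical
numbers, assuming a 64 byte CPU cache line and 8 byte pointers):

  cache->size = 512, cache->len = 500, put of n = 32 objects:
  500 + 32 = 532 > 512, so the cache must be flushed before adding;
  n = 32 <= size / 2 = 256, so the flush algorithm applies:
  flush len = RTE_ALIGN_FLOOR(500 + 32 - 256, 64 / 8) = RTE_ALIGN_FLOOR(276, 8) = 272;
  272 objects are enqueued to the backend, leaving 500 - 272 = 228 in the cache;
  the 32 new objects are then added, leaving 228 + 32 = 260 objects,
  i.e. ca. 1/2 of the cache size (256).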
---
 drivers/common/idpf/idpf_common_rxtx_avx512.c |  46 ++---
 drivers/mempool/dpaa/dpaa_mempool.c           |  14 --
 drivers/mempool/dpaa2/dpaa2_hw_mempool.c      |  14 --
 drivers/net/i40e/i40e_rxtx_vec_avx512.c       |  21 +-
 drivers/net/iavf/iavf_rxtx_vec_avx512.c       |  23 +--
 drivers/net/ice/ice_rxtx_vec_avx512.c         |  23 +--
 lib/mempool/mempool_trace.h                   |   1 -
 lib/mempool/rte_mempool.c                     |  17 +-
 lib/mempool/rte_mempool.h                     | 191 +++++++++---------
 9 files changed, 156 insertions(+), 194 deletions(-)
  

Patch

diff --git a/drivers/common/idpf/idpf_common_rxtx_avx512.c b/drivers/common/idpf/idpf_common_rxtx_avx512.c
index 3b5e124ec8..add1cc86c2 100644
--- a/drivers/common/idpf/idpf_common_rxtx_avx512.c
+++ b/drivers/common/idpf/idpf_common_rxtx_avx512.c
@@ -1034,12 +1034,18 @@  idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)
 			goto done;
 		}
 
-		/* The cache follows the following algorithm
-		 *   1. Add the objects to the cache
-		 *   2. Anything greater than the cache min value (if it crosses the
-		 *   cache flush threshold) is flushed to the ring.
-		 */
+		/* Insufficient free space in the cache? */
+		if (unlikely(n + cache->len > cache->size)) {
+			rte_mempool_generic_put(mp, (void *)txep, n, cache);
+			goto done;
+		}
+
+		/* Increment stats now, adding in mempool always succeeds. */
+		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
 		/* Add elements back into the cache */
+		cache->len += n;
 		uint32_t copied = 0;
 		/* n is multiple of 32 */
 		while (copied < n) {
@@ -1054,14 +1060,7 @@  idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)
 			_mm512_storeu_si512(&cache_objs[copied + 24], d);
 			copied += 32;
 		}
-		cache->len += n;
 
-		if (cache->len >= cache->flushthresh) {
-			rte_mempool_ops_enqueue_bulk(mp,
-						     &cache->objs[cache->size],
-						     cache->len - cache->size);
-			cache->len = cache->size;
-		}
 		goto done;
 	}
 
@@ -1345,12 +1344,18 @@  idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)
 			goto done;
 		}
 
-		/* The cache follows the following algorithm
-		 *   1. Add the objects to the cache
-		 *   2. Anything greater than the cache min value (if it crosses the
-		 *   cache flush threshold) is flushed to the ring.
-		 */
+		/* Insufficient free space in the cache? */
+		if (unlikely(n + cache->len > cache->size)) {
+			rte_mempool_generic_put(mp, (void *)txep, n, cache);
+			goto done;
+		}
+
+		/* Increment stats now, adding in mempool always succeeds. */
+		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
 		/* Add elements back into the cache */
+		cache->len += n;
 		uint32_t copied = 0;
 		/* n is multiple of 32 */
 		while (copied < n) {
@@ -1365,14 +1370,7 @@  idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)
 			_mm512_storeu_si512(&cache_objs[copied + 24], d);
 			copied += 32;
 		}
-		cache->len += n;
 
-		if (cache->len >= cache->flushthresh) {
-			rte_mempool_ops_enqueue_bulk(mp,
-						     &cache->objs[cache->size],
-						     cache->len - cache->size);
-			cache->len = cache->size;
-		}
 		goto done;
 	}
 
diff --git a/drivers/mempool/dpaa/dpaa_mempool.c b/drivers/mempool/dpaa/dpaa_mempool.c
index 74bfcab509..7490862809 100644
--- a/drivers/mempool/dpaa/dpaa_mempool.c
+++ b/drivers/mempool/dpaa/dpaa_mempool.c
@@ -51,8 +51,6 @@  dpaa_mbuf_create_pool(struct rte_mempool *mp)
 	struct bman_pool_params params = {
 		.flags = BMAN_POOL_FLAG_DYNAMIC_BPID
 	};
-	unsigned int lcore_id;
-	struct rte_mempool_cache *cache;
 
 	MEMPOOL_INIT_FUNC_TRACE();
 
@@ -120,18 +118,6 @@  dpaa_mbuf_create_pool(struct rte_mempool *mp)
 	rte_memcpy(bp_info, (void *)&rte_dpaa_bpid_info[bpid],
 		   sizeof(struct dpaa_bp_info));
 	mp->pool_data = (void *)bp_info;
-	/* Update per core mempool cache threshold to optimal value which is
-	 * number of buffers that can be released to HW buffer pool in
-	 * a single API call.
-	 */
-	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
-		cache = &mp->local_cache[lcore_id];
-		DPAA_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
-			lcore_id, cache->flushthresh,
-			(uint32_t)(cache->size + DPAA_MBUF_MAX_ACQ_REL));
-		if (cache->flushthresh)
-			cache->flushthresh = cache->size + DPAA_MBUF_MAX_ACQ_REL;
-	}
 
 	DPAA_MEMPOOL_INFO("BMAN pool created for bpid =%d", bpid);
 	return 0;
diff --git a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
index 42e17d984c..a44f3cf616 100644
--- a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
+++ b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
@@ -44,8 +44,6 @@  rte_hw_mbuf_create_pool(struct rte_mempool *mp)
 	struct dpaa2_bp_info *bp_info;
 	struct dpbp_attr dpbp_attr;
 	uint32_t bpid;
-	unsigned int lcore_id;
-	struct rte_mempool_cache *cache;
 	int ret;
 
 	avail_dpbp = dpaa2_alloc_dpbp_dev();
@@ -134,18 +132,6 @@  rte_hw_mbuf_create_pool(struct rte_mempool *mp)
 	DPAA2_MEMPOOL_DEBUG("BP List created for bpid =%d", dpbp_attr.bpid);
 
 	h_bp_list = bp_list;
-	/* Update per core mempool cache threshold to optimal value which is
-	 * number of buffers that can be released to HW buffer pool in
-	 * a single API call.
-	 */
-	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
-		cache = &mp->local_cache[lcore_id];
-		DPAA2_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
-			lcore_id, cache->flushthresh,
-			(uint32_t)(cache->size + DPAA2_MBUF_MAX_ACQ_REL));
-		if (cache->flushthresh)
-			cache->flushthresh = cache->size + DPAA2_MBUF_MAX_ACQ_REL;
-	}
 
 	return 0;
 err3:
diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
index 0238b03f8a..2599084dc8 100644
--- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c
+++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
@@ -783,19 +783,21 @@  i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
 		struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
 				rte_lcore_id());
 
-		if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
+		/* No cache, too large request, or insufficient free space in the cache? */
+		if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE ||
+				unlikely(n + cache->len > cache->size)) {
 			rte_mempool_generic_put(mp, (void *)txep, n, cache);
 			goto done;
 		}
 
 		cache_objs = &cache->objs[cache->len];
 
-		/* The cache follows the following algorithm
-		 *   1. Add the objects to the cache
-		 *   2. Anything greater than the cache min value (if it
-		 *   crosses the cache flush threshold) is flushed to the ring.
-		 */
+		/* Increment stats now, adding in mempool always succeeds. */
+		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
 		/* Add elements back into the cache */
+		cache->len += n;
 		uint32_t copied = 0;
 		/* n is multiple of 32 */
 		while (copied < n) {
@@ -810,14 +812,7 @@  i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
 			_mm512_storeu_si512(&cache_objs[copied + 24], d);
 			copied += 32;
 		}
-		cache->len += n;
 
-		if (cache->len >= cache->flushthresh) {
-			rte_mempool_ops_enqueue_bulk
-				(mp, &cache->objs[cache->size],
-				cache->len - cache->size);
-			cache->len = cache->size;
-		}
 		goto done;
 	}
 
diff --git a/drivers/net/iavf/iavf_rxtx_vec_avx512.c b/drivers/net/iavf/iavf_rxtx_vec_avx512.c
index 3bb6f305df..a1dca398ed 100644
--- a/drivers/net/iavf/iavf_rxtx_vec_avx512.c
+++ b/drivers/net/iavf/iavf_rxtx_vec_avx512.c
@@ -1883,12 +1883,18 @@  iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
 			goto done;
 		}
 
-		/* The cache follows the following algorithm
-		 *   1. Add the objects to the cache
-		 *   2. Anything greater than the cache min value (if it crosses the
-		 *   cache flush threshold) is flushed to the ring.
-		 */
+		/* Insufficient free space in the cache? */
+		if (unlikely(n + cache->len > cache->size)) {
+			rte_mempool_generic_put(mp, (void *)txep, n, cache);
+			goto done;
+		}
+
+		/* Increment stats now, adding in mempool always succeeds. */
+		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
 		/* Add elements back into the cache */
+		cache->len += n;
 		uint32_t copied = 0;
 		/* n is multiple of 32 */
 		while (copied < n) {
@@ -1903,14 +1909,7 @@  iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
 			_mm512_storeu_si512(&cache_objs[copied + 24], d);
 			copied += 32;
 		}
-		cache->len += n;
 
-		if (cache->len >= cache->flushthresh) {
-			rte_mempool_ops_enqueue_bulk(mp,
-						     &cache->objs[cache->size],
-						     cache->len - cache->size);
-			cache->len = cache->size;
-		}
 		goto done;
 	}
 
diff --git a/drivers/net/ice/ice_rxtx_vec_avx512.c b/drivers/net/ice/ice_rxtx_vec_avx512.c
index 04148e8ea2..3aea6ff6ec 100644
--- a/drivers/net/ice/ice_rxtx_vec_avx512.c
+++ b/drivers/net/ice/ice_rxtx_vec_avx512.c
@@ -898,12 +898,18 @@  ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
 			goto done;
 		}
 
-		/* The cache follows the following algorithm
-		 *   1. Add the objects to the cache
-		 *   2. Anything greater than the cache min value (if it
-		 *   crosses the cache flush threshold) is flushed to the ring.
-		 */
+		/* Insufficient free space in the cache? */
+		if (unlikely(n + cache->len > cache->size)) {
+			rte_mempool_generic_put(mp, (void *)txep, n, cache);
+			goto done;
+		}
+
+		/* Increment stats now, adding in mempool always succeeds. */
+		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
 		/* Add elements back into the cache */
+		cache->len += n;
 		uint32_t copied = 0;
 		/* n is multiple of 32 */
 		while (copied < n) {
@@ -918,14 +924,7 @@  ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
 			_mm512_storeu_si512(&cache_objs[copied + 24], d);
 			copied += 32;
 		}
-		cache->len += n;
 
-		if (cache->len >= cache->flushthresh) {
-			rte_mempool_ops_enqueue_bulk
-				(mp, &cache->objs[cache->size],
-				 cache->len - cache->size);
-			cache->len = cache->size;
-		}
 		goto done;
 	}
 
diff --git a/lib/mempool/mempool_trace.h b/lib/mempool/mempool_trace.h
index dffef062e4..3c49b41a6d 100644
--- a/lib/mempool/mempool_trace.h
+++ b/lib/mempool/mempool_trace.h
@@ -112,7 +112,6 @@  RTE_TRACE_POINT(
 	rte_trace_point_emit_i32(socket_id);
 	rte_trace_point_emit_ptr(cache);
 	rte_trace_point_emit_u32(cache->len);
-	rte_trace_point_emit_u32(cache->flushthresh);
 )
 
 RTE_TRACE_POINT(
diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c
index d8e39e5c20..11dae53b02 100644
--- a/lib/mempool/rte_mempool.c
+++ b/lib/mempool/rte_mempool.c
@@ -50,11 +50,6 @@  static void
 mempool_event_callback_invoke(enum rte_mempool_event event,
 			      struct rte_mempool *mp);
 
-/* Note: avoid using floating point since that compiler
- * may not think that is constant.
- */
-#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)
-
 #if defined(RTE_ARCH_X86)
 /*
  * return the greatest common divisor between a and b (fast algorithm)
@@ -746,13 +741,12 @@  rte_mempool_free(struct rte_mempool *mp)
 static void
 mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)
 {
-	/* Check that cache have enough space for flush threshold */
-	RTE_BUILD_BUG_ON(CALC_CACHE_FLUSHTHRESH(RTE_MEMPOOL_CACHE_MAX_SIZE) >
+	/* Check that cache have enough space for size */
+	RTE_BUILD_BUG_ON(RTE_MEMPOOL_CACHE_MAX_SIZE >
 			 RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs) /
 			 RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs[0]));
 
 	cache->size = size;
-	cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);
 	cache->len = 0;
 }
 
@@ -836,7 +830,7 @@  rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
 
 	/* asked cache too big */
 	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
-	    CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
+	    cache_size > n) {
 		rte_errno = EINVAL;
 		return NULL;
 	}
@@ -1046,8 +1040,9 @@  rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp)
 
 	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
 		cache_count = mp->local_cache[lcore_id].len;
-		fprintf(f, "    cache_count[%u]=%"PRIu32"\n",
-			lcore_id, cache_count);
+		if (cache_count > 0)
+			fprintf(f, "    cache_count[%u]=%"PRIu32"\n",
+				lcore_id, cache_count);
 		count += cache_count;
 	}
 	fprintf(f, "    total_cache_count=%u\n", count);
diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
index 7bdc92b812..1b0a6dcd87 100644
--- a/lib/mempool/rte_mempool.h
+++ b/lib/mempool/rte_mempool.h
@@ -89,10 +89,8 @@  struct __rte_cache_aligned rte_mempool_debug_stats {
  */
 struct __rte_cache_aligned rte_mempool_cache {
 	uint32_t size;	      /**< Size of the cache */
-	uint32_t flushthresh; /**< Threshold before we flush excess elements */
 	uint32_t len;	      /**< Current cache count */
 #ifdef RTE_LIBRTE_MEMPOOL_STATS
-	uint32_t unused;
 	/*
 	 * Alternative location for the most frequently updated mempool statistics (per-lcore),
 	 * providing faster update access when using a mempool cache.
@@ -1030,7 +1028,7 @@  typedef void (rte_mempool_ctor_t)(struct rte_mempool *, void *);
  *   If cache_size is non-zero, the rte_mempool library will try to
  *   limit the accesses to the common lockless pool, by maintaining a
  *   per-lcore object cache. This argument must be lower or equal to
- *   RTE_MEMPOOL_CACHE_MAX_SIZE and n / 1.5. It is advised to choose
+ *   RTE_MEMPOOL_CACHE_MAX_SIZE and n. It is advised to choose
  *   cache_size to have "n modulo cache_size == 0": if this is
  *   not the case, some elements will always stay in the pool and will
  *   never be used. The access to the per-lcore table is of course
@@ -1376,38 +1374,56 @@  rte_mempool_cache_flush(struct rte_mempool_cache *cache,
  */
 static __rte_always_inline void
 rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
-			   unsigned int n, struct rte_mempool_cache *cache)
+			   unsigned int n, struct rte_mempool_cache * const cache)
 {
 	void **cache_objs;
+	uint32_t len;
+
+	/* No cache provided? */
+	if (unlikely(cache == NULL)) {
+		/* Increment stats now, adding in mempool always succeeds. */
+		RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
+		RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
 
-	/* No cache provided */
-	if (unlikely(cache == NULL))
 		goto driver_enqueue;
+	}
 
-	/* increment stat now, adding in mempool always success */
+	/* Increment stats now, adding in mempool always succeeds. */
 	RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
 	RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
 
-	/* The request itself is too big for the cache */
-	if (unlikely(n > cache->flushthresh))
-		goto driver_enqueue_stats_incremented;
-
-	/*
-	 * The cache follows the following algorithm:
-	 *   1. If the objects cannot be added to the cache without crossing
-	 *      the flush threshold, flush the cache to the backend.
-	 *   2. Add the objects to the cache.
-	 */
+	/* The request itself is too big for cache storage? */
+	if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE))
+		goto driver_enqueue;
 
-	if (cache->len + n <= cache->flushthresh) {
+	/* Enough free space in the cache? */
+	if (likely(cache->len + n <= cache->size)) {
 		cache_objs = &cache->objs[cache->len];
 		cache->len += n;
-	} else {
-		cache_objs = &cache->objs[0];
-		rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
-		cache->len = n;
+		goto cache_enqueue;
 	}
 
+	/* The request is too big for the cache flush algorithm? */
+	if (unlikely(n > cache->size / 2))
+		goto driver_enqueue;
+
+	/*
+	 * Flush a (CPU cache line size aligned, if mempool cache size allows)
+	 * bulk of objects to the backend, so ca. 1/2 cache size will remain
+	 * after adding the objects to the cache.
+	 */
+	if (likely(cache->size >= 2 * RTE_CACHE_LINE_SIZE / sizeof(void *)))
+		len = RTE_ALIGN_FLOOR(cache->len + n - cache->size / 2,
+				RTE_CACHE_LINE_SIZE / sizeof(void *));
+	else
+		len = cache->len + n - cache->size / 2;
+	cache->len -= len;
+	cache_objs = &cache->objs[cache->len];
+	cache->len += n;
+	rte_mempool_ops_enqueue_bulk(mp, cache_objs, len);
+
+cache_enqueue:
+
 	/* Add the objects to the cache. */
 	rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
 
@@ -1415,13 +1431,7 @@  rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
 
 driver_enqueue:
 
-	/* increment stat now, adding in mempool always success */
-	RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
-	RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
-
-driver_enqueue_stats_incremented:
-
-	/* push objects to the backend */
+	/* Push the objects to the backend. */
 	rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
 }
 
@@ -1440,7 +1450,7 @@  rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
  */
 static __rte_always_inline void
 rte_mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
-			unsigned int n, struct rte_mempool_cache *cache)
+			unsigned int n, struct rte_mempool_cache * const cache)
 {
 	rte_mempool_trace_generic_put(mp, obj_table, n, cache);
 	RTE_MEMPOOL_CHECK_COOKIES(mp, obj_table, n, 0);
@@ -1465,8 +1475,7 @@  static __rte_always_inline void
 rte_mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
 		     unsigned int n)
 {
-	struct rte_mempool_cache *cache;
-	cache = rte_mempool_default_cache(mp, rte_lcore_id());
+	struct rte_mempool_cache * const cache = rte_mempool_default_cache(mp, rte_lcore_id());
 	rte_mempool_trace_put_bulk(mp, obj_table, n, cache);
 	rte_mempool_generic_put(mp, obj_table, n, cache);
 }
@@ -1505,31 +1514,50 @@  rte_mempool_put(struct rte_mempool *mp, void *obj)
  */
 static __rte_always_inline int
 rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
-			   unsigned int n, struct rte_mempool_cache *cache)
+			   unsigned int n, struct rte_mempool_cache * const cache)
 {
 	int ret;
 	unsigned int remaining;
 	uint32_t index, len;
 	void **cache_objs;
 
-	/* No cache provided */
+	/* No cache provided? */
 	if (unlikely(cache == NULL)) {
 		remaining = n;
 		goto driver_dequeue;
 	}
 
+	/* The request itself is too big for cache storage? */
+	if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE)) {
+		remaining = n;
+		goto driver_dequeue;
+	}
+
 	/* The cache is a stack, so copy will be in reverse order. */
-	cache_objs = &cache->objs[cache->len];
+	len = cache->len;
+
+	/* The entire request can be served from the cache? */
+	if (n <= len) {
+		if (__rte_constant(n)) {
+			/*
+			 * The request size 'n' is known at build time,
+			 * so let the compiler unroll the fixed length copy loop.
+			 */
+			cache_objs = &cache->objs[len];
+			cache->len = len - n;
+			for (index = 0; index < n; index++)
+				*obj_table++ = *--cache_objs;
+		} else {
+			remaining = n;
 
-	if (__rte_constant(n) && n <= cache->len) {
-		/*
-		 * The request size is known at build time, and
-		 * the entire request can be satisfied from the cache,
-		 * so let the compiler unroll the fixed length copy loop.
-		 */
-		cache->len -= n;
-		for (index = 0; index < n; index++)
-			*obj_table++ = *--cache_objs;
+cache_dequeue:
+
+			/* Serve the remaining part of the request from the cache. */
+			cache_objs = &cache->objs[len];
+			cache->len = len - remaining;
+			for (index = 0; index < remaining; index++)
+				*obj_table++ = *--cache_objs;
+		}
 
 		RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
 		RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
@@ -1537,59 +1565,36 @@  rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
 		return 0;
 	}
 
-	/*
-	 * Use the cache as much as we have to return hot objects first.
-	 * If the request size 'n' is known at build time, the above comparison
-	 * ensures that n > cache->len here, so omit RTE_MIN().
-	 */
-	len = __rte_constant(n) ? cache->len : RTE_MIN(n, cache->len);
-	cache->len -= len;
+	/* Serve the first part of the request from the cache to return hot objects first. */
+	cache_objs = &cache->objs[len];
 	remaining = n - len;
 	for (index = 0; index < len; index++)
 		*obj_table++ = *--cache_objs;
 
+	/* At this point, the cache is empty. */
+
 	/*
-	 * If the request size 'n' is known at build time, the case
-	 * where the entire request can be satisfied from the cache
-	 * has already been handled above, so omit handling it here.
+	 * Fill the cache from the backend;
+	 * fetch the remaining part of the request + ca. 1/2 cache size.
+	 * Round down to a CPU cache line size aligned bulk, if mempool cache size allows.
 	 */
-	if (!__rte_constant(n) && remaining == 0) {
-		/* The entire request is satisfied from the cache. */
-
-		RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
-		RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
-
-		return 0;
+	if (likely(cache->size >= 2 * RTE_CACHE_LINE_SIZE / sizeof(void *)))
+		len = RTE_ALIGN_FLOOR(remaining + cache->size / 2,
+				RTE_CACHE_LINE_SIZE / sizeof(void *));
+	else
+		len = remaining + cache->size / 2;
+	ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, len);
+	if (likely(ret == 0)) {
+		/* Serve the remaining part of the request from the cache. */
+		goto cache_dequeue;
 	}
 
-	/* if dequeue below would overflow mem allocated for cache */
-	if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE))
-		goto driver_dequeue;
-
-	/* Fill the cache from the backend; fetch size + remaining objects. */
-	ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs,
-			cache->size + remaining);
-	if (unlikely(ret < 0)) {
-		/*
-		 * We are buffer constrained, and not able to allocate
-		 * cache + remaining.
-		 * Do not fill the cache, just satisfy the remaining part of
-		 * the request directly from the backend.
-		 */
-		goto driver_dequeue;
-	}
-
-	/* Satisfy the remaining part of the request from the filled cache. */
-	cache_objs = &cache->objs[cache->size + remaining];
-	for (index = 0; index < remaining; index++)
-		*obj_table++ = *--cache_objs;
-
-	cache->len = cache->size;
-
-	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
-	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
-
-	return 0;
+	/*
+	 * We are buffer constrained, and not able to fetch
+	 * cache + remaining.
+	 * Do not fill the cache, just serve the remaining part of
+	 * the request directly from the backend.
+	 */
 
 driver_dequeue:
 
@@ -1597,7 +1602,8 @@  rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
 	ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);
 
 	if (ret < 0) {
-		if (likely(cache != NULL)) {
+		/* The first part of the request was served from the cache? */
+		if (likely(cache != NULL) && remaining != n) {
 			cache->len = n - remaining;
 			/*
 			 * No further action is required to roll the first part
@@ -1643,7 +1649,7 @@  rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
  */
 static __rte_always_inline int
 rte_mempool_generic_get(struct rte_mempool *mp, void **obj_table,
-			unsigned int n, struct rte_mempool_cache *cache)
+			unsigned int n, struct rte_mempool_cache * const cache)
 {
 	int ret;
 	ret = rte_mempool_do_generic_get(mp, obj_table, n, cache);
@@ -1678,8 +1684,7 @@  rte_mempool_generic_get(struct rte_mempool *mp, void **obj_table,
 static __rte_always_inline int
 rte_mempool_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned int n)
 {
-	struct rte_mempool_cache *cache;
-	cache = rte_mempool_default_cache(mp, rte_lcore_id());
+	struct rte_mempool_cache * const cache = rte_mempool_default_cache(mp, rte_lcore_id());
 	rte_mempool_trace_get_bulk(mp, obj_table, n, cache);
 	return rte_mempool_generic_get(mp, obj_table, n, cache);
 }