@@ -54,22 +54,25 @@
*
* - Bulk size (*n_get_bulk*, *n_put_bulk*)
*
- * - Bulk get from 1 to 32
- * - Bulk put from 1 to 32
- * - Bulk get and put from 1 to 32, compile time constant
+ * - Bulk get from 1 to 256, and RTE_MEMPOOL_CACHE_MAX_SIZE
+ * - Bulk put from 1 to 256, and RTE_MEMPOOL_CACHE_MAX_SIZE
+ * - Bulk get and put from 1 to 256, and RTE_MEMPOOL_CACHE_MAX_SIZE, compile time constant
*
* - Number of kept objects (*n_keep*)
*
* - 32
* - 128
* - 512
+ * - 2048
+ * - 8192
+ * - 32768
*/
-#define N 65536
-#define TIME_S 5
+#define TIME_S 1
#define MEMPOOL_ELT_SIZE 2048
-#define MAX_KEEP 512
-#define MEMPOOL_SIZE ((rte_lcore_count()*(MAX_KEEP+RTE_MEMPOOL_CACHE_MAX_SIZE))-1)
+#define MAX_KEEP 32768
+#define N (128 * MAX_KEEP)
+#define MEMPOOL_SIZE ((rte_lcore_count()*(MAX_KEEP+RTE_MEMPOOL_CACHE_MAX_SIZE*2))-1)
/* Number of pointers fitting into one cache line. */
#define CACHE_LINE_BURST (RTE_CACHE_LINE_SIZE / sizeof(uintptr_t))
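
A quick sanity check on the sizing above, assuming the default RTE_MEMPOOL_CACHE_MAX_SIZE of 512 and 4 lcores (illustrative numbers, not from the patch): MEMPOOL_SIZE evaluates to 4 * (32768 + 512 * 2) - 1 = 135167 elements, i.e. each lcore can hold MAX_KEEP objects plus a fully overfilled cache, while N works out to 128 * 32768 = 4194304.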
@@ -100,9 +103,11 @@ static unsigned n_keep;
/* true if we want to test with constant n_get_bulk and n_put_bulk */
static int use_constant_values;
-/* number of enqueues / dequeues */
+/* number of enqueues / dequeues, and time used */
struct __rte_cache_aligned mempool_test_stats {
uint64_t enq_count;
+ uint64_t duration_cycles;
+ RTE_CACHE_GUARD;
};
static struct mempool_test_stats stats[RTE_MAX_LCORE];
@@ -185,6 +190,7 @@ per_lcore_mempool_test(void *arg)
GOTO_ERR(ret, out);
stats[lcore_id].enq_count = 0;
+ stats[lcore_id].duration_cycles = 0;
/* wait synchro for workers */
if (lcore_id != rte_get_main_lcore())
@@ -205,6 +211,15 @@ per_lcore_mempool_test(void *arg)
CACHE_LINE_BURST, CACHE_LINE_BURST);
else if (n_get_bulk == 32)
ret = test_loop(mp, cache, n_keep, 32, 32);
+ else if (n_get_bulk == 64)
+ ret = test_loop(mp, cache, n_keep, 64, 64);
+ else if (n_get_bulk == 128)
+ ret = test_loop(mp, cache, n_keep, 128, 128);
+ else if (n_get_bulk == 256)
+ ret = test_loop(mp, cache, n_keep, 256, 256);
+ else if (n_get_bulk == RTE_MEMPOOL_CACHE_MAX_SIZE)
+ ret = test_loop(mp, cache, n_keep,
+ RTE_MEMPOOL_CACHE_MAX_SIZE, RTE_MEMPOOL_CACHE_MAX_SIZE);
else
ret = -1;
@@ -216,6 +231,8 @@ per_lcore_mempool_test(void *arg)
stats[lcore_id].enq_count += N;
}
+ stats[lcore_id].duration_cycles = time_diff;
+
out:
if (use_external_cache) {
rte_mempool_cache_flush(cache, mp);
@@ -233,6 +250,7 @@ launch_cores(struct rte_mempool *mp, unsigned int cores)
uint64_t rate;
int ret;
unsigned cores_save = cores;
+ double hz = rte_get_timer_hz();
rte_atomic_store_explicit(&synchro, 0, rte_memory_order_relaxed);
@@ -279,7 +297,9 @@ launch_cores(struct rte_mempool *mp, unsigned int cores)
rate = 0;
for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
- rate += (stats[lcore_id].enq_count / TIME_S);
+ if (stats[lcore_id].duration_cycles != 0)
+ rate += (double)stats[lcore_id].enq_count * hz /
+ (double)stats[lcore_id].duration_cycles;
printf("rate_persec=%" PRIu64 "\n", rate);
@@ -288,11 +308,13 @@ launch_cores(struct rte_mempool *mp, unsigned int cores)
/* for a given number of core, launch all test cases */
static int
-do_one_mempool_test(struct rte_mempool *mp, unsigned int cores)
+do_one_mempool_test(struct rte_mempool *mp, unsigned int cores, int external_cache)
{
- unsigned int bulk_tab_get[] = { 1, 4, CACHE_LINE_BURST, 32, 0 };
- unsigned int bulk_tab_put[] = { 1, 4, CACHE_LINE_BURST, 32, 0 };
- unsigned int keep_tab[] = { 32, 128, 512, 0 };
+ unsigned int bulk_tab_get[] = { 1, 4, CACHE_LINE_BURST, 32, 64, 128, 256,
+ RTE_MEMPOOL_CACHE_MAX_SIZE, 0 };
+ unsigned int bulk_tab_put[] = { 1, 4, CACHE_LINE_BURST, 32, 64, 128, 256,
+ RTE_MEMPOOL_CACHE_MAX_SIZE, 0 };
+ unsigned int keep_tab[] = { 32, 128, 512, 2048, 8192, 32768, 0 };
unsigned *get_bulk_ptr;
unsigned *put_bulk_ptr;
unsigned *keep_ptr;
@@ -302,6 +324,10 @@ do_one_mempool_test(struct rte_mempool *mp, unsigned int cores)
for (put_bulk_ptr = bulk_tab_put; *put_bulk_ptr; put_bulk_ptr++) {
for (keep_ptr = keep_tab; *keep_ptr; keep_ptr++) {
+ if (*keep_ptr < *get_bulk_ptr || *keep_ptr < *put_bulk_ptr)
+ continue;
+
+ use_external_cache = external_cache;
use_constant_values = 0;
n_get_bulk = *get_bulk_ptr;
n_put_bulk = *put_bulk_ptr;
@@ -324,7 +350,7 @@ do_one_mempool_test(struct rte_mempool *mp, unsigned int cores)
}
static int
-test_mempool_perf(void)
+do_all_mempool_perf_tests(unsigned int cores)
{
struct rte_mempool *mp_cache = NULL;
struct rte_mempool *mp_nocache = NULL;
@@ -338,8 +364,10 @@ test_mempool_perf(void)
NULL, NULL,
my_obj_init, NULL,
SOCKET_ID_ANY, 0);
- if (mp_nocache == NULL)
+ if (mp_nocache == NULL) {
+ printf("cannot allocate mempool (without cache)\n");
goto err;
+ }
/* create a mempool (with cache) */
mp_cache = rte_mempool_create("perf_test_cache", MEMPOOL_SIZE,
@@ -348,8 +376,10 @@ test_mempool_perf(void)
NULL, NULL,
my_obj_init, NULL,
SOCKET_ID_ANY, 0);
- if (mp_cache == NULL)
+ if (mp_cache == NULL) {
+ printf("cannot allocate mempool (with cache)\n");
goto err;
+ }
default_pool_ops = rte_mbuf_best_mempool_ops();
/* Create a mempool based on Default handler */
@@ -377,65 +407,83 @@ test_mempool_perf(void)
rte_mempool_obj_iter(default_pool, my_obj_init, NULL);
- /* performance test with 1, 2 and max cores */
printf("start performance test (without cache)\n");
-
- if (do_one_mempool_test(mp_nocache, 1) < 0)
- goto err;
-
- if (do_one_mempool_test(mp_nocache, 2) < 0)
+ if (do_one_mempool_test(mp_nocache, cores, 0) < 0)
goto err;
- if (do_one_mempool_test(mp_nocache, rte_lcore_count()) < 0)
- goto err;
-
- /* performance test with 1, 2 and max cores */
printf("start performance test for %s (without cache)\n",
default_pool_ops);
-
- if (do_one_mempool_test(default_pool, 1) < 0)
+ if (do_one_mempool_test(default_pool, cores, 0) < 0)
goto err;
- if (do_one_mempool_test(default_pool, 2) < 0)
+ printf("start performance test (with cache)\n");
+ if (do_one_mempool_test(mp_cache, cores, 0) < 0)
goto err;
- if (do_one_mempool_test(default_pool, rte_lcore_count()) < 0)
+ printf("start performance test (with user-owned cache)\n");
+ if (do_one_mempool_test(mp_nocache, cores, 1) < 0)
goto err;
- /* performance test with 1, 2 and max cores */
- printf("start performance test (with cache)\n");
+ rte_mempool_list_dump(stdout);
- if (do_one_mempool_test(mp_cache, 1) < 0)
- goto err;
+ ret = 0;
- if (do_one_mempool_test(mp_cache, 2) < 0)
- goto err;
+err:
+ rte_mempool_free(mp_cache);
+ rte_mempool_free(mp_nocache);
+ rte_mempool_free(default_pool);
+ return ret;
+}
- if (do_one_mempool_test(mp_cache, rte_lcore_count()) < 0)
- goto err;
+static int
+test_mempool_perf_1core(void)
+{
+ return do_all_mempool_perf_tests(1);
+}
- /* performance test with 1, 2 and max cores */
- printf("start performance test (with user-owned cache)\n");
- use_external_cache = 1;
+static int
+test_mempool_perf_2cores(void)
+{
+ if (rte_lcore_count() < 2) {
+ printf("not enough lcores\n");
+ return -1;
+ }
+ return do_all_mempool_perf_tests(2);
+}
- if (do_one_mempool_test(mp_nocache, 1) < 0)
- goto err;
+static int
+test_mempool_perf_allcores(void)
+{
+ return do_all_mempool_perf_tests(rte_lcore_count());
+}
+
+static int
+test_mempool_perf(void)
+{
+ int ret = -1;
- if (do_one_mempool_test(mp_nocache, 2) < 0)
+ /* performance test with 1, 2 and max cores */
+ if (do_all_mempool_perf_tests(1) < 0)
goto err;
+ if (rte_lcore_count() == 1)
+ goto done;
- if (do_one_mempool_test(mp_nocache, rte_lcore_count()) < 0)
+ if (do_all_mempool_perf_tests(2) < 0)
goto err;
+ if (rte_lcore_count() == 2)
+ goto done;
- rte_mempool_list_dump(stdout);
+ if (do_all_mempool_perf_tests(rte_lcore_count()) < 0)
+ goto err;
+done:
ret = 0;
err:
- rte_mempool_free(mp_cache);
- rte_mempool_free(mp_nocache);
- rte_mempool_free(default_pool);
return ret;
}
REGISTER_PERF_TEST(mempool_perf_autotest, test_mempool_perf);
+REGISTER_PERF_TEST(mempool_perf_autotest_1core, test_mempool_perf_1core);
+REGISTER_PERF_TEST(mempool_perf_autotest_2cores, test_mempool_perf_2cores);
+REGISTER_PERF_TEST(mempool_perf_autotest_allcores, test_mempool_perf_allcores);
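
Assuming the standard dpdk-test harness, each registered variant can then be run on its own, e.g. via the DPDK_TEST environment variable (binary path is illustrative):

	DPDK_TEST=mempool_perf_autotest_1core ./dpdk-test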
@@ -1024,21 +1024,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)
rte_lcore_id());
void **cache_objs;
- if (cache == NULL || cache->len == 0)
- goto normal;
-
- cache_objs = &cache->objs[cache->len];
-
- if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
- rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
+ if (!cache || unlikely(n + cache->len > cache->size)) {
+ rte_mempool_generic_put(mp, (void *)txep, n, cache);
goto done;
}
- /* The cache follows the following algorithm
- * 1. Add the objects to the cache
- * 2. Anything greater than the cache min value (if it crosses the
- * cache flush threshold) is flushed to the ring.
- */
+ cache_objs = &cache->objs[cache->len];
+
/* Add elements back into the cache */
uint32_t copied = 0;
/* n is multiple of 32 */
@@ -1056,16 +1048,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)
}
cache->len += n;
- if (cache->len >= cache->flushthresh) {
- rte_mempool_ops_enqueue_bulk(mp,
- &cache->objs[cache->size],
- cache->len - cache->size);
- cache->len = cache->size;
- }
+ /* Increment stat. */
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
goto done;
}
-normal:
m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
if (likely(m != NULL)) {
free[0] = m;
@@ -1335,21 +1324,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)
rte_lcore_id());
void **cache_objs;
- if (!cache || cache->len == 0)
- goto normal;
-
- cache_objs = &cache->objs[cache->len];
-
- if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
- rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
+ if (!cache || unlikely(n + cache->len > cache->size)) {
+ rte_mempool_generic_put(mp, (void *)txep, n, cache);
goto done;
}
- /* The cache follows the following algorithm
- * 1. Add the objects to the cache
- * 2. Anything greater than the cache min value (if it crosses the
- * cache flush threshold) is flushed to the ring.
- */
+ cache_objs = &cache->objs[cache->len];
+
/* Add elements back into the cache */
uint32_t copied = 0;
/* n is multiple of 32 */
@@ -1367,16 +1348,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)
}
cache->len += n;
- if (cache->len >= cache->flushthresh) {
- rte_mempool_ops_enqueue_bulk(mp,
- &cache->objs[cache->size],
- cache->len - cache->size);
- cache->len = cache->size;
- }
+ /* Increment stat. */
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
goto done;
}
-normal:
m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
if (likely(m)) {
free[0] = m;
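
The same cache-aware recycling pattern recurs in the i40e, iavf and ice AVX-512 Tx free paths further down in this series. A condensed sketch of the shared logic, with a hypothetical helper name and a plain pointer array in place of the drivers' txep[].mbuf walk (illustration only, not part of the patch):

	#include <string.h>
	#include <rte_branch_prediction.h>
	#include <rte_mempool.h>

	/* Illustrative helper: return freed objects to the per-lcore mempool cache. */
	static inline void
	tx_free_to_cache(struct rte_mempool *mp, struct rte_mempool_cache *cache,
			void * const *objs, unsigned int n)
	{
		if (cache == NULL || unlikely(n + cache->len > cache->size)) {
			/* Not enough room in the cache; let the library flush as needed. */
			rte_mempool_generic_put(mp, objs, n, cache);
			return;
		}

		/* Fast path: append to the cache and update the cache statistics. */
		memcpy(&cache->objs[cache->len], objs, sizeof(void *) * n);
		cache->len += n;
		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
		RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
	}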
@@ -51,8 +51,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)
struct bman_pool_params params = {
.flags = BMAN_POOL_FLAG_DYNAMIC_BPID
};
- unsigned int lcore_id;
- struct rte_mempool_cache *cache;
MEMPOOL_INIT_FUNC_TRACE();
@@ -120,18 +118,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)
rte_memcpy(bp_info, (void *)&rte_dpaa_bpid_info[bpid],
sizeof(struct dpaa_bp_info));
mp->pool_data = (void *)bp_info;
- /* Update per core mempool cache threshold to optimal value which is
- * number of buffers that can be released to HW buffer pool in
- * a single API call.
- */
- for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
- cache = &mp->local_cache[lcore_id];
- DPAA_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
- lcore_id, cache->flushthresh,
- (uint32_t)(cache->size + DPAA_MBUF_MAX_ACQ_REL));
- if (cache->flushthresh)
- cache->flushthresh = cache->size + DPAA_MBUF_MAX_ACQ_REL;
- }
DPAA_MEMPOOL_INFO("BMAN pool created for bpid =%d", bpid);
return 0;
@@ -234,7 +220,7 @@ dpaa_mbuf_alloc_bulk(struct rte_mempool *pool,
DPAA_MEMPOOL_DPDEBUG("Request to alloc %d buffers in bpid = %d",
count, bp_info->bpid);
- if (unlikely(count >= (RTE_MEMPOOL_CACHE_MAX_SIZE * 2))) {
+ if (unlikely(count >= RTE_MEMPOOL_CACHE_MAX_SIZE)) {
DPAA_MEMPOOL_ERR("Unable to allocate requested (%u) buffers",
count);
return -1;
@@ -44,8 +44,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)
struct dpaa2_bp_info *bp_info;
struct dpbp_attr dpbp_attr;
uint32_t bpid;
- unsigned int lcore_id;
- struct rte_mempool_cache *cache;
int ret;
avail_dpbp = dpaa2_alloc_dpbp_dev();
@@ -134,18 +132,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)
DPAA2_MEMPOOL_DEBUG("BP List created for bpid =%d", dpbp_attr.bpid);
h_bp_list = bp_list;
- /* Update per core mempool cache threshold to optimal value which is
- * number of buffers that can be released to HW buffer pool in
- * a single API call.
- */
- for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
- cache = &mp->local_cache[lcore_id];
- DPAA2_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
- lcore_id, cache->flushthresh,
- (uint32_t)(cache->size + DPAA2_MBUF_MAX_ACQ_REL));
- if (cache->flushthresh)
- cache->flushthresh = cache->size + DPAA2_MBUF_MAX_ACQ_REL;
- }
return 0;
err3:
@@ -783,18 +783,13 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
rte_lcore_id());
- if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
+ if (!cache || unlikely(n + cache->len > cache->size)) {
rte_mempool_generic_put(mp, (void *)txep, n, cache);
goto done;
}
cache_objs = &cache->objs[cache->len];
- /* The cache follows the following algorithm
- * 1. Add the objects to the cache
- * 2. Anything greater than the cache min value (if it
- * crosses the cache flush threshold) is flushed to the ring.
- */
/* Add elements back into the cache */
uint32_t copied = 0;
/* n is multiple of 32 */
@@ -812,12 +807,10 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
}
cache->len += n;
- if (cache->len >= cache->flushthresh) {
- rte_mempool_ops_enqueue_bulk
- (mp, &cache->objs[cache->size],
- cache->len - cache->size);
- cache->len = cache->size;
- }
+ /* Increment stat. */
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
goto done;
}
@@ -1873,21 +1873,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
rte_lcore_id());
void **cache_objs;
- if (!cache || cache->len == 0)
- goto normal;
-
- cache_objs = &cache->objs[cache->len];
-
- if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
- rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
+ if (!cache || unlikely(n + cache->len > cache->size)) {
+ rte_mempool_generic_put(mp, (void *)txep, n, cache);
goto done;
}
- /* The cache follows the following algorithm
- * 1. Add the objects to the cache
- * 2. Anything greater than the cache min value (if it crosses the
- * cache flush threshold) is flushed to the ring.
- */
+ cache_objs = &cache->objs[cache->len];
+
/* Add elements back into the cache */
uint32_t copied = 0;
/* n is multiple of 32 */
@@ -1905,16 +1897,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
}
cache->len += n;
- if (cache->len >= cache->flushthresh) {
- rte_mempool_ops_enqueue_bulk(mp,
- &cache->objs[cache->size],
- cache->len - cache->size);
- cache->len = cache->size;
- }
+ /* Increment stat. */
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
goto done;
}
-normal:
m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
if (likely(m)) {
free[0] = m;
@@ -888,21 +888,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
rte_lcore_id());
- if (!cache || cache->len == 0)
- goto normal;
-
- cache_objs = &cache->objs[cache->len];
-
- if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
- rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
+ if (!cache || unlikely(n + cache->len > cache->size)) {
+ rte_mempool_generic_put(mp, (void *)txep, n, cache);
goto done;
}
- /* The cache follows the following algorithm
- * 1. Add the objects to the cache
- * 2. Anything greater than the cache min value (if it
- * crosses the cache flush threshold) is flushed to the ring.
- */
+ cache_objs = &cache->objs[cache->len];
+
/* Add elements back into the cache */
uint32_t copied = 0;
/* n is multiple of 32 */
@@ -920,16 +912,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
}
cache->len += n;
- if (cache->len >= cache->flushthresh) {
- rte_mempool_ops_enqueue_bulk
- (mp, &cache->objs[cache->size],
- cache->len - cache->size);
- cache->len = cache->size;
- }
+ /* Increment stat. */
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
goto done;
}
-normal:
m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
if (likely(m)) {
free[0] = m;
@@ -112,7 +112,6 @@ RTE_TRACE_POINT(
rte_trace_point_emit_i32(socket_id);
rte_trace_point_emit_ptr(cache);
rte_trace_point_emit_u32(cache->len);
- rte_trace_point_emit_u32(cache->flushthresh);
)
RTE_TRACE_POINT(
@@ -50,11 +50,6 @@ static void
mempool_event_callback_invoke(enum rte_mempool_event event,
struct rte_mempool *mp);
-/* Note: avoid using floating point since that compiler
- * may not think that is constant.
- */
-#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)
-
#if defined(RTE_ARCH_X86)
/*
* return the greatest common divisor between a and b (fast algorithm)
@@ -746,13 +741,12 @@ rte_mempool_free(struct rte_mempool *mp)
static void
mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)
{
- /* Check that cache have enough space for flush threshold */
- RTE_BUILD_BUG_ON(CALC_CACHE_FLUSHTHRESH(RTE_MEMPOOL_CACHE_MAX_SIZE) >
+ /* Check that the cache has enough space for its size */
+ RTE_BUILD_BUG_ON(RTE_MEMPOOL_CACHE_MAX_SIZE >
RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs) /
RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs[0]));
cache->size = size;
- cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);
cache->len = 0;
}
@@ -836,7 +830,7 @@ rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
/* asked cache too big */
if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
- CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
+ cache_size > n) {
rte_errno = EINVAL;
return NULL;
}
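
To make the relaxed check above concrete (illustrative numbers): for n = 512, the removed CALC_CACHE_FLUSHTHRESH test rejected any cache_size above 341, since 342 * 3 / 2 = 513 > 512, whereas the new test accepts any cache_size up to n, still bounded by RTE_MEMPOOL_CACHE_MAX_SIZE.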
@@ -89,10 +89,8 @@ struct __rte_cache_aligned rte_mempool_debug_stats {
*/
struct __rte_cache_aligned rte_mempool_cache {
uint32_t size; /**< Size of the cache */
- uint32_t flushthresh; /**< Threshold before we flush excess elements */
uint32_t len; /**< Current cache count */
#ifdef RTE_LIBRTE_MEMPOOL_STATS
- uint32_t unused;
/*
* Alternative location for the most frequently updated mempool statistics (per-lcore),
* providing faster update access when using a mempool cache.
@@ -110,7 +108,7 @@ struct __rte_cache_aligned rte_mempool_cache {
* Cache is allocated to this size to allow it to overflow in certain
* cases to avoid needless emptying of cache.
*/
- alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 2];
+ alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE];
};
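
Assuming the default RTE_MEMPOOL_CACHE_MAX_SIZE of 512 and 8-byte pointers, dropping the factor of two shrinks the objs array from 8 KiB to 4 KiB per per-lcore cache; the exact figures depend on the build configuration.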
/**
@@ -1363,7 +1361,8 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
}
/**
- * @internal Put several objects back in the mempool; used internally.
+ * @internal Put several objects back in the mempool; used internally when
+ * the number of objects exceeds the remaining space in the mempool cache.
* @param mp
* A pointer to the mempool structure.
* @param obj_table
@@ -1371,58 +1370,94 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
* @param n
* The number of objects to store back in the mempool, must be strictly
* positive.
+ * Must be more than the remaining space in the mempool cache, i.e.:
+ * cache->len + n > cache->size
* @param cache
- * A pointer to a mempool cache structure. May be NULL if not needed.
+ * A pointer to a mempool cache structure. Not NULL.
*/
-static __rte_always_inline void
-rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
- unsigned int n, struct rte_mempool_cache *cache)
+static __rte_noinline void
+rte_mempool_do_generic_put_many(struct rte_mempool *mp, void * const *obj_table,
+ unsigned int n, struct rte_mempool_cache *cache)
{
- void **cache_objs;
+ __attribute__((assume(cache != NULL)));
+ __attribute__((assume(cache->size <= RTE_MEMPOOL_CACHE_MAX_SIZE)));
+ __attribute__((assume(cache->len <= RTE_MEMPOOL_CACHE_MAX_SIZE)));
+ __attribute__((assume(cache->len + n > cache->size)));
- /* No cache provided */
- if (unlikely(cache == NULL))
- goto driver_enqueue;
+ void **cache_objs;
+ unsigned int len;
+ const uint32_t cache_size = cache->size;
- /* increment stat now, adding in mempool always success */
+ /* Increment stat now, adding in mempool always succeeds. */
RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
- /* The request itself is too big for the cache */
- if (unlikely(n > cache->flushthresh))
- goto driver_enqueue_stats_incremented;
-
- /*
- * The cache follows the following algorithm:
- * 1. If the objects cannot be added to the cache without crossing
- * the flush threshold, flush the cache to the backend.
- * 2. Add the objects to the cache.
- */
-
- if (cache->len + n <= cache->flushthresh) {
- cache_objs = &cache->objs[cache->len];
- cache->len += n;
- } else {
- cache_objs = &cache->objs[0];
- rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
- cache->len = n;
+ /* Fill the cache with the first objects. */
+ cache_objs = &cache->objs[cache->len];
+ len = (cache_size - cache->len);
+ rte_memcpy(cache_objs, obj_table, sizeof(void *) * len);
+ obj_table += len;
+ n -= len;
+
+ /* Flush the entire cache to the backend. */
+ cache_objs = &cache->objs[0];
+ rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache_size);
+
+ if (unlikely(n > cache_size)) {
+ /* Push the following objects directly to the backend, in multiples of the cache size. */
+ len = n - n % cache_size;
+ rte_mempool_ops_enqueue_bulk(mp, obj_table, len);
+ obj_table += len;
+ n -= len;
}
- /* Add the objects to the cache. */
+ /* Add the remaining objects to the cache. */
+ cache->len = n;
rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
+}
- return;
-
-driver_enqueue:
-
- /* increment stat now, adding in mempool always success */
- RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
- RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
-
-driver_enqueue_stats_incremented:
+/**
+ * @internal Put several objects back in the mempool; used internally.
+ * @param mp
+ * A pointer to the mempool structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to store back in the mempool, must be strictly
+ * positive.
+ * @param cache
+ * A pointer to a mempool cache structure. May be NULL if not needed.
+ */
+static __rte_always_inline void
+rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
+ unsigned int n, struct rte_mempool_cache *cache)
+{
+ if (likely(cache != NULL)) {
+ __attribute__((assume(cache->size <= RTE_MEMPOOL_CACHE_MAX_SIZE)));
+ __attribute__((assume(cache->len <= RTE_MEMPOOL_CACHE_MAX_SIZE)));
+
+ /* Enough remaining space in the cache? */
+ if (likely(cache->len + n <= cache->size)) {
+ void **cache_objs;
+
+ /* Increment stat now, adding in mempool always succeeds. */
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
+ /* Add the objects to the cache. */
+ cache_objs = &cache->objs[cache->len];
+ cache->len += n;
+ rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
+ } else
+ rte_mempool_do_generic_put_many(mp, obj_table, n, cache);
+ } else {
+ /* Increment stat now, adding in mempool always succeeds. */
+ RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
+ RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
- /* push objects to the backend */
- rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
+ /* push objects to the backend */
+ rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
+ }
}
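
From the caller's side nothing changes: the in-cache fast path stays inlined while the flush path now sits behind a separate __rte_noinline function. A minimal usage sketch with the public API (pool handle and burst size are made up):

	void *objs[32];

	if (rte_mempool_get_bulk(mp, objs, 32) == 0) {
		/* ... use the objects ... */
		rte_mempool_put_bulk(mp, objs, 32);
	}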
@@ -1490,135 +1525,193 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
}
/**
- * @internal Get several objects from the mempool; used internally.
+ * @internal Get several objects from the mempool; used internally when
+ * the number of objects exceeds what is available in the mempool cache.
* @param mp
* A pointer to the mempool structure.
* @param obj_table
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to get, must be strictly positive.
+ * Must be more than available in the mempool cache, i.e.:
+ * n > cache->len
* @param cache
- * A pointer to a mempool cache structure. May be NULL if not needed.
+ * A pointer to a mempool cache structure. Not NULL.
* @return
* - 0: Success.
* - <0: Error; code of driver dequeue function.
*/
-static __rte_always_inline int
-rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
- unsigned int n, struct rte_mempool_cache *cache)
+static __rte_noinline int
+rte_mempool_do_generic_get_many(struct rte_mempool *mp, void **obj_table,
+ unsigned int n, struct rte_mempool_cache *cache)
{
+ __attribute__((assume(cache != NULL)));
+ __attribute__((assume(cache->size <= RTE_MEMPOOL_CACHE_MAX_SIZE)));
+ __attribute__((assume(cache->len <= RTE_MEMPOOL_CACHE_MAX_SIZE)));
+ __attribute__((assume(n > cache->len)));
+
int ret;
unsigned int remaining;
uint32_t index, len;
void **cache_objs;
+ const uint32_t cache_size = cache->size;
- /* No cache provided */
- if (unlikely(cache == NULL)) {
- remaining = n;
- goto driver_dequeue;
- }
-
- /* The cache is a stack, so copy will be in reverse order. */
+ /* Serve the first part of the request from the cache to return hot objects first. */
cache_objs = &cache->objs[cache->len];
+ len = cache->len;
+ remaining = n - len;
+ for (index = 0; index < len; index++)
+ *obj_table++ = *--cache_objs;
- if (__rte_constant(n) && n <= cache->len) {
+ /* At this point, the cache is empty. */
+
+ /* More than can be served from a full cache? */
+ if (unlikely(remaining >= cache_size)) {
/*
- * The request size is known at build time, and
- * the entire request can be satisfied from the cache,
- * so let the compiler unroll the fixed length copy loop.
+ * Serve the following part of the request directly from the backend
+ * in multiples of the cache size.
*/
- cache->len -= n;
- for (index = 0; index < n; index++)
- *obj_table++ = *--cache_objs;
+ len = remaining - remaining % cache_size;
+ ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, len);
+ if (unlikely(ret < 0)) {
+ /*
+ * No further action is required to roll back the request,
+ * as objects in the cache are intact, and no objects have
+ * been dequeued from the backend.
+ */
- RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
- RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
+ RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
+ RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
- return 0;
- }
+ return ret;
+ }
- /*
- * Use the cache as much as we have to return hot objects first.
- * If the request size 'n' is known at build time, the above comparison
- * ensures that n > cache->len here, so omit RTE_MIN().
- */
- len = __rte_constant(n) ? cache->len : RTE_MIN(n, cache->len);
- cache->len -= len;
- remaining = n - len;
- for (index = 0; index < len; index++)
- *obj_table++ = *--cache_objs;
+ remaining -= len;
+ obj_table += len;
- /*
- * If the request size 'n' is known at build time, the case
- * where the entire request can be satisfied from the cache
- * has already been handled above, so omit handling it here.
- */
- if (!__rte_constant(n) && remaining == 0) {
- /* The entire request is satisfied from the cache. */
+ if (unlikely(remaining == 0)) {
+ cache->len = 0;
- RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
- RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
- return 0;
+ return 0;
+ }
}
- /* if dequeue below would overflow mem allocated for cache */
- if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE))
- goto driver_dequeue;
-
- /* Fill the cache from the backend; fetch size + remaining objects. */
- ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs,
- cache->size + remaining);
+ /* Fill the entire cache from the backend. */
+ ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, cache_size);
if (unlikely(ret < 0)) {
/*
- * We are buffer constrained, and not able to allocate
- * cache + remaining.
- * Do not fill the cache, just satisfy the remaining part of
- * the request directly from the backend.
+ * Unable to fill the cache.
+ * Last resort: Try only the remaining part of the request,
+ * served directly from the backend.
*/
- goto driver_dequeue;
- }
+ ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);
+ if (unlikely(ret == 0)) {
+ cache->len = 0;
- /* Satisfy the remaining part of the request from the filled cache. */
- cache_objs = &cache->objs[cache->size + remaining];
- for (index = 0; index < remaining; index++)
- *obj_table++ = *--cache_objs;
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
- cache->len = cache->size;
+ return 0;
+ }
+ /* Roll back. */
+ if (cache->len + remaining == n) {
+ /*
+ * No further action is required to roll back the request,
+ * as objects in the cache are intact, and no objects have
+ * been dequeued from the backend.
+ */
+ } else {
+ /* Update the state of the cache before putting back the objects. */
+ cache->len = 0;
+
+ len = n - remaining;
+ obj_table -= len;
+ rte_mempool_do_generic_put(mp, obj_table, len, cache);
+ }
+
+ RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
+ RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
+
+ return ret;
+ }
+
+ /* Increment stat now, this always succeeds. */
RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
+ /* Serve the remaining part of the request from the filled cache. */
+ cache_objs = &cache->objs[cache_size];
+ for (index = 0; index < remaining; index++)
+ *obj_table++ = *--cache_objs;
+
+ cache->len = cache_size - remaining;
+
return 0;
+}
-driver_dequeue:
+/**
+ * @internal Get several objects from the mempool; used internally.
+ * @param mp
+ * A pointer to the mempool structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to get, must be strictly positive.
+ * @param cache
+ * A pointer to a mempool cache structure. May be NULL if not needed.
+ * @return
+ * - 0: Success.
+ * - <0: Error; code of driver dequeue function.
+ */
+static __rte_always_inline int
+rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
+ unsigned int n, struct rte_mempool_cache *cache)
+{
+ if (likely(cache != NULL)) {
+ __attribute__((assume(cache->size <= RTE_MEMPOOL_CACHE_MAX_SIZE)));
+ __attribute__((assume(cache->len <= RTE_MEMPOOL_CACHE_MAX_SIZE)));
+
+ /* Enough objects in the cache? */
+ if (n <= cache->len) {
+ unsigned int index;
+ void **cache_objs;
- /* Get remaining objects directly from the backend. */
- ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);
+ /* Increment stat now, this always succeeds. */
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
- if (ret < 0) {
- if (likely(cache != NULL)) {
- cache->len = n - remaining;
/*
- * No further action is required to roll the first part
- * of the request back into the cache, as objects in
- * the cache are intact.
+ * The cache is a stack, so copy will be in reverse order.
+ * If the request size is known at build time,
+ * the compiler will unroll the fixed-length copy loop.
*/
- }
-
- RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
- RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
+ cache_objs = &cache->objs[cache->len];
+ cache->len -= n;
+ for (index = 0; index < n; index++)
+ *obj_table++ = *--cache_objs;
+
+ return 0;
+ } else
+ return rte_mempool_do_generic_get_many(mp, obj_table, n, cache);
} else {
- if (likely(cache != NULL)) {
- RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
- RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
+ int ret;
+
+ /* Get the objects directly from the backend. */
+ ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);
+ if (unlikely(ret < 0)) {
+ RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
+ RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
} else {
RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1);
RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n);
}
- }
- return ret;
+ return ret;
+ }
}
/**