[RFC,v2] mempool: add API to return pointer to free space on per-core cache
Commit Message
Expose the pointer to the free space in the per-core cache to the PMD, so
that objects can be copied directly into the cache without any temporary
storage.
Signed-off-by: Kamalakshitha Aligeri <kamalakshitha.aligeri@arm.com>
---
v2: Integration of API in vector PMD
v1: API to return pointer to free space on per-core cache and
integration of API in scalar PMD
app/test/test_mempool.c | 140 ++++++++++++++++++++++++
drivers/net/i40e/i40e_rxtx_vec_avx512.c | 46 +++-----
drivers/net/i40e/i40e_rxtx_vec_common.h | 22 +++-
lib/mempool/rte_mempool.h | 46 ++++++++
4 files changed, 219 insertions(+), 35 deletions(-)
Comments
> From: Kamalakshitha Aligeri [mailto:kamalakshitha.aligeri@arm.com]
> Sent: Wednesday, 16 November 2022 18.25
>
> Expose the pointer to the free space in the per-core cache to the PMD, so
> that objects can be copied directly into the cache without any temporary
> storage.
>
> Signed-off-by: Kamalakshitha Aligeri <kamalakshitha.aligeri@arm.com>
> ---
Please build your patch in continuation of my patch [1], and use rte_mempool_cache_zc_put_bulk() instead of rte_mempool_get_cache().
[1]: https://inbox.dpdk.org/dev/20221116180419.98937-1-mb@smartsharesystems.com/
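To illustrate what is meant, a single-object put built on top of [1] could look roughly like this. This is only a sketch: the function name is the one from that patch, but its parameter order (cache, mp, n) and the NULL return when the request cannot be served from the cache are assumptions based on it:

	void **cache_objs = rte_mempool_cache_zc_put_bulk(cache, mp, 1);

	if (cache_objs != NULL)
		rte_memcpy(cache_objs, &obj, sizeof(void *)); /* copy the pointer straight into the cache */
	else
		rte_mempool_generic_put(mp, &obj, 1, cache); /* no cache, or request too big for it */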
Some initial comments follow inline below.
> v2: Integration of API in vector PMD
> v1: API to return pointer to free space on per-core cache and
> integration of API in scalar PMD
>
> app/test/test_mempool.c | 140 ++++++++++++++++++++++++
> drivers/net/i40e/i40e_rxtx_vec_avx512.c | 46 +++-----
> drivers/net/i40e/i40e_rxtx_vec_common.h | 22 +++-
> lib/mempool/rte_mempool.h | 46 ++++++++
> 4 files changed, 219 insertions(+), 35 deletions(-)
>
> diff --git a/app/test/test_mempool.c b/app/test/test_mempool.c
> index 8e493eda47..a0160336dd 100644
> --- a/app/test/test_mempool.c
> +++ b/app/test/test_mempool.c
> @@ -187,6 +187,142 @@ test_mempool_basic(struct rte_mempool *mp, int use_external_cache)
> return ret;
> }
>
> +/* basic tests (done on one core) */
> +static int
> +test_mempool_get_cache(struct rte_mempool *mp, int use_external_cache)
> +{
> + uint32_t *objnum;
> + void **objtable;
> + void *obj, *obj2;
> + char *obj_data;
> + int ret = 0;
> + unsigned int i, j;
> + int offset;
> + struct rte_mempool_cache *cache;
> + void **cache_objs;
> +
> + if (use_external_cache) {
> + /* Create a user-owned mempool cache. */
> + cache = rte_mempool_cache_create(RTE_MEMPOOL_CACHE_MAX_SIZE,
> + SOCKET_ID_ANY);
> + if (cache == NULL)
> + RET_ERR();
> + } else {
> + /* May be NULL if cache is disabled. */
> + cache = rte_mempool_default_cache(mp, rte_lcore_id());
> + }
> +
> + /* dump the mempool status */
> + rte_mempool_dump(stdout, mp);
> +
> + printf("get an object\n");
> + if (rte_mempool_generic_get(mp, &obj, 1, cache) < 0)
> + GOTO_ERR(ret, out);
> + rte_mempool_dump(stdout, mp);
> +
> + /* tests that improve coverage */
> + printf("get object count\n");
> + /* We have to count the extra caches, one in this case. */
> + offset = use_external_cache ? 1 * cache->len : 0;
> + if (rte_mempool_avail_count(mp) + offset != MEMPOOL_SIZE - 1)
> + GOTO_ERR(ret, out);
> +
> + printf("get private data\n");
> + if (rte_mempool_get_priv(mp) != (char *)mp +
> + RTE_MEMPOOL_HEADER_SIZE(mp, mp->cache_size))
> + GOTO_ERR(ret, out);
> +
> +#ifndef RTE_EXEC_ENV_FREEBSD /* rte_mem_virt2iova() not supported on bsd */
> + printf("get physical address of an object\n");
> + if (rte_mempool_virt2iova(obj) != rte_mem_virt2iova(obj))
> + GOTO_ERR(ret, out);
> +#endif
> +
> +
> + printf("put the object back\n");
> + cache_objs = rte_mempool_get_cache(mp, 1);
Use rte_mempool_cache_zc_put_bulk() instead.
> + if (cache_objs != NULL)
> + rte_memcpy(cache_objs, &obj, sizeof(void *));
> + else
> + rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
rte_mempool_ops_enqueue_bulk() is a mempool-internal function, and it lacks proper instrumentation. Use this instead:
rte_mempool_generic_put(mp, &obj, 1, NULL);
> +
> + rte_mempool_dump(stdout, mp);
> +
> + printf("get 2 objects\n");
> + if (rte_mempool_generic_get(mp, &obj, 1, cache) < 0)
> + GOTO_ERR(ret, out);
> + if (rte_mempool_generic_get(mp, &obj2, 1, cache) < 0) {
> + rte_mempool_generic_put(mp, &obj, 1, cache);
> + GOTO_ERR(ret, out);
> + }
> + rte_mempool_dump(stdout, mp);
> +
> + printf("put the objects back\n");
> + cache_objs = rte_mempool_get_cache(mp, 1);
Use rte_mempool_cache_zc_put_bulk() instead.
> + if (cache_objs != NULL)
> + rte_memcpy(mp, &obj, sizeof(void *));
> + else
> + rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
Use rte_mempool_generic_put() instead.
> +
> + cache_objs = rte_mempool_get_cache(mp, 1);
Use rte_mempool_cache_zc_put_bulk() instead.
> + if (cache_objs != NULL)
> + rte_memcpy(mp, &obj2, sizeof(void *));
> + else
> + rte_mempool_ops_enqueue_bulk(mp, &obj2, 1);
Use rte_mempool_generic_put() instead.
> + rte_mempool_dump(stdout, mp);
> +
> + /*
> + * get many objects: we cannot get them all because the cache
> + * on other cores may not be empty.
> + */
> + objtable = malloc(MEMPOOL_SIZE * sizeof(void *));
> + if (objtable == NULL)
> + GOTO_ERR(ret, out);
> +
> + for (i = 0; i < MEMPOOL_SIZE; i++) {
> + if (rte_mempool_generic_get(mp, &objtable[i], 1, cache) < 0)
> + break;
> + }
> +
> + /*
> + * for each object, check that its content was not modified,
> + * and put objects back in pool
> + */
> + cache_objs = rte_mempool_get_cache(mp, MEMPOOL_SIZE);
Use rte_mempool_cache_zc_put_bulk() instead.
Also, this will always fail (return NULL) if MEMPOOL_SIZE is larger than the cache size.
> + if (cache_objs != NULL) {
> + while (i--) {
> + obj = objtable[i];
> + obj_data = obj;
> + objnum = obj;
> + if (*objnum > MEMPOOL_SIZE) {
> + printf("bad object number(%d)\n", *objnum);
> + ret = -1;
> + break;
> + }
> + for (j = sizeof(*objnum); j < mp->elt_size; j++) {
> + if (obj_data[j] != 0)
> + ret = -1;
> + }
> +
> + rte_memcpy(&cache_objs[i], &objtable[i], sizeof(void *));
> + }
> + } else {
> + rte_mempool_ops_enqueue_bulk(mp, objtable, MEMPOOL_SIZE);
Use rte_mempool_generic_put() instead.
> + }
> +
> + free(objtable);
> + if (ret == -1)
> + printf("objects were modified!\n");
> +
> +out:
> + if (use_external_cache) {
> + rte_mempool_cache_flush(cache, mp);
> + rte_mempool_cache_free(cache);
> + }
> +
> + return ret;
> +}
> +
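Putting the inline comments above together, the bulk return at the end of the test could look roughly like this (a sketch only; it returns just the 'i' objects actually obtained, and falls back to rte_mempool_generic_put() because MEMPOOL_SIZE may exceed the cache size):

	cache_objs = rte_mempool_cache_zc_put_bulk(cache, mp, i);
	if (cache_objs != NULL) {
		/* content checks as in the patch, then copy the pointers into the cache */
		for (j = 0; j < i; j++)
			rte_memcpy(&cache_objs[j], &objtable[j], sizeof(void *));
	} else {
		rte_mempool_generic_put(mp, objtable, i, cache);
	}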
> static int test_mempool_creation_with_exceeded_cache_size(void)
> {
> struct rte_mempool *mp_cov;
> @@ -986,6 +1122,10 @@ test_mempool(void)
> if (test_mempool_basic(mp_cache, 0) < 0)
> GOTO_ERR(ret, err);
>
> + /* basic tests with get cache */
> + if (test_mempool_get_cache(mp_cache, 0) < 0)
> + GOTO_ERR(ret, err);
> +
> /* basic tests with user-owned cache */
> if (test_mempool_basic(mp_nocache, 1) < 0)
> GOTO_ERR(ret, err);
> diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> index 60c97d5331..bfdb4f21f9 100644
> --- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> +++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> @@ -902,14 +902,7 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
>
> if (txq->offloads & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE && (n & 31) == 0) {
> struct rte_mempool *mp = txep[0].mbuf->pool;
> - void **cache_objs;
> - struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
> - rte_lcore_id());
> -
> - if (!cache || cache->len == 0)
> - goto normal;
> -
> - cache_objs = &cache->objs[cache->len];
> + void **cache_objs = rte_mempool_get_cache(mp, n);
Use rte_mempool_cache_zc_put_bulk() instead.
Remove these:
> if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
The (n > RTE_MEMPOOL_CACHE_MAX_SIZE) comparison is obsolete, because rte_mempool_cache_zc_put_bulk() will return NULL if there is no cache.
> @@ -922,29 +915,22 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
> * crosses the cache flush threshold) is flushed to the ring.
> */
> /* Add elements back into the cache */
> - uint32_t copied = 0;
> - /* n is multiple of 32 */
> - while (copied < n) {
> - const __m512i a = _mm512_load_si512(&txep[copied]);
> - const __m512i b = _mm512_load_si512(&txep[copied + 8]);
> - const __m512i c = _mm512_load_si512(&txep[copied + 16]);
> - const __m512i d = _mm512_load_si512(&txep[copied + 24]);
> -
> - _mm512_storeu_si512(&cache_objs[copied], a);
> - _mm512_storeu_si512(&cache_objs[copied + 8], b);
> - _mm512_storeu_si512(&cache_objs[copied + 16], c);
> - _mm512_storeu_si512(&cache_objs[copied + 24], d);
> - copied += 32;
> - }
> - cache->len += n;
> -
> - if (cache->len >= cache->flushthresh) {
> - rte_mempool_ops_enqueue_bulk
> - (mp, &cache->objs[cache->size],
> - cache->len - cache->size);
> - cache->len = cache->size;
> + if (cache_objs != NULL) {
> + uint32_t copied = 0;
> + /* n is multiple of 32 */
> + while (copied < n) {
> + const __m512i a = _mm512_load_si512(&txep[copied]);
> + const __m512i b = _mm512_load_si512(&txep[copied + 8]);
> + const __m512i c = _mm512_load_si512(&txep[copied + 16]);
> + const __m512i d = _mm512_load_si512(&txep[copied + 24]);
> +
> + _mm512_storeu_si512(&cache_objs[copied], a);
> + _mm512_storeu_si512(&cache_objs[copied + 8], b);
> + _mm512_storeu_si512(&cache_objs[copied + 16], c);
> + _mm512_storeu_si512(&cache_objs[copied + 24], d);
> + copied += 32;
And add this here instead of the (n > RTE_MEMPOOL_CACHE_MAX_SIZE) comparison:
+} else
+ rte_mempool_generic_put();
> + }
> }
> - goto done;
> }
>
> normal:
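For clarity, the whole fast-free branch reworked along the lines suggested above could look like this (a sketch only; the zero-copy function and its (cache, mp, n) parameter order are taken from [1] and may still change):

	struct rte_mempool *mp = txep[0].mbuf->pool;
	struct rte_mempool_cache *cache = rte_mempool_default_cache(mp, rte_lcore_id());
	void **cache_objs = rte_mempool_cache_zc_put_bulk(cache, mp, n);

	if (cache_objs != NULL) {
		uint32_t copied = 0;
		/* n is a multiple of 32 */
		while (copied < n) {
			const __m512i a = _mm512_load_si512(&txep[copied]);
			const __m512i b = _mm512_load_si512(&txep[copied + 8]);
			const __m512i c = _mm512_load_si512(&txep[copied + 16]);
			const __m512i d = _mm512_load_si512(&txep[copied + 24]);

			_mm512_storeu_si512(&cache_objs[copied], a);
			_mm512_storeu_si512(&cache_objs[copied + 8], b);
			_mm512_storeu_si512(&cache_objs[copied + 16], c);
			_mm512_storeu_si512(&cache_objs[copied + 24], d);
			copied += 32;
		}
	} else {
		rte_mempool_generic_put(mp, (void **)txep, n, cache);
	}
	goto done;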
> diff --git a/drivers/net/i40e/i40e_rxtx_vec_common.h
> b/drivers/net/i40e/i40e_rxtx_vec_common.h
> index fe1a6ec75e..4389ab9094 100644
> --- a/drivers/net/i40e/i40e_rxtx_vec_common.h
> +++ b/drivers/net/i40e/i40e_rxtx_vec_common.h
> @@ -99,14 +99,26 @@ i40e_tx_free_bufs(struct i40e_tx_queue *txq)
> * tx_next_dd - (tx_rs_thresh-1)
> */
> txep = &txq->sw_ring[txq->tx_next_dd - (n - 1)];
> + struct rte_mempool *mp = txep[0].mbuf->pool;
>
> if (txq->offloads & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE) {
> - for (i = 0; i < n; i++) {
> - free[i] = txep[i].mbuf;
> - /* no need to reset txep[i].mbuf in vector path */
> + void **cache_objs;
> + cache_objs = rte_mempool_get_cache(mp, n);
Use rte_mempool_cache_zc_put_bulk() instead.
> +
> + if (cache_objs != NULL) {
> + for (i = 0; i < n; i++) {
> + /* no need to reset txep[i].mbuf in vector path */
> + rte_memcpy(&cache_objs[i], &txep->mbuf, sizeof(struct rte_mbuf));
> + txep++;
> + }
> + goto done;
> + } else {
> + for (i = 0; i < n; i++) {
> + free[i] = txep->mbuf;
> + txep++;
> + }
> + rte_mempool_ops_enqueue_bulk(mp, (void **)free, n);
Use rte_mempool_generic_put() instead.
> }
> - rte_mempool_put_bulk(free[0]->pool, (void **)free, n);
> - goto done;
> }
>
> m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
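The same rework applies to this scalar/common path; a sketch under the same assumptions follows. Note that it stores the mbuf pointers directly, whereas the patch above copies sizeof(struct rte_mbuf) bytes per entry, which looks like more than intended (a pointer copy should suffice):

	struct rte_mempool *mp = txep[0].mbuf->pool;
	struct rte_mempool_cache *cache = rte_mempool_default_cache(mp, rte_lcore_id());
	void **cache_objs = rte_mempool_cache_zc_put_bulk(cache, mp, n);

	if (cache_objs != NULL) {
		for (i = 0; i < n; i++) {
			/* no need to reset txep[i].mbuf in vector path */
			cache_objs[i] = txep[i].mbuf;
		}
	} else {
		for (i = 0; i < n; i++)
			free[i] = txep[i].mbuf;
		rte_mempool_generic_put(mp, (void **)free, n, cache);
	}
	goto done;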
> diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
> index 1f5707f46a..480b1eb585 100644
> --- a/lib/mempool/rte_mempool.h
> +++ b/lib/mempool/rte_mempool.h
> @@ -1360,6 +1360,52 @@ rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
> rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
> }
>
> +/**
> + * @internal Put several objects back in the mempool; used internally.
> + * @param mp
> + * A pointer to the mempool structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to store back in the mempool, must be strictly
> + * positive.
> + * @param cache
> + * A pointer to a mempool cache structure. May be NULL if not needed.
> + */
> +static __rte_always_inline void**
> +rte_mempool_get_cache(struct rte_mempool *mp, unsigned int n)
The zero-copy functions must be public, not internal. Internal functions are only intended to be used inside the library, so the PMDs should not call mempool internal functions.
As mentioned above: Please build in continuation of my patch, instead of providing your own similar function.
> +{
> + void **cache_objs;
> +
> + struct rte_mempool_cache *cache = rte_mempool_default_cache(mp, rte_lcore_id());
> +
> + /* increment stat now, adding in mempool always success */
> + RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
> + RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
> +
> + /* No cache provided or the request itself is too big for the cache */
> + if (unlikely(cache == NULL || n > cache->flushthresh))
> + return NULL;
> +
> + /*
> + * The cache follows the following algorithm:
> + * 1. If the objects cannot be added to the cache without crossing
> + * the flush threshold, flush the cache to the backend.
> + * 2. Add the objects to the cache.
> + */
> +
> + if (cache->len + n <= cache->flushthresh) {
> + cache_objs = &cache->objs[cache->len];
> + cache->len += n;
> + } else {
> + cache_objs = &cache->objs[0];
> + rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
> + cache->len = n;
> + }
> +
> + return cache_objs;
> +
> +}
>
> /**
> * Put several objects back in the mempool.
> --
> 2.25.1
>
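To make the flush-threshold logic in the proposed rte_mempool_get_cache() concrete, here is a short worked example. The numbers are assumptions; the mempool library currently computes the flush threshold as 1.5 * the cache size:

	/* Assume cache->size = 256, so cache->flushthresh = 384.
	 *
	 * cache->len = 300, n = 32:  300 + 32 <= 384, so the returned pointer
	 *   is &cache->objs[300] and cache->len becomes 332.
	 *
	 * cache->len = 360, n = 32:  360 + 32 > 384, so the 360 cached objects
	 *   are first flushed to the backend, the returned pointer is
	 *   &cache->objs[0], and cache->len becomes 32.
	 *
	 * n = 400 (> flushthresh): NULL is returned, and the caller must put
	 *   the objects via the backend instead.
	 */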