[v3,9/9] mempool/bucket: handle non-EAL lcores
Checks
Commit Message
Convert to new lcore API to support non-EAL lcores.
Signed-off-by: David Marchand <david.marchand@redhat.com>
---
drivers/mempool/bucket/rte_mempool_bucket.c | 131 ++++++++++++--------
1 file changed, 82 insertions(+), 49 deletions(-)
Comments
On 6/22/20 4:25 PM, David Marchand wrote:
> Convert to new lcore API to support non-EAL lcores.
>
> Signed-off-by: David Marchand <david.marchand@redhat.com>
> ---
> drivers/mempool/bucket/rte_mempool_bucket.c | 131 ++++++++++++--------
> 1 file changed, 82 insertions(+), 49 deletions(-)
>
> diff --git a/drivers/mempool/bucket/rte_mempool_bucket.c b/drivers/mempool/bucket/rte_mempool_bucket.c
> index 5ce1ef16fb..0b4f42d330 100644
> --- a/drivers/mempool/bucket/rte_mempool_bucket.c
> +++ b/drivers/mempool/bucket/rte_mempool_bucket.c
> @@ -55,6 +55,7 @@ struct bucket_data {
> struct rte_ring *shared_orphan_ring;
> struct rte_mempool *pool;
> unsigned int bucket_mem_size;
> + void *lcore_callback_handle;
> };
>
> static struct bucket_stack *
> @@ -345,6 +346,22 @@ bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
> return 0;
> }
>
> +struct bucket_per_lcore_ctx {
The structure is not used in per-lcore init and uninit
functions. So, it is better to add _count to make it
count specified. I.e. bucket_count_per_lcore_ctx.
> + const struct bucket_data *bd;
> + unsigned int count;
> +};
> +
> +static int
> +count_per_lcore(unsigned int lcore_id, void *arg)
> +{
> + struct bucket_per_lcore_ctx *ctx = arg;
> +
> + ctx->count += ctx->bd->obj_per_bucket *
> + ctx->bd->buckets[lcore_id]->top;
> + ctx->count += rte_ring_count(ctx->bd->adoption_buffer_rings[lcore_id]);
> + return 0;
> +}
> +
> static void
> count_underfilled_buckets(struct rte_mempool *mp,
> void *opaque,
> @@ -373,23 +390,66 @@ count_underfilled_buckets(struct rte_mempool *mp,
> static unsigned int
> bucket_get_count(const struct rte_mempool *mp)
> {
> - const struct bucket_data *bd = mp->pool_data;
> - unsigned int count =
> - bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
> - rte_ring_count(bd->shared_orphan_ring);
> - unsigned int i;
> + struct bucket_per_lcore_ctx ctx;
Just a nit, but I think that ctx is too generic.
(some time ago bucket_data bd was ctx in fact :) )
May be bplc? Up to you.
>
> - for (i = 0; i < RTE_MAX_LCORE; i++) {
> - if (!rte_lcore_is_enabled(i))
> - continue;
> - count += bd->obj_per_bucket * bd->buckets[i]->top +
> - rte_ring_count(bd->adoption_buffer_rings[i]);
> - }
> + ctx.bd = mp->pool_data;
> + ctx.count = ctx.bd->obj_per_bucket *
> + rte_ring_count(ctx.bd->shared_bucket_ring);
> + ctx.count += rte_ring_count(ctx.bd->shared_orphan_ring);
>
> + rte_lcore_iterate(count_per_lcore, &ctx);
> rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
> - count_underfilled_buckets, &count);
> + count_underfilled_buckets, &ctx.count);
> +
> + return ctx.count;
> +}
> +
> +static int
> +bucket_init_per_lcore(unsigned int lcore_id, void *arg)
It should be no bucket_ prefix here, or it should be bucket_
prefix above in count_per_lcore.
> +{
> + char rg_name[RTE_RING_NAMESIZE];
> + struct bucket_data *bd = arg;
> + struct rte_mempool *mp;
> + int rg_flags;
> + int rc;
> +
> + mp = bd->pool;
> + bd->buckets[lcore_id] = bucket_stack_create(mp,
> + mp->size / bd->obj_per_bucket);
> + if (bd->buckets[lcore_id] == NULL)
> + goto error;
> +
> + rc = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT ".a%u",
> + mp->name, lcore_id);
> + if (rc < 0 || rc >= (int)sizeof(rg_name))
> + goto error;
> +
> + rg_flags = RING_F_SC_DEQ;
> + if (mp->flags & MEMPOOL_F_SP_PUT)
> + rg_flags |= RING_F_SP_ENQ;
> + if (mp->flags & MEMPOOL_F_SC_GET)
> + rg_flags |= RING_F_SC_DEQ;
There is not point to have two above lines here, since
RING_F_SC_DEQ is always set.
> + bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
> + rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
> + if (bd->adoption_buffer_rings[lcore_id] == NULL)
> + goto error;
>
> - return count;
> + return 0;
> +error:
> + rte_free(bd->buckets[lcore_id]);
> + bd->buckets[lcore_id] = NULL;
> + return -1;
Why does the API collapse all negative errnos into -1?
(I don't think it is critical, just want to know why).
> +}
> +
> +static void
> +bucket_uninit_per_lcore(unsigned int lcore_id, void *arg)
Same note about bucket_ prefix.
On Tue, Jun 23, 2020 at 7:28 PM Andrew Rybchenko
<arybchenko@solarflare.com> wrote:
>
> On 6/22/20 4:25 PM, David Marchand wrote:
> > Convert to new lcore API to support non-EAL lcores.
> >
> > Signed-off-by: David Marchand <david.marchand@redhat.com>
> > ---
> > drivers/mempool/bucket/rte_mempool_bucket.c | 131 ++++++++++++--------
> > 1 file changed, 82 insertions(+), 49 deletions(-)
> >
> > diff --git a/drivers/mempool/bucket/rte_mempool_bucket.c b/drivers/mempool/bucket/rte_mempool_bucket.c
> > index 5ce1ef16fb..0b4f42d330 100644
> > --- a/drivers/mempool/bucket/rte_mempool_bucket.c
> > +++ b/drivers/mempool/bucket/rte_mempool_bucket.c
> > @@ -55,6 +55,7 @@ struct bucket_data {
> > struct rte_ring *shared_orphan_ring;
> > struct rte_mempool *pool;
> > unsigned int bucket_mem_size;
> > + void *lcore_callback_handle;
> > };
> >
> > static struct bucket_stack *
> > @@ -345,6 +346,22 @@ bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
> > return 0;
> > }
> >
> > +struct bucket_per_lcore_ctx {
>
> The structure is not used in per-lcore init and uninit
> functions. So, it is better to add _count to make it
> count specified. I.e. bucket_count_per_lcore_ctx.
Yes, and this aligns with its only user being renamed from
count_per_lcore to bucket_count_per_lcore.
>
> > + const struct bucket_data *bd;
> > + unsigned int count;
> > +};
> > +
> > +static int
> > +count_per_lcore(unsigned int lcore_id, void *arg)
> > +{
> > + struct bucket_per_lcore_ctx *ctx = arg;
> > +
> > + ctx->count += ctx->bd->obj_per_bucket *
> > + ctx->bd->buckets[lcore_id]->top;
> > + ctx->count += rte_ring_count(ctx->bd->adoption_buffer_rings[lcore_id]);
> > + return 0;
> > +}
> > +
> > static void
> > count_underfilled_buckets(struct rte_mempool *mp,
> > void *opaque,
> > @@ -373,23 +390,66 @@ count_underfilled_buckets(struct rte_mempool *mp,
> > static unsigned int
> > bucket_get_count(const struct rte_mempool *mp)
> > {
> > - const struct bucket_data *bd = mp->pool_data;
> > - unsigned int count =
> > - bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
> > - rte_ring_count(bd->shared_orphan_ring);
> > - unsigned int i;
> > + struct bucket_per_lcore_ctx ctx;
>
> Just a nit, but I think that ctx is too generic.
> (some time ago bucket_data bd was ctx in fact :) )
> May be bplc? Up to you.
Ack.
>
> >
> > - for (i = 0; i < RTE_MAX_LCORE; i++) {
> > - if (!rte_lcore_is_enabled(i))
> > - continue;
> > - count += bd->obj_per_bucket * bd->buckets[i]->top +
> > - rte_ring_count(bd->adoption_buffer_rings[i]);
> > - }
> > + ctx.bd = mp->pool_data;
> > + ctx.count = ctx.bd->obj_per_bucket *
> > + rte_ring_count(ctx.bd->shared_bucket_ring);
> > + ctx.count += rte_ring_count(ctx.bd->shared_orphan_ring);
> >
> > + rte_lcore_iterate(count_per_lcore, &ctx);
> > rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
> > - count_underfilled_buckets, &count);
> > + count_underfilled_buckets, &ctx.count);
> > +
> > + return ctx.count;
> > +}
> > +
> > +static int
> > +bucket_init_per_lcore(unsigned int lcore_id, void *arg)
>
> It should be no bucket_ prefix here, or it should be bucket_
> prefix above in count_per_lcore.
As mentioned before, ack.
>
> > +{
> > + char rg_name[RTE_RING_NAMESIZE];
> > + struct bucket_data *bd = arg;
> > + struct rte_mempool *mp;
> > + int rg_flags;
> > + int rc;
> > +
> > + mp = bd->pool;
> > + bd->buckets[lcore_id] = bucket_stack_create(mp,
> > + mp->size / bd->obj_per_bucket);
> > + if (bd->buckets[lcore_id] == NULL)
> > + goto error;
> > +
> > + rc = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT ".a%u",
> > + mp->name, lcore_id);
> > + if (rc < 0 || rc >= (int)sizeof(rg_name))
> > + goto error;
> > +
> > + rg_flags = RING_F_SC_DEQ;
> > + if (mp->flags & MEMPOOL_F_SP_PUT)
> > + rg_flags |= RING_F_SP_ENQ;
> > + if (mp->flags & MEMPOOL_F_SC_GET)
> > + rg_flags |= RING_F_SC_DEQ;
>
> There is not point to have two above lines here, since
> RING_F_SC_DEQ is always set.
Ah yes, I did not realise when moving the code.
>
> > + bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
> > + rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
> > + if (bd->adoption_buffer_rings[lcore_id] == NULL)
> > + goto error;
> >
> > - return count;
> > + return 0;
> > +error:
> > + rte_free(bd->buckets[lcore_id]);
> > + bd->buckets[lcore_id] = NULL;
> > + return -1;
>
> Why does the API collapse all negative errnos into -1?
> (I don't think it is critical, just want to know why).
I collapsed everything as a single error as we have a partial idea of
what went wrong when calling this callback with all lcores at
registration.
We could get a specific error reported by this callback, but then we
would not know on which lcore (programmatically).
And in the end, all errors will summarize as a lack of resources, I do
not expect a need for input validation.
Maybe you have other use cases in mind?
Thanks for the review.
On 6/26/20 5:13 PM, David Marchand wrote:
> On Tue, Jun 23, 2020 at 7:28 PM Andrew Rybchenko
> <arybchenko@solarflare.com> wrote:
>>
>> On 6/22/20 4:25 PM, David Marchand wrote:
>>> Convert to new lcore API to support non-EAL lcores.
>>>
>>> Signed-off-by: David Marchand <david.marchand@redhat.com>
>>> ---
[snip]
>>> + bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
>>> + rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
>>> + if (bd->adoption_buffer_rings[lcore_id] == NULL)
>>> + goto error;
>>>
>>> - return count;
>>> + return 0;
>>> +error:
>>> + rte_free(bd->buckets[lcore_id]);
>>> + bd->buckets[lcore_id] = NULL;
>>> + return -1;
>>
>> Why does the API collapse all negative errnos into -1?
>> (I don't think it is critical, just want to know why).
>
> I collapsed everything as a single error as we have a partial idea of
> what went wrong when calling this callback with all lcores at
> registration.
> We could get a specific error reported by this callback, but then we
> would not know on which lcore (programmatically).
> And in the end, all errors will summarize as a lack of resources, I do
> not expect a need for input validation.
Makes sense. Thanks for explanations.
> Maybe you have other use cases in mind?
No.
@@ -55,6 +55,7 @@ struct bucket_data {
struct rte_ring *shared_orphan_ring;
struct rte_mempool *pool;
unsigned int bucket_mem_size;
+ void *lcore_callback_handle;
};
static struct bucket_stack *
@@ -345,6 +346,22 @@ bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
return 0;
}
+struct bucket_per_lcore_ctx {
+ const struct bucket_data *bd;
+ unsigned int count;
+};
+
+static int
+count_per_lcore(unsigned int lcore_id, void *arg)
+{
+ struct bucket_per_lcore_ctx *ctx = arg;
+
+ ctx->count += ctx->bd->obj_per_bucket *
+ ctx->bd->buckets[lcore_id]->top;
+ ctx->count += rte_ring_count(ctx->bd->adoption_buffer_rings[lcore_id]);
+ return 0;
+}
+
static void
count_underfilled_buckets(struct rte_mempool *mp,
void *opaque,
@@ -373,23 +390,66 @@ count_underfilled_buckets(struct rte_mempool *mp,
static unsigned int
bucket_get_count(const struct rte_mempool *mp)
{
- const struct bucket_data *bd = mp->pool_data;
- unsigned int count =
- bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
- rte_ring_count(bd->shared_orphan_ring);
- unsigned int i;
+ struct bucket_per_lcore_ctx ctx;
- for (i = 0; i < RTE_MAX_LCORE; i++) {
- if (!rte_lcore_is_enabled(i))
- continue;
- count += bd->obj_per_bucket * bd->buckets[i]->top +
- rte_ring_count(bd->adoption_buffer_rings[i]);
- }
+ ctx.bd = mp->pool_data;
+ ctx.count = ctx.bd->obj_per_bucket *
+ rte_ring_count(ctx.bd->shared_bucket_ring);
+ ctx.count += rte_ring_count(ctx.bd->shared_orphan_ring);
+ rte_lcore_iterate(count_per_lcore, &ctx);
rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
- count_underfilled_buckets, &count);
+ count_underfilled_buckets, &ctx.count);
+
+ return ctx.count;
+}
+
+static int
+bucket_init_per_lcore(unsigned int lcore_id, void *arg)
+{
+ char rg_name[RTE_RING_NAMESIZE];
+ struct bucket_data *bd = arg;
+ struct rte_mempool *mp;
+ int rg_flags;
+ int rc;
+
+ mp = bd->pool;
+ bd->buckets[lcore_id] = bucket_stack_create(mp,
+ mp->size / bd->obj_per_bucket);
+ if (bd->buckets[lcore_id] == NULL)
+ goto error;
+
+ rc = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT ".a%u",
+ mp->name, lcore_id);
+ if (rc < 0 || rc >= (int)sizeof(rg_name))
+ goto error;
+
+ rg_flags = RING_F_SC_DEQ;
+ if (mp->flags & MEMPOOL_F_SP_PUT)
+ rg_flags |= RING_F_SP_ENQ;
+ if (mp->flags & MEMPOOL_F_SC_GET)
+ rg_flags |= RING_F_SC_DEQ;
+ bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
+ rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
+ if (bd->adoption_buffer_rings[lcore_id] == NULL)
+ goto error;
- return count;
+ return 0;
+error:
+ rte_free(bd->buckets[lcore_id]);
+ bd->buckets[lcore_id] = NULL;
+ return -1;
+}
+
+static void
+bucket_uninit_per_lcore(unsigned int lcore_id, void *arg)
+{
+ struct bucket_data *bd = arg;
+
+ rte_ring_free(bd->adoption_buffer_rings[lcore_id]);
+ bd->adoption_buffer_rings[lcore_id] = NULL;
+ rte_free(bd->buckets[lcore_id]);
+ bd->buckets[lcore_id] = NULL;
}
static int
@@ -399,7 +459,6 @@ bucket_alloc(struct rte_mempool *mp)
int rc = 0;
char rg_name[RTE_RING_NAMESIZE];
struct bucket_data *bd;
- unsigned int i;
unsigned int bucket_header_size;
size_t pg_sz;
@@ -429,36 +488,17 @@ bucket_alloc(struct rte_mempool *mp)
/* eventually this should be a tunable parameter */
bd->bucket_stack_thresh = (mp->size / bd->obj_per_bucket) * 4 / 3;
+ bd->lcore_callback_handle = rte_lcore_callback_register("bucket",
+ bucket_init_per_lcore, bucket_uninit_per_lcore, bd);
+ if (bd->lcore_callback_handle == NULL) {
+ rc = -ENOMEM;
+ goto no_mem_for_stacks;
+ }
+
if (mp->flags & MEMPOOL_F_SP_PUT)
rg_flags |= RING_F_SP_ENQ;
if (mp->flags & MEMPOOL_F_SC_GET)
rg_flags |= RING_F_SC_DEQ;
-
- for (i = 0; i < RTE_MAX_LCORE; i++) {
- if (!rte_lcore_is_enabled(i))
- continue;
- bd->buckets[i] =
- bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
- if (bd->buckets[i] == NULL) {
- rc = -ENOMEM;
- goto no_mem_for_stacks;
- }
- rc = snprintf(rg_name, sizeof(rg_name),
- RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
- if (rc < 0 || rc >= (int)sizeof(rg_name)) {
- rc = -ENAMETOOLONG;
- goto no_mem_for_stacks;
- }
- bd->adoption_buffer_rings[i] =
- rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
- mp->socket_id,
- rg_flags | RING_F_SC_DEQ);
- if (bd->adoption_buffer_rings[i] == NULL) {
- rc = -rte_errno;
- goto no_mem_for_stacks;
- }
- }
-
rc = snprintf(rg_name, sizeof(rg_name),
RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
if (rc < 0 || rc >= (int)sizeof(rg_name)) {
@@ -498,11 +538,8 @@ bucket_alloc(struct rte_mempool *mp)
rte_ring_free(bd->shared_orphan_ring);
cannot_create_shared_orphan_ring:
invalid_shared_orphan_ring:
+ rte_lcore_callback_unregister(bd->lcore_callback_handle);
no_mem_for_stacks:
- for (i = 0; i < RTE_MAX_LCORE; i++) {
- rte_free(bd->buckets[i]);
- rte_ring_free(bd->adoption_buffer_rings[i]);
- }
rte_free(bd);
no_mem_for_data:
rte_errno = -rc;
@@ -512,16 +549,12 @@ bucket_alloc(struct rte_mempool *mp)
static void
bucket_free(struct rte_mempool *mp)
{
- unsigned int i;
struct bucket_data *bd = mp->pool_data;
if (bd == NULL)
return;
- for (i = 0; i < RTE_MAX_LCORE; i++) {
- rte_free(bd->buckets[i]);
- rte_ring_free(bd->adoption_buffer_rings[i]);
- }
+ rte_lcore_callback_unregister(bd->lcore_callback_handle);
rte_ring_free(bd->shared_orphan_ring);
rte_ring_free(bd->shared_bucket_ring);