[RFC,3/6] ring/soring: introduce Staged Ordered Ring
Commit Message
From: Konstantin Ananyev <konstantin.ananyev@huawei.com>
Staged-Ordered-Ring (SORING) provides a SW abstraction for 'ordered' queues
with multiple processing 'stages'.
It is based on the conventional DPDK rte_ring, re-uses many of its concepts,
and even a substantial part of its code.
It can be viewed as an 'extension' of rte_ring functionality.
In particular, the main SORING properties are:
- circular ring buffer with fixed-size objects
- producer, consumer plus multiple processing stages in the middle.
- allows splitting object processing into multiple stages.
- objects remain in the same ring while moving from one stage to another;
the initial order is preserved, no extra copying needed.
- preserves the ingress order of objects within the queue across multiple
stages, i.e.:
at the same stage multiple threads can process objects from the ring in
any order, but for the next stage objects will always appear in the
original order.
- each stage (and the producer/consumer) can be served by a single thread
or by multiple threads.
- number of stages, size and number of objects in the ring are
configurable at ring initialization time.
Data-path API provides four main operations:
- enqueue/dequeue work in the same manner as for a conventional rte_ring;
all rte_ring synchronization types are supported.
- acquire/release - for each stage there is an acquire (start) and
release (finish) operation.
After some objects are 'acquired', the given thread can safely assume that
it has exclusive possession of these objects until 'release' is invoked
for them.
Note that right now the user has to release exactly the same number of
objects that were acquired before.
After 'release', objects can be 'acquired' by the next stage and/or dequeued
by the consumer (in case of the last stage).
Expected use-case: applications that use a pipeline model
(probably with multiple stages) for packet processing, where preserving
incoming packet order is important, e.g. IPsec processing.
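To make the intended call flow concrete, below is a minimal single-stage usage
sketch (illustrative only, not part of the patch; it uses the function
signatures exactly as declared in rte_soring.h in this series, while the ring
name, element type and burst size are arbitrary):

#include <rte_common.h>
#include <rte_malloc.h>
#include <rte_mbuf.h>
#include <rte_soring.h>

static struct rte_soring *
soring_create(void)
{
	struct rte_soring_param prm = {
		.name = "pipe",                     /* arbitrary name */
		.elems = 1024,
		.esize = sizeof(struct rte_mbuf *), /* ring of mbuf pointers */
		.stsize = 0,                        /* no optional status array */
		.stages = 1,
		.prod_synt = RTE_RING_SYNC_MT,
		.cons_synt = RTE_RING_SYNC_MT,
	};
	ssize_t sz = rte_soring_get_memsize(&prm);
	struct rte_soring *r;

	if (sz < 0)
		return NULL;
	r = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
	if (r != NULL && rte_soring_init(r, &prm) != 0) {
		rte_free(r);
		r = NULL;
	}
	return r;
}

/* producer thread: enqueue up to 'num' packets, their order is preserved */
static void
producer(struct rte_soring *r, struct rte_mbuf *pkts[], uint32_t num)
{
	rte_soring_enqueue(r, pkts, NULL, num, RTE_RING_QUEUE_VARIABLE, NULL);
}

/* worker thread serving stage 0, may run concurrently with other workers */
static void
stage0_worker(struct rte_soring *r)
{
	struct rte_mbuf *objs[32];
	uint32_t ftoken, n;

	n = rte_soring_acquire(r, objs, NULL, 0, RTE_DIM(objs),
			RTE_RING_QUEUE_VARIABLE, &ftoken, NULL);
	if (n == 0)
		return;
	/* ... process objs[0..n-1] ... */
	/* exactly 'n' objects must be released with the token from acquire() */
	rte_soring_release(r, NULL, NULL, 0, n, ftoken);
}

/* consumer thread: packets come out in the original enqueue order */
static uint32_t
consumer(struct rte_soring *r, struct rte_mbuf *pkts[], uint32_t num)
{
	return rte_soring_dequeue(r, pkts, NULL, num,
			RTE_RING_QUEUE_VARIABLE, NULL);
}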
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
---
lib/ring/meson.build | 4 +-
lib/ring/rte_soring.c | 144 ++++++++++++++
lib/ring/rte_soring.h | 270 ++++++++++++++++++++++++++
lib/ring/soring.c | 431 ++++++++++++++++++++++++++++++++++++++++++
lib/ring/soring.h | 124 ++++++++++++
lib/ring/version.map | 13 ++
6 files changed, 984 insertions(+), 2 deletions(-)
create mode 100644 lib/ring/rte_soring.c
create mode 100644 lib/ring/rte_soring.h
create mode 100644 lib/ring/soring.c
create mode 100644 lib/ring/soring.h
Comments
> From: Konstantin Ananyev <konstantin.ananyev@huawei.com>
>
> Staged-Ordered-Ring (SORING) provides a SW abstraction for 'ordered' queues
> with multiple processing 'stages'.
> It is based on conventional DPDK rte_ring, re-uses many of its concepts,
> and even substantial part of its code.
> It can be viewed as an 'extension' of rte_ring functionality.
> In particular, main SORING properties:
> - circular ring buffer with fixed size objects
> - producer, consumer plus multiple processing stages in the middle.
> - allows to split objects processing into multiple stages.
> - objects remain in the same ring while moving from one stage to the other,
> initial order is preserved, no extra copying needed.
> - preserves the ingress order of objects within the queue across multiple
> stages, i.e.:
> at the same stage multiple threads can process objects from the ring in
> any order, but for the next stage objects will always appear in the
> original order.
> - each stage (and producer/consumer) can be served by single and/or
> multiple threads.
> - number of stages, size and number of objects in the ring are
> configurable at ring initialization time.
>
> Data-path API provides four main operations:
> - enqueue/dequeue works in the same manner as for conventional rte_ring,
> all rte_ring synchronization types are supported.
> - acquire/release - for each stage there is an acquire (start) and
> release (finish) operation.
> after some objects are 'acquired' - given thread can safely assume that
> it has exclusive possession of these objects till 'release' for them is
> invoked.
> Note that right now user has to release exactly the same number of
> objects that was acquired before.
> After 'release', objects can be 'acquired' by next stage and/or dequeued
> by the consumer (in case of last stage).
>
> Expected use-case: applications that uses pipeline model
> (probably with multiple stages) for packet processing, when preserving
> incoming packet order is important. I.E.: IPsec processing, etc.
>
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> ---
The existing RING library is for a ring of objects.
It is very confusing that the new SORING library is for a ring of object pairs (obj, objst).
The new SORING library should be for a ring of objects, like the existing RING library. Please get rid of all the objst stuff.
This might also improve performance when not using the optional secondary object.
With that in place, you can extend the SORING library with additional APIs for object pairs.
I suggest calling the secondary object "metadata" instead of "status" or "state" or "ret-value".
I agree that data passed as {obj[num], meta[num]} is more efficient than {obj, meta}[num] in some use cases, which is why your API uses two vector pointers instead of one.
Furthermore, you should consider semi-zero-copy APIs for the "acquire"/"release" functions:
The "acquire" function can use a concept similar to rte_pktmbuf_read(), where a vector is provided for copying (if the ring wraps), and the return value either points directly to the objects in the ring (zero-copy), or to the vector where the objects were copied to.
And the "release" function does not need to copy the object vector back if the "acquire" function returned a zero-copy pointer.
> > From: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> >
> > Staged-Ordered-Ring (SORING) provides a SW abstraction for 'ordered' queues
> > with multiple processing 'stages'.
> > It is based on conventional DPDK rte_ring, re-uses many of its concepts,
> > and even substantial part of its code.
> > It can be viewed as an 'extension' of rte_ring functionality.
> > In particular, main SORING properties:
> > - circular ring buffer with fixed size objects
> > - producer, consumer plus multiple processing stages in the middle.
> > - allows to split objects processing into multiple stages.
> > - objects remain in the same ring while moving from one stage to the other,
> > initial order is preserved, no extra copying needed.
> > - preserves the ingress order of objects within the queue across multiple
> > stages, i.e.:
> > at the same stage multiple threads can process objects from the ring in
> > any order, but for the next stage objects will always appear in the
> > original order.
> > - each stage (and producer/consumer) can be served by single and/or
> > multiple threads.
> > - number of stages, size and number of objects in the ring are
> > configurable at ring initialization time.
> >
> > Data-path API provides four main operations:
> > - enqueue/dequeue works in the same manner as for conventional rte_ring,
> > all rte_ring synchronization types are supported.
> > - acquire/release - for each stage there is an acquire (start) and
> > release (finish) operation.
> > after some objects are 'acquired' - given thread can safely assume that
> > it has exclusive possession of these objects till 'release' for them is
> > invoked.
> > Note that right now user has to release exactly the same number of
> > objects that was acquired before.
> > After 'release', objects can be 'acquired' by next stage and/or dequeued
> > by the consumer (in case of last stage).
> >
> > Expected use-case: applications that uses pipeline model
> > (probably with multiple stages) for packet processing, when preserving
> > incoming packet order is important. I.E.: IPsec processing, etc.
> >
> > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> > ---
>
> The existing RING library is for a ring of objects.
>
> It is very confusing that the new SORING library is for a ring of object pairs (obj, objst).
>
> The new SORING library should be for a ring of objects, like the existing RING library. Please get rid of all the objst stuff.
>
> This might also improve performance when not using the optional secondary object.
>
>
> With that in place, you can extend the SORING library with additional APIs for object pairs.
>
> I suggest calling the secondary object "metadata" instead of "status" or "state" or "ret-value".
> I agree that data passed as {obj[num], meta[num]} is more efficient than {obj, meta}[num] in some use cases, which is why your API
> uses two vector pointers instead of one.
I suppose what you suggest is to have 2 sets of functions: one that takes both objs[] and meta[] and a second that takes just objs[]?
If so, yes, I can do that - in fact I was thinking about the same thing.
BTW, right now meta[] is optional anyway.
Also, I will probably get rid of the explicit 'behavior' and will have '_burst_' and '_bulk_' versions instead,
same as rte_ring.
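Roughly, something like this (the names below are placeholders just to
illustrate the split, not a final proposal):

/* basic set: ring of objects only, mirrors rte_ring naming */
uint32_t rte_soring_enqueue_burst(struct rte_soring *r, const void *objs,
	uint32_t n, uint32_t *free_space);
uint32_t rte_soring_enqueue_bulk(struct rte_soring *r, const void *objs,
	uint32_t n, uint32_t *free_space);

/* extended set: objects plus the optional metadata array as a second vector */
uint32_t rte_soring_enqueue_meta_burst(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, uint32_t *free_space);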
>
> Furthermore, you should consider semi-zero-copy APIs for the "acquire"/"release" functions:
>
> The "acquire" function can use a concept similar to rte_pktmbuf_read(), where a vector is provided for copying (if the ring wraps), and
> the return value either points directly to the objects in the ring (zero-copy), or to the vector where the objects were copied to.
You mean to introduce an analog of the rte_ring '_zc_' functions?
Yes, I considered that, but decided to leave it for the future.
First, because we do need a generic and simple function that copies things anyway.
Second, I am not so convinced that this _zc_ will give much performance gain,
while it definitely makes the API less straightforward.
> And the "release" function does not need to copy the object vector back if the "acquire" function returned a zero-copy pointer.
For "release" you don't need to *always* copy objs[] and meta[].
It is optional and is left for the user to decide based on the use-case.
If he doesn't need to update objs[] or meta[] he can just pass a NULL ptr here.
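E.g. with the signature from this patch that is simply:

rte_soring_release(r, NULL, NULL, stage, n, ftoken);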
> From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
>
> > > From: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> > >
> > > Staged-Ordered-Ring (SORING) provides a SW abstraction for 'ordered'
> queues
> > > with multiple processing 'stages'.
> > > It is based on conventional DPDK rte_ring, re-uses many of its concepts,
> > > and even substantial part of its code.
> > > It can be viewed as an 'extension' of rte_ring functionality.
> > > In particular, main SORING properties:
> > > - circular ring buffer with fixed size objects
> > > - producer, consumer plus multiple processing stages in the middle.
> > > - allows to split objects processing into multiple stages.
> > > - objects remain in the same ring while moving from one stage to the
> other,
> > > initial order is preserved, no extra copying needed.
> > > - preserves the ingress order of objects within the queue across multiple
> > > stages, i.e.:
> > > at the same stage multiple threads can process objects from the ring in
> > > any order, but for the next stage objects will always appear in the
> > > original order.
> > > - each stage (and producer/consumer) can be served by single and/or
> > > multiple threads.
> > > - number of stages, size and number of objects in the ring are
> > > configurable at ring initialization time.
> > >
> > > Data-path API provides four main operations:
> > > - enqueue/dequeue works in the same manner as for conventional rte_ring,
> > > all rte_ring synchronization types are supported.
> > > - acquire/release - for each stage there is an acquire (start) and
> > > release (finish) operation.
> > > after some objects are 'acquired' - given thread can safely assume that
> > > it has exclusive possession of these objects till 'release' for them is
> > > invoked.
> > > Note that right now user has to release exactly the same number of
> > > objects that was acquired before.
> > > After 'release', objects can be 'acquired' by next stage and/or dequeued
> > > by the consumer (in case of last stage).
> > >
> > > Expected use-case: applications that uses pipeline model
> > > (probably with multiple stages) for packet processing, when preserving
> > > incoming packet order is important. I.E.: IPsec processing, etc.
> > >
> > > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> > > ---
> >
> > The existing RING library is for a ring of objects.
> >
> > It is very confusing that the new SORING library is for a ring of object
> pairs (obj, objst).
> >
> > The new SORING library should be for a ring of objects, like the existing
> RING library. Please get rid of all the objst stuff.
> >
> > This might also improve performance when not using the optional secondary
> object.
> >
> >
> > With that in place, you can extend the SORING library with additional APIs
> for object pairs.
> >
> > I suggest calling the secondary object "metadata" instead of "status" or
> "state" or "ret-value".
> > I agree that data passed as {obj[num], meta[num]} is more efficient than
> {obj, meta}[num] in some use cases, which is why your API
> > uses two vector pointers instead of one.
>
> I suppose what you suggest is to have 2 set of functions: one that takes both
> objs[] and meta[] and second that takes just objs[]?
> If so, yes I can do that - in fact I was thinking about same thing.
Yes, please.
Mainly for readability/familiarity; it makes the API much more similar to the Ring API.
> BTW, right now meta[] is an optional one anyway.
I noticed that meta[] is optional, but it is confusing that the APIs are so different from the Ring APIs.
With two sets of functions, the basic set will resemble the Ring APIs much more.
> Also will probably get rid of explicit 'behavior' and will have '_burst_' and
> '_bulk_' versions instead,
> same as rte_ring.
+1
>
> >
> > Furthermore, you should consider semi-zero-copy APIs for the
> "acquire"/"release" functions:
> >
> > The "acquire" function can use a concept similar to rte_pktmbuf_read(),
> where a vector is provided for copying (if the ring wraps), and
> > the return value either points directly to the objects in the ring (zero-
> copy), or to the vector where the objects were copied to.
>
> You mean to introduce analog of rte_ring '_zc_' functions?
> Yes, I considered that, but decided to leave it for the future.
Somewhat similar, but I think the (semi-)zero-copy "acquire"/"release" APIs will be simpler than the rte_ring's _zc_ functions because we know that no other thread can dequeue the objects out of the ring before the processing stage has released them, i.e. no additional locking is required.
Anyway, leave it for the future.
I don't think it will require changes to the underlying implementation, so we don't need to consider it in advance.
> First, because we do need a generic and simple function with copying things
> anyway.
> Second I am not so convinced that this _zc_ will give much performance gain,
> while it definitely makes API not that straightforward.
>
> > And the "release" function does not need to copy the object vector back if
> the "acquire" function returned a zero-copy pointer.
>
> For "release" you don't need to *always* copy objs[] and meta[].
> It is optional and is left for the user to decide based on the use-case.
> If he doesn't need to update objs[] or meta[] he can just pass a NULL ptr
> here.
Yes, I noticed.
I'm mentioning that zero-copy adds a "release" case where copying back the modified object vector is not required: when the processing stage has modified the object vector (e.g. replaced some objects) directly in the ring, rather than in a copy returned by "acquire".
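As a purely hypothetical sketch of the idea (the name and signature below are
invented here only to show the concept, not a proposal for this patch):

/*
 * Semi-zero-copy acquire, similar in spirit to rte_pktmbuf_read():
 * 'buf' is used only if the acquired region wraps around the end of the
 * ring; otherwise the returned pointer references the ring memory directly,
 * and the matching "release" does not need to copy anything back.
 */
void *rte_soring_acquire_zc(struct rte_soring *r, uint32_t stage,
	uint32_t num, void *buf, uint32_t *ftoken, uint32_t *available);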
>
>
On 2024-08-15 10:53, Konstantin Ananyev wrote:
> From: Konstantin Ananyev <konstantin.ananyev@huawei.com>
>
> Staged-Ordered-Ring (SORING) provides a SW abstraction for 'ordered' queues
> with multiple processing 'stages'.
> It is based on conventional DPDK rte_ring, re-uses many of its concepts,
> and even substantial part of its code.
> It can be viewed as an 'extension' of rte_ring functionality.
> In particular, main SORING properties:
> - circular ring buffer with fixed size objects
> - producer, consumer plus multiple processing stages in the middle.
> - allows to split objects processing into multiple stages.
> - objects remain in the same ring while moving from one stage to the other,
> initial order is preserved, no extra copying needed.
> - preserves the ingress order of objects within the queue across multiple
> stages, i.e.:
> at the same stage multiple threads can process objects from the ring in
> any order, but for the next stage objects will always appear in the
> original order.
> - each stage (and producer/consumer) can be served by single and/or
> multiple threads.
> - number of stages, size and number of objects in the ring are
> configurable at ring initialization time.
>
> Data-path API provides four main operations:
> - enqueue/dequeue works in the same manner as for conventional rte_ring,
> all rte_ring synchronization types are supported.
> - acquire/release - for each stage there is an acquire (start) and
> release (finish) operation.
> after some objects are 'acquired' - given thread can safely assume that
> it has exclusive possession of these objects till 'release' for them is
> invoked.
> Note that right now user has to release exactly the same number of
> objects that was acquired before.
> After 'release', objects can be 'acquired' by next stage and/or dequeued
> by the consumer (in case of last stage).
>
> Expected use-case: applications that uses pipeline model
> (probably with multiple stages) for packet processing, when preserving
> incoming packet order is important. I.E.: IPsec processing, etc.
>
How does SORING relate to Eventdev? Would it be feasible to reshape
this into a SW event device?
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> ---
> lib/ring/meson.build | 4 +-
> lib/ring/rte_soring.c | 144 ++++++++++++++
> lib/ring/rte_soring.h | 270 ++++++++++++++++++++++++++
> lib/ring/soring.c | 431 ++++++++++++++++++++++++++++++++++++++++++
> lib/ring/soring.h | 124 ++++++++++++
> lib/ring/version.map | 13 ++
> 6 files changed, 984 insertions(+), 2 deletions(-)
> create mode 100644 lib/ring/rte_soring.c
> create mode 100644 lib/ring/rte_soring.h
> create mode 100644 lib/ring/soring.c
> create mode 100644 lib/ring/soring.h
>
> diff --git a/lib/ring/meson.build b/lib/ring/meson.build
> index 7fca958ed7..21f2c12989 100644
> --- a/lib/ring/meson.build
> +++ b/lib/ring/meson.build
> @@ -1,8 +1,8 @@
> # SPDX-License-Identifier: BSD-3-Clause
> # Copyright(c) 2017 Intel Corporation
>
> -sources = files('rte_ring.c')
> -headers = files('rte_ring.h')
> +sources = files('rte_ring.c', 'rte_soring.c', 'soring.c')
> +headers = files('rte_ring.h', 'rte_soring.h')
> # most sub-headers are not for direct inclusion
> indirect_headers += files (
> 'rte_ring_core.h',
> diff --git a/lib/ring/rte_soring.c b/lib/ring/rte_soring.c
> new file mode 100644
> index 0000000000..17b1b73a42
> --- /dev/null
> +++ b/lib/ring/rte_soring.c
> @@ -0,0 +1,144 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#include "soring.h"
> +#include <rte_string_fns.h>
> +
> +RTE_LOG_REGISTER_DEFAULT(soring_logtype, INFO);
> +#define RTE_LOGTYPE_SORING soring_logtype
> +#define SORING_LOG(level, ...) \
> + RTE_LOG_LINE(level, SORING, "" __VA_ARGS__)
> +
> +static uint32_t
> +soring_calc_elem_num(uint32_t count)
> +{
> + return rte_align32pow2(count + 1);
> +}
> +
> +static int
> +soring_check_param(uint32_t esize, uint32_t stsize, uint32_t count,
> + uint32_t stages)
> +{
> + if (stages == 0) {
> + SORING_LOG(ERR, "invalid number of stages: %u", stages);
> + return -EINVAL;
> + }
> +
> + /* Check if element size is a multiple of 4B */
> + if (esize == 0 || esize % 4 != 0) {
> + SORING_LOG(ERR, "invalid element size: %u", esize);
> + return -EINVAL;
> + }
> +
> + /* Check if ret-code size is a multiple of 4B */
> + if (stsize % 4 != 0) {
> + SORING_LOG(ERR, "invalid retcode size: %u", stsize);
> + return -EINVAL;
> + }
> +
> + /* count must be a power of 2 */
> + if (rte_is_power_of_2(count) == 0 ||
> + (count > RTE_SORING_ELEM_MAX + 1)) {
> + SORING_LOG(ERR, "invalid number of elements: %u", count);
> + return -EINVAL;
> + }
> +
> + return 0;
> +}
> +
> +/*
> + * Calculate size offsets for SORING internal data layout.
> + */
> +static size_t
> +soring_get_szofs(uint32_t esize, uint32_t stsize, uint32_t count,
> + uint32_t stages, size_t *elst_ofs, size_t *state_ofs,
> + size_t *stage_ofs)
> +{
> + size_t sz;
> + const struct rte_soring * const r = NULL;
> +
> + sz = sizeof(r[0]) + (size_t)count * esize;
> + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> + if (elst_ofs != NULL)
> + *elst_ofs = sz;
> +
> + sz = sz + (size_t)count * stsize;
> + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> + if (state_ofs != NULL)
> + *state_ofs = sz;
> +
> + sz += sizeof(r->state[0]) * count;
> + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> + if (stage_ofs != NULL)
> + *stage_ofs = sz;
> +
> + sz += sizeof(r->stage[0]) * stages;
> + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> + return sz;
> +}
> +
> +
> +ssize_t
> +rte_soring_get_memsize(const struct rte_soring_param *prm)
> +{
> + int32_t rc;
> + uint32_t count;
> +
> + count = soring_calc_elem_num(prm->elems);
> + rc = soring_check_param(prm->esize, prm->stsize, count, prm->stages);
> + if (rc != 0)
> + return rc;
> +
> + return soring_get_szofs(prm->esize, prm->stsize, count, prm->stages,
> + NULL, NULL, NULL);
> +}
> +
> +int
> +rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm)
> +{
> + int32_t rc;
> + uint32_t n;
> + size_t elst_ofs, stage_ofs, state_ofs;
> +
> + if (r == NULL || prm == NULL)
> + return -EINVAL;
> +
> + n = soring_calc_elem_num(prm->elems);
> + rc = soring_check_param(prm->esize, prm->stsize, n, prm->stages);
> + if (rc != 0)
> + return rc;
> +
> + soring_get_szofs(prm->esize, prm->stsize, n, prm->stages, &elst_ofs,
> + &state_ofs, &stage_ofs);
> +
> + memset(r, 0, sizeof(*r));
> + rc = strlcpy(r->name, prm->name, sizeof(r->name));
> + if (rc < 0 || rc >= (int)sizeof(r->name))
> + return -ENAMETOOLONG;
> +
> + r->size = n;
> + r->mask = r->size - 1;
> + r->capacity = prm->elems;
> + r->esize = prm->esize;
> + r->stsize = prm->stsize;
> +
> + r->prod.ht.sync_type = prm->prod_synt;
> + r->cons.ht.sync_type = prm->cons_synt;
> +
> + r->state = (union soring_state *)((uintptr_t)r + state_ofs);
> + memset(r->state, 0, sizeof(r->state[0]) * r->size);
> +
> + r->stage = (struct soring_stage *)((uintptr_t)r + stage_ofs);
> + r->nb_stage = prm->stages;
> + memset(r->stage, 0, r->nb_stage * sizeof(r->stage[0]));
> +
> + if (r->stsize != 0)
> + r->elemst = (void *)((uintptr_t)r + elst_ofs);
> +
> + return 0;
> +}
> diff --git a/lib/ring/rte_soring.h b/lib/ring/rte_soring.h
> new file mode 100644
> index 0000000000..fb0e75b39a
> --- /dev/null
> +++ b/lib/ring/rte_soring.h
> @@ -0,0 +1,270 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#ifndef _RTE_SORING_H_
> +#define _RTE_SORING_H_
> +
> +/**
> + * @file
> + * This file contains definition of RTE soring (Staged Ordered Ring) public API.
> + * Brief description:
> + * enqueue/dequeue works the same as for conventional rte_ring:
> + * any rte_ring sync types can be used, etc.
> + * Plus there could be multiple 'stages'.
> + * For each stage there is an acquire (start) and release (finish) operation.
> + * after some elems are 'acquired' - user can safely assume that he has
> + * exclusive possession of these elems till 'release' for them is done.
> + * Note that right now user has to release exactly the same number of elems
> + * he acquired before.
> + * After 'release', elems can be 'acquired' by next stage and/or dequeued
> + * (in case of last stage).
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_ring.h>
> +
> +/* upper 2 bits are used for status */
> +#define RTE_SORING_ST_BIT 30
> +
> +/* max possible number of elements in the soring */
> +#define RTE_SORING_ELEM_MAX (RTE_BIT32(RTE_SORING_ST_BIT) - 1)
> +
> +struct rte_soring_param {
> + /** expected name of the ring */
> + const char *name;
> + /** number of elements in the ring */
> + uint32_t elems;
> + /** size of elements in the ring, must be a multiple of 4 */
> + uint32_t esize;
> + /**
> + * size of retcode for each elem, must be a multiple of 4.
> + * This parameter defines a size of supplementary and optional
> + * array of ret-codes associated with each object in the soring.
> + * While element size is configurable (see @esize parameter above),
> + * so user can specify it big enough to hold both object and its
> + * ret-code together, for performance reasons it might be plausible
> + * to access them as separate arrays.
> + * Common usage scenario when such separation helps:
> + * enqueue() - writes to objects array
> + * acquire() - reads from objects array
> + * release() - writes to ret-code array
> + * dequeue() - reads both objects and ret-code array
> + */
> + uint32_t stsize;
> + /** number of stages in the ring */
> + uint32_t stages;
> + /** sync type for producer */
> + enum rte_ring_sync_type prod_synt;
> + /** sync type for consumer */
> + enum rte_ring_sync_type cons_synt;
> +};
> +
> +struct rte_soring;
> +
> +/**
> + * Calculate the memory size needed for a soring
> + *
> + * This function returns the number of bytes needed for a ring, given
> + * the expected parameters for it. This value is the sum of the size of
> + * the internal metadata and the size of the memory needed by the
> + * actual ring elements and their ret-codes. The value is aligned to a cache
> + * line size.
> + *
> + * @param prm
> + * Pointer to the structure that contains soring creation parameters.
> + * @return
> + * - The memory size needed for the soring on success.
> + * - -EINVAL if provided parameter values are invalid.
> + */
> +__rte_experimental
> +ssize_t
> +rte_soring_get_memsize(const struct rte_soring_param *prm);
> +
> +/**
> + * Initialize a soring structure.
> + *
> + * Initialize a soring structure in memory pointed by "r".
> + * The size of the memory area must be large enough to store the soring
> + * internal structures plus the objects and ret-code tables.
> + * It is strongly advised to use rte_soring_get_memsize() to get the
> + * appropriate size.
> + *
> + * @param r
> + * Pointer to the soring structure.
> + * @param prm
> + * Pointer to the structure that contains soring creation parameters.
> + * @return
> + * - 0 on success, or a negative error code.
> + */
> +__rte_experimental
> +int
> +rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm);
> +
> +/**
> + * Return the total number of filled entries in a ring.
> + *
> + * @param r
> + * A pointer to the soring structure.
> + * @return
> + * The number of entries in the ring.
> + */
> +__rte_experimental
> +unsigned int
> +rte_soring_count(const struct rte_soring *r);
> +
> +/**
> + * Enqueue several objects on the ring.
> + *
> + * @param r
> + * A pointer to the soring structure.
> + * @param obj_table
> + * A pointer to an array of objects to enqueue.
> + * Size of objects to enqueue must be the same value as @esize parameter
> + * used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + * A pointer to an array of status values for each object to enqueue.
> + * Note that if the user is not using object status values, then this parameter
> + * can be NULL.
> + * Size of elements in this array must be the same value as @stsize parameter
> + * used while creating the ring. If user created the soring with
> + * @stsize value equals zero, then @objst parameter should be NULL.
> + * Otherwise the results are undefined.
> + * @param n
> + * The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + * Behavior type, one of:
> + * - RTE_RING_QUEUE_FIXED: enqueue either exactly @n objects or none.
> + * - RTE_RING_QUEUE_VARIABLE: enqueue up to @n objects.
> + * @param free_space
> + * if non-NULL, returns the amount of space in the ring after the
> + * enqueue operation has finished.
> + * @return
> + * - Actual number of objects enqueued.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_enqueue(struct rte_soring *r, const void *obj_table,
> + const void *objst, uint32_t n, enum rte_ring_queue_behavior behavior,
> + uint32_t *free_space);
> +
> +/**
> + * Dequeue several objects from the ring.
> + *
> + * @param r
> + * A pointer to the soring structure.
> + * @param obj_table
> + * A pointer to an array of objects to dequeue.
> + * Size of objects to dequeue must be the same value as @esize parameter
> + * used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + * A pointer to an array of status values for each object to dequeue.
> + * Note that if the user is not using object status values, then this parameter
> + * can be NULL.
> + * Size of elements in this array must be the same value as @stsize parameter
> + * used while creating the ring. If user created the soring with
> + * @stsize value equals zero, then @objst parameter should be NULL.
> + * Otherwise the results are undefined.
> + * @param num
> + * The number of objects to dequeue from the ring into the obj_table.
> + * @param behavior
> + * Behavior type, one of:
> + * - RTE_RING_QUEUE_FIXED: dequeue either exactly @n objects or none.
> + * - RTE_RING_QUEUE_VARIABLE: dequeue up to @n objects.
> + * @param available
> + * If non-NULL, returns the number of remaining ring entries after the
> + * dequeue has finished.
> + * @return
> + * - Actual number of objects dequeued.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_dequeue(struct rte_soring *r, void *obj_table, void *objst,
> + uint32_t num, enum rte_ring_queue_behavior behavior,
> + uint32_t *available);
> +
> +/**
> + * Acquire several objects from the ring for given stage.
> + *
> + * @param r
> + * A pointer to the soring structure.
> + * @param objs
> + * A pointer to an array of objects to acquire.
> + * Size of objects must be the same value as @esize parameter
> + * used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + * A pointer to an array of status values for each acquired object.
> + * Note that if the user is not using object status values, then this parameter
> + * can be NULL.
> + * Size of elements in this array must be the same value as @stsize parameter
> + * used while creating the ring. If user created the soring with
> + * @stsize value equals zero, then @objst parameter should be NULL.
> + * Otherwise the results are undefined.
> + * @param stage
> + * Stage to acquire objects for.
> + * @param num
> + * The number of objects to acquire.
> + * @param behavior
> + * Behavior type, one of:
> + * - RTE_RING_QUEUE_FIXED: acquire either exactly @n objects or none.
> + * - RTE_RING_QUEUE_VARIABLE: acquire up to @n objects.
> + * @param ftoken
> + * Pointer to the opaque 'token' value used by release() op.
> + * User has to store this value somewhere, and later provide to the
> + * release().
> + * @param available
> + * If non-NULL, returns the number of remaining ring entries for given stage
> + * after the acquire has finished.
> + * @return
> + * - Actual number of objects acquired.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_acquire(struct rte_soring *r, void *objs, void *objst,
> + uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
> + uint32_t *ftoken, uint32_t *available);
> +
> +/**
> + * Release several objects for given stage back to the ring.
> + * Note that it means these objects become available for the next stage or
> + * dequeue.
> + *
> + * @param r
> + * A pointer to the soring structure.
> + * @param objs
> + * A pointer to an array of objects to release.
> + * Note that unless user needs to overwrite ring objects this parameter
> + * can be NULL.
> + * Size of objects must be the same value as @esize parameter
> + * used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + * A pointer to an array of status values for each object to release.
> + * Note that if the user is not using object status values, then this parameter
> + * can be NULL.
> + * Size of elements in this array must be the same value as @stsize parameter
> + * used while creating the ring. If user created the soring with
> + * @stsize value equals zero, then objst parameter should be NULL.
> + * Otherwise the results are undefined.
> + * @param stage
> + * Current stage.
> + * @param n
> + * The number of objects to release.
> + * Has to be the same value as returned by acquire() op.
> + * @param ftoken
> + * Opaque 'token' value obtained from acquire() op.
> + * @return
> + * - None.
> + */
> +__rte_experimental
> +void
> +rte_soring_release(struct rte_soring *r, const void *objs,
> + const void *objst, uint32_t stage, uint32_t n, uint32_t ftoken);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_SORING_H_ */
> diff --git a/lib/ring/soring.c b/lib/ring/soring.c
> new file mode 100644
> index 0000000000..929bde9697
> --- /dev/null
> +++ b/lib/ring/soring.c
> @@ -0,0 +1,431 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +/**
> + * @file
> + * This file contains implementation of SORING 'datapath' functions.
> + * Brief description:
> + * ==================
> + * enqueue/dequeue works the same as for conventional rte_ring:
> + * any rte_ring sync types can be used, etc.
> + * Plus there could be multiple 'stages'.
> + * For each stage there is an acquire (start) and release (finish) operation.
> + * After some elems are 'acquired' - user can safely assume that he has
> + * exclusive possession of these elems till 'release' for them is done.
> + * Note that right now user has to release exactly the same number of elems
> + * he acquired before.
> + * After 'release', elems can be 'acquired' by next stage and/or dequeued
> + * (in case of last stage).
> + * Internal structure:
> + * ===================
> + * In addition to 'normal' ring of elems, it also has a ring of states of the
> + * same size. Each state[] corresponds to exactly one elem[].
> + * state[] will be used by acquire/release/dequeue functions to store internal
> + * information and should not be accessed by the user directly.
> + * How it works:
> + * =============
> + * 'acquire()' just moves stage's head (same as rte_ring move_head does),
> + * plus it saves in state[stage.cur_head] information about how many elems
> + * were acquired, current head position and special flag value to indicate
> + * that elems are acquired (RTE_SORING_ST_START).
> + * Note that 'acquire()' returns to the user a special 'ftoken' that user has
> + * to provide for 'release()' (in fact it is just a position for current head).
> + * 'release()' extracts old head value from provided ftoken and checks that
> + * corresponding 'state[]' contains expected values(mostly for sanity
> + * purposes).
> + * * Then it marks this state[] with 'RTE_SORING_ST_FINISH' flag to indicate
> + * that given subset of objects was released.
> + * After that, it checks does old head value equals to current tail value?
> + * If yes, then it performs 'finalize()' operation, otherwise 'release()'
> + * just returns (without spinning on stage tail value).
> + * As updated state[] is shared by all threads, some other thread can do
> + * 'finalize()' for given stage.
> + * That allows 'release()' to avoid excessive waits on the tail value.
> + * Main purpose of 'finalize()' operation is to walk through 'state[]'
> + * from current stage tail up to its head, check state[] and move stage tail
> + * through elements that already are in RTE_SORING_ST_FINISH state.
> + * Along with that, corresponding state[] values are reset to zero.
> + * Note that 'finalize()' for given stage can be done from multiple places:
> + * 'release()' for that stage or from 'acquire()' for next stage
> + * even from consumer's 'dequeue()' - in case given stage is the last one.
> + * So 'finalize()' has to be MT-safe and inside it we have to
> + * guarantee that only one thread will update state[] and stage's tail values.
> + */
> +
> +#include "soring.h"
> +
> +/*
> + * Inline functions (fastpath) start here.
> + */
> +static __rte_always_inline uint32_t
> +__rte_soring_stage_finalize(struct soring_stage_headtail *sht,
> + union soring_state *rstate, uint32_t rmask, uint32_t maxn)
> +{
> + int32_t rc;
> + uint32_t head, i, idx, k, n, tail;
> + union soring_stage_tail nt, ot;
> + union soring_state st;
> +
> + /* try to grab exclusive right to update tail value */
> + ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
> + rte_memory_order_acquire);
> +
> + /* other thread already finalizing it for us */
> + if (ot.sync != 0)
> + return 0;
> +
> + nt.pos = ot.pos;
> + nt.sync = 1;
> + rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
> + &ot.raw, nt.raw, rte_memory_order_acquire,
> + rte_memory_order_relaxed);
> +
> + /* other thread won the race */
> + if (rc == 0)
> + return 0;
> +
> + /*
> + * start with current tail and walk through states that are
> + * already finished.
> + */
> +
> + head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
> + n = RTE_MIN(head - ot.pos, maxn);
> + for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
> +
> + idx = tail & rmask;
> + st.raw = rte_atomic_load_explicit(&rstate[idx].raw,
> + rte_memory_order_relaxed);
> + if ((st.stnum & RTE_SORING_ST_MASK) != RTE_SORING_ST_FINISH ||
> + st.ftoken != tail)
> + break;
> +
> + k = st.stnum & ~RTE_SORING_ST_MASK;
> + rte_atomic_store_explicit(&rstate[idx].raw, 0,
> + rte_memory_order_relaxed);
> + }
> +
> +
> + /* release exclusive right to update along with new tail value */
> + ot.pos = tail;
> + rte_atomic_store_explicit(&sht->tail.raw, ot.raw,
> + rte_memory_order_release);
> +
> + return i;
> +}
> +
> +static __rte_always_inline uint32_t
> +__rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
> + enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
> + uint32_t *head, uint32_t *next, uint32_t *free)
> +{
> + uint32_t n;
> +
> + switch (st) {
> + case RTE_RING_SYNC_ST:
> + case RTE_RING_SYNC_MT:
> + n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
> + r->capacity, st, num, behavior, head, next, free);
> + break;
> + case RTE_RING_SYNC_MT_RTS:
> + n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
> + r->capacity, num, behavior, head, free);
> + *next = *head + n;
> + break;
> + case RTE_RING_SYNC_MT_HTS:
> + n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
> + r->capacity, num, behavior, head, free);
> + *next = *head + n;
> + break;
> + default:
> + /* unsupported mode, shouldn't be here */
> + RTE_ASSERT(0);
> + *free = 0;
> + n = 0;
> + }
> +
> + return n;
> +}
> +
> +static __rte_always_inline uint32_t
> +__rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
> + enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
> + uint32_t *head, uint32_t *next, uint32_t *avail)
> +{
> + uint32_t n;
> +
> + switch (st) {
> + case RTE_RING_SYNC_ST:
> + case RTE_RING_SYNC_MT:
> + n = __rte_ring_headtail_move_head(&r->cons.ht,
> + &r->stage[stage].ht, 0, st, num, behavior,
> + head, next, avail);
> + break;
> + case RTE_RING_SYNC_MT_RTS:
> + n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht,
> + 0, num, behavior, head, avail);
> + *next = *head + n;
> + break;
> + case RTE_RING_SYNC_MT_HTS:
> + n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht,
> + 0, num, behavior, head, avail);
> + *next = *head + n;
> + break;
> + default:
> + /* unsupported mode, shouldn't be here */
> + RTE_ASSERT(0);
> + *avail = 0;
> + n = 0;
> + }
> +
> + return n;
> +}
> +
> +static __rte_always_inline void
> +__rte_soring_update_tail(struct __rte_ring_headtail *rht,
> + enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
> +{
> + uint32_t n;
> +
> + switch (st) {
> + case RTE_RING_SYNC_ST:
> + case RTE_RING_SYNC_MT:
> + __rte_ring_update_tail(&rht->ht, head, next, st, enq);
> + break;
> + case RTE_RING_SYNC_MT_RTS:
> + __rte_ring_rts_update_tail(&rht->rts);
> + break;
> + case RTE_RING_SYNC_MT_HTS:
> + n = next - head;
> + __rte_ring_hts_update_tail(&rht->hts, head, n, enq);
> + break;
> + default:
> + /* unsupported mode, shouldn't be here */
> + RTE_ASSERT(0);
> + }
> +}
> +
> +static __rte_always_inline uint32_t
> +__rte_soring_stage_move_head(struct soring_stage_headtail *d,
> + const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
> + enum rte_ring_queue_behavior behavior,
> + uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
> +{
> + uint32_t n, tail;
> +
> + *old_head = rte_atomic_load_explicit(&d->head,
> + rte_memory_order_acquire);
> +
> + do {
> + n = num;
> + tail = rte_atomic_load_explicit(&s->tail,
> + rte_memory_order_relaxed);
> + *avail = capacity + tail - *old_head;
> + if (n > *avail)
> + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
> + if (n == 0)
> + return 0;
> + *new_head = *old_head + n;
> + } while (rte_atomic_compare_exchange_strong_explicit(&d->head,
> + old_head, *new_head, rte_memory_order_acquire,
> + rte_memory_order_acquire) == 0);
> +
> + return n;
> +}
> +
> +/*
> + * Public functions (data-path) start here.
> + */
> +
> +unsigned int
> +rte_soring_count(const struct rte_soring *r)
> +{
> + uint32_t prod_tail = r->prod.ht.tail;
> + uint32_t cons_tail = r->cons.ht.tail;
> + uint32_t count = (prod_tail - cons_tail) & r->mask;
> + return (count > r->capacity) ? r->capacity : count;
> +}
> +
> +uint32_t
> +rte_soring_enqueue(struct rte_soring *r, const void *obj_table,
> + const void *objst, uint32_t n, enum rte_ring_queue_behavior behavior,
> + uint32_t *free_space)
> +{
> + enum rte_ring_sync_type st;
> + uint32_t nb_free, prod_head, prod_next;
> +
> + RTE_ASSERT(r != NULL && r->nb_stage > 0);
> + RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> + st = r->prod.ht.sync_type;
> +
> + n = __rte_soring_move_prod_head(r, n, behavior, st,
> + &prod_head, &prod_next, &nb_free);
> + if (n != 0) {
> + __rte_ring_do_enqueue_elems(&r[1], obj_table, r->size,
> + prod_head & r->mask, r->esize, n);
> + if (objst != NULL)
> + __rte_ring_do_enqueue_elems(r->elemst, objst, r->size,
> + prod_head & r->mask, r->stsize, n);
> + __rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
> + }
> +
> + if (free_space != NULL)
> + *free_space = nb_free - n;
> + return n;
> +}
> +
> +uint32_t
> +rte_soring_dequeue(struct rte_soring *r, void *obj_table, void *objst,
> + uint32_t num, enum rte_ring_queue_behavior behavior,
> + uint32_t *available)
> +{
> + enum rte_ring_sync_type st;
> + uint32_t entries, cons_head, cons_next, n, ns, reqn;
> +
> + RTE_ASSERT(r != NULL && r->nb_stage > 0);
> + RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> + ns = r->nb_stage - 1;
> + st = r->cons.ht.sync_type;
> +
> + /* try to grab exactly @num elems first */
> + n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
> + &cons_head, &cons_next, &entries);
> + if (n == 0) {
> + /* try to finalize some elems from previous stage */
> + n = __rte_soring_stage_finalize(&r->stage[ns].sht,
> + r->state, r->mask, 2 * num);
> + entries += n;
> +
> + /* repeat attempt to grab elems */
> + reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
> + if (entries >= reqn)
> + n = __rte_soring_move_cons_head(r, ns, num, behavior,
> + st, &cons_head, &cons_next, &entries);
> + else
> + n = 0;
> + }
> +
> + /* we have some elems to consume */
> + if (n != 0) {
> + __rte_ring_do_dequeue_elems(obj_table, &r[1], r->size,
> + cons_head & r->mask, r->esize, n);
> + if (objst != NULL)
> + __rte_ring_do_dequeue_elems(objst, r->elemst, r->size,
> + cons_head & r->mask, r->stsize, n);
> + __rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
> + }
> +
> + if (available != NULL)
> + *available = entries - n;
> + return n;
> +}
> +
> +uint32_t
> +rte_soring_acquire(struct rte_soring *r, void *objs, void *objst,
> + uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
> + uint32_t *ftoken, uint32_t *available)
> +{
> + uint32_t avail, head, idx, n, next, reqn;
> + union soring_state st;
> + struct soring_stage *pstg;
> + struct soring_stage_headtail *cons;
> +
> + RTE_ASSERT(r != NULL && stage < r->nb_stage);
> + RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> + cons = &r->stage[stage].sht;
> +
> + if (stage == 0)
> + n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num,
> + behavior, &head, &next, &avail);
> + else {
> + pstg = r->stage + stage - 1;
> +
> + /* try to grab exactly @num elems */
> + n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
> + RTE_RING_QUEUE_FIXED, &head, &next, &avail);
> + if (n == 0) {
> + /* try to finalize some elems from previous stage */
> + n = __rte_soring_stage_finalize(&pstg->sht, r->state,
> + r->mask, 2 * num);
> + avail += n;
> +
> + /* repeat attempt to grab elems */
> + reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
> + if (avail >= reqn)
> + n = __rte_soring_stage_move_head(cons,
> + &pstg->ht, 0, num, behavior, &head,
> + &next, &avail);
> + else
> + n = 0;
> + }
> + }
> +
> + if (n != 0) {
> +
> + idx = head & r->mask;
> +
> + /* copy elems that are ready for given stage */
> + __rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx,
> + r->esize, n);
> + if (objst != NULL)
> + __rte_ring_do_dequeue_elems(objst, r->elemst,
> + r->size, idx, r->stsize, n);
> +
> + /* update status ring */
> + st.ftoken = head;
> + st.stnum = (RTE_SORING_ST_START | n);
> +
> + rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
> + rte_memory_order_relaxed);
> + *ftoken = head;
> + }
> +
> + if (available != NULL)
> + *available = avail - n;
> + return n;
> +}
> +
> +void
> +rte_soring_release(struct rte_soring *r, const void *objs,
> + const void *objst, uint32_t stage, uint32_t n, uint32_t ftoken)
> +{
> + uint32_t idx, tail;
> + struct soring_stage *stg;
> + union soring_state st;
> +
> + RTE_ASSERT(r != NULL && stage < r->nb_stage);
> + RTE_ASSERT(objst == NULL || r->elemst != NULL);
> +
> + stg = r->stage + stage;
> + tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
> + rte_memory_order_relaxed);
> +
> + idx = ftoken & r->mask;
> + st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
> + rte_memory_order_acquire);
> +
> + /* check state ring contents */
> + RTE_VERIFY(st.stnum == (RTE_SORING_ST_START | n) &&
> + st.ftoken == ftoken);
> +
> + /* update contents of the ring, if necessary */
> + if (objs != NULL)
> + __rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx,
> + r->esize, n);
> + if (objst != NULL)
> + __rte_ring_do_enqueue_elems(r->elemst, objst, r->size, idx,
> + r->stsize, n);
> +
> + /* set state to FINISH, make sure it is not reordered */
> + st.stnum = RTE_SORING_ST_FINISH | n;
> + rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
> + rte_memory_order_release);
> +
> + if (tail == ftoken)
> + __rte_soring_stage_finalize(&stg->sht, r->state, r->mask,
> + r->capacity);
> +}
> diff --git a/lib/ring/soring.h b/lib/ring/soring.h
> new file mode 100644
> index 0000000000..3a3f6efa76
> --- /dev/null
> +++ b/lib/ring/soring.h
> @@ -0,0 +1,124 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#ifndef _SORING_H_
> +#define _SORING_H_
> +
> +/**
> + * @file
> + * This file contains internal structures of RTE soring: Staged Ordered Ring.
> + * Sort of extension of conventional RTE ring.
> + * Internal structure:
> + * In addition to 'normal' ring of elems, it also has a ring of states of the
> + * same size. Each state[] corresponds to exactly one elem[].
> + * state[] will be used by acquire/release/dequeue functions to store internal
> + * information and should not be accessed by the user directly.
> + * For actual implementation details, please refer to soring.c
> + */
> +
> +#include <rte_soring.h>
> +
> +union soring_state {
> + alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
> + struct {
> + RTE_ATOMIC(uint32_t) ftoken;
> + RTE_ATOMIC(uint32_t) stnum;
> + };
> +};
> +
> +/* upper 2 bits are used for status */
> +#define RTE_SORING_ST_START RTE_SHIFT_VAL32(1, RTE_SORING_ST_BIT)
> +#define RTE_SORING_ST_FINISH RTE_SHIFT_VAL32(2, RTE_SORING_ST_BIT)
> +
> +#define RTE_SORING_ST_MASK \
> + RTE_GENMASK32(sizeof(uint32_t) * CHAR_BIT - 1, RTE_SORING_ST_BIT)
> +
> +/**
> + * SORING re-uses rte_ring internal structures and implementation
> + * for enqueue/dequeue operations.
> + */
> +struct __rte_ring_headtail {
> +
> + union __rte_cache_aligned {
> + struct rte_ring_headtail ht;
> + struct rte_ring_hts_headtail hts;
> + struct rte_ring_rts_headtail rts;
> + };
> +
> + RTE_CACHE_GUARD;
> +};
> +
> +union soring_stage_tail {
> + /** raw 8B value to read/write *sync* and *pos* as one atomic op */
> + alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
> + struct {
> + RTE_ATOMIC(uint32_t) sync;
> + RTE_ATOMIC(uint32_t) pos;
> + };
> +};
> +
> +struct soring_stage_headtail {
> + volatile union soring_stage_tail tail;
> + enum rte_ring_sync_type unused; /**< unused */
> + volatile RTE_ATOMIC(uint32_t) head;
> +};
> +
> +/**
> + * SORING stage head_tail structure is 'compatible' with rte_ring ones.
> + * rte_ring internal functions for moving producer/consumer head
> + * can work transparently with stage head_tail and vice versa
> + * (no modifications required).
> + */
> +struct soring_stage {
> +
> + union __rte_cache_aligned {
> + struct rte_ring_headtail ht;
> + struct soring_stage_headtail sht;
> + };
> +
> + RTE_CACHE_GUARD;
> +};
> +
> +/**
> + * RTE soring internal structure.
> + * As with rte_ring, the actual elements array is supposed to be located directly
> + * after the rte_soring structure.
> + */
> +struct __rte_cache_aligned rte_soring {
> + uint32_t size; /**< Size of ring. */
> + uint32_t mask; /**< Mask (size-1) of ring. */
> + uint32_t capacity; /**< Usable size of ring */
> + uint32_t esize;
> + /**< size of elements in the ring, must be a multiple of 4*/
> + uint32_t stsize;
> + /**< size of status value for each elem, must be a multiple of 4 */
> +
> + /** Ring stages */
> + struct soring_stage *stage;
> + uint32_t nb_stage;
> +
> + /** Ring of states (one per element) */
> + union soring_state *state;
> +
> + /** Pointer to the buffer where status values for each elements
> + * are stored. This is supplementary and optional information that
> + * user can attach to each element of the ring.
> + * While it is possible to incorporate this information inside
> + * user-defined element, in many cases it is plausible to maintain it
> + * as a separate array (mainly for performance reasons).
> + */
> + void *elemst;
> +
> + RTE_CACHE_GUARD;
> +
> + /** Ring producer status. */
> + struct __rte_ring_headtail prod;
> +
> + /** Ring consumer status. */
> + struct __rte_ring_headtail cons;
> +
> + char name[RTE_RING_NAMESIZE]; /**< Name of the ring. */
> +};
> +
> +#endif /* _SORING_H_ */
> diff --git a/lib/ring/version.map b/lib/ring/version.map
> index 8da094a69a..a1f95a500f 100644
> --- a/lib/ring/version.map
> +++ b/lib/ring/version.map
> @@ -14,3 +14,16 @@ DPDK_25 {
>
> local: *;
> };
> +
> +EXPERIMENTAL {
> + global:
> +
> + # added in 24.11
> + rte_soring_acquire;
> + rte_soring_count;
> + rte_soring_dequeue;
> + rte_soring_enqueue;
> + rte_soring_get_memsize;
> + rte_soring_init;
> + rte_soring_release;
> +};
> > Staged-Ordered-Ring (SORING) provides a SW abstraction for 'ordered' queues
> > with multiple processing 'stages'.
> > It is based on conventional DPDK rte_ring, re-uses many of its concepts,
> > and even substantial part of its code.
> > It can be viewed as an 'extension' of rte_ring functionality.
> > In particular, main SORING properties:
> > - circular ring buffer with fixed size objects
> > - producer, consumer plus multiple processing stages in the middle.
> > - allows to split objects processing into multiple stages.
> > - objects remain in the same ring while moving from one stage to the other,
> > initial order is preserved, no extra copying needed.
> > - preserves the ingress order of objects within the queue across multiple
> > stages, i.e.:
> > at the same stage multiple threads can process objects from the ring in
> > any order, but for the next stage objects will always appear in the
> > original order.
> > - each stage (and producer/consumer) can be served by single and/or
> > multiple threads.
> > - number of stages, size and number of objects in the ring are
> > configurable at ring initialization time.
> >
> > Data-path API provides four main operations:
> > - enqueue/dequeue works in the same manner as for conventional rte_ring,
> > all rte_ring synchronization types are supported.
> > - acquire/release - for each stage there is an acquire (start) and
> > release (finish) operation.
> > after some objects are 'acquired' - given thread can safely assume that
> > it has exclusive possession of these objects till 'release' for them is
> > invoked.
> > Note that right now user has to release exactly the same number of
> > objects that was acquired before.
> > After 'release', objects can be 'acquired' by next stage and/or dequeued
> > by the consumer (in case of last stage).
> >
> > Expected use-case: applications that uses pipeline model
> > (probably with multiple stages) for packet processing, when preserving
> > incoming packet order is important. I.E.: IPsec processing, etc.
> >
>
> How does SORING related to Eventdev?
So far there is no direct relation.
Though yes, the DPDK eventdev framework also provides an 'ordered' queue ability (along with other modes).
Again, as I mentioned in the cover letter, rte_soring uses a similar concept to the OPDL eventdev implementation.
One of the main aims with rte_soring was to introduce a sort of extension to rte_ring,
while keeping its API and implementation as lightweight and generic as possible,
so it could be consumed by various apps that do use a pipeline model,
but for whatever reason do not use (or plan to use) the eventdev framework.
> Would it be feasible to reshape this into a SW event device?
I guess the opposite approach might work better, i.e. make some SW-based eventdev
implementation use rte_soring internally.
Though I haven't tried to do that so far.
@@ -1,8 +1,8 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2017 Intel Corporation
-sources = files('rte_ring.c')
-headers = files('rte_ring.h')
+sources = files('rte_ring.c', 'rte_soring.c', 'soring.c')
+headers = files('rte_ring.h', 'rte_soring.h')
# most sub-headers are not for direct inclusion
indirect_headers += files (
'rte_ring_core.h',
new file mode 100644
@@ -0,0 +1,144 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#include "soring.h"
+#include <rte_string_fns.h>
+
+RTE_LOG_REGISTER_DEFAULT(soring_logtype, INFO);
+#define RTE_LOGTYPE_SORING soring_logtype
+#define SORING_LOG(level, ...) \
+ RTE_LOG_LINE(level, SORING, "" __VA_ARGS__)
+
+static uint32_t
+soring_calc_elem_num(uint32_t count)
+{
+ return rte_align32pow2(count + 1);
+}
+
+static int
+soring_check_param(uint32_t esize, uint32_t stsize, uint32_t count,
+ uint32_t stages)
+{
+ if (stages == 0) {
+ SORING_LOG(ERR, "invalid number of stages: %u", stages);
+ return -EINVAL;
+ }
+
+ /* Check if element size is a multiple of 4B */
+ if (esize == 0 || esize % 4 != 0) {
+ SORING_LOG(ERR, "invalid element size: %u", esize);
+ return -EINVAL;
+ }
+
+ /* Check if ret-code size is a multiple of 4B */
+ if (stsize % 4 != 0) {
+ SORING_LOG(ERR, "invalid retcode size: %u", stsize);
+ return -EINVAL;
+ }
+
+ /* count must be a power of 2 */
+ if (rte_is_power_of_2(count) == 0 ||
+ (count > RTE_SORING_ELEM_MAX + 1)) {
+ SORING_LOG(ERR, "invalid number of elements: %u", count);
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+/*
+ * Calculate size offsets for SORING internal data layout.
+ */
+static size_t
+soring_get_szofs(uint32_t esize, uint32_t stsize, uint32_t count,
+ uint32_t stages, size_t *elst_ofs, size_t *state_ofs,
+ size_t *stage_ofs)
+{
+ size_t sz;
+ const struct rte_soring * const r = NULL;
+
+ sz = sizeof(r[0]) + (size_t)count * esize;
+ sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+
+ if (elst_ofs != NULL)
+ *elst_ofs = sz;
+
+ sz = sz + (size_t)count * stsize;
+ sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+
+ if (state_ofs != NULL)
+ *state_ofs = sz;
+
+ sz += sizeof(r->state[0]) * count;
+ sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+
+ if (stage_ofs != NULL)
+ *stage_ofs = sz;
+
+ sz += sizeof(r->stage[0]) * stages;
+ sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+
+ return sz;
+}
+
+
+ssize_t
+rte_soring_get_memsize(const struct rte_soring_param *prm)
+{
+ int32_t rc;
+ uint32_t count;
+
+ count = soring_calc_elem_num(prm->elems);
+ rc = soring_check_param(prm->esize, prm->stsize, count, prm->stages);
+ if (rc != 0)
+ return rc;
+
+ return soring_get_szofs(prm->esize, prm->stsize, count, prm->stages,
+ NULL, NULL, NULL);
+}
+
+int
+rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm)
+{
+ int32_t rc;
+ uint32_t n;
+ size_t elst_ofs, stage_ofs, state_ofs;
+
+ if (r == NULL || prm == NULL)
+ return -EINVAL;
+
+ n = soring_calc_elem_num(prm->elems);
+ rc = soring_check_param(prm->esize, prm->stsize, n, prm->stages);
+ if (rc != 0)
+ return rc;
+
+ soring_get_szofs(prm->esize, prm->stsize, n, prm->stages, &elst_ofs,
+ &state_ofs, &stage_ofs);
+
+ memset(r, 0, sizeof(*r));
+ rc = strlcpy(r->name, prm->name, sizeof(r->name));
+ if (rc < 0 || rc >= (int)sizeof(r->name))
+ return -ENAMETOOLONG;
+
+ r->size = n;
+ r->mask = r->size - 1;
+ r->capacity = prm->elems;
+ r->esize = prm->esize;
+ r->stsize = prm->stsize;
+
+ r->prod.ht.sync_type = prm->prod_synt;
+ r->cons.ht.sync_type = prm->cons_synt;
+
+ r->state = (union soring_state *)((uintptr_t)r + state_ofs);
+ memset(r->state, 0, sizeof(r->state[0]) * r->size);
+
+ r->stage = (struct soring_stage *)((uintptr_t)r + stage_ofs);
+ r->nb_stage = prm->stages;
+ memset(r->stage, 0, r->nb_stage * sizeof(r->stage[0]));
+
+ if (r->stsize != 0)
+ r->elemst = (void *)((uintptr_t)r + elst_ofs);
+
+ return 0;
+}
new file mode 100644
@@ -0,0 +1,270 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#ifndef _RTE_SORING_H_
+#define _RTE_SORING_H_
+
+/**
+ * @file
+ * This file contains definition of RTE soring (Staged Ordered Ring) public API.
+ * Brief description:
+ * enqueue/dequeue works the same as for conventional rte_ring:
+ * any rte_ring sync types can be used, etc.
+ * Plus there could be multiple 'stages'.
+ * For each stage there is an acquire (start) and release (finish) operation.
+ * After some elems are 'acquired' - the user can safely assume exclusive
+ * possession of these elems till 'release' for them is done.
+ * Note that right now the user has to release exactly the same number of
+ * elems that were acquired before.
+ * After 'release', elems can be 'acquired' by next stage and/or dequeued
+ * (in case of last stage).
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_ring.h>
+
+/* upper 2 bits are used for status */
+#define RTE_SORING_ST_BIT 30
+
+/* max possible number of elements in the soring */
+#define RTE_SORING_ELEM_MAX (RTE_BIT32(RTE_SORING_ST_BIT) - 1)
+
+struct rte_soring_param {
+ /** expected name of the ring */
+ const char *name;
+ /** number of elements in the ring */
+ uint32_t elems;
+ /** size of elements in the ring, must be a multiple of 4 */
+ uint32_t esize;
+ /**
+ * size of retcode for each elem, must be a multiple of 4.
+ * This parameter defines a size of supplementary and optional
+ * array of ret-codes associated with each object in the soring.
+ * While the element size is configurable (see @esize parameter above)
+ * and the user could make it big enough to hold both the object and
+ * its ret-code together, for performance reasons it might be preferable
+ * to access them as separate arrays.
+ * Common usage scenario when such separation helps:
+ * enqueue() - writes to objects array
+ * acquire() - reads from objects array
+ * release() - writes to ret-code array
+ * dequeue() - reads both objects and ret-code array
+ */
+ uint32_t stsize;
+ /** number of stages in the ring */
+ uint32_t stages;
+ /** sync type for producer */
+ enum rte_ring_sync_type prod_synt;
+ /** sync type for consumer */
+ enum rte_ring_sync_type cons_synt;
+};
+
+struct rte_soring;
+
+/**
+ * Calculate the memory size needed for a soring
+ *
+ * This function returns the number of bytes needed for a ring, given
+ * the expected parameters for it. This value is the sum of the size of
+ * the internal metadata and the size of the memory needed by the
+ * actual ring elements and their ret-codes. The value is aligned to a cache
+ * line size.
+ *
+ * @param prm
+ * Pointer to the structure that contains soring creation parameters.
+ * @return
+ * - The memory size needed for the soring on success.
+ * - -EINVAL if provided parameter values are invalid.
+ */
+__rte_experimental
+ssize_t
+rte_soring_get_memsize(const struct rte_soring_param *prm);
+
+/**
+ * Initialize a soring structure.
+ *
+ * Initialize a soring structure in memory pointed by "r".
+ * The size of the memory area must be large enough to store the soring
+ * internal structures plus the objects and ret-code tables.
+ * It is strongly advised to use rte_soring_get_memsize() to get the
+ * appropriate size.
+ *
+ * @param r
+ * Pointer to the soring structure.
+ * @param prm
+ * Pointer to the structure that contains soring creation parameters.
+ * @return
+ * - 0 on success, or a negative error code.
+ */
+__rte_experimental
+int
+rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm);
+
+/**
+ * Return the total number of filled entries in a ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @return
+ * The number of entries in the ring.
+ */
+__rte_experimental
+unsigned int
+rte_soring_count(const struct rte_soring *r);
+
+/**
+ * Enqueue several objects on the ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param obj_table
+ * A pointer to an array of objects to enqueue.
+ * Size of objects to enqueue must be the same value as @esize parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param objst
+ * A pointer to an array of status values for each object to enqueue.
+ * Note that if the user is not using object status values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as the @stsize
+ * parameter used while creating the ring. If the soring was created with
+ * @stsize equal to zero, then @objst should be NULL.
+ * Otherwise the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * Behavior type, one of:
+ * - RTE_RING_QUEUE_FIXED: enqueue either exactly @n objects or none.
+ * - RTE_RING_QUEUE_VARIABLE: enqueue up to @n objects.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - Actual number of objects enqueued.
+ */
+__rte_experimental
+uint32_t
+rte_soring_enqueue(struct rte_soring *r, const void *obj_table,
+ const void *objst, uint32_t n, enum rte_ring_queue_behavior behavior,
+ uint32_t *free_space);
+
+/**
+ * Dequeue several objects from the ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param obj_table
+ * A pointer to an array of objects to dequeue.
+ * Size of objects to dequeue must be the same value as the @esize parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param objst
+ * A pointer to an array of status values for each object to dequeue.
+ * Note that if the user is not using object status values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as the @stsize
+ * parameter used while creating the ring. If the soring was created with
+ * @stsize equal to zero, then @objst should be NULL.
+ * Otherwise the results are undefined.
+ * @param num
+ * The number of objects to dequeue from the ring into the obj_table.
+ * @param behavior
+ * Behavior type, one of:
+ * - RTE_RING_QUEUE_FIXED: dequeue either exactly @num objects or none.
+ * - RTE_RING_QUEUE_VARIABLE: dequeue up to @num objects.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - Actual number of objects dequeued.
+ */
+__rte_experimental
+uint32_t
+rte_soring_dequeue(struct rte_soring *r, void *obj_table, void *objst,
+ uint32_t num, enum rte_ring_queue_behavior behavior,
+ uint32_t *available);
+
+/**
+ * Acquire several objects from the ring for given stage.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to acquire.
+ * Size of objects must be the same value as @esize parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param objst
+ * A pointer to an array of status values for each acquired object.
+ * Note that if the user is not using object status values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as the @stsize
+ * parameter used while creating the ring. If the soring was created with
+ * @stsize equal to zero, then @objst should be NULL.
+ * Otherwise the results are undefined.
+ * @param stage
+ * Stage to acquire objects for.
+ * @param num
+ * The number of objects to acquire.
+ * @param behavior
+ * Behavior type, one of:
+ * - RTE_RING_QUEUE_FIXED: acquire either exactly @num objects or none.
+ * - RTE_RING_QUEUE_VARIABLE: acquire up to @num objects.
+ * @param ftoken
+ * Pointer to the opaque 'token' value used by release() op.
+ * The user has to store this value somewhere, and later provide it to
+ * release().
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries for given stage
+ * after the acquire has finished.
+ * @return
+ * - Actual number of objects acquired.
+ */
+__rte_experimental
+uint32_t
+rte_soring_acquire(struct rte_soring *r, void *objs, void *objst,
+ uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
+ uint32_t *ftoken, uint32_t *available);
+
+/**
+ * Release several objects for given stage back to the ring.
+ * Note that this means these objects become available for the next stage or
+ * for dequeue.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to release.
+ * Note that unless the user needs to overwrite the ring objects, this
+ * parameter can be NULL.
+ * Size of objects must be the same value as @esize parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param objst
+ * A pointer to an array of status values for each object to release.
+ * Note that if the user is not using object status values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as the @stsize
+ * parameter used while creating the ring. If the soring was created with
+ * @stsize equal to zero, then @objst should be NULL.
+ * Otherwise the results are undefined.
+ * @param stage
+ * Current stage.
+ * @param n
+ * The number of objects to release.
+ * Has to be the same value as returned by acquire() op.
+ * @param ftoken
+ * Opaque 'token' value obtained from acquire() op.
+ * @return
+ * - None.
+ */
+__rte_experimental
+void
+rte_soring_release(struct rte_soring *r, const void *objs,
+ const void *objst, uint32_t stage, uint32_t n, uint32_t ftoken);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_SORING_H_ */
new file mode 100644
@@ -0,0 +1,431 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+/**
+ * @file
+ * This file contains implementation of SORING 'datapath' functions.
+ * Brief description:
+ * ==================
+ * enqueue/dequeue works the same as for conventional rte_ring:
+ * any rte_ring sync types can be used, etc.
+ * Plus there could be multiple 'stages'.
+ * For each stage there is an acquire (start) and release (finish) operation.
+ * After some elems are 'acquired' - the user can safely assume exclusive
+ * possession of these elems till 'release' for them is done.
+ * Note that right now the user has to release exactly the same number of
+ * elems that were acquired before.
+ * After 'release', elems can be 'acquired' by next stage and/or dequeued
+ * (in case of last stage).
+ * Internal structure:
+ * ===================
+ * In addition to 'normal' ring of elems, it also has a ring of states of the
+ * same size. Each state[] corresponds to exactly one elem[].
+ * state[] will be used by acquire/release/dequeue functions to store internal
+ * information and should not be accessed by the user directly.
+ * How it works:
+ * =============
+ * 'acquire()' just moves stage's head (same as rte_ring move_head does),
+ * plus it saves in state[stage.cur_head] information about how many elems
+ * were acquired, current head position and special flag value to indicate
+ * that elems are acquired (RTE_SORING_ST_START).
+ * Note that 'acquire()' returns to the user a special 'ftoken' that the user
+ * has to provide to 'release()' (in fact it is just the position of the
+ * current head).
+ * 'release()' extracts the old head value from the provided ftoken and checks
+ * that the corresponding 'state[]' contains the expected values (mostly for
+ * sanity purposes).
+ * Then it marks this state[] with the 'RTE_SORING_ST_FINISH' flag to indicate
+ * that the given subset of objects was released.
+ * After that, it checks whether the old head value equals the current tail
+ * value. If yes, it performs the 'finalize()' operation, otherwise 'release()'
+ * just returns (without spinning on the stage tail value).
+ * As updated state[] is shared by all threads, some other thread can do
+ * 'finalize()' for given stage.
+ * That allows 'release()' to avoid excessive waits on the tail value.
+ * Main purpose of 'finalize()' operation is to walk through 'state[]'
+ * from current stage tail up to its head, check state[] and move stage tail
+ * through elements that already are in RTE_SORING_ST_FINISH state.
+ * Along with that, corresponding state[] values are reset to zero.
+ * Note that 'finalize()' for a given stage can be done from multiple places:
+ * from 'release()' for that stage, from 'acquire()' for the next stage,
+ * or even from the consumer's 'dequeue()' - in case the given stage is the
+ * last one.
+ * So 'finalize()' has to be MT-safe and inside it we have to
+ * guarantee that only one thread will update state[] and stage's tail values.
+ */
+
+#include "soring.h"
+
+/*
+ * Inline functions (fastpath) start here.
+ */
+static __rte_always_inline uint32_t
+__rte_soring_stage_finalize(struct soring_stage_headtail *sht,
+ union soring_state *rstate, uint32_t rmask, uint32_t maxn)
+{
+ int32_t rc;
+ uint32_t head, i, idx, k, n, tail;
+ union soring_stage_tail nt, ot;
+ union soring_state st;
+
+ /* try to grab exclusive right to update tail value */
+ ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
+ rte_memory_order_acquire);
+
+ /* other thread already finalizing it for us */
+ if (ot.sync != 0)
+ return 0;
+
+ nt.pos = ot.pos;
+ nt.sync = 1;
+ rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
+ &ot.raw, nt.raw, rte_memory_order_acquire,
+ rte_memory_order_relaxed);
+
+ /* other thread won the race */
+ if (rc == 0)
+ return 0;
+
+ /*
+ * start with current tail and walk through states that are
+ * already finished.
+ */
+
+ head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
+ n = RTE_MIN(head - ot.pos, maxn);
+ for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
+
+ idx = tail & rmask;
+ st.raw = rte_atomic_load_explicit(&rstate[idx].raw,
+ rte_memory_order_relaxed);
+ if ((st.stnum & RTE_SORING_ST_MASK) != RTE_SORING_ST_FINISH ||
+ st.ftoken != tail)
+ break;
+
+ k = st.stnum & ~RTE_SORING_ST_MASK;
+ rte_atomic_store_explicit(&rstate[idx].raw, 0,
+ rte_memory_order_relaxed);
+ }
+
+
+ /* release exclusive right to update along with new tail value */
+ ot.pos = tail;
+ rte_atomic_store_explicit(&sht->tail.raw, ot.raw,
+ rte_memory_order_release);
+
+ return i;
+}
+
+static __rte_always_inline uint32_t
+__rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
+ enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
+ uint32_t *head, uint32_t *next, uint32_t *free)
+{
+ uint32_t n;
+
+ switch (st) {
+ case RTE_RING_SYNC_ST:
+ case RTE_RING_SYNC_MT:
+ n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
+ r->capacity, st, num, behavior, head, next, free);
+ break;
+ case RTE_RING_SYNC_MT_RTS:
+ n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
+ r->capacity, num, behavior, head, free);
+ *next = *head + n;
+ break;
+ case RTE_RING_SYNC_MT_HTS:
+ n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
+ r->capacity, num, behavior, head, free);
+ *next = *head + n;
+ break;
+ default:
+ /* unsupported mode, shouldn't be here */
+ RTE_ASSERT(0);
+ *free = 0;
+ n = 0;
+ }
+
+ return n;
+}
+
+static __rte_always_inline uint32_t
+__rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
+ enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
+ uint32_t *head, uint32_t *next, uint32_t *avail)
+{
+ uint32_t n;
+
+ switch (st) {
+ case RTE_RING_SYNC_ST:
+ case RTE_RING_SYNC_MT:
+ n = __rte_ring_headtail_move_head(&r->cons.ht,
+ &r->stage[stage].ht, 0, st, num, behavior,
+ head, next, avail);
+ break;
+ case RTE_RING_SYNC_MT_RTS:
+ n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht,
+ 0, num, behavior, head, avail);
+ *next = *head + n;
+ break;
+ case RTE_RING_SYNC_MT_HTS:
+ n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht,
+ 0, num, behavior, head, avail);
+ *next = *head + n;
+ break;
+ default:
+ /* unsupported mode, shouldn't be here */
+ RTE_ASSERT(0);
+ *avail = 0;
+ n = 0;
+ }
+
+ return n;
+}
+
+static __rte_always_inline void
+__rte_soring_update_tail(struct __rte_ring_headtail *rht,
+ enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
+{
+ uint32_t n;
+
+ switch (st) {
+ case RTE_RING_SYNC_ST:
+ case RTE_RING_SYNC_MT:
+ __rte_ring_update_tail(&rht->ht, head, next, st, enq);
+ break;
+ case RTE_RING_SYNC_MT_RTS:
+ __rte_ring_rts_update_tail(&rht->rts);
+ break;
+ case RTE_RING_SYNC_MT_HTS:
+ n = next - head;
+ __rte_ring_hts_update_tail(&rht->hts, head, n, enq);
+ break;
+ default:
+ /* unsupported mode, shouldn't be here */
+ RTE_ASSERT(0);
+ }
+}
+
+static __rte_always_inline uint32_t
+__rte_soring_stage_move_head(struct soring_stage_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
+{
+ uint32_t n, tail;
+
+ *old_head = rte_atomic_load_explicit(&d->head,
+ rte_memory_order_acquire);
+
+ do {
+ n = num;
+ tail = rte_atomic_load_explicit(&s->tail,
+ rte_memory_order_relaxed);
+ *avail = capacity + tail - *old_head;
+ if (n > *avail)
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
+ if (n == 0)
+ return 0;
+ *new_head = *old_head + n;
+ } while (rte_atomic_compare_exchange_strong_explicit(&d->head,
+ old_head, *new_head, rte_memory_order_acquire,
+ rte_memory_order_acquire) == 0);
+
+ return n;
+}
+
+/*
+ * Public functions (data-path) start here.
+ */
+
+unsigned int
+rte_soring_count(const struct rte_soring *r)
+{
+ uint32_t prod_tail = r->prod.ht.tail;
+ uint32_t cons_tail = r->cons.ht.tail;
+ uint32_t count = (prod_tail - cons_tail) & r->mask;
+ return (count > r->capacity) ? r->capacity : count;
+}
+
+uint32_t
+rte_soring_enqueue(struct rte_soring *r, const void *obj_table,
+ const void *objst, uint32_t n, enum rte_ring_queue_behavior behavior,
+ uint32_t *free_space)
+{
+ enum rte_ring_sync_type st;
+ uint32_t nb_free, prod_head, prod_next;
+
+ RTE_ASSERT(r != NULL && r->nb_stage > 0);
+ RTE_ASSERT(objst == NULL || r->elemst != NULL);
+
+ st = r->prod.ht.sync_type;
+
+ n = __rte_soring_move_prod_head(r, n, behavior, st,
+ &prod_head, &prod_next, &nb_free);
+ if (n != 0) {
+ __rte_ring_do_enqueue_elems(&r[1], obj_table, r->size,
+ prod_head & r->mask, r->esize, n);
+ if (objst != NULL)
+ __rte_ring_do_enqueue_elems(r->elemst, objst, r->size,
+ prod_head & r->mask, r->stsize, n);
+ __rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
+ }
+
+ if (free_space != NULL)
+ *free_space = nb_free - n;
+ return n;
+}
+
+uint32_t
+rte_soring_dequeue(struct rte_soring *r, void *obj_table, void *objst,
+ uint32_t num, enum rte_ring_queue_behavior behavior,
+ uint32_t *available)
+{
+ enum rte_ring_sync_type st;
+ uint32_t entries, cons_head, cons_next, n, ns, reqn;
+
+ RTE_ASSERT(r != NULL && r->nb_stage > 0);
+ RTE_ASSERT(objst == NULL || r->elemst != NULL);
+
+ ns = r->nb_stage - 1;
+ st = r->cons.ht.sync_type;
+
+ /* try to grab exactly @num elems first */
+ n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
+ &cons_head, &cons_next, &entries);
+ if (n == 0) {
+ /* try to finalize some elems from previous stage */
+ n = __rte_soring_stage_finalize(&r->stage[ns].sht,
+ r->state, r->mask, 2 * num);
+ entries += n;
+
+ /* repeat attempt to grab elems */
+ reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
+ if (entries >= reqn)
+ n = __rte_soring_move_cons_head(r, ns, num, behavior,
+ st, &cons_head, &cons_next, &entries);
+ else
+ n = 0;
+ }
+
+ /* we have some elems to consume */
+ if (n != 0) {
+ __rte_ring_do_dequeue_elems(obj_table, &r[1], r->size,
+ cons_head & r->mask, r->esize, n);
+ if (objst != NULL)
+ __rte_ring_do_dequeue_elems(objst, r->elemst, r->size,
+ cons_head & r->mask, r->stsize, n);
+ __rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
+ }
+
+ if (available != NULL)
+ *available = entries - n;
+ return n;
+}
+
+uint32_t
+rte_soring_acquire(struct rte_soring *r, void *objs, void *objst,
+ uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
+ uint32_t *ftoken, uint32_t *available)
+{
+ uint32_t avail, head, idx, n, next, reqn;
+ union soring_state st;
+ struct soring_stage *pstg;
+ struct soring_stage_headtail *cons;
+
+ RTE_ASSERT(r != NULL && stage < r->nb_stage);
+ RTE_ASSERT(objst == NULL || r->elemst != NULL);
+
+ cons = &r->stage[stage].sht;
+
+ if (stage == 0)
+ n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num,
+ behavior, &head, &next, &avail);
+ else {
+ pstg = r->stage + stage - 1;
+
+ /* try to grab exactly @num elems */
+ n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
+ RTE_RING_QUEUE_FIXED, &head, &next, &avail);
+ if (n == 0) {
+ /* try to finalize some elems from previous stage */
+ n = __rte_soring_stage_finalize(&pstg->sht, r->state,
+ r->mask, 2 * num);
+ avail += n;
+
+ /* repeat attempt to grab elems */
+ reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
+ if (avail >= reqn)
+ n = __rte_soring_stage_move_head(cons,
+ &pstg->ht, 0, num, behavior, &head,
+ &next, &avail);
+ else
+ n = 0;
+ }
+ }
+
+ if (n != 0) {
+
+ idx = head & r->mask;
+
+ /* copy elems that are ready for given stage */
+ __rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx,
+ r->esize, n);
+ if (objst != NULL)
+ __rte_ring_do_dequeue_elems(objst, r->elemst,
+ r->size, idx, r->stsize, n);
+
+ /* update status ring */
+ st.ftoken = head;
+ st.stnum = (RTE_SORING_ST_START | n);
+
+ rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
+ rte_memory_order_relaxed);
+ *ftoken = head;
+ }
+
+ if (available != NULL)
+ *available = avail - n;
+ return n;
+}
+
+void
+rte_soring_release(struct rte_soring *r, const void *objs,
+ const void *objst, uint32_t stage, uint32_t n, uint32_t ftoken)
+{
+ uint32_t idx, tail;
+ struct soring_stage *stg;
+ union soring_state st;
+
+ RTE_ASSERT(r != NULL && stage < r->nb_stage);
+ RTE_ASSERT(objst == NULL || r->elemst != NULL);
+
+ stg = r->stage + stage;
+ tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
+ rte_memory_order_relaxed);
+
+ idx = ftoken & r->mask;
+ st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
+ rte_memory_order_acquire);
+
+ /* check state ring contents */
+ RTE_VERIFY(st.stnum == (RTE_SORING_ST_START | n) &&
+ st.ftoken == ftoken);
+
+ /* update contents of the ring, if necessary */
+ if (objs != NULL)
+ __rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx,
+ r->esize, n);
+ if (objst != NULL)
+ __rte_ring_do_enqueue_elems(r->elemst, objst, r->size, idx,
+ r->stsize, n);
+
+ /* set state to FINISH, make sure it is not reordered */
+ st.stnum = RTE_SORING_ST_FINISH | n;
+ rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
+ rte_memory_order_release);
+
+ if (tail == ftoken)
+ __rte_soring_stage_finalize(&stg->sht, r->state, r->mask,
+ r->capacity);
+}
new file mode 100644
@@ -0,0 +1,124 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#ifndef _SORING_H_
+#define _SORING_H_
+
+/**
+ * @file
+ * This file contains internal structures of RTE soring: Staged Ordered Ring.
+ * Sort of an extension of the conventional RTE ring.
+ * Internal structure:
+ * In addition to 'normal' ring of elems, it also has a ring of states of the
+ * same size. Each state[] corresponds to exactly one elem[].
+ * state[] will be used by acquire/release/dequeue functions to store internal
+ * information and should not be accessed by the user directly.
+ * For actual implementation details, please refer to soring.c
+ */
+
+#include <rte_soring.h>
+
+union soring_state {
+ alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
+ struct {
+ RTE_ATOMIC(uint32_t) ftoken;
+ RTE_ATOMIC(uint32_t) stnum;
+ };
+};
+
+/* upper 2 bits are used for status */
+#define RTE_SORING_ST_START RTE_SHIFT_VAL32(1, RTE_SORING_ST_BIT)
+#define RTE_SORING_ST_FINISH RTE_SHIFT_VAL32(2, RTE_SORING_ST_BIT)
+
+#define RTE_SORING_ST_MASK \
+ RTE_GENMASK32(sizeof(uint32_t) * CHAR_BIT - 1, RTE_SORING_ST_BIT)
+
+/**
+ * SORING re-uses rte_ring internal structures and implementation
+ * for enqueue/dequeue operations.
+ */
+struct __rte_ring_headtail {
+
+ union __rte_cache_aligned {
+ struct rte_ring_headtail ht;
+ struct rte_ring_hts_headtail hts;
+ struct rte_ring_rts_headtail rts;
+ };
+
+ RTE_CACHE_GUARD;
+};
+
+union soring_stage_tail {
+ /** raw 8B value to read/write *sync* and *pos* as one atomic op */
+ alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
+ struct {
+ RTE_ATOMIC(uint32_t) sync;
+ RTE_ATOMIC(uint32_t) pos;
+ };
+};
+
+struct soring_stage_headtail {
+ volatile union soring_stage_tail tail;
+ enum rte_ring_sync_type unused; /**< unused */
+ volatile RTE_ATOMIC(uint32_t) head;
+};
+
+/**
+ * SORING stage head_tail structure is 'compatible' with rte_ring ones.
+ * rte_ring internal functions for moving producer/consumer head
+ * can work transparently with stage head_tail and vice versa
+ * (no modifications required).
+ */
+struct soring_stage {
+
+ union __rte_cache_aligned {
+ struct rte_ring_headtail ht;
+ struct soring_stage_headtail sht;
+ };
+
+ RTE_CACHE_GUARD;
+};
+
+/**
+ * RTE soring internal structure.
+ * As with rte_ring, the actual elements array is supposed to be located
+ * directly after the rte_soring structure.
+ */
+struct __rte_cache_aligned rte_soring {
+ uint32_t size; /**< Size of ring. */
+ uint32_t mask; /**< Mask (size-1) of ring. */
+ uint32_t capacity; /**< Usable size of ring */
+ uint32_t esize;
+ /**< size of elements in the ring, must be a multiple of 4 */
+ uint32_t stsize;
+ /**< size of status value for each elem, must be a multiple of 4 */
+
+ /** Ring stages */
+ struct soring_stage *stage;
+ uint32_t nb_stage;
+
+ /** Ring of states (one per element) */
+ union soring_state *state;
+
+ /** Pointer to the buffer where status values for each element
+ * are stored. This is supplementary and optional information that
+ * the user can attach to each element of the ring.
+ * While it is possible to incorporate this information inside the
+ * user-defined element, in many cases it is preferable to maintain it
+ * as a separate array (mainly for performance reasons).
+ */
+ void *elemst;
+
+ RTE_CACHE_GUARD;
+
+ /** Ring producer status. */
+ struct __rte_ring_headtail prod;
+
+ /** Ring consumer status. */
+ struct __rte_ring_headtail cons;
+
+ char name[RTE_RING_NAMESIZE]; /**< Name of the ring. */
+};
+
+#endif /* _SORING_H_ */
@@ -14,3 +14,16 @@ DPDK_25 {
local: *;
};
+
+EXPERIMENTAL {
+ global:
+
+ # added in 24.11
+ rte_soring_acquire;
+ rte_soring_count;
+ rte_soring_dequeue;
+ rte_soring_enqueue;
+ rte_soring_get_memsize;
+ rte_soring_init;
+ rte_soring_release;
+};