@@ -1,8 +1,8 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2017 Intel Corporation
-sources = files('rte_ring.c')
-headers = files('rte_ring.h')
+sources = files('rte_ring.c', 'rte_soring.c', 'soring.c')
+headers = files('rte_ring.h', 'rte_soring.h')
# most sub-headers are not for direct inclusion
indirect_headers += files (
'rte_ring_core.h',
new file mode 100644
@@ -0,0 +1,182 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#include <inttypes.h>
+
+#include "soring.h"
+#include <rte_string_fns.h>
+
+RTE_LOG_REGISTER_DEFAULT(soring_logtype, INFO);
+#define RTE_LOGTYPE_SORING soring_logtype
+#define SORING_LOG(level, ...) \
+ RTE_LOG_LINE(level, SORING, "" __VA_ARGS__)
+
+static uint32_t
+soring_calc_elem_num(uint32_t count)
+{
+ return rte_align32pow2(count + 1);
+}
+
+static int
+soring_check_param(uint32_t esize, uint32_t stsize, uint32_t count,
+ uint32_t stages)
+{
+ if (stages == 0) {
+ SORING_LOG(ERR, "invalid number of stages: %u", stages);
+ return -EINVAL;
+ }
+
+ /* Check if element size is a multiple of 4B */
+ if (esize == 0 || esize % 4 != 0) {
+ SORING_LOG(ERR, "invalid element size: %u", esize);
+ return -EINVAL;
+ }
+
+ /* Check if ret-code size is a multiple of 4B */
+ if (stsize % 4 != 0) {
+ SORING_LOG(ERR, "invalid retcode size: %u", stsize);
+ return -EINVAL;
+ }
+
+ /* count must be a power of 2 */
+ if (rte_is_power_of_2(count) == 0 ||
+ (count > RTE_SORING_ELEM_MAX + 1)) {
+ SORING_LOG(ERR, "invalid number of elements: %u", count);
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+/*
+ * Calculate size offsets for SORING internal data layout.
+ */
+static size_t
+soring_get_szofs(uint32_t esize, uint32_t stsize, uint32_t count,
+ uint32_t stages, size_t *elst_ofs, size_t *state_ofs,
+ size_t *stage_ofs)
+{
+ size_t sz;
+ const struct rte_soring * const r = NULL;
+
+ sz = sizeof(r[0]) + (size_t)count * esize;
+ sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+
+ if (elst_ofs != NULL)
+ *elst_ofs = sz;
+
+ sz = sz + (size_t)count * stsize;
+ sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+
+ if (state_ofs != NULL)
+ *state_ofs = sz;
+
+ sz += sizeof(r->state[0]) * count;
+ sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+
+ if (stage_ofs != NULL)
+ *stage_ofs = sz;
+
+ sz += sizeof(r->stage[0]) * stages;
+ sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+
+ return sz;
+}
+
+static void
+soring_dump_stage_headtail(FILE *f, const char *prefix,
+ struct soring_stage *st)
+{
+ fprintf(f, "%stail.pos=%"PRIu32"\n", prefix, st->sht.tail.pos);
+ fprintf(f, "%stail.sync=%"PRIu32"\n", prefix, st->sht.tail.sync);
+ fprintf(f, "%shead=%"PRIu32"\n", prefix, st->sht.head);
+}
+
+void
+rte_soring_dump(FILE *f, const struct rte_soring *r)
+{
+ uint32_t i;
+ char buf[32];
+
+ if (f == NULL || r == NULL)
+ return;
+
+ fprintf(f, "soring <%s>@%p\n", r->name, r);
+ fprintf(f, " size=%"PRIu32"\n", r->size);
+ fprintf(f, " capacity=%"PRIu32"\n", r->capacity);
+ fprintf(f, " esize=%"PRIu32"\n", r->esize);
+ fprintf(f, " msize=%"PRIu32"\n", r->msize);
+ fprintf(f, " used=%u\n", rte_soring_count(r));
+ fprintf(f, " avail=%u\n", rte_soring_free_count(r));
+
+ rte_ring_headtail_dump(f, " cons.", &(r->cons.ht));
+ rte_ring_headtail_dump(f, " prod.", &(r->prod.ht));
+
+ fprintf(f, " nb_stage=%"PRIu32"\n", r->nb_stage);
+ for (i = 0; i < r->nb_stage; i++) {
+ snprintf(buf, sizeof(buf), " stage[%u].", i);
+ soring_dump_stage_headtail(f, buf, r->stage + i);
+ }
+}
+
+ssize_t
+rte_soring_get_memsize(const struct rte_soring_param *prm)
+{
+ int32_t rc;
+ uint32_t count;
+
+ count = soring_calc_elem_num(prm->elems);
+ rc = soring_check_param(prm->elem_size, prm->meta_size, count,
+ prm->stages);
+ if (rc != 0)
+ return rc;
+
+ return soring_get_szofs(prm->elem_size, prm->meta_size, count,
+ prm->stages, NULL, NULL, NULL);
+}
+
+int
+rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm)
+{
+ int32_t rc;
+ uint32_t n;
+ size_t meta_ofs, stage_ofs, state_ofs;
+
+ if (r == NULL || prm == NULL)
+ return -EINVAL;
+
+ n = soring_calc_elem_num(prm->elems);
+ rc = soring_check_param(prm->elem_size, prm->meta_size, n, prm->stages);
+ if (rc != 0)
+ return rc;
+
+ soring_get_szofs(prm->elem_size, prm->meta_size, n, prm->stages,
+ &meta_ofs, &state_ofs, &stage_ofs);
+
+ memset(r, 0, sizeof(*r));
+ rc = strlcpy(r->name, prm->name, sizeof(r->name));
+ if (rc < 0 || rc >= (int)sizeof(r->name))
+ return -ENAMETOOLONG;
+
+ r->size = n;
+ r->mask = r->size - 1;
+ r->capacity = prm->elems;
+ r->esize = prm->elem_size;
+ r->msize = prm->meta_size;
+
+ r->prod.ht.sync_type = prm->prod_synt;
+ r->cons.ht.sync_type = prm->cons_synt;
+
+ r->state = (union soring_state *)((uintptr_t)r + state_ofs);
+ memset(r->state, 0, sizeof(r->state[0]) * r->size);
+
+ r->stage = (struct soring_stage *)((uintptr_t)r + stage_ofs);
+ r->nb_stage = prm->stages;
+ memset(r->stage, 0, r->nb_stage * sizeof(r->stage[0]));
+
+ if (r->msize != 0)
+ r->meta = (void *)((uintptr_t)r + meta_ofs);
+
+ return 0;
+}
new file mode 100644
@@ -0,0 +1,547 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#ifndef _RTE_SORING_H_
+#define _RTE_SORING_H_
+
+/**
+ * @file
+ * This file contains definition of RTE soring (Staged Ordered Ring) public API.
+ * Brief description:
+ * enqueue/dequeue works the same as for conventional rte_ring:
+ * any rte_ring sync types can be used, etc.
+ * Plus there could be multiple 'stages'.
+ * For each stage there is an acquire (start) and release (finish) operation.
+ * After some elems are 'acquired' - user can safely assume that he has
+ * exclusive possession of these elems till 'release' for them is done.
+ * Note that right now user has to release exactly the same number of elems
+ * he acquired before.
+ * After 'release', elems can be 'acquired' by next stage and/or dequeued
+ * (in case of last stage).
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_ring.h>
+
+/* upper 2 bits are used for status */
+#define RTE_SORING_ST_BIT 30
+
+/* max possible number of elements in the soring */
+#define RTE_SORING_ELEM_MAX (RTE_BIT32(RTE_SORING_ST_BIT) - 1)
+
+struct rte_soring_param {
+ /** expected name of the ring */
+ const char *name;
+	/** number of elements in the ring */
+ uint32_t elems;
+ /** size of elements in the ring, must be a multiple of 4 */
+ uint32_t elem_size;
+ /**
+ * size of metadata for each elem, must be a multiple of 4.
+ * This parameter defines a size of supplementary and optional
+ * array of metadata associated with each object in the soring.
+ * While element size is configurable (see @elem_size parameter above),
+ * so user can specify it big enough to hold both object and its
+ * metadata together, for performance reasons it might be plausible
+ * to access them as separate arrays.
+ * Common usage scenario when such separation helps:
+ * enqueue() - writes to objects array
+ * acquire() - reads from objects array
+ * release() - writes to metadata array (as an example: return code)
+ * dequeue() - reads both objects and metadata array
+ */
+ uint32_t meta_size;
+ /** number of stages in the ring */
+ uint32_t stages;
+ /** sync type for producer */
+ enum rte_ring_sync_type prod_synt;
+ /** sync type for consumer */
+ enum rte_ring_sync_type cons_synt;
+};
+
+struct rte_soring;
+
+/**
+ * Calculate the memory size needed for a soring
+ *
+ * This function returns the number of bytes needed for a ring, given
+ * the expected parameters for it. This value is the sum of the size of
+ * the internal metadata and the size of the memory needed by the
+ * actual ring elements and their ret-codes. The value is aligned to a cache
+ * line size.
+ *
+ * @param prm
+ *   Pointer to the structure that contains soring creation parameters.
+ * @return
+ * - The memory size needed for the soring on success.
+ *   - -EINVAL if provided parameter values are invalid.
+ */
+__rte_experimental
+ssize_t
+rte_soring_get_memsize(const struct rte_soring_param *prm);
+
+/**
+ * Initialize a soring structure.
+ *
+ * Initialize a soring structure in memory pointed by "r".
+ * The size of the memory area must be large enough to store the soring
+ * internal structures plus the objects and ret-code tables.
+ * It is strongly advised to use rte_soring_get_memsize() to get the
+ * appropriate size.
+ *
+ * @param r
+ * Pointer to the soring structure.
+ * @param prm
+ *   Pointer to the structure that contains soring creation parameters.
+ * @return
+ * - 0 on success, or a negative error code.
+ */
+__rte_experimental
+int
+rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm);
+
+/**
+ * Return the total number of filled entries in a ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @return
+ * The number of entries in the ring.
+ */
+__rte_experimental
+unsigned int
+rte_soring_count(const struct rte_soring *r);
+
+/**
+ * Return the total number of unfilled entries in a ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @return
+ * The number of free entries in the ring.
+ */
+__rte_experimental
+unsigned int
+rte_soring_free_count(const struct rte_soring *r);
+
+/**
+ * Dump the status of the soring
+ *
+ * @param f
+ * A pointer to a file for output
+ * @param r
+ * Pointer to the soring structure.
+ */
+__rte_experimental
+void
+rte_soring_dump(FILE *f, const struct rte_soring *r);
+
+/**
+ * Enqueue several objects on the ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to enqueue.
+ * Size of objects to enqueue must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the @objs.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - Actual number of objects enqueued, either 0 or @n.
+ */
+__rte_experimental
+uint32_t
+rte_soring_enqueue_bulk(struct rte_soring *r, const void *objs,
+ uint32_t n, uint32_t *free_space);
+
+/**
+ * Enqueue several objects plus metadata on the ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to enqueue.
+ * Size of objects to enqueue must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param meta
+ * A pointer to an array of metadata values for each object to enqueue.
+ *   Note that if the user is not using object metadata values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as @meta_size
+ * parameter used while creating the ring. If user created the soring with
+ * @meta_size value equals zero, then @meta parameter should be NULL.
+ * Otherwise the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the @objs.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - Actual number of objects enqueued, either 0 or @n.
+ */
+__rte_experimental
+uint32_t
+rte_soring_enqueux_bulk(struct rte_soring *r, const void *objs,
+ const void *meta, uint32_t n, uint32_t *free_space);
+
+/**
+ * Enqueue several objects on the ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to enqueue.
+ * Size of objects to enqueue must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the @objs.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - Actual number of objects enqueued.
+ */
+__rte_experimental
+uint32_t
+rte_soring_enqueue_burst(struct rte_soring *r, const void *objs,
+ uint32_t n, uint32_t *free_space);
+
+/**
+ * Enqueue several objects plus metadata on the ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to enqueue.
+ * Size of objects to enqueue must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param meta
+ * A pointer to an array of metadata values for each object to enqueue.
+ *   Note that if the user is not using object metadata values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as @meta_size
+ * parameter used while creating the ring. If user created the soring with
+ * @meta_size value equals zero, then @meta parameter should be NULL.
+ * Otherwise the results are undefined.
+ * @param n
+ * The number of objects to add in the ring from the @objs.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - Actual number of objects enqueued.
+ */
+__rte_experimental
+uint32_t
+rte_soring_enqueux_burst(struct rte_soring *r, const void *objs,
+ const void *meta, uint32_t n, uint32_t *free_space);
+
+/**
+ * Dequeue several objects from the ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to dequeue.
+ * Size of objects to enqueue must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param num
+ * The number of objects to dequeue from the ring into the objs.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - Actual number of objects dequeued, either 0 or @num.
+ */
+__rte_experimental
+uint32_t
+rte_soring_dequeue_bulk(struct rte_soring *r, void *objs,
+ uint32_t num, uint32_t *available);
+
+/**
+ * Dequeue several objects plus metadata from the ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to dequeue.
+ * Size of objects to enqueue must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param meta
+ * A pointer to array of metadata values for each object to dequeue.
+ *   Note that if the user is not using object metadata values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as @meta_size
+ * parameter used while creating the ring. If user created the soring with
+ * @meta_size value equals zero, then @meta parameter should be NULL.
+ * Otherwise the results are undefined.
+ * @param num
+ * The number of objects to dequeue from the ring into the objs.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - Actual number of objects dequeued, either 0 or @num.
+ */
+__rte_experimental
+uint32_t
+rte_soring_dequeux_bulk(struct rte_soring *r, void *objs, void *meta,
+ uint32_t num, uint32_t *available);
+
+/**
+ * Dequeue several objects from the ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to dequeue.
+ * Size of objects to enqueue must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param num
+ * The number of objects to dequeue from the ring into the objs.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - Actual number of objects dequeued.
+ */
+__rte_experimental
+uint32_t
+rte_soring_dequeue_burst(struct rte_soring *r, void *objs,
+ uint32_t num, uint32_t *available);
+
+/**
+ * Dequeue several objects plus metadata from the ring.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to dequeue.
+ * Size of objects to enqueue must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param meta
+ * A pointer to array of metadata values for each object to dequeue.
+ *   Note that if the user is not using object metadata values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as @meta_size
+ * parameter used while creating the ring. If user created the soring with
+ * @meta_size value equals zero, then @meta parameter should be NULL.
+ * Otherwise the results are undefined.
+ * @param num
+ * The number of objects to dequeue from the ring into the objs.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - Actual number of objects dequeued.
+ */
+__rte_experimental
+uint32_t
+rte_soring_dequeux_burst(struct rte_soring *r, void *objs, void *meta,
+ uint32_t num, uint32_t *available);
+
+/**
+ * Acquire several objects from the ring for given stage.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to acquire.
+ * Size of objects must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param stage
+ * Stage to acquire objects for.
+ * @param num
+ * The number of objects to acquire.
+ * @param ftoken
+ * Pointer to the opaque 'token' value used by release() op.
+ * User has to store this value somewhere, and later provide to the
+ * release().
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries for given stage
+ * after the acquire has finished.
+ * @return
+ * - Actual number of objects acquired, either 0 or @num.
+ */
+__rte_experimental
+uint32_t
+rte_soring_acquire_bulk(struct rte_soring *r, void *objs,
+ uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available);
+
+/**
+ * Acquire several objects plus metadata from the ring for given stage.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to acquire.
+ * Size of objects must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param meta
+ *   A pointer to an array of metadata values for each acquired object.
+ *   Note that if the user is not using object metadata values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as @meta_size
+ * parameter used while creating the ring. If user created the soring with
+ * @meta_size value equals zero, then @meta parameter should be NULL.
+ * Otherwise the results are undefined.
+ * @param stage
+ * Stage to acquire objects for.
+ * @param num
+ * The number of objects to acquire.
+ * @param ftoken
+ * Pointer to the opaque 'token' value used by release() op.
+ * User has to store this value somewhere, and later provide to the
+ * release().
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries for given stage
+ * after the acquire has finished.
+ * @return
+ * - Actual number of objects acquired, either 0 or @num.
+ */
+__rte_experimental
+uint32_t
+rte_soring_acquirx_bulk(struct rte_soring *r, void *objs, void *meta,
+ uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available);
+
+/**
+ * Acquire several objects from the ring for given stage.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to acquire.
+ * Size of objects must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param stage
+ * Stage to acquire objects for.
+ * @param num
+ * The number of objects to acquire.
+ * @param ftoken
+ * Pointer to the opaque 'token' value used by release() op.
+ * User has to store this value somewhere, and later provide to the
+ * release().
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries for given stage
+ * after the acquire has finished.
+ * @return
+ * - Actual number of objects acquired.
+ */
+__rte_experimental
+uint32_t
+rte_soring_acquire_burst(struct rte_soring *r, void *objs,
+ uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available);
+
+/**
+ * Acquire several objects plus metadata from the ring for given stage.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ * A pointer to an array of objects to acquire.
+ * Size of objects must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param meta
+ *   A pointer to an array of metadata values for each acquired object.
+ *   Note that if the user is not using object metadata values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as @meta_size
+ * parameter used while creating the ring. If user created the soring with
+ * @meta_size value equals zero, then @meta parameter should be NULL.
+ * Otherwise the results are undefined.
+ * @param stage
+ * Stage to acquire objects for.
+ * @param num
+ * The number of objects to acquire.
+ * @param ftoken
+ * Pointer to the opaque 'token' value used by release() op.
+ * User has to store this value somewhere, and later provide to the
+ * release().
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries for given stage
+ * after the acquire has finished.
+ * @return
+ * - Actual number of objects acquired.
+ */
+__rte_experimental
+uint32_t
+rte_soring_acquirx_burst(struct rte_soring *r, void *objs, void *meta,
+ uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available);
+
+/**
+ * Release several objects for given stage back to the ring.
+ * Note that it means these objects become available for next stage or
+ * dequeue.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ *   A pointer to an array of objects to release.
+ * Note that unless user needs to overwrite ring objects this parameter
+ * can be NULL.
+ * Size of objects must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param stage
+ * Current stage.
+ * @param n
+ * The number of objects to release.
+ * Has to be the same value as returned by acquire() op.
+ * @param ftoken
+ * Opaque 'token' value obtained from acquire() op.
+ * @return
+ * - None.
+ */
+__rte_experimental
+void
+rte_soring_release(struct rte_soring *r, const void *objs,
+ uint32_t stage, uint32_t n, uint32_t ftoken);
+
+/**
+ * Release several objects plus metadata for given stage back to the ring.
+ * Note that it means these objects become available for next stage or
+ * dequeue.
+ *
+ * @param r
+ * A pointer to the soring structure.
+ * @param objs
+ *   A pointer to an array of objects to release.
+ * Note that unless user needs to overwrite ring objects this parameter
+ * can be NULL.
+ * Size of objects must be the same value as @elem_size parameter
+ * used while creating the ring. Otherwise the results are undefined.
+ * @param meta
+ * A pointer to an array of metadata values for each object to release.
+ *   Note that if the user is not using object metadata values, this parameter
+ * can be NULL.
+ * Size of elements in this array must be the same value as @meta_size
+ * parameter used while creating the ring. If user created the soring with
+ * @meta_size value equals zero, then meta parameter should be NULL.
+ * Otherwise the results are undefined.
+ * @param stage
+ * Current stage.
+ * @param n
+ * The number of objects to release.
+ * Has to be the same value as returned by acquire() op.
+ * @param ftoken
+ * Opaque 'token' value obtained from acquire() op.
+ * @return
+ * - None.
+ */
+__rte_experimental
+void
+rte_soring_releasx(struct rte_soring *r, const void *objs,
+ const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_SORING_H_ */
new file mode 100644
@@ -0,0 +1,548 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+/**
+ * @file
+ * This file contains implementation of SORING 'datapath' functions.
+ * Brief description:
+ * ==================
+ * enqueue/dequeue works the same as for conventional rte_ring:
+ * any rte_ring sync types can be used, etc.
+ * Plus there could be multiple 'stages'.
+ * For each stage there is an acquire (start) and release (finish) operation.
+ * After some elems are 'acquired' - user can safely assume that he has
+ * exclusive possession of these elems till 'release' for them is done.
+ * Note that right now user has to release exactly the same number of elems
+ * he acquired before.
+ * After 'release', elems can be 'acquired' by next stage and/or dequeued
+ * (in case of last stage).
+ * Internal structure:
+ * ===================
+ * In addition to 'normal' ring of elems, it also has a ring of states of the
+ * same size. Each state[] corresponds to exactly one elem[].
+ * state[] will be used by acquire/release/dequeue functions to store internal
+ * information and should not be accessed by the user directly.
+ * How it works:
+ * =============
+ * 'acquire()' just moves stage's head (same as rte_ring move_head does),
+ * plus it saves in state[stage.cur_head] information about how many elems
+ * were acquired, current head position and special flag value to indicate
+ * that elems are acquired (RTE_SORING_ST_START).
+ * Note that 'acquire()' returns to the user a special 'ftoken' that user has
+ * to provide for 'release()' (in fact it is just a position for current head).
+ * 'release()' extracts old head value from provided ftoken and checks that
+ * corresponding 'state[]' contains expected values(mostly for sanity
+ * purposes).
+ * Then it marks this state[] with 'RTE_SORING_ST_FINISH' flag to indicate
+ * that given subset of objects was released.
+ * After that, it checks whether the old head value equals the current tail value.
+ * If yes, then it performs 'finalize()' operation, otherwise 'release()'
+ * just returns (without spinning on stage tail value).
+ * As updated state[] is shared by all threads, some other thread can do
+ * 'finalize()' for given stage.
+ * That allows 'release()' to avoid excessive waits on the tail value.
+ * Main purpose of 'finalize()' operation is to walk through 'state[]'
+ * from current stage tail up to its head, check state[] and move stage tail
+ * through elements that already are in RTE_SORING_ST_FINISH state.
+ * Along with that, corresponding state[] values are reset to zero.
+ * Note that 'finalize()' for given stage can be done from multiple places:
+ * 'release()' for that stage or from 'acquire()' for next stage
+ * even from consumer's 'dequeue()' - in case given stage is the last one.
+ * So 'finalize()' has to be MT-safe and inside it we have to
+ * guarantee that only one thread will update state[] and stage's tail values.
+ */
+
+#include "soring.h"
+
+/*
+ * Inline functions (fastpath) start here.
+ */
+static __rte_always_inline uint32_t
+__rte_soring_stage_finalize(struct soring_stage_headtail *sht,
+ union soring_state *rstate, uint32_t rmask, uint32_t maxn)
+{
+ int32_t rc;
+ uint32_t head, i, idx, k, n, tail;
+ union soring_stage_tail nt, ot;
+ union soring_state st;
+
+ /* try to grab exclusive right to update tail value */
+ ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
+ rte_memory_order_acquire);
+
+ /* other thread already finalizing it for us */
+ if (ot.sync != 0)
+ return 0;
+
+ nt.pos = ot.pos;
+ nt.sync = 1;
+ rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
+ &ot.raw, nt.raw, rte_memory_order_acquire,
+ rte_memory_order_relaxed);
+
+ /* other thread won the race */
+ if (rc == 0)
+ return 0;
+
+ /*
+ * start with current tail and walk through states that are
+ * already finished.
+ */
+
+ head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
+ n = RTE_MIN(head - ot.pos, maxn);
+ for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
+
+ idx = tail & rmask;
+ st.raw = rte_atomic_load_explicit(&rstate[idx].raw,
+ rte_memory_order_relaxed);
+ if ((st.stnum & RTE_SORING_ST_MASK) != RTE_SORING_ST_FINISH ||
+ st.ftoken != tail)
+ break;
+
+ k = st.stnum & ~RTE_SORING_ST_MASK;
+ rte_atomic_store_explicit(&rstate[idx].raw, 0,
+ rte_memory_order_relaxed);
+ }
+
+
+ /* release exclusive right to update along with new tail value */
+ ot.pos = tail;
+ rte_atomic_store_explicit(&sht->tail.raw, ot.raw,
+ rte_memory_order_release);
+
+ return i;
+}
+
+static __rte_always_inline uint32_t
+__rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
+ enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
+ uint32_t *head, uint32_t *next, uint32_t *free)
+{
+ uint32_t n;
+
+ switch (st) {
+ case RTE_RING_SYNC_ST:
+ case RTE_RING_SYNC_MT:
+ n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
+ r->capacity, st, num, behavior, head, next, free);
+ break;
+ case RTE_RING_SYNC_MT_RTS:
+ n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
+ r->capacity, num, behavior, head, free);
+ *next = *head + n;
+ break;
+ case RTE_RING_SYNC_MT_HTS:
+ n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
+ r->capacity, num, behavior, head, free);
+ *next = *head + n;
+ break;
+ default:
+ /* unsupported mode, shouldn't be here */
+ RTE_ASSERT(0);
+ *free = 0;
+ n = 0;
+ }
+
+ return n;
+}
+
+static __rte_always_inline uint32_t
+__rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
+ enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
+ uint32_t *head, uint32_t *next, uint32_t *avail)
+{
+ uint32_t n;
+
+ switch (st) {
+ case RTE_RING_SYNC_ST:
+ case RTE_RING_SYNC_MT:
+ n = __rte_ring_headtail_move_head(&r->cons.ht,
+ &r->stage[stage].ht, 0, st, num, behavior,
+ head, next, avail);
+ break;
+ case RTE_RING_SYNC_MT_RTS:
+ n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht,
+ 0, num, behavior, head, avail);
+ *next = *head + n;
+ break;
+ case RTE_RING_SYNC_MT_HTS:
+ n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht,
+ 0, num, behavior, head, avail);
+ *next = *head + n;
+ break;
+ default:
+ /* unsupported mode, shouldn't be here */
+ RTE_ASSERT(0);
+ *avail = 0;
+ n = 0;
+ }
+
+ return n;
+}
+
+static __rte_always_inline void
+__rte_soring_update_tail(struct __rte_ring_headtail *rht,
+ enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
+{
+ uint32_t n;
+
+ switch (st) {
+ case RTE_RING_SYNC_ST:
+ case RTE_RING_SYNC_MT:
+ __rte_ring_update_tail(&rht->ht, head, next, st, enq);
+ break;
+ case RTE_RING_SYNC_MT_RTS:
+ __rte_ring_rts_update_tail(&rht->rts);
+ break;
+ case RTE_RING_SYNC_MT_HTS:
+ n = next - head;
+ __rte_ring_hts_update_tail(&rht->hts, head, n, enq);
+ break;
+ default:
+ /* unsupported mode, shouldn't be here */
+ RTE_ASSERT(0);
+ }
+}
+
+static __rte_always_inline uint32_t
+__rte_soring_stage_move_head(struct soring_stage_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
+{
+ uint32_t n, tail;
+
+ *old_head = rte_atomic_load_explicit(&d->head,
+ rte_memory_order_acquire);
+
+ do {
+ n = num;
+ tail = rte_atomic_load_explicit(&s->tail,
+ rte_memory_order_relaxed);
+ *avail = capacity + tail - *old_head;
+ if (n > *avail)
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
+ if (n == 0)
+ return 0;
+ *new_head = *old_head + n;
+ } while (rte_atomic_compare_exchange_strong_explicit(&d->head,
+ old_head, *new_head, rte_memory_order_acquire,
+ rte_memory_order_acquire) == 0);
+
+ return n;
+}
+
+static __rte_always_inline uint32_t
+soring_enqueue(struct rte_soring *r, const void *objs,
+ const void *meta, uint32_t n, enum rte_ring_queue_behavior behavior,
+ uint32_t *free_space)
+{
+ enum rte_ring_sync_type st;
+ uint32_t nb_free, prod_head, prod_next;
+
+ RTE_ASSERT(r != NULL && r->nb_stage > 0);
+ RTE_ASSERT(meta == NULL || r->meta != NULL);
+
+ st = r->prod.ht.sync_type;
+
+ n = __rte_soring_move_prod_head(r, n, behavior, st,
+ &prod_head, &prod_next, &nb_free);
+ if (n != 0) {
+ __rte_ring_do_enqueue_elems(&r[1], objs, r->size,
+ prod_head & r->mask, r->esize, n);
+ if (meta != NULL)
+ __rte_ring_do_enqueue_elems(r->meta, meta, r->size,
+ prod_head & r->mask, r->msize, n);
+ __rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
+ }
+
+ if (free_space != NULL)
+ *free_space = nb_free - n;
+ return n;
+}
+
/*
 * Common dequeue implementation: consume up to @num elems that have been
 * fully processed by the last stage. If not enough elems are immediately
 * available, try to finalize pending elems of the last stage and retry
 * once with the requested behavior.
 */
static __rte_always_inline uint32_t
soring_dequeue(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, enum rte_ring_queue_behavior behavior,
	uint32_t *available)
{
	enum rte_ring_sync_type st;
	uint32_t entries, cons_head, cons_next, n, ns, reqn;

	RTE_ASSERT(r != NULL && r->nb_stage > 0);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	/* index of the last stage */
	ns = r->nb_stage - 1;
	st = r->cons.ht.sync_type;

	/* try to grab exactly @num elems first */
	n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
		&cons_head, &cons_next, &entries);
	if (n == 0) {
		/* try to finalize some elems from previous stage */
		n = __rte_soring_stage_finalize(&r->stage[ns].sht,
			r->state, r->mask, 2 * num);
		entries += n;

		/* repeat attempt to grab elems */
		reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
		if (entries >= reqn)
			n = __rte_soring_move_cons_head(r, ns, num, behavior,
				st, &cons_head, &cons_next, &entries);
		else
			n = 0;
	}

	/* we have some elems to consume */
	if (n != 0) {
		/* elems storage starts right after the rte_soring header */
		__rte_ring_do_dequeue_elems(objs, &r[1], r->size,
			cons_head & r->mask, r->esize, n);
		if (meta != NULL)
			__rte_ring_do_dequeue_elems(meta, r->meta, r->size,
				cons_head & r->mask, r->msize, n);
		/* make the freed slots visible to the producer */
		__rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
	}

	if (available != NULL)
		*available = entries - n;
	return n;
}
+
/*
 * Common acquire implementation: reserve up to @num elems for processing
 * at the given @stage. Stage 0 pulls directly from the producer; later
 * stages pull from the previous stage (finalizing its pending elems if
 * needed). On success *ftoken receives the raw head value, which the
 * caller must pass back to the matching release call.
 */
static __rte_always_inline uint32_t
soring_acquire(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
	uint32_t *ftoken, uint32_t *available)
{
	uint32_t avail, head, idx, n, next, reqn;
	union soring_state st;
	struct soring_stage *pstg;
	struct soring_stage_headtail *cons;

	RTE_ASSERT(r != NULL && stage < r->nb_stage);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	cons = &r->stage[stage].sht;

	if (stage == 0)
		/* first stage: consume from the producer head-tail */
		n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num,
			behavior, &head, &next, &avail);
	else {
		pstg = r->stage + stage - 1;

		/* try to grab exactly @num elems */
		n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
			RTE_RING_QUEUE_FIXED, &head, &next, &avail);
		if (n == 0) {
			/* try to finalize some elems from previous stage */
			n = __rte_soring_stage_finalize(&pstg->sht, r->state,
				r->mask, 2 * num);
			avail += n;

			/* repeat attempt to grab elems */
			reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
			if (avail >= reqn)
				n = __rte_soring_stage_move_head(cons,
					&pstg->ht, 0, num, behavior, &head,
					&next, &avail);
			else
				n = 0;
		}
	}

	if (n != 0) {

		idx = head & r->mask;

		/* copy elems that are ready for given stage */
		__rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx,
			r->esize, n);
		if (meta != NULL)
			__rte_ring_do_dequeue_elems(meta, r->meta,
				r->size, idx, r->msize, n);

		/* update status ring: mark [head, head + n) as acquired */
		st.ftoken = head;
		st.stnum = (RTE_SORING_ST_START | n);

		rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
			rte_memory_order_relaxed);
		*ftoken = head;
	}

	if (available != NULL)
		*available = avail - n;
	return n;
}
+
/*
 * Common release implementation: return elems previously acquired via
 * soring_acquire() at @stage. @ftoken must be the token returned by the
 * matching acquire, and @n must equal the acquired count (both enforced
 * by the RTE_VERIFY below). Optionally overwrites the released elems
 * and/or metadata in place.
 */
static __rte_always_inline void
soring_release(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
{
	uint32_t idx, tail;
	struct soring_stage *stg;
	union soring_state st;

	RTE_ASSERT(r != NULL && stage < r->nb_stage);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	stg = r->stage + stage;
	tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
		rte_memory_order_relaxed);

	idx = ftoken & r->mask;
	/* acquire pairs with the release store done by soring_acquire() */
	st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
		rte_memory_order_acquire);

	/* check state ring contents */
	RTE_VERIFY(st.stnum == (RTE_SORING_ST_START | n) &&
		st.ftoken == ftoken);

	/* update contents of the ring, if necessary */
	if (objs != NULL)
		__rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx,
			r->esize, n);
	if (meta != NULL)
		__rte_ring_do_enqueue_elems(r->meta, meta, r->size, idx,
			r->msize, n);

	/* set state to FINISH, make sure it is not reordered */
	st.stnum = RTE_SORING_ST_FINISH | n;
	rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
		rte_memory_order_release);

	/* releasing at the current stage tail: finalize as many contiguous
	 * FINISHed entries as possible to advance the stage tail
	 */
	if (tail == ftoken)
		__rte_soring_stage_finalize(&stg->sht, r->state, r->mask,
			r->capacity);
}
+
+/*
+ * Public functions (data-path) start here.
+ */
+
+void
+rte_soring_release(struct rte_soring *r, const void *objs,
+ uint32_t stage, uint32_t n, uint32_t ftoken)
+{
+ return soring_release(r, objs, NULL, stage, n, ftoken);
+}
+
+
+void
+rte_soring_releasx(struct rte_soring *r, const void *objs,
+ const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
+{
+ return soring_release(r, objs, meta, stage, n, ftoken);
+}
+
+uint32_t
+rte_soring_enqueue_bulk(struct rte_soring *r, const void *objs, uint32_t n,
+ uint32_t *free_space)
+{
+ return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_FIXED,
+ free_space);
+}
+
+uint32_t
+rte_soring_enqueux_bulk(struct rte_soring *r, const void *objs,
+ const void *meta, uint32_t n, uint32_t *free_space)
+{
+ return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_FIXED,
+ free_space);
+}
+
+uint32_t
+rte_soring_enqueue_burst(struct rte_soring *r, const void *objs, uint32_t n,
+ uint32_t *free_space)
+{
+ return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_VARIABLE,
+ free_space);
+}
+
+uint32_t
+rte_soring_enqueux_burst(struct rte_soring *r, const void *objs,
+ const void *meta, uint32_t n, uint32_t *free_space)
+{
+ return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_VARIABLE,
+ free_space);
+}
+
+uint32_t
+rte_soring_dequeue_bulk(struct rte_soring *r, void *objs, uint32_t num,
+ uint32_t *available)
+{
+ return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_FIXED,
+ available);
+}
+
+uint32_t
+rte_soring_dequeux_bulk(struct rte_soring *r, void *objs, void *meta,
+ uint32_t num, uint32_t *available)
+{
+ return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_FIXED,
+ available);
+}
+
+uint32_t
+rte_soring_dequeue_burst(struct rte_soring *r, void *objs, uint32_t num,
+ uint32_t *available)
+{
+ return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_VARIABLE,
+ available);
+}
+
+uint32_t
+rte_soring_dequeux_burst(struct rte_soring *r, void *objs, void *meta,
+ uint32_t num, uint32_t *available)
+{
+ return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_VARIABLE,
+ available);
+}
+
+uint32_t
+rte_soring_acquire_bulk(struct rte_soring *r, void *objs,
+ uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
+{
+ return soring_acquire(r, objs, NULL, stage, num,
+ RTE_RING_QUEUE_FIXED, ftoken, available);
+}
+
+uint32_t
+rte_soring_acquirx_bulk(struct rte_soring *r, void *objs, void *meta,
+ uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
+{
+ return soring_acquire(r, objs, meta, stage, num,
+ RTE_RING_QUEUE_FIXED, ftoken, available);
+}
+
+uint32_t
+rte_soring_acquire_burst(struct rte_soring *r, void *objs,
+ uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
+{
+ return soring_acquire(r, objs, NULL, stage, num,
+ RTE_RING_QUEUE_VARIABLE, ftoken, available);
+}
+
+uint32_t
+rte_soring_acquirx_burst(struct rte_soring *r, void *objs, void *meta,
+ uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
+{
+ return soring_acquire(r, objs, meta, stage, num,
+ RTE_RING_QUEUE_VARIABLE, ftoken, available);
+}
+
+unsigned int
+rte_soring_count(const struct rte_soring *r)
+{
+ uint32_t prod_tail = r->prod.ht.tail;
+ uint32_t cons_tail = r->cons.ht.tail;
+ uint32_t count = (prod_tail - cons_tail) & r->mask;
+ return (count > r->capacity) ? r->capacity : count;
+}
+
+unsigned int
+rte_soring_free_count(const struct rte_soring *r)
+{
+ return r->capacity - rte_soring_count(r);
+}
new file mode 100644
@@ -0,0 +1,124 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Huawei Technologies Co., Ltd
+ */
+
+#ifndef _SORING_H_
+#define _SORING_H_
+
+/**
+ * @file
 * This file contains internal structures of RTE soring: Staged Ordered Ring.
+ * Sort of extension of conventional RTE ring.
+ * Internal structure:
+ * In addition to 'normal' ring of elems, it also has a ring of states of the
+ * same size. Each state[] corresponds to exactly one elem[].
+ * state[] will be used by acquire/release/dequeue functions to store internal
+ * information and should not be accessed by the user directly.
+ * For actual implementation details, please refer to soring.c
+ */
+
+#include <rte_soring.h>
+
/* Per-element state, read/written as one atomic 64-bit word. */
union soring_state {
	/* raw 8B access: lets ftoken and stnum change together atomically */
	alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
	struct {
		/* head value at which the elems were acquired */
		RTE_ATOMIC(uint32_t) ftoken;
		/* status bits (START/FINISH) ORed with the acquired count */
		RTE_ATOMIC(uint32_t) stnum;
	};
};
+
/* upper 2 bits are used for status */
#define RTE_SORING_ST_START	RTE_SHIFT_VAL32(1, RTE_SORING_ST_BIT) /* acquired by a stage */
#define RTE_SORING_ST_FINISH	RTE_SHIFT_VAL32(2, RTE_SORING_ST_BIT) /* released, awaiting finalize */

/* mask selecting the status bits within soring_state.stnum */
#define RTE_SORING_ST_MASK \
	RTE_GENMASK32(sizeof(uint32_t) * CHAR_BIT - 1, RTE_SORING_ST_BIT)
+
/**
 * SORING re-uses rte_ring internal structures and implementation
 * for enqueue/dequeue operations.
 */
struct __rte_ring_headtail {

	/* only the member matching the configured sync_type is used */
	union __rte_cache_aligned {
		struct rte_ring_headtail ht;
		struct rte_ring_hts_headtail hts;
		struct rte_ring_rts_headtail rts;
	};

	RTE_CACHE_GUARD;	/* pad to keep neighbours on separate cache lines */
};
+
union soring_stage_tail {
	/** raw 8B value to read/write *sync* and *pos* as one atomic op */
	alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
	struct {
		RTE_ATOMIC(uint32_t) sync;	/* finalize synchronization word — semantics in soring.c */
		RTE_ATOMIC(uint32_t) pos;	/* tail position of the stage */
	};
};
+
/* Head-tail pair tracking the progress of one processing stage. */
struct soring_stage_headtail {
	volatile union soring_stage_tail tail;
	enum rte_ring_sync_type unused; /**< unused; presumably keeps layout aligned with rte_ring_headtail — confirm */
	volatile RTE_ATOMIC(uint32_t) head;
};
+
/**
 * SORING stage head_tail structure is 'compatible' with rte_ring ones.
 * rte_ring internal functions for moving producer/consumer head
 * can work transparently with stage head_tail and vice versa
 * (no modifications required).
 */
struct soring_stage {

	/* two views of the same head-tail state */
	union __rte_cache_aligned {
		struct rte_ring_headtail ht;	/* view used by rte_ring helpers */
		struct soring_stage_headtail sht;	/* native stage view */
	};

	RTE_CACHE_GUARD;
};
+
/**
 * RTE soring internal structure.
 * As with rte_ring actual elements array supposed to be located directly
 * after the rte_soring structure.
 */
struct __rte_cache_aligned rte_soring {
	uint32_t size; /**< Size of ring. */
	uint32_t mask; /**< Mask (size-1) of ring. */
	uint32_t capacity; /**< Usable size of ring */
	uint32_t esize;
	/**< size of elements in the ring, must be a multiple of 4*/
	uint32_t msize;
	/**< size of metadata value for each elem, must be a multiple of 4 */

	/** Ring stages */
	struct soring_stage *stage;
	uint32_t nb_stage;

	/** Ring of states (one per element) */
	union soring_state *state;

	/** Pointer to the buffer where metadata values for each elements
	 * are stored. This is supplementary and optional information that
	 * user can attach to each element of the ring.
	 * While it is possible to incorporate this information inside
	 * user-defined element, in many cases it is plausible to maintain it
	 * as a separate array (mainly for performance reasons).
	 */
	void *meta;

	RTE_CACHE_GUARD;

	/** Ring producer status. */
	struct __rte_ring_headtail prod;

	/** Ring consumer status. */
	struct __rte_ring_headtail cons;

	char name[RTE_RING_NAMESIZE]; /**< Name of the ring. */
};
+
+#endif /* _SORING_H_ */
@@ -20,4 +20,23 @@ EXPERIMENTAL {
# added in 24.11
rte_ring_headtail_dump;
	rte_soring_acquire_bulk;
	rte_soring_acquire_burst;
	rte_soring_acquirx_bulk;
	rte_soring_acquirx_burst;
	rte_soring_count;
	rte_soring_dequeue_bulk;
	rte_soring_dequeue_burst;
	rte_soring_dequeux_bulk;
	rte_soring_dequeux_burst;
	rte_soring_dump;
	rte_soring_enqueue_bulk;
	rte_soring_enqueue_burst;
	rte_soring_enqueux_bulk;
	rte_soring_enqueux_burst;
	rte_soring_free_count;
	rte_soring_get_memsize;
	rte_soring_init;
	rte_soring_release;
	rte_soring_releasx;
};