From patchwork Mon Aug 21 06:04:20 2023
X-Patchwork-Submitter: Honnappa Nagarahalli
X-Patchwork-Id: 130550
X-Patchwork-Delegate: thomas@monjalon.net
From: Honnappa Nagarahalli
To: jackmin@nvidia.com, konstantin.v.ananyev@yandex.ru
Cc: dev@dpdk.org, ruifeng.wang@arm.com, aditya.ambadipudi@arm.com,
 wathsala.vithanage@arm.com, nd@arm.com, Honnappa Nagarahalli
Subject: [RFC] lib/st_ring: add single thread ring
Date: Mon, 21 Aug 2023 06:04:20 +0000
Message-Id: <20230821060420.3509667-1-honnappa.nagarahalli@arm.com>

Add a ring data structure that is safe for single-threaded use and
unsafe for multi-threaded use. This library provides a simple and
efficient alternative to the multi-thread safe ring when multi-thread
safety is not required.

Signed-off-by: Honnappa Nagarahalli
---
v1:
1) The code is very preliminary and has not even been compiled
2) This is intended to show the APIs and some thoughts on implementation
3) More APIs and the rest of the implementation will come in subsequent
   versions

 lib/st_ring/rte_st_ring.h | 567 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 567 insertions(+)
 create mode 100644 lib/st_ring/rte_st_ring.h

diff --git a/lib/st_ring/rte_st_ring.h b/lib/st_ring/rte_st_ring.h
new file mode 100644
index 0000000000..8cb8832591
--- /dev/null
+++ b/lib/st_ring/rte_st_ring.h
@@ -0,0 +1,567 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Arm Limited
+ */
+
+#ifndef _RTE_ST_RING_H_
+#define _RTE_ST_RING_H_
+
+/**
+ * @file
+ * RTE Single Thread Ring (ST Ring)
+ *
+ * The ST Ring is a fixed-size queue intended to be accessed
+ * by one thread at a time. It does not provide concurrent access to
+ * multiple threads. If there are multiple threads accessing the ST ring,
+ * then the threads have to use locks to protect the ring from
+ * getting corrupted.
+ *
+ * - FIFO (First In First Out)
+ * - Maximum size is fixed; the pointers are stored in a table.
+ * - Consumer and producer are part of the same thread.
+ * - Multi-thread producers and consumers need locking.
+ * - Single/Bulk/burst dequeue at Tail or Head
+ * - Single/Bulk/burst enqueue at Head or Tail
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include
+#include
+
+/**
+ * Calculate the memory size needed for a ST ring
+ *
+ * This function returns the number of bytes needed for a ST ring, given
+ * the number of elements in it. This value is the sum of the size of
+ * the structure rte_st_ring and the size of the memory needed by the
+ * elements. The value is aligned to a cache line size.
+ *
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @return
+ *   - The memory size needed for the ST ring on success.
+ *   - -EINVAL if count is not a power of 2.
+ */
+ssize_t rte_st_ring_get_memsize(unsigned int count);
+
+/**
+ * Initialize a ST ring structure.
+ *
+ * Initialize a ST ring structure in memory pointed to by "r". The size of the
+ * memory area must be large enough to store the ring structure and the
+ * object table. It is advised to use rte_st_ring_get_memsize() to get the
+ * appropriate size.
+ *
+ * The ST ring size is set to *count*, which must be a power of two.
+ * The real usable ring size is *count-1* instead of *count* to
+ * differentiate a full ring from an empty ring.
+ *
+ * The ring is not added to the RTE_TAILQ_ST_RING global list. Indeed, the
+ * memory given by the caller may not be shareable among DPDK
+ * processes.
+ *
+ * @param r
+ *   The pointer to the ring structure followed by the elements table.
+ * @param name
+ *   The name of the ring.
+ * @param count
+ *   The number of elements in the ring (must be a power of 2,
+ *   unless RTE_ST_RING_F_EXACT_SZ is set in flags).
+ * @param flags
+ *   An OR of the following:
+ *   - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold
+ *     exactly the requested number of entries, and the requested size
+ *     will be rounded up to the next power of two, but the usable space
+ *     will be exactly that requested. Worst case, if a power-of-2 size is
+ *     requested, half the ring space will be wasted.
+ *     Without this flag set, the ring size requested must be a power of 2,
+ *     and the usable space will be that size - 1.
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+int rte_st_ring_init(struct rte_st_ring *r, const char *name,
+		unsigned int count, unsigned int flags);
+
+/**
+ * Create a new ST ring named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_st_ring_init() to initialize an empty ring.
+ *
+ * The new ring size is set to *count*, which must be a power of two.
+ * The real usable ring size is *count-1* instead of *count* to
+ * differentiate a full ring from an empty ring.
+ *
+ * The ring is added to the RTE_TAILQ_ST_RING list.
+ *
+ * @param name
+ *   The name of the ring.
+ * @param count
+ *   The size of the ring (must be a power of 2,
+ *   unless RTE_ST_RING_F_EXACT_SZ is set in flags).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold exactly the
+ *     requested number of entries, and the requested size will be rounded up
+ *     to the next power of two, but the usable space will be exactly that
+ *     requested. Worst case, if a power-of-2 size is requested, half the
+ *     ring space will be wasted.
+ *     Without this flag set, the ring size requested must be a power of 2,
+ *     and the usable space will be that size - 1.
+ * @return
+ *   On success, the pointer to the newly allocated ring. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - EINVAL - count provided is not a power of 2
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+struct rte_st_ring *rte_st_ring_create(const char *name, unsigned int count,
+		int socket_id, unsigned int flags);
+
+/**
+ * De-allocate all memory used by the ring.
+ *
+ * @param r
+ *   Ring to free.
+ *   If NULL, the function does nothing.
+ */
+void rte_st_ring_free(struct rte_st_ring *r);
+
+/**
+ * Dump the status of the ring to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param r
+ *   A pointer to the ring structure.
+ */
+void rte_st_ring_dump(FILE *f, const struct rte_st_ring *r);
+
+/**
+ * Enqueue a fixed number of objects on a ST ring.
+ *
+ * This function copies the objects at the head of the ring and
+ * moves the head index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_enqueue_bulk(struct rte_st_ring *r, void * const *obj_table,
+		unsigned int n, unsigned int *free_space)
+{
+	return rte_st_ring_enqueue_bulk_elem(r, obj_table, sizeof(void *),
+			n, free_space);
+}
+
+/**
+ * Enqueue up to a maximum number of objects on a ST ring.
+ *
+ * This function copies the objects at the head of the ring and
+ * moves the head index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_enqueue_burst(struct rte_st_ring *r, void * const *obj_table,
+		unsigned int n, unsigned int *free_space)
+{
+	return rte_st_ring_enqueue_burst_elem(r, obj_table, sizeof(void *),
+			n, free_space);
+}
+
+/**
+ * Enqueue one object on a ST ring.
+ *
+ * This function copies one object at the head of the ring and
+ * moves the head index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @return
+ *   - 0: Success; object enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_st_ring_enqueue(struct rte_st_ring *r, void *obj)
+{
+	return rte_st_ring_enqueue_elem(r, &obj, sizeof(void *));
+}
+
+/**
+ * Enqueue a fixed number of objects on a ST ring at the tail.
+ *
+ * This function copies the objects at the tail of the ring and
+ * moves the tail index (backwards).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_enqueue_at_tail_bulk(struct rte_st_ring *r,
+		void * const *obj_table, unsigned int n,
+		unsigned int *free_space)
+{
+	return rte_st_ring_enqueue_at_tail_bulk_elem(r, obj_table,
+			sizeof(void *), n, free_space);
+}
+
+/**
+ * Enqueue up to a maximum number of objects on a ST ring at the tail.
+ *
+ * This function copies the objects at the tail of the ring and
+ * moves the tail index (backwards).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_enqueue_at_tail_burst(struct rte_st_ring *r,
+		void * const *obj_table, unsigned int n,
+		unsigned int *free_space)
+{
+	return rte_st_ring_enqueue_at_tail_burst_elem(r, obj_table,
+			sizeof(void *), n, free_space);
+}
+
+/**
+ * Enqueue one object on a ST ring at the tail.
+ *
+ * This function copies one object at the tail of the ring and
+ * moves the tail index (backwards).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @return
+ *   - 0: Success; object enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_st_ring_enqueue_at_tail(struct rte_st_ring *r, void *obj)
+{
+	return rte_st_ring_enqueue_at_tail_elem(r, &obj, sizeof(void *));
+}
+
+/**
+ * Dequeue a fixed number of objects from a ST ring.
+ *
+ * This function copies the objects from the tail of the ring and
+ * moves the tail index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_dequeue_bulk(struct rte_st_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
+			n, available);
+}
+
+/**
+ * Dequeue up to a maximum number of objects from a ST ring.
+ *
+ * This function copies the objects from the tail of the ring and
+ * moves the tail index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - Number of objects dequeued
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_dequeue_burst(struct rte_st_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
+			n, available);
+}
+
+/**
+ * Dequeue one object from a ST ring.
+ *
+ * This function copies one object from the tail of the ring and
+ * moves the tail index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to a void * pointer (object) that will be filled.
+ * @return
+ *   - 0: Success; object dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_st_ring_dequeue(struct rte_st_ring *r, void **obj_p)
+{
+	return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *));
+}
+
+/**
+ * Dequeue a fixed number of objects from a ST ring from the head.
+ *
+ * This function copies the objects from the head of the ring and
+ * moves the head index (backwards).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_dequeue_at_head_bulk(struct rte_st_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return rte_st_ring_dequeue_at_head_bulk_elem(r, obj_table,
+			sizeof(void *), n, available);
+}
+
+/**
+ * Dequeue up to a maximum number of objects from a ST ring from the head.
+ *
+ * This function copies the objects from the head of the ring and
+ * moves the head index (backwards).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - Number of objects dequeued
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_dequeue_at_head_burst(struct rte_st_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return rte_st_ring_dequeue_at_head_burst_elem(r, obj_table,
+			sizeof(void *), n, available);
+}
+
+/**
+ * Dequeue one object from a ST ring from the head.
+ *
+ * This function copies one object from the head of the ring and
+ * moves the head index (backwards).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to a void * pointer (object) that will be filled.
+ * @return
+ *   - 0: Success; object dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_st_ring_dequeue_at_head(struct rte_st_ring *r, void **obj_p)
+{
+	return rte_st_ring_dequeue_at_head_elem(r, obj_p, sizeof(void *));
+}
+
+/**
+ * Flush a ST ring.
+ *
+ * This function flushes all the elements in a ST ring.
+ *
+ * @warning
+ * Make sure the ring is not in use while calling this function.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ */
+void
+rte_st_ring_reset(struct rte_st_ring *r);
+
+/**
+ * Return the number of entries in a ST ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   The number of entries in the ring.
+ */
+static inline unsigned int
+rte_st_ring_count(const struct rte_st_ring *r)
+{
+	uint32_t count = (r->head - r->tail) & r->mask;
+	return count;
+}
+
+/**
+ * Return the number of free entries in a ST ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   The number of free entries in the ring.
+ */
+static inline unsigned int
+rte_st_ring_free_count(const struct rte_st_ring *r)
+{
+	return r->capacity - rte_st_ring_count(r);
+}
+
+/**
+ * Test if a ST ring is full.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   - 1: The ring is full.
+ *   - 0: The ring is not full.
+ */
+static inline int
+rte_st_ring_full(const struct rte_st_ring *r)
+{
+	return rte_st_ring_free_count(r) == 0;
+}
+
+/**
+ * Test if a ST ring is empty.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   - 1: The ring is empty.
+ *   - 0: The ring is not empty.
+ */
+static inline int
+rte_st_ring_empty(const struct rte_st_ring *r)
+{
+	return r->tail == r->head;
+}
+
+/**
+ * Return the size of the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   The size of the data store used by the ring.
+ *   NOTE: this is not the same as the usable space in the ring. To query that
+ *   use ``rte_st_ring_get_capacity()``.
+ */
+static inline unsigned int
+rte_st_ring_get_size(const struct rte_st_ring *r)
+{
+	return r->size;
+}
+
+/**
+ * Return the number of elements which can be stored in the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   The usable size of the ring.
+ */
+static inline unsigned int
+rte_st_ring_get_capacity(const struct rte_st_ring *r)
+{
+	return r->capacity;
+}
+
+/**
+ * Dump the status of all ST rings to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ */
+void rte_st_ring_list_dump(FILE *f);
+
+/**
+ * Search for a ST ring by its name.
+ *
+ * @param name
+ *   The name of the ring.
+ * @return
+ *   The pointer to the ring matching the name, or NULL if not found,
+ *   with rte_errno set appropriately. Possible rte_errno values include:
+ *    - ENOENT - required entry not available to return.
+ */
+struct rte_st_ring *rte_st_ring_lookup(const char *name);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_ST_RING_H_ */
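
The header above only declares the API; the struct layout and the _elem
helpers are not part of this RFC. To make the documented head/tail
semantics concrete, here is a minimal standalone sketch of a
double-ended, single-thread ring. Everything named toy_* is hypothetical
and invented for illustration. Only the field names (head, tail, mask,
capacity), the count formula, and the -ENOBUFS/-ENOENT return values are
taken from the patch. It assumes free-running 32-bit indices and a
usable capacity of size - 1, which may differ from the eventual
implementation.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>

#define TOY_RING_SIZE 8u	/* number of slots; must be a power of 2 */

/* Hypothetical layout; the real struct rte_st_ring is not shown in the RFC. */
struct toy_ring {
	uint32_t head;		/* free-running index: next slot at the head end */
	uint32_t tail;		/* free-running index: oldest element */
	uint32_t mask;		/* size - 1 */
	uint32_t capacity;	/* usable entries: size - 1 */
	void *slots[TOY_RING_SIZE];
};

static inline void toy_init(struct toy_ring *r)
{
	r->head = 0;
	r->tail = 0;
	r->mask = TOY_RING_SIZE - 1;
	r->capacity = TOY_RING_SIZE - 1;
}

/* Same arithmetic as rte_st_ring_count() in the patch. */
static inline unsigned int toy_count(const struct toy_ring *r)
{
	return (r->head - r->tail) & r->mask;
}

static inline unsigned int toy_free_count(const struct toy_ring *r)
{
	return r->capacity - toy_count(r);
}

/* Enqueue at head: store the object, then move the head index forward. */
static inline int toy_enqueue(struct toy_ring *r, void *obj)
{
	if (toy_free_count(r) == 0)
		return -ENOBUFS;
	r->slots[r->head & r->mask] = obj;
	r->head++;
	return 0;
}

/* Enqueue at tail: move the tail index backwards, then store the object. */
static inline int toy_enqueue_at_tail(struct toy_ring *r, void *obj)
{
	if (toy_free_count(r) == 0)
		return -ENOBUFS;
	r->tail--;
	r->slots[r->tail & r->mask] = obj;
	return 0;
}

/* Dequeue (from tail): load the oldest object, then move the tail forward. */
static inline int toy_dequeue(struct toy_ring *r, void **obj_p)
{
	if (toy_count(r) == 0)
		return -ENOENT;
	*obj_p = r->slots[r->tail & r->mask];
	r->tail++;
	return 0;
}

/* Dequeue at head: move the head index backwards, then load the object. */
static inline int toy_dequeue_at_head(struct toy_ring *r, void **obj_p)
{
	if (toy_count(r) == 0)
		return -ENOENT;
	r->head--;
	*obj_p = r->slots[r->head & r->mask];
	return 0;
}

/* Usage: two head enqueues, one tail enqueue, then drain from both ends. */
static inline int toy_demo(void)
{
	struct toy_ring r;
	int a = 1, b = 2, c = 3;
	void *out = 0;

	toy_init(&r);
	toy_enqueue(&r, &a);		/* ring (tail..head): a */
	toy_enqueue(&r, &b);		/* ring: a b */
	toy_enqueue_at_tail(&r, &c);	/* ring: c a b */
	assert(toy_count(&r) == 3);
	toy_dequeue(&r, &out);		/* removes c from the tail end */
	assert(out == &c);
	toy_dequeue_at_head(&r, &out);	/* removes b from the head end */
	assert(out == &b);
	return (int)toy_count(&r);	/* only a remains */
}
```

Because the indices are unsigned and free-running, moving the tail
backwards past zero wraps around harmlessly: the masked difference
(head - tail) & mask still yields the element count, provided the count
never reaches the full slot count, which the capacity check guarantees.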