[v1,1/2] deque: add multi-thread unsafe double ended queue

Message ID 20240401013729.1466298-2-aditya.ambadipudi@arm.com (mailing list archive)
State New
Delegated to: Thomas Monjalon
Headers
Series deque: add multithread unsafe deque library |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Aditya Ambadipudi April 1, 2024, 1:37 a.m. UTC
  From: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

Add a multi-thread unsafe double ended queue data structure. This
library provides a simple and efficient alternative to multi-thread
safe ring when multi-thread safety is not required.

Signed-off-by: Aditya Ambadipudi <aditya.ambadipudi@arm.com>
Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
---
 .mailmap                   |   1 +
 lib/deque/meson.build      |  11 +
 lib/deque/rte_deque.c      | 194 +++++++++++++
 lib/deque/rte_deque.h      | 533 ++++++++++++++++++++++++++++++++++++
 lib/deque/rte_deque_core.h |  82 ++++++
 lib/deque/rte_deque_pvt.h  | 538 +++++++++++++++++++++++++++++++++++++
 lib/deque/rte_deque_zc.h   | 430 +++++++++++++++++++++++++++++
 lib/deque/version.map      |  14 +
 lib/meson.build            |   1 +
 9 files changed, 1804 insertions(+)
 create mode 100644 lib/deque/meson.build
 create mode 100644 lib/deque/rte_deque.c
 create mode 100644 lib/deque/rte_deque.h
 create mode 100644 lib/deque/rte_deque_core.h
 create mode 100644 lib/deque/rte_deque_pvt.h
 create mode 100644 lib/deque/rte_deque_zc.h
 create mode 100644 lib/deque/version.map
  

Comments

Morten Brørup April 6, 2024, 9:35 a.m. UTC | #1
> From: Aditya Ambadipudi [mailto:aditya.ambadipudi@arm.com]
> Sent: Monday, 1 April 2024 03.37
> 
> From: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> 
> Add a multi-thread unsafe double ended queue data structure. This
> library provides a simple and efficient alternative to multi-thread
> safe ring when multi-thread safety is not required.
> 
> Signed-off-by: Aditya Ambadipudi <aditya.ambadipudi@arm.com>
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> ---

This is a good contribution, thank you.

Two high-level comments:

1.
Please use head/tail explicitly in function names, not implicitly.
E.g. rename rte_deque_enqueue_bulk_elem() to rte_deque_enqueue_at_head_bulk_elem(), like rte_deque_enqueue_at_tail_bulk_elem().
Also consider removing "at" from the function names, e.g. rte_deque_(head|tail)_enqueue_bulk_elem() instead of rte_deque_enqueue_at_(head|tail)_bulk_elem().

2.
In the future, someone might come up with a lock-free implementation, like for the stack.
Please ensure that the API and documentation is prepared for that, so we don't have to use a different namespace for such a lock-free implementation.
I haven't reviewed the patch thoroughly; if it is already be prepared for this, you can ignore this comment.
  

Patch

diff --git a/.mailmap b/.mailmap
index 3843868716..8e705ab6ab 100644
--- a/.mailmap
+++ b/.mailmap
@@ -17,6 +17,7 @@  Adam Bynes <adambynes@outlook.com>
 Adam Dybkowski <adamx.dybkowski@intel.com>
 Adam Ludkiewicz <adam.ludkiewicz@intel.com>
 Adham Masarwah <adham@nvidia.com> <adham@mellanox.com>
+Aditya Ambadipudi <aditya.ambadipudi@arm.com>
 Adrian Moreno <amorenoz@redhat.com>
 Adrian Podlawski <adrian.podlawski@intel.com>
 Adrien Mazarguil <adrien.mazarguil@6wind.com>
diff --git a/lib/deque/meson.build b/lib/deque/meson.build
new file mode 100644
index 0000000000..1ff45fc39f
--- /dev/null
+++ b/lib/deque/meson.build
@@ -0,0 +1,11 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2024 Arm Limited
+
+sources = files('rte_deque.c')
+headers = files('rte_deque.h')
+# most sub-headers are not for direct inclusion
+indirect_headers += files (
+        'rte_deque_core.h',
+        'rte_deque_pvt.h',
+        'rte_deque_zc.h'
+)
diff --git a/lib/deque/rte_deque.c b/lib/deque/rte_deque.c
new file mode 100644
index 0000000000..3b08b91a98
--- /dev/null
+++ b/lib/deque/rte_deque.c
@@ -0,0 +1,194 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#include <stdio.h>
+#include <stdalign.h>
+#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memzone.h>
+#include <rte_malloc.h>
+#include <rte_eal_memconfig.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+
+#include "rte_deque.h"
+
+/* mask of all valid flag values to deque_create() */
+#define __RTE_DEQUE_F_MASK (RTE_DEQUE_F_EXACT_SZ)
+ssize_t
+rte_deque_get_memsize_elem(unsigned int esize, unsigned int count)
+{
+	ssize_t sz;
+
+	/* Check if element size is a multiple of 4B */
+	if (esize % 4 != 0) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): element size is not a multiple of 4\n",
+			__func__);
+
+		return -EINVAL;
+	}
+
+	/* count must be a power of 2 */
+	if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Requested number of elements is invalid,"
+			"must be power of 2, and not exceed %u\n",
+			__func__, RTE_DEQUE_SZ_MASK);
+
+		return -EINVAL;
+	}
+
+	sz = sizeof(struct rte_deque) + (ssize_t)count * esize;
+	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+	return sz;
+}
+
+void
+rte_deque_reset(struct rte_deque *d)
+{
+	d->head = 0;
+	d->tail = 0;
+}
+
+int
+rte_deque_init(struct rte_deque *d, const char *name, unsigned int count,
+	unsigned int flags)
+{
+	int ret;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(struct rte_deque) &
+			  RTE_CACHE_LINE_MASK) != 0);
+
+	/* future proof flags, only allow supported values */
+	if (flags & ~__RTE_DEQUE_F_MASK) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Unsupported flags requested %#x\n",
+			__func__, flags);
+		return -EINVAL;
+	}
+
+	/* init the deque structure */
+	memset(d, 0, sizeof(*d));
+	ret = strlcpy(d->name, name, sizeof(d->name));
+	if (ret < 0 || ret >= (int)sizeof(d->name))
+		return -ENAMETOOLONG;
+	d->flags = flags;
+
+	if (flags & RTE_DEQUE_F_EXACT_SZ) {
+		d->size = rte_align32pow2(count + 1);
+		d->mask = d->size - 1;
+		d->capacity = count;
+	} else {
+		if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
+			rte_log(RTE_LOG_ERR, rte_deque_log_type,
+				"%s(): Requested size is invalid, must be power"
+				" of 2, and not exceed the size limit %u\n",
+				__func__, RTE_DEQUE_SZ_MASK);
+			return -EINVAL;
+		}
+		d->size = count;
+		d->mask = count - 1;
+		d->capacity = d->mask;
+	}
+
+	return 0;
+}
+
+/* create the deque for a given element size */
+struct rte_deque *
+rte_deque_create(const char *name, unsigned int esize, unsigned int count,
+		int socket_id, unsigned int flags)
+{
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	struct rte_deque *d;
+	const struct rte_memzone *mz;
+	ssize_t deque_size;
+	int mz_flags = 0;
+	const unsigned int requested_count = count;
+	int ret;
+
+	/* for an exact size deque, round up from count to a power of two */
+	if (flags & RTE_DEQUE_F_EXACT_SZ)
+		count = rte_align32pow2(count + 1);
+
+	deque_size = rte_deque_get_memsize_elem(esize, count);
+	if (deque_size < 0) {
+		rte_errno = -deque_size;
+		return NULL;
+	}
+
+	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
+		RTE_DEQUE_MZ_PREFIX, name);
+	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
+		rte_errno = ENAMETOOLONG;
+		return NULL;
+	}
+
+	/* reserve a memory zone for this deque. If we can't get rte_config or
+	 * we are secondary process, the memzone_reserve function will set
+	 * rte_errno for us appropriately - hence no check in this function
+	 */
+	mz = rte_memzone_reserve_aligned(mz_name, deque_size, socket_id,
+					 mz_flags, alignof(struct rte_deque));
+	if (mz != NULL) {
+		d = mz->addr;
+		/* no need to check return value here, we already checked the
+		 * arguments above
+		 */
+		rte_deque_init(d, name, requested_count, flags);
+		d->memzone = mz;
+	} else {
+		d = NULL;
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Cannot reserve memory\n", __func__);
+	}
+	return d;
+}
+
+/* free the deque */
+void
+rte_deque_free(struct rte_deque *d)
+{
+	if (d == NULL)
+		return;
+
+	/*
+	 * Deque was not created with rte_deque_create,
+	 * therefore, there is no memzone to free.
+	 */
+	if (d->memzone == NULL) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Cannot free deque, not created "
+			"with rte_deque_create()\n", __func__);
+		return;
+	}
+
+	if (rte_memzone_free(d->memzone) != 0)
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Cannot free memory\n", __func__);
+}
+
+/* dump the status of the deque on the console */
+void
+rte_deque_dump(FILE *f, const struct rte_deque *d)
+{
+	fprintf(f, "deque <%s>@%p\n", d->name, d);
+	fprintf(f, "  flags=%x\n", d->flags);
+	fprintf(f, "  size=%"PRIu32"\n", d->size);
+	fprintf(f, "  capacity=%"PRIu32"\n", d->capacity);
+	fprintf(f, "  head=%"PRIu32"\n", d->head);
+	fprintf(f, "  tail=%"PRIu32"\n", d->tail);
+	fprintf(f, "  used=%u\n", rte_deque_count(d));
+	fprintf(f, "  avail=%u\n", rte_deque_free_count(d));
+}
+
+RTE_LOG_REGISTER_DEFAULT(rte_deque_log_type, ERR);
diff --git a/lib/deque/rte_deque.h b/lib/deque/rte_deque.h
new file mode 100644
index 0000000000..1ac74ca539
--- /dev/null
+++ b/lib/deque/rte_deque.h
@@ -0,0 +1,533 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_H_
+#define _RTE_DEQUE_H_
+
+/**
+ * @file
+ * RTE double ended queue (Deque)
+ *
+ * This fixed-size queue does not provide concurrent access by
+ * multiple threads. If required, the application should use locks
+ * to protect the deque from concurrent access.
+ *
+ * - Double ended queue
+ * - Maximum size is fixed
+ * - Store objects of any size
+ * - Single/bulk/burst dequeue at tail or head
+ * - Single/bulk/burst enqueue at head or tail
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_deque_core.h>
+#include <rte_deque_pvt.h>
+#include <rte_deque_zc.h>
+
+/**
+ * Calculate the memory size needed for a deque
+ *
+ * This function returns the number of bytes needed for a deque, given
+ * the number of objects and the object size. This value is the sum of
+ * the size of the structure rte_deque and the size of the memory needed
+ * by the objects. The value is aligned to a cache line size.
+ *
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ * @param count
+ *   The number of objects in the deque (must be a power of 2).
+ * @return
+ *   - The memory size needed for the deque on success.
+ *   - -EINVAL if count is not a power of 2.
+ */
+__rte_experimental
+ssize_t rte_deque_get_memsize_elem(unsigned int esize, unsigned int count);
+
+/**
+ * Initialize a deque structure.
+ *
+ * Initialize a deque structure in memory pointed by "d". The size of the
+ * memory area must be large enough to store the deque structure and the
+ * object table. It is advised to use rte_deque_get_memsize() to get the
+ * appropriate size.
+ *
+ * The deque size is set to *count*, which must be a power of two.
+ * The real usable deque size is *count-1* instead of *count* to
+ * differentiate a full deque from an empty deque.
+ *
+ * @param d
+ *   The pointer to the deque structure followed by the objects table.
+ * @param name
+ *   The name of the deque.
+ * @param count
+ *   The number of objects in the deque (must be a power of 2,
+ *   unless RTE_DEQUE_F_EXACT_SZ is set in flags).
+ * @param flags
+ *   - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold
+ *     exactly the requested number of objects, and the requested size
+ *     will be rounded up to the next power of two, but the usable space
+ *     will be exactly that requested. Worst case, if a power-of-2 size is
+ *     requested, half the deque space will be wasted.
+ *     Without this flag set, the deque size requested must be a power of 2,
+ *     and the usable space will be that size - 1.
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+__rte_experimental
+int rte_deque_init(struct rte_deque *d, const char *name, unsigned int count,
+		unsigned int flags);
+
+/**
+ * Create a new deque named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_deque_init() to initialize an empty deque.
+ *
+ * The new deque size is set to *count*, which must be a power of two.
+ * The real usable deque size is *count-1* instead of *count* to
+ * differentiate a full deque from an empty deque.
+ *
+ * @param name
+ *   The name of the deque.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ * @param count
+ *   The size of the deque (must be a power of 2,
+ *   unless RTE_DEQUE_F_EXACT_SZ is set in flags).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold exactly the
+ *     requested number of entries, and the requested size will be rounded up
+ *     to the next power of two, but the usable space will be exactly that
+ *     requested. Worst case, if a power-of-2 size is requested, half the
+ *     deque space will be wasted.
+ *     Without this flag set, the deque size requested must be a power of 2,
+ *     and the usable space will be that size - 1.
+ * @return
+ *   On success, the pointer to the new allocated deque. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - EINVAL - count provided is not a power of 2
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+__rte_experimental
+struct rte_deque *rte_deque_create(const char *name, unsigned int esize,
+				unsigned int count, int socket_id,
+				unsigned int flags);
+
+/**
+ * De-allocate all memory used by the deque.
+ *
+ * @param d
+ *   Deque to free.
+ *   If NULL then, the function does nothing.
+ */
+__rte_experimental
+void rte_deque_free(struct rte_deque *d);
+
+/**
+ * Dump the status of the deque to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param d
+ *   A pointer to the deque structure.
+ */
+__rte_experimental
+void rte_deque_dump(FILE *f, const struct rte_deque *d);
+
+/**
+ * Return the number of entries in a deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The number of entries in the deque.
+ */
+static inline unsigned int
+rte_deque_count(const struct rte_deque *d)
+{
+	return (d->head - d->tail) & d->mask;
+}
+
+/**
+ * Return the number of free entries in a deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The number of free entries in the deque.
+ */
+static inline unsigned int
+rte_deque_free_count(const struct rte_deque *d)
+{
+	return d->capacity - rte_deque_count(d);
+}
+
+/**
+ * Enqueue fixed number of objects on a deque.
+ *
+ * This function copies the objects at the head of the deque and
+ * moves the head index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_enqueue_bulk_elem(struct rte_deque *d,
+			const void *obj_table,
+			unsigned int esize,
+			unsigned int n,
+			unsigned int *free_space)
+{
+	*free_space = rte_deque_free_count(d);
+	if (unlikely(n > *free_space))
+		return 0;
+	*free_space -= n;
+	return __rte_deque_enqueue_at_head(d, obj_table, esize, n);
+}
+
+/**
+ * Enqueue up to a maximum number of objects on a deque.
+ *
+ * This function copies the objects at the head of the deque and
+ * moves the head index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_enqueue_burst_elem(struct rte_deque *d, const void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *free_space)
+{
+	unsigned int avail_space = rte_deque_free_count(d);
+	unsigned int to_be_enqueued = (n <= avail_space ? n : avail_space);
+	*free_space = avail_space - n;
+	return __rte_deque_enqueue_at_head(d, obj_table, esize, to_be_enqueued);
+}
+
+/**
+ * Enqueue fixed number of objects on a deque at the tail.
+ *
+ * This function copies the objects at the tail of the deque and
+ * moves the tail index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_enqueue_at_tail_bulk_elem(struct rte_deque *d,
+				 const void *obj_table, unsigned int esize,
+				 unsigned int n, unsigned int *free_space)
+{
+	*free_space = rte_deque_free_count(d);
+	if (unlikely(n > *free_space))
+		return 0;
+	*free_space -= n;
+	return __rte_deque_enqueue_at_tail(d, obj_table, esize, n);
+}
+
+/**
+ * Enqueue up to a maximum number of objects on a deque at the tail.
+ *
+ * This function copies the objects at the tail of the deque and
+ * moves the tail index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_enqueue_at_tail_burst_elem(struct rte_deque *d,
+				const void *obj_table, unsigned int esize,
+				unsigned int n, unsigned int *free_space)
+{
+	unsigned int avail_space = rte_deque_free_count(d);
+	unsigned int to_be_enqueued = (n <= avail_space ? n : avail_space);
+	*free_space = avail_space - to_be_enqueued;
+	return __rte_deque_enqueue_at_tail(d, obj_table, esize, to_be_enqueued);
+}
+
+/**
+ * Dequeue a fixed number of objects from a deque.
+ *
+ * This function copies the objects from the tail of the deque and
+ * moves the tail index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_dequeue_bulk_elem(struct rte_deque *d, void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *available)
+{
+	*available = rte_deque_count(d);
+	if (unlikely(n > *available))
+		return 0;
+	*available -= n;
+	return __rte_deque_dequeue_at_tail(d, obj_table, esize, n);
+}
+
+/**
+ * Dequeue up to a maximum number of objects from a deque.
+ *
+ * This function copies the objects from the tail of the deque and
+ * moves the tail index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   - Number of objects dequeued
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_dequeue_burst_elem(struct rte_deque *d, void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *available)
+{
+	unsigned int count = rte_deque_count(d);
+	unsigned int to_be_dequeued = (n <= count ? n : count);
+	*available = count - to_be_dequeued;
+	return __rte_deque_dequeue_at_tail(d, obj_table, esize, to_be_dequeued);
+}
+
+/**
+ * Dequeue a fixed number of objects from a deque from the head.
+ *
+ * This function copies the objects from the head of the deque and
+ * moves the head index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_dequeue_at_head_bulk_elem(struct rte_deque *d, void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *available)
+{
+	*available = rte_deque_count(d);
+	if (unlikely(n > *available))
+		return 0;
+	*available -= n;
+	return __rte_deque_dequeue_at_head(d, obj_table, esize, n);
+}
+
+/**
+ * Dequeue up to a maximum number of objects from a deque from the head.
+ *
+ * This function copies the objects from the head of the deque and
+ * moves the head index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   - Number of objects dequeued
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_dequeue_at_head_burst_elem(struct rte_deque *d, void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *available)
+{
+	unsigned int count = rte_deque_count(d);
+	unsigned int to_be_dequeued = (n <= count ? n : count);
+	*available = count - to_be_dequeued;
+	return __rte_deque_dequeue_at_head(d, obj_table, esize, to_be_dequeued);
+}
+
+/**
+ * Flush a deque.
+ *
+ * This function flush all the objects in a deque
+ *
+ * @warning
+ * Make sure the deque is not in use while calling this function.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ */
+__rte_experimental
+void rte_deque_reset(struct rte_deque *d);
+
+/**
+ * Test if a deque is full.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   - 1: The deque is full.
+ *   - 0: The deque is not full.
+ */
+static inline int
+rte_deque_full(const struct rte_deque *d)
+{
+	return rte_deque_free_count(d) == 0;
+}
+
+/**
+ * Test if a deque is empty.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   - 1: The deque is empty.
+ *   - 0: The deque is not empty.
+ */
+static inline int
+rte_deque_empty(const struct rte_deque *d)
+{
+	return d->tail == d->head;
+}
+
+/**
+ * Return the size of the deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The size of the data store used by the deque.
+ *   NOTE: this is not the same as the usable space in the deque. To query that
+ *   use ``rte_deque_get_capacity()``.
+ */
+static inline unsigned int
+rte_deque_get_size(const struct rte_deque *d)
+{
+	return d->size;
+}
+
+/**
+ * Return the number of objects which can be stored in the deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The usable size of the deque.
+ */
+static inline unsigned int
+rte_deque_get_capacity(const struct rte_deque *d)
+{
+	return d->capacity;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_H_ */
diff --git a/lib/deque/rte_deque_core.h b/lib/deque/rte_deque_core.h
new file mode 100644
index 0000000000..ff82b80d38
--- /dev/null
+++ b/lib/deque/rte_deque_core.h
@@ -0,0 +1,82 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_CORE_H_
+#define _RTE_DEQUE_CORE_H_
+
+/**
+ * @file
+ * This file contains definition of RTE deque structure, init flags and
+ * some related macros. This file should not be included directly,
+ * include rte_deque.h instead.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+#include <rte_debug.h>
+
+extern int rte_deque_log_type;
+
+#define RTE_DEQUE_MZ_PREFIX "DEQUE_"
+/** The maximum length of a deque name. */
+#define RTE_DEQUE_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+			   sizeof(RTE_DEQUE_MZ_PREFIX) + 1)
+
+/**
+ * Double ended queue (deque) structure.
+ *
+ * The producer and the consumer have a head and a tail index. These indices
+ * are not between 0 and size(deque)-1. These indices are between 0 and
+ * 2^32 -1. Their value is masked while accessing the objects in deque.
+ * These indices are unsigned 32bits. Hence the result of the subtraction is
+ * always a modulo of 2^32 and it is between 0 and capacity.
+ */
+struct rte_deque {
+	alignas(RTE_CACHE_LINE_SIZE) char name[RTE_DEQUE_NAMESIZE];
+	/**< Name of the deque */
+	int flags;
+	/**< Flags supplied at creation. */
+	const struct rte_memzone *memzone;
+	/**< Memzone, if any, containing the rte_deque */
+
+	alignas(RTE_CACHE_LINE_SIZE) char pad0; /**< empty cache line */
+
+	uint32_t size;           /**< Size of deque. */
+	uint32_t mask;           /**< Mask (size-1) of deque. */
+	uint32_t capacity;       /**< Usable size of deque */
+	/** Ring head and tail pointers. */
+	volatile uint32_t head;
+	volatile uint32_t tail;
+};
+
+/**
+ * Deque is to hold exactly requested number of entries.
+ * Without this flag set, the deque size requested must be a power of 2, and the
+ * usable space will be that size - 1. With the flag, the requested size will
+ * be rounded up to the next power of two, but the usable space will be exactly
+ * that requested. Worst case, if a power-of-2 size is requested, half the
+ * deque space will be wasted.
+ */
+#define RTE_DEQUE_F_EXACT_SZ 0x0004
+#define RTE_DEQUE_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_CORE_H_ */
diff --git a/lib/deque/rte_deque_pvt.h b/lib/deque/rte_deque_pvt.h
new file mode 100644
index 0000000000..931bbd4d19
--- /dev/null
+++ b/lib/deque/rte_deque_pvt.h
@@ -0,0 +1,538 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_PVT_H_
+#define _RTE_DEQUE_PVT_H_
+
+#define __RTE_DEQUE_COUNT(d) ((d->head - d->tail) & d->mask)
+#define __RTE_DEQUE_FREE_SPACE(d) (d->capacity - __RTE_DEQUE_COUNT(d))
+
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_32(struct rte_deque *d,
+				const unsigned int size,
+				uint32_t idx,
+				const void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	uint32_t *deque = (uint32_t *)&d[1];
+	const uint32_t *obj = (const uint32_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
+			deque[idx] = obj[i];
+			deque[idx + 1] = obj[i + 1];
+			deque[idx + 2] = obj[i + 2];
+			deque[idx + 3] = obj[i + 3];
+			deque[idx + 4] = obj[i + 4];
+			deque[idx + 5] = obj[i + 5];
+			deque[idx + 6] = obj[i + 6];
+			deque[idx + 7] = obj[i + 7];
+		}
+		switch (n & 0x7) {
+		case 7:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 6:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 5:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 4:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 3:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 2:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 1:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			deque[idx] = obj[i];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			deque[idx] = obj[i];
+	}
+}
+
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_64(struct rte_deque *d,
+				const void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	const uint32_t size = d->size;
+	uint32_t idx = (d->head & d->mask);
+	uint64_t *deque = (uint64_t *)&d[1];
+	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
+			deque[idx] = obj[i];
+			deque[idx + 1] = obj[i + 1];
+			deque[idx + 2] = obj[i + 2];
+			deque[idx + 3] = obj[i + 3];
+		}
+		switch (n & 0x3) {
+		case 3:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 2:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 1:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			deque[idx] = obj[i];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			deque[idx] = obj[i];
+	}
+}
+
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_128(struct rte_deque *d,
+				const void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	const uint32_t size = d->size;
+	uint32_t idx = (d->head & d->mask);
+	rte_int128_t *deque = (rte_int128_t *)&d[1];
+	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 32);
+		switch (n & 0x1) {
+		case 1:
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+	}
+}
+
+static __rte_always_inline unsigned int
+__rte_deque_enqueue_at_head(struct rte_deque *d,
+			const void *obj_table,
+			unsigned int esize,
+			unsigned int n)
+{
+	/* 8B and 16B copies implemented individually because on some platforms
+	 * there are 64 bit and 128 bit registers available for direct copying.
+	 */
+	if (esize == 8)
+		__rte_deque_enqueue_elems_head_64(d, obj_table, n);
+	else if (esize == 16)
+		__rte_deque_enqueue_elems_head_128(d, obj_table, n);
+	else {
+		uint32_t idx, scale, nd_idx, nd_num, nd_size;
+
+		/* Normalize to uint32_t */
+		scale = esize / sizeof(uint32_t);
+		nd_num = n * scale;
+		idx = d->head & d->mask;
+		nd_idx = idx * scale;
+		nd_size = d->size * scale;
+		__rte_deque_enqueue_elems_head_32(d, nd_size, nd_idx,
+						obj_table, nd_num);
+	}
+	d->head = (d->head + n) & d->mask;
+	return n;
+}
+
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_32(struct rte_deque *d,
+				const unsigned int mask,
+				uint32_t idx,
+				const void *obj_table,
+				unsigned int n,
+				const unsigned int scale,
+				const unsigned int elem_size)
+{
+	unsigned int i;
+	uint32_t *deque = (uint32_t *)&d[1];
+	const uint32_t *obj = (const uint32_t *)obj_table;
+
+	if (likely(idx >= n)) {
+		for (i = 0; i < n; idx -= scale, i += scale)
+			memcpy(&deque[idx], &obj[i], elem_size);
+	} else {
+		for (i = 0; (int32_t)idx >= 0; idx -= scale, i += scale)
+			memcpy(&deque[idx], &obj[i], elem_size);
+
+		/* Start at the ending */
+		idx = mask;
+		for (; i < n; idx -= scale, i += scale)
+			memcpy(&deque[idx], &obj[i], elem_size);
+	}
+}
+
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_64(struct rte_deque *d,
+				const void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	uint32_t idx = (d->tail & d->mask);
+	uint64_t *deque = (uint64_t *)&d[1];
+	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
+	if (likely((int32_t)(idx - n) >= 0)) {
+		for (i = 0; i < (n & ~0x3); i += 4, idx -= 4) {
+			deque[idx] = obj[i];
+			deque[idx - 1] = obj[i + 1];
+			deque[idx - 2] = obj[i + 2];
+			deque[idx - 3] = obj[i + 3];
+		}
+		switch (n & 0x3) {
+		case 3:
+			deque[idx--] = obj[i++]; /* fallthrough */
+		case 2:
+			deque[idx--] = obj[i++]; /* fallthrough */
+		case 1:
+			deque[idx--] = obj[i++]; /* fallthrough */
+		}
+	} else {
+		for (i = 0; (int32_t)idx >= 0; i++, idx--)
+			deque[idx] = obj[i];
+		/* Start at the ending */
+		for (idx = d->mask; i < n; i++, idx--)
+			deque[idx] = obj[i];
+	}
+}
+
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_128(struct rte_deque *d,
+				const void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	uint32_t idx = (d->tail & d->mask);
+	rte_int128_t *deque = (rte_int128_t *)&d[1];
+	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
+	if (likely((int32_t)(idx - n) >= 0)) {
+		for (i = 0; i < (n & ~0x1); i += 2, idx -= 2) {
+			deque[idx] = obj[i];
+			deque[idx - 1] = obj[i + 1];
+		}
+		switch (n & 0x1) {
+		case 1:
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+		}
+	} else {
+		for (i = 0; (int32_t)idx >= 0; i++, idx--)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+		/* Start at the ending */
+		for (idx = d->mask; i < n; i++, idx--)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+	}
+}
+
+static __rte_always_inline unsigned int
+__rte_deque_enqueue_at_tail(struct rte_deque *d,
+			const void *obj_table,
+			unsigned int esize,
+			unsigned int n)
+{
+	/* The tail point must point at an empty cell when enqueuing */
+	d->tail--;
+
+	/* 8B and 16B copies implemented individually because on some platforms
+	 * there are 64 bit and 128 bit registers available for direct copying.
+	 */
+	if (esize == 8)
+		__rte_deque_enqueue_elems_tail_64(d, obj_table, n);
+	else if (esize == 16)
+		__rte_deque_enqueue_elems_tail_128(d, obj_table, n);
+	else {
+		uint32_t idx, scale, nd_idx, nd_num, nd_mask;
+
+		/* Normalize to uint32_t */
+		scale = esize / sizeof(uint32_t);
+		nd_num = n * scale;
+		idx = d->tail & d->mask;
+		nd_idx = idx * scale;
+		nd_mask = d->mask * scale;
+		__rte_deque_enqueue_elems_tail_32(d, nd_mask, nd_idx, obj_table,
+						nd_num, scale, esize);
+	}
+
+	/* The +1 is because the tail needs to point at a
+	 * non-empty memory location after the enqueuing operation.
+	 */
+	d->tail = (d->tail - n + 1) & d->mask;
+	return n;
+}
+
+static __rte_always_inline void
+__rte_deque_dequeue_elems_32(struct rte_deque *d,
+			const unsigned int size,
+			uint32_t idx,
+			void *obj_table,
+			unsigned int n)
+{
+	unsigned int i;
+	const uint32_t *deque = (const uint32_t *)&d[1];
+	uint32_t *obj = (uint32_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
+			obj[i] = deque[idx];
+			obj[i + 1] = deque[idx + 1];
+			obj[i + 2] = deque[idx + 2];
+			obj[i + 3] = deque[idx + 3];
+			obj[i + 4] = deque[idx + 4];
+			obj[i + 5] = deque[idx + 5];
+			obj[i + 6] = deque[idx + 6];
+			obj[i + 7] = deque[idx + 7];
+		}
+		switch (n & 0x7) {
+		case 7:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 6:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 5:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 4:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 3:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 2:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 1:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			obj[i] = deque[idx];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			obj[i] = deque[idx];
+	}
+}
+
+static __rte_always_inline void
+__rte_deque_dequeue_elems_64(struct rte_deque *d, void *obj_table,
+			unsigned int n)
+{
+	unsigned int i;
+	const uint32_t size = d->size;
+	uint32_t idx = (d->tail & d->mask);
+	const uint64_t *deque = (const uint64_t *)&d[1];
+	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
+			obj[i] = deque[idx];
+			obj[i + 1] = deque[idx + 1];
+			obj[i + 2] = deque[idx + 2];
+			obj[i + 3] = deque[idx + 3];
+		}
+		switch (n & 0x3) {
+		case 3:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 2:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 1:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			obj[i] = deque[idx];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			obj[i] = deque[idx];
+	}
+}
+
+static __rte_always_inline void
+__rte_deque_dequeue_elems_128(struct rte_deque *d,
+			void *obj_table,
+			unsigned int n)
+{
+	unsigned int i;
+	const uint32_t size = d->size;
+	uint32_t idx = (d->tail & d->mask);
+	const rte_int128_t *deque = (const rte_int128_t *)&d[1];
+	rte_int128_t *obj = (rte_int128_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 32);
+		switch (n & 0x1) {
+		case 1:
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+	}
+}
+
+static __rte_always_inline unsigned int
+__rte_deque_dequeue_at_tail(struct rte_deque *d,
+			void *obj_table,
+			unsigned int esize,
+			unsigned int n)
+{
+	/* 8B and 16B copies implemented individually because on some platforms
+	 * there are 64 bit and 128 bit registers available for direct copying.
+	 */
+	if (esize == 8)
+		__rte_deque_dequeue_elems_64(d, obj_table, n);
+	else if (esize == 16)
+		__rte_deque_dequeue_elems_128(d, obj_table, n);
+	else {
+		uint32_t idx, scale, nd_idx, nd_num, nd_size;
+
+		/* Normalize to uint32_t */
+		scale = esize / sizeof(uint32_t);
+		nd_num = n * scale;
+		idx = d->tail & d->mask;
+		nd_idx = idx * scale;
+		nd_size = d->size * scale;
+		__rte_deque_dequeue_elems_32(d, nd_size, nd_idx,
+					obj_table, nd_num);
+	}
+	d->tail = (d->tail + n) & d->mask;
+	return n;
+}
+
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_32(struct rte_deque *d,
+				const unsigned int mask,
+				uint32_t idx,
+				void *obj_table,
+				unsigned int n,
+				const unsigned int scale,
+				const unsigned int elem_size)
+{
+	unsigned int i;
+	const uint32_t *deque = (uint32_t *)&d[1];
+	uint32_t *obj = (uint32_t *)obj_table;
+
+	if (likely(idx >= n)) {
+		for (i = 0; i < n; idx -= scale, i += scale)
+			memcpy(&obj[i], &deque[idx], elem_size);
+	} else {
+		for (i = 0; (int32_t)idx >= 0; idx -= scale, i += scale)
+			memcpy(&obj[i], &deque[idx], elem_size);
+		/* Start at the ending */
+		idx = mask;
+		for (; i < n; idx -= scale, i += scale)
+			memcpy(&obj[i], &deque[idx], elem_size);
+	}
+}
+
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_64(struct rte_deque *d,
+				void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	uint32_t idx = (d->head & d->mask);
+	const uint64_t *deque = (uint64_t *)&d[1];
+	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
+	if (likely((int32_t)(idx - n) >= 0)) {
+		for (i = 0; i < (n & ~0x3); i += 4, idx -= 4) {
+			obj[i] = deque[idx];
+			obj[i + 1] = deque[idx - 1];
+			obj[i + 2] = deque[idx - 2];
+			obj[i + 3] = deque[idx - 3];
+		}
+		switch (n & 0x3) {
+		case 3:
+			obj[i++] = deque[idx--];  /* fallthrough */
+		case 2:
+			obj[i++] = deque[idx--]; /* fallthrough */
+		case 1:
+			obj[i++] = deque[idx--]; /* fallthrough */
+		}
+	} else {
+		for (i = 0; (int32_t)idx >= 0; i++, idx--)
+			obj[i] = deque[idx];
+		/* Start at the ending */
+		for (idx = d->mask; i < n; i++, idx--)
+			obj[i] = deque[idx];
+	}
+}
+
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_128(struct rte_deque *d,
+				void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	uint32_t idx = (d->head & d->mask);
+	const rte_int128_t *deque = (rte_int128_t *)&d[1];
+	rte_int128_t *obj = (rte_int128_t *)obj_table;
+	if (likely((int32_t)(idx - n) >= 0)) {
+		for (i = 0; i < (n & ~0x1); i += 2, idx -= 2) {
+			obj[i] = deque[idx];
+			obj[i + 1] = deque[idx - 1];
+		}
+		switch (n & 0x1) {
+		case 1:
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+		}
+	} else {
+		for (i = 0; (int32_t)idx >= 0; i++, idx--)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+		/* Start at the ending */
+		for (idx = d->mask; i < n; i++, idx--)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+	}
+}
+
+static __rte_always_inline unsigned int
+__rte_deque_dequeue_at_head(struct rte_deque *d,
+			void *obj_table,
+			unsigned int esize,
+			unsigned int n)
+{
+	/* The head must point at an empty cell when dequeueing */
+	d->head--;
+
+	/* 8B and 16B copies implemented individually because on some platforms
+	 * there are 64 bit and 128 bit registers available for direct copying.
+	 */
+	if (esize == 8)
+		__rte_deque_dequeue_elems_head_64(d, obj_table, n);
+	else if (esize == 16)
+		__rte_deque_dequeue_elems_head_128(d, obj_table, n);
+	else {
+		uint32_t idx, scale, nd_idx, nd_num, nd_mask;
+
+		/* Normalize to uint32_t */
+		scale = esize / sizeof(uint32_t);
+		nd_num = n * scale;
+		idx = d->head & d->mask;
+		nd_idx = idx * scale;
+		nd_mask = d->mask * scale;
+		__rte_deque_dequeue_elems_head_32(d, nd_mask, nd_idx, obj_table,
+						nd_num, scale, esize);
+	}
+
+	/* The +1 is because the head needs to point at a
+	 * empty memory location after the dequeueing operation.
+	 */
+	d->head = (d->head - n + 1) & d->mask;
+	return n;
+}
+#endif /* _RTE_DEQUEU_PVT_H_ */
diff --git a/lib/deque/rte_deque_zc.h b/lib/deque/rte_deque_zc.h
new file mode 100644
index 0000000000..75a7e6fddb
--- /dev/null
+++ b/lib/deque/rte_deque_zc.h
@@ -0,0 +1,430 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+#ifndef _RTE_DEQUE_ZC_H_
+#define _RTE_DEQUE_ZC_H_
+
+/**
+ * @file
+ * This file should not be included directly, include rte_deque.h instead.
+ *
+ * Deque Zero Copy APIs
+ * These APIs make it possible to split public enqueue/dequeue API
+ * into 3 parts:
+ * - enqueue/dequeue start
+ * - copy data to/from the deque
+ * - enqueue/dequeue finish
+ * These APIs provide the ability to avoid copying of the data to temporary area.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Deque zero-copy information structure.
+ *
+ * This structure contains the pointers and length of the space
+ * reserved on the Deque storage.
+ */
+struct __rte_cache_aligned rte_deque_zc_data {
+	/* Pointer to the first space in the deque */
+	void *ptr1;
+	/* Pointer to the second space in the deque if there is wrap-around.
+	 * It contains valid value only if wrap-around happens.
+	 */
+	void *ptr2;
+	/* Number of elements in the first pointer. If this is equal to
+	 * the number of elements requested, then ptr2 is NULL.
+	 * Otherwise, subtracting n1 from number of elements requested
+	 * will give the number of elements available at ptr2.
+	 */
+	unsigned int n1;
+};
+
+static __rte_always_inline void
+__rte_deque_get_elem_addr(struct rte_deque *d, uint32_t pos,
+	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2,
+	bool low_to_high)
+{
+	uint32_t idx, scale, nr_idx;
+	uint32_t *deque = (uint32_t *)&d[1];
+
+	/* Normalize to uint32_t */
+	scale = esize / sizeof(uint32_t);
+	idx = pos & d->mask;
+	nr_idx = idx * scale;
+
+	*dst1 = deque + nr_idx;
+	*n1 = num;
+
+	if (low_to_high) {
+		if (idx + num > d->size) {
+			*n1 = d->size - idx;
+			*dst2 = deque;
+		} else
+			*dst2 = NULL;
+	} else {
+		if ((int32_t)(idx - num) < 0) {
+			*n1 = idx + 1;
+			*dst2 = (void *)&deque[(-1 & d->mask) * scale];
+		} else
+			*dst2 = NULL;
+	}
+}
+
+/**
+ * Start to enqueue several objects on the deque.
+ * Note that no actual objects are put in the deque by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the deque using the returned pointers.
+ * User should call rte_deque_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to add in the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param free_space
+ *   Returns the amount of space in the deque after the reservation operation
+ *   has finished.
+ * @return
+ *   The number of objects that can be enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_enqueue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+
+	*free_space = __RTE_DEQUE_FREE_SPACE(d);
+	if (unlikely(*free_space < n))
+		return 0;
+	__rte_deque_get_elem_addr(d, d->head, esize, n, &zcd->ptr1,
+							&zcd->n1, &zcd->ptr2, true);
+
+	*free_space -= n;
+	return n;
+}
+
+/**
+ * Complete enqueuing several pointers to objects on the deque.
+ * Note that number of objects to enqueue should not exceed previous
+ * enqueue_start return value.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of pointers to objects to add to the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+	d->head = (d->head + n) & d->mask;
+}
+
+/**
+ * Start to enqueue several objects on the deque.
+ * Note that no actual objects are put in the queue by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the queue using the returned pointers.
+ * User should call rte_deque_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to add in the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param free_space
+ *   Returns the amount of space in the deque after the reservation operation
+ *   has finished.
+ * @return
+ *   The number of objects that can be enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_enqueue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	*free_space = __RTE_DEQUE_FREE_SPACE(d);
+	n = n > *free_space ? *free_space : n;
+	return rte_deque_enqueue_zc_bulk_elem_start(d, esize, n, zcd, free_space);
+}
+
+/**
+ * Start to enqueue several objects on the deque.
+ * Note that no actual objects are put in the deque by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the deque using the returned pointers.
+ * User should call rte_deque_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to add in the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param free_space
+ *   Returns the amount of space in the deque after the reservation operation
+ *   has finished.
+ * @return
+ *   The number of objects that can be enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_enqueue_zc_bulk_elem_tail_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	*free_space = __RTE_DEQUE_FREE_SPACE(d);
+	if (unlikely(*free_space < n))
+		return 0;
+	__rte_deque_get_elem_addr(d, d->tail - 1, esize, n, &zcd->ptr1,
+							  &zcd->n1, &zcd->ptr2, false);
+
+	*free_space -= n;
+	return n;
+}
+
+/**
+ * Complete enqueuing several pointers to objects on the deque.
+ * Note that number of objects to enqueue should not exceed previous
+ * enqueue_start return value.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of pointers to objects to add to the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_enqueue_zc_elem_tail_finish(struct rte_deque *d, unsigned int n)
+{
+	d->tail = (d->tail - n) & d->mask;
+}
+
+/**
+ * Start to enqueue several objects on the deque.
+ * Note that no actual objects are put in the queue by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the queue using the returned pointers.
+ * User should call rte_deque_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to add in the deque.@param r
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param free_space
+ *   Returns the amount of space in the deque after the reservation operation
+ *   has finished.
+ * @return
+ *   The number of objects that can be enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_enqueue_zc_burst_elem_tail_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	*free_space = __RTE_DEQUE_FREE_SPACE(d);
+	n = n > *free_space ? *free_space : n;
+	return rte_deque_enqueue_zc_bulk_elem_tail_start(d, esize, n, zcd, free_space);
+}
+
+/**
+ * Start to dequeue several objects from the deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to remove from the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue has
+ *   finished.
+ * @return
+ *   The number of objects that can be dequeued, either 0 or n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_dequeue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	*available = __RTE_DEQUE_COUNT(d);
+	if (unlikely(*available < n))
+		return 0;
+	__rte_deque_get_elem_addr(d, d->tail, esize, n, &zcd->ptr1,
+							&zcd->n1, &zcd->ptr2, true);
+
+	*available -= n;
+	return n;
+}
+
+/**
+ * Complete dequeuing several objects from the deque.
+ * Note that number of objects to dequeued should not exceed previous
+ * dequeue_start return value.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of objects to remove from the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+	d->tail = (d->tail + n) & d->mask;
+}
+
+/**
+ * Start to dequeue several objects from the deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to remove from the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue has
+ *   finished.
+ * @return
+ *   The number of objects that can be dequeued, either 0 or n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	*available = __RTE_DEQUE_COUNT(d);
+	n = n > *available ? *available : n;
+	return rte_deque_dequeue_zc_bulk_elem_start(d, esize, n, zcd, available);
+}
+
+/**
+ * Start to dequeue several objects from the deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to remove from the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue has
+ *   finished.
+ * @return
+ *   The number of objects that can be dequeued, either 0 or n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_dequeue_zc_bulk_elem_head_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	*available = __RTE_DEQUE_COUNT(d);
+	if (unlikely(*available < n))
+		return 0;
+	__rte_deque_get_elem_addr(d, d->head - 1, esize, n, &zcd->ptr1,
+							&zcd->n1, &zcd->ptr2, false);
+
+	*available -= n;
+	return n;
+}
+
+/**
+ * Complete dequeuing several objects from the deque.
+ * Note that number of objects to dequeued should not exceed previous
+ * dequeue_start return value.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of objects to remove from the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_dequeue_zc_elem_head_finish(struct rte_deque *d, unsigned int n)
+{
+	d->head = (d->head - n) & d->mask;
+}
+
+/**
+ * Start to dequeue several objects from the deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to remove from the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue has
+ *   finished.
+ * @return
+ *   The number of objects that can be dequeued, either 0 or n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_dequeue_zc_burst_elem_head_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	*available = __RTE_DEQUE_COUNT(d);
+	n = n > *available ? *available : n;
+	return rte_deque_dequeue_zc_bulk_elem_head_start(d, esize, n, zcd, available);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_ZC_H_ */
diff --git a/lib/deque/version.map b/lib/deque/version.map
new file mode 100644
index 0000000000..103fd3b512
--- /dev/null
+++ b/lib/deque/version.map
@@ -0,0 +1,14 @@ 
+EXPERIMENTAL {
+	global:
+
+	# added in 24.07
+	rte_deque_log_type;
+	rte_deque_create;
+	rte_deque_dump;
+	rte_deque_free;
+	rte_deque_get_memsize_elem;
+	rte_deque_init;
+	rte_deque_reset;
+
+	local: *;
+};
diff --git a/lib/meson.build b/lib/meson.build
index 179a272932..8c8c1e98e2 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -14,6 +14,7 @@  libraries = [
         'argparse',
         'telemetry', # basic info querying
         'eal', # everything depends on eal
+        'deque',
         'ring',
         'rcu', # rcu depends on ring
         'mempool',