[v4,2/2] mempool/nb_stack: add non-blocking stack mempool

Message ID 20190117153659.28477-3-gage.eads@intel.com
State New
Delegated to: Thomas Monjalon
Series
  • Add non-blocking stack mempool handler

Checks

Context Check Description
ci/Intel-compilation success Compilation OK
ci/checkpatch success coding style OK

Commit Message

Eads, Gage Jan. 17, 2019, 3:36 p.m.
This commit adds support for a non-blocking (linked-list-based) stack mempool
handler. The stack uses a 128-bit compare-and-swap instruction, and thus is
limited to x86_64. The 128-bit CAS atomically updates the stack top pointer
and a modification counter, which protects against the ABA problem.

In mempool_perf_autotest the lock-based stack outperforms the non-blocking
handler*, however:
- For applications with preemptible pthreads, a lock-based stack's
  worst-case performance (i.e. one thread being preempted while
  holding the spinlock) is much worse than the non-blocking stack's.
- Using per-thread mempool caches will largely mitigate the performance
  difference.

*Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
running on isolcpus cores with a tickless scheduler. The lock-based stack's
rate_persec was 1x-3.5x the non-blocking stack's.

Signed-off-by: Gage Eads <gage.eads@intel.com>
Acked-by: Andrew Rybchenko <arybchenko@solarflare.com>
---
 MAINTAINERS                                        |   4 +
 config/common_base                                 |   1 +
 doc/guides/prog_guide/env_abstraction_layer.rst    |   5 +
 drivers/mempool/Makefile                           |   3 +
 drivers/mempool/meson.build                        |   3 +-
 drivers/mempool/nb_stack/Makefile                  |  23 ++++
 drivers/mempool/nb_stack/meson.build               |   6 +
 drivers/mempool/nb_stack/nb_lifo.h                 | 147 +++++++++++++++++++++
 drivers/mempool/nb_stack/rte_mempool_nb_stack.c    | 125 ++++++++++++++++++
 .../nb_stack/rte_mempool_nb_stack_version.map      |   4 +
 mk/rte.app.mk                                      |   7 +-
 11 files changed, 325 insertions(+), 3 deletions(-)
 create mode 100644 drivers/mempool/nb_stack/Makefile
 create mode 100644 drivers/mempool/nb_stack/meson.build
 create mode 100644 drivers/mempool/nb_stack/nb_lifo.h
 create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack.c
 create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map

Comments

Honnappa Nagarahalli Jan. 18, 2019, 5:05 a.m. | #1
Hi Gage,
     Thank you for your contribution on non-blocking data structures. I think they are important for extending DPDK into additional use cases.

I am wondering if it makes sense to decouple the NB stack data structure from the mempool driver (similar to rte_ring)? I see that the stack-based mempool implements the stack data structure in the driver. But an NB stack is not such a trivial data structure; it might be useful for applications or other use cases as well.

I also suggest that we use C11 __atomic_xxx APIs for memory operations. The rte_atomic64_xxx APIs use __sync_xxx APIs which do not provide the capability to express memory orderings.

Please find a few comments inline.

> 
> This commit adds support for non-blocking (linked list based) stack
> mempool handler. The stack uses a 128-bit compare-and-swap instruction,
> and thus is limited to x86_64. The 128-bit CAS atomically updates the stack
> top pointer and a modification counter, which protects against the ABA
> problem.
> 
> In mempool_perf_autotest the lock-based stack outperforms the non-
> blocking handler*, however:
> - For applications with preemptible pthreads, a lock-based stack's
>   worst-case performance (i.e. one thread being preempted while
>   holding the spinlock) is much worse than the non-blocking stack's.
> - Using per-thread mempool caches will largely mitigate the performance
>   difference.
> 
> *Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
> running on isolcpus cores with a tickless scheduler. The lock-based stack's
> rate_persec was 1x-3.5x the non-blocking stack's.
> 
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> Acked-by: Andrew Rybchenko <arybchenko@solarflare.com>
> ---
>  MAINTAINERS                                        |   4 +
>  config/common_base                                 |   1 +
>  doc/guides/prog_guide/env_abstraction_layer.rst    |   5 +
>  drivers/mempool/Makefile                           |   3 +
>  drivers/mempool/meson.build                        |   3 +-
>  drivers/mempool/nb_stack/Makefile                  |  23 ++++
>  drivers/mempool/nb_stack/meson.build               |   6 +
>  drivers/mempool/nb_stack/nb_lifo.h                 | 147
> +++++++++++++++++++++
>  drivers/mempool/nb_stack/rte_mempool_nb_stack.c    | 125
> ++++++++++++++++++
>  .../nb_stack/rte_mempool_nb_stack_version.map      |   4 +
>  mk/rte.app.mk                                      |   7 +-
>  11 files changed, 325 insertions(+), 3 deletions(-)  create mode 100644
> drivers/mempool/nb_stack/Makefile  create mode 100644
> drivers/mempool/nb_stack/meson.build
>  create mode 100644 drivers/mempool/nb_stack/nb_lifo.h
>  create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack.c
>  create mode 100644
> drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> 
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 470f36b9c..5519d3323 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -416,6 +416,10 @@ M: Artem V. Andreev <artem.andreev@oktetlabs.ru>
>  M: Andrew Rybchenko <arybchenko@solarflare.com>
>  F: drivers/mempool/bucket/
> 
> +Non-blocking stack memory pool
> +M: Gage Eads <gage.eads@intel.com>
> +F: drivers/mempool/nb_stack/
> +
> 
>  Bus Drivers
>  -----------
> diff --git a/config/common_base b/config/common_base index
> 964a6956e..8a51f36b1 100644
> --- a/config/common_base
> +++ b/config/common_base
> @@ -726,6 +726,7 @@ CONFIG_RTE_LIBRTE_MEMPOOL_DEBUG=n  #
> CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
>  CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=64
> +CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK=y
>  CONFIG_RTE_DRIVER_MEMPOOL_RING=y
>  CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
> 
> diff --git a/doc/guides/prog_guide/env_abstraction_layer.rst
> b/doc/guides/prog_guide/env_abstraction_layer.rst
> index 929d76dba..9497b879c 100644
> --- a/doc/guides/prog_guide/env_abstraction_layer.rst
> +++ b/doc/guides/prog_guide/env_abstraction_layer.rst
> @@ -541,6 +541,11 @@ Known Issues
> 
>    5. It MUST not be used by multi-producer/consumer pthreads, whose
> scheduling policies are SCHED_FIFO or SCHED_RR.
> 
> +  Alternatively, x86_64 applications can use the non-blocking stack
> mempool handler. When considering this handler, note that:
> +
> +  - it is limited to the x86_64 platform, because it uses an instruction (16-
> byte compare-and-swap) that is not available on other platforms.
                                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The Arm architecture supports similar instructions. I suggest simplifying this statement to indicate that 'the nb_stack feature is currently available for x86_64 platforms'.

> +  - it has worse average-case performance than the non-preemptive
> rte_ring, but software caching (e.g. the mempool cache) can mitigate this by
> reducing the number of handler operations.
> +
>  + rte_timer
> 
>    Running  ``rte_timer_manage()`` on a non-EAL pthread is not allowed.
> However, resetting/stopping the timer from a non-EAL pthread is allowed.
> diff --git a/drivers/mempool/Makefile b/drivers/mempool/Makefile index
> 28c2e8360..895cf8a34 100644
> --- a/drivers/mempool/Makefile
> +++ b/drivers/mempool/Makefile
> @@ -10,6 +10,9 @@ endif
>  ifeq ($(CONFIG_RTE_EAL_VFIO)$(CONFIG_RTE_LIBRTE_FSLMC_BUS),yy)
>  DIRS-$(CONFIG_RTE_LIBRTE_DPAA2_MEMPOOL) += dpaa2  endif
> +ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
> +DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += nb_stack endif
>  DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
>  DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += stack
>  DIRS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += octeontx diff --git
> a/drivers/mempool/meson.build b/drivers/mempool/meson.build index
> 4527d9806..220cfaf63 100644
> --- a/drivers/mempool/meson.build
> +++ b/drivers/mempool/meson.build
> @@ -1,7 +1,8 @@
>  # SPDX-License-Identifier: BSD-3-Clause  # Copyright(c) 2017 Intel
> Corporation
> 
> -drivers = ['bucket', 'dpaa', 'dpaa2', 'octeontx', 'ring', 'stack']
> +drivers = ['bucket', 'dpaa', 'dpaa2', 'nb_stack', 'octeontx', 'ring',
> +'stack']
> +
>  std_deps = ['mempool']
>  config_flag_fmt = 'RTE_LIBRTE_@0@_MEMPOOL'
>  driver_name_fmt = 'rte_mempool_@0@'
> diff --git a/drivers/mempool/nb_stack/Makefile
> b/drivers/mempool/nb_stack/Makefile
> new file mode 100644
> index 000000000..318b18283
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/Makefile
> @@ -0,0 +1,23 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2019 Intel
> +Corporation
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +#
> +# library name
> +#
> +LIB = librte_mempool_nb_stack.a
> +
> +CFLAGS += -O3
> +CFLAGS += $(WERROR_FLAGS)
> +
> +# Headers
> +LDLIBS += -lrte_eal -lrte_mempool
> +
> +EXPORT_MAP := rte_mempool_nb_stack_version.map
> +
> +LIBABIVER := 1
> +
> +SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) +=
> rte_mempool_nb_stack.c
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/drivers/mempool/nb_stack/meson.build
> b/drivers/mempool/nb_stack/meson.build
> new file mode 100644
> index 000000000..7dec72242
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/meson.build
> @@ -0,0 +1,6 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2019 Intel
> +Corporation
> +
> +build = dpdk_conf.has('RTE_ARCH_X86_64')
> +
> +sources = files('rte_mempool_nb_stack.c')
> diff --git a/drivers/mempool/nb_stack/nb_lifo.h
> b/drivers/mempool/nb_stack/nb_lifo.h
> new file mode 100644
> index 000000000..ad4a3401f
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/nb_lifo.h
> @@ -0,0 +1,147 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#ifndef _NB_LIFO_H_
> +#define _NB_LIFO_H_
> +
> +struct nb_lifo_elem {
> +	void *data;
> +	struct nb_lifo_elem *next;
> +};
> +
> +struct nb_lifo_head {
> +	struct nb_lifo_elem *top; /**< Stack top */
> +	uint64_t cnt; /**< Modification counter */ };
Minor comment: mentioning the ABA problem in the comment for 'cnt' would be helpful.

> +
> +struct nb_lifo {
> +	volatile struct nb_lifo_head head __rte_aligned(16);
> +	rte_atomic64_t len;
> +} __rte_cache_aligned;
> +
> +static __rte_always_inline void
> +nb_lifo_init(struct nb_lifo *lifo)
> +{
> +	memset(lifo, 0, sizeof(*lifo));
> +	rte_atomic64_set(&lifo->len, 0);
> +}
> +
> +static __rte_always_inline unsigned int nb_lifo_len(struct nb_lifo
> +*lifo) {
> +	/* nb_lifo_push() and nb_lifo_pop() do not update the list's
> contents
> +	 * and lifo->len atomically, which can cause the list to appear
> shorter
> +	 * than it actually is if this function is called while other threads
> +	 * are modifying the list.
> +	 *
> +	 * However, given the inherently approximate nature of the
> get_count
> +	 * callback -- even if the list and its size were updated atomically,
> +	 * the size could change between when get_count executes and
> when the
> +	 * value is returned to the caller -- this is acceptable.
> +	 *
> +	 * The lifo->len updates are placed such that the list may appear to
> +	 * have fewer elements than it does, but will never appear to have
> more
> +	 * elements. If the mempool is near-empty to the point that this is a
> +	 * concern, the user should consider increasing the mempool size.
> +	 */
> +	return (unsigned int)rte_atomic64_read(&lifo->len);
> +}
> +
> +static __rte_always_inline void
> +nb_lifo_push(struct nb_lifo *lifo,
> +	     struct nb_lifo_elem *first,
> +	     struct nb_lifo_elem *last,
> +	     unsigned int num)
> +{
> +	while (1) {
> +		struct nb_lifo_head old_head, new_head;
> +
> +		old_head = lifo->head;
> +
> +		/* Swing the top pointer to the first element in the list and
> +		 * make the last element point to the old top.
> +		 */
> +		new_head.top = first;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		last->next = old_head.top;
> +
> +		if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
> +					 (uint64_t *)&old_head,
> +					 (uint64_t *)&new_head))
> +			break;
> +	}
Minor comment: this can be a do-while loop (for example, similar to the one in __rte_ring_move_prod_head).

> +
> +	rte_atomic64_add(&lifo->len, num);
> +}
> +
> +static __rte_always_inline void
> +nb_lifo_push_single(struct nb_lifo *lifo, struct nb_lifo_elem *elem) {
> +	nb_lifo_push(lifo, elem, elem, 1);
> +}
> +
> +static __rte_always_inline struct nb_lifo_elem * nb_lifo_pop(struct
> +nb_lifo *lifo,
> +	    unsigned int num,
> +	    void **obj_table,
> +	    struct nb_lifo_elem **last)
> +{
> +	struct nb_lifo_head old_head;
> +
> +	/* Reserve num elements, if available */
> +	while (1) {
> +		uint64_t len = rte_atomic64_read(&lifo->len);
> +
> +		/* Does the list contain enough elements? */
> +		if (len < num)
> +			return NULL;
> +
> +		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
> +					len, len - num))
> +			break;
> +	}
> +
> +	/* Pop num elements */
> +	while (1) {
> +		struct nb_lifo_head new_head;
> +		struct nb_lifo_elem *tmp;
> +		unsigned int i;
> +
> +		old_head = lifo->head;
> +
> +		tmp = old_head.top;
> +
> +		/* Traverse the list to find the new head. A next pointer will
> +		 * either point to another element or NULL; if a thread
> +		 * encounters a pointer that has already been popped, the
> CAS
> +		 * will fail.
> +		 */
> +		for (i = 0; i < num && tmp != NULL; i++) {
> +			if (obj_table)
This 'if' check can be outside the for loop. Maybe use RTE_ASSERT at the beginning of the function?

> +				obj_table[i] = tmp->data;
> +			if (last)
> +				*last = tmp;
> +			tmp = tmp->next;
> +		}
> +
> +		/* If NULL was encountered, the list was modified while
> +		 * traversing it. Retry.
> +		 */
> +		if (i != num)
> +			continue;
> +
> +		new_head.top = tmp;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
> +					 (uint64_t *)&old_head,
> +					 (uint64_t *)&new_head))
> +			break;
> +	}
> +
> +	return old_head.top;
> +}
> +
> +#endif /* _NB_LIFO_H_ */
> diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> new file mode 100644
> index 000000000..1818a2cfa
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> @@ -0,0 +1,125 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#include <stdio.h>
> +#include <rte_mempool.h>
> +#include <rte_malloc.h>
> +
> +#include "nb_lifo.h"
> +
> +struct rte_mempool_nb_stack {
> +	uint64_t size;
> +	struct nb_lifo used_lifo; /**< LIFO containing mempool pointers  */
> +	struct nb_lifo free_lifo; /**< LIFO containing unused LIFO elements
> */
> +};
> +
> +static int
> +nb_stack_alloc(struct rte_mempool *mp)
> +{
> +	struct rte_mempool_nb_stack *s;
> +	struct nb_lifo_elem *elems;
> +	unsigned int n = mp->size;
> +	unsigned int size, i;
> +
> +	size = sizeof(*s) + n * sizeof(struct nb_lifo_elem);
IMO, the allocation of the stack elements can be moved under nb_lifo_init API, it would make the nb stack code modular.

> +
> +	/* Allocate our local memory structure */
> +	s = rte_zmalloc_socket("mempool-nb_stack",
> +			       size,
> +			       RTE_CACHE_LINE_SIZE,
> +			       mp->socket_id);
> +	if (s == NULL) {
> +		RTE_LOG(ERR, MEMPOOL, "Cannot allocate nb_stack!\n");
> +		return -ENOMEM;
> +	}
> +
> +	s->size = n;
> +
> +	nb_lifo_init(&s->used_lifo);
> +	nb_lifo_init(&s->free_lifo);
> +
> +	elems = (struct nb_lifo_elem *)&s[1];
> +	for (i = 0; i < n; i++)
> +		nb_lifo_push_single(&s->free_lifo, &elems[i]);
This also can be added to nb_lifo_init API.

> +
> +	mp->pool_data = s;
> +
> +	return 0;
> +}
> +
> +static int
> +nb_stack_enqueue(struct rte_mempool *mp, void * const *obj_table,
> +		 unsigned int n)
> +{
> +	struct rte_mempool_nb_stack *s = mp->pool_data;
> +	struct nb_lifo_elem *first, *last, *tmp;
> +	unsigned int i;
> +
> +	if (unlikely(n == 0))
> +		return 0;
> +
> +	/* Pop n free elements */
> +	first = nb_lifo_pop(&s->free_lifo, n, NULL, NULL);
> +	if (unlikely(first == NULL))
> +		return -ENOBUFS;
> +
> +	/* Prepare the list elements */
> +	tmp = first;
> +	for (i = 0; i < n; i++) {
> +		tmp->data = obj_table[i];
> +		last = tmp;
> +		tmp = tmp->next;
> +	}
> +
> +	/* Enqueue them to the used list */
> +	nb_lifo_push(&s->used_lifo, first, last, n);
> +
> +	return 0;
> +}
> +
> +static int
> +nb_stack_dequeue(struct rte_mempool *mp, void **obj_table,
> +		 unsigned int n)
> +{
> +	struct rte_mempool_nb_stack *s = mp->pool_data;
> +	struct nb_lifo_elem *first, *last;
> +
> +	if (unlikely(n == 0))
> +		return 0;
> +
> +	/* Pop n used elements */
> +	first = nb_lifo_pop(&s->used_lifo, n, obj_table, &last);
> +	if (unlikely(first == NULL))
> +		return -ENOENT;
> +
> +	/* Enqueue the list elements to the free list */
> +	nb_lifo_push(&s->free_lifo, first, last, n);
> +
> +	return 0;
> +}
> +
> +static unsigned
> +nb_stack_get_count(const struct rte_mempool *mp) {
> +	struct rte_mempool_nb_stack *s = mp->pool_data;
> +
> +	return nb_lifo_len(&s->used_lifo);
> +}
> +
> +static void
> +nb_stack_free(struct rte_mempool *mp)
> +{
> +	rte_free(mp->pool_data);
> +}
> +
> +static struct rte_mempool_ops ops_nb_stack = {
> +	.name = "nb_stack",
> +	.alloc = nb_stack_alloc,
> +	.free = nb_stack_free,
> +	.enqueue = nb_stack_enqueue,
> +	.dequeue = nb_stack_dequeue,
> +	.get_count = nb_stack_get_count
> +};
> +
> +MEMPOOL_REGISTER_OPS(ops_nb_stack);
> diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> new file mode 100644
> index 000000000..fc8c95e91
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> @@ -0,0 +1,4 @@
> +DPDK_19.05 {
> +
> +	local: *;
> +};
> diff --git a/mk/rte.app.mk b/mk/rte.app.mk index 02e8b6f05..d4b4aaaf6
> 100644
> --- a/mk/rte.app.mk
> +++ b/mk/rte.app.mk
> @@ -131,8 +131,11 @@ endif
>  ifeq ($(CONFIG_RTE_BUILD_SHARED_LIB),n)
>  # plugins (link only if static libraries)
> 
> -_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -
> lrte_mempool_bucket
> -_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK)  += -
> lrte_mempool_stack
> +_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET)   += -
> lrte_mempool_bucket
> +ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
> +_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += -
> lrte_mempool_nb_stack
> +endif
> +_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK)    += -
> lrte_mempool_stack
>  ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
>  _LDLIBS-$(CONFIG_RTE_LIBRTE_DPAA_MEMPOOL)   += -lrte_mempool_dpaa
>  endif
> --
> 2.13.6
Eads, Gage Jan. 18, 2019, 8:09 p.m. | #2
> -----Original Message-----
> From: Honnappa Nagarahalli [mailto:Honnappa.Nagarahalli@arm.com]
> Sent: Thursday, January 17, 2019 11:05 PM
> To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> Cc: olivier.matz@6wind.com; arybchenko@solarflare.com; Richardson, Bruce
> <bruce.richardson@intel.com>; Ananyev, Konstantin
> <konstantin.ananyev@intel.com>; Gavin Hu (Arm Technology China)
> <Gavin.Hu@arm.com>; nd <nd@arm.com>; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; nd <nd@arm.com>
> Subject: RE: [dpdk-dev] [PATCH v4 2/2] mempool/nb_stack: add non-blocking
> stack mempool
> 
> Hi Gage,
>      Thank you for your contribution on non-blocking data structures. I think they
> are important to extend DPDK into additional use cases.
> 

Glad to hear it. Be sure to check out my non-blocking ring patchset as well, if you haven't already: http://mails.dpdk.org/archives/dev/2019-January/123774.html

> I am wondering if it makes sense to decouple the NB stack data structure from
> mempool driver (similar to rte_ring)? I see that stack based mempool
> implements the stack data structure in the driver. But, NB stack might not be
> such a trivial data structure. It might be useful for the applications or other use
> cases as well.
> 

I agree -- and you're not the first to suggest this :).

I'm going to defer that work to a later patchset; creating a new lib/ directory requires tech board approval (IIRC), and that would unnecessarily delay getting this mempool handler merged.

> I also suggest that we use C11 __atomic_xxx APIs for memory operations. The
> rte_atomic64_xxx APIs use __sync_xxx APIs which do not provide the capability
> to express memory orderings.
> 

Ok, I will add those (dependent on RTE_USE_C11_MEM_MODEL).

> Please find few comments inline.
> 
> > [...]
> > diff --git a/doc/guides/prog_guide/env_abstraction_layer.rst
> > b/doc/guides/prog_guide/env_abstraction_layer.rst
> > index 929d76dba..9497b879c 100644
> > --- a/doc/guides/prog_guide/env_abstraction_layer.rst
> > +++ b/doc/guides/prog_guide/env_abstraction_layer.rst
> > @@ -541,6 +541,11 @@ Known Issues
> >
> >    5. It MUST not be used by multi-producer/consumer pthreads, whose
> > scheduling policies are SCHED_FIFO or SCHED_RR.
> >
> > +  Alternatively, x86_64 applications can use the non-blocking stack
> > mempool handler. When considering this handler, note that:
> > +
> > +  - it is limited to the x86_64 platform, because it uses an
> > + instruction (16-
> > byte compare-and-swap) that is not available on other platforms.
>                                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Arm
> architecture supports similar instructions. I suggest to simplify this statement to
> indicate that 'nb_stack feature is available for x86_64 platforms currently'
> 

Will do.

> > [...]
> > diff --git a/drivers/mempool/nb_stack/nb_lifo.h
> > b/drivers/mempool/nb_stack/nb_lifo.h
> > new file mode 100644
> > index 000000000..ad4a3401f
> > --- /dev/null
> > +++ b/drivers/mempool/nb_stack/nb_lifo.h
> > @@ -0,0 +1,147 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright(c) 2019 Intel Corporation  */
> > +
> > +#ifndef _NB_LIFO_H_
> > +#define _NB_LIFO_H_
> > +
> > +struct nb_lifo_elem {
> > +	void *data;
> > +	struct nb_lifo_elem *next;
> > +};
> > +
> > +struct nb_lifo_head {
> > +	struct nb_lifo_elem *top; /**< Stack top */
> > +	uint64_t cnt; /**< Modification counter */ };
> Minor comment, mentioning ABA problem in the comments for 'cnt' will be
> helpful.
> 

Sure.

> > [...]
> > +static __rte_always_inline void
> > +nb_lifo_push(struct nb_lifo *lifo,
> > +	     struct nb_lifo_elem *first,
> > +	     struct nb_lifo_elem *last,
> > +	     unsigned int num)
> > +{
> > +	while (1) {
> > +		struct nb_lifo_head old_head, new_head;
> > +
> > +		old_head = lifo->head;
> > +
> > +		/* Swing the top pointer to the first element in the list and
> > +		 * make the last element point to the old top.
> > +		 */
> > +		new_head.top = first;
> > +		new_head.cnt = old_head.cnt + 1;
> > +
> > +		last->next = old_head.top;
> > +
> > +		if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
> > +					 (uint64_t *)&old_head,
> > +					 (uint64_t *)&new_head))
> > +			break;
> > +	}
> Minor comment, this can be a do-while loop (for ex: similar to the one in
> __rte_ring_move_prod_head)
> 

Sure.

> > [...]
> > +static __rte_always_inline struct nb_lifo_elem * nb_lifo_pop(struct
> > +nb_lifo *lifo,
> > +	    unsigned int num,
> > +	    void **obj_table,
> > +	    struct nb_lifo_elem **last)
> > +{
> > +	struct nb_lifo_head old_head;
> > +
> > +	/* Reserve num elements, if available */
> > +	while (1) {
> > +		uint64_t len = rte_atomic64_read(&lifo->len);
> > +
> > +		/* Does the list contain enough elements? */
> > +		if (len < num)
> > +			return NULL;
> > +
> > +		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
> > +					len, len - num))
> > +			break;
> > +	}
> > +
> > +	/* Pop num elements */
> > +	while (1) {
> > +		struct nb_lifo_head new_head;
> > +		struct nb_lifo_elem *tmp;
> > +		unsigned int i;
> > +
> > +		old_head = lifo->head;
> > +
> > +		tmp = old_head.top;
> > +
> > +		/* Traverse the list to find the new head. A next pointer will
> > +		 * either point to another element or NULL; if a thread
> > +		 * encounters a pointer that has already been popped, the
> > CAS
> > +		 * will fail.
> > +		 */
> > +		for (i = 0; i < num && tmp != NULL; i++) {
> > +			if (obj_table)
> This 'if' check can be outside the for loop. May be use RTE_ASSERT in the
> beginning of the function?
> 

A NULL obj_table pointer isn't an error -- nb_stack_enqueue() calls this function with NULL because it doesn't need the popped elements added to a table. When the compiler inlines this function into nb_stack_enqueue(), it can use constant propagation to optimize away the if-statement.

I don't think that's possible for the other caller, nb_stack_dequeue, though, unless we add a NULL pointer check to the beginning of that function. Then it would be guaranteed that obj_table is non-NULL, and the compiler can optimize away the if-statement. I'll add that.

> > +				obj_table[i] = tmp->data;
> > +			if (last)
> > +				*last = tmp;
> > +			tmp = tmp->next;
> > +		}
> > +
> > +		/* If NULL was encountered, the list was modified while
> > +		 * traversing it. Retry.
> > +		 */
> > +		if (i != num)
> > +			continue;
> > +
> > +		new_head.top = tmp;
> > +		new_head.cnt = old_head.cnt + 1;
> > +
> > +		if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
> > +					 (uint64_t *)&old_head,
> > +					 (uint64_t *)&new_head))
> > +			break;
> > +	}
> > +
> > +	return old_head.top;
> > +}
> > +
> > +#endif /* _NB_LIFO_H_ */
> > diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> > b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> > new file mode 100644
> > index 000000000..1818a2cfa
> > --- /dev/null
> > +++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> > @@ -0,0 +1,125 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright(c) 2019 Intel Corporation  */
> > +
> > +#include <stdio.h>
> > +#include <rte_mempool.h>
> > +#include <rte_malloc.h>
> > +
> > +#include "nb_lifo.h"
> > +
> > +struct rte_mempool_nb_stack {
> > +	uint64_t size;
> > +	struct nb_lifo used_lifo; /**< LIFO containing mempool pointers  */
> > +	struct nb_lifo free_lifo; /**< LIFO containing unused LIFO elements */
> > +};
> > +
> > +static int
> > +nb_stack_alloc(struct rte_mempool *mp) {
> > +	struct rte_mempool_nb_stack *s;
> > +	struct nb_lifo_elem *elems;
> > +	unsigned int n = mp->size;
> > +	unsigned int size, i;
> > +
> > +	size = sizeof(*s) + n * sizeof(struct nb_lifo_elem);
> IMO, the allocation of the stack elements can be moved under nb_lifo_init API,
> it would make the nb stack code modular.
> 

(see below)

> > +
> > +	/* Allocate our local memory structure */
> > +	s = rte_zmalloc_socket("mempool-nb_stack",
> > +			       size,
> > +			       RTE_CACHE_LINE_SIZE,
> > +			       mp->socket_id);
> > +	if (s == NULL) {
> > +		RTE_LOG(ERR, MEMPOOL, "Cannot allocate nb_stack!\n");
> > +		return -ENOMEM;
> > +	}
> > +
> > +	s->size = n;
> > +
> > +	nb_lifo_init(&s->used_lifo);
> > +	nb_lifo_init(&s->free_lifo);
> > +
> > +	elems = (struct nb_lifo_elem *)&s[1];
> > +	for (i = 0; i < n; i++)
> > +		nb_lifo_push_single(&s->free_lifo, &elems[i]);
> This also can be added to nb_lifo_init API.
> 

Sure, good suggestions. I'll address this.

Appreciate the feedback!

Thanks,
Gage
Eads, Gage Jan. 19, 2019, midnight | #3
> -----Original Message-----
> From: Eads, Gage
> Sent: Friday, January 18, 2019 2:10 PM
> To: 'Honnappa Nagarahalli' <Honnappa.Nagarahalli@arm.com>; dev@dpdk.org
> Cc: olivier.matz@6wind.com; arybchenko@solarflare.com; Richardson, Bruce
> <bruce.richardson@intel.com>; Ananyev, Konstantin
> <konstantin.ananyev@intel.com>; Gavin Hu (Arm Technology China)
> <Gavin.Hu@arm.com>; nd <nd@arm.com>; nd <nd@arm.com>
> Subject: RE: [dpdk-dev] [PATCH v4 2/2] mempool/nb_stack: add non-blocking
> stack mempool
> 
> 
> 
> > -----Original Message-----
> > From: Honnappa Nagarahalli [mailto:Honnappa.Nagarahalli@arm.com]
> > Sent: Thursday, January 17, 2019 11:05 PM
> > To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> > Cc: olivier.matz@6wind.com; arybchenko@solarflare.com; Richardson,
> > Bruce <bruce.richardson@intel.com>; Ananyev, Konstantin
> > <konstantin.ananyev@intel.com>; Gavin Hu (Arm Technology China)
> > <Gavin.Hu@arm.com>; nd <nd@arm.com>; Honnappa Nagarahalli
> > <Honnappa.Nagarahalli@arm.com>; nd <nd@arm.com>
> > Subject: RE: [dpdk-dev] [PATCH v4 2/2] mempool/nb_stack: add
> > non-blocking stack mempool
> >
> > Hi Gage,
> >      Thank you for your contribution on non-blocking data structures.
> > I think they are important to extend DPDK into additional use cases.
> >
> 
> Glad to hear it. Be sure to check out my non-blocking ring patchset as well, if
> you haven't already: http://mails.dpdk.org/archives/dev/2019-
> January/123774.html
> 
> > I am wondering if it makes sense to decouple the NB stack data
> > structure from mempool driver (similar to rte_ring)? I see that stack
> > based mempool implements the stack data structure in the driver. But,
> > NB stack might not be such a trivial data structure. It might be
> > useful for the applications or other use cases as well.
> >
> 
> I agree -- and you're not the first to suggest this :).
> 
> I'm going to defer that work to a later patchset; creating a new lib/ directory
> requires tech board approval (IIRC), which would unnecessarily slow down this
> mempool handler from getting merged.
> 
> > I also suggest that we use C11 __atomic_xxx APIs for memory
> > operations. The rte_atomic64_xxx APIs use __sync_xxx APIs which do not
> > provide the capability to express memory orderings.
> >
> 
> Ok, I will add those (dependent on RTE_USE_C11_MEM_MODEL).
> 
> > Please find few comments inline.
> >
> > >
> > > This commit adds support for non-blocking (linked list based) stack
> > > mempool handler. The stack uses a 128-bit compare-and-swap
> > > instruction, and thus is limited to x86_64. The 128-bit CAS
> > > atomically updates the stack top pointer and a modification counter,
> > > which protects against the ABA problem.
> > >
> > > In mempool_perf_autotest the lock-based stack outperforms the non-
> > > blocking handler*, however:
> > > - For applications with preemptible pthreads, a lock-based stack's
> > >   worst-case performance (i.e. one thread being preempted while
> > >   holding the spinlock) is much worse than the non-blocking stack's.
> > > - Using per-thread mempool caches will largely mitigate the performance
> > >   difference.
> > >
> > > *Test setup: x86_64 build with default config, dual-socket Xeon
> > > E5-2699 v4, running on isolcpus cores with a tickless scheduler. The
> > > lock-based stack's rate_persec was 1x-3.5x the non-blocking stack's.
> > >
> > > Signed-off-by: Gage Eads <gage.eads@intel.com>
> > > Acked-by: Andrew Rybchenko <arybchenko@solarflare.com>
> > > ---
> > >  MAINTAINERS                                        |   4 +
> > >  config/common_base                                 |   1 +
> > >  doc/guides/prog_guide/env_abstraction_layer.rst    |   5 +
> > >  drivers/mempool/Makefile                           |   3 +
> > >  drivers/mempool/meson.build                        |   3 +-
> > >  drivers/mempool/nb_stack/Makefile                  |  23 ++++
> > >  drivers/mempool/nb_stack/meson.build               |   6 +
> > >  drivers/mempool/nb_stack/nb_lifo.h                 | 147
> > > +++++++++++++++++++++
> > >  drivers/mempool/nb_stack/rte_mempool_nb_stack.c    | 125
> > > ++++++++++++++++++
> > >  .../nb_stack/rte_mempool_nb_stack_version.map      |   4 +
> > >  mk/rte.app.mk                                      |   7 +-
> > >  11 files changed, 325 insertions(+), 3 deletions(-)  create mode
> > > 100644 drivers/mempool/nb_stack/Makefile  create mode 100644
> > > drivers/mempool/nb_stack/meson.build
> > >  create mode 100644 drivers/mempool/nb_stack/nb_lifo.h
> > >  create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> > >  create mode 100644
> > > drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> > >
> > > diff --git a/MAINTAINERS b/MAINTAINERS index 470f36b9c..5519d3323
> > > 100644
> > > --- a/MAINTAINERS
> > > +++ b/MAINTAINERS
> > > @@ -416,6 +416,10 @@ M: Artem V. Andreev
> > > <artem.andreev@oktetlabs.ru>
> > >  M: Andrew Rybchenko <arybchenko@solarflare.com>
> > >  F: drivers/mempool/bucket/
> > >
> > > +Non-blocking stack memory pool
> > > +M: Gage Eads <gage.eads@intel.com>
> > > +F: drivers/mempool/nb_stack/
> > > +
> > >
> > >  Bus Drivers
> > >  -----------
> > > diff --git a/config/common_base b/config/common_base index
> > > 964a6956e..8a51f36b1 100644
> > > --- a/config/common_base
> > > +++ b/config/common_base
> > > @@ -726,6 +726,7 @@ CONFIG_RTE_LIBRTE_MEMPOOL_DEBUG=n  #
> > > CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
> > >  CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=64
> > > +CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK=y
> > >  CONFIG_RTE_DRIVER_MEMPOOL_RING=y
> > >  CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
> > >
> > > diff --git a/doc/guides/prog_guide/env_abstraction_layer.rst
> > > b/doc/guides/prog_guide/env_abstraction_layer.rst
> > > index 929d76dba..9497b879c 100644
> > > --- a/doc/guides/prog_guide/env_abstraction_layer.rst
> > > +++ b/doc/guides/prog_guide/env_abstraction_layer.rst
> > > @@ -541,6 +541,11 @@ Known Issues
> > >
> > >    5. It MUST not be used by multi-producer/consumer pthreads, whose
> > > scheduling policies are SCHED_FIFO or SCHED_RR.
> > >
> > > +  Alternatively, x86_64 applications can use the non-blocking stack mempool handler. When considering this handler, note that:
> > > +
> > > +  - it is limited to the x86_64 platform, because it uses an instruction (16-byte compare-and-swap) that is not available on other platforms.
> >
> > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Arm architecture supports similar
> > instructions. I suggest to simplify this statement to indicate that 'nb_stack
> feature is available for x86_64 platforms currently'
> >
> 
> Will do.
> 
> > > +  - it has worse average-case performance than the non-preemptive rte_ring, but software caching (e.g. the mempool cache) can mitigate this by reducing the number of handler operations.
> > > +
> > >  + rte_timer
> > >
> > >    Running  ``rte_timer_manage()`` on a non-EAL pthread is not allowed.
> > > However, resetting/stopping the timer from a non-EAL pthread is allowed.
> > > diff --git a/drivers/mempool/Makefile b/drivers/mempool/Makefile
> > > index
> > > 28c2e8360..895cf8a34 100644
> > > --- a/drivers/mempool/Makefile
> > > +++ b/drivers/mempool/Makefile
> > > @@ -10,6 +10,9 @@ endif
> > >  ifeq ($(CONFIG_RTE_EAL_VFIO)$(CONFIG_RTE_LIBRTE_FSLMC_BUS),yy)
> > >  DIRS-$(CONFIG_RTE_LIBRTE_DPAA2_MEMPOOL) += dpaa2  endif
> > > +ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
> > > +DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += nb_stack endif
> > >  DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
> > >  DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += stack
> > >  DIRS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += octeontx diff --git
> > > a/drivers/mempool/meson.build b/drivers/mempool/meson.build index
> > > 4527d9806..220cfaf63 100644
> > > --- a/drivers/mempool/meson.build
> > > +++ b/drivers/mempool/meson.build
> > > @@ -1,7 +1,8 @@
> > >  # SPDX-License-Identifier: BSD-3-Clause  # Copyright(c) 2017 Intel
> > > Corporation
> > >
> > > -drivers = ['bucket', 'dpaa', 'dpaa2', 'octeontx', 'ring', 'stack']
> > > +drivers = ['bucket', 'dpaa', 'dpaa2', 'nb_stack', 'octeontx',
> > > +'ring', 'stack']
> > > +
> > >  std_deps = ['mempool']
> > >  config_flag_fmt = 'RTE_LIBRTE_@0@_MEMPOOL'
> > >  driver_name_fmt = 'rte_mempool_@0@'
> > > diff --git a/drivers/mempool/nb_stack/Makefile
> > > b/drivers/mempool/nb_stack/Makefile
> > > new file mode 100644
> > > index 000000000..318b18283
> > > --- /dev/null
> > > +++ b/drivers/mempool/nb_stack/Makefile
> > > @@ -0,0 +1,23 @@
> > > +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2019 Intel
> > > +Corporation
> > > +
> > > +include $(RTE_SDK)/mk/rte.vars.mk
> > > +
> > > +#
> > > +# library name
> > > +#
> > > +LIB = librte_mempool_nb_stack.a
> > > +
> > > +CFLAGS += -O3
> > > +CFLAGS += $(WERROR_FLAGS)
> > > +
> > > +# Headers
> > > +LDLIBS += -lrte_eal -lrte_mempool
> > > +
> > > +EXPORT_MAP := rte_mempool_nb_stack_version.map
> > > +
> > > +LIBABIVER := 1
> > > +
> > > +SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) +=
> > > rte_mempool_nb_stack.c
> > > +
> > > +include $(RTE_SDK)/mk/rte.lib.mk
> > > diff --git a/drivers/mempool/nb_stack/meson.build
> > > b/drivers/mempool/nb_stack/meson.build
> > > new file mode 100644
> > > index 000000000..7dec72242
> > > --- /dev/null
> > > +++ b/drivers/mempool/nb_stack/meson.build
> > > @@ -0,0 +1,6 @@
> > > +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2019 Intel
> > > +Corporation
> > > +
> > > +build = dpdk_conf.has('RTE_ARCH_X86_64')
> > > +
> > > +sources = files('rte_mempool_nb_stack.c')
> > > diff --git a/drivers/mempool/nb_stack/nb_lifo.h
> > > b/drivers/mempool/nb_stack/nb_lifo.h
> > > new file mode 100644
> > > index 000000000..ad4a3401f
> > > --- /dev/null
> > > +++ b/drivers/mempool/nb_stack/nb_lifo.h
> > > @@ -0,0 +1,147 @@
> > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > + * Copyright(c) 2019 Intel Corporation  */
> > > +
> > > +#ifndef _NB_LIFO_H_
> > > +#define _NB_LIFO_H_
> > > +
> > > +struct nb_lifo_elem {
> > > +	void *data;
> > > +	struct nb_lifo_elem *next;
> > > +};
> > > +
> > > +struct nb_lifo_head {
> > > +	struct nb_lifo_elem *top; /**< Stack top */
> > > +	uint64_t cnt; /**< Modification counter */ };
> > Minor comment, mentioning ABA problem in the comments for 'cnt' will
> > be helpful.
> >
> 
> Sure.
> 
> > > +
> > > +struct nb_lifo {
> > > +	volatile struct nb_lifo_head head __rte_aligned(16);
> > > +	rte_atomic64_t len;
> > > +} __rte_cache_aligned;
> > > +
> > > +static __rte_always_inline void
> > > +nb_lifo_init(struct nb_lifo *lifo)
> > > +{
> > > +	memset(lifo, 0, sizeof(*lifo));
> > > +	rte_atomic64_set(&lifo->len, 0);
> > > +}
> > > +
> > > +static __rte_always_inline unsigned int
> > > +nb_lifo_len(struct nb_lifo *lifo)
> > > +{
> > > +	/* nb_lifo_push() and nb_lifo_pop() do not update the list's
> > > contents
> > > +	 * and lifo->len atomically, which can cause the list to appear
> > > shorter
> > > +	 * than it actually is if this function is called while other threads
> > > +	 * are modifying the list.
> > > +	 *
> > > +	 * However, given the inherently approximate nature of the
> > > get_count
> > > +	 * callback -- even if the list and its size were updated atomically,
> > > +	 * the size could change between when get_count executes and
> > > when the
> > > +	 * value is returned to the caller -- this is acceptable.
> > > +	 *
> > > +	 * The lifo->len updates are placed such that the list may appear to
> > > +	 * have fewer elements than it does, but will never appear to have
> > > more
> > > +	 * elements. If the mempool is near-empty to the point that this is a
> > > +	 * concern, the user should consider increasing the mempool size.
> > > +	 */
> > > +	return (unsigned int)rte_atomic64_read(&lifo->len);
> > > +}
> > > +
> > > +static __rte_always_inline void
> > > +nb_lifo_push(struct nb_lifo *lifo,
> > > +	     struct nb_lifo_elem *first,
> > > +	     struct nb_lifo_elem *last,
> > > +	     unsigned int num)
> > > +{
> > > +	while (1) {
> > > +		struct nb_lifo_head old_head, new_head;
> > > +
> > > +		old_head = lifo->head;
> > > +
> > > +		/* Swing the top pointer to the first element in the list and
> > > +		 * make the last element point to the old top.
> > > +		 */
> > > +		new_head.top = first;
> > > +		new_head.cnt = old_head.cnt + 1;
> > > +
> > > +		last->next = old_head.top;
> > > +
> > > +		if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
> > > +					 (uint64_t *)&old_head,
> > > +					 (uint64_t *)&new_head))
> > > +			break;
> > > +	}
> > Minor comment, this can be a do-while loop (for ex: similar to the one
> > in
> > __rte_ring_move_prod_head)
> >
> 
> Sure.
> 
> > > +
> > > +	rte_atomic64_add(&lifo->len, num); }
> > > +
> > > +static __rte_always_inline void
> > > +nb_lifo_push_single(struct nb_lifo *lifo, struct nb_lifo_elem *elem) {
> > > +	nb_lifo_push(lifo, elem, elem, 1); }
> > > +
> > > +static __rte_always_inline struct nb_lifo_elem *
> > > +nb_lifo_pop(struct nb_lifo *lifo,
> > > +	    unsigned int num,
> > > +	    void **obj_table,
> > > +	    struct nb_lifo_elem **last)
> > > +{
> > > +	struct nb_lifo_head old_head;
> > > +
> > > +	/* Reserve num elements, if available */
> > > +	while (1) {
> > > +		uint64_t len = rte_atomic64_read(&lifo->len);
> > > +
> > > +		/* Does the list contain enough elements? */
> > > +		if (len < num)
> > > +			return NULL;
> > > +
> > > +		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
> > > +					len, len - num))
> > > +			break;
> > > +	}
> > > +
> > > +	/* Pop num elements */
> > > +	while (1) {
> > > +		struct nb_lifo_head new_head;
> > > +		struct nb_lifo_elem *tmp;
> > > +		unsigned int i;
> > > +
> > > +		old_head = lifo->head;
> > > +
> > > +		tmp = old_head.top;
> > > +
> > > +		/* Traverse the list to find the new head. A next pointer will
> > > +		 * either point to another element or NULL; if a thread
> > > +		 * encounters a pointer that has already been popped, the CAS
> > > +		 * will fail.
> > > +		 */
> > > +		for (i = 0; i < num && tmp != NULL; i++) {
> > > +			if (obj_table)
> > This 'if' check can be outside the for loop. May be use RTE_ASSERT in
> > the beginning of the function?
> >
> 
> A NULL obj_table pointer isn't an error -- nb_stack_enqueue() calls this function
> with NULL because it doesn't need the popped elements added to a table. When
> the compiler inlines this function into nb_stack_enqueue(), it can use constant
> propagation to optimize away the if-statement.
> 
> I don't think that's possible for the other caller, nb_stack_dequeue, though,
> unless we add a NULL pointer check to the beginning of that function. Then it
> would be guaranteed that obj_table is non-NULL, and the compiler can optimize
> away the if-statement. I'll add that.
> 
> > > +				obj_table[i] = tmp->data;
> > > +			if (last)
> > > +				*last = tmp;
> > > +			tmp = tmp->next;
> > > +		}
> > > +
> > > +		/* If NULL was encountered, the list was modified while
> > > +		 * traversing it. Retry.
> > > +		 */
> > > +		if (i != num)
> > > +			continue;
> > > +
> > > +		new_head.top = tmp;
> > > +		new_head.cnt = old_head.cnt + 1;
> > > +
> > > +		if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
> > > +					 (uint64_t *)&old_head,
> > > +					 (uint64_t *)&new_head))
> > > +			break;
> > > +	}
> > > +
> > > +	return old_head.top;
> > > +}
> > > +
> > > +#endif /* _NB_LIFO_H_ */
> > > diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> > > b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> > > new file mode 100644
> > > index 000000000..1818a2cfa
> > > --- /dev/null
> > > +++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> > > @@ -0,0 +1,125 @@
> > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > + * Copyright(c) 2019 Intel Corporation  */
> > > +
> > > +#include <stdio.h>
> > > +#include <rte_mempool.h>
> > > +#include <rte_malloc.h>
> > > +
> > > +#include "nb_lifo.h"
> > > +
> > > +struct rte_mempool_nb_stack {
> > > +	uint64_t size;
> > > +	struct nb_lifo used_lifo; /**< LIFO containing mempool pointers  */
> > > +	struct nb_lifo free_lifo; /**< LIFO containing unused LIFO elements */
> > > +};
> > > +
> > > +static int
> > > +nb_stack_alloc(struct rte_mempool *mp) {
> > > +	struct rte_mempool_nb_stack *s;
> > > +	struct nb_lifo_elem *elems;
> > > +	unsigned int n = mp->size;
> > > +	unsigned int size, i;
> > > +
> > > +	size = sizeof(*s) + n * sizeof(struct nb_lifo_elem);
> > IMO, the allocation of the stack elements can be moved under
> > nb_lifo_init API, it would make the nb stack code modular.
> >
> 
> (see below)
> 
> > > +
> > > +	/* Allocate our local memory structure */
> > > +	s = rte_zmalloc_socket("mempool-nb_stack",
> > > +			       size,
> > > +			       RTE_CACHE_LINE_SIZE,
> > > +			       mp->socket_id);
> > > +	if (s == NULL) {
> > > +		RTE_LOG(ERR, MEMPOOL, "Cannot allocate nb_stack!\n");
> > > +		return -ENOMEM;
> > > +	}
> > > +
> > > +	s->size = n;
> > > +
> > > +	nb_lifo_init(&s->used_lifo);
> > > +	nb_lifo_init(&s->free_lifo);
> > > +
> > > +	elems = (struct nb_lifo_elem *)&s[1];
> > > +	for (i = 0; i < n; i++)
> > > +		nb_lifo_push_single(&s->free_lifo, &elems[i]);
> > This also can be added to nb_lifo_init API.
> >
> 
> Sure, good suggestions. I'll address this.
> 

On second thought, moving this push code into nb_lifo_init() doesn't work since we do it for one LIFO and not the other.

I'll think on it, but I'm not going to spend too many cycles -- modularizing nb_lifo can be deferred to the patchset that moves it to a separate library.
Thomas Monjalon Jan. 19, 2019, 12:15 a.m. | #4
19/01/2019 01:00, Eads, Gage:
> > > I am wondering if it makes sense to decouple the NB stack data
> > > structure from mempool driver (similar to rte_ring)? I see that stack
> > > based mempool implements the stack data structure in the driver. But,
> > > NB stack might not be such a trivial data structure. It might be
> > > useful for the applications or other use cases as well.
> > >
> > 
> > I agree -- and you're not the first to suggest this :).
> > 
> > I'm going to defer that work to a later patchset; creating a new lib/ directory
> > requires tech board approval (IIRC), which would unnecessarily slow down this
> > mempool handler from getting merged.

You have time. This patch cannot go in 19.02.
You just need to be ready for 19.05-rc1 (more than 2 months).

[...]
> modularizing nb_lifo can be deferred to the patchset that moves it to a separate library.

Please introduce it in the right place from the beginning.
Eads, Gage Jan. 22, 2019, 6:24 p.m. | #5
> -----Original Message-----
> From: Thomas Monjalon [mailto:thomas@monjalon.net]
> Sent: Friday, January 18, 2019 6:15 PM
> To: Eads, Gage <gage.eads@intel.com>
> Cc: dev@dpdk.org; Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>;
> olivier.matz@6wind.com; arybchenko@solarflare.com; Richardson, Bruce
> <bruce.richardson@intel.com>; Ananyev, Konstantin
> <konstantin.ananyev@intel.com>; Gavin Hu (Arm Technology China)
> <Gavin.Hu@arm.com>; nd <nd@arm.com>
> Subject: Re: [dpdk-dev] [PATCH v4 2/2] mempool/nb_stack: add non-blocking
> stack mempool
> 
> 19/01/2019 01:00, Eads, Gage:
> > > > I am wondering if it makes sense to decouple the NB stack data
> > > > structure from mempool driver (similar to rte_ring)? I see that
> > > > stack based mempool implements the stack data structure in the
> > > > driver. But, NB stack might not be such a trivial data structure.
> > > > It might be useful for the applications or other use cases as well.
> > > >
> > >
> > > I agree -- and you're not the first to suggest this :).
> > >
> > > I'm going to defer that work to a later patchset; creating a new
> > > lib/ directory requires tech board approval (IIRC), which would
> > > unnecessarily slow down this mempool handler from getting merged.
> 
> You have time. This patch cannot go in 19.02.
> You just need to be ready for 19.05-rc1 (more than 2 months).
> 

Understood; I was concerned the process could take longer. The code itself isn't too complicated, but I'm not sure how much time it'll take to reach consensus on the interface. On the other hand, it may go quickly if we model it after the ring API -- my current thinking -- which folks seem to generally like.

Anyway, I'll rework this into a standalone stack library.

Thanks,
Gage

> [...]
> > modularizing nb_lifo can be deferred to the patchset that moves it to a
> separate library.
> 
> Please introduce it in the right place from the beginning.
>

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index 470f36b9c..5519d3323 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -416,6 +416,10 @@  M: Artem V. Andreev <artem.andreev@oktetlabs.ru>
 M: Andrew Rybchenko <arybchenko@solarflare.com>
 F: drivers/mempool/bucket/
 
+Non-blocking stack memory pool
+M: Gage Eads <gage.eads@intel.com>
+F: drivers/mempool/nb_stack/
+
 
 Bus Drivers
 -----------
diff --git a/config/common_base b/config/common_base
index 964a6956e..8a51f36b1 100644
--- a/config/common_base
+++ b/config/common_base
@@ -726,6 +726,7 @@  CONFIG_RTE_LIBRTE_MEMPOOL_DEBUG=n
 #
 CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
 CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=64
+CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK=y
 CONFIG_RTE_DRIVER_MEMPOOL_RING=y
 CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
 
diff --git a/doc/guides/prog_guide/env_abstraction_layer.rst b/doc/guides/prog_guide/env_abstraction_layer.rst
index 929d76dba..9497b879c 100644
--- a/doc/guides/prog_guide/env_abstraction_layer.rst
+++ b/doc/guides/prog_guide/env_abstraction_layer.rst
@@ -541,6 +541,11 @@  Known Issues
 
   5. It MUST not be used by multi-producer/consumer pthreads, whose scheduling policies are SCHED_FIFO or SCHED_RR.
 
+  Alternatively, x86_64 applications can use the non-blocking stack mempool handler. When considering this handler, note that:
+
+  - it is limited to the x86_64 platform, because it uses an instruction (16-byte compare-and-swap) that is not available on other platforms.
+  - it has worse average-case performance than the non-preemptive rte_ring, but software caching (e.g. the mempool cache) can mitigate this by reducing the number of handler operations.
+
 + rte_timer
 
   Running  ``rte_timer_manage()`` on a non-EAL pthread is not allowed. However, resetting/stopping the timer from a non-EAL pthread is allowed.
diff --git a/drivers/mempool/Makefile b/drivers/mempool/Makefile
index 28c2e8360..895cf8a34 100644
--- a/drivers/mempool/Makefile
+++ b/drivers/mempool/Makefile
@@ -10,6 +10,9 @@  endif
 ifeq ($(CONFIG_RTE_EAL_VFIO)$(CONFIG_RTE_LIBRTE_FSLMC_BUS),yy)
 DIRS-$(CONFIG_RTE_LIBRTE_DPAA2_MEMPOOL) += dpaa2
 endif
+ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
+DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += nb_stack
+endif
 DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
 DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += stack
 DIRS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += octeontx
diff --git a/drivers/mempool/meson.build b/drivers/mempool/meson.build
index 4527d9806..220cfaf63 100644
--- a/drivers/mempool/meson.build
+++ b/drivers/mempool/meson.build
@@ -1,7 +1,8 @@ 
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright(c) 2017 Intel Corporation
 
-drivers = ['bucket', 'dpaa', 'dpaa2', 'octeontx', 'ring', 'stack']
+drivers = ['bucket', 'dpaa', 'dpaa2', 'nb_stack', 'octeontx', 'ring', 'stack']
+
 std_deps = ['mempool']
 config_flag_fmt = 'RTE_LIBRTE_@0@_MEMPOOL'
 driver_name_fmt = 'rte_mempool_@0@'
diff --git a/drivers/mempool/nb_stack/Makefile b/drivers/mempool/nb_stack/Makefile
new file mode 100644
index 000000000..318b18283
--- /dev/null
+++ b/drivers/mempool/nb_stack/Makefile
@@ -0,0 +1,23 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2019 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+#
+# library name
+#
+LIB = librte_mempool_nb_stack.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+# Headers
+LDLIBS += -lrte_eal -lrte_mempool
+
+EXPORT_MAP := rte_mempool_nb_stack_version.map
+
+LIBABIVER := 1
+
+SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += rte_mempool_nb_stack.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/mempool/nb_stack/meson.build b/drivers/mempool/nb_stack/meson.build
new file mode 100644
index 000000000..7dec72242
--- /dev/null
+++ b/drivers/mempool/nb_stack/meson.build
@@ -0,0 +1,6 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2019 Intel Corporation
+
+build = dpdk_conf.has('RTE_ARCH_X86_64')
+
+sources = files('rte_mempool_nb_stack.c')
diff --git a/drivers/mempool/nb_stack/nb_lifo.h b/drivers/mempool/nb_stack/nb_lifo.h
new file mode 100644
index 000000000..ad4a3401f
--- /dev/null
+++ b/drivers/mempool/nb_stack/nb_lifo.h
@@ -0,0 +1,147 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#ifndef _NB_LIFO_H_
+#define _NB_LIFO_H_
+
+struct nb_lifo_elem {
+	void *data;
+	struct nb_lifo_elem *next;
+};
+
+struct nb_lifo_head {
+	struct nb_lifo_elem *top; /**< Stack top */
+	uint64_t cnt; /**< Modification counter */
+};
+
+struct nb_lifo {
+	volatile struct nb_lifo_head head __rte_aligned(16);
+	rte_atomic64_t len;
+} __rte_cache_aligned;
+
+static __rte_always_inline void
+nb_lifo_init(struct nb_lifo *lifo)
+{
+	memset(lifo, 0, sizeof(*lifo));
+	rte_atomic64_set(&lifo->len, 0);
+}
+
+static __rte_always_inline unsigned int
+nb_lifo_len(struct nb_lifo *lifo)
+{
+	/* nb_lifo_push() and nb_lifo_pop() do not update the list's contents
+	 * and lifo->len atomically, which can cause the list to appear shorter
+	 * than it actually is if this function is called while other threads
+	 * are modifying the list.
+	 *
+	 * However, given the inherently approximate nature of the get_count
+	 * callback -- even if the list and its size were updated atomically,
+	 * the size could change between when get_count executes and when the
+	 * value is returned to the caller -- this is acceptable.
+	 *
+	 * The lifo->len updates are placed such that the list may appear to
+	 * have fewer elements than it does, but will never appear to have more
+	 * elements. If the mempool is near-empty to the point that this is a
+	 * concern, the user should consider increasing the mempool size.
+	 */
+	return (unsigned int)rte_atomic64_read(&lifo->len);
+}
+
+static __rte_always_inline void
+nb_lifo_push(struct nb_lifo *lifo,
+	     struct nb_lifo_elem *first,
+	     struct nb_lifo_elem *last,
+	     unsigned int num)
+{
+	while (1) {
+		struct nb_lifo_head old_head, new_head;
+
+		old_head = lifo->head;
+
+		/* Swing the top pointer to the first element in the list and
+		 * make the last element point to the old top.
+		 */
+		new_head.top = first;
+		new_head.cnt = old_head.cnt + 1;
+
+		last->next = old_head.top;
+
+		if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
+					 (uint64_t *)&old_head,
+					 (uint64_t *)&new_head))
+			break;
+	}
+
+	rte_atomic64_add(&lifo->len, num);
+}
+
+static __rte_always_inline void
+nb_lifo_push_single(struct nb_lifo *lifo, struct nb_lifo_elem *elem)
+{
+	nb_lifo_push(lifo, elem, elem, 1);
+}
+
+static __rte_always_inline struct nb_lifo_elem *
+nb_lifo_pop(struct nb_lifo *lifo,
+	    unsigned int num,
+	    void **obj_table,
+	    struct nb_lifo_elem **last)
+{
+	struct nb_lifo_head old_head;
+
+	/* Reserve num elements, if available */
+	while (1) {
+		uint64_t len = rte_atomic64_read(&lifo->len);
+
+		/* Does the list contain enough elements? */
+		if (len < num)
+			return NULL;
+
+		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
+					len, len - num))
+			break;
+	}
+
+	/* Pop num elements */
+	while (1) {
+		struct nb_lifo_head new_head;
+		struct nb_lifo_elem *tmp;
+		unsigned int i;
+
+		old_head = lifo->head;
+
+		tmp = old_head.top;
+
+		/* Traverse the list to find the new head. A next pointer will
+		 * either point to another element or NULL; if a thread
+		 * encounters a pointer that has already been popped, the CAS
+		 * will fail.
+		 */
+		for (i = 0; i < num && tmp != NULL; i++) {
+			if (obj_table)
+				obj_table[i] = tmp->data;
+			if (last)
+				*last = tmp;
+			tmp = tmp->next;
+		}
+
+		/* If NULL was encountered, the list was modified while
+		 * traversing it. Retry.
+		 */
+		if (i != num)
+			continue;
+
+		new_head.top = tmp;
+		new_head.cnt = old_head.cnt + 1;
+
+		if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
+					 (uint64_t *)&old_head,
+					 (uint64_t *)&new_head))
+			break;
+	}
+
+	return old_head.top;
+}
+
+#endif /* _NB_LIFO_H_ */
diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack.c b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
new file mode 100644
index 000000000..1818a2cfa
--- /dev/null
+++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
@@ -0,0 +1,125 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#include <stdio.h>
+#include <rte_mempool.h>
+#include <rte_malloc.h>
+
+#include "nb_lifo.h"
+
+struct rte_mempool_nb_stack {
+	uint64_t size;
+	struct nb_lifo used_lifo; /**< LIFO containing mempool pointers  */
+	struct nb_lifo free_lifo; /**< LIFO containing unused LIFO elements */
+};
+
+static int
+nb_stack_alloc(struct rte_mempool *mp)
+{
+	struct rte_mempool_nb_stack *s;
+	struct nb_lifo_elem *elems;
+	unsigned int n = mp->size;
+	unsigned int size, i;
+
+	size = sizeof(*s) + n * sizeof(struct nb_lifo_elem);
+
+	/* Allocate our local memory structure */
+	s = rte_zmalloc_socket("mempool-nb_stack",
+			       size,
+			       RTE_CACHE_LINE_SIZE,
+			       mp->socket_id);
+	if (s == NULL) {
+		RTE_LOG(ERR, MEMPOOL, "Cannot allocate nb_stack!\n");
+		return -ENOMEM;
+	}
+
+	s->size = n;
+
+	nb_lifo_init(&s->used_lifo);
+	nb_lifo_init(&s->free_lifo);
+
+	elems = (struct nb_lifo_elem *)&s[1];
+	for (i = 0; i < n; i++)
+		nb_lifo_push_single(&s->free_lifo, &elems[i]);
+
+	mp->pool_data = s;
+
+	return 0;
+}
+
+static int
+nb_stack_enqueue(struct rte_mempool *mp, void * const *obj_table,
+		 unsigned int n)
+{
+	struct rte_mempool_nb_stack *s = mp->pool_data;
+	struct nb_lifo_elem *first, *last, *tmp;
+	unsigned int i;
+
+	if (unlikely(n == 0))
+		return 0;
+
+	/* Pop n free elements */
+	first = nb_lifo_pop(&s->free_lifo, n, NULL, NULL);
+	if (unlikely(first == NULL))
+		return -ENOBUFS;
+
+	/* Prepare the list elements */
+	tmp = first;
+	for (i = 0; i < n; i++) {
+		tmp->data = obj_table[i];
+		last = tmp;
+		tmp = tmp->next;
+	}
+
+	/* Enqueue them to the used list */
+	nb_lifo_push(&s->used_lifo, first, last, n);
+
+	return 0;
+}
+
+static int
+nb_stack_dequeue(struct rte_mempool *mp, void **obj_table,
+		 unsigned int n)
+{
+	struct rte_mempool_nb_stack *s = mp->pool_data;
+	struct nb_lifo_elem *first, *last;
+
+	if (unlikely(n == 0))
+		return 0;
+
+	/* Pop n used elements */
+	first = nb_lifo_pop(&s->used_lifo, n, obj_table, &last);
+	if (unlikely(first == NULL))
+		return -ENOENT;
+
+	/* Enqueue the list elements to the free list */
+	nb_lifo_push(&s->free_lifo, first, last, n);
+
+	return 0;
+}
+
+static unsigned
+nb_stack_get_count(const struct rte_mempool *mp)
+{
+	struct rte_mempool_nb_stack *s = mp->pool_data;
+
+	return nb_lifo_len(&s->used_lifo);
+}
+
+static void
+nb_stack_free(struct rte_mempool *mp)
+{
+	rte_free(mp->pool_data);
+}
+
+static struct rte_mempool_ops ops_nb_stack = {
+	.name = "nb_stack",
+	.alloc = nb_stack_alloc,
+	.free = nb_stack_free,
+	.enqueue = nb_stack_enqueue,
+	.dequeue = nb_stack_dequeue,
+	.get_count = nb_stack_get_count
+};
+
+MEMPOOL_REGISTER_OPS(ops_nb_stack);
diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
new file mode 100644
index 000000000..fc8c95e91
--- /dev/null
+++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
@@ -0,0 +1,4 @@ 
+DPDK_19.05 {
+
+	local: *;
+};
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 02e8b6f05..d4b4aaaf6 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -131,8 +131,11 @@ endif
 ifeq ($(CONFIG_RTE_BUILD_SHARED_LIB),n)
 # plugins (link only if static libraries)
 
-_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -lrte_mempool_bucket
-_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK)  += -lrte_mempool_stack
+_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET)   += -lrte_mempool_bucket
+ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
+_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += -lrte_mempool_nb_stack
+endif
+_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK)    += -lrte_mempool_stack
 ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_DPAA_MEMPOOL)   += -lrte_mempool_dpaa
 endif