[v4,1/3] lib/lpm: integrate RCU QSBR

Message ID 20200608051658.144417-2-ruifeng.wang@arm.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series RCU integration with LPM library |

Checks

Context Check Description
ci/iol-broadcom-Performance success Performance Testing PASS
ci/iol-intel-Performance success Performance Testing PASS
ci/iol-nxp-Performance success Performance Testing PASS
ci/Intel-compilation success Compilation OK
ci/iol-mellanox-Performance success Performance Testing PASS
ci/iol-testing warning Testing issues
ci/checkpatch success coding style OK

Commit Message

Ruifeng Wang June 8, 2020, 5:16 a.m. UTC
  Currently, the tbl8 group is freed even though the readers might be
using the tbl8 group entries. The freed tbl8 group can be reallocated
quickly. This results in incorrect lookup results.

The RCU QSBR process is integrated for safe tbl8 group reclamation.
Refer to the RCU documentation to understand the various aspects of
integrating the RCU library into other libraries.

Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
---
 doc/guides/prog_guide/lpm_lib.rst  |  32 ++++++++
 lib/librte_lpm/Makefile            |   2 +-
 lib/librte_lpm/meson.build         |   1 +
 lib/librte_lpm/rte_lpm.c           | 123 ++++++++++++++++++++++++++---
 lib/librte_lpm/rte_lpm.h           |  59 ++++++++++++++
 lib/librte_lpm/rte_lpm_version.map |   6 ++
 6 files changed, 211 insertions(+), 12 deletions(-)
  

Comments

Honnappa Nagarahalli June 8, 2020, 6:46 p.m. UTC | #1
<snip>

> Subject: [PATCH v4 1/3] lib/lpm: integrate RCU QSBR
> 
> Currently, the tbl8 group is freed even though the readers might be using the
> tbl8 group entries. The freed tbl8 group can be reallocated quickly. This
> results in incorrect lookup results.
> 
> RCU QSBR process is integrated for safe tbl8 group reclaim.
> Refer to RCU documentation to understand various aspects of integrating
> RCU library into other libraries.
> 
> Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> ---
>  doc/guides/prog_guide/lpm_lib.rst  |  32 ++++++++
>  lib/librte_lpm/Makefile            |   2 +-
>  lib/librte_lpm/meson.build         |   1 +
>  lib/librte_lpm/rte_lpm.c           | 123 ++++++++++++++++++++++++++---
>  lib/librte_lpm/rte_lpm.h           |  59 ++++++++++++++
>  lib/librte_lpm/rte_lpm_version.map |   6 ++
>  6 files changed, 211 insertions(+), 12 deletions(-)
> 
> diff --git a/doc/guides/prog_guide/lpm_lib.rst
> b/doc/guides/prog_guide/lpm_lib.rst
> index 1609a57d0..7cc99044a 100644
> --- a/doc/guides/prog_guide/lpm_lib.rst
> +++ b/doc/guides/prog_guide/lpm_lib.rst
> @@ -145,6 +145,38 @@ depending on whether we need to move to the next
> table or not.
>  Prefix expansion is one of the keys of this algorithm,  since it improves the
> speed dramatically by adding redundancy.
> 
> +Deletion
> +~~~~~~~~
> +
> +When deleting a rule, a replacement rule is searched for. Replacement
> +rule is an existing rule that has the longest prefix match with the rule to be
> deleted, but has smaller depth.
> +
> +If a replacement rule is found, target tbl24 and tbl8 entries are
> +updated to have the same depth and next hop value with the replacement
> rule.
> +
> +If no replacement rule can be found, target tbl24 and tbl8 entries will be
> cleared.
> +
> +Prefix expansion is performed if the rule's depth is not exactly 24 bits or 32
> bits.
> +
> +After deleting a rule, a group of tbl8s that belongs to the same tbl24 entry
> are freed in following cases:
> +
> +*   All tbl8s in the group are empty .
> +
> +*   All tbl8s in the group have the same values and with depth no greater
> than 24.
> +
> +Free of tbl8s have different behaviors:
> +
> +*   If RCU is not used, tbl8s are cleared and reclaimed immediately.
> +
> +*   If RCU is used, tbl8s are reclaimed when readers are in quiescent state.
> +
> +When the LPM is not using RCU, tbl8 group can be freed immediately even
> +though the readers might be using the tbl8 group entries. This might result
> in incorrect lookup results.
> +
> +RCU QSBR process is integrated for safe tbl8 group reclaimation.
> +Application has certain responsibilities while using this feature.
> +Please refer to resource reclaimation framework of :ref:`RCU library
> <RCU_Library>` for more details.
> +
>  Lookup
>  ~~~~~~
> 
> diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile index
> d682785b6..6f06c5c03 100644
> --- a/lib/librte_lpm/Makefile
> +++ b/lib/librte_lpm/Makefile
> @@ -8,7 +8,7 @@ LIB = librte_lpm.a
> 
>  CFLAGS += -O3
>  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> -LDLIBS += -lrte_eal -lrte_hash
> +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
> 
>  EXPORT_MAP := rte_lpm_version.map
> 
> diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build index
> 021ac6d8d..6cfc083c5 100644
> --- a/lib/librte_lpm/meson.build
> +++ b/lib/librte_lpm/meson.build
> @@ -7,3 +7,4 @@ headers = files('rte_lpm.h', 'rte_lpm6.h')  # without
> worrying about which architecture we actually need  headers +=
> files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')  deps += ['hash']
> +deps += ['rcu']
> diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c index
> 38ab512a4..30f541179 100644
> --- a/lib/librte_lpm/rte_lpm.c
> +++ b/lib/librte_lpm/rte_lpm.c
> @@ -1,5 +1,6 @@
>  /* SPDX-License-Identifier: BSD-3-Clause
>   * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2020 Arm Limited
>   */
> 
>  #include <string.h>
> @@ -246,12 +247,85 @@ rte_lpm_free(struct rte_lpm *lpm)
> 
>  	rte_mcfg_tailq_write_unlock();
> 
> +	if (lpm->dq)
> +		rte_rcu_qsbr_dq_delete(lpm->dq);
>  	rte_free(lpm->tbl8);
>  	rte_free(lpm->rules_tbl);
>  	rte_free(lpm);
>  	rte_free(te);
>  }
> 
> +static void
> +__lpm_rcu_qsbr_free_resource(void *p, void *data, unsigned int n) {
> +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> +	uint32_t tbl8_group_index = *(uint32_t *)data;
> +	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
> +
> +	RTE_SET_USED(n);
> +	/* Set tbl8 group invalid */
> +	__atomic_store(&tbl8[tbl8_group_index], &zero_tbl8_entry,
> +		__ATOMIC_RELAXED);
> +}
> +
> +/* Associate QSBR variable with an LPM object.
> + */
> +int
> +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
> +	struct rte_rcu_qsbr_dq **dq)
I prefer not to return the defer queue to the user here. I see 3 different ways in which RCU can be integrated into the libraries:

1) The sync mode in which the defer queue is not created. The rte_rcu_qsbr_synchronize API is called after delete. The resource is freed after rte_rcu_qsbr_synchronize returns and the control is given back to the user.

2) The mode where the defer queue is created. There is a lot of flexibility provided now as the defer queue size, reclaim threshold and how many resources to reclaim are all configurable. IMO, this solves most of the use cases and helps the application integrate lock-less algorithms with minimal effort.

3) This is where the application has its own method of reclamation that does not fall under 1) or 2). To address this use case, I think we should make changes to the LPM library. Today, in LPM, the delete and free are combined into a single API. We can split this single API into 2 separate APIs - delete and free (a similar thing was done in the rte_hash library) without affecting the ABI. This should provide all the flexibility required for the application to implement any kind of reclamation algorithm it wants. Returning the defer queue to the user in the above API does not solve this use case.
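To make modes 1) and 2) concrete, below is a minimal usage sketch based on the rte_lpm_rcu_qsbr_add() API and rte_lpm_rcu_config structure proposed in this patch; the helper name is made up for illustration, and QSBR thread registration and error handling are omitted.

#include <rte_lpm.h>
#include <rte_rcu_qsbr.h>

static int
lpm_attach_rcu(struct rte_lpm *lpm, struct rte_rcu_qsbr *v, int use_dq)
{
	struct rte_lpm_rcu_config cfg = {0};

	cfg.v = v;
	if (use_dq) {
		/* 2) Defer-queue mode: dq_size/reclaim_thd/reclaim_max left
		 * at 0 to take the defaults defined in this patch.
		 */
		cfg.mode = RTE_LPM_QSBR_MODE_DQ;
	} else {
		/* 1) Sync mode: blocking rte_rcu_qsbr_synchronize() on free,
		 * no defer queue is created.
		 */
		cfg.mode = RTE_LPM_QSBR_MODE_SYNC;
	}

	/* Pass NULL for the dq handle; the application does not need it. */
	return rte_lpm_rcu_qsbr_add(lpm, &cfg, NULL);
}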

> +{
> +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params = {0};
> +
> +	if ((lpm == NULL) || (cfg == NULL)) {
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +
> +	if (lpm->v) {
> +		rte_errno = EEXIST;
> +		return 1;
> +	}
> +
> +	if (cfg->mode == RTE_LPM_QSBR_MODE_SYNC) {
> +		/* No other things to do. */
> +	} else if (cfg->mode == RTE_LPM_QSBR_MODE_DQ) {
> +		/* Init QSBR defer queue. */
> +		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
> +				"LPM_RCU_%s", lpm->name);
> +		params.name = rcu_dq_name;
> +		params.size = cfg->dq_size;
> +		if (params.size == 0)
> +			params.size = lpm->number_tbl8s;
> +		params.trigger_reclaim_limit = cfg->reclaim_thd;
> +		if (params.trigger_reclaim_limit == 0)
> +			params.trigger_reclaim_limit =
> +					RTE_LPM_RCU_DQ_RECLAIM_THD;
> +		params.max_reclaim_size = cfg->reclaim_max;
> +		if (params.max_reclaim_size == 0)
> +			params.max_reclaim_size =
> RTE_LPM_RCU_DQ_RECLAIM_MAX;
> +		params.esize = sizeof(uint32_t);	/* tbl8 group index */
> +		params.free_fn = __lpm_rcu_qsbr_free_resource;
> +		params.p = lpm->tbl8;
> +		params.v = cfg->v;
> +		lpm->dq = rte_rcu_qsbr_dq_create(&params);
> +		if (lpm->dq == NULL) {
> +			RTE_LOG(ERR, LPM,
> +					"LPM QS defer queue creation
> failed\n");
> +			return 1;
> +		}
> +		if (dq)
> +			*dq = lpm->dq;
> +	} else {
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +	lpm->rcu_mode = cfg->mode;
> +	lpm->v = cfg->v;
> +
> +	return 0;
> +}
> +
>  /*
>   * Adds a rule to the rule table.
>   *
> @@ -394,14 +468,15 @@ rule_find(struct rte_lpm *lpm, uint32_t ip_masked,
> uint8_t depth)
>   * Find, clean and allocate a tbl8.
>   */
>  static int32_t
> -tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> +_tbl8_alloc(struct rte_lpm *lpm)
>  {
>  	uint32_t group_idx; /* tbl8 group index. */
>  	struct rte_lpm_tbl_entry *tbl8_entry;
> 
>  	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> -		tbl8_entry = &tbl8[group_idx *
> RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> +		tbl8_entry = &lpm->tbl8[group_idx *
> +
> 	RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
>  		/* If a free tbl8 group is found clean it and set as VALID. */
>  		if (!tbl8_entry->valid_group) {
>  			struct rte_lpm_tbl_entry new_tbl8_entry = { @@ -
> 427,14 +502,40 @@ tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t
> number_tbl8s)
>  	return -ENOSPC;
>  }
> 
> +static int32_t
> +tbl8_alloc(struct rte_lpm *lpm)
> +{
> +	int32_t group_idx; /* tbl8 group index. */
> +
> +	group_idx = _tbl8_alloc(lpm);
> +	if ((group_idx < 0) && (lpm->dq != NULL)) {
> +		/* If there are no tbl8 groups try to reclaim one. */
> +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL)
> == 0)
> +			group_idx = _tbl8_alloc(lpm);
> +	}
> +
> +	return group_idx;
> +}
> +
>  static void
> -tbl8_free(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
> +tbl8_free(struct rte_lpm *lpm, uint32_t tbl8_group_start)
>  {
> -	/* Set tbl8 group invalid*/
>  	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> 
> -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> -			__ATOMIC_RELAXED);
> +	if (!lpm->v) {
> +		/* Set tbl8 group invalid*/
> +		__atomic_store(&lpm->tbl8[tbl8_group_start],
> &zero_tbl8_entry,
> +				__ATOMIC_RELAXED);
> +	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
> +		/* Wait for quiescent state change. */
> +		rte_rcu_qsbr_synchronize(lpm->v,
> RTE_QSBR_THRID_INVALID);
> +		/* Set tbl8 group invalid*/
> +		__atomic_store(&lpm->tbl8[tbl8_group_start],
> &zero_tbl8_entry,
> +				__ATOMIC_RELAXED);
> +	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
> +		/* Push into QSBR defer queue. */
> +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void
> *)&tbl8_group_start);
> +	}
>  }
> 
>  static __rte_noinline int32_t
> @@ -523,7 +624,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t
> ip_masked, uint8_t depth,
> 
>  	if (!lpm->tbl24[tbl24_index].valid) {
>  		/* Search for a free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc(lpm);
> 
>  		/* Check tbl8 allocation was successful. */
>  		if (tbl8_group_index < 0) {
> @@ -569,7 +670,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t
> ip_masked, uint8_t depth,
>  	} /* If valid entry but not extended calculate the index into Table8. */
>  	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
>  		/* Search for free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc(lpm);
> 
>  		if (tbl8_group_index < 0) {
>  			return tbl8_group_index;
> @@ -977,7 +1078,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t
> ip_masked,
>  		 */
>  		lpm->tbl24[tbl24_index].valid = 0;
>  		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free(lpm->tbl8, tbl8_group_start);
> +		tbl8_free(lpm, tbl8_group_start);
>  	} else if (tbl8_recycle_index > -1) {
>  		/* Update tbl24 entry. */
>  		struct rte_lpm_tbl_entry new_tbl24_entry = { @@ -993,7
> +1094,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t ip_masked,
>  		__atomic_store(&lpm->tbl24[tbl24_index],
> &new_tbl24_entry,
>  				__ATOMIC_RELAXED);
>  		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free(lpm->tbl8, tbl8_group_start);
> +		tbl8_free(lpm, tbl8_group_start);
>  	}
>  #undef group_idx
>  	return 0;
> diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h index
> b9d49ac87..8c054509a 100644
> --- a/lib/librte_lpm/rte_lpm.h
> +++ b/lib/librte_lpm/rte_lpm.h
> @@ -1,5 +1,6 @@
>  /* SPDX-License-Identifier: BSD-3-Clause
>   * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2020 Arm Limited
>   */
> 
>  #ifndef _RTE_LPM_H_
> @@ -20,6 +21,7 @@
>  #include <rte_memory.h>
>  #include <rte_common.h>
>  #include <rte_vect.h>
> +#include <rte_rcu_qsbr.h>
> 
>  #ifdef __cplusplus
>  extern "C" {
> @@ -62,6 +64,17 @@ extern "C" {
>  /** Bitmask used to indicate successful lookup */
>  #define RTE_LPM_LOOKUP_SUCCESS          0x01000000
> 
> +/** @internal Default threshold to trigger RCU defer queue reclaimation. */
> +#define RTE_LPM_RCU_DQ_RECLAIM_THD	32
> +
> +/** @internal Default RCU defer queue entries to reclaim in one go. */
> +#define RTE_LPM_RCU_DQ_RECLAIM_MAX	16
> +
> +/* Create defer queue for reclaim. */
> +#define RTE_LPM_QSBR_MODE_DQ		0
> +/* Use blocking mode reclaim. No defer queue created. */
> +#define RTE_LPM_QSBR_MODE_SYNC		0x01
> +
>  #if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
>  /** @internal Tbl24 entry structure. */  __extension__ @@ -130,6 +143,28
> @@ struct rte_lpm {
>  			__rte_cache_aligned; /**< LPM tbl24 table. */
>  	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
>  	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> +
> +	/* RCU config. */
> +	struct rte_rcu_qsbr *v;		/* RCU QSBR variable. */
> +	uint32_t rcu_mode;		/* Blocking, defer queue. */
> +	struct rte_rcu_qsbr_dq *dq;	/* RCU QSBR defer queue. */
> +};
> +
> +/** LPM RCU QSBR configuration structure. */ struct rte_lpm_rcu_config
> +{
> +	struct rte_rcu_qsbr *v;	/* RCU QSBR variable. */
> +	/* Mode of RCU QSBR. RTE_LPM_QSBR_MODE_xxx
> +	 * '0' for default: create defer queue for reclaim.
> +	 */
> +	uint32_t mode;
> +	/* RCU defer queue size. default: lpm->number_tbl8s. */
> +	uint32_t dq_size;
> +	uint32_t reclaim_thd;	/* Threshold to trigger auto reclaim.
> +				 * default:
> RTE_LPM_RCU_DQ_RECLAIM_TRHD.
> +				 */
> +	uint32_t reclaim_max;	/* Max entries to reclaim in one go.
> +				 * default:
> RTE_LPM_RCU_DQ_RECLAIM_MAX.
> +				 */
>  };
> 
>  /**
> @@ -179,6 +214,30 @@ rte_lpm_find_existing(const char *name);  void
> rte_lpm_free(struct rte_lpm *lpm);
> 
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Associate RCU QSBR variable with an LPM object.
> + *
> + * @param lpm
> + *   the lpm object to add RCU QSBR
> + * @param cfg
> + *   RCU QSBR configuration
> + * @param dq
> + *   handler of created RCU QSBR defer queue
> + * @return
> + *   On success - 0
> + *   On error - 1 with error code set in rte_errno.
> + *   Possible rte_errno codes are:
> + *   - EINVAL - invalid pointer
> + *   - EEXIST - already added QSBR
> + *   - ENOMEM - memory allocation failure
> + */
> +__rte_experimental
> +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config
> *cfg,
> +	struct rte_rcu_qsbr_dq **dq);
> +
>  /**
>   * Add a rule to the LPM table.
>   *
> diff --git a/lib/librte_lpm/rte_lpm_version.map
> b/lib/librte_lpm/rte_lpm_version.map
> index 500f58b80..bfccd7eac 100644
> --- a/lib/librte_lpm/rte_lpm_version.map
> +++ b/lib/librte_lpm/rte_lpm_version.map
> @@ -21,3 +21,9 @@ DPDK_20.0 {
> 
>  	local: *;
>  };
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	rte_lpm_rcu_qsbr_add;
> +};
> --
> 2.17.1
  
Vladimir Medvedkin June 18, 2020, 5:21 p.m. UTC | #2
Hi Ruifeng,

Thanks for patches, see comments below


On 08/06/2020 06:16, Ruifeng Wang wrote:
> Currently, the tbl8 group is freed even though the readers might be
> using the tbl8 group entries. The freed tbl8 group can be reallocated
> quickly. This results in incorrect lookup results.
>
> RCU QSBR process is integrated for safe tbl8 group reclaim.
> Refer to RCU documentation to understand various aspects of
> integrating RCU library into other libraries.
>
> Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> ---
>   doc/guides/prog_guide/lpm_lib.rst  |  32 ++++++++
>   lib/librte_lpm/Makefile            |   2 +-
>   lib/librte_lpm/meson.build         |   1 +
>   lib/librte_lpm/rte_lpm.c           | 123 ++++++++++++++++++++++++++---
>   lib/librte_lpm/rte_lpm.h           |  59 ++++++++++++++
>   lib/librte_lpm/rte_lpm_version.map |   6 ++
>   6 files changed, 211 insertions(+), 12 deletions(-)
>
> diff --git a/doc/guides/prog_guide/lpm_lib.rst b/doc/guides/prog_guide/lpm_lib.rst
> index 1609a57d0..7cc99044a 100644
> --- a/doc/guides/prog_guide/lpm_lib.rst
> +++ b/doc/guides/prog_guide/lpm_lib.rst
> @@ -145,6 +145,38 @@ depending on whether we need to move to the next table or not.
>   Prefix expansion is one of the keys of this algorithm,
>   since it improves the speed dramatically by adding redundancy.
>   
> +Deletion
> +~~~~~~~~
> +
> +When deleting a rule, a replacement rule is searched for. Replacement rule is an existing rule that has
> +the longest prefix match with the rule to be deleted, but has smaller depth.
> +
> +If a replacement rule is found, target tbl24 and tbl8 entries are updated to have the same depth and next hop
> +value with the replacement rule.
> +
> +If no replacement rule can be found, target tbl24 and tbl8 entries will be cleared.
> +
> +Prefix expansion is performed if the rule's depth is not exactly 24 bits or 32 bits.
> +
> +After deleting a rule, a group of tbl8s that belongs to the same tbl24 entry are freed in following cases:
> +
> +*   All tbl8s in the group are empty .
> +
> +*   All tbl8s in the group have the same values and with depth no greater than 24.
> +
> +Free of tbl8s have different behaviors:
> +
> +*   If RCU is not used, tbl8s are cleared and reclaimed immediately.
> +
> +*   If RCU is used, tbl8s are reclaimed when readers are in quiescent state.
> +
> +When the LPM is not using RCU, tbl8 group can be freed immediately even though the readers might be using
> +the tbl8 group entries. This might result in incorrect lookup results.
> +
> +RCU QSBR process is integrated for safe tbl8 group reclaimation. Application has certain responsibilities
> +while using this feature. Please refer to resource reclaimation framework of :ref:`RCU library <RCU_Library>`
> +for more details.
> +
>   Lookup
>   ~~~~~~
>   
> diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
> index d682785b6..6f06c5c03 100644
> --- a/lib/librte_lpm/Makefile
> +++ b/lib/librte_lpm/Makefile
> @@ -8,7 +8,7 @@ LIB = librte_lpm.a
>   
>   CFLAGS += -O3
>   CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> -LDLIBS += -lrte_eal -lrte_hash
> +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
>   
>   EXPORT_MAP := rte_lpm_version.map
>   
> diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> index 021ac6d8d..6cfc083c5 100644
> --- a/lib/librte_lpm/meson.build
> +++ b/lib/librte_lpm/meson.build
> @@ -7,3 +7,4 @@ headers = files('rte_lpm.h', 'rte_lpm6.h')
>   # without worrying about which architecture we actually need
>   headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
>   deps += ['hash']
> +deps += ['rcu']
> diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
> index 38ab512a4..30f541179 100644
> --- a/lib/librte_lpm/rte_lpm.c
> +++ b/lib/librte_lpm/rte_lpm.c
> @@ -1,5 +1,6 @@
>   /* SPDX-License-Identifier: BSD-3-Clause
>    * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2020 Arm Limited
>    */
>   
>   #include <string.h>
> @@ -246,12 +247,85 @@ rte_lpm_free(struct rte_lpm *lpm)
>   
>   	rte_mcfg_tailq_write_unlock();
>   
> +	if (lpm->dq)
> +		rte_rcu_qsbr_dq_delete(lpm->dq);
>   	rte_free(lpm->tbl8);
>   	rte_free(lpm->rules_tbl);
>   	rte_free(lpm);
>   	rte_free(te);
>   }
>   
> +static void
> +__lpm_rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
> +{
> +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> +	uint32_t tbl8_group_index = *(uint32_t *)data;
> +	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
> +
> +	RTE_SET_USED(n);
> +	/* Set tbl8 group invalid */
> +	__atomic_store(&tbl8[tbl8_group_index], &zero_tbl8_entry,
> +		__ATOMIC_RELAXED);
> +}
> +
> +/* Associate QSBR variable with an LPM object.
> + */
> +int
> +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
> +	struct rte_rcu_qsbr_dq **dq)
> +{
> +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params = {0};
> +
> +	if ((lpm == NULL) || (cfg == NULL)) {
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +
> +	if (lpm->v) {
> +		rte_errno = EEXIST;
> +		return 1;
> +	}
> +
> +	if (cfg->mode == RTE_LPM_QSBR_MODE_SYNC) {
> +		/* No other things to do. */
> +	} else if (cfg->mode == RTE_LPM_QSBR_MODE_DQ) {
> +		/* Init QSBR defer queue. */
> +		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
> +				"LPM_RCU_%s", lpm->name);
> +		params.name = rcu_dq_name;
> +		params.size = cfg->dq_size;
> +		if (params.size == 0)
> +			params.size = lpm->number_tbl8s;
> +		params.trigger_reclaim_limit = cfg->reclaim_thd;
> +		if (params.trigger_reclaim_limit == 0)


This makes it impossible for a user to configure reclamation to trigger 
on every call. Should we allow it?
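For illustration, taking the application's value as-is would look like this (sketch only; 0 would then mean "attempt reclamation on every enqueue", as discussed later in this thread):

	/* Honour cfg->reclaim_thd as given, including 0. */
	params.trigger_reclaim_limit = cfg->reclaim_thd;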


> +			params.trigger_reclaim_limit =
> +					RTE_LPM_RCU_DQ_RECLAIM_THD;
> +		params.max_reclaim_size = cfg->reclaim_max;
> +		if (params.max_reclaim_size == 0)
> +			params.max_reclaim_size = RTE_LPM_RCU_DQ_RECLAIM_MAX;
> +		params.esize = sizeof(uint32_t);	/* tbl8 group index */
> +		params.free_fn = __lpm_rcu_qsbr_free_resource;
> +		params.p = lpm->tbl8;


I think it's better to pass the LPM pointer here rather than tbl8, for 
example, in case we decide to add some counters in the future.
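For example, a sketch (not part of this patch) of how the same callback could look if params.p carried the LPM pointer instead of lpm->tbl8, with params.p = lpm set at defer queue creation:

static void
__lpm_rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
{
	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
	uint32_t tbl8_group_index = *(uint32_t *)data;
	struct rte_lpm *lpm = (struct rte_lpm *)p;

	RTE_SET_USED(n);
	/* Set tbl8 group invalid (and update any future per-LPM counters). */
	__atomic_store(&lpm->tbl8[tbl8_group_index], &zero_tbl8_entry,
			__ATOMIC_RELAXED);
}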


> +		params.v = cfg->v;
> +		lpm->dq = rte_rcu_qsbr_dq_create(&params);
> +		if (lpm->dq == NULL) {
> +			RTE_LOG(ERR, LPM,
> +					"LPM QS defer queue creation failed\n");
> +			return 1;
> +		}
> +		if (dq)
> +			*dq = lpm->dq;
> +	} else {
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +	lpm->rcu_mode = cfg->mode;
> +	lpm->v = cfg->v;
> +
> +	return 0;
> +}
> +
>   /*
>    * Adds a rule to the rule table.
>    *
> @@ -394,14 +468,15 @@ rule_find(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth)
>    * Find, clean and allocate a tbl8.
>    */
>   static int32_t
> -tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> +_tbl8_alloc(struct rte_lpm *lpm)
>   {
>   	uint32_t group_idx; /* tbl8 group index. */
>   	struct rte_lpm_tbl_entry *tbl8_entry;
>   
>   	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> -		tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> +		tbl8_entry = &lpm->tbl8[group_idx *
> +					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
>   		/* If a free tbl8 group is found clean it and set as VALID. */
>   		if (!tbl8_entry->valid_group) {
>   			struct rte_lpm_tbl_entry new_tbl8_entry = {
> @@ -427,14 +502,40 @@ tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
>   	return -ENOSPC;
>   }
>   
> +static int32_t
> +tbl8_alloc(struct rte_lpm *lpm)
> +{
> +	int32_t group_idx; /* tbl8 group index. */
> +
> +	group_idx = _tbl8_alloc(lpm);
> +	if ((group_idx < 0) && (lpm->dq != NULL)) {


I think (group_idx == -ENOSPC) will be safer


> +		/* If there are no tbl8 groups try to reclaim one. */
> +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL) == 0)
> +			group_idx = _tbl8_alloc(lpm);


I'm not really happy with this approach. _tbl8_alloc() performs a linear 
scan through memory to find a free group_idx, and it is the slowest 
part of rte_lpm_add().
Here, after reclaiming some group index, we need to rescan the memory 
again to find it. It would be great if there were some way to return 
the reclaimed elements, or just to dequeue elements from the dq and 
reclaim them manually.
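One possible shape for this, purely as a sketch (the 'last_reclaimed' field and the targeted reuse are hypothetical, not part of this patch or of the rte_rcu_qsbr API): the free callback would record the group index it just invalidated, so the allocator could reuse it without a second linear scan.

/* Hypothetical sketch only: assumes params.p is the LPM pointer and that the
 * free callback stores the reclaimed group index in a new 'last_reclaimed'
 * field of struct rte_lpm.
 */
static int32_t
tbl8_alloc(struct rte_lpm *lpm)
{
	int32_t group_idx;

	group_idx = _tbl8_alloc(lpm);
	if (group_idx == -ENOSPC && lpm->dq != NULL) {
		lpm->last_reclaimed = -1;	/* hypothetical field */
		/* Try to reclaim exactly one tbl8 group. */
		if (rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL) == 0 &&
				lpm->last_reclaimed >= 0) {
			/* Reuse the reclaimed group directly; it still needs
			 * the same cleaning and VALID marking that
			 * _tbl8_alloc() performs before it can be returned.
			 */
			group_idx = lpm->last_reclaimed;
		}
	}
	return group_idx;
}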


> +	}
> +
> +	return group_idx;
> +}
> +
>   static void
> -tbl8_free(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
> +tbl8_free(struct rte_lpm *lpm, uint32_t tbl8_group_start)
>   {
> -	/* Set tbl8 group invalid*/
>   	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
>   
> -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> -			__ATOMIC_RELAXED);
> +	if (!lpm->v) {
> +		/* Set tbl8 group invalid*/
> +		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
> +				__ATOMIC_RELAXED);
> +	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
> +		/* Wait for quiescent state change. */
> +		rte_rcu_qsbr_synchronize(lpm->v, RTE_QSBR_THRID_INVALID);
> +		/* Set tbl8 group invalid*/
> +		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
> +				__ATOMIC_RELAXED);
> +	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
> +		/* Push into QSBR defer queue. */
> +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&tbl8_group_start);
> +	}
>   }
>   
>   static __rte_noinline int32_t
> @@ -523,7 +624,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
>   
>   	if (!lpm->tbl24[tbl24_index].valid) {
>   		/* Search for a free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc(lpm);
>   
>   		/* Check tbl8 allocation was successful. */
>   		if (tbl8_group_index < 0) {
> @@ -569,7 +670,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
>   	} /* If valid entry but not extended calculate the index into Table8. */
>   	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
>   		/* Search for free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc(lpm);
>   
>   		if (tbl8_group_index < 0) {
>   			return tbl8_group_index;
> @@ -977,7 +1078,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t ip_masked,
>   		 */
>   		lpm->tbl24[tbl24_index].valid = 0;
>   		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free(lpm->tbl8, tbl8_group_start);
> +		tbl8_free(lpm, tbl8_group_start);
>   	} else if (tbl8_recycle_index > -1) {
>   		/* Update tbl24 entry. */
>   		struct rte_lpm_tbl_entry new_tbl24_entry = {
> @@ -993,7 +1094,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t ip_masked,
>   		__atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry,
>   				__ATOMIC_RELAXED);
>   		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free(lpm->tbl8, tbl8_group_start);
> +		tbl8_free(lpm, tbl8_group_start);
>   	}
>   #undef group_idx
>   	return 0;
> diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> index b9d49ac87..8c054509a 100644
> --- a/lib/librte_lpm/rte_lpm.h
> +++ b/lib/librte_lpm/rte_lpm.h
> @@ -1,5 +1,6 @@
>   /* SPDX-License-Identifier: BSD-3-Clause
>    * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2020 Arm Limited
>    */
>   
>   #ifndef _RTE_LPM_H_
> @@ -20,6 +21,7 @@
>   #include <rte_memory.h>
>   #include <rte_common.h>
>   #include <rte_vect.h>
> +#include <rte_rcu_qsbr.h>
>   
>   #ifdef __cplusplus
>   extern "C" {
> @@ -62,6 +64,17 @@ extern "C" {
>   /** Bitmask used to indicate successful lookup */
>   #define RTE_LPM_LOOKUP_SUCCESS          0x01000000
>   
> +/** @internal Default threshold to trigger RCU defer queue reclaimation. */
> +#define RTE_LPM_RCU_DQ_RECLAIM_THD	32
> +
> +/** @internal Default RCU defer queue entries to reclaim in one go. */
> +#define RTE_LPM_RCU_DQ_RECLAIM_MAX	16
> +
> +/* Create defer queue for reclaim. */
> +#define RTE_LPM_QSBR_MODE_DQ		0
> +/* Use blocking mode reclaim. No defer queue created. */
> +#define RTE_LPM_QSBR_MODE_SYNC		0x01


using enums instead of defines?
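
For example (sketch only, values matching the defines in this patch):

enum rte_lpm_qsbr_mode {
	RTE_LPM_QSBR_MODE_DQ = 0,	/* Create defer queue for reclaim. */
	RTE_LPM_QSBR_MODE_SYNC = 1	/* Blocking reclaim, no defer queue. */
};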


> +
>   #if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
>   /** @internal Tbl24 entry structure. */
>   __extension__
> @@ -130,6 +143,28 @@ struct rte_lpm {
>   			__rte_cache_aligned; /**< LPM tbl24 table. */
>   	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
>   	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> +
> +	/* RCU config. */
> +	struct rte_rcu_qsbr *v;		/* RCU QSBR variable. */
> +	uint32_t rcu_mode;		/* Blocking, defer queue. */
> +	struct rte_rcu_qsbr_dq *dq;	/* RCU QSBR defer queue. */
> +};
> +
> +/** LPM RCU QSBR configuration structure. */
> +struct rte_lpm_rcu_config {
> +	struct rte_rcu_qsbr *v;	/* RCU QSBR variable. */
> +	/* Mode of RCU QSBR. RTE_LPM_QSBR_MODE_xxx
> +	 * '0' for default: create defer queue for reclaim.
> +	 */
> +	uint32_t mode;
> +	/* RCU defer queue size. default: lpm->number_tbl8s. */
> +	uint32_t dq_size;
> +	uint32_t reclaim_thd;	/* Threshold to trigger auto reclaim.
> +				 * default: RTE_LPM_RCU_DQ_RECLAIM_TRHD.
> +				 */
> +	uint32_t reclaim_max;	/* Max entries to reclaim in one go.
> +				 * default: RTE_LPM_RCU_DQ_RECLAIM_MAX.
> +				 */
>   };
>   
>   /**
> @@ -179,6 +214,30 @@ rte_lpm_find_existing(const char *name);
>   void
>   rte_lpm_free(struct rte_lpm *lpm);
>   
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Associate RCU QSBR variable with an LPM object.
> + *
> + * @param lpm
> + *   the lpm object to add RCU QSBR
> + * @param cfg
> + *   RCU QSBR configuration
> + * @param dq
> + *   handler of created RCU QSBR defer queue
> + * @return
> + *   On success - 0
> + *   On error - 1 with error code set in rte_errno.
> + *   Possible rte_errno codes are:
> + *   - EINVAL - invalid pointer
> + *   - EEXIST - already added QSBR
> + *   - ENOMEM - memory allocation failure
> + */
> +__rte_experimental
> +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
> +	struct rte_rcu_qsbr_dq **dq);
> +
>   /**
>    * Add a rule to the LPM table.
>    *
> diff --git a/lib/librte_lpm/rte_lpm_version.map b/lib/librte_lpm/rte_lpm_version.map
> index 500f58b80..bfccd7eac 100644
> --- a/lib/librte_lpm/rte_lpm_version.map
> +++ b/lib/librte_lpm/rte_lpm_version.map
> @@ -21,3 +21,9 @@ DPDK_20.0 {
>   
>   	local: *;
>   };
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	rte_lpm_rcu_qsbr_add;
> +};
  
Vladimir Medvedkin June 18, 2020, 5:36 p.m. UTC | #3
Hi Honnappa,

On 08/06/2020 19:46, Honnappa Nagarahalli wrote:
> <snip>
>
>> Subject: [PATCH v4 1/3] lib/lpm: integrate RCU QSBR
>>
>> Currently, the tbl8 group is freed even though the readers might be using the
>> tbl8 group entries. The freed tbl8 group can be reallocated quickly. This
>> results in incorrect lookup results.
>>
>> RCU QSBR process is integrated for safe tbl8 group reclaim.
>> Refer to RCU documentation to understand various aspects of integrating
>> RCU library into other libraries.
>>
>> Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
>> Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
>> ---
>>   doc/guides/prog_guide/lpm_lib.rst  |  32 ++++++++
>>   lib/librte_lpm/Makefile            |   2 +-
>>   lib/librte_lpm/meson.build         |   1 +
>>   lib/librte_lpm/rte_lpm.c           | 123 ++++++++++++++++++++++++++---
>>   lib/librte_lpm/rte_lpm.h           |  59 ++++++++++++++
>>   lib/librte_lpm/rte_lpm_version.map |   6 ++
>>   6 files changed, 211 insertions(+), 12 deletions(-)
>>
>> diff --git a/doc/guides/prog_guide/lpm_lib.rst
>> b/doc/guides/prog_guide/lpm_lib.rst
>> index 1609a57d0..7cc99044a 100644
>> --- a/doc/guides/prog_guide/lpm_lib.rst
>> +++ b/doc/guides/prog_guide/lpm_lib.rst
>> @@ -145,6 +145,38 @@ depending on whether we need to move to the next
>> table or not.
>>   Prefix expansion is one of the keys of this algorithm,  since it improves the
>> speed dramatically by adding redundancy.
>>
>> +Deletion
>> +~~~~~~~~
>> +
>> +When deleting a rule, a replacement rule is searched for. Replacement
>> +rule is an existing rule that has the longest prefix match with the rule to be
>> deleted, but has smaller depth.
>> +
>> +If a replacement rule is found, target tbl24 and tbl8 entries are
>> +updated to have the same depth and next hop value with the replacement
>> rule.
>> +
>> +If no replacement rule can be found, target tbl24 and tbl8 entries will be
>> cleared.
>> +
>> +Prefix expansion is performed if the rule's depth is not exactly 24 bits or 32
>> bits.
>> +
>> +After deleting a rule, a group of tbl8s that belongs to the same tbl24 entry
>> are freed in following cases:
>> +
>> +*   All tbl8s in the group are empty .
>> +
>> +*   All tbl8s in the group have the same values and with depth no greater
>> than 24.
>> +
>> +Free of tbl8s have different behaviors:
>> +
>> +*   If RCU is not used, tbl8s are cleared and reclaimed immediately.
>> +
>> +*   If RCU is used, tbl8s are reclaimed when readers are in quiescent state.
>> +
>> +When the LPM is not using RCU, tbl8 group can be freed immediately even
>> +though the readers might be using the tbl8 group entries. This might result
>> in incorrect lookup results.
>> +
>> +RCU QSBR process is integrated for safe tbl8 group reclaimation.
>> +Application has certain responsibilities while using this feature.
>> +Please refer to resource reclaimation framework of :ref:`RCU library
>> <RCU_Library>` for more details.
>> +
>>   Lookup
>>   ~~~~~~
>>
>> diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile index
>> d682785b6..6f06c5c03 100644
>> --- a/lib/librte_lpm/Makefile
>> +++ b/lib/librte_lpm/Makefile
>> @@ -8,7 +8,7 @@ LIB = librte_lpm.a
>>
>>   CFLAGS += -O3
>>   CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
>> -LDLIBS += -lrte_eal -lrte_hash
>> +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
>>
>>   EXPORT_MAP := rte_lpm_version.map
>>
>> diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build index
>> 021ac6d8d..6cfc083c5 100644
>> --- a/lib/librte_lpm/meson.build
>> +++ b/lib/librte_lpm/meson.build
>> @@ -7,3 +7,4 @@ headers = files('rte_lpm.h', 'rte_lpm6.h')  # without
>> worrying about which architecture we actually need  headers +=
>> files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')  deps += ['hash']
>> +deps += ['rcu']
>> diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c index
>> 38ab512a4..30f541179 100644
>> --- a/lib/librte_lpm/rte_lpm.c
>> +++ b/lib/librte_lpm/rte_lpm.c
>> @@ -1,5 +1,6 @@
>>   /* SPDX-License-Identifier: BSD-3-Clause
>>    * Copyright(c) 2010-2014 Intel Corporation
>> + * Copyright(c) 2020 Arm Limited
>>    */
>>
>>   #include <string.h>
>> @@ -246,12 +247,85 @@ rte_lpm_free(struct rte_lpm *lpm)
>>
>>   	rte_mcfg_tailq_write_unlock();
>>
>> +	if (lpm->dq)
>> +		rte_rcu_qsbr_dq_delete(lpm->dq);
>>   	rte_free(lpm->tbl8);
>>   	rte_free(lpm->rules_tbl);
>>   	rte_free(lpm);
>>   	rte_free(te);
>>   }
>>
>> +static void
>> +__lpm_rcu_qsbr_free_resource(void *p, void *data, unsigned int n) {
>> +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
>> +	uint32_t tbl8_group_index = *(uint32_t *)data;
>> +	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
>> +
>> +	RTE_SET_USED(n);
>> +	/* Set tbl8 group invalid */
>> +	__atomic_store(&tbl8[tbl8_group_index], &zero_tbl8_entry,
>> +		__ATOMIC_RELAXED);
>> +}
>> +
>> +/* Associate QSBR variable with an LPM object.
>> + */
>> +int
>> +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
>> +	struct rte_rcu_qsbr_dq **dq)
> I prefer not to return the defer queue to the user here. I see 3 different ways how RCU can be integrated in the libraries:
>
> 1) The sync mode in which the defer queue is not created. The rte_rcu_qsbr_synchronize API is called after delete. The resource is freed after rte_rcu_qsbr_synchronize returns and the control is given back to the user.
>
> 2) The mode where the defer queue is created. There is a lot of flexibility provided now as the defer queue size, reclaim threshold and how many resources to reclaim are all configurable. IMO, this solves most of the use cases and helps the application integrate lock-less algorithms with minimal effort.
>
> 3) This is where the application has its own method of reclamation that does not fall under 1) or 2). To address this use case, I think we should make changes to the LPM library. Today, in LPM, the delete and free are combined into a single API. We can split this single API into 2 separate APIs - delete and free (similar thing was done to rte_hash library) without affecting the ABI. This should provide all the flexibility required for the application to implement any kind of reclamation algorithm it wants. Returning the defer queue to the user in the above API does not solve this use case.


Agree, I don't see any case where the user will need the defer queue. From my 
perspective, reclamation of tbl8 is totally internal and the user should not 
worry about it. So, in the case of LPM, I don't see any real use case where we 
need to enable the third way of integrating RCU.
P.S. In the rte_fib case we don't even have the opportunity to reclaim it 
outside the library, since the rte_fib struct layout is hidden from the user. 
Moreover, there may not be a tbl8 at all, since it supports different 
algorithms.


>> +{
>> +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
>> +	struct rte_rcu_qsbr_dq_parameters params = {0};
>> +
>> +	if ((lpm == NULL) || (cfg == NULL)) {
>> +		rte_errno = EINVAL;
>> +		return 1;
>> +	}
>> +
>> +	if (lpm->v) {
>> +		rte_errno = EEXIST;
>> +		return 1;
>> +	}
>> +
>> +	if (cfg->mode == RTE_LPM_QSBR_MODE_SYNC) {
>> +		/* No other things to do. */
>> +	} else if (cfg->mode == RTE_LPM_QSBR_MODE_DQ) {
>> +		/* Init QSBR defer queue. */
>> +		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
>> +				"LPM_RCU_%s", lpm->name);
>> +		params.name = rcu_dq_name;
>> +		params.size = cfg->dq_size;
>> +		if (params.size == 0)
>> +			params.size = lpm->number_tbl8s;
>> +		params.trigger_reclaim_limit = cfg->reclaim_thd;
>> +		if (params.trigger_reclaim_limit == 0)
>> +			params.trigger_reclaim_limit =
>> +					RTE_LPM_RCU_DQ_RECLAIM_THD;
>> +		params.max_reclaim_size = cfg->reclaim_max;
>> +		if (params.max_reclaim_size == 0)
>> +			params.max_reclaim_size =
>> RTE_LPM_RCU_DQ_RECLAIM_MAX;
>> +		params.esize = sizeof(uint32_t);	/* tbl8 group index */
>> +		params.free_fn = __lpm_rcu_qsbr_free_resource;
>> +		params.p = lpm->tbl8;
>> +		params.v = cfg->v;
>> +		lpm->dq = rte_rcu_qsbr_dq_create(&params);
>> +		if (lpm->dq == NULL) {
>> +			RTE_LOG(ERR, LPM,
>> +					"LPM QS defer queue creation
>> failed\n");
>> +			return 1;
>> +		}
>> +		if (dq)
>> +			*dq = lpm->dq;
>> +	} else {
>> +		rte_errno = EINVAL;
>> +		return 1;
>> +	}
>> +	lpm->rcu_mode = cfg->mode;
>> +	lpm->v = cfg->v;
>> +
>> +	return 0;
>> +}
>> +
>>   /*
>>    * Adds a rule to the rule table.
>>    *
>> @@ -394,14 +468,15 @@ rule_find(struct rte_lpm *lpm, uint32_t ip_masked,
>> uint8_t depth)
>>    * Find, clean and allocate a tbl8.
>>    */
>>   static int32_t
>> -tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
>> +_tbl8_alloc(struct rte_lpm *lpm)
>>   {
>>   	uint32_t group_idx; /* tbl8 group index. */
>>   	struct rte_lpm_tbl_entry *tbl8_entry;
>>
>>   	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
>> -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
>> -		tbl8_entry = &tbl8[group_idx *
>> RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
>> +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
>> +		tbl8_entry = &lpm->tbl8[group_idx *
>> +
>> 	RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
>>   		/* If a free tbl8 group is found clean it and set as VALID. */
>>   		if (!tbl8_entry->valid_group) {
>>   			struct rte_lpm_tbl_entry new_tbl8_entry = { @@ -
>> 427,14 +502,40 @@ tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t
>> number_tbl8s)
>>   	return -ENOSPC;
>>   }
>>
>> +static int32_t
>> +tbl8_alloc(struct rte_lpm *lpm)
>> +{
>> +	int32_t group_idx; /* tbl8 group index. */
>> +
>> +	group_idx = _tbl8_alloc(lpm);
>> +	if ((group_idx < 0) && (lpm->dq != NULL)) {
>> +		/* If there are no tbl8 groups try to reclaim one. */
>> +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL)
>> == 0)
>> +			group_idx = _tbl8_alloc(lpm);
>> +	}
>> +
>> +	return group_idx;
>> +}
>> +
>>   static void
>> -tbl8_free(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
>> +tbl8_free(struct rte_lpm *lpm, uint32_t tbl8_group_start)
>>   {
>> -	/* Set tbl8 group invalid*/
>>   	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
>>
>> -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
>> -			__ATOMIC_RELAXED);
>> +	if (!lpm->v) {
>> +		/* Set tbl8 group invalid*/
>> +		__atomic_store(&lpm->tbl8[tbl8_group_start],
>> &zero_tbl8_entry,
>> +				__ATOMIC_RELAXED);
>> +	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
>> +		/* Wait for quiescent state change. */
>> +		rte_rcu_qsbr_synchronize(lpm->v,
>> RTE_QSBR_THRID_INVALID);
>> +		/* Set tbl8 group invalid*/
>> +		__atomic_store(&lpm->tbl8[tbl8_group_start],
>> &zero_tbl8_entry,
>> +				__ATOMIC_RELAXED);
>> +	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
>> +		/* Push into QSBR defer queue. */
>> +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void
>> *)&tbl8_group_start);
>> +	}
>>   }
>>
>>   static __rte_noinline int32_t
>> @@ -523,7 +624,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t
>> ip_masked, uint8_t depth,
>>
>>   	if (!lpm->tbl24[tbl24_index].valid) {
>>   		/* Search for a free tbl8 group. */
>> -		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
>> +		tbl8_group_index = tbl8_alloc(lpm);
>>
>>   		/* Check tbl8 allocation was successful. */
>>   		if (tbl8_group_index < 0) {
>> @@ -569,7 +670,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t
>> ip_masked, uint8_t depth,
>>   	} /* If valid entry but not extended calculate the index into Table8. */
>>   	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
>>   		/* Search for free tbl8 group. */
>> -		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
>> +		tbl8_group_index = tbl8_alloc(lpm);
>>
>>   		if (tbl8_group_index < 0) {
>>   			return tbl8_group_index;
>> @@ -977,7 +1078,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t
>> ip_masked,
>>   		 */
>>   		lpm->tbl24[tbl24_index].valid = 0;
>>   		__atomic_thread_fence(__ATOMIC_RELEASE);
>> -		tbl8_free(lpm->tbl8, tbl8_group_start);
>> +		tbl8_free(lpm, tbl8_group_start);
>>   	} else if (tbl8_recycle_index > -1) {
>>   		/* Update tbl24 entry. */
>>   		struct rte_lpm_tbl_entry new_tbl24_entry = { @@ -993,7
>> +1094,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t ip_masked,
>>   		__atomic_store(&lpm->tbl24[tbl24_index],
>> &new_tbl24_entry,
>>   				__ATOMIC_RELAXED);
>>   		__atomic_thread_fence(__ATOMIC_RELEASE);
>> -		tbl8_free(lpm->tbl8, tbl8_group_start);
>> +		tbl8_free(lpm, tbl8_group_start);
>>   	}
>>   #undef group_idx
>>   	return 0;
>> diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h index
>> b9d49ac87..8c054509a 100644
>> --- a/lib/librte_lpm/rte_lpm.h
>> +++ b/lib/librte_lpm/rte_lpm.h
>> @@ -1,5 +1,6 @@
>>   /* SPDX-License-Identifier: BSD-3-Clause
>>    * Copyright(c) 2010-2014 Intel Corporation
>> + * Copyright(c) 2020 Arm Limited
>>    */
>>
>>   #ifndef _RTE_LPM_H_
>> @@ -20,6 +21,7 @@
>>   #include <rte_memory.h>
>>   #include <rte_common.h>
>>   #include <rte_vect.h>
>> +#include <rte_rcu_qsbr.h>
>>
>>   #ifdef __cplusplus
>>   extern "C" {
>> @@ -62,6 +64,17 @@ extern "C" {
>>   /** Bitmask used to indicate successful lookup */
>>   #define RTE_LPM_LOOKUP_SUCCESS          0x01000000
>>
>> +/** @internal Default threshold to trigger RCU defer queue reclaimation. */
>> +#define RTE_LPM_RCU_DQ_RECLAIM_THD	32
>> +
>> +/** @internal Default RCU defer queue entries to reclaim in one go. */
>> +#define RTE_LPM_RCU_DQ_RECLAIM_MAX	16
>> +
>> +/* Create defer queue for reclaim. */
>> +#define RTE_LPM_QSBR_MODE_DQ		0
>> +/* Use blocking mode reclaim. No defer queue created. */
>> +#define RTE_LPM_QSBR_MODE_SYNC		0x01
>> +
>>   #if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
>>   /** @internal Tbl24 entry structure. */  __extension__ @@ -130,6 +143,28
>> @@ struct rte_lpm {
>>   			__rte_cache_aligned; /**< LPM tbl24 table. */
>>   	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
>>   	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
>> +
>> +	/* RCU config. */
>> +	struct rte_rcu_qsbr *v;		/* RCU QSBR variable. */
>> +	uint32_t rcu_mode;		/* Blocking, defer queue. */
>> +	struct rte_rcu_qsbr_dq *dq;	/* RCU QSBR defer queue. */
>> +};
>> +
>> +/** LPM RCU QSBR configuration structure. */ struct rte_lpm_rcu_config
>> +{
>> +	struct rte_rcu_qsbr *v;	/* RCU QSBR variable. */
>> +	/* Mode of RCU QSBR. RTE_LPM_QSBR_MODE_xxx
>> +	 * '0' for default: create defer queue for reclaim.
>> +	 */
>> +	uint32_t mode;
>> +	/* RCU defer queue size. default: lpm->number_tbl8s. */
>> +	uint32_t dq_size;
>> +	uint32_t reclaim_thd;	/* Threshold to trigger auto reclaim.
>> +				 * default:
>> RTE_LPM_RCU_DQ_RECLAIM_TRHD.
>> +				 */
>> +	uint32_t reclaim_max;	/* Max entries to reclaim in one go.
>> +				 * default:
>> RTE_LPM_RCU_DQ_RECLAIM_MAX.
>> +				 */
>>   };
>>
>>   /**
>> @@ -179,6 +214,30 @@ rte_lpm_find_existing(const char *name);  void
>> rte_lpm_free(struct rte_lpm *lpm);
>>
>> +/**
>> + * @warning
>> + * @b EXPERIMENTAL: this API may change without prior notice
>> + *
>> + * Associate RCU QSBR variable with an LPM object.
>> + *
>> + * @param lpm
>> + *   the lpm object to add RCU QSBR
>> + * @param cfg
>> + *   RCU QSBR configuration
>> + * @param dq
>> + *   handler of created RCU QSBR defer queue
>> + * @return
>> + *   On success - 0
>> + *   On error - 1 with error code set in rte_errno.
>> + *   Possible rte_errno codes are:
>> + *   - EINVAL - invalid pointer
>> + *   - EEXIST - already added QSBR
>> + *   - ENOMEM - memory allocation failure
>> + */
>> +__rte_experimental
>> +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config
>> *cfg,
>> +	struct rte_rcu_qsbr_dq **dq);
>> +
>>   /**
>>    * Add a rule to the LPM table.
>>    *
>> diff --git a/lib/librte_lpm/rte_lpm_version.map
>> b/lib/librte_lpm/rte_lpm_version.map
>> index 500f58b80..bfccd7eac 100644
>> --- a/lib/librte_lpm/rte_lpm_version.map
>> +++ b/lib/librte_lpm/rte_lpm_version.map
>> @@ -21,3 +21,9 @@ DPDK_20.0 {
>>
>>   	local: *;
>>   };
>> +
>> +EXPERIMENTAL {
>> +	global:
>> +
>> +	rte_lpm_rcu_qsbr_add;
>> +};
>> --
>> 2.17.1
  
Ruifeng Wang June 22, 2020, 5:46 a.m. UTC | #4
Hi Vladimir,

> -----Original Message-----
> From: Medvedkin, Vladimir <vladimir.medvedkin@intel.com>
> Sent: Friday, June 19, 2020 1:22 AM
> To: Ruifeng Wang <Ruifeng.Wang@arm.com>; Bruce Richardson
> <bruce.richardson@intel.com>; John McNamara
> <john.mcnamara@intel.com>; Marko Kovacevic
> <marko.kovacevic@intel.com>; Ray Kinsella <mdr@ashroe.eu>; Neil Horman
> <nhorman@tuxdriver.com>
> Cc: dev@dpdk.org; konstantin.ananyev@intel.com; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; nd <nd@arm.com>
> Subject: Re: [PATCH v4 1/3] lib/lpm: integrate RCU QSBR
> 
> Hi Ruifeng,
> 
> Thanks for patches, see comments below
Thanks for your review.
> 
> 
> On 08/06/2020 06:16, Ruifeng Wang wrote:
> > Currently, the tbl8 group is freed even though the readers might be
> > using the tbl8 group entries. The freed tbl8 group can be reallocated
> > quickly. This results in incorrect lookup results.
> >
> > RCU QSBR process is integrated for safe tbl8 group reclaim.
> > Refer to RCU documentation to understand various aspects of
> > integrating RCU library into other libraries.
> >
> > Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > ---
> >   doc/guides/prog_guide/lpm_lib.rst  |  32 ++++++++
> >   lib/librte_lpm/Makefile            |   2 +-
> >   lib/librte_lpm/meson.build         |   1 +
> >   lib/librte_lpm/rte_lpm.c           | 123 ++++++++++++++++++++++++++---
> >   lib/librte_lpm/rte_lpm.h           |  59 ++++++++++++++
> >   lib/librte_lpm/rte_lpm_version.map |   6 ++
> >   6 files changed, 211 insertions(+), 12 deletions(-)
> >
> > diff --git a/doc/guides/prog_guide/lpm_lib.rst
> > b/doc/guides/prog_guide/lpm_lib.rst
> > index 1609a57d0..7cc99044a 100644
> > --- a/doc/guides/prog_guide/lpm_lib.rst
> > +++ b/doc/guides/prog_guide/lpm_lib.rst
> > @@ -145,6 +145,38 @@ depending on whether we need to move to the
> next table or not.
> >   Prefix expansion is one of the keys of this algorithm,
> >   since it improves the speed dramatically by adding redundancy.
> >
> > +Deletion
> > +~~~~~~~~
> > +
> > +When deleting a rule, a replacement rule is searched for. Replacement
> > +rule is an existing rule that has the longest prefix match with the rule to be
> deleted, but has smaller depth.
> > +
> > +If a replacement rule is found, target tbl24 and tbl8 entries are
> > +updated to have the same depth and next hop value with the
> replacement rule.
> > +
> > +If no replacement rule can be found, target tbl24 and tbl8 entries will be
> cleared.
> > +
> > +Prefix expansion is performed if the rule's depth is not exactly 24 bits or
> 32 bits.
> > +
> > +After deleting a rule, a group of tbl8s that belongs to the same tbl24 entry
> are freed in following cases:
> > +
> > +*   All tbl8s in the group are empty .
> > +
> > +*   All tbl8s in the group have the same values and with depth no greater
> than 24.
> > +
> > +Free of tbl8s have different behaviors:
> > +
> > +*   If RCU is not used, tbl8s are cleared and reclaimed immediately.
> > +
> > +*   If RCU is used, tbl8s are reclaimed when readers are in quiescent state.
> > +
> > +When the LPM is not using RCU, tbl8 group can be freed immediately
> > +even though the readers might be using the tbl8 group entries. This might
> result in incorrect lookup results.
> > +
> > +RCU QSBR process is integrated for safe tbl8 group reclaimation.
> > +Application has certain responsibilities while using this feature.
> > +Please refer to resource reclaimation framework of :ref:`RCU library
> <RCU_Library>` for more details.
> > +
> >   Lookup
> >   ~~~~~~
> >
> > diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile index
> > d682785b6..6f06c5c03 100644
> > --- a/lib/librte_lpm/Makefile
> > +++ b/lib/librte_lpm/Makefile
> > @@ -8,7 +8,7 @@ LIB = librte_lpm.a
> >
> >   CFLAGS += -O3
> >   CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -LDLIBS += -lrte_eal
> > -lrte_hash
> > +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
> >
> >   EXPORT_MAP := rte_lpm_version.map
> >
> > diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> > index 021ac6d8d..6cfc083c5 100644
> > --- a/lib/librte_lpm/meson.build
> > +++ b/lib/librte_lpm/meson.build
> > @@ -7,3 +7,4 @@ headers = files('rte_lpm.h', 'rte_lpm6.h')
> >   # without worrying about which architecture we actually need
> >   headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
> >   deps += ['hash']
> > +deps += ['rcu']
> > diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c index
> > 38ab512a4..30f541179 100644
> > --- a/lib/librte_lpm/rte_lpm.c
> > +++ b/lib/librte_lpm/rte_lpm.c
> > @@ -1,5 +1,6 @@
> >   /* SPDX-License-Identifier: BSD-3-Clause
> >    * Copyright(c) 2010-2014 Intel Corporation
> > + * Copyright(c) 2020 Arm Limited
> >    */
> >
> >   #include <string.h>
> > @@ -246,12 +247,85 @@ rte_lpm_free(struct rte_lpm *lpm)
> >
> >   	rte_mcfg_tailq_write_unlock();
> >
> > +	if (lpm->dq)
> > +		rte_rcu_qsbr_dq_delete(lpm->dq);
> >   	rte_free(lpm->tbl8);
> >   	rte_free(lpm->rules_tbl);
> >   	rte_free(lpm);
> >   	rte_free(te);
> >   }
> >
> > +static void
> > +__lpm_rcu_qsbr_free_resource(void *p, void *data, unsigned int n) {
> > +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> > +	uint32_t tbl8_group_index = *(uint32_t *)data;
> > +	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
> > +
> > +	RTE_SET_USED(n);
> > +	/* Set tbl8 group invalid */
> > +	__atomic_store(&tbl8[tbl8_group_index], &zero_tbl8_entry,
> > +		__ATOMIC_RELAXED);
> > +}
> > +
> > +/* Associate QSBR variable with an LPM object.
> > + */
> > +int
> > +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config
> *cfg,
> > +	struct rte_rcu_qsbr_dq **dq)
> > +{
> > +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> > +	struct rte_rcu_qsbr_dq_parameters params = {0};
> > +
> > +	if ((lpm == NULL) || (cfg == NULL)) {
> > +		rte_errno = EINVAL;
> > +		return 1;
> > +	}
> > +
> > +	if (lpm->v) {
> > +		rte_errno = EEXIST;
> > +		return 1;
> > +	}
> > +
> > +	if (cfg->mode == RTE_LPM_QSBR_MODE_SYNC) {
> > +		/* No other things to do. */
> > +	} else if (cfg->mode == RTE_LPM_QSBR_MODE_DQ) {
> > +		/* Init QSBR defer queue. */
> > +		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
> > +				"LPM_RCU_%s", lpm->name);
> > +		params.name = rcu_dq_name;
> > +		params.size = cfg->dq_size;
> > +		if (params.size == 0)
> > +			params.size = lpm->number_tbl8s;
> > +		params.trigger_reclaim_limit = cfg->reclaim_thd;
> > +		if (params.trigger_reclaim_limit == 0)
> 
> 
> This makes it impossible for a user to configure reclamation triggering with
> every call. Should we allow it?
Yes, using (reclaim_thd = 0) to trigger reclamation at each dq enqueue should be a valid case.
Will remove the value overriding and take the value as is.
> 
> 
> > +			params.trigger_reclaim_limit =
> > +					RTE_LPM_RCU_DQ_RECLAIM_THD;
> > +		params.max_reclaim_size = cfg->reclaim_max;
> > +		if (params.max_reclaim_size == 0)
> > +			params.max_reclaim_size =
> RTE_LPM_RCU_DQ_RECLAIM_MAX;
> > +		params.esize = sizeof(uint32_t);	/* tbl8 group index */
> > +		params.free_fn = __lpm_rcu_qsbr_free_resource;
> > +		params.p = lpm->tbl8;
> 
> 
> I think it's better to pass the LPM pointer here rather than tbl8, for
> example, in case we decide to add some counters in the future
Using the LPM pointer is more extensible.
Will change it in the next version.
> 
> 
> > +		params.v = cfg->v;
> > +		lpm->dq = rte_rcu_qsbr_dq_create(&params);
> > +		if (lpm->dq == NULL) {
> > +			RTE_LOG(ERR, LPM,
> > +					"LPM QS defer queue creation
> failed\n");
> > +			return 1;
> > +		}
> > +		if (dq)
> > +			*dq = lpm->dq;
> > +	} else {
> > +		rte_errno = EINVAL;
> > +		return 1;
> > +	}
> > +	lpm->rcu_mode = cfg->mode;
> > +	lpm->v = cfg->v;
> > +
> > +	return 0;
> > +}
> > +
> >   /*
> >    * Adds a rule to the rule table.
> >    *
> > @@ -394,14 +468,15 @@ rule_find(struct rte_lpm *lpm, uint32_t
> ip_masked, uint8_t depth)
> >    * Find, clean and allocate a tbl8.
> >    */
> >   static int32_t
> > -tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> > +_tbl8_alloc(struct rte_lpm *lpm)
> >   {
> >   	uint32_t group_idx; /* tbl8 group index. */
> >   	struct rte_lpm_tbl_entry *tbl8_entry;
> >
> >   	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> > -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> > -		tbl8_entry = &tbl8[group_idx *
> RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> > +		tbl8_entry = &lpm->tbl8[group_idx *
> > +
> 	RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> >   		/* If a free tbl8 group is found clean it and set as VALID. */
> >   		if (!tbl8_entry->valid_group) {
> >   			struct rte_lpm_tbl_entry new_tbl8_entry = {
> > @@ -427,14 +502,40 @@ tbl8_alloc(struct rte_lpm_tbl_entry *tbl8,
> uint32_t number_tbl8s)
> >   	return -ENOSPC;
> >   }
> >
> > +static int32_t
> > +tbl8_alloc(struct rte_lpm *lpm)
> > +{
> > +	int32_t group_idx; /* tbl8 group index. */
> > +
> > +	group_idx = _tbl8_alloc(lpm);
> > +	if ((group_idx < 0) && (lpm->dq != NULL)) {
> 
> 
> I think (group_idx == -ENOSPC) will be safer
Will change it in next version.
> 
> 
> > +		/* If there are no tbl8 groups try to reclaim one. */
> > +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL)
> == 0)
> > +			group_idx = _tbl8_alloc(lpm);
> 
> 
> I'm not really happy with this approach. _tbl8_alloc() produces linear
> scan through the memory to find a free group_idx and it is the slowest
> part of rte_lpm_add().
> Here after reclamation of some group index we need to rescan a memory
> again to find it. It would be great if there will be some way to return
> reclaimed elements. Or just to dequeue elements from dq and reclaim them
> manually.
I think there is little chance a rescan will be needed. If the RCU QSBR defer queue trigger_reclaim_limit
is configured with a reasonable value, tbl8 groups will be reclaimed regularly, so the defer queue won't grow too long.

Returning the reclaimed elements would make the API more complex, and I am not sure it is useful for other use cases.
@Honnappa Nagarahalli, any idea?
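As a side note, an application that takes the defer queue handle back from rte_lpm_rcu_qsbr_add()
can already drain it explicitly before a burst of route changes, e.g. (the reclaim count of 64 is
arbitrary, 'dq' is the handle returned through the out parameter):

	unsigned int freed, pending, available;

	/* Reclaim up to 64 deferred tbl8 groups before starting the update burst. */
	if (rte_rcu_qsbr_dq_reclaim(dq, 64, &freed, &pending, &available) == 0)
		RTE_LOG(DEBUG, LPM, "reclaimed %u tbl8 groups, %u still pending\n",
				freed, pending);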
> 
> 
> > +	}
> > +
> > +	return group_idx;
> > +}
> > +
> >   static void
> > -tbl8_free(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
> > +tbl8_free(struct rte_lpm *lpm, uint32_t tbl8_group_start)
> >   {
> > -	/* Set tbl8 group invalid*/
> >   	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> >
> > -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> > -			__ATOMIC_RELAXED);
> > +	if (!lpm->v) {
> > +		/* Set tbl8 group invalid*/
> > +		__atomic_store(&lpm->tbl8[tbl8_group_start],
> &zero_tbl8_entry,
> > +				__ATOMIC_RELAXED);
> > +	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
> > +		/* Wait for quiescent state change. */
> > +		rte_rcu_qsbr_synchronize(lpm->v,
> RTE_QSBR_THRID_INVALID);
> > +		/* Set tbl8 group invalid*/
> > +		__atomic_store(&lpm->tbl8[tbl8_group_start],
> &zero_tbl8_entry,
> > +				__ATOMIC_RELAXED);
> > +	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
> > +		/* Push into QSBR defer queue. */
> > +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void
> *)&tbl8_group_start);
> > +	}
> >   }
> >
> >   static __rte_noinline int32_t
> > @@ -523,7 +624,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t
> ip_masked, uint8_t depth,
> >
> >   	if (!lpm->tbl24[tbl24_index].valid) {
> >   		/* Search for a free tbl8 group. */
> > -		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm-
> >number_tbl8s);
> > +		tbl8_group_index = tbl8_alloc(lpm);
> >
> >   		/* Check tbl8 allocation was successful. */
> >   		if (tbl8_group_index < 0) {
> > @@ -569,7 +670,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t
> ip_masked, uint8_t depth,
> >   	} /* If valid entry but not extended calculate the index into Table8. */
> >   	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
> >   		/* Search for free tbl8 group. */
> > -		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm-
> >number_tbl8s);
> > +		tbl8_group_index = tbl8_alloc(lpm);
> >
> >   		if (tbl8_group_index < 0) {
> >   			return tbl8_group_index;
> > @@ -977,7 +1078,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t
> ip_masked,
> >   		 */
> >   		lpm->tbl24[tbl24_index].valid = 0;
> >   		__atomic_thread_fence(__ATOMIC_RELEASE);
> > -		tbl8_free(lpm->tbl8, tbl8_group_start);
> > +		tbl8_free(lpm, tbl8_group_start);
> >   	} else if (tbl8_recycle_index > -1) {
> >   		/* Update tbl24 entry. */
> >   		struct rte_lpm_tbl_entry new_tbl24_entry = {
> > @@ -993,7 +1094,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t
> ip_masked,
> >   		__atomic_store(&lpm->tbl24[tbl24_index],
> &new_tbl24_entry,
> >   				__ATOMIC_RELAXED);
> >   		__atomic_thread_fence(__ATOMIC_RELEASE);
> > -		tbl8_free(lpm->tbl8, tbl8_group_start);
> > +		tbl8_free(lpm, tbl8_group_start);
> >   	}
> >   #undef group_idx
> >   	return 0;
> > diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> > index b9d49ac87..8c054509a 100644
> > --- a/lib/librte_lpm/rte_lpm.h
> > +++ b/lib/librte_lpm/rte_lpm.h
> > @@ -1,5 +1,6 @@
> >   /* SPDX-License-Identifier: BSD-3-Clause
> >    * Copyright(c) 2010-2014 Intel Corporation
> > + * Copyright(c) 2020 Arm Limited
> >    */
> >
> >   #ifndef _RTE_LPM_H_
> > @@ -20,6 +21,7 @@
> >   #include <rte_memory.h>
> >   #include <rte_common.h>
> >   #include <rte_vect.h>
> > +#include <rte_rcu_qsbr.h>
> >
> >   #ifdef __cplusplus
> >   extern "C" {
> > @@ -62,6 +64,17 @@ extern "C" {
> >   /** Bitmask used to indicate successful lookup */
> >   #define RTE_LPM_LOOKUP_SUCCESS          0x01000000
> >
> > +/** @internal Default threshold to trigger RCU defer queue reclaimation.
> */
> > +#define RTE_LPM_RCU_DQ_RECLAIM_THD	32
> > +
> > +/** @internal Default RCU defer queue entries to reclaim in one go. */
> > +#define RTE_LPM_RCU_DQ_RECLAIM_MAX	16
> > +
> > +/* Create defer queue for reclaim. */
> > +#define RTE_LPM_QSBR_MODE_DQ		0
> > +/* Use blocking mode reclaim. No defer queue created. */
> > +#define RTE_LPM_QSBR_MODE_SYNC		0x01
> 
> 
> using enums instead of defines?
Will convert to enums in next version.
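Something along these lines (the exact names in the next version are an assumption):

	/** RCU reclamation modes */
	enum rte_lpm_qsbr_mode {
		/** Create defer queue for reclaim. */
		RTE_LPM_QSBR_MODE_DQ = 0,
		/** Use blocking mode reclaim. No defer queue created. */
		RTE_LPM_QSBR_MODE_SYNC
	};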
> 
> 
> > +
> >   #if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
> >   /** @internal Tbl24 entry structure. */
> >   __extension__
> > @@ -130,6 +143,28 @@ struct rte_lpm {
> >   			__rte_cache_aligned; /**< LPM tbl24 table. */
> >   	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
> >   	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> > +
> > +	/* RCU config. */
> > +	struct rte_rcu_qsbr *v;		/* RCU QSBR variable. */
> > +	uint32_t rcu_mode;		/* Blocking, defer queue. */
> > +	struct rte_rcu_qsbr_dq *dq;	/* RCU QSBR defer queue. */
> > +};
> > +
> > +/** LPM RCU QSBR configuration structure. */
> > +struct rte_lpm_rcu_config {
> > +	struct rte_rcu_qsbr *v;	/* RCU QSBR variable. */
> > +	/* Mode of RCU QSBR. RTE_LPM_QSBR_MODE_xxx
> > +	 * '0' for default: create defer queue for reclaim.
> > +	 */
> > +	uint32_t mode;
> > +	/* RCU defer queue size. default: lpm->number_tbl8s. */
> > +	uint32_t dq_size;
> > +	uint32_t reclaim_thd;	/* Threshold to trigger auto reclaim.
> > +				 * default:
> RTE_LPM_RCU_DQ_RECLAIM_TRHD.
> > +				 */
> > +	uint32_t reclaim_max;	/* Max entries to reclaim in one go.
> > +				 * default:
> RTE_LPM_RCU_DQ_RECLAIM_MAX.
> > +				 */
> >   };
> >
> >   /**
> > @@ -179,6 +214,30 @@ rte_lpm_find_existing(const char *name);
> >   void
> >   rte_lpm_free(struct rte_lpm *lpm);
> >
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Associate RCU QSBR variable with an LPM object.
> > + *
> > + * @param lpm
> > + *   the lpm object to add RCU QSBR
> > + * @param cfg
> > + *   RCU QSBR configuration
> > + * @param dq
> > + *   handler of created RCU QSBR defer queue
> > + * @return
> > + *   On success - 0
> > + *   On error - 1 with error code set in rte_errno.
> > + *   Possible rte_errno codes are:
> > + *   - EINVAL - invalid pointer
> > + *   - EEXIST - already added QSBR
> > + *   - ENOMEM - memory allocation failure
> > + */
> > +__rte_experimental
> > +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct
> rte_lpm_rcu_config *cfg,
> > +	struct rte_rcu_qsbr_dq **dq);
> > +
> >   /**
> >    * Add a rule to the LPM table.
> >    *
> > diff --git a/lib/librte_lpm/rte_lpm_version.map
> b/lib/librte_lpm/rte_lpm_version.map
> > index 500f58b80..bfccd7eac 100644
> > --- a/lib/librte_lpm/rte_lpm_version.map
> > +++ b/lib/librte_lpm/rte_lpm_version.map
> > @@ -21,3 +21,9 @@ DPDK_20.0 {
> >
> >   	local: *;
> >   };
> > +
> > +EXPERIMENTAL {
> > +	global:
> > +
> > +	rte_lpm_rcu_qsbr_add;
> > +};
> 
> --
> Regards,
> Vladimir
  
Honnappa Nagarahalli June 23, 2020, 4:34 a.m. UTC | #5
<snip>

> >
> >
> > On 08/06/2020 06:16, Ruifeng Wang wrote:
> > > Currently, the tbl8 group is freed even though the readers might be
> > > using the tbl8 group entries. The freed tbl8 group can be
> > > reallocated quickly. This results in incorrect lookup results.
> > >
> > > RCU QSBR process is integrated for safe tbl8 group reclaim.
> > > Refer to RCU documentation to understand various aspects of
> > > integrating RCU library into other libraries.
> > >
> > > Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > > Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > > ---
> > >   doc/guides/prog_guide/lpm_lib.rst  |  32 ++++++++
> > >   lib/librte_lpm/Makefile            |   2 +-
> > >   lib/librte_lpm/meson.build         |   1 +
> > >   lib/librte_lpm/rte_lpm.c           | 123 ++++++++++++++++++++++++++---
> > >   lib/librte_lpm/rte_lpm.h           |  59 ++++++++++++++
> > >   lib/librte_lpm/rte_lpm_version.map |   6 ++
> > >   6 files changed, 211 insertions(+), 12 deletions(-)
> > >
> > > diff --git a/doc/guides/prog_guide/lpm_lib.rst
> > > b/doc/guides/prog_guide/lpm_lib.rst
> > > index 1609a57d0..7cc99044a 100644
> > > --- a/doc/guides/prog_guide/lpm_lib.rst
> > > +++ b/doc/guides/prog_guide/lpm_lib.rst
> > > @@ -145,6 +145,38 @@ depending on whether we need to move to the
> > next table or not.
> > >   Prefix expansion is one of the keys of this algorithm,
> > >   since it improves the speed dramatically by adding redundancy.
> > >
> > > +Deletion
> > > +~~~~~~~~
> > > +
> > > +When deleting a rule, a replacement rule is searched for.
> > > +Replacement rule is an existing rule that has the longest prefix
> > > +match with the rule to be
> > deleted, but has smaller depth.
> > > +
> > > +If a replacement rule is found, target tbl24 and tbl8 entries are
> > > +updated to have the same depth and next hop value with the
> > replacement rule.
> > > +
> > > +If no replacement rule can be found, target tbl24 and tbl8 entries
> > > +will be
> > cleared.
> > > +
> > > +Prefix expansion is performed if the rule's depth is not exactly 24
> > > +bits or
> > 32 bits.
> > > +
> > > +After deleting a rule, a group of tbl8s that belongs to the same
> > > +tbl24 entry
> > are freed in following cases:
> > > +
> > > +*   All tbl8s in the group are empty .
> > > +
> > > +*   All tbl8s in the group have the same values and with depth no
> greater
> > than 24.
> > > +
> > > +Free of tbl8s have different behaviors:
> > > +
> > > +*   If RCU is not used, tbl8s are cleared and reclaimed immediately.
> > > +
> > > +*   If RCU is used, tbl8s are reclaimed when readers are in quiescent
> state.
> > > +
> > > +When the LPM is not using RCU, tbl8 group can be freed immediately
> > > +even though the readers might be using the tbl8 group entries. This
> > > +might
> > result in incorrect lookup results.
> > > +
> > > +RCU QSBR process is integrated for safe tbl8 group reclaimation.
> > > +Application has certain responsibilities while using this feature.
> > > +Please refer to resource reclaimation framework of :ref:`RCU
> > > +library
> > <RCU_Library>` for more details.
> > > +
> > >   Lookup
> > >   ~~~~~~
> > >
> > > diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile index
> > > d682785b6..6f06c5c03 100644
> > > --- a/lib/librte_lpm/Makefile
> > > +++ b/lib/librte_lpm/Makefile
> > > @@ -8,7 +8,7 @@ LIB = librte_lpm.a
> > >
> > >   CFLAGS += -O3
> > >   CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -LDLIBS += -lrte_eal
> > > -lrte_hash
> > > +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
> > >
> > >   EXPORT_MAP := rte_lpm_version.map
> > >
> > > diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> > > index 021ac6d8d..6cfc083c5 100644
> > > --- a/lib/librte_lpm/meson.build
> > > +++ b/lib/librte_lpm/meson.build
> > > @@ -7,3 +7,4 @@ headers = files('rte_lpm.h', 'rte_lpm6.h')
> > >   # without worrying about which architecture we actually need
> > >   headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
> > >   deps += ['hash']
> > > +deps += ['rcu']
> > > diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
> > > index
> > > 38ab512a4..30f541179 100644
> > > --- a/lib/librte_lpm/rte_lpm.c
> > > +++ b/lib/librte_lpm/rte_lpm.c
> > > @@ -1,5 +1,6 @@
> > >   /* SPDX-License-Identifier: BSD-3-Clause
> > >    * Copyright(c) 2010-2014 Intel Corporation
> > > + * Copyright(c) 2020 Arm Limited
> > >    */
> > >
> > >   #include <string.h>
> > > @@ -246,12 +247,85 @@ rte_lpm_free(struct rte_lpm *lpm)
> > >
> > >   rte_mcfg_tailq_write_unlock();
> > >
> > > +if (lpm->dq)
> > > +rte_rcu_qsbr_dq_delete(lpm->dq);
> > >   rte_free(lpm->tbl8);
> > >   rte_free(lpm->rules_tbl);
> > >   rte_free(lpm);
> > >   rte_free(te);
> > >   }
> > >
> > > +static void
> > > +__lpm_rcu_qsbr_free_resource(void *p, void *data, unsigned int n) {
> > > +struct rte_lpm_tbl_entry zero_tbl8_entry = {0}; uint32_t
> > > +tbl8_group_index = *(uint32_t *)data; struct rte_lpm_tbl_entry
> > > +*tbl8 = (struct rte_lpm_tbl_entry *)p;
> > > +
> > > +RTE_SET_USED(n);
> > > +/* Set tbl8 group invalid */
> > > +__atomic_store(&tbl8[tbl8_group_index], &zero_tbl8_entry,
> > > +__ATOMIC_RELAXED); }
> > > +
> > > +/* Associate QSBR variable with an LPM object.
> > > + */
> > > +int
> > > +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config
> > *cfg,
> > > +struct rte_rcu_qsbr_dq **dq)
> > > +{
> > > +char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> > > +struct rte_rcu_qsbr_dq_parameters params = {0};
> > > +
> > > +if ((lpm == NULL) || (cfg == NULL)) { rte_errno = EINVAL; return 1;
> > > +}
> > > +
> > > +if (lpm->v) {
> > > +rte_errno = EEXIST;
> > > +return 1;
> > > +}
> > > +
> > > +if (cfg->mode == RTE_LPM_QSBR_MODE_SYNC) {
> > > +/* No other things to do. */
> > > +} else if (cfg->mode == RTE_LPM_QSBR_MODE_DQ) {
> > > +/* Init QSBR defer queue. */
> > > +snprintf(rcu_dq_name, sizeof(rcu_dq_name), "LPM_RCU_%s",
> > > +lpm->name); params.name = rcu_dq_name; params.size = cfg->dq_size;
> > > +if (params.size == 0) params.size = lpm->number_tbl8s;
> > > +params.trigger_reclaim_limit = cfg->reclaim_thd; if
> > > +(params.trigger_reclaim_limit == 0)
> >
> >
> > This makes it impossible for a user to configure reclamation
> > triggering with every call. Should we allow it?
> Yes, use (reclaim_thd = 0) to trigger reclamation at each dq enqueue should
> be a valid case.
> Will remove value overriding and take it as is.
> >
> >
> > > +params.trigger_reclaim_limit =
> > > +RTE_LPM_RCU_DQ_RECLAIM_THD;
> > > +params.max_reclaim_size = cfg->reclaim_max; if
> > > +(params.max_reclaim_size == 0) params.max_reclaim_size =
> > RTE_LPM_RCU_DQ_RECLAIM_MAX;
> > > +params.esize = sizeof(uint32_t);/* tbl8 group index */
> > > +params.free_fn = __lpm_rcu_qsbr_free_resource; params.p =
> > > +lpm->tbl8;
> >
> >
> > I think it's better to pass the LPM pointer here rather than tbl8, for
> > example, in case we decide to add some counters in the future
> Use LPM pointer is more extendable.
> Will change in next version.
> >
> >
> > > +params.v = cfg->v;
> > > +lpm->dq = rte_rcu_qsbr_dq_create(&params);
> > > +if (lpm->dq == NULL) {
> > > +RTE_LOG(ERR, LPM,
> > > +"LPM QS defer queue creation
> > failed\n");
> > > +return 1;
> > > +}
> > > +if (dq)
> > > +*dq = lpm->dq;
> > > +} else {
> > > +rte_errno = EINVAL;
> > > +return 1;
> > > +}
> > > +lpm->rcu_mode = cfg->mode;
> > > +lpm->v = cfg->v;
> > > +
> > > +return 0;
> > > +}
> > > +
> > >   /*
> > >    * Adds a rule to the rule table.
> > >    *
> > > @@ -394,14 +468,15 @@ rule_find(struct rte_lpm *lpm, uint32_t
> > ip_masked, uint8_t depth)
> > >    * Find, clean and allocate a tbl8.
> > >    */
> > >   static int32_t
> > > -tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> > > +_tbl8_alloc(struct rte_lpm *lpm)
> > >   {
> > >   uint32_t group_idx; /* tbl8 group index. */
> > >   struct rte_lpm_tbl_entry *tbl8_entry;
> > >
> > >   /* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> > > -for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> > > -tbl8_entry = &tbl8[group_idx *
> > RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > > +for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> > > +tbl8_entry = &lpm->tbl8[group_idx *
> > > +
> > RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > >   /* If a free tbl8 group is found clean it and set as VALID. */
> > >   if (!tbl8_entry->valid_group) {
> > >   struct rte_lpm_tbl_entry new_tbl8_entry = { @@ -427,14 +502,40 @@
> > > tbl8_alloc(struct rte_lpm_tbl_entry *tbl8,
> > uint32_t number_tbl8s)
> > >   return -ENOSPC;
> > >   }
> > >
> > > +static int32_t
> > > +tbl8_alloc(struct rte_lpm *lpm)
> > > +{
> > > +int32_t group_idx; /* tbl8 group index. */
> > > +
> > > +group_idx = _tbl8_alloc(lpm);
> > > +if ((group_idx < 0) && (lpm->dq != NULL)) {
> >
> >
> > I think (group_idx == -ENOSPC) will be safer
> Will change it in next version.
> >
> >
> > > +/* If there are no tbl8 groups try to reclaim one. */ if
> > > +(rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL)
> > == 0)
> > > +group_idx = _tbl8_alloc(lpm);
> >
> >
> > I'm not really happy with this approach. _tbl8_alloc() produces linear
> > scan through the memory to find a free group_idx and it is the slowest
> > part of rte_lpm_add().
> > Here after reclamation of some group index we need to rescan a memory
> > again to find it. It would be great if there will be some way to
> > return reclaimed elements. Or just to dequeue elements from dq and
> > reclaim them manually.
> I think there is little chance a rescan will be needed. If RCU QSBR defer
> queue trigger_reclaim_limit is configured with reasonable value, tbl8 groups
> will be reclaimed regularly. So defer queue won't get too long.
> 
> Return reclaimed elements makes API complex. Not sure if it is useful for
> other use cases.
> @Honnappa Nagarahalli, any idea?
I think it is a problem that should be solved in LPM. I had hacked up the allocation scheme to use a ring when I was doing the performance testing. I can send that out if required.
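Not the actual patch, but a minimal sketch of that idea: keep the free tbl8 group indexes in an
rte_ring so that allocation is a dequeue instead of a linear scan, and let the RCU free callback
return the index to the ring (names below are hypothetical):

	/* At LPM creation: pre-fill a ring with every tbl8 group index. */
	uint32_t i;
	struct rte_ring *free_tbl8 = rte_ring_create("LPM_TBL8_FREE",
			rte_align32pow2(lpm->number_tbl8s + 1), SOCKET_ID_ANY, 0);
	for (i = 0; i < lpm->number_tbl8s; i++)
		rte_ring_enqueue(free_tbl8, (void *)(uintptr_t)i);

	/* tbl8 allocation: a dequeue replaces the scan, no rescan needed after reclaim. */
	void *obj;
	if (rte_ring_dequeue(free_tbl8, &obj) != 0)
		return -ENOSPC;
	group_idx = (uint32_t)(uintptr_t)obj;

	/* RCU free callback: invalidate the group, then hand its index back to the ring. */
	rte_ring_enqueue(free_tbl8, (void *)(uintptr_t)group_idx);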

> >
> >
> > > +}
> > > +
> > > +return group_idx;
> > > +}
> > > +
> > >   static void
> > > -tbl8_free(struct rte_lpm_tbl_entry *tbl8, uint32_t
> > > tbl8_group_start)
> > > +tbl8_free(struct rte_lpm *lpm, uint32_t tbl8_group_start)
> > >   {
> > > -/* Set tbl8 group invalid*/
> > >   struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> > >
> > > -__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> > > -__ATOMIC_RELAXED);
> > > +if (!lpm->v) {
> > > +/* Set tbl8 group invalid*/
> > > +__atomic_store(&lpm->tbl8[tbl8_group_start],
> > &zero_tbl8_entry,
> > > +__ATOMIC_RELAXED);
> > > +} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
> > > +/* Wait for quiescent state change. */
> > > +rte_rcu_qsbr_synchronize(lpm->v,
> > RTE_QSBR_THRID_INVALID);
> > > +/* Set tbl8 group invalid*/
> > > +__atomic_store(&lpm->tbl8[tbl8_group_start],
> > &zero_tbl8_entry,
> > > +__ATOMIC_RELAXED);
> > > +} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
> > > +/* Push into QSBR defer queue. */
> > > +rte_rcu_qsbr_dq_enqueue(lpm->dq, (void
> > *)&tbl8_group_start);
> > > +}
> > >   }
> > >
> > >   static __rte_noinline int32_t
> > > @@ -523,7 +624,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t
> > ip_masked, uint8_t depth,
> > >
> > >   if (!lpm->tbl24[tbl24_index].valid) {
> > >   /* Search for a free tbl8 group. */  -tbl8_group_index =
> > >tbl8_alloc(lpm->tbl8, lpm- number_tbl8s);
> > > +tbl8_group_index = tbl8_alloc(lpm);
> > >
> > >   /* Check tbl8 allocation was successful. */
> > >   if (tbl8_group_index < 0) {
> > > @@ -569,7 +670,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t
> > ip_masked, uint8_t depth,
> > >   } /* If valid entry but not extended calculate the index into Table8. */
> > >   else if (lpm->tbl24[tbl24_index].valid_group == 0) {
> > >   /* Search for free tbl8 group. */
> > > -tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm- number_tbl8s);
> > > +tbl8_group_index = tbl8_alloc(lpm);
> > >
> > >   if (tbl8_group_index < 0) {
> > >   return tbl8_group_index;
> > > @@ -977,7 +1078,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t
> > ip_masked,
> > >    */
> > >   lpm->tbl24[tbl24_index].valid = 0;
> > >   __atomic_thread_fence(__ATOMIC_RELEASE);
> > > -tbl8_free(lpm->tbl8, tbl8_group_start);
> > > +tbl8_free(lpm, tbl8_group_start);
> > >   } else if (tbl8_recycle_index > -1) {
> > >   /* Update tbl24 entry. */
> > >   struct rte_lpm_tbl_entry new_tbl24_entry = { @@ -993,7 +1094,7 @@
> > > delete_depth_big(struct rte_lpm *lpm, uint32_t
> > ip_masked,
> > >   __atomic_store(&lpm->tbl24[tbl24_index],
> > &new_tbl24_entry,
> > >   __ATOMIC_RELAXED);
> > >   __atomic_thread_fence(__ATOMIC_RELEASE);
> > > -tbl8_free(lpm->tbl8, tbl8_group_start);
> > > +tbl8_free(lpm, tbl8_group_start);
> > >   }
> > >   #undef group_idx
> > >   return 0;
> > > diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> > > index b9d49ac87..8c054509a 100644
> > > --- a/lib/librte_lpm/rte_lpm.h
> > > +++ b/lib/librte_lpm/rte_lpm.h
> > > @@ -1,5 +1,6 @@
> > >   /* SPDX-License-Identifier: BSD-3-Clause
> > >    * Copyright(c) 2010-2014 Intel Corporation
> > > + * Copyright(c) 2020 Arm Limited
> > >    */
> > >
> > >   #ifndef _RTE_LPM_H_
> > > @@ -20,6 +21,7 @@
> > >   #include <rte_memory.h>
> > >   #include <rte_common.h>
> > >   #include <rte_vect.h>
> > > +#include <rte_rcu_qsbr.h>
> > >
> > >   #ifdef __cplusplus
> > >   extern "C" {
> > > @@ -62,6 +64,17 @@ extern "C" {
> > >   /** Bitmask used to indicate successful lookup */
> > >   #define RTE_LPM_LOOKUP_SUCCESS          0x01000000
> > >
> > > +/** @internal Default threshold to trigger RCU defer queue
> reclaimation.
> > */
> > > +#define RTE_LPM_RCU_DQ_RECLAIM_THD32
> > > +
> > > +/** @internal Default RCU defer queue entries to reclaim in one go.
> > > +*/ #define RTE_LPM_RCU_DQ_RECLAIM_MAX16
> > > +
> > > +/* Create defer queue for reclaim. */ #define
> RTE_LPM_QSBR_MODE_DQ0
> > > +/* Use blocking mode reclaim. No defer queue created. */ #define
> > > +RTE_LPM_QSBR_MODE_SYNC0x01
> >
> >
> > using enums instead of defines?
> Will convert to enums in next version.
> >
> >
> > > +
> > >   #if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
> > >   /** @internal Tbl24 entry structure. */
> > >   __extension__
> > > @@ -130,6 +143,28 @@ struct rte_lpm {
> > >   __rte_cache_aligned; /**< LPM tbl24 table. */
> > >   struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
> > >   struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> > > +
> > > +/* RCU config. */
> > > +struct rte_rcu_qsbr *v;/* RCU QSBR variable. */ uint32_t
> > > +rcu_mode;/* Blocking, defer queue. */ struct rte_rcu_qsbr_dq *dq;/*
> > > +RCU QSBR defer queue. */ };
> > > +
> > > +/** LPM RCU QSBR configuration structure. */ struct
> > > +rte_lpm_rcu_config { struct rte_rcu_qsbr *v;/* RCU QSBR variable.
> > > +*/
> > > +/* Mode of RCU QSBR. RTE_LPM_QSBR_MODE_xxx
> > > + * '0' for default: create defer queue for reclaim.
> > > + */
> > > +uint32_t mode;
> > > +/* RCU defer queue size. default: lpm->number_tbl8s. */ uint32_t
> > > +dq_size; uint32_t reclaim_thd;/* Threshold to trigger auto reclaim.
> > > + * default:
> > RTE_LPM_RCU_DQ_RECLAIM_TRHD.
> > > + */
> > > +uint32_t reclaim_max;/* Max entries to reclaim in one go.
> > > + * default:
> > RTE_LPM_RCU_DQ_RECLAIM_MAX.
> > > + */
> > >   };
> > >
> > >   /**
> > > @@ -179,6 +214,30 @@ rte_lpm_find_existing(const char *name);
> > >   void
> > >   rte_lpm_free(struct rte_lpm *lpm);
> > >
> > > +/**
> > > + * @warning
> > > + * @b EXPERIMENTAL: this API may change without prior notice
> > > + *
> > > + * Associate RCU QSBR variable with an LPM object.
> > > + *
> > > + * @param lpm
> > > + *   the lpm object to add RCU QSBR
> > > + * @param cfg
> > > + *   RCU QSBR configuration
> > > + * @param dq
> > > + *   handler of created RCU QSBR defer queue
> > > + * @return
> > > + *   On success - 0
> > > + *   On error - 1 with error code set in rte_errno.
> > > + *   Possible rte_errno codes are:
> > > + *   - EINVAL - invalid pointer
> > > + *   - EEXIST - already added QSBR
> > > + *   - ENOMEM - memory allocation failure
> > > + */
> > > +__rte_experimental
> > > +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct
> > rte_lpm_rcu_config *cfg,
> > > +struct rte_rcu_qsbr_dq **dq);
> > > +
> > >   /**
> > >    * Add a rule to the LPM table.
> > >    *
> > > diff --git a/lib/librte_lpm/rte_lpm_version.map
> > b/lib/librte_lpm/rte_lpm_version.map
> > > index 500f58b80..bfccd7eac 100644
> > > --- a/lib/librte_lpm/rte_lpm_version.map
> > > +++ b/lib/librte_lpm/rte_lpm_version.map
> > > @@ -21,3 +21,9 @@ DPDK_20.0 {
> > >
> > >   local: *;
> > >   };
> > > +
> > > +EXPERIMENTAL {
> > > +global:
> > > +
> > > +rte_lpm_rcu_qsbr_add;
> > > +};
> >
> > --
> > Regards,
> > Vladimir
>
  

Patch

diff --git a/doc/guides/prog_guide/lpm_lib.rst b/doc/guides/prog_guide/lpm_lib.rst
index 1609a57d0..7cc99044a 100644
--- a/doc/guides/prog_guide/lpm_lib.rst
+++ b/doc/guides/prog_guide/lpm_lib.rst
@@ -145,6 +145,38 @@  depending on whether we need to move to the next table or not.
 Prefix expansion is one of the keys of this algorithm,
 since it improves the speed dramatically by adding redundancy.
 
+Deletion
+~~~~~~~~
+
+When deleting a rule, a replacement rule is searched for. Replacement rule is an existing rule that has
+the longest prefix match with the rule to be deleted, but has smaller depth.
+
+If a replacement rule is found, target tbl24 and tbl8 entries are updated to have the same depth and next hop
+value with the replacement rule.
+
+If no replacement rule can be found, target tbl24 and tbl8 entries will be cleared.
+
+Prefix expansion is performed if the rule's depth is not exactly 24 bits or 32 bits.
+
+After deleting a rule, a group of tbl8s that belongs to the same tbl24 entry is freed in the following cases:
+
+*   All tbl8s in the group are empty.
+
+*   All tbl8s in the group have the same values, with a depth no greater than 24.
+
+Freeing of tbl8s has different behaviors:
+
+*   If RCU is not used, tbl8s are cleared and reclaimed immediately.
+
+*   If RCU is used, tbl8s are reclaimed when readers are in quiescent state.
+
+When the LPM is not using RCU, tbl8 group can be freed immediately even though the readers might be using
+the tbl8 group entries. This might result in incorrect lookup results.
+
+RCU QSBR process is integrated for safe tbl8 group reclamation. Application has certain responsibilities
+while using this feature. Please refer to the resource reclamation framework of :ref:`RCU library <RCU_Library>`
+for more details.
+
 Lookup
 ~~~~~~
 
diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
index d682785b6..6f06c5c03 100644
--- a/lib/librte_lpm/Makefile
+++ b/lib/librte_lpm/Makefile
@@ -8,7 +8,7 @@  LIB = librte_lpm.a
 
 CFLAGS += -O3
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
-LDLIBS += -lrte_eal -lrte_hash
+LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
 
 EXPORT_MAP := rte_lpm_version.map
 
diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
index 021ac6d8d..6cfc083c5 100644
--- a/lib/librte_lpm/meson.build
+++ b/lib/librte_lpm/meson.build
@@ -7,3 +7,4 @@  headers = files('rte_lpm.h', 'rte_lpm6.h')
 # without worrying about which architecture we actually need
 headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
 deps += ['hash']
+deps += ['rcu']
diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
index 38ab512a4..30f541179 100644
--- a/lib/librte_lpm/rte_lpm.c
+++ b/lib/librte_lpm/rte_lpm.c
@@ -1,5 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2010-2014 Intel Corporation
+ * Copyright(c) 2020 Arm Limited
  */
 
 #include <string.h>
@@ -246,12 +247,85 @@  rte_lpm_free(struct rte_lpm *lpm)
 
 	rte_mcfg_tailq_write_unlock();
 
+	if (lpm->dq)
+		rte_rcu_qsbr_dq_delete(lpm->dq);
 	rte_free(lpm->tbl8);
 	rte_free(lpm->rules_tbl);
 	rte_free(lpm);
 	rte_free(te);
 }
 
+static void
+__lpm_rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
+{
+	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
+	uint32_t tbl8_group_index = *(uint32_t *)data;
+	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
+
+	RTE_SET_USED(n);
+	/* Set tbl8 group invalid */
+	__atomic_store(&tbl8[tbl8_group_index], &zero_tbl8_entry,
+		__ATOMIC_RELAXED);
+}
+
+/* Associate QSBR variable with an LPM object.
+ */
+int
+rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
+	struct rte_rcu_qsbr_dq **dq)
+{
+	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
+	struct rte_rcu_qsbr_dq_parameters params = {0};
+
+	if ((lpm == NULL) || (cfg == NULL)) {
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	if (lpm->v) {
+		rte_errno = EEXIST;
+		return 1;
+	}
+
+	if (cfg->mode == RTE_LPM_QSBR_MODE_SYNC) {
+		/* No other things to do. */
+	} else if (cfg->mode == RTE_LPM_QSBR_MODE_DQ) {
+		/* Init QSBR defer queue. */
+		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
+				"LPM_RCU_%s", lpm->name);
+		params.name = rcu_dq_name;
+		params.size = cfg->dq_size;
+		if (params.size == 0)
+			params.size = lpm->number_tbl8s;
+		params.trigger_reclaim_limit = cfg->reclaim_thd;
+		if (params.trigger_reclaim_limit == 0)
+			params.trigger_reclaim_limit =
+					RTE_LPM_RCU_DQ_RECLAIM_THD;
+		params.max_reclaim_size = cfg->reclaim_max;
+		if (params.max_reclaim_size == 0)
+			params.max_reclaim_size = RTE_LPM_RCU_DQ_RECLAIM_MAX;
+		params.esize = sizeof(uint32_t);	/* tbl8 group index */
+		params.free_fn = __lpm_rcu_qsbr_free_resource;
+		params.p = lpm->tbl8;
+		params.v = cfg->v;
+		lpm->dq = rte_rcu_qsbr_dq_create(&params);
+		if (lpm->dq == NULL) {
+			RTE_LOG(ERR, LPM,
+					"LPM QS defer queue creation failed\n");
+			return 1;
+		}
+		if (dq)
+			*dq = lpm->dq;
+	} else {
+		rte_errno = EINVAL;
+		return 1;
+	}
+	lpm->rcu_mode = cfg->mode;
+	lpm->v = cfg->v;
+
+	return 0;
+}
+
 /*
  * Adds a rule to the rule table.
  *
@@ -394,14 +468,15 @@  rule_find(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth)
  * Find, clean and allocate a tbl8.
  */
 static int32_t
-tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
+_tbl8_alloc(struct rte_lpm *lpm)
 {
 	uint32_t group_idx; /* tbl8 group index. */
 	struct rte_lpm_tbl_entry *tbl8_entry;
 
 	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
-	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
-		tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
+	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
+		tbl8_entry = &lpm->tbl8[group_idx *
+					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
 		/* If a free tbl8 group is found clean it and set as VALID. */
 		if (!tbl8_entry->valid_group) {
 			struct rte_lpm_tbl_entry new_tbl8_entry = {
@@ -427,14 +502,40 @@  tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
 	return -ENOSPC;
 }
 
+static int32_t
+tbl8_alloc(struct rte_lpm *lpm)
+{
+	int32_t group_idx; /* tbl8 group index. */
+
+	group_idx = _tbl8_alloc(lpm);
+	if ((group_idx < 0) && (lpm->dq != NULL)) {
+		/* If there are no tbl8 groups try to reclaim one. */
+		if (rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL) == 0)
+			group_idx = _tbl8_alloc(lpm);
+	}
+
+	return group_idx;
+}
+
 static void
-tbl8_free(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
+tbl8_free(struct rte_lpm *lpm, uint32_t tbl8_group_start)
 {
-	/* Set tbl8 group invalid*/
 	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
 
-	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
-			__ATOMIC_RELAXED);
+	if (!lpm->v) {
+		/* Set tbl8 group invalid*/
+		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
+				__ATOMIC_RELAXED);
+	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
+		/* Wait for quiescent state change. */
+		rte_rcu_qsbr_synchronize(lpm->v, RTE_QSBR_THRID_INVALID);
+		/* Set tbl8 group invalid*/
+		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
+				__ATOMIC_RELAXED);
+	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
+		/* Push into QSBR defer queue. */
+		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&tbl8_group_start);
+	}
 }
 
 static __rte_noinline int32_t
@@ -523,7 +624,7 @@  add_depth_big(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
 
 	if (!lpm->tbl24[tbl24_index].valid) {
 		/* Search for a free tbl8 group. */
-		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
+		tbl8_group_index = tbl8_alloc(lpm);
 
 		/* Check tbl8 allocation was successful. */
 		if (tbl8_group_index < 0) {
@@ -569,7 +670,7 @@  add_depth_big(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
 	} /* If valid entry but not extended calculate the index into Table8. */
 	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
 		/* Search for free tbl8 group. */
-		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
+		tbl8_group_index = tbl8_alloc(lpm);
 
 		if (tbl8_group_index < 0) {
 			return tbl8_group_index;
@@ -977,7 +1078,7 @@  delete_depth_big(struct rte_lpm *lpm, uint32_t ip_masked,
 		 */
 		lpm->tbl24[tbl24_index].valid = 0;
 		__atomic_thread_fence(__ATOMIC_RELEASE);
-		tbl8_free(lpm->tbl8, tbl8_group_start);
+		tbl8_free(lpm, tbl8_group_start);
 	} else if (tbl8_recycle_index > -1) {
 		/* Update tbl24 entry. */
 		struct rte_lpm_tbl_entry new_tbl24_entry = {
@@ -993,7 +1094,7 @@  delete_depth_big(struct rte_lpm *lpm, uint32_t ip_masked,
 		__atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry,
 				__ATOMIC_RELAXED);
 		__atomic_thread_fence(__ATOMIC_RELEASE);
-		tbl8_free(lpm->tbl8, tbl8_group_start);
+		tbl8_free(lpm, tbl8_group_start);
 	}
 #undef group_idx
 	return 0;
diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
index b9d49ac87..8c054509a 100644
--- a/lib/librte_lpm/rte_lpm.h
+++ b/lib/librte_lpm/rte_lpm.h
@@ -1,5 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2010-2014 Intel Corporation
+ * Copyright(c) 2020 Arm Limited
  */
 
 #ifndef _RTE_LPM_H_
@@ -20,6 +21,7 @@ 
 #include <rte_memory.h>
 #include <rte_common.h>
 #include <rte_vect.h>
+#include <rte_rcu_qsbr.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -62,6 +64,17 @@  extern "C" {
 /** Bitmask used to indicate successful lookup */
 #define RTE_LPM_LOOKUP_SUCCESS          0x01000000
 
+/** @internal Default threshold to trigger RCU defer queue reclamation. */
+#define RTE_LPM_RCU_DQ_RECLAIM_THD	32
+
+/** @internal Default RCU defer queue entries to reclaim in one go. */
+#define RTE_LPM_RCU_DQ_RECLAIM_MAX	16
+
+/* Create defer queue for reclaim. */
+#define RTE_LPM_QSBR_MODE_DQ		0
+/* Use blocking mode reclaim. No defer queue created. */
+#define RTE_LPM_QSBR_MODE_SYNC		0x01
+
 #if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
 /** @internal Tbl24 entry structure. */
 __extension__
@@ -130,6 +143,28 @@  struct rte_lpm {
 			__rte_cache_aligned; /**< LPM tbl24 table. */
 	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
 	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
+
+	/* RCU config. */
+	struct rte_rcu_qsbr *v;		/* RCU QSBR variable. */
+	uint32_t rcu_mode;		/* Blocking, defer queue. */
+	struct rte_rcu_qsbr_dq *dq;	/* RCU QSBR defer queue. */
+};
+
+/** LPM RCU QSBR configuration structure. */
+struct rte_lpm_rcu_config {
+	struct rte_rcu_qsbr *v;	/* RCU QSBR variable. */
+	/* Mode of RCU QSBR. RTE_LPM_QSBR_MODE_xxx
+	 * '0' for default: create defer queue for reclaim.
+	 */
+	uint32_t mode;
+	/* RCU defer queue size. default: lpm->number_tbl8s. */
+	uint32_t dq_size;
+	uint32_t reclaim_thd;	/* Threshold to trigger auto reclaim.
+				 * default: RTE_LPM_RCU_DQ_RECLAIM_THD.
+				 */
+	uint32_t reclaim_max;	/* Max entries to reclaim in one go.
+				 * default: RTE_LPM_RCU_DQ_RECLAIM_MAX.
+				 */
 };
 
 /**
@@ -179,6 +214,30 @@  rte_lpm_find_existing(const char *name);
 void
 rte_lpm_free(struct rte_lpm *lpm);
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Associate RCU QSBR variable with an LPM object.
+ *
+ * @param lpm
+ *   the lpm object to add RCU QSBR
+ * @param cfg
+ *   RCU QSBR configuration
+ * @param dq
+ *   handler of created RCU QSBR defer queue
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - invalid pointer
+ *   - EEXIST - already added QSBR
+ *   - ENOMEM - memory allocation failure
+ */
+__rte_experimental
+int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
+	struct rte_rcu_qsbr_dq **dq);
+
 /**
  * Add a rule to the LPM table.
  *
diff --git a/lib/librte_lpm/rte_lpm_version.map b/lib/librte_lpm/rte_lpm_version.map
index 500f58b80..bfccd7eac 100644
--- a/lib/librte_lpm/rte_lpm_version.map
+++ b/lib/librte_lpm/rte_lpm_version.map
@@ -21,3 +21,9 @@  DPDK_20.0 {
 
 	local: *;
 };
+
+EXPERIMENTAL {
+	global:
+
+	rte_lpm_rcu_qsbr_add;
+};