[v4,21/22] pdcp: add thread safe processing

Message ID 20230526210214.617-22-anoobj@marvell.com (mailing list archive)
State Superseded, archived
Delegated to: Akhil Goyal
Series: lib: add pdcp protocol

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Anoob Joseph May 26, 2023, 9:02 p.m. UTC
  From: Volodymyr Fialko <vfialko@marvell.com>

PDCP state has to be guarded for:

- Uplink pre_process:
    - tx_next atomic increment

- Downlink pre_process:
    - rx_deliv - read

- Downlink post_process:
    - rx_deliv, rx_reorder, rx_next - read/write
    - bitmask/reorder buffer - read/write

When the application requires thread-safe processing, the state variables
need to be updated atomically. Add a config option to select this
per entity.

Signed-off-by: Anoob Joseph <anoobj@marvell.com>
Signed-off-by: Volodymyr Fialko <vfialko@marvell.com>
---
 lib/pdcp/meson.build    |   2 +
 lib/pdcp/pdcp_entity.h  |  64 ++++++++++++++++++++++
 lib/pdcp/pdcp_process.c | 117 +++++++++++++++++++++++++++++++++-------
 lib/pdcp/rte_pdcp.c     |   9 ++++
 lib/pdcp/rte_pdcp.h     |   2 +
 5 files changed, 175 insertions(+), 19 deletions(-)
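
For illustration, a minimal sketch of how an application could opt in to
the new option. It assumes only what the patch below adds (the
enable_thread_safety field in struct rte_pdcp_entity_conf) plus the
establish API from rte_pdcp.h; the helper name is hypothetical:

    #include <rte_pdcp.h>

    /* Hypothetical helper: request per-entity thread safety before the
     * entity is established. Only enable_thread_safety is introduced by
     * this patch; the rest of the conf is filled in by the caller. */
    static struct rte_pdcp_entity *
    create_thread_safe_entity(struct rte_pdcp_entity_conf *conf)
    {
            conf->enable_thread_safety = true;
            return rte_pdcp_entity_establish(conf);
    }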
  

Comments

Stephen Hemminger May 26, 2023, 10:11 p.m. UTC | #1
On Sat, 27 May 2023 02:32:13 +0530
Anoob Joseph <anoobj@marvell.com> wrote:

> +static inline uint32_t
> +pdcp_atomic_inc(uint32_t *val, const bool mt_safe)
> +{
> +	if (mt_safe)
> +		return __atomic_fetch_add(val, 1, __ATOMIC_RELAXED);
> +	else
> +		return (*val)++;
> +}

This is a bad pattern. None of the rest of DPDK does this.
Either be thread safe or not.
  
Stephen Hemminger May 26, 2023, 10:15 p.m. UTC | #2
On Sat, 27 May 2023 02:32:13 +0530
Anoob Joseph <anoobj@marvell.com> wrote:

> +	/* Lock to protect concurrent updates */
> +	rte_rwlock_t rwl;
>  	/** Bitmap memory region

Reader/writer locks are significantly more expensive than a simple
spin lock. Even uncontended access requires an additional cache update.
Unless there is a case with a long region where the lock is held,
just use a spin lock.

See some of Paul McKenney's locking talks (the LCA video is good) about
lock overheads in Linux. Since DPDK has the same set of instructions
to use, the same issues apply to userspace.
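
As a point of reference for this suggestion, a sketch of what the spin
lock variant might look like; the struct and helper names here are
hypothetical, and the patch as posted uses rte_rwlock_t instead:

    #include <rte_spinlock.h>

    /* Hypothetical alternative: a single spin lock guarding the
     * downlink state, taken by readers and writers alike. */
    struct dl_state {
            rte_spinlock_t lock;
            /* rx_deliv, rx_reord, rx_next, reorder buffer, ... */
    };

    static inline void
    dl_state_lock(struct dl_state *dl)
    {
            /* one atomic compare-and-set on the uncontended fast path */
            rte_spinlock_lock(&dl->lock);
    }

    static inline void
    dl_state_unlock(struct dl_state *dl)
    {
            rte_spinlock_unlock(&dl->lock);
    }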
  
Anoob Joseph May 27, 2023, 5:24 a.m. UTC | #3
Hi Stephen,

Please see inline.

Thanks,
Anoob

> On Sat, 27 May 2023 02:32:13 +0530
> Anoob Joseph <anoobj@marvell.com> wrote:
> 
> > +static inline uint32_t
> > +pdcp_atomic_inc(uint32_t *val, const bool mt_safe) {
> > +	if (mt_safe)
> > +		return __atomic_fetch_add(val, 1, __ATOMIC_RELAXED);
> > +	else
> > +		return (*val)++;
> > +}
> 
> This is a bad pattern. None of the rest of DPDK does this.
> Either be thread safe or not.

[Anoob] Most protocol implementations would have a similar issue. I've tried to follow the approach taken in lib IPsec (please check lib/ipsec/ipsec_sqn.h). Following the discussion we had on v3, I've made the flag a compile-time constant to remove any conditional checks in the datapath.

If you still think this is an issue, I could drop this patch for now and re-introduce it later once we have test applications that can work with traffic. Please share your thoughts.
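
For context, the compile-time-constant approach described here reduces to
one inline body parameterized on a const bool plus two thin wrappers;
with inlining the compiler folds the branch away, so the single-threaded
path carries no atomic and no runtime check. A sketch mirroring the
patch, with illustrative names:

    static inline uint32_t
    sn_next(uint32_t *tx_next, const bool mt_safe)
    {
            if (mt_safe)
                    return __atomic_fetch_add(tx_next, 1, __ATOMIC_RELAXED);
            else
                    return (*tx_next)++;
    }

    /* Thin wrappers become the entity's two pre-process entry points;
     * the constant propagates and each compiles to straight-line code. */
    static uint32_t
    sn_next_st(uint32_t *tx_next)
    {
            return sn_next(tx_next, false);
    }

    static uint32_t
    sn_next_mt(uint32_t *tx_next)
    {
            return sn_next(tx_next, true);
    }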
  
Anoob Joseph May 27, 2023, 7:17 a.m. UTC | #4
Hi Stephen,

I've dropped this patch for now. Based on what we conclude in this thread, I will post it as a separate patch as required.

Thanks,
Anoob

  

Patch

diff --git a/lib/pdcp/meson.build b/lib/pdcp/meson.build
index f4f9246bcb..c4f135778e 100644
--- a/lib/pdcp/meson.build
+++ b/lib/pdcp/meson.build
@@ -19,3 +19,5 @@  headers = files('rte_pdcp.h')
 indirect_headers += files('rte_pdcp_group.h')
 
 deps += ['mbuf', 'net', 'cryptodev', 'security', 'reorder']
+
+annotate_locks = false
diff --git a/lib/pdcp/pdcp_entity.h b/lib/pdcp/pdcp_entity.h
index 9f74b5d0e5..e88af4c3b3 100644
--- a/lib/pdcp/pdcp_entity.h
+++ b/lib/pdcp/pdcp_entity.h
@@ -9,6 +9,7 @@ 
 #include <rte_crypto_sym.h>
 #include <rte_mempool.h>
 #include <rte_pdcp.h>
+#include <rte_rwlock.h>
 #include <rte_security.h>
 
 #include "pdcp_reorder.h"
@@ -162,6 +163,8 @@  struct entity_priv {
 		uint64_t is_status_report_required : 1;
 		/** Is out-of-order delivery enabled */
 		uint64_t is_out_of_order_delivery : 1;
+		/** Is thread safety enabled. */
+		uint64_t is_thread_safety_enabled : 1;
 	} flags;
 	/** Crypto op pool. */
 	struct rte_mempool *cop_pool;
@@ -182,6 +185,8 @@  struct entity_priv_dl_part {
 	struct pdcp_t_reordering t_reorder;
 	/** Reorder packet buffer */
 	struct pdcp_reorder reorder;
+	/* Lock to protect concurrent updates */
+	rte_rwlock_t rwl;
 	/** Bitmap memory region */
 	uint8_t bitmap_mem[0];
 };
@@ -257,4 +262,63 @@  pdcp_hfn_max(enum rte_security_pdcp_sn_size sn_size)
 	return (1 << (32 - sn_size)) - 1;
 }
 
+static inline uint32_t
+pdcp_atomic_inc(uint32_t *val, const bool mt_safe)
+{
+	if (mt_safe)
+		return __atomic_fetch_add(val, 1, __ATOMIC_RELAXED);
+	else
+		return (*val)++;
+}
+
+static inline void
+pdcp_lock_init(const struct rte_pdcp_entity *entity)
+{
+	struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+	struct entity_priv *en_priv = entity_priv_get(entity);
+
+	if (en_priv->flags.is_thread_safety_enabled)
+		rte_rwlock_init(&dl->rwl);
+}
+
+static inline void
+pdcp_read_lock(const struct rte_pdcp_entity *entity)
+{
+	struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+	struct entity_priv *en_priv = entity_priv_get(entity);
+
+	if (en_priv->flags.is_thread_safety_enabled)
+		rte_rwlock_read_lock(&dl->rwl);
+}
+
+static inline void
+pdcp_read_unlock(const struct rte_pdcp_entity *entity)
+{
+	struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+	struct entity_priv *en_priv = entity_priv_get(entity);
+
+	if (en_priv->flags.is_thread_safety_enabled)
+		rte_rwlock_read_unlock(&dl->rwl);
+}
+
+static inline void
+pdcp_write_lock(const struct rte_pdcp_entity *entity)
+{
+	struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+	struct entity_priv *en_priv = entity_priv_get(entity);
+
+	if (en_priv->flags.is_thread_safety_enabled)
+		rte_rwlock_write_lock(&dl->rwl);
+}
+
+static inline void
+pdcp_write_unlock(const struct rte_pdcp_entity *entity)
+{
+	struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+	struct entity_priv *en_priv = entity_priv_get(entity);
+
+	if (en_priv->flags.is_thread_safety_enabled)
+		rte_rwlock_write_unlock(&dl->rwl);
+}
+
 #endif /* PDCP_ENTITY_H */
diff --git a/lib/pdcp/pdcp_process.c b/lib/pdcp/pdcp_process.c
index c093b78e10..aa13801d5d 100644
--- a/lib/pdcp/pdcp_process.c
+++ b/lib/pdcp/pdcp_process.c
@@ -357,7 +357,7 @@  cop_prepare(const struct entity_priv *en_priv, struct rte_mbuf *mb, struct rte_c
 
 static inline bool
 pdcp_pre_process_uplane_sn_12_ul_set_sn(struct entity_priv *en_priv, struct rte_mbuf *mb,
-					uint32_t *count)
+					uint32_t *count, const bool mt_safe)
 {
 	struct rte_pdcp_up_data_pdu_sn_12_hdr *pdu_hdr;
 	const uint8_t hdr_sz = en_priv->hdr_sz;
@@ -369,7 +369,7 @@  pdcp_pre_process_uplane_sn_12_ul_set_sn(struct entity_priv *en_priv, struct rte_
 		return false;
 
 	/* Update sequence num in the PDU header */
-	*count = en_priv->state.tx_next++;
+	*count = pdcp_atomic_inc(&en_priv->state.tx_next, mt_safe);
 	sn = pdcp_sn_from_count_get(*count, RTE_SECURITY_PDCP_SN_SIZE_12);
 
 	pdu_hdr->d_c = RTE_PDCP_PDU_TYPE_DATA;
@@ -379,9 +379,11 @@  pdcp_pre_process_uplane_sn_12_ul_set_sn(struct entity_priv *en_priv, struct rte_
 	return true;
 }
 
-static uint16_t
-pdcp_pre_process_uplane_sn_12_ul(const struct rte_pdcp_entity *entity, struct rte_mbuf *in_mb[],
-				 struct rte_crypto_op *cop[], uint16_t num, uint16_t *nb_err_ret)
+static inline uint16_t
+pdcp_pre_process_uplane_sn_12_ul_flags(const struct rte_pdcp_entity *entity,
+				       struct rte_mbuf *in_mb[], struct rte_crypto_op *cop[],
+				       uint16_t num, uint16_t *nb_err_ret,
+				       const bool mt_safe)
 {
 	struct entity_priv *en_priv = entity_priv_get(entity);
 	uint16_t nb_cop, nb_prep = 0, nb_err = 0;
@@ -410,7 +412,7 @@  pdcp_pre_process_uplane_sn_12_ul(const struct rte_pdcp_entity *entity, struct rt
 				memset(mac_i, 0, RTE_PDCP_MAC_I_LEN);
 
 			if (unlikely(!pdcp_pre_process_uplane_sn_12_ul_set_sn(en_priv, mb,
-									      &count))) {
+									      &count, mt_safe))) {
 				in_mb[nb_err++] = mb;
 				continue;
 			}
@@ -421,7 +423,7 @@  pdcp_pre_process_uplane_sn_12_ul(const struct rte_pdcp_entity *entity, struct rt
 		for (i = 0; i < nb_cop; i++) {
 			mb = in_mb[i];
 			if (unlikely(!pdcp_pre_process_uplane_sn_12_ul_set_sn(en_priv, mb,
-									      &count))) {
+									      &count, mt_safe))) {
 				in_mb[nb_err++] = mb;
 				continue;
 			}
@@ -439,9 +441,25 @@  pdcp_pre_process_uplane_sn_12_ul(const struct rte_pdcp_entity *entity, struct rt
 	return nb_prep;
 }
 
+static uint16_t
+pdcp_pre_process_uplane_sn_12_ul_st(const struct rte_pdcp_entity *entity,
+				    struct rte_mbuf *in_mb[], struct rte_crypto_op *cop[],
+				    uint16_t num, uint16_t *nb_err_ret)
+{
+	return pdcp_pre_process_uplane_sn_12_ul_flags(entity, in_mb, cop, num, nb_err_ret, false);
+}
+
+static uint16_t
+pdcp_pre_process_uplane_sn_12_ul_mt(const struct rte_pdcp_entity *entity,
+				    struct rte_mbuf *in_mb[], struct rte_crypto_op *cop[],
+				    uint16_t num, uint16_t *nb_err_ret)
+{
+	return pdcp_pre_process_uplane_sn_12_ul_flags(entity, in_mb, cop, num, nb_err_ret, true);
+}
+
 static inline bool
 pdcp_pre_process_uplane_sn_18_ul_set_sn(struct entity_priv *en_priv, struct rte_mbuf *mb,
-					uint32_t *count)
+					uint32_t *count, const bool mt_safe)
 {
 	struct rte_pdcp_up_data_pdu_sn_18_hdr *pdu_hdr;
 	const uint8_t hdr_sz = en_priv->hdr_sz;
@@ -453,7 +471,7 @@  pdcp_pre_process_uplane_sn_18_ul_set_sn(struct entity_priv *en_priv, struct rte_
 		return false;
 
 	/* Update sequence num in the PDU header */
-	*count = en_priv->state.tx_next++;
+	*count = pdcp_atomic_inc(&en_priv->state.tx_next, mt_safe);
 	sn = pdcp_sn_from_count_get(*count, RTE_SECURITY_PDCP_SN_SIZE_18);
 
 	pdu_hdr->d_c = RTE_PDCP_PDU_TYPE_DATA;
@@ -466,8 +484,9 @@  pdcp_pre_process_uplane_sn_18_ul_set_sn(struct entity_priv *en_priv, struct rte_
 }
 
 static inline uint16_t
-pdcp_pre_process_uplane_sn_18_ul(const struct rte_pdcp_entity *entity, struct rte_mbuf *in_mb[],
-				 struct rte_crypto_op *cop[], uint16_t num, uint16_t *nb_err_ret)
+pdcp_pre_process_uplane_sn_18_ul_flags(const struct rte_pdcp_entity *entity,
+				       struct rte_mbuf *in_mb[], struct rte_crypto_op *cop[],
+				       uint16_t num, uint16_t *nb_err_ret, const bool mt_safe)
 {
 	struct entity_priv *en_priv = entity_priv_get(entity);
 	uint16_t nb_cop, nb_prep = 0, nb_err = 0;
@@ -496,7 +515,7 @@  pdcp_pre_process_uplane_sn_18_ul(const struct rte_pdcp_entity *entity, struct rt
 				memset(mac_i, 0, RTE_PDCP_MAC_I_LEN);
 
 			if (unlikely(!pdcp_pre_process_uplane_sn_18_ul_set_sn(en_priv, mb,
-									      &count))) {
+									      &count, mt_safe))) {
 				in_mb[nb_err++] = mb;
 				continue;
 			}
@@ -507,7 +526,7 @@  pdcp_pre_process_uplane_sn_18_ul(const struct rte_pdcp_entity *entity, struct rt
 		for (i = 0; i < nb_cop; i++) {
 			mb = in_mb[i];
 			if (unlikely(!pdcp_pre_process_uplane_sn_18_ul_set_sn(en_priv, mb,
-									      &count))) {
+									      &count, mt_safe))) {
 
 				in_mb[nb_err++] = mb;
 				continue;
@@ -527,8 +546,23 @@  pdcp_pre_process_uplane_sn_18_ul(const struct rte_pdcp_entity *entity, struct rt
 }
 
 static uint16_t
-pdcp_pre_process_cplane_sn_12_ul(const struct rte_pdcp_entity *entity, struct rte_mbuf *in_mb[],
-				 struct rte_crypto_op *cop[], uint16_t num, uint16_t *nb_err_ret)
+pdcp_pre_process_uplane_sn_18_ul_st(const struct rte_pdcp_entity *entity, struct rte_mbuf *in_mb[],
+				    struct rte_crypto_op *cop[], uint16_t num, uint16_t *nb_err_ret)
+{
+	return pdcp_pre_process_uplane_sn_18_ul_flags(entity, in_mb, cop, num, nb_err_ret, false);
+}
+
+static uint16_t
+pdcp_pre_process_uplane_sn_18_ul_mt(const struct rte_pdcp_entity *entity, struct rte_mbuf *in_mb[],
+				    struct rte_crypto_op *cop[], uint16_t num, uint16_t *nb_err_ret)
+{
+	return pdcp_pre_process_uplane_sn_18_ul_flags(entity, in_mb, cop, num, nb_err_ret, true);
+}
+
+static inline uint16_t
+pdcp_pre_process_cplane_sn_12_ul_flags(const struct rte_pdcp_entity *entity,
+				       struct rte_mbuf *in_mb[], struct rte_crypto_op *cop[],
+				       uint16_t num, uint16_t *nb_err_ret, const bool mt_safe)
 {
 	struct entity_priv *en_priv = entity_priv_get(entity);
 	struct rte_pdcp_cp_data_pdu_sn_12_hdr *pdu_hdr;
@@ -565,7 +599,7 @@  pdcp_pre_process_cplane_sn_12_ul(const struct rte_pdcp_entity *entity, struct rt
 			memset(mac_i, 0, RTE_PDCP_MAC_I_LEN);
 
 		/* Update sequence number in the PDU header */
-		count = en_priv->state.tx_next++;
+		count = pdcp_atomic_inc(&en_priv->state.tx_next, mt_safe);
 		sn = pdcp_sn_from_count_get(count, RTE_SECURITY_PDCP_SN_SIZE_12);
 
 		pdu_hdr->sn_11_8 = ((sn & 0xf00) >> 8);
@@ -584,6 +618,21 @@  pdcp_pre_process_cplane_sn_12_ul(const struct rte_pdcp_entity *entity, struct rt
 	return nb_prep;
 }
 
+static uint16_t
+pdcp_pre_process_cplane_sn_12_ul_st(const struct rte_pdcp_entity *entity, struct rte_mbuf *in_mb[],
+				    struct rte_crypto_op *cop[], uint16_t num, uint16_t *nb_err_ret)
+{
+	return pdcp_pre_process_cplane_sn_12_ul_flags(entity, in_mb, cop, num, nb_err_ret, false);
+}
+
+static uint16_t
+pdcp_pre_process_cplane_sn_12_ul_mt(const struct rte_pdcp_entity *entity, struct rte_mbuf *in_mb[],
+				    struct rte_crypto_op *cop[], uint16_t num, uint16_t *nb_err_ret)
+{
+	return pdcp_pre_process_cplane_sn_12_ul_flags(entity, in_mb, cop, num, nb_err_ret, true);
+}
+
+
 static uint16_t
 pdcp_post_process_ul(const struct rte_pdcp_entity *entity,
 		     struct rte_mbuf *in_mb[], struct rte_mbuf *out_mb[],
@@ -659,7 +708,9 @@  pdcp_pre_process_uplane_sn_12_dl_flags(const struct rte_pdcp_entity *entity,
 	nb_cop = rte_crypto_op_bulk_alloc(en_priv->cop_pool, RTE_CRYPTO_OP_TYPE_SYMMETRIC, cop,
 					  num);
 
+	pdcp_read_lock(entity);
 	const uint32_t rx_deliv = en_priv->state.rx_deliv;
+	pdcp_read_unlock(entity);
 
 	for (i = 0; i < nb_cop; i++) {
 		mb = in_mb[i];
@@ -727,7 +778,9 @@  pdcp_pre_process_uplane_sn_18_dl_flags(const struct rte_pdcp_entity *entity,
 	nb_cop = rte_crypto_op_bulk_alloc(en_priv->cop_pool, RTE_CRYPTO_OP_TYPE_SYMMETRIC, cop,
 					  num);
 
+	pdcp_read_lock(entity);
 	const uint32_t rx_deliv = en_priv->state.rx_deliv;
+	pdcp_read_unlock(entity);
 
 	for (i = 0; i < nb_cop; i++) {
 		mb = in_mb[i];
@@ -796,7 +849,9 @@  pdcp_pre_process_cplane_sn_12_dl(const struct rte_pdcp_entity *entity, struct rt
 	nb_cop = rte_crypto_op_bulk_alloc(en_priv->cop_pool, RTE_CRYPTO_OP_TYPE_SYMMETRIC, cop,
 					  num);
 
+	pdcp_read_lock(entity);
 	const uint32_t rx_deliv = en_priv->state.rx_deliv;
+	pdcp_read_unlock(entity);
 
 	for (i = 0; i < nb_cop; i++) {
 		mb = in_mb[i];
@@ -938,6 +993,8 @@  pdcp_post_process_uplane_dl_flags(const struct rte_pdcp_entity *entity, struct r
 	struct rte_mbuf *mb;
 	uint32_t count;
 
+	pdcp_write_lock(entity);
+
 	for (i = 0; i < num; i++) {
 		mb = in_mb[i];
 		if (unlikely(mb->ol_flags & RTE_MBUF_F_RX_SEC_OFFLOAD_FAILED))
@@ -958,6 +1015,8 @@  pdcp_post_process_uplane_dl_flags(const struct rte_pdcp_entity *entity, struct r
 		err_mb[nb_err++] = mb;
 	}
 
+	pdcp_write_unlock(entity);
+
 	if (unlikely(nb_err != 0))
 		rte_memcpy(&out_mb[nb_success], err_mb, nb_err * sizeof(struct rte_mbuf *));
 
@@ -991,6 +1050,8 @@  pdcp_post_process_cplane_sn_12_dl(const struct rte_pdcp_entity *entity,
 	struct rte_mbuf *mb;
 	uint32_t count;
 
+	pdcp_write_lock(entity);
+
 	for (i = 0; i < num; i++) {
 		mb = in_mb[i];
 		if (unlikely(mb->ol_flags & RTE_MBUF_F_RX_SEC_OFFLOAD_FAILED))
@@ -1011,6 +1072,8 @@  pdcp_post_process_cplane_sn_12_dl(const struct rte_pdcp_entity *entity,
 		err_mb[nb_err++] = mb;
 	}
 
+	pdcp_write_unlock(entity);
+
 	if (unlikely(nb_err != 0))
 		rte_memcpy(&out_mb[nb_success], err_mb, nb_err * sizeof(struct rte_mbuf *));
 
@@ -1029,7 +1092,10 @@  pdcp_pre_post_func_set(struct rte_pdcp_entity *entity, const struct rte_pdcp_ent
 	if ((conf->pdcp_xfrm.domain == RTE_SECURITY_PDCP_MODE_CONTROL) &&
 	    (conf->pdcp_xfrm.sn_size == RTE_SECURITY_PDCP_SN_SIZE_12) &&
 	    (conf->pdcp_xfrm.pkt_dir == RTE_SECURITY_PDCP_UPLINK)) {
-		entity->pre_process = pdcp_pre_process_cplane_sn_12_ul;
+		if (conf->enable_thread_safety)
+			entity->pre_process = pdcp_pre_process_cplane_sn_12_ul_mt;
+		else
+			entity->pre_process = pdcp_pre_process_cplane_sn_12_ul_st;
 		entity->post_process = pdcp_post_process_ul;
 	}
 
@@ -1043,14 +1109,20 @@  pdcp_pre_post_func_set(struct rte_pdcp_entity *entity, const struct rte_pdcp_ent
 	if ((conf->pdcp_xfrm.domain == RTE_SECURITY_PDCP_MODE_DATA) &&
 	    (conf->pdcp_xfrm.sn_size == RTE_SECURITY_PDCP_SN_SIZE_12) &&
 	    (conf->pdcp_xfrm.pkt_dir == RTE_SECURITY_PDCP_UPLINK)) {
-		entity->pre_process = pdcp_pre_process_uplane_sn_12_ul;
+		if (conf->enable_thread_safety)
+			entity->pre_process = pdcp_pre_process_uplane_sn_12_ul_mt;
+		else
+			entity->pre_process = pdcp_pre_process_uplane_sn_12_ul_st;
 		entity->post_process = pdcp_post_process_ul;
 	}
 
 	if ((conf->pdcp_xfrm.domain == RTE_SECURITY_PDCP_MODE_DATA) &&
 	    (conf->pdcp_xfrm.sn_size == RTE_SECURITY_PDCP_SN_SIZE_18) &&
 	    (conf->pdcp_xfrm.pkt_dir == RTE_SECURITY_PDCP_UPLINK)) {
-		entity->pre_process = pdcp_pre_process_uplane_sn_18_ul;
+		if (conf->enable_thread_safety)
+			entity->pre_process = pdcp_pre_process_uplane_sn_18_ul_mt;
+		else
+			entity->pre_process = pdcp_pre_process_uplane_sn_18_ul_st;
 		entity->post_process = pdcp_post_process_ul;
 	}
 
@@ -1174,6 +1246,13 @@  pdcp_entity_priv_populate(struct entity_priv *en_priv, const struct rte_pdcp_ent
 	 */
 	en_priv->flags.is_out_of_order_delivery = conf->out_of_order_delivery;
 
+	/**
+	 * flags.enable_thread_safety
+	 *
+	 * Indicate whether the thread safety is enabled for PDCP entity.
+	 */
+	en_priv->flags.is_thread_safety_enabled = conf->enable_thread_safety;
+
 	/**
 	 * hdr_sz
 	 *
diff --git a/lib/pdcp/rte_pdcp.c b/lib/pdcp/rte_pdcp.c
index 1c6d2466b2..7431345d3f 100644
--- a/lib/pdcp/rte_pdcp.c
+++ b/lib/pdcp/rte_pdcp.c
@@ -92,6 +92,8 @@  pdcp_dl_establish(struct rte_pdcp_entity *entity, const struct rte_pdcp_entity_c
 	if (ret)
 		return ret;
 
+	pdcp_lock_init(entity);
+
 	return 0;
 }
 
@@ -237,8 +239,11 @@  rte_pdcp_entity_suspend(struct rte_pdcp_entity *pdcp_entity,
 		nb_out = pdcp_reorder_up_to_get(&dl->reorder, out_mb, pdcp_entity->max_pkt_cache,
 				en_priv->state.rx_next);
 		pdcp_reorder_stop(&dl->reorder);
+
+		pdcp_write_lock(pdcp_entity);
 		en_priv->state.rx_next = 0;
 		en_priv->state.rx_deliv = 0;
+		pdcp_write_unlock(pdcp_entity);
 	}
 
 	return nb_out;
@@ -299,6 +304,8 @@  rte_pdcp_t_reordering_expiry_handle(const struct rte_pdcp_entity *entity, struct
 	 *   performing header decompression, if not decompressed before:
 	 */
 
+	pdcp_write_lock(entity);
+
 	/*   - all stored PDCP SDU(s) with associated COUNT value(s) < RX_REORD; */
 	nb_out = pdcp_reorder_up_to_get(&dl->reorder, out_mb, capacity, en_priv->state.rx_reord);
 	capacity -= nb_out;
@@ -330,5 +337,7 @@  rte_pdcp_t_reordering_expiry_handle(const struct rte_pdcp_entity *entity, struct
 		dl->t_reorder.state = TIMER_EXPIRED;
 	}
 
+	pdcp_write_unlock(entity);
+
 	return nb_out;
 }
diff --git a/lib/pdcp/rte_pdcp.h b/lib/pdcp/rte_pdcp.h
index b926b0df29..88700e3737 100644
--- a/lib/pdcp/rte_pdcp.h
+++ b/lib/pdcp/rte_pdcp.h
@@ -141,6 +141,8 @@  struct rte_pdcp_entity_conf {
 	bool is_slrb;
 	/** Enable security offload on the device specified. */
 	bool en_sec_offload;
+	/** Enable usage of synchronization primitives for entity. */
+	bool enable_thread_safety;
 	/** Device on which security/crypto session need to be created. */
 	uint8_t dev_id;
 	/**