[v2,21/22] pdcp: add thread safe processing

Message ID 20230414174512.642-22-anoobj@marvell.com (mailing list archive)
State Changes Requested, archived
Delegated to: akhil goyal
Headers
Series lib: add pdcp protocol |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Anoob Joseph April 14, 2023, 5:45 p.m. UTC
  From: Volodymyr Fialko <vfialko@marvell.com>

PDCP state has to be guarded for:

- Uplink pre_process:
    - tx_next atomic increment

- Downlink pre_process:
    - rx_deliv - read

- Downlink post_process:
    - rx_deliv, rx_reorder, rx_next - read/write
    - bitmask/reorder buffer - read/write

When the application requires thread-safe processing, the state variables
need to be updated atomically. Add a per-entity configuration option to
select this behavior.

Signed-off-by: Anoob Joseph <anoobj@marvell.com>
Signed-off-by: Volodymyr Fialko <vfialko@marvell.com>
---
 lib/pdcp/pdcp_entity.h  | 46 +++++++++++++++++++++++++++++++++++++++++
 lib/pdcp/pdcp_process.c | 34 +++++++++++++++++++++++++++---
 lib/pdcp/rte_pdcp.c     |  2 ++
 lib/pdcp/rte_pdcp.h     |  2 ++
 4 files changed, 81 insertions(+), 3 deletions(-)
  

Patch

diff --git a/lib/pdcp/pdcp_entity.h b/lib/pdcp/pdcp_entity.h
index 38fa71acef..2dd6d2417d 100644
--- a/lib/pdcp/pdcp_entity.h
+++ b/lib/pdcp/pdcp_entity.h
@@ -10,6 +10,7 @@ 
 #include <rte_mempool.h>
 #include <rte_pdcp.h>
 #include <rte_security.h>
+#include <rte_spinlock.h>
 
 #include "pdcp_reorder.h"
 
@@ -162,6 +163,8 @@  struct entity_priv {
 		uint64_t is_status_report_required : 1;
 		/** Is out-of-order delivery enabled */
 		uint64_t is_out_of_order_delivery : 1;
+		/** Is thread safety disabled */
+		uint64_t is_thread_safety_disabled : 1;
 	} flags;
 	/** Crypto op pool. */
 	struct rte_mempool *cop_pool;
@@ -175,6 +178,8 @@  struct entity_priv {
 	uint8_t dev_id;
 };
 
+typedef rte_spinlock_t pdcp_lock_t;
+
 struct entity_priv_dl_part {
 	/** PDCP would need to track the count values that are already received.*/
 	struct pdcp_cnt_bitmap bitmap;
@@ -182,6 +187,8 @@  struct entity_priv_dl_part {
 	struct pdcp_t_reordering t_reorder;
 	/** Reorder packet buffer */
 	struct pdcp_reorder reorder;
+	/** Lock to protect concurrent updates */
+	pdcp_lock_t lock;
 	/** Bitmap memory region */
 	uint8_t bitmap_mem[0];
 };
@@ -257,4 +264,43 @@  pdcp_hfn_max(enum rte_security_pdcp_sn_size sn_size)
 	return (1 << (32 - sn_size)) - 1;
 }
 
+static inline uint32_t
+pdcp_atomic_inc(const struct entity_priv *en_priv, uint32_t *val)
+{
+	if (en_priv->flags.is_thread_safety_disabled)
+		return (*val)++;
+	else
+		return __atomic_fetch_add(val, 1, __ATOMIC_RELAXED);
+}
+
+static inline void
+pdcp_lock_init(const struct rte_pdcp_entity *entity)
+{
+	struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+	struct entity_priv *en_priv = entity_priv_get(entity);
+
+	if (!en_priv->flags.is_thread_safety_disabled)
+		rte_spinlock_init(&dl->lock);
+}
+
+static inline void
+pdcp_lock_lock(const struct rte_pdcp_entity *entity)
+{
+	struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+	struct entity_priv *en_priv = entity_priv_get(entity);
+
+	if (!en_priv->flags.is_thread_safety_disabled)
+		rte_spinlock_lock(&dl->lock);
+}
+
+static inline void
+pdcp_lock_unlock(const struct rte_pdcp_entity *entity)
+{
+	struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+	struct entity_priv *en_priv = entity_priv_get(entity);
+
+	if (!en_priv->flags.is_thread_safety_disabled)
+		rte_spinlock_unlock(&dl->lock);
+}
+
 #endif /* PDCP_ENTITY_H */
diff --git a/lib/pdcp/pdcp_process.c b/lib/pdcp/pdcp_process.c
index cdadc9b6b8..0bafa3447a 100644
--- a/lib/pdcp/pdcp_process.c
+++ b/lib/pdcp/pdcp_process.c
@@ -369,7 +369,7 @@  pdcp_pre_process_uplane_sn_12_ul_set_sn(struct entity_priv *en_priv, struct rte_
 		return false;
 
 	/* Update sequence num in the PDU header */
-	*count = en_priv->state.tx_next++;
+	*count = pdcp_atomic_inc(en_priv, &en_priv->state.tx_next);
 	sn = pdcp_sn_from_count_get(*count, RTE_SECURITY_PDCP_SN_SIZE_12);
 
 	pdu_hdr->d_c = RTE_PDCP_PDU_TYPE_DATA;
@@ -451,7 +451,7 @@  pdcp_pre_process_uplane_sn_18_ul_set_sn(struct entity_priv *en_priv, struct rte_
 		return false;
 
 	/* Update sequence num in the PDU header */
-	*count = en_priv->state.tx_next++;
+	*count = pdcp_atomic_inc(en_priv, &en_priv->state.tx_next);
 	sn = pdcp_sn_from_count_get(*count, RTE_SECURITY_PDCP_SN_SIZE_18);
 
 	pdu_hdr->d_c = RTE_PDCP_PDU_TYPE_DATA;
@@ -561,7 +561,7 @@  pdcp_pre_process_cplane_sn_12_ul(const struct rte_pdcp_entity *entity, struct rt
 			memset(mac_i, 0, PDCP_MAC_I_LEN);
 
 		/* Update sequence number in the PDU header */
-		count = en_priv->state.tx_next++;
+		count = pdcp_atomic_inc(en_priv, &en_priv->state.tx_next);
 		sn = pdcp_sn_from_count_get(count, RTE_SECURITY_PDCP_SN_SIZE_12);
 
 		pdu_hdr->sn_11_8 = ((sn & 0xf00) >> 8);
@@ -654,7 +654,9 @@  pdcp_pre_process_uplane_sn_12_dl_flags(const struct rte_pdcp_entity *entity,
 	nb_cop = rte_crypto_op_bulk_alloc(en_priv->cop_pool, RTE_CRYPTO_OP_TYPE_SYMMETRIC, cop,
 					  num);
 
+	pdcp_lock_lock(entity);
 	const uint32_t rx_deliv = en_priv->state.rx_deliv;
+	pdcp_lock_unlock(entity);
 
 	for (i = 0; i < nb_cop; i++) {
 		mb = in_mb[i];
@@ -714,7 +716,9 @@  pdcp_pre_process_uplane_sn_18_dl_flags(const struct rte_pdcp_entity *entity,
 	nb_cop = rte_crypto_op_bulk_alloc(en_priv->cop_pool, RTE_CRYPTO_OP_TYPE_SYMMETRIC, cop,
 					  num);
 
+	pdcp_lock_lock(entity);
 	const uint32_t rx_deliv = en_priv->state.rx_deliv;
+	pdcp_lock_unlock(entity);
 
 	for (i = 0; i < nb_cop; i++) {
 		mb = in_mb[i];
@@ -775,7 +779,9 @@  pdcp_pre_process_cplane_sn_12_dl(const struct rte_pdcp_entity *entity, struct rt
 	nb_cop = rte_crypto_op_bulk_alloc(en_priv->cop_pool, RTE_CRYPTO_OP_TYPE_SYMMETRIC, cop,
 					  num);
 
+	pdcp_lock_lock(entity);
 	const uint32_t rx_deliv = en_priv->state.rx_deliv;
+	pdcp_lock_unlock(entity);
 
 	for (i = 0; i < nb_cop; i++) {
 		mb = in_mb[i];
@@ -925,6 +931,8 @@  pdcp_post_process_uplane_sn_12_dl_flags(const struct rte_pdcp_entity *entity,
 	struct rte_mbuf *mb;
 	uint32_t count;
 
+	pdcp_lock_lock(entity);
+
 	for (i = 0; i < num; i++) {
 		mb = in_mb[i];
 		if (unlikely(mb->ol_flags & RTE_MBUF_F_RX_SEC_OFFLOAD_FAILED))
@@ -954,6 +962,8 @@  pdcp_post_process_uplane_sn_12_dl_flags(const struct rte_pdcp_entity *entity,
 		err_mb[nb_err++] = mb;
 	}
 
+	pdcp_lock_unlock(entity);
+
 	if (unlikely(nb_err != 0))
 		rte_memcpy(&out_mb[nb_success], err_mb, nb_err * sizeof(struct rte_mbuf *));
 
@@ -994,6 +1004,7 @@  pdcp_post_process_uplane_sn_18_dl_flags(const struct rte_pdcp_entity *entity,
 	int32_t rsn = 0;
 	uint32_t count;
 
+	pdcp_lock_lock(entity);
 
 	for (i = 0; i < num; i++) {
 		mb = in_mb[i];
@@ -1026,6 +1037,8 @@  pdcp_post_process_uplane_sn_18_dl_flags(const struct rte_pdcp_entity *entity,
 		err_mb[nb_err++] = mb;
 	}
 
+	pdcp_lock_unlock(entity);
+
 	if (unlikely(nb_err != 0))
 		rte_memcpy(&out_mb[nb_success], err_mb, nb_err * sizeof(struct rte_mbuf *));
 
@@ -1066,6 +1079,8 @@  pdcp_post_process_cplane_sn_12_dl(const struct rte_pdcp_entity *entity,
 	uint32_t count;
 	int32_t rsn;
 
+	pdcp_lock_lock(entity);
+
 	for (i = 0; i < num; i++) {
 		mb = in_mb[i];
 		if (unlikely(mb->ol_flags & RTE_MBUF_F_RX_SEC_OFFLOAD_FAILED))
@@ -1091,6 +1106,8 @@  pdcp_post_process_cplane_sn_12_dl(const struct rte_pdcp_entity *entity,
 		err_mb[nb_err++] = mb;
 	}
 
+	pdcp_lock_unlock(entity);
+
 	if (unlikely(nb_err != 0))
 		rte_memcpy(&out_mb[nb_success], err_mb, nb_err * sizeof(struct rte_mbuf *));
 
@@ -1254,6 +1271,13 @@  pdcp_entity_priv_populate(struct entity_priv *en_priv, const struct rte_pdcp_ent
 	 */
 	en_priv->flags.is_out_of_order_delivery = conf->out_of_order_delivery;
 
+	/**
+	 * flags.is_thread_safety_disabled
+	 *
+	 * Indicates whether thread safety is disabled for the PDCP entity.
+	 */
+	en_priv->flags.is_thread_safety_disabled = conf->disable_thread_safety;
+
 	/**
 	 * hdr_sz
 	 *
@@ -1316,6 +1340,8 @@  rte_pdcp_t_reordering_expiry_handle(const struct rte_pdcp_entity *entity, struct
 	 *   performing header decompression, if not decompressed before:
 	 */
 
+	pdcp_lock_lock(entity);
+
 	/*   - all stored PDCP SDU(s) with associated COUNT value(s) < RX_REORD; */
 	nb_out = pdcp_reorder_up_to_get(&dl->reorder, out_mb, capacity, en_priv->state.rx_reord);
 	capacity -= nb_out;
@@ -1347,5 +1373,7 @@  rte_pdcp_t_reordering_expiry_handle(const struct rte_pdcp_entity *entity, struct
 		dl->t_reorder.state = TIMER_EXPIRED;
 	}
 
+	pdcp_lock_unlock(entity);
+
 	return nb_out;
 }
diff --git a/lib/pdcp/rte_pdcp.c b/lib/pdcp/rte_pdcp.c
index 95d2283cef..06b86c274e 100644
--- a/lib/pdcp/rte_pdcp.c
+++ b/lib/pdcp/rte_pdcp.c
@@ -72,6 +72,8 @@  pdcp_dl_establish(struct rte_pdcp_entity *entity, const struct rte_pdcp_entity_c
 	if (ret)
 		return ret;
 
+	pdcp_lock_init(entity);
+
 	return 0;
 }
 
diff --git a/lib/pdcp/rte_pdcp.h b/lib/pdcp/rte_pdcp.h
index c077acce63..a8b824a7ee 100644
--- a/lib/pdcp/rte_pdcp.h
+++ b/lib/pdcp/rte_pdcp.h
@@ -137,6 +137,8 @@  struct rte_pdcp_entity_conf {
 	bool is_slrb;
 	/** Enable security offload on the device specified. */
 	bool en_sec_offload;
+	/** Disable usage of synchronization primitives for entity. */
+	bool disable_thread_safety;
 	/** Device on which security/crypto session need to be created. */
 	uint8_t dev_id;
 	/** Reverse direction during IV generation. Can be used to simulate UE crypto processing.*/