Hi Stephen,
Please see inline.
Thanks,
Anoob
> -----Original Message-----
> From: Stephen Hemminger <stephen@networkplumber.org>
> Sent: Thursday, May 25, 2023 12:02 AM
> To: Anoob Joseph <anoobj@marvell.com>
> Cc: Thomas Monjalon <thomas@monjalon.net>; Akhil Goyal
> <gakhil@marvell.com>; Jerin Jacob Kollanukkaran <jerinj@marvell.com>;
> Konstantin Ananyev <konstantin.v.ananyev@yandex.ru>; Bernard
> Iremonger <bernard.iremonger@intel.com>; Volodymyr Fialko
> <vfialko@marvell.com>; Hemant Agrawal <hemant.agrawal@nxp.com>;
> Mattias Rönnblom <mattias.ronnblom@ericsson.com>; Kiran Kumar
> Kokkilagadda <kirankumark@marvell.com>; dev@dpdk.org; Olivier Matz
> <olivier.matz@6wind.com>
> Subject: [EXT] Re: [PATCH v3 21/22] pdcp: add thread safe processing
>
> External Email
>
> ----------------------------------------------------------------------
> On Wed, 24 May 2023 21:31:15 +0530
> Anoob Joseph <anoobj@marvell.com> wrote:
>
> > From: Volodymyr Fialko <vfialko@marvell.com>
> >
> > PDCP state has to be guarded for:
> >
> > - Uplink pre_process:
> > - tx_next atomic increment
> >
> > - Downlink pre_process:
> > - rx_deliv - read
> >
> > - Downlink post_process:
> > - rx_deliv, rx_reorder, rx_next - read/write
> > - bitmask/reorder buffer - read/write
> >
> > When application requires thread safe processing, the state variables
> > need to be updated atomically. Add config option to select this option
> > per entity.
> >
> > Signed-off-by: Anoob Joseph <anoobj@marvell.com>
> > Signed-off-by: Volodymyr Fialko <vfialko@marvell.com>
>
> NAK
> Conditional locking is a bad design pattern.
> It leads to lots of problems, and makes it almost impossible for analysis tools.
[Anoob] With PDCP (& most other protocols), we have to update the states atomically. Application designers would have a choice of either using a single thread or doing multi-threaded processing. If the library is designed for multi-threaded use and the application uses only a single thread, then there would be unnecessary overhead from the library. If the library sticks to single-threaded operation and the application needs more threads for scaling, then again it would become a library issue.
Is your issue with providing such an option, or is it about how it is implemented? IPsec also has a similar challenge, and a similar per-SA configuration is provided in lib IPsec as well.
@@ -10,6 +10,7 @@
#include <rte_mempool.h>
#include <rte_pdcp.h>
#include <rte_security.h>
+#include <rte_spinlock.h>
#include "pdcp_reorder.h"
@@ -162,6 +163,8 @@ struct entity_priv {
uint64_t is_status_report_required : 1;
/** Is out-of-order delivery enabled */
uint64_t is_out_of_order_delivery : 1;
+ /** Is thread safety disabled */
+ uint64_t is_thread_safety_disabled : 1;
} flags;
/** Crypto op pool. */
struct rte_mempool *cop_pool;
@@ -175,6 +178,8 @@ struct entity_priv {
uint8_t dev_id;
};
+typedef rte_spinlock_t pdcp_lock_t;
+
struct entity_priv_dl_part {
/** PDCP would need to track the count values that are already received.*/
struct pdcp_cnt_bitmap bitmap;
@@ -182,6 +187,8 @@ struct entity_priv_dl_part {
struct pdcp_t_reordering t_reorder;
/** Reorder packet buffer */
struct pdcp_reorder reorder;
+	/** Lock to protect concurrent updates */
+ pdcp_lock_t lock;
/** Bitmap memory region */
uint8_t bitmap_mem[0];
};
@@ -257,4 +264,43 @@ pdcp_hfn_max(enum rte_security_pdcp_sn_size sn_size)
return (1 << (32 - sn_size)) - 1;
}
+static inline uint32_t
+pdcp_atomic_inc(const struct entity_priv *en_priv, uint32_t *val)
+{
+ if (en_priv->flags.is_thread_safety_disabled)
+ return (*val)++;
+ else
+ return __atomic_fetch_add(val, 1, __ATOMIC_RELAXED);
+}
+
+static inline void
+pdcp_lock_init(const struct rte_pdcp_entity *entity)
+{
+ struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+ struct entity_priv *en_priv = entity_priv_get(entity);
+
+ if (!en_priv->flags.is_thread_safety_disabled)
+ rte_spinlock_init(&dl->lock);
+}
+
+static inline void
+pdcp_lock_lock(const struct rte_pdcp_entity *entity)
+{
+ struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+ struct entity_priv *en_priv = entity_priv_get(entity);
+
+ if (!en_priv->flags.is_thread_safety_disabled)
+ rte_spinlock_lock(&dl->lock);
+}
+
+static inline void
+pdcp_lock_unlock(const struct rte_pdcp_entity *entity)
+{
+ struct entity_priv_dl_part *dl = entity_dl_part_get(entity);
+ struct entity_priv *en_priv = entity_priv_get(entity);
+
+ if (!en_priv->flags.is_thread_safety_disabled)
+ rte_spinlock_unlock(&dl->lock);
+}
+
#endif /* PDCP_ENTITY_H */
@@ -369,7 +369,7 @@ pdcp_pre_process_uplane_sn_12_ul_set_sn(struct entity_priv *en_priv, struct rte_
return false;
/* Update sequence num in the PDU header */
- *count = en_priv->state.tx_next++;
+ *count = pdcp_atomic_inc(en_priv, &en_priv->state.tx_next);
sn = pdcp_sn_from_count_get(*count, RTE_SECURITY_PDCP_SN_SIZE_12);
pdu_hdr->d_c = RTE_PDCP_PDU_TYPE_DATA;
@@ -451,7 +451,7 @@ pdcp_pre_process_uplane_sn_18_ul_set_sn(struct entity_priv *en_priv, struct rte_
return false;
/* Update sequence num in the PDU header */
- *count = en_priv->state.tx_next++;
+ *count = pdcp_atomic_inc(en_priv, &en_priv->state.tx_next);
sn = pdcp_sn_from_count_get(*count, RTE_SECURITY_PDCP_SN_SIZE_18);
pdu_hdr->d_c = RTE_PDCP_PDU_TYPE_DATA;
@@ -561,7 +561,7 @@ pdcp_pre_process_cplane_sn_12_ul(const struct rte_pdcp_entity *entity, struct rt
memset(mac_i, 0, RTE_PDCP_MAC_I_LEN);
/* Update sequence number in the PDU header */
- count = en_priv->state.tx_next++;
+ count = pdcp_atomic_inc(en_priv, &en_priv->state.tx_next);
sn = pdcp_sn_from_count_get(count, RTE_SECURITY_PDCP_SN_SIZE_12);
pdu_hdr->sn_11_8 = ((sn & 0xf00) >> 8);
@@ -654,7 +654,9 @@ pdcp_pre_process_uplane_sn_12_dl_flags(const struct rte_pdcp_entity *entity,
nb_cop = rte_crypto_op_bulk_alloc(en_priv->cop_pool, RTE_CRYPTO_OP_TYPE_SYMMETRIC, cop,
num);
+ pdcp_lock_lock(entity);
const uint32_t rx_deliv = en_priv->state.rx_deliv;
+ pdcp_lock_unlock(entity);
for (i = 0; i < nb_cop; i++) {
mb = in_mb[i];
@@ -717,7 +719,9 @@ pdcp_pre_process_uplane_sn_18_dl_flags(const struct rte_pdcp_entity *entity,
nb_cop = rte_crypto_op_bulk_alloc(en_priv->cop_pool, RTE_CRYPTO_OP_TYPE_SYMMETRIC, cop,
num);
+ pdcp_lock_lock(entity);
const uint32_t rx_deliv = en_priv->state.rx_deliv;
+ pdcp_lock_unlock(entity);
for (i = 0; i < nb_cop; i++) {
mb = in_mb[i];
@@ -781,7 +785,9 @@ pdcp_pre_process_cplane_sn_12_dl(const struct rte_pdcp_entity *entity, struct rt
nb_cop = rte_crypto_op_bulk_alloc(en_priv->cop_pool, RTE_CRYPTO_OP_TYPE_SYMMETRIC, cop,
num);
+ pdcp_lock_lock(entity);
const uint32_t rx_deliv = en_priv->state.rx_deliv;
+ pdcp_lock_unlock(entity);
for (i = 0; i < nb_cop; i++) {
mb = in_mb[i];
@@ -923,6 +929,8 @@ pdcp_post_process_uplane_sn_12_dl_flags(const struct rte_pdcp_entity *entity,
struct rte_mbuf *mb;
uint32_t count;
+ pdcp_lock_lock(entity);
+
for (i = 0; i < num; i++) {
mb = in_mb[i];
if (unlikely(mb->ol_flags & RTE_MBUF_F_RX_SEC_OFFLOAD_FAILED))
@@ -949,6 +957,8 @@ pdcp_post_process_uplane_sn_12_dl_flags(const struct rte_pdcp_entity *entity,
err_mb[nb_err++] = mb;
}
+ pdcp_lock_unlock(entity);
+
if (unlikely(nb_err != 0))
rte_memcpy(&out_mb[nb_success], err_mb, nb_err * sizeof(struct rte_mbuf *));
@@ -989,6 +999,7 @@ pdcp_post_process_uplane_sn_18_dl_flags(const struct rte_pdcp_entity *entity,
int32_t rsn = 0;
uint32_t count;
+ pdcp_lock_lock(entity);
for (i = 0; i < num; i++) {
mb = in_mb[i];
@@ -1017,6 +1028,8 @@ pdcp_post_process_uplane_sn_18_dl_flags(const struct rte_pdcp_entity *entity,
err_mb[nb_err++] = mb;
}
+ pdcp_lock_unlock(entity);
+
if (unlikely(nb_err != 0))
rte_memcpy(&out_mb[nb_success], err_mb, nb_err * sizeof(struct rte_mbuf *));
@@ -1057,6 +1070,8 @@ pdcp_post_process_cplane_sn_12_dl(const struct rte_pdcp_entity *entity,
uint32_t count;
int32_t rsn;
+ pdcp_lock_lock(entity);
+
for (i = 0; i < num; i++) {
mb = in_mb[i];
if (unlikely(mb->ol_flags & RTE_MBUF_F_RX_SEC_OFFLOAD_FAILED))
@@ -1082,6 +1097,8 @@ pdcp_post_process_cplane_sn_12_dl(const struct rte_pdcp_entity *entity,
err_mb[nb_err++] = mb;
}
+ pdcp_lock_unlock(entity);
+
if (unlikely(nb_err != 0))
rte_memcpy(&out_mb[nb_success], err_mb, nb_err * sizeof(struct rte_mbuf *));
@@ -1245,6 +1262,13 @@ pdcp_entity_priv_populate(struct entity_priv *en_priv, const struct rte_pdcp_ent
*/
en_priv->flags.is_out_of_order_delivery = conf->out_of_order_delivery;
+	/**
+	 * flags.is_thread_safety_disabled
+	 *
+	 * Indicate whether thread safety is disabled for the PDCP entity.
+	 */
+ en_priv->flags.is_thread_safety_disabled = conf->disable_thread_safety;
+
/**
* hdr_sz
*
@@ -72,6 +72,8 @@ pdcp_dl_establish(struct rte_pdcp_entity *entity, const struct rte_pdcp_entity_c
if (ret)
return ret;
+ pdcp_lock_init(entity);
+
return 0;
}
@@ -276,6 +278,8 @@ rte_pdcp_t_reordering_expiry_handle(const struct rte_pdcp_entity *entity, struct
* performing header decompression, if not decompressed before:
*/
+ pdcp_lock_lock(entity);
+
/* - all stored PDCP SDU(s) with associated COUNT value(s) < RX_REORD; */
nb_out = pdcp_reorder_up_to_get(&dl->reorder, out_mb, capacity, en_priv->state.rx_reord);
capacity -= nb_out;
@@ -307,5 +311,7 @@ rte_pdcp_t_reordering_expiry_handle(const struct rte_pdcp_entity *entity, struct
dl->t_reorder.state = TIMER_EXPIRED;
}
+ pdcp_lock_unlock(entity);
+
return nb_out;
}
@@ -141,6 +141,8 @@ struct rte_pdcp_entity_conf {
bool is_slrb;
/** Enable security offload on the device specified. */
bool en_sec_offload;
+ /** Disable usage of synchronization primitives for entity. */
+ bool disable_thread_safety;
/** Device on which security/crypto session need to be created. */
uint8_t dev_id;
/**