[v3,2/9] ring: prepare ring to allow new sync schemes
Checks
Commit Message
Change from *single* to *sync_type* to allow different
synchronisation schemes to be applied.
Mark *single* as deprecated in comments.
Add new functions to allow user to query ring sync types.
Replace direct access to *single* with appopriate function call.
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
app/test/test_pdump.c | 6 +-
lib/librte_pdump/rte_pdump.c | 2 +-
lib/librte_port/rte_port_ring.c | 12 ++--
lib/librte_ring/rte_ring.c | 6 +-
lib/librte_ring/rte_ring.h | 113 ++++++++++++++++++++++++++------
lib/librte_ring/rte_ring_elem.h | 8 +--
6 files changed, 108 insertions(+), 39 deletions(-)
Comments
<snip>
> Subject: [PATCH v3 2/9] ring: prepare ring to allow new sync schemes
>
> Change from *single* to *sync_type* to allow different synchronisation
> schemes to be applied.
> Mark *single* as deprecated in comments.
> Add new functions to allow user to query ring sync types.
> Replace direct access to *single* with appopriate function call.
>
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> ---
> app/test/test_pdump.c | 6 +-
> lib/librte_pdump/rte_pdump.c | 2 +-
> lib/librte_port/rte_port_ring.c | 12 ++--
> lib/librte_ring/rte_ring.c | 6 +-
> lib/librte_ring/rte_ring.h | 113 ++++++++++++++++++++++++++------
> lib/librte_ring/rte_ring_elem.h | 8 +--
> 6 files changed, 108 insertions(+), 39 deletions(-)
>
> diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c index
> ad183184c..6a1180bcb 100644
> --- a/app/test/test_pdump.c
> +++ b/app/test/test_pdump.c
> @@ -57,8 +57,7 @@ run_pdump_client_tests(void)
> if (ret < 0)
> return -1;
> mp->flags = 0x0000;
> - ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
> - RING_F_SP_ENQ | RING_F_SC_DEQ);
> + ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
Are you saying to get SP and SC behavior we now have to set the flags to 0? Isn't that a ABI break?
> if (ring_client == NULL) {
> printf("rte_ring_create SR0 failed");
> return -1;
> @@ -71,9 +70,6 @@ run_pdump_client_tests(void)
> }
> rte_eth_dev_probing_finish(eth_dev);
>
> - ring_client->prod.single = 0;
> - ring_client->cons.single = 0;
Just wondering if users outside of DPDK have done the same. I hope not, otherwise, we have an API break?
> -
> printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n");
>
> for (itr = 0; itr < NUM_ITR; itr++) {
> diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c
> index 8a01ac510..65364f2c5 100644
> --- a/lib/librte_pdump/rte_pdump.c
> +++ b/lib/librte_pdump/rte_pdump.c
> @@ -380,7 +380,7 @@ pdump_validate_ring_mp(struct rte_ring *ring, struct
> rte_mempool *mp)
> rte_errno = EINVAL;
> return -1;
> }
> - if (ring->prod.single || ring->cons.single) {
> + if (rte_ring_prod_single(ring) || rte_ring_cons_single(ring)) {
> PDUMP_LOG(ERR, "ring with either SP or SC settings"
> " is not valid for pdump, should have MP and MC settings\n");
> rte_errno = EINVAL;
> diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
> index 47fcdd06a..2f6c050fa 100644
> --- a/lib/librte_port/rte_port_ring.c
> +++ b/lib/librte_port/rte_port_ring.c
> @@ -44,8 +44,8 @@ rte_port_ring_reader_create_internal(void *params, int
> socket_id,
> /* Check input parameters */
> if ((conf == NULL) ||
> (conf->ring == NULL) ||
> - (conf->ring->cons.single && is_multi) ||
> - (!(conf->ring->cons.single) && !is_multi)) {
> + (rte_ring_cons_single(conf->ring) && is_multi) ||
> + (!rte_ring_cons_single(conf->ring) && !is_multi)) {
> RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> return NULL;
> }
> @@ -171,8 +171,8 @@ rte_port_ring_writer_create_internal(void *params,
> int socket_id,
> /* Check input parameters */
> if ((conf == NULL) ||
> (conf->ring == NULL) ||
> - (conf->ring->prod.single && is_multi) ||
> - (!(conf->ring->prod.single) && !is_multi) ||
> + (rte_ring_prod_single(conf->ring) && is_multi) ||
> + (!rte_ring_prod_single(conf->ring) && !is_multi) ||
> (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
> RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> return NULL;
> @@ -440,8 +440,8 @@ rte_port_ring_writer_nodrop_create_internal(void
> *params, int socket_id,
> /* Check input parameters */
> if ((conf == NULL) ||
> (conf->ring == NULL) ||
> - (conf->ring->prod.single && is_multi) ||
> - (!(conf->ring->prod.single) && !is_multi) ||
> + (rte_ring_prod_single(conf->ring) && is_multi) ||
> + (!rte_ring_prod_single(conf->ring) && !is_multi) ||
> (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
> RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> return NULL;
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> 77e5de099..fa5733907 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -106,8 +106,10 @@ rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
> if (ret < 0 || ret >= (int)sizeof(r->name))
> return -ENAMETOOLONG;
> r->flags = flags;
> - r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> - r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> + r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
> + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> + r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
> + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
>
> if (flags & RING_F_EXACT_SZ) {
> r->size = rte_align32pow2(count + 1); diff --git
> a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> 18fc5d845..d4775a063 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -61,11 +61,27 @@ enum rte_ring_queue_behavior { #define
> RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> sizeof(RTE_RING_MZ_PREFIX) + 1)
>
> -/* structure to hold a pair of head/tail values and other metadata */
> +/** prod/cons sync types */
> +enum rte_ring_sync_type {
> + RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */
> + RTE_RING_SYNC_ST, /**< single thread only */
> +};
> +
> +/**
> + * structure to hold a pair of head/tail values and other metadata.
> + * Depending on sync_type format of that structure might be different,
> + * but offset for *sync_type* and *tail* values should remain the same.
> + */
> struct rte_ring_headtail {
> - volatile uint32_t head; /**< Prod/consumer head. */
> - volatile uint32_t tail; /**< Prod/consumer tail. */
> - uint32_t single; /**< True if single prod/cons */
> + volatile uint32_t head; /**< prod/consumer head. */
> + volatile uint32_t tail; /**< prod/consumer tail. */
> + RTE_STD_C11
> + union {
> + /** sync type of prod/cons */
> + enum rte_ring_sync_type sync_type;
> + /** deprecated - True if single prod/cons */
> + uint32_t single;
> + };
> };
>
> /**
> @@ -116,11 +132,10 @@ struct rte_ring {
> #define RING_F_EXACT_SZ 0x0004
> #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */
>
> -/* @internal defines for passing to the enqueue dequeue worker functions
> */ -#define __IS_SP 1 -#define __IS_MP 0 -#define __IS_SC 1 -#define
> __IS_MC 0
> +#define __IS_SP RTE_RING_SYNC_ST
> +#define __IS_MP RTE_RING_SYNC_MT
> +#define __IS_SC RTE_RING_SYNC_ST
> +#define __IS_MC RTE_RING_SYNC_MT
I think we can remove these #defines and use the new SYNC types
>
> /**
> * Calculate the memory size needed for a ring @@ -420,7 +435,7 @@
> rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> unsigned int n, unsigned int *free_space) {
> return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> - __IS_MP, free_space);
> + RTE_RING_SYNC_MT, free_space);
> }
>
> /**
> @@ -443,7 +458,7 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void *
> const *obj_table,
> unsigned int n, unsigned int *free_space) {
> return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> - __IS_SP, free_space);
> + RTE_RING_SYNC_ST, free_space);
> }
>
> /**
> @@ -470,7 +485,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void *
> const *obj_table,
> unsigned int n, unsigned int *free_space) {
> return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> - r->prod.single, free_space);
> + r->prod.sync_type, free_space);
> }
>
> /**
> @@ -554,7 +569,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void
> **obj_table,
> unsigned int n, unsigned int *available) {
> return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> - __IS_MC, available);
> + RTE_RING_SYNC_MT, available);
> }
>
> /**
> @@ -578,7 +593,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void
> **obj_table,
> unsigned int n, unsigned int *available) {
> return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> - __IS_SC, available);
> + RTE_RING_SYNC_ST, available);
> }
>
> /**
> @@ -605,7 +620,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void
> **obj_table, unsigned int n,
> unsigned int *available)
> {
> return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> - r->cons.single, available);
> + r->cons.sync_type, available);
> }
>
> /**
> @@ -777,6 +792,62 @@ rte_ring_get_capacity(const struct rte_ring *r)
> return r->capacity;
> }
>
> +/**
> + * Return sync type used by producer in the ring.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @return
> + * Producer sync type value.
> + */
> +static inline enum rte_ring_sync_type
> +rte_ring_get_prod_sync_type(const struct rte_ring *r) {
> + return r->prod.sync_type;
> +}
> +
> +/**
> + * Check is the ring for single producer.
^^ if
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @return
> + * true if ring is SP, zero otherwise.
> + */
> +static inline int
> +rte_ring_prod_single(const struct rte_ring *r) {
would rte_ring_is_prod_single better?
> + return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST); }
> +
> +/**
> + * Return sync type used by consumer in the ring.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @return
> + * Consumer sync type value.
> + */
> +static inline enum rte_ring_sync_type
> +rte_ring_get_cons_sync_type(const struct rte_ring *r) {
> + return r->cons.sync_type;
> +}
> +
> +/**
> + * Check is the ring for single consumer.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @return
> + * true if ring is SC, zero otherwise.
> + */
> +static inline int
> +rte_ring_cons_single(const struct rte_ring *r) {
> + return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST); }
> +
All these new functions are not required to be called in the data path. They can be made non-inline.
> /**
> * Dump the status of all rings on the console
> *
> @@ -820,7 +891,7 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void *
> const *obj_table,
> unsigned int n, unsigned int *free_space) {
> return __rte_ring_do_enqueue(r, obj_table, n,
> - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
> + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT,
> free_space);
> }
>
> /**
> @@ -843,7 +914,7 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void *
> const *obj_table,
> unsigned int n, unsigned int *free_space) {
> return __rte_ring_do_enqueue(r, obj_table, n,
> - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
> + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST,
> free_space);
> }
>
> /**
> @@ -870,7 +941,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void *
> const *obj_table,
> unsigned int n, unsigned int *free_space) {
> return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_VARIABLE,
> - r->prod.single, free_space);
> + r->prod.sync_type, free_space);
> }
>
> /**
> @@ -898,7 +969,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void
> **obj_table,
> unsigned int n, unsigned int *available) {
> return __rte_ring_do_dequeue(r, obj_table, n,
> - RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
> + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT,
> available);
> }
>
> /**
> @@ -923,7 +994,7 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void
> **obj_table,
> unsigned int n, unsigned int *available) {
> return __rte_ring_do_dequeue(r, obj_table, n,
> - RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
> + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST,
> available);
> }
>
> /**
> @@ -951,7 +1022,7 @@ rte_ring_dequeue_burst(struct rte_ring *r, void
> **obj_table, {
> return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_VARIABLE,
> - r->cons.single, available);
> + r->cons.sync_type, available);
> }
>
> #ifdef __cplusplus
> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> index 663addc73..28f9836e6 100644
> --- a/lib/librte_ring/rte_ring_elem.h
> +++ b/lib/librte_ring/rte_ring_elem.h
> @@ -570,7 +570,7 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const
> void *obj_table,
> unsigned int esize, unsigned int n, unsigned int *free_space) {
> return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> - RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
> + RTE_RING_QUEUE_FIXED, r->prod.sync_type,
> free_space);
> }
>
> /**
> @@ -734,7 +734,7 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void
> *obj_table,
> unsigned int esize, unsigned int n, unsigned int *available) {
> return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> - RTE_RING_QUEUE_FIXED, r->cons.single, available);
> + RTE_RING_QUEUE_FIXED, r->cons.sync_type,
> available);
> }
>
> /**
> @@ -902,7 +902,7 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r,
> const void *obj_table,
> unsigned int esize, unsigned int n, unsigned int *free_space) {
> return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> - RTE_RING_QUEUE_VARIABLE, r->prod.single,
> free_space);
> + RTE_RING_QUEUE_VARIABLE, r->prod.sync_type,
> free_space);
> }
>
> /**
> @@ -995,7 +995,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void
> *obj_table, {
> return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> RTE_RING_QUEUE_VARIABLE,
> - r->cons.single, available);
> + r->cons.sync_type, available);
> }
>
> #ifdef __cplusplus
> --
> 2.17.1
> > Subject: [PATCH v3 2/9] ring: prepare ring to allow new sync schemes
> >
> > Change from *single* to *sync_type* to allow different synchronisation
> > schemes to be applied.
> > Mark *single* as deprecated in comments.
> > Add new functions to allow user to query ring sync types.
> > Replace direct access to *single* with appopriate function call.
> >
> > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> > ---
> > app/test/test_pdump.c | 6 +-
> > lib/librte_pdump/rte_pdump.c | 2 +-
> > lib/librte_port/rte_port_ring.c | 12 ++--
> > lib/librte_ring/rte_ring.c | 6 +-
> > lib/librte_ring/rte_ring.h | 113 ++++++++++++++++++++++++++------
> > lib/librte_ring/rte_ring_elem.h | 8 +--
> > 6 files changed, 108 insertions(+), 39 deletions(-)
> >
> > diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c index
> > ad183184c..6a1180bcb 100644
> > --- a/app/test/test_pdump.c
> > +++ b/app/test/test_pdump.c
> > @@ -57,8 +57,7 @@ run_pdump_client_tests(void)
> > if (ret < 0)
> > return -1;
> > mp->flags = 0x0000;
> > - ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
> > - RING_F_SP_ENQ | RING_F_SC_DEQ);
> > + ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
> Are you saying to get SP and SC behavior we now have to set the flags to 0?
No.
What the original cause does:
creates SP/SC ring:
ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
RING_F_SP_ENQ | RING_F_SC_DEQ);
Then manually makes it MP/MC by:
ring_client->prod.single = 0;
ring_client->cons.single = 0;
Instead it should just create MP/MC ring straightway, as the patch does:
ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
>Isn't that a ABI break?
I don't see any.
>
> > if (ring_client == NULL) {
> > printf("rte_ring_create SR0 failed");
> > return -1;
> > @@ -71,9 +70,6 @@ run_pdump_client_tests(void)
> > }
> > rte_eth_dev_probing_finish(eth_dev);
> >
> > - ring_client->prod.single = 0;
> > - ring_client->cons.single = 0;
> Just wondering if users outside of DPDK have done the same. I hope not, otherwise, we have an API break?
I think no. While it is completely wrong practise, it would keep working
even with these changes.
>
> > -
> > printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n");
> >
> > for (itr = 0; itr < NUM_ITR; itr++) {
> > diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c
> > index 8a01ac510..65364f2c5 100644
> > --- a/lib/librte_pdump/rte_pdump.c
> > +++ b/lib/librte_pdump/rte_pdump.c
> > @@ -380,7 +380,7 @@ pdump_validate_ring_mp(struct rte_ring *ring, struct
> > rte_mempool *mp)
> > rte_errno = EINVAL;
> > return -1;
> > }
> > - if (ring->prod.single || ring->cons.single) {
> > + if (rte_ring_prod_single(ring) || rte_ring_cons_single(ring)) {
> > PDUMP_LOG(ERR, "ring with either SP or SC settings"
> > " is not valid for pdump, should have MP and MC settings\n");
> > rte_errno = EINVAL;
> > diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
> > index 47fcdd06a..2f6c050fa 100644
> > --- a/lib/librte_port/rte_port_ring.c
> > +++ b/lib/librte_port/rte_port_ring.c
> > @@ -44,8 +44,8 @@ rte_port_ring_reader_create_internal(void *params, int
> > socket_id,
> > /* Check input parameters */
> > if ((conf == NULL) ||
> > (conf->ring == NULL) ||
> > - (conf->ring->cons.single && is_multi) ||
> > - (!(conf->ring->cons.single) && !is_multi)) {
> > + (rte_ring_cons_single(conf->ring) && is_multi) ||
> > + (!rte_ring_cons_single(conf->ring) && !is_multi)) {
> > RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> > return NULL;
> > }
> > @@ -171,8 +171,8 @@ rte_port_ring_writer_create_internal(void *params,
> > int socket_id,
> > /* Check input parameters */
> > if ((conf == NULL) ||
> > (conf->ring == NULL) ||
> > - (conf->ring->prod.single && is_multi) ||
> > - (!(conf->ring->prod.single) && !is_multi) ||
> > + (rte_ring_prod_single(conf->ring) && is_multi) ||
> > + (!rte_ring_prod_single(conf->ring) && !is_multi) ||
> > (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
> > RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> > return NULL;
> > @@ -440,8 +440,8 @@ rte_port_ring_writer_nodrop_create_internal(void
> > *params, int socket_id,
> > /* Check input parameters */
> > if ((conf == NULL) ||
> > (conf->ring == NULL) ||
> > - (conf->ring->prod.single && is_multi) ||
> > - (!(conf->ring->prod.single) && !is_multi) ||
> > + (rte_ring_prod_single(conf->ring) && is_multi) ||
> > + (!rte_ring_prod_single(conf->ring) && !is_multi) ||
> > (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
> > RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> > return NULL;
> > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> > 77e5de099..fa5733907 100644
> > --- a/lib/librte_ring/rte_ring.c
> > +++ b/lib/librte_ring/rte_ring.c
> > @@ -106,8 +106,10 @@ rte_ring_init(struct rte_ring *r, const char *name,
> > unsigned count,
> > if (ret < 0 || ret >= (int)sizeof(r->name))
> > return -ENAMETOOLONG;
> > r->flags = flags;
> > - r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> > - r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> > + r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
> > + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> > + r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
> > + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> >
> > if (flags & RING_F_EXACT_SZ) {
> > r->size = rte_align32pow2(count + 1); diff --git
> > a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> > 18fc5d845..d4775a063 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -61,11 +61,27 @@ enum rte_ring_queue_behavior { #define
> > RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> > sizeof(RTE_RING_MZ_PREFIX) + 1)
> >
> > -/* structure to hold a pair of head/tail values and other metadata */
> > +/** prod/cons sync types */
> > +enum rte_ring_sync_type {
> > + RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */
> > + RTE_RING_SYNC_ST, /**< single thread only */
> > +};
> > +
> > +/**
> > + * structure to hold a pair of head/tail values and other metadata.
> > + * Depending on sync_type format of that structure might be different,
> > + * but offset for *sync_type* and *tail* values should remain the same.
> > + */
> > struct rte_ring_headtail {
> > - volatile uint32_t head; /**< Prod/consumer head. */
> > - volatile uint32_t tail; /**< Prod/consumer tail. */
> > - uint32_t single; /**< True if single prod/cons */
> > + volatile uint32_t head; /**< prod/consumer head. */
> > + volatile uint32_t tail; /**< prod/consumer tail. */
> > + RTE_STD_C11
> > + union {
> > + /** sync type of prod/cons */
> > + enum rte_ring_sync_type sync_type;
> > + /** deprecated - True if single prod/cons */
> > + uint32_t single;
> > + };
> > };
> >
> > /**
> > @@ -116,11 +132,10 @@ struct rte_ring {
> > #define RING_F_EXACT_SZ 0x0004
> > #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */
> >
> > -/* @internal defines for passing to the enqueue dequeue worker functions
> > */ -#define __IS_SP 1 -#define __IS_MP 0 -#define __IS_SC 1 -#define
> > __IS_MC 0
> > +#define __IS_SP RTE_RING_SYNC_ST
> > +#define __IS_MP RTE_RING_SYNC_MT
> > +#define __IS_SC RTE_RING_SYNC_ST
> > +#define __IS_MC RTE_RING_SYNC_MT
> I think we can remove these #defines and use the new SYNC types
Wouldn't that introduce an API breakage?
Or we are ok here, as they are marked as internal?
I think I can for sure mark them as deprecated.
> >
> > /**
> > * Calculate the memory size needed for a ring @@ -420,7 +435,7 @@
> > rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> > unsigned int n, unsigned int *free_space) {
> > return __rte_ring_do_enqueue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > - __IS_MP, free_space);
> > + RTE_RING_SYNC_MT, free_space);
> > }
> >
> > /**
> > @@ -443,7 +458,7 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void *
> > const *obj_table,
> > unsigned int n, unsigned int *free_space) {
> > return __rte_ring_do_enqueue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > - __IS_SP, free_space);
> > + RTE_RING_SYNC_ST, free_space);
> > }
> >
> > /**
> > @@ -470,7 +485,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void *
> > const *obj_table,
> > unsigned int n, unsigned int *free_space) {
> > return __rte_ring_do_enqueue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > - r->prod.single, free_space);
> > + r->prod.sync_type, free_space);
> > }
> >
> > /**
> > @@ -554,7 +569,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void
> > **obj_table,
> > unsigned int n, unsigned int *available) {
> > return __rte_ring_do_dequeue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > - __IS_MC, available);
> > + RTE_RING_SYNC_MT, available);
> > }
> >
> > /**
> > @@ -578,7 +593,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void
> > **obj_table,
> > unsigned int n, unsigned int *available) {
> > return __rte_ring_do_dequeue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > - __IS_SC, available);
> > + RTE_RING_SYNC_ST, available);
> > }
> >
> > /**
> > @@ -605,7 +620,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void
> > **obj_table, unsigned int n,
> > unsigned int *available)
> > {
> > return __rte_ring_do_dequeue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > - r->cons.single, available);
> > + r->cons.sync_type, available);
> > }
> >
> > /**
> > @@ -777,6 +792,62 @@ rte_ring_get_capacity(const struct rte_ring *r)
> > return r->capacity;
> > }
> >
> > +/**
> > + * Return sync type used by producer in the ring.
> > + *
> > + * @param r
> > + * A pointer to the ring structure.
> > + * @return
> > + * Producer sync type value.
> > + */
> > +static inline enum rte_ring_sync_type
> > +rte_ring_get_prod_sync_type(const struct rte_ring *r) {
> > + return r->prod.sync_type;
> > +}
> > +
> > +/**
> > + * Check is the ring for single producer.
> ^^ if
> > + *
> > + * @param r
> > + * A pointer to the ring structure.
> > + * @return
> > + * true if ring is SP, zero otherwise.
> > + */
> > +static inline int
> > +rte_ring_prod_single(const struct rte_ring *r) {
> would rte_ring_is_prod_single better?
Ok, can rename.
>
> > + return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST); }
> > +
> > +/**
> > + * Return sync type used by consumer in the ring.
> > + *
> > + * @param r
> > + * A pointer to the ring structure.
> > + * @return
> > + * Consumer sync type value.
> > + */
> > +static inline enum rte_ring_sync_type
> > +rte_ring_get_cons_sync_type(const struct rte_ring *r) {
> > + return r->cons.sync_type;
> > +}
> > +
> > +/**
> > + * Check is the ring for single consumer.
> > + *
> > + * @param r
> > + * A pointer to the ring structure.
> > + * @return
> > + * true if ring is SC, zero otherwise.
> > + */
> > +static inline int
> > +rte_ring_cons_single(const struct rte_ring *r) {
> > + return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST); }
> > +
> All these new functions are not required to be called in the data path. They can be made non-inline.
Well, all these functions are introduced to encourage people not to
access ring fields sync_type/single directly but use functions instead.
I don't know do people access ring.single directly at data-path or not,
but assuming that they do - making these functions not-inline would
force them to ignore these functions and keep accessing it directly.
That was my thoughts besides making them inline.
I think we have the same for get_size/get_capacity().
<snip>
>
> > > Subject: [PATCH v3 2/9] ring: prepare ring to allow new sync schemes
> > >
> > > Change from *single* to *sync_type* to allow different
> > > synchronisation schemes to be applied.
> > > Mark *single* as deprecated in comments.
> > > Add new functions to allow user to query ring sync types.
> > > Replace direct access to *single* with appopriate function call.
> > >
> > > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> > > ---
> > > app/test/test_pdump.c | 6 +-
> > > lib/librte_pdump/rte_pdump.c | 2 +-
> > > lib/librte_port/rte_port_ring.c | 12 ++--
> > > lib/librte_ring/rte_ring.c | 6 +-
> > > lib/librte_ring/rte_ring.h | 113 ++++++++++++++++++++++++++------
> > > lib/librte_ring/rte_ring_elem.h | 8 +--
> > > 6 files changed, 108 insertions(+), 39 deletions(-)
> > >
> > > diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c index
> > > ad183184c..6a1180bcb 100644
> > > --- a/app/test/test_pdump.c
> > > +++ b/app/test/test_pdump.c
> > > @@ -57,8 +57,7 @@ run_pdump_client_tests(void)
> > > if (ret < 0)
> > > return -1;
> > > mp->flags = 0x0000;
> > > - ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
> > > - RING_F_SP_ENQ | RING_F_SC_DEQ);
> > > + ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
> > > +0);
> > Are you saying to get SP and SC behavior we now have to set the flags to 0?
>
> No.
> What the original cause does:
> creates SP/SC ring:
> ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
> RING_F_SP_ENQ | RING_F_SC_DEQ); Then
> manually makes it MP/MC by:
> ring_client->prod.single = 0;
> ring_client->cons.single = 0;
>
> Instead it should just create MP/MC ring straightway, as the patch does:
> ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
>
> >Isn't that a ABI break?
> I don't see any.
Ack
>
> >
> > > if (ring_client == NULL) {
> > > printf("rte_ring_create SR0 failed");
> > > return -1;
> > > @@ -71,9 +70,6 @@ run_pdump_client_tests(void)
> > > }
> > > rte_eth_dev_probing_finish(eth_dev);
> > >
> > > - ring_client->prod.single = 0;
> > > - ring_client->cons.single = 0;
> > Just wondering if users outside of DPDK have done the same. I hope not,
> otherwise, we have an API break?
>
> I think no. While it is completely wrong practise, it would keep working even
> with these changes.
Ack
>
> >
> > > -
> > > printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n");
> > >
> > > for (itr = 0; itr < NUM_ITR; itr++) { diff --git
> > > a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c index
> > > 8a01ac510..65364f2c5 100644
> > > --- a/lib/librte_pdump/rte_pdump.c
> > > +++ b/lib/librte_pdump/rte_pdump.c
> > > @@ -380,7 +380,7 @@ pdump_validate_ring_mp(struct rte_ring *ring,
> > > struct rte_mempool *mp)
> > > rte_errno = EINVAL;
> > > return -1;
> > > }
> > > - if (ring->prod.single || ring->cons.single) {
> > > + if (rte_ring_prod_single(ring) || rte_ring_cons_single(ring)) {
> > > PDUMP_LOG(ERR, "ring with either SP or SC settings"
> > > " is not valid for pdump, should have MP and MC settings\n");
> > > rte_errno = EINVAL;
> > > diff --git a/lib/librte_port/rte_port_ring.c
> > > b/lib/librte_port/rte_port_ring.c index 47fcdd06a..2f6c050fa 100644
> > > --- a/lib/librte_port/rte_port_ring.c
> > > +++ b/lib/librte_port/rte_port_ring.c
> > > @@ -44,8 +44,8 @@ rte_port_ring_reader_create_internal(void *params,
> > > int socket_id,
> > > /* Check input parameters */
> > > if ((conf == NULL) ||
> > > (conf->ring == NULL) ||
> > > - (conf->ring->cons.single && is_multi) ||
> > > - (!(conf->ring->cons.single) && !is_multi)) {
> > > + (rte_ring_cons_single(conf->ring) && is_multi) ||
> > > + (!rte_ring_cons_single(conf->ring) && !is_multi)) {
> > > RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> > > return NULL;
> > > }
> > > @@ -171,8 +171,8 @@ rte_port_ring_writer_create_internal(void
> > > *params, int socket_id,
> > > /* Check input parameters */
> > > if ((conf == NULL) ||
> > > (conf->ring == NULL) ||
> > > - (conf->ring->prod.single && is_multi) ||
> > > - (!(conf->ring->prod.single) && !is_multi) ||
> > > + (rte_ring_prod_single(conf->ring) && is_multi) ||
> > > + (!rte_ring_prod_single(conf->ring) && !is_multi) ||
> > > (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
> > > RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> > > return NULL;
> > > @@ -440,8 +440,8 @@
> rte_port_ring_writer_nodrop_create_internal(void
> > > *params, int socket_id,
> > > /* Check input parameters */
> > > if ((conf == NULL) ||
> > > (conf->ring == NULL) ||
> > > - (conf->ring->prod.single && is_multi) ||
> > > - (!(conf->ring->prod.single) && !is_multi) ||
> > > + (rte_ring_prod_single(conf->ring) && is_multi) ||
> > > + (!rte_ring_prod_single(conf->ring) && !is_multi) ||
> > > (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
> > > RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> > > return NULL;
> > > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > > index
> > > 77e5de099..fa5733907 100644
> > > --- a/lib/librte_ring/rte_ring.c
> > > +++ b/lib/librte_ring/rte_ring.c
> > > @@ -106,8 +106,10 @@ rte_ring_init(struct rte_ring *r, const char
> > > *name, unsigned count,
> > > if (ret < 0 || ret >= (int)sizeof(r->name))
> > > return -ENAMETOOLONG;
> > > r->flags = flags;
> > > - r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> > > - r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> > > + r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
> > > + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> > > + r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
> > > + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> > >
> > > if (flags & RING_F_EXACT_SZ) {
> > > r->size = rte_align32pow2(count + 1); diff --git
> > > a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> > > 18fc5d845..d4775a063 100644
> > > --- a/lib/librte_ring/rte_ring.h
> > > +++ b/lib/librte_ring/rte_ring.h
> > > @@ -61,11 +61,27 @@ enum rte_ring_queue_behavior { #define
> > > RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> > > sizeof(RTE_RING_MZ_PREFIX) + 1)
> > >
> > > -/* structure to hold a pair of head/tail values and other metadata
> > > */
> > > +/** prod/cons sync types */
> > > +enum rte_ring_sync_type {
> > > + RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */
> > > + RTE_RING_SYNC_ST, /**< single thread only */
> > > +};
> > > +
> > > +/**
> > > + * structure to hold a pair of head/tail values and other metadata.
> > > + * Depending on sync_type format of that structure might be
> > > +different,
> > > + * but offset for *sync_type* and *tail* values should remain the same.
> > > + */
> > > struct rte_ring_headtail {
> > > - volatile uint32_t head; /**< Prod/consumer head. */
> > > - volatile uint32_t tail; /**< Prod/consumer tail. */
> > > - uint32_t single; /**< True if single prod/cons */
> > > + volatile uint32_t head; /**< prod/consumer head. */
> > > + volatile uint32_t tail; /**< prod/consumer tail. */
> > > + RTE_STD_C11
> > > + union {
> > > + /** sync type of prod/cons */
> > > + enum rte_ring_sync_type sync_type;
> > > + /** deprecated - True if single prod/cons */
> > > + uint32_t single;
> > > + };
> > > };
> > >
> > > /**
> > > @@ -116,11 +132,10 @@ struct rte_ring { #define RING_F_EXACT_SZ
> > > 0x0004 #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask
> > > */
> > >
> > > -/* @internal defines for passing to the enqueue dequeue worker
> > > functions */ -#define __IS_SP 1 -#define __IS_MP 0 -#define __IS_SC
> > > 1 -#define __IS_MC 0
> > > +#define __IS_SP RTE_RING_SYNC_ST
> > > +#define __IS_MP RTE_RING_SYNC_MT
> > > +#define __IS_SC RTE_RING_SYNC_ST
> > > +#define __IS_MC RTE_RING_SYNC_MT
> > I think we can remove these #defines and use the new SYNC types
>
> Wouldn't that introduce an API breakage?
> Or we are ok here, as they are marked as internal?
> I think I can for sure mark them as deprecated.
I think they are internal.
Although it does not apply to your patch, rte_ring_queue_behavior also should be internal.
>
> > >
> > > /**
> > > * Calculate the memory size needed for a ring @@ -420,7 +435,7 @@
> > > rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> > > unsigned int n, unsigned int *free_space) {
> > > return __rte_ring_do_enqueue(r, obj_table, n,
> > > RTE_RING_QUEUE_FIXED,
> > > - __IS_MP, free_space);
> > > + RTE_RING_SYNC_MT, free_space);
> > > }
> > >
> > > /**
> > > @@ -443,7 +458,7 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r,
> > > void * const *obj_table,
> > > unsigned int n, unsigned int *free_space) {
> > > return __rte_ring_do_enqueue(r, obj_table, n,
> > > RTE_RING_QUEUE_FIXED,
> > > - __IS_SP, free_space);
> > > + RTE_RING_SYNC_ST, free_space);
> > > }
> > >
> > > /**
> > > @@ -470,7 +485,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void *
> > > const *obj_table,
> > > unsigned int n, unsigned int *free_space) {
> > > return __rte_ring_do_enqueue(r, obj_table, n,
> > > RTE_RING_QUEUE_FIXED,
> > > - r->prod.single, free_space);
> > > + r->prod.sync_type, free_space);
> > > }
> > >
> > > /**
> > > @@ -554,7 +569,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r,
> > > void **obj_table,
> > > unsigned int n, unsigned int *available) {
> > > return __rte_ring_do_dequeue(r, obj_table, n,
> > > RTE_RING_QUEUE_FIXED,
> > > - __IS_MC, available);
> > > + RTE_RING_SYNC_MT, available);
> > > }
> > >
> > > /**
> > > @@ -578,7 +593,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r,
> > > void **obj_table,
> > > unsigned int n, unsigned int *available) {
> > > return __rte_ring_do_dequeue(r, obj_table, n,
> > > RTE_RING_QUEUE_FIXED,
> > > - __IS_SC, available);
> > > + RTE_RING_SYNC_ST, available);
> > > }
> > >
> > > /**
> > > @@ -605,7 +620,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void
> > > **obj_table, unsigned int n,
> > > unsigned int *available)
> > > {
> > > return __rte_ring_do_dequeue(r, obj_table, n,
> > > RTE_RING_QUEUE_FIXED,
> > > - r->cons.single, available);
> > > + r->cons.sync_type, available);
> > > }
> > >
> > > /**
> > > @@ -777,6 +792,62 @@ rte_ring_get_capacity(const struct rte_ring *r)
> > > return r->capacity;
> > > }
> > >
> > > +/**
> > > + * Return sync type used by producer in the ring.
> > > + *
> > > + * @param r
> > > + * A pointer to the ring structure.
> > > + * @return
> > > + * Producer sync type value.
> > > + */
> > > +static inline enum rte_ring_sync_type
> > > +rte_ring_get_prod_sync_type(const struct rte_ring *r) {
> > > + return r->prod.sync_type;
> > > +}
> > > +
> > > +/**
> > > + * Check is the ring for single producer.
> > ^^ if
> > > + *
> > > + * @param r
> > > + * A pointer to the ring structure.
> > > + * @return
> > > + * true if ring is SP, zero otherwise.
> > > + */
> > > +static inline int
> > > +rte_ring_prod_single(const struct rte_ring *r) {
> > would rte_ring_is_prod_single better?
>
> Ok, can rename.
>
> >
> > > + return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST); }
> > > +
> > > +/**
> > > + * Return sync type used by consumer in the ring.
> > > + *
> > > + * @param r
> > > + * A pointer to the ring structure.
> > > + * @return
> > > + * Consumer sync type value.
> > > + */
> > > +static inline enum rte_ring_sync_type
> > > +rte_ring_get_cons_sync_type(const struct rte_ring *r) {
> > > + return r->cons.sync_type;
> > > +}
> > > +
> > > +/**
> > > + * Check is the ring for single consumer.
> > > + *
> > > + * @param r
> > > + * A pointer to the ring structure.
> > > + * @return
> > > + * true if ring is SC, zero otherwise.
> > > + */
> > > +static inline int
> > > +rte_ring_cons_single(const struct rte_ring *r) {
> > > + return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST); }
> > > +
> > All these new functions are not required to be called in the data path. They
> can be made non-inline.
>
> Well, all these functions are introduced to encourage people not to access
> ring fields sync_type/single directly but use functions instead.
> I don't know do people access ring.single directly at data-path or not, but
> assuming that they do - making these functions not-inline would force them
> to ignore these functions and keep accessing it directly.
> That was my thoughts besides making them inline.
> I think we have the same for get_size/get_capacity().
Ack
@@ -57,8 +57,7 @@ run_pdump_client_tests(void)
if (ret < 0)
return -1;
mp->flags = 0x0000;
- ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
- RING_F_SP_ENQ | RING_F_SC_DEQ);
+ ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
if (ring_client == NULL) {
printf("rte_ring_create SR0 failed");
return -1;
@@ -71,9 +70,6 @@ run_pdump_client_tests(void)
}
rte_eth_dev_probing_finish(eth_dev);
- ring_client->prod.single = 0;
- ring_client->cons.single = 0;
-
printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n");
for (itr = 0; itr < NUM_ITR; itr++) {
@@ -380,7 +380,7 @@ pdump_validate_ring_mp(struct rte_ring *ring, struct rte_mempool *mp)
rte_errno = EINVAL;
return -1;
}
- if (ring->prod.single || ring->cons.single) {
+ if (rte_ring_prod_single(ring) || rte_ring_cons_single(ring)) {
PDUMP_LOG(ERR, "ring with either SP or SC settings"
" is not valid for pdump, should have MP and MC settings\n");
rte_errno = EINVAL;
@@ -44,8 +44,8 @@ rte_port_ring_reader_create_internal(void *params, int socket_id,
/* Check input parameters */
if ((conf == NULL) ||
(conf->ring == NULL) ||
- (conf->ring->cons.single && is_multi) ||
- (!(conf->ring->cons.single) && !is_multi)) {
+ (rte_ring_cons_single(conf->ring) && is_multi) ||
+ (!rte_ring_cons_single(conf->ring) && !is_multi)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
}
@@ -171,8 +171,8 @@ rte_port_ring_writer_create_internal(void *params, int socket_id,
/* Check input parameters */
if ((conf == NULL) ||
(conf->ring == NULL) ||
- (conf->ring->prod.single && is_multi) ||
- (!(conf->ring->prod.single) && !is_multi) ||
+ (rte_ring_prod_single(conf->ring) && is_multi) ||
+ (!rte_ring_prod_single(conf->ring) && !is_multi) ||
(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
@@ -440,8 +440,8 @@ rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
/* Check input parameters */
if ((conf == NULL) ||
(conf->ring == NULL) ||
- (conf->ring->prod.single && is_multi) ||
- (!(conf->ring->prod.single) && !is_multi) ||
+ (rte_ring_prod_single(conf->ring) && is_multi) ||
+ (!rte_ring_prod_single(conf->ring) && !is_multi) ||
(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
@@ -106,8 +106,10 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
if (ret < 0 || ret >= (int)sizeof(r->name))
return -ENAMETOOLONG;
r->flags = flags;
- r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
- r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+ r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
+ RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
+ r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
+ RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
if (flags & RING_F_EXACT_SZ) {
r->size = rte_align32pow2(count + 1);
@@ -61,11 +61,27 @@ enum rte_ring_queue_behavior {
#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
sizeof(RTE_RING_MZ_PREFIX) + 1)
-/* structure to hold a pair of head/tail values and other metadata */
+/** prod/cons sync types */
+enum rte_ring_sync_type {
+ RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */
+ RTE_RING_SYNC_ST, /**< single thread only */
+};
+
+/**
+ * structure to hold a pair of head/tail values and other metadata.
+ * Depending on sync_type format of that structure might be different,
+ * but offset for *sync_type* and *tail* values should remain the same.
+ */
struct rte_ring_headtail {
- volatile uint32_t head; /**< Prod/consumer head. */
- volatile uint32_t tail; /**< Prod/consumer tail. */
- uint32_t single; /**< True if single prod/cons */
+ volatile uint32_t head; /**< prod/consumer head. */
+ volatile uint32_t tail; /**< prod/consumer tail. */
+ RTE_STD_C11
+ union {
+ /** sync type of prod/cons */
+ enum rte_ring_sync_type sync_type;
+ /** deprecated - True if single prod/cons */
+ uint32_t single;
+ };
};
/**
@@ -116,11 +132,10 @@ struct rte_ring {
#define RING_F_EXACT_SZ 0x0004
#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */
-/* @internal defines for passing to the enqueue dequeue worker functions */
-#define __IS_SP 1
-#define __IS_MP 0
-#define __IS_SC 1
-#define __IS_MC 0
+#define __IS_SP RTE_RING_SYNC_ST
+#define __IS_MP RTE_RING_SYNC_MT
+#define __IS_SC RTE_RING_SYNC_ST
+#define __IS_MC RTE_RING_SYNC_MT
/**
* Calculate the memory size needed for a ring
@@ -420,7 +435,7 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MP, free_space);
+ RTE_RING_SYNC_MT, free_space);
}
/**
@@ -443,7 +458,7 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SP, free_space);
+ RTE_RING_SYNC_ST, free_space);
}
/**
@@ -470,7 +485,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->prod.single, free_space);
+ r->prod.sync_type, free_space);
}
/**
@@ -554,7 +569,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MC, available);
+ RTE_RING_SYNC_MT, available);
}
/**
@@ -578,7 +593,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SC, available);
+ RTE_RING_SYNC_ST, available);
}
/**
@@ -605,7 +620,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
unsigned int *available)
{
return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->cons.single, available);
+ r->cons.sync_type, available);
}
/**
@@ -777,6 +792,62 @@ rte_ring_get_capacity(const struct rte_ring *r)
return r->capacity;
}
+/**
+ * Return sync type used by producer in the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * Producer sync type value.
+ */
+static inline enum rte_ring_sync_type
+rte_ring_get_prod_sync_type(const struct rte_ring *r)
+{
+ return r->prod.sync_type;
+}
+
+/**
+ * Check is the ring for single producer.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * true if ring is SP, zero otherwise.
+ */
+static inline int
+rte_ring_prod_single(const struct rte_ring *r)
+{
+ return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
+}
+
+/**
+ * Return sync type used by consumer in the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * Consumer sync type value.
+ */
+static inline enum rte_ring_sync_type
+rte_ring_get_cons_sync_type(const struct rte_ring *r)
+{
+ return r->cons.sync_type;
+}
+
+/**
+ * Check is the ring for single consumer.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * true if ring is SC, zero otherwise.
+ */
+static inline int
+rte_ring_cons_single(const struct rte_ring *r)
+{
+ return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
+}
+
/**
* Dump the status of all rings on the console
*
@@ -820,7 +891,7 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
}
/**
@@ -843,7 +914,7 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
}
/**
@@ -870,7 +941,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
- r->prod.single, free_space);
+ r->prod.sync_type, free_space);
}
/**
@@ -898,7 +969,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
}
/**
@@ -923,7 +994,7 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
}
/**
@@ -951,7 +1022,7 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
{
return __rte_ring_do_dequeue(r, obj_table, n,
RTE_RING_QUEUE_VARIABLE,
- r->cons.single, available);
+ r->cons.sync_type, available);
}
#ifdef __cplusplus
@@ -570,7 +570,7 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
unsigned int esize, unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
+ RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space);
}
/**
@@ -734,7 +734,7 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
unsigned int esize, unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, r->cons.single, available);
+ RTE_RING_QUEUE_FIXED, r->cons.sync_type, available);
}
/**
@@ -902,7 +902,7 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
unsigned int esize, unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
+ RTE_RING_QUEUE_VARIABLE, r->prod.sync_type, free_space);
}
/**
@@ -995,7 +995,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
{
return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
RTE_RING_QUEUE_VARIABLE,
- r->cons.single, available);
+ r->cons.sync_type, available);
}
#ifdef __cplusplus