@@ -388,11 +388,12 @@ __mlx5_hws_cnt_pool_enqueue_revert(struct rte_ring *r, unsigned int n,
MLX5_ASSERT(r->prod.sync_type == RTE_RING_SYNC_ST);
MLX5_ASSERT(r->cons.sync_type == RTE_RING_SYNC_ST);
- current_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed);
+ current_head = rte_atomic_load_explicit(&r->prod.head.val.pos,
+ rte_memory_order_relaxed);
MLX5_ASSERT(n <= r->capacity);
MLX5_ASSERT(n <= rte_ring_count(r));
revert2head = current_head - n;
- r->prod.head = revert2head; /* This ring should be SP. */
+ r->prod.head.val.pos = revert2head; /* This ring should be SP. */
__rte_ring_get_elem_addr(r, revert2head, sizeof(cnt_id_t), n,
&zcd->ptr1, &zcd->n1, &zcd->ptr2);
/* Update tail */
@@ -325,7 +325,7 @@ eth_get_monitor_addr(void *rx_queue, struct rte_power_monitor_cond *pmc)
*/
pmc->addr = &rng->prod.head;
pmc->size = sizeof(rng->prod.head);
- pmc->opaque[0] = rng->prod.head;
+ pmc->opaque[0] = rng->prod.head.val.pos;
pmc->fn = ring_monitor_callback;
return 0;
}
@@ -102,7 +102,7 @@ reset_headtail(void *p)
switch (ht->sync_type) {
case RTE_RING_SYNC_MT:
case RTE_RING_SYNC_ST:
- ht->head = 0;
+ ht->head.raw = 0;
ht->tail = 0;
break;
case RTE_RING_SYNC_MT_RTS:
@@ -373,9 +373,9 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
fprintf(f, " size=%"PRIu32"\n", r->size);
fprintf(f, " capacity=%"PRIu32"\n", r->capacity);
fprintf(f, " ct=%"PRIu32"\n", r->cons.tail);
- fprintf(f, " ch=%"PRIu32"\n", r->cons.head);
+ fprintf(f, " ch=%"PRIu32"\n", r->cons.head.val.pos);
fprintf(f, " pt=%"PRIu32"\n", r->prod.tail);
- fprintf(f, " ph=%"PRIu32"\n", r->prod.head);
+ fprintf(f, " ph=%"PRIu32"\n", r->prod.head.val.pos);
fprintf(f, " used=%u\n", rte_ring_count(r));
fprintf(f, " avail=%u\n", rte_ring_free_count(r));
}
@@ -66,8 +66,17 @@ enum rte_ring_sync_type {
* Depending on sync_type format of that structure might be different,
* but offset for *sync_type* and *tail* values should remain the same.
*/
+union __rte_ring_head_cft {
+ /** raw 8B value to read/write *cft* and *pos* as one atomic op */
+ alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
+ struct {
+ uint32_t pos; /**< head position */
+ uint32_t cft; /**< cached foreign tail value */
+ } val;
+};
+
struct rte_ring_headtail {
- volatile RTE_ATOMIC(uint32_t) head; /**< prod/consumer head. */
+ uint32_t __unused; /**< unused, keeps *tail* and *sync_type* offsets unchanged */
volatile RTE_ATOMIC(uint32_t) tail; /**< prod/consumer tail. */
union {
/** sync type of prod/cons */
@@ -75,6 +84,7 @@ struct rte_ring_headtail {
/** deprecated - True if single prod/cons */
uint32_t single;
};
+ union __rte_ring_head_cft head;
};
union __rte_ring_rts_poscnt {
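
The new union packs the head position and a cached copy of the opposite side's tail into one 8-byte word, so both fields can be observed and updated by a single atomic load or compare-exchange. Below is a minimal standalone sketch of that access pattern, using plain C11 atomics instead of the RTE_ATOMIC()/rte_atomic_*_explicit wrappers; the names are illustrative, not DPDK API.

#include <stdalign.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/* illustrative stand-in for union __rte_ring_head_cft */
union head_cft {
	/* 8B raw value: lets *pos* and *cft* be accessed in one atomic op */
	alignas(sizeof(uint64_t)) _Atomic uint64_t raw;
	struct {
		uint32_t pos; /* head position */
		uint32_t cft; /* cached foreign (opposite side) tail */
	} val;
};

int
main(void)
{
	union head_cft h, oh, nh;

	/* initial state: head at 10, cached foreign tail at 42 */
	oh.val.pos = 10;
	oh.val.cft = 42;
	atomic_store_explicit(&h.raw, oh.raw, memory_order_relaxed);

	/* one 8B load observes a consistent {pos, cft} pair */
	oh.raw = atomic_load_explicit(&h.raw, memory_order_acquire);
	printf("pos=%u cft=%u\n", oh.val.pos, oh.val.cft);

	/* advance the head and refresh the cached tail with a single CAS;
	 * the cast drops the atomic qualifier from the expected-value
	 * pointer, mirroring the (uint64_t *)(uintptr_t)&oh.raw cast used
	 * in the move-head hunks below
	 */
	nh.val.pos = oh.val.pos + 3;
	nh.val.cft = 45;
	if (atomic_compare_exchange_strong_explicit(&h.raw,
			(uint64_t *)(uintptr_t)&oh.raw, nh.raw,
			memory_order_acquire, memory_order_acquire))
		printf("updated: pos=%u cft=%u\n", nh.val.pos, nh.val.cft);
	return 0;
}
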
@@ -38,17 +38,18 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
{
unsigned int max = n;
int success;
+ uint32_t tail;
+ union __rte_ring_head_cft nh, oh;
+
+ oh.raw = rte_atomic_load_explicit(&d->head.raw,
+ rte_memory_order_acquire);
do {
/* Reset n to the initial burst count */
n = max;
- *old_head = d->head;
-
- /* add rmb barrier to avoid load/load reorder in weak
- * memory model. It is noop on x86
- */
- rte_smp_rmb();
+ *old_head = oh.val.pos;
+ tail = oh.val.cft;
/*
* The subtraction is done between two unsigned 32bits value
@@ -56,24 +57,41 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
* *old_head > s->tail). So 'free_entries' is always between 0
* and capacity (which is < size).
*/
- *entries = (capacity + s->tail - *old_head);
+ *entries = (capacity + tail - *old_head);
- /* check that we have enough room in ring */
- if (unlikely(n > *entries))
- n = (behavior == RTE_RING_QUEUE_FIXED) ?
+ /* attempt #1: check that we have enough room with
+ * cached-foreign-tail value.
+ * Note that the actual tail value may have moved forward since
+ * we cached it; in that case we have to refresh our cached value.
+ */
+ if (unlikely(n > *entries)) {
+
+ tail = rte_atomic_load_explicit(&s->tail,
+ rte_memory_order_relaxed);
+ *entries = (capacity + tail - *old_head);
+
+ /* attempt #2: check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ?
0 : *entries;
+ }
if (n == 0)
return 0;
*new_head = *old_head + n;
+ nh.val.pos = *new_head;
+ nh.val.cft = tail;
+
if (is_st) {
- d->head = *new_head;
+ d->head.raw = nh.raw;
success = 1;
} else
- success = rte_atomic32_cmpset(
- (uint32_t *)(uintptr_t)&d->head,
- *old_head, *new_head);
+ success = rte_atomic_compare_exchange_strong_explicit(
+ &d->head.raw, (uint64_t *)(uintptr_t)&oh.raw,
+ nh.raw, rte_memory_order_acquire,
+ rte_memory_order_acquire);
+
} while (unlikely(success == 0));
return n;
}
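
The hunk above is the core of the change: one 8-byte load yields a consistent {head, cached-foreign-tail} pair, so the fast path never reads the opposite side's tail cacheline. Because the foreign tail only ever advances, a stale cache can under-estimate but never over-estimate the free room, which is why attempt #1 is safe. A simplified standalone sketch of the two-attempt room check follows (plain C, illustrative names, fixed-size behaviour and the CAS retry loop from the hunk left out):

#include <stdint.h>
#include <stdio.h>

/*
 * attempt #1 relies on the cached foreign tail (no access to the other
 * side's cacheline); only if that looks insufficient is the real tail
 * re-read (attempt #2) and the cache refreshed.
 */
static uint32_t
move_head_entries(uint32_t capacity, uint32_t old_head,
		uint32_t cached_tail, const uint32_t *foreign_tail,
		uint32_t n, uint32_t *new_cached_tail)
{
	uint32_t tail = cached_tail;
	/* unsigned wrap-around keeps *entries* in [0, capacity] */
	uint32_t entries = capacity + tail - old_head;

	if (n > entries) {
		/* the foreign tail only moves forward, so re-reading it
		 * can only make *entries* larger
		 */
		tail = *foreign_tail;
		entries = capacity + tail - old_head;
		if (n > entries)
			n = entries; /* RTE_RING_QUEUE_VARIABLE behaviour */
	}
	*new_cached_tail = tail;
	return n;
}

int
main(void)
{
	uint32_t cft, n;
	uint32_t real_tail = 85;	/* current foreign tail */

	/* capacity 128, head at 90, cached tail lagging at 70:
	 * 128 + 70 - 90 = 108 free entries, so a request for 8 is
	 * satisfied from the cache alone
	 */
	n = move_head_entries(128, 90, 70, &real_tail, 8, &cft);
	printf("n=%u cft=%u\n", n, cft);

	/* a request for 120 exceeds the cached estimate and forces a
	 * refresh: 128 + 85 - 90 = 123 entries, so it still fits
	 */
	n = move_head_entries(128, 90, 70, &real_tail, 120, &cft);
	printf("n=%u cft=%u\n", n, cft);
	return 0;
}
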
@@ -33,7 +33,7 @@ __rte_ring_st_get_tail(struct rte_ring_headtail *ht, uint32_t *tail,
{
uint32_t h, n, t;
- h = ht->head;
+ h = ht->head.val.pos;
t = ht->tail;
n = h - t;
@@ -58,7 +58,7 @@ __rte_ring_st_set_head_tail(struct rte_ring_headtail *ht, uint32_t tail,
RTE_SET_USED(enqueue);
pos = tail + num;
- ht->head = pos;
+ ht->head.val.pos = pos;
rte_atomic_store_explicit(&ht->tail, pos, rte_memory_order_release);
}
@@ -90,7 +90,8 @@ __rte_soring_stage_finalize(struct soring_stage_headtail *sht,
* already finished.
*/
- head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
+ head = rte_atomic_load_explicit(&sht->head.val.pos,
+ rte_memory_order_relaxed);
n = RTE_MIN(head - ot.pos, maxn);
for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
@@ -213,22 +214,36 @@ __rte_soring_stage_move_head(struct soring_stage_headtail *d,
uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
{
uint32_t n, tail;
+ union __rte_ring_head_cft nh, oh;
- *old_head = rte_atomic_load_explicit(&d->head,
+ oh.raw = rte_atomic_load_explicit(&d->head.raw,
rte_memory_order_acquire);
do {
n = num;
- tail = rte_atomic_load_explicit(&s->tail,
- rte_memory_order_relaxed);
+ *old_head = oh.val.pos;
+ tail = oh.val.cft;
*avail = capacity + tail - *old_head;
- if (n > *avail)
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
+
+ if (n > *avail) {
+ tail = rte_atomic_load_explicit(&s->tail,
+ rte_memory_order_relaxed);
+ *avail = capacity + tail - *old_head;
+
+ if (n > *avail)
+ n = (behavior == RTE_RING_QUEUE_FIXED) ?
+ 0 : *avail;
+ }
+
if (n == 0)
return 0;
+
*new_head = *old_head + n;
- } while (rte_atomic_compare_exchange_strong_explicit(&d->head,
- old_head, *new_head, rte_memory_order_acquire,
+ nh.val.pos = *new_head;
+ nh.val.cft = tail;
+
+ } while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
+ &oh.raw, nh.raw, rte_memory_order_acquire,
rte_memory_order_acquire) == 0);
return n;
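
The staged-ring variant above follows the same pattern as the plain ring sketch earlier: the stage head keeps the tail it synchronizes against (s->tail) cached in the second half of the __rte_ring_head_cft word, performs the availability check against that cached value first, and only re-reads s->tail when the cached estimate is too small, publishing the new head position and the refreshed cache together with one 64-bit CAS.
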
@@ -60,8 +60,8 @@ union soring_stage_tail {
struct soring_stage_headtail {
volatile union soring_stage_tail tail;
- enum rte_ring_sync_type unused; /**< unused */
- volatile RTE_ATOMIC(uint32_t) head;
+ enum rte_ring_sync_type __unused; /**< unused */
+ union __rte_ring_head_cft head;
};
/**