[v6,07/13] vhost: flush enqueue updates by batch
Checks
Commit Message
Buffer vhost enqueue shadowed ring flush action buffered number exceed
one batch. Thus virtio can receive packets at a faster frequency.
Signed-off-by: Marvin Liu <yong.liu@intel.com>
Comments
On 10/15/19 6:07 PM, Marvin Liu wrote:
> Buffer vhost enqueue shadowed ring flush action buffered number exceed
> one batch. Thus virtio can receive packets at a faster frequency.
The commit message isn't very clear to me. Could you please improve it?
> Signed-off-by: Marvin Liu <yong.liu@intel.com>
>
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 96bf763b1..a60b88d89 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -166,6 +166,8 @@ struct vhost_virtqueue {
> struct vring_used_elem_packed *shadow_used_packed;
> };
> uint16_t shadow_used_idx;
> + /* Record packed ring enqueue latest desc cache aligned index */
> + uint16_t shadow_aligned_idx;
> struct vhost_vring_addr ring_addrs;
>
> struct batch_copy_elem *batch_copy_elems;
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 274a28f99..020c9b858 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -91,6 +91,69 @@ update_shadow_used_ring_split(struct vhost_virtqueue *vq,
> vq->shadow_used_split[i].len = len;
> }
>
> +static __rte_always_inline void
> +vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
> + struct vhost_virtqueue *vq)
> +{
> + int i;
> + uint16_t used_idx = vq->last_used_idx;
> + uint16_t head_idx = vq->last_used_idx;
> + uint16_t head_flags = 0;
> +
> + /* Split loop in two to save memory barriers */
> + for (i = 0; i < vq->shadow_used_idx; i++) {
> + vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
> + vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
> +
> + used_idx += vq->shadow_used_packed[i].count;
> + if (used_idx >= vq->size)
> + used_idx -= vq->size;
> + }
> +
> + rte_smp_wmb();
> +
> + for (i = 0; i < vq->shadow_used_idx; i++) {
> + uint16_t flags;
> +
> + if (vq->shadow_used_packed[i].len)
> + flags = VRING_DESC_F_WRITE;
> + else
> + flags = 0;
> +
> + if (vq->used_wrap_counter) {
> + flags |= VRING_DESC_F_USED;
> + flags |= VRING_DESC_F_AVAIL;
> + } else {
> + flags &= ~VRING_DESC_F_USED;
> + flags &= ~VRING_DESC_F_AVAIL;
> + }
> +
> + if (i > 0) {
> + vq->desc_packed[vq->last_used_idx].flags = flags;
> +
> + vhost_log_cache_used_vring(dev, vq,
> + vq->last_used_idx *
> + sizeof(struct vring_packed_desc),
> + sizeof(struct vring_packed_desc));
> + } else {
> + head_idx = vq->last_used_idx;
> + head_flags = flags;
> + }
> +
> + vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
> + }
> +
> + vq->desc_packed[head_idx].flags = head_flags;
> +
> + vhost_log_cache_used_vring(dev, vq,
> + head_idx *
> + sizeof(struct vring_packed_desc),
> + sizeof(struct vring_packed_desc));
> +
> + vq->shadow_used_idx = 0;
> + vhost_log_cache_sync(dev, vq);
> +}
> +
> static __rte_always_inline void
> flush_shadow_used_ring_packed(struct virtio_net *dev,
> struct vhost_virtqueue *vq)
> @@ -194,6 +257,33 @@ do_data_copy_dequeue(struct vhost_virtqueue *vq)
> vq->batch_copy_nb_elems = 0;
> }
>
> +static __rte_always_inline void
> +vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> + struct vhost_virtqueue *vq,
> + uint32_t len[],
> + uint16_t id[],
> + uint16_t count[],
> + uint16_t num_buffers)
> +{
> + uint16_t i;
> + for (i = 0; i < num_buffers; i++) {
> + /* enqueue shadow flush action aligned with batch num */
> + if (!vq->shadow_used_idx)
> + vq->shadow_aligned_idx = vq->last_used_idx &
> + PACKED_BATCH_MASK;
> + vq->shadow_used_packed[vq->shadow_used_idx].id = id[i];
> + vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
> + vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
> + vq->shadow_aligned_idx += count[i];
> + vq->shadow_used_idx++;
> + }
> +
> + if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
> + do_data_copy_enqueue(dev, vq);
> + vhost_flush_enqueue_shadow_packed(dev, vq);
> + }
> +}
> +
> /* avoid write operation when necessary, to lessen cache issues */
> #define ASSIGN_UNLESS_EQUAL(var, val) do { \
> if ((var) != (val)) \
> @@ -785,6 +875,9 @@ vhost_enqueue_single_packed(struct virtio_net *dev,
> uint16_t desc_count;
> uint32_t size = pkt->pkt_len + dev->vhost_hlen;
> uint16_t num_buffers = 0;
> + uint32_t buffer_len[vq->size];
> + uint16_t buffer_buf_id[vq->size];
> + uint16_t buffer_desc_count[vq->size];
With rings up to 1024 elements, maybe it would be better to have that
allocated as vq metadata like shadow_used_packed?
>
> if (rxvq_is_mergeable(dev))
> max_tries = vq->size - 1;
> @@ -810,6 +903,9 @@ vhost_enqueue_single_packed(struct virtio_net *dev,
> len = RTE_MIN(len, size);
> size -= len;
>
> + buffer_len[num_buffers] = len;
> + buffer_buf_id[num_buffers] = buf_id;
> + buffer_desc_count[num_buffers] = desc_count;
> num_buffers += 1;
>
> *nr_descs += desc_count;
> @@ -821,6 +917,9 @@ vhost_enqueue_single_packed(struct virtio_net *dev,
> if (copy_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers) < 0)
> return -1;
>
> + vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
> + buffer_desc_count, num_buffers);
> +
> return 0;
> }
>
> @@ -1017,7 +1116,7 @@ virtio_dev_rx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
> do_data_copy_enqueue(dev, vq);
>
> if (likely(vq->shadow_used_idx)) {
> - flush_shadow_used_ring_packed(dev, vq);
> + vhost_flush_enqueue_shadow_packed(dev, vq);
> vhost_vring_call_packed(dev, vq);
> }
>
>
@@ -166,6 +166,8 @@ struct vhost_virtqueue {
struct vring_used_elem_packed *shadow_used_packed;
};
uint16_t shadow_used_idx;
+ /* Record packed ring enqueue latest desc cache aligned index */
+ uint16_t shadow_aligned_idx;
struct vhost_vring_addr ring_addrs;
struct batch_copy_elem *batch_copy_elems;
@@ -91,6 +91,69 @@ update_shadow_used_ring_split(struct vhost_virtqueue *vq,
vq->shadow_used_split[i].len = len;
}
+static __rte_always_inline void
+vhost_flush_enqueue_shadow_packed(struct virtio_net *dev,
+ struct vhost_virtqueue *vq)
+{
+ int i;
+ uint16_t used_idx = vq->last_used_idx;
+ uint16_t head_idx = vq->last_used_idx;
+ uint16_t head_flags = 0;
+
+ /* Split loop in two to save memory barriers */
+ for (i = 0; i < vq->shadow_used_idx; i++) {
+ vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
+ vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
+
+ used_idx += vq->shadow_used_packed[i].count;
+ if (used_idx >= vq->size)
+ used_idx -= vq->size;
+ }
+
+ rte_smp_wmb();
+
+ for (i = 0; i < vq->shadow_used_idx; i++) {
+ uint16_t flags;
+
+ if (vq->shadow_used_packed[i].len)
+ flags = VRING_DESC_F_WRITE;
+ else
+ flags = 0;
+
+ if (vq->used_wrap_counter) {
+ flags |= VRING_DESC_F_USED;
+ flags |= VRING_DESC_F_AVAIL;
+ } else {
+ flags &= ~VRING_DESC_F_USED;
+ flags &= ~VRING_DESC_F_AVAIL;
+ }
+
+ if (i > 0) {
+ vq->desc_packed[vq->last_used_idx].flags = flags;
+
+ vhost_log_cache_used_vring(dev, vq,
+ vq->last_used_idx *
+ sizeof(struct vring_packed_desc),
+ sizeof(struct vring_packed_desc));
+ } else {
+ head_idx = vq->last_used_idx;
+ head_flags = flags;
+ }
+
+ vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
+ }
+
+ vq->desc_packed[head_idx].flags = head_flags;
+
+ vhost_log_cache_used_vring(dev, vq,
+ head_idx *
+ sizeof(struct vring_packed_desc),
+ sizeof(struct vring_packed_desc));
+
+ vq->shadow_used_idx = 0;
+ vhost_log_cache_sync(dev, vq);
+}
+
static __rte_always_inline void
flush_shadow_used_ring_packed(struct virtio_net *dev,
struct vhost_virtqueue *vq)
@@ -194,6 +257,33 @@ do_data_copy_dequeue(struct vhost_virtqueue *vq)
vq->batch_copy_nb_elems = 0;
}
+static __rte_always_inline void
+vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
+ struct vhost_virtqueue *vq,
+ uint32_t len[],
+ uint16_t id[],
+ uint16_t count[],
+ uint16_t num_buffers)
+{
+ uint16_t i;
+ for (i = 0; i < num_buffers; i++) {
+ /* enqueue shadow flush action aligned with batch num */
+ if (!vq->shadow_used_idx)
+ vq->shadow_aligned_idx = vq->last_used_idx &
+ PACKED_BATCH_MASK;
+ vq->shadow_used_packed[vq->shadow_used_idx].id = id[i];
+ vq->shadow_used_packed[vq->shadow_used_idx].len = len[i];
+ vq->shadow_used_packed[vq->shadow_used_idx].count = count[i];
+ vq->shadow_aligned_idx += count[i];
+ vq->shadow_used_idx++;
+ }
+
+ if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
+ do_data_copy_enqueue(dev, vq);
+ vhost_flush_enqueue_shadow_packed(dev, vq);
+ }
+}
+
/* avoid write operation when necessary, to lessen cache issues */
#define ASSIGN_UNLESS_EQUAL(var, val) do { \
if ((var) != (val)) \
@@ -785,6 +875,9 @@ vhost_enqueue_single_packed(struct virtio_net *dev,
uint16_t desc_count;
uint32_t size = pkt->pkt_len + dev->vhost_hlen;
uint16_t num_buffers = 0;
+ uint32_t buffer_len[vq->size];
+ uint16_t buffer_buf_id[vq->size];
+ uint16_t buffer_desc_count[vq->size];
if (rxvq_is_mergeable(dev))
max_tries = vq->size - 1;
@@ -810,6 +903,9 @@ vhost_enqueue_single_packed(struct virtio_net *dev,
len = RTE_MIN(len, size);
size -= len;
+ buffer_len[num_buffers] = len;
+ buffer_buf_id[num_buffers] = buf_id;
+ buffer_desc_count[num_buffers] = desc_count;
num_buffers += 1;
*nr_descs += desc_count;
@@ -821,6 +917,9 @@ vhost_enqueue_single_packed(struct virtio_net *dev,
if (copy_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, num_buffers) < 0)
return -1;
+ vhost_shadow_enqueue_single_packed(dev, vq, buffer_len, buffer_buf_id,
+ buffer_desc_count, num_buffers);
+
return 0;
}
@@ -1017,7 +1116,7 @@ virtio_dev_rx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
do_data_copy_enqueue(dev, vq);
if (likely(vq->shadow_used_idx)) {
- flush_shadow_used_ring_packed(dev, vq);
+ vhost_flush_enqueue_shadow_packed(dev, vq);
vhost_vring_call_packed(dev, vq);
}