[dpdk-dev,v2] vhost: adaptively batch small guest memory copies

Message ID 20170908125046.28739-1-tiwei.bie@intel.com (mailing list archive)
State Accepted, archived
Delegated to: Yuanhan Liu
Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Tiwei Bie Sept. 8, 2017, 12:50 p.m. UTC
  This patch adaptively batches the small guest memory copies.
By batching the small copies, the efficiency of executing the
memory LOAD instructions can be improved greatly, because the
memory LOAD latency can be effectively hidden by the pipeline.
We saw great performance boosts in the small-packet PVP test.

Besides improving the performance for small packets, this patch
also distinguishes packets by size. So although the performance
for big packets doesn't change, it becomes relatively easy to
add special optimizations for big packets later.

Signed-off-by: Tiwei Bie <tiwei.bie@intel.com>
Signed-off-by: Zhihong Wang <zhihong.wang@intel.com>
Signed-off-by: Zhiyong Yang <zhiyong.yang@intel.com>
Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
---
This optimization depends on the CPU internal pipeline design,
so further tests (e.g. on ARM) from the community are appreciated.

v2: s/burst_copy/batch_copy/g

 lib/librte_vhost/vhost.c      |   2 +-
 lib/librte_vhost/vhost.h      |  13 +++
 lib/librte_vhost/vhost_user.c |  12 +++
 lib/librte_vhost/virtio_net.c | 240 ++++++++++++++++++++++++++++++++----------
 4 files changed, 209 insertions(+), 58 deletions(-)
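
For readers skimming the archive, here is a small standalone sketch of the batching idea implemented below (an illustration only, not part of the patch): small copies are queued into a per-queue array and flushed back-to-back after the burst loop, so their memory LOAD latencies can overlap in the pipeline, while large copies are performed immediately. MAX_BATCH_LEN and struct batch_copy_elem mirror the patch; plain memcpy stands in for rte_memcpy, and the copy_batch / queue_copy() / flush_copies() names are invented for this example.

/* Standalone illustration of adaptive copy batching (not DPDK code). */
#include <stdint.h>
#include <string.h>

#define MAX_BATCH_LEN 256   /* copies larger than this are done immediately */
#define BATCH_SIZE    512   /* stands in for the virtqueue size */

struct batch_copy_elem {
	void *dst;
	void *src;
	uint32_t len;
};

struct copy_batch {
	struct batch_copy_elem elems[BATCH_SIZE];
	uint16_t nb_elems;
};

/* Queue a small copy; do it right away if it is big or the batch is full. */
static inline void
queue_copy(struct copy_batch *b, void *dst, void *src, uint32_t len)
{
	if (len > MAX_BATCH_LEN || b->nb_elems >= BATCH_SIZE) {
		memcpy(dst, src, len);
		return;
	}

	b->elems[b->nb_elems].dst = dst;
	b->elems[b->nb_elems].src = src;
	b->elems[b->nb_elems].len = len;
	b->nb_elems++;
}

/* Execute all deferred small copies in one tight loop, then reset. */
static inline void
flush_copies(struct copy_batch *b)
{
	uint16_t i;

	for (i = 0; i < b->nb_elems; i++)
		memcpy(b->elems[i].dst, b->elems[i].src, b->elems[i].len);

	b->nb_elems = 0;
}

In the patch itself, copy_mbuf_to_desc(), copy_mbuf_to_desc_mergeable() and copy_desc_to_mbuf() queue the elements into vq->batch_copy_elems, and do_data_copy_enqueue()/do_data_copy_dequeue() flush them once per burst.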
  

Comments

Santosh Shukla Sept. 9, 2017, 2:58 p.m. UTC | #1
On Friday 08 September 2017 06:20 PM, Tiwei Bie wrote:
> This patch adaptively batches the small guest memory copies.
> By batching the small copies, the efficiency of executing the
> memory LOAD instructions can be improved greatly, because the
> memory LOAD latency can be effectively hidden by the pipeline.
> We saw great performance boosts in the small-packet PVP test.
>
> Besides improving the performance for small packets, this patch
> also distinguishes packets by size. So although the performance
> for big packets doesn't change, it becomes relatively easy to
> add special optimizations for big packets later.
>
> Signed-off-by: Tiwei Bie <tiwei.bie@intel.com>
> Signed-off-by: Zhihong Wang <zhihong.wang@intel.com>
> Signed-off-by: Zhiyong Yang <zhiyong.yang@intel.com>
> Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
> ---
> This optimization depends on the CPU internal pipeline design,
> so further tests (e.g. on ARM) from the community are appreciated.
>
> v2: s/burst_copy/batch_copy/g

In a PVP setup on the arm64 platform, no performance drop was noticed with this patch, but no significant performance gain either.

Tested-by: Santosh Shukla <santosh.shukla@caviumnetworks.com>
  
Tiwei Bie Sept. 11, 2017, 1:27 a.m. UTC | #2
On Sat, Sep 09, 2017 at 08:28:57PM +0530, santosh wrote:
> In a PVP setup on the arm64 platform, no performance drop was noticed with this patch, but no significant performance gain either.
>
> Tested-by: Santosh Shukla <santosh.shukla@caviumnetworks.com>
> 

Thank you very much for testing this patch!

Best regards,
Tiwei Bie
  
Yuanhan Liu Sept. 11, 2017, 12:06 p.m. UTC | #3
On Sat, Sep 09, 2017 at 08:28:57PM +0530, santosh wrote:
> In a PVP setup on the arm64 platform, no performance drop was noticed with this patch, but no significant performance gain either.
>
> Tested-by: Santosh Shukla <santosh.shukla@caviumnetworks.com>

Applied to dpdk-next-virtio.

Thanks.

	--yliu
  

Patch

diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index 0b6aa1c..474b6e4 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -102,7 +102,7 @@  free_device(struct virtio_net *dev)
 		vq = dev->virtqueue[i];
 
 		rte_free(vq->shadow_used_ring);
-
+		rte_free(vq->batch_copy_elems);
 		rte_free(vq);
 	}
 
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 0f294f3..78fd895 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -81,6 +81,16 @@  struct zcopy_mbuf {
 };
 TAILQ_HEAD(zcopy_mbuf_list, zcopy_mbuf);
 
+/*
+ * Structure contains the info for each batched memory copy.
+ */
+struct batch_copy_elem {
+	void *dst;
+	void *src;
+	uint32_t len;
+	uint64_t log_addr;
+};
+
 /**
  * Structure contains variables relevant to RX/TX virtqueues.
  */
@@ -114,6 +124,9 @@  struct vhost_virtqueue {
 
 	struct vring_used_elem  *shadow_used_ring;
 	uint16_t                shadow_used_idx;
+
+	struct batch_copy_elem	*batch_copy_elems;
+	uint16_t		batch_copy_nb_elems;
 } __rte_cache_aligned;
 
 /* Old kernels have no such macros defined */
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index ad2e8d3..b62e382 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -230,6 +230,15 @@  vhost_user_set_vring_num(struct virtio_net *dev,
 		return -1;
 	}
 
+	vq->batch_copy_elems = rte_malloc(NULL,
+				vq->size * sizeof(struct batch_copy_elem),
+				RTE_CACHE_LINE_SIZE);
+	if (!vq->batch_copy_elems) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"failed to allocate memory for batching copy.\n");
+		return -1;
+	}
+
 	return 0;
 }
 
@@ -741,6 +750,9 @@  vhost_user_get_vring_base(struct virtio_net *dev,
 	rte_free(vq->shadow_used_ring);
 	vq->shadow_used_ring = NULL;
 
+	rte_free(vq->batch_copy_elems);
+	vq->batch_copy_elems = NULL;
+
 	return 0;
 }
 
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index a5f0eeb..f8732df 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -49,6 +49,8 @@ 
 
 #define MAX_PKT_BURST 32
 
+#define MAX_BATCH_LEN 256
+
 static bool
 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
 {
@@ -105,6 +107,31 @@  update_shadow_used_ring(struct vhost_virtqueue *vq,
 	vq->shadow_used_ring[i].len = len;
 }
 
+static inline void
+do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
+{
+	struct batch_copy_elem *elem = vq->batch_copy_elems;
+	uint16_t count = vq->batch_copy_nb_elems;
+	int i;
+
+	for (i = 0; i < count; i++) {
+		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
+		vhost_log_write(dev, elem[i].log_addr, elem[i].len);
+		PRINT_PACKET(dev, (uintptr_t)elem[i].dst, elem[i].len, 0);
+	}
+}
+
+static inline void
+do_data_copy_dequeue(struct vhost_virtqueue *vq)
+{
+	struct batch_copy_elem *elem = vq->batch_copy_elems;
+	uint16_t count = vq->batch_copy_nb_elems;
+	int i;
+
+	for (i = 0; i < count; i++)
+		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
+}
+
 /* avoid write operation when necessary, to lessen cache issues */
 #define ASSIGN_UNLESS_EQUAL(var, val) do {	\
 	if ((var) != (val))			\
@@ -168,8 +195,9 @@  virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
 }
 
 static __rte_always_inline int
-copy_mbuf_to_desc(struct virtio_net *dev, struct vring_desc *descs,
-		  struct rte_mbuf *m, uint16_t desc_idx, uint32_t size)
+copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		  struct vring_desc *descs, struct rte_mbuf *m,
+		  uint16_t desc_idx, uint32_t size)
 {
 	uint32_t desc_avail, desc_offset;
 	uint32_t mbuf_avail, mbuf_offset;
@@ -178,6 +206,9 @@  copy_mbuf_to_desc(struct virtio_net *dev, struct vring_desc *descs,
 	uint64_t desc_addr;
 	/* A counter to avoid desc dead loop chain */
 	uint16_t nr_desc = 1;
+	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
+	uint16_t copy_nb = vq->batch_copy_nb_elems;
+	int error = 0;
 
 	desc = &descs[desc_idx];
 	desc_addr = rte_vhost_gpa_to_vva(dev->mem, desc->addr);
@@ -186,8 +217,10 @@  copy_mbuf_to_desc(struct virtio_net *dev, struct vring_desc *descs,
 	 * performance issue with some versions of gcc (4.8.4 and 5.3.0) which
 	 * otherwise stores offset on the stack instead of in a register.
 	 */
-	if (unlikely(desc->len < dev->vhost_hlen) || !desc_addr)
-		return -1;
+	if (unlikely(desc->len < dev->vhost_hlen) || !desc_addr) {
+		error = -1;
+		goto out;
+	}
 
 	rte_prefetch0((void *)(uintptr_t)desc_addr);
 
@@ -213,27 +246,43 @@  copy_mbuf_to_desc(struct virtio_net *dev, struct vring_desc *descs,
 		if (desc_avail == 0) {
 			if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
 				/* Room in vring buffer is not enough */
-				return -1;
+				error = -1;
+				goto out;
+			}
+			if (unlikely(desc->next >= size || ++nr_desc > size)) {
+				error = -1;
+				goto out;
 			}
-			if (unlikely(desc->next >= size || ++nr_desc > size))
-				return -1;
 
 			desc = &descs[desc->next];
 			desc_addr = rte_vhost_gpa_to_vva(dev->mem, desc->addr);
-			if (unlikely(!desc_addr))
-				return -1;
+			if (unlikely(!desc_addr)) {
+				error = -1;
+				goto out;
+			}
 
 			desc_offset = 0;
 			desc_avail  = desc->len;
 		}
 
 		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
-		rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
-			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
-			cpy_len);
-		vhost_log_write(dev, desc->addr + desc_offset, cpy_len);
-		PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
-			     cpy_len, 0);
+		if (likely(cpy_len > MAX_BATCH_LEN || copy_nb >= vq->size)) {
+			rte_memcpy((void *)((uintptr_t)(desc_addr +
+							desc_offset)),
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+				cpy_len);
+			vhost_log_write(dev, desc->addr + desc_offset, cpy_len);
+			PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+				     cpy_len, 0);
+		} else {
+			batch_copy[copy_nb].dst =
+				(void *)((uintptr_t)(desc_addr + desc_offset));
+			batch_copy[copy_nb].src =
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
+			batch_copy[copy_nb].log_addr = desc->addr + desc_offset;
+			batch_copy[copy_nb].len = cpy_len;
+			copy_nb++;
+		}
 
 		mbuf_avail  -= cpy_len;
 		mbuf_offset += cpy_len;
@@ -241,7 +290,10 @@  copy_mbuf_to_desc(struct virtio_net *dev, struct vring_desc *descs,
 		desc_offset += cpy_len;
 	}
 
-	return 0;
+out:
+	vq->batch_copy_nb_elems = copy_nb;
+
+	return error;
 }
 
 /**
@@ -284,6 +336,8 @@  virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	LOG_DEBUG(VHOST_DATA, "(%d) start_idx %d | end_idx %d\n",
 		dev->vid, start_idx, start_idx + count);
 
+	vq->batch_copy_nb_elems = 0;
+
 	/* Retrieve all of the desc indexes first to avoid caching issues. */
 	rte_prefetch0(&vq->avail->ring[start_idx & (vq->size - 1)]);
 	for (i = 0; i < count; i++) {
@@ -318,7 +372,7 @@  virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 			sz = vq->size;
 		}
 
-		err = copy_mbuf_to_desc(dev, descs, pkts[i], desc_idx, sz);
+		err = copy_mbuf_to_desc(dev, vq, descs, pkts[i], desc_idx, sz);
 		if (unlikely(err)) {
 			used_idx = (start_idx + i) & (vq->size - 1);
 			vq->used->ring[used_idx].len = dev->vhost_hlen;
@@ -331,6 +385,8 @@  virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 			rte_prefetch0(&vq->desc[desc_indexes[i+1]]);
 	}
 
+	do_data_copy_enqueue(dev, vq);
+
 	rte_smp_wmb();
 
 	*(volatile uint16_t *)&vq->used->idx += count;
@@ -439,8 +495,9 @@  reserve_avail_buf_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
 }
 
 static __rte_always_inline int
-copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct rte_mbuf *m,
-			    struct buf_vector *buf_vec, uint16_t num_buffers)
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
+			    struct rte_mbuf *m, struct buf_vector *buf_vec,
+			    uint16_t num_buffers)
 {
 	uint32_t vec_idx = 0;
 	uint64_t desc_addr;
@@ -449,13 +506,20 @@  copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct rte_mbuf *m,
 	uint32_t cpy_len;
 	uint64_t hdr_addr, hdr_phys_addr;
 	struct rte_mbuf *hdr_mbuf;
+	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
+	uint16_t copy_nb = vq->batch_copy_nb_elems;
+	int error = 0;
 
-	if (unlikely(m == NULL))
-		return -1;
+	if (unlikely(m == NULL)) {
+		error = -1;
+		goto out;
+	}
 
 	desc_addr = rte_vhost_gpa_to_vva(dev->mem, buf_vec[vec_idx].buf_addr);
-	if (buf_vec[vec_idx].buf_len < dev->vhost_hlen || !desc_addr)
-		return -1;
+	if (buf_vec[vec_idx].buf_len < dev->vhost_hlen || !desc_addr) {
+		error = -1;
+		goto out;
+	}
 
 	hdr_mbuf = m;
 	hdr_addr = desc_addr;
@@ -476,8 +540,10 @@  copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct rte_mbuf *m,
 			vec_idx++;
 			desc_addr = rte_vhost_gpa_to_vva(dev->mem,
 					buf_vec[vec_idx].buf_addr);
-			if (unlikely(!desc_addr))
-				return -1;
+			if (unlikely(!desc_addr)) {
+				error = -1;
+				goto out;
+			}
 
 			/* Prefetch buffer address. */
 			rte_prefetch0((void *)(uintptr_t)desc_addr);
@@ -509,13 +575,27 @@  copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct rte_mbuf *m,
 		}
 
 		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
-		rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
-			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
-			cpy_len);
-		vhost_log_write(dev, buf_vec[vec_idx].buf_addr + desc_offset,
-			cpy_len);
-		PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
-			cpy_len, 0);
+
+		if (likely(cpy_len > MAX_BATCH_LEN || copy_nb >= vq->size)) {
+			rte_memcpy((void *)((uintptr_t)(desc_addr +
+							desc_offset)),
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+				cpy_len);
+			vhost_log_write(dev,
+				buf_vec[vec_idx].buf_addr + desc_offset,
+				cpy_len);
+			PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+				cpy_len, 0);
+		} else {
+			batch_copy[copy_nb].dst =
+				(void *)((uintptr_t)(desc_addr + desc_offset));
+			batch_copy[copy_nb].src =
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
+			batch_copy[copy_nb].log_addr =
+				buf_vec[vec_idx].buf_addr + desc_offset;
+			batch_copy[copy_nb].len = cpy_len;
+			copy_nb++;
+		}
 
 		mbuf_avail  -= cpy_len;
 		mbuf_offset += cpy_len;
@@ -523,7 +603,10 @@  copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct rte_mbuf *m,
 		desc_offset += cpy_len;
 	}
 
-	return 0;
+out:
+	vq->batch_copy_nb_elems = copy_nb;
+
+	return error;
 }
 
 static __rte_always_inline uint32_t
@@ -551,6 +634,8 @@  virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 	if (count == 0)
 		return 0;
 
+	vq->batch_copy_nb_elems = 0;
+
 	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
 
 	vq->shadow_used_idx = 0;
@@ -572,7 +657,7 @@  virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 			dev->vid, vq->last_avail_idx,
 			vq->last_avail_idx + num_buffers);
 
-		if (copy_mbuf_to_desc_mergeable(dev, pkts[pkt_idx],
+		if (copy_mbuf_to_desc_mergeable(dev, vq, pkts[pkt_idx],
 						buf_vec, num_buffers) < 0) {
 			vq->shadow_used_idx -= num_buffers;
 			break;
@@ -581,6 +666,8 @@  virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 		vq->last_avail_idx += num_buffers;
 	}
 
+	do_data_copy_enqueue(dev, vq);
+
 	if (likely(vq->shadow_used_idx)) {
 		flush_shadow_used_ring(dev, vq);
 
@@ -766,8 +853,9 @@  put_zmbuf(struct zcopy_mbuf *zmbuf)
 }
 
 static __rte_always_inline int
-copy_desc_to_mbuf(struct virtio_net *dev, struct vring_desc *descs,
-		  uint16_t max_desc, struct rte_mbuf *m, uint16_t desc_idx,
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		  struct vring_desc *descs, uint16_t max_desc,
+		  struct rte_mbuf *m, uint16_t desc_idx,
 		  struct rte_mempool *mbuf_pool)
 {
 	struct vring_desc *desc;
@@ -779,15 +867,22 @@  copy_desc_to_mbuf(struct virtio_net *dev, struct vring_desc *descs,
 	struct virtio_net_hdr *hdr = NULL;
 	/* A counter to avoid desc dead loop chain */
 	uint32_t nr_desc = 1;
+	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
+	uint16_t copy_nb = vq->batch_copy_nb_elems;
+	int error = 0;
 
 	desc = &descs[desc_idx];
 	if (unlikely((desc->len < dev->vhost_hlen)) ||
-			(desc->flags & VRING_DESC_F_INDIRECT))
-		return -1;
+			(desc->flags & VRING_DESC_F_INDIRECT)) {
+		error = -1;
+		goto out;
+	}
 
 	desc_addr = rte_vhost_gpa_to_vva(dev->mem, desc->addr);
-	if (unlikely(!desc_addr))
-		return -1;
+	if (unlikely(!desc_addr)) {
+		error = -1;
+		goto out;
+	}
 
 	if (virtio_net_with_host_offload(dev)) {
 		hdr = (struct virtio_net_hdr *)((uintptr_t)desc_addr);
@@ -802,12 +897,16 @@  copy_desc_to_mbuf(struct virtio_net *dev, struct vring_desc *descs,
 	if (likely((desc->len == dev->vhost_hlen) &&
 		   (desc->flags & VRING_DESC_F_NEXT) != 0)) {
 		desc = &descs[desc->next];
-		if (unlikely(desc->flags & VRING_DESC_F_INDIRECT))
-			return -1;
+		if (unlikely(desc->flags & VRING_DESC_F_INDIRECT)) {
+			error = -1;
+			goto out;
+		}
 
 		desc_addr = rte_vhost_gpa_to_vva(dev->mem, desc->addr);
-		if (unlikely(!desc_addr))
-			return -1;
+		if (unlikely(!desc_addr)) {
+			error = -1;
+			goto out;
+		}
 
 		desc_offset = 0;
 		desc_avail  = desc->len;
@@ -846,10 +945,23 @@  copy_desc_to_mbuf(struct virtio_net *dev, struct vring_desc *descs,
 			 */
 			mbuf_avail = cpy_len;
 		} else {
-			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
-							   mbuf_offset),
-				(void *)((uintptr_t)(desc_addr + desc_offset)),
-				cpy_len);
+			if (likely(cpy_len > MAX_BATCH_LEN ||
+				   copy_nb >= vq->size)) {
+				rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
+								   mbuf_offset),
+					   (void *)((uintptr_t)(desc_addr +
+								desc_offset)),
+					   cpy_len);
+			} else {
+				batch_copy[copy_nb].dst =
+					rte_pktmbuf_mtod_offset(cur, void *,
+								mbuf_offset);
+				batch_copy[copy_nb].src =
+					(void *)((uintptr_t)(desc_addr +
+							     desc_offset));
+				batch_copy[copy_nb].len = cpy_len;
+				copy_nb++;
+			}
 		}
 
 		mbuf_avail  -= cpy_len;
@@ -863,15 +975,21 @@  copy_desc_to_mbuf(struct virtio_net *dev, struct vring_desc *descs,
 				break;
 
 			if (unlikely(desc->next >= max_desc ||
-				     ++nr_desc > max_desc))
-				return -1;
+				     ++nr_desc > max_desc)) {
+				error = -1;
+				goto out;
+			}
 			desc = &descs[desc->next];
-			if (unlikely(desc->flags & VRING_DESC_F_INDIRECT))
-				return -1;
+			if (unlikely(desc->flags & VRING_DESC_F_INDIRECT)) {
+				error = -1;
+				goto out;
+			}
 
 			desc_addr = rte_vhost_gpa_to_vva(dev->mem, desc->addr);
-			if (unlikely(!desc_addr))
-				return -1;
+			if (unlikely(!desc_addr)) {
+				error = -1;
+				goto out;
+			}
 
 			rte_prefetch0((void *)(uintptr_t)desc_addr);
 
@@ -890,7 +1008,8 @@  copy_desc_to_mbuf(struct virtio_net *dev, struct vring_desc *descs,
 			if (unlikely(cur == NULL)) {
 				RTE_LOG(ERR, VHOST_DATA, "Failed to "
 					"allocate memory for mbuf.\n");
-				return -1;
+				error = -1;
+				goto out;
 			}
 			if (unlikely(dev->dequeue_zero_copy))
 				rte_mbuf_refcnt_update(cur, 1);
@@ -912,7 +1031,10 @@  copy_desc_to_mbuf(struct virtio_net *dev, struct vring_desc *descs,
 	if (hdr)
 		vhost_dequeue_offload(hdr, m);
 
-	return 0;
+out:
+	vq->batch_copy_nb_elems = copy_nb;
+
+	return error;
 }
 
 static __rte_always_inline void
@@ -1016,6 +1138,8 @@  rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 	if (unlikely(vq->enabled == 0))
 		return 0;
 
+	vq->batch_copy_nb_elems = 0;
+
 	if (unlikely(dev->dequeue_zero_copy)) {
 		struct zcopy_mbuf *zmbuf, *next;
 		int nr_updated = 0;
@@ -1136,7 +1260,8 @@  rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 			break;
 		}
 
-		err = copy_desc_to_mbuf(dev, desc, sz, pkts[i], idx, mbuf_pool);
+		err = copy_desc_to_mbuf(dev, vq, desc, sz, pkts[i], idx,
+					mbuf_pool);
 		if (unlikely(err)) {
 			rte_pktmbuf_free(pkts[i]);
 			break;
@@ -1168,6 +1293,7 @@  rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 	vq->last_avail_idx += i;
 
 	if (likely(dev->dequeue_zero_copy == 0)) {
+		do_data_copy_dequeue(vq);
 		vq->last_used_idx += i;
 		update_used_idx(dev, vq, i);
 	}