[RFC,1/2] vhost: add ingress API for port mirroring datapath

Message ID 20220814124920.14338-2-cheng1.jiang@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Maxime Coquelin
Headers
Series vhost: add port mirroring function in the vhost lib

Checks

Context        Check     Description
ci/checkpatch  warning   coding style issues

Commit Message

Jiang, Cheng1 Aug. 14, 2022, 12:49 p.m. UTC
  From: Wenwu Ma <wenwux.ma@intel.com>

Similar to the port mirroring function on a switch or router, this
patch implements an ingress mirroring function in the vhost library.
When data is sent to a front-end, the data is also sent to its mirror
front-end.

Signed-off-by: Cheng Jiang <cheng1.jiang@intel.com>
Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
---
 lib/vhost/rte_vhost_async.h |  14 +-
 lib/vhost/version.map       |   2 +
 lib/vhost/vhost.h           |   3 +-
 lib/vhost/virtio_net.c      | 666 +++++++++++++++++++++++++++++++++++-
 4 files changed, 679 insertions(+), 6 deletions(-)
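
As a usage illustration (not part of the patch itself), the new API pair is meant to be driven like the existing async enqueue path: submit a burst for both front-ends, then poll for DMA completions. Below is a minimal sketch against the prototypes added in rte_vhost_async.h, with queue 0 and a fixed burst size assumed, and mbuf ownership assumed to follow the rte_vhost_submit_enqueue_burst() semantics (accepted mbufs belong to the vhost lib until completion is polled):

#include <stdint.h>
#include <rte_mbuf.h>
#include <rte_vhost.h>
#include <rte_vhost_async.h>

#define BURST_SZ 32	/* application burst size, chosen here for the example */

static void
ingress_mirror_enqueue(int vid, int mirror_vid, uint16_t q,
		struct rte_mbuf **pkts, uint16_t nb_pkts)
{
	struct rte_mbuf *done[BURST_SZ];
	uint16_t nb_enq, nb_done, i;

	/* Submit copies to both front-ends; data movement is offloaded to DMA. */
	nb_enq = rte_vhost_submit_ingress_mirroring_burst(vid, q,
			mirror_vid, q, pkts, nb_pkts);

	/* Packets the lib did not accept stay with the caller. */
	for (i = nb_enq; i < nb_pkts; i++)
		rte_pktmbuf_free(pkts[i]);

	/* Reclaim mbufs whose copies completed on both virtqueues. */
	nb_done = rte_vhost_poll_ingress_completed(vid, q,
			mirror_vid, q, done, BURST_SZ);
	for (i = 0; i < nb_done; i++)
		rte_pktmbuf_free(done[i]);
}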
  

Comments

Stephen Hemminger Aug. 14, 2022, 2:58 p.m. UTC | #1
On Sun, 14 Aug 2022 12:49:19 +0000
Cheng Jiang <cheng1.jiang@intel.com> wrote:

> From: Wenwu Ma <wenwux.ma@intel.com>
> 
> Similar to the port mirroring function on the switch or router, this
> patch also implements an ingress function on the Vhost lib. When
> data is sent to a front-end, it will also send the data to its mirror
> front-end.
> 
> Signed-off-by: Cheng Jiang <cheng1.jiang@intel.com>
> Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>

We already have rte_flow, packet capture, and rx/tx callbacks.
This seems like re-invention.
  
Jiang, Cheng1 Aug. 18, 2022, 6:58 a.m. UTC | #2
Hi,

> -----Original Message-----
> From: Stephen Hemminger <stephen@networkplumber.org>
> Sent: Sunday, August 14, 2022 10:58 PM
> To: Jiang, Cheng1 <cheng1.jiang@intel.com>
> Cc: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>;
> dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Ding, Xuan
> <xuan.ding@intel.com>; Ma, WenwuX <wenwux.ma@intel.com>; Wang,
> YuanX <yuanx.wang@intel.com>; Yang, YvonneX <yvonnex.yang@intel.com>
> Subject: Re: [RFC 1/2] vhost: add ingress API for port mirroring datapath
> 
> On Sun, 14 Aug 2022 12:49:19 +0000
> Cheng Jiang <cheng1.jiang@intel.com> wrote:
> 
> > From: Wenwu Ma <wenwux.ma@intel.com>
> >
> > Similar to the port mirroring function on the switch or router, this
> > patch also implements an ingress function on the Vhost lib. When data
> > is sent to a front-end, it will also send the data to its mirror
> > front-end.
> >
> > Signed-off-by: Cheng Jiang <cheng1.jiang@intel.com>
> > Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
> 
> We already have rte_flow, packet capture, and rx/tx callbacks.
> This seems like re-invention.

Sorry that I didn't make it clear in the v1 commit message. This port mirroring function is based on async vhost, which is accelerated by a DMA device. Compared with other mirroring implementations: 1. It is targeted at vhost. 2. The performance is really good. Its use scenario is to let one front-end (the mirror VM) monitor the traffic of another front-end (the VM). It's different from the things you mentioned above, so IMO I don't think it's re-invention.

Thanks,
Cheng
  
Morten Brørup Aug. 18, 2022, 8:16 a.m. UTC | #3
> From: Jiang, Cheng1 [mailto:cheng1.jiang@intel.com]
> Sent: Thursday, 18 August 2022 08.58
> 
> Hi,
> 
> > -----Original Message-----
> > From: Stephen Hemminger <stephen@networkplumber.org>
> > Sent: Sunday, August 14, 2022 10:58 PM
> >
> > On Sun, 14 Aug 2022 12:49:19 +0000
> > Cheng Jiang <cheng1.jiang@intel.com> wrote:
> >
> > > From: Wenwu Ma <wenwux.ma@intel.com>
> > >
> > > Similar to the port mirroring function on the switch or router,
> this
> > > patch also implements an ingress function on the Vhost lib. When
> data
> > > is sent to a front-end, it will also send the data to its mirror
> > > front-end.
> > >
> > > Signed-off-by: Cheng Jiang <cheng1.jiang@intel.com>
> > > Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
> >
> > We already have rte_flow, packet capture, and rx/tx callbacks.
> > This seems like re-invention.
> 
> Sorry that I didn't make it clear in the v1 commit message. This port
> mirror function is based on async vhost which is accelerated by DMA
> device. Compared with other mirror implements: 1. It's targeted for
> vhost. 2. The performance is really good. Its use scenario is to let
> one front-end(mirror-VM) monitor the traffic of another front-end(VM).
> It's different from the things you mentioned above. So, IMO I don't
> think it's re-invention.
> 
> Thanks,
> Cheng

Thank you for elaborating on the use case.

In other words: This is a performance optimization for a specific use case.

This raises two questions:

1. Please convince us that this is a common use case?
2. What is the performance compared to implementing it at the application level?


Overall, I totally agree with Stephen:

Port Mirroring was removed from ethdev with the 21.11 release [1] for the reasons mentioned by Stephen. Please don't try to reintroduce it.

If the application needs to mirror all (or some) packets, why not just let the application do it?

Furthermore, the application might need to mirror the packets to a physical port, and perhaps encapsulate them for remote packet capture. Sampling could also be required. Here's a prediction for you: More features will creep into this API over time, and it will end up like the ethdev mirroring API, which was removed because the alternatives are better and more versatile.

[1]: https://doc.dpdk.org/guides/rel_notes/release_21_11.html#removed-items
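
For comparison, the application-level alternative referred to above can be sketched with the existing synchronous API: rte_vhost_enqueue_burst() copies packet data into each guest's buffers, so the same mbuf burst can simply be offered to both vhost ports. A minimal sketch (return values, drop accounting, and error handling omitted; the queue id q is whatever the application uses):

#include <stdint.h>
#include <rte_mbuf.h>
#include <rte_vhost.h>

/* Application-level mirroring sketch: enqueue the same burst to the
 * primary port and the mirror port, then free the mbufs.  The vhost
 * lib copies the data synchronously, so no cloning is needed here. */
static void
app_level_mirror(int vid, int mirror_vid, uint16_t q,
		struct rte_mbuf **pkts, uint16_t nb_pkts)
{
	uint16_t i;

	rte_vhost_enqueue_burst(vid, q, pkts, nb_pkts);
	rte_vhost_enqueue_burst(mirror_vid, q, pkts, nb_pkts);

	for (i = 0; i < nb_pkts; i++)
		rte_pktmbuf_free(pkts[i]);
}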
  
Jiang, Cheng1 Aug. 25, 2022, 1:53 p.m. UTC | #4
Hi,

> -----Original Message-----
> From: Morten Brørup <mb@smartsharesystems.com>
> Sent: Thursday, August 18, 2022 4:17 PM
> To: Jiang, Cheng1 <cheng1.jiang@intel.com>; Stephen Hemminger
> <stephen@networkplumber.org>
> Cc: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>;
> dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Ding, Xuan
> <xuan.ding@intel.com>; Ma, WenwuX <wenwux.ma@intel.com>; Wang,
> YuanX <yuanx.wang@intel.com>; Yang, YvonneX <yvonnex.yang@intel.com>
> Subject: RE: [RFC 1/2] vhost: add ingress API for port mirroring datapath
> 
> > From: Jiang, Cheng1 [mailto:cheng1.jiang@intel.com]
> > Sent: Thursday, 18 August 2022 08.58
> >
> > Hi,
> >
> > > -----Original Message-----
> > > From: Stephen Hemminger <stephen@networkplumber.org>
> > > Sent: Sunday, August 14, 2022 10:58 PM
> > >
> > > On Sun, 14 Aug 2022 12:49:19 +0000
> > > Cheng Jiang <cheng1.jiang@intel.com> wrote:
> > >
> > > > From: Wenwu Ma <wenwux.ma@intel.com>
> > > >
> > > > Similar to the port mirroring function on the switch or router,
> > this
> > > > patch also implements an ingress function on the Vhost lib. When
> > data
> > > > is sent to a front-end, it will also send the data to its mirror
> > > > front-end.
> > > >
> > > > Signed-off-by: Cheng Jiang <cheng1.jiang@intel.com>
> > > > Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
> > >
> > > We already have rte_flow, packet capture, and rx/tx callbacks.
> > > This seems like re-invention.
> >
> > Sorry that I didn't make it clear in the v1 commit message. This port
> > mirror function is based on async vhost which is accelerated by DMA
> > device. Compared with other mirror implements: 1. It's targeted for
> > vhost. 2. The performance is really good. Its use scenario is to let
> > one front-end(mirror-VM) monitor the traffic of another front-end(VM).
> > It's different from the things you mentioned above. So, IMO I don't
> > think it's re-invention.
> >
> > Thanks,
> > Cheng
> 
> Thank you for elaborating the use case.
> 
> In other words: This is a performance optimization for a specific use case.
> 
> This raises two questions:
> 
> 1. Please convince us that this is a common use case?

Using one VM to monitor another VM's traffic is a common use case. It is commonly used in intrusion detection systems, passive probes, and real user monitoring.

> 2. What is the performance compared to implementing it at the application
> level?
> 

This API is introduced mainly to enable DMA acceleration in the vhost port mirroring scenario and to leverage the DMA engine's strong performance.

> 
> Overall, I totally agree with Stephen:
> 
> Port Mirroring was removed from ethdev with the 21.11 release [1] for the
> reasons mentioned by Stephen. Please don't try to reintroduce it.
> 
> If the application needs to mirror all (or some) packets, why not just let the
> application do it?
> 
> Furthermore, the application might need to mirror the packets to a physical
> port, and perhaps encapsulate it for remote packet capture. Sampling could
> also be required. Here's a prediction for you: More features will creep into to
> this API over time, and it will end up like the ethdev mirroring API being
> removed because the alternatives are better and more versatile.

First of all, these are vhost library APIs based on async vhost, which is accelerated by a DMA device.
They are provided for the vhost PMD or other vhost applications, so I am not sure whether these APIs are comparable to the ethdev mirroring APIs that were removed.

As far as I understand, RTE_FLOW is used for ethdev. I'm not sure there is a conflict between the mirror API in vhost and RTE_FLOW, since vhost/virtio is a virtual protocol.

We will consider your suggestion carefully again.
Thank you very much.
Cheng

> 
> [1]: https://doc.dpdk.org/guides/rel_notes/release_21_11.html#removed-
> items
  

Patch
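
Reading aid (not part of the patch): the diff below changes the async channel contract, so that struct rte_vhost_async_desc gains a mirror_dst iov_iter and the transfer_data() callback gains a mirr_flag argument, meaning a channel implementation is asked to copy each source segment to two destinations when mirroring is requested. A rough sketch of such a callback follows, assuming the iov_iter layout of this tree (an iovec array plus nr_segs) and a hypothetical dma_copy() submission helper; iterator offsets and partial-submission bookkeeping are omitted:

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/uio.h>
#include <rte_vhost_async.h>

/* Hypothetical helper that submits one copy to the DMA engine. */
static void dma_copy(void *dst, const void *src, size_t len);

static int32_t
example_transfer_data(int vid, uint16_t queue_id,
		struct rte_vhost_async_desc *descs,
		struct rte_vhost_async_status *opaque_data,
		uint16_t count, bool mirr_flag)
{
	uint16_t i;
	unsigned long seg;

	(void)vid;
	(void)queue_id;
	(void)opaque_data;

	for (i = 0; i < count; i++) {
		struct rte_vhost_iov_iter *src = descs[i].src;
		struct rte_vhost_iov_iter *dst = descs[i].dst;
		struct rte_vhost_iov_iter *mdst = descs[i].mirror_dst;

		for (seg = 0; seg < src->nr_segs; seg++) {
			/* copy into the primary front-end's buffer */
			dma_copy(dst->iov[seg].iov_base,
				 src->iov[seg].iov_base,
				 src->iov[seg].iov_len);
			/* when mirroring is requested, also copy into the
			 * mirror front-end's buffer */
			if (mirr_flag)
				dma_copy(mdst->iov[seg].iov_base,
					 src->iov[seg].iov_base,
					 src->iov[seg].iov_len);
		}
	}

	return count;	/* descriptors accepted for transfer */
}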

diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
index ad71555a7f..b199af078c 100644
--- a/lib/vhost/rte_vhost_async.h
+++ b/lib/vhost/rte_vhost_async.h
@@ -29,6 +29,7 @@  struct rte_vhost_async_desc {
 	struct rte_vhost_iov_iter *src;
 	/** destination memory iov_iter */
 	struct rte_vhost_iov_iter *dst;
+	struct rte_vhost_iov_iter *mirror_dst;
 };
 
 /**
@@ -64,7 +65,7 @@  struct rte_vhost_async_channel_ops {
 	int32_t (*transfer_data)(int vid, uint16_t queue_id,
 		struct rte_vhost_async_desc *descs,
 		struct rte_vhost_async_status *opaque_data,
-		uint16_t count);
+		uint16_t count, bool mirr_flag);
 	/**
 	 * check copy-completed packets from the async engine
 	 * @param vid
@@ -200,6 +201,12 @@  __rte_experimental
 uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count);
 
+__rte_experimental
+uint16_t
+rte_vhost_submit_ingress_mirroring_burst(int vid, uint16_t queue_id,
+		int mirror_vid, uint16_t mirror_queue_id, struct rte_mbuf **pkts, uint16_t count);
+
+
 /**
  * This function checks async completion status for a specific vhost
  * device queue. Packets which finish copying (enqueue) operation
@@ -220,6 +227,11 @@  __rte_experimental
 uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count);
 
+__rte_experimental
+uint16_t rte_vhost_poll_ingress_completed(int vid, uint16_t queue_id,
+		int mirror_vid, uint16_t mirror_queue_id,
+		struct rte_mbuf **pkts, uint16_t count);
+
 /**
  * This function returns the amount of in-flight packets for the vhost
  * queue which uses async channel acceleration.
diff --git a/lib/vhost/version.map b/lib/vhost/version.map
index c92a9d4962..4c35fa4555 100644
--- a/lib/vhost/version.map
+++ b/lib/vhost/version.map
@@ -76,6 +76,8 @@  EXPERIMENTAL {
 	rte_vhost_async_channel_unregister;
 	rte_vhost_submit_enqueue_burst;
 	rte_vhost_poll_enqueue_completed;
+	rte_vhost_submit_ingress_mirroring_burst;
+	rte_vhost_poll_ingress_completed;
 
 	# added in 21.05
 	rte_vhost_get_negotiated_protocol_features;
diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h
index 1e56311725..89a31e4ca8 100644
--- a/lib/vhost/vhost.h
+++ b/lib/vhost/vhost.h
@@ -49,7 +49,8 @@ 
 #define MAX_PKT_BURST 32
 
 #define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST * 2)
-#define VHOST_MAX_ASYNC_VEC (BUF_VECTOR_MAX * 4)
+#define MAX_ASYNC_COPY_VECTOR 1024
+#define VHOST_MAX_ASYNC_VEC (MAX_ASYNC_COPY_VECTOR * 2)
 
 #define PACKED_DESC_ENQUEUE_USED_FLAG(w)	\
 	((w) ? (VRING_DESC_F_AVAIL | VRING_DESC_F_USED | VRING_DESC_F_WRITE) : \
diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
index f6127c7d52..c9f0bb22e5 100644
--- a/lib/vhost/virtio_net.c
+++ b/lib/vhost/virtio_net.c
@@ -1575,7 +1575,7 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
 			BUF_VECTOR_MAX))) {
 			n_xfer = vq->async_ops.transfer_data(dev->vid,
-					queue_id, tdes, 0, pkt_burst_idx);
+					queue_id, tdes, 0, pkt_burst_idx, false);
 			if (likely(n_xfer >= 0)) {
 				n_pkts = n_xfer;
 			} else {
@@ -1606,7 +1606,7 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	}
 
 	if (pkt_burst_idx) {
-		n_xfer = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, pkt_burst_idx);
+		n_xfer = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, pkt_burst_idx, false);
 		if (likely(n_xfer >= 0)) {
 			n_pkts = n_xfer;
 		} else {
@@ -1873,7 +1873,7 @@  virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
 		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
 			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await < BUF_VECTOR_MAX))) {
 			n_xfer = vq->async_ops.transfer_data(dev->vid,
-					queue_id, tdes, 0, pkt_burst_idx);
+					queue_id, tdes, 0, pkt_burst_idx, false);
 			if (likely(n_xfer >= 0)) {
 				n_pkts = n_xfer;
 			} else {
@@ -1903,7 +1903,7 @@  virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
 	} while (pkt_idx < count);
 
 	if (pkt_burst_idx) {
-		n_xfer = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, pkt_burst_idx);
+		n_xfer = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, pkt_burst_idx, false);
 		if (likely(n_xfer >= 0)) {
 			n_pkts = n_xfer;
 		} else {
@@ -2206,6 +2206,664 @@  rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
 }
 
+static __rte_always_inline uint16_t
+vhost_poll_ingress_completed(struct virtio_net *dev, uint16_t queue_id,
+		struct virtio_net *mirror_dev, uint16_t mirror_queue_id,
+		struct rte_mbuf **pkts, uint16_t count)
+{
+	struct vhost_virtqueue *vq;
+	struct vhost_virtqueue *mirror_vq;
+	struct async_inflight_info *pkts_info;
+	int32_t n_cpl;
+	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
+	uint16_t start_idx, pkts_idx, vq_size;
+	uint16_t from, i;
+
+	vq = dev->virtqueue[queue_id];
+	mirror_vq = mirror_dev->virtqueue[mirror_queue_id];
+	pkts_idx = vq->async_pkts_idx % vq->size;
+	pkts_info = vq->async_pkts_info;
+	vq_size = vq->size;
+	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
+		vq_size, vq->async_pkts_inflight_n);
+
+	if (count > vq->async_last_pkts_n) {
+		n_cpl = vq->async_ops.check_completed_copies(dev->vid,
+			queue_id, 0, count - vq->async_last_pkts_n);
+		if (likely(n_cpl >= 0)) {
+			n_pkts_cpl = n_cpl;
+		} else {
+			VHOST_LOG_DATA(ERR,
+				"(%d) %s: failed to check completed copies for queue id %d.\n",
+				dev->vid, __func__, queue_id);
+			n_pkts_cpl = 0;
+		}
+	}
+
+	n_pkts_cpl += vq->async_last_pkts_n;
+	n_pkts_put = RTE_MIN(n_pkts_cpl, count);
+	if (unlikely(n_pkts_put == 0)) {
+		vq->async_last_pkts_n = n_pkts_cpl;
+		mirror_vq->async_last_pkts_n = n_pkts_cpl;
+		return 0;
+	}
+
+	for (i = 0; i < n_pkts_put; i++) {
+		from = (start_idx + i) & (vq_size - 1);
+		n_descs += pkts_info[from].descs;
+		pkts[i] = pkts_info[from].mbuf;
+	}
+
+	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
+	vq->async_pkts_inflight_n -= n_pkts_put;
+
+	if (likely(vq->enabled && vq->access_ok)) {
+		write_back_completed_descs_split(vq, n_descs);
+
+		__atomic_add_fetch(&vq->used->idx, n_descs,
+				__ATOMIC_RELEASE);
+		vhost_vring_call_split(dev, vq);
+	} else {
+		vq->last_async_desc_idx_split += n_descs;
+	}
+
+	mirror_vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
+	mirror_vq->async_pkts_inflight_n -= n_pkts_put;
+
+	if (likely(mirror_vq->enabled && mirror_vq->access_ok)) {
+		write_back_completed_descs_split(mirror_vq, n_descs);
+
+		__atomic_add_fetch(&mirror_vq->used->idx, n_descs,
+				__ATOMIC_RELEASE);
+		vhost_vring_call_split(mirror_dev, mirror_vq);
+	} else {
+		mirror_vq->last_async_desc_idx_split += n_descs;
+	}
+
+	return n_pkts_put;
+}
+
+
+uint16_t
+rte_vhost_poll_ingress_completed(int vid, uint16_t queue_id,
+		int mirror_vid, uint16_t mirror_queue_id,
+		struct rte_mbuf **pkts, uint16_t count)
+{
+	struct virtio_net *dev = get_device(vid);
+	struct vhost_virtqueue *vq;
+	struct virtio_net *mirror_dev = get_device(mirror_vid);
+
+	uint16_t n_pkts_cpl = 0;
+
+	if (unlikely(!dev))
+		return 0;
+
+	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	if (unlikely(!vq->async_registered)) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	rte_spinlock_lock(&vq->access_lock);
+
+	n_pkts_cpl = vhost_poll_ingress_completed(dev, queue_id,
+				mirror_dev, mirror_queue_id, pkts, count);
+
+	rte_spinlock_unlock(&vq->access_lock);
+
+	return n_pkts_cpl;
+}
+
+static __rte_always_inline void
+ingress_async_fill_desc(struct rte_vhost_async_desc *desc,
+	struct rte_vhost_iov_iter *src, struct rte_vhost_iov_iter *dst,
+	struct rte_vhost_iov_iter *mirror_dst)
+{
+	desc->src = src;
+	desc->dst = dst;
+	desc->mirror_dst = mirror_dst;
+}
+
+static __rte_always_inline int
+ingress_async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
+			struct virtio_net *mirror_dev, struct vhost_virtqueue *mirror_vq,
+			struct rte_mbuf *m, struct buf_vector *buf_vec,
+			uint16_t nr_vec, uint16_t num_buffers,
+			struct buf_vector *mirror_buf_vec,
+			uint16_t mirror_nr_vec, uint16_t mirror_num_buffers,
+			struct iovec *src_iovec, struct iovec *dst_iovec,
+			struct iovec *mirror_dst_iovec,
+			struct rte_vhost_iov_iter *src_it,
+			struct rte_vhost_iov_iter *dst_it,
+			struct rte_vhost_iov_iter *mirror_dst_it,
+			int nr_iovec)
+{
+	struct rte_mbuf *hdr_mbuf;
+	struct virtio_net_hdr_mrg_rxbuf tmp_hdr1, *hdr1 = NULL;
+	struct virtio_net_hdr_mrg_rxbuf tmp_hdr2, *hdr2 = NULL;
+	uint64_t buf_addr1, buf_iova1;
+	uint64_t hdr_addr1;
+	uint64_t buf_addr2, buf_iova2;
+	uint64_t hdr_addr2;
+
+	uint64_t mapped_len;
+	uint64_t mapped_len1;
+	uint64_t mapped_len2;
+	uint32_t vec_idx1 = 0;
+	uint32_t vec_idx2 = 0;
+	uint32_t mbuf_offset, mbuf_avail;
+	uint32_t buf_offset1, buf_avail1;
+	uint32_t buf_offset2, buf_avail2;
+
+	uint32_t cpy_len, buf_len1, buf_len2;
+	int error = 0;
+
+	uint32_t tlen = 0;
+	int tvec_idx = 0;
+	void *hpa1, *hpa2;
+
+	if (unlikely(m == NULL)) {
+		error = -1;
+		goto out;
+	}
+
+	buf_addr1 = buf_vec[vec_idx1].buf_addr;
+	buf_iova1 = buf_vec[vec_idx1].buf_iova;
+	buf_len1 = buf_vec[vec_idx1].buf_len;
+
+	if (unlikely(buf_len1 < dev->vhost_hlen && nr_vec <= 1)) {
+		error = -1;
+		goto out;
+	}
+
+	buf_addr2 = mirror_buf_vec[vec_idx2].buf_addr;
+	buf_iova2 = mirror_buf_vec[vec_idx2].buf_iova;
+	buf_len2 = mirror_buf_vec[vec_idx2].buf_len;
+
+	if (unlikely(buf_len2 < mirror_dev->vhost_hlen && mirror_nr_vec <= 1)) {
+		error = -1;
+		goto out;
+	}
+
+	hdr_mbuf = m;
+	hdr_addr1 = buf_addr1;
+	if (unlikely(buf_len1 < dev->vhost_hlen)) {
+		memset(&tmp_hdr1, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
+		hdr1 = &tmp_hdr1;
+	} else
+		hdr1 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr1;
+
+	hdr_addr2 = buf_addr2;
+	if (unlikely(buf_len2 < mirror_dev->vhost_hlen)) {
+		memset(&tmp_hdr2, 0, sizeof(struct virtio_net_hdr_mrg_rxbuf));
+		hdr2 = &tmp_hdr2;
+	} else
+		hdr2 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr2;
+
+	if (unlikely(buf_len1 < dev->vhost_hlen)) {
+		buf_offset1 = dev->vhost_hlen - buf_len1;
+		vec_idx1++;
+		buf_addr1 = buf_vec[vec_idx1].buf_addr;
+		buf_iova1 = buf_vec[vec_idx1].buf_iova;
+		buf_len1 = buf_vec[vec_idx1].buf_len;
+		buf_avail1 = buf_len1 - buf_offset1;
+	} else {
+		buf_offset1 = dev->vhost_hlen;
+		buf_avail1 = buf_len1 - dev->vhost_hlen;
+	}
+
+	if (unlikely(buf_len2 < mirror_dev->vhost_hlen)) {
+		buf_offset2 = mirror_dev->vhost_hlen - buf_len2;
+		vec_idx2++;
+		buf_addr2 = mirror_buf_vec[vec_idx2].buf_addr;
+		buf_iova2 = mirror_buf_vec[vec_idx2].buf_iova;
+		buf_len2 = mirror_buf_vec[vec_idx2].buf_len;
+		buf_avail2 = buf_len2 - buf_offset2;
+	} else {
+		buf_offset2 = mirror_dev->vhost_hlen;
+		buf_avail2 = buf_len2 - mirror_dev->vhost_hlen;
+	}
+
+	mbuf_avail  = rte_pktmbuf_data_len(m);
+	mbuf_offset = 0;
+
+	int flag = 0;
+	static uint64_t total = 0;
+	while (mbuf_avail != 0 || m->next != NULL) {
+		/* done with current buf, get the next one */
+		if (buf_avail1 == 0) {
+			vec_idx1++;
+			if (unlikely(vec_idx1 >= nr_vec)) {
+				error = -1;
+				goto out;
+			}
+
+			buf_addr1 = buf_vec[vec_idx1].buf_addr;
+			buf_iova1 = buf_vec[vec_idx1].buf_iova;
+			buf_len1 = buf_vec[vec_idx1].buf_len;
+
+			buf_offset1 = 0;
+			buf_avail1 = buf_len1;
+		}
+
+		if (buf_avail2 == 0) {
+			vec_idx2++;
+			if (unlikely(vec_idx2 >= mirror_nr_vec)) {
+				error = -1;
+				goto out;
+			}
+
+			buf_addr2 = mirror_buf_vec[vec_idx2].buf_addr;
+			buf_iova2 = mirror_buf_vec[vec_idx2].buf_iova;
+			buf_len2 = mirror_buf_vec[vec_idx2].buf_len;
+
+			buf_offset2 = 0;
+			buf_avail2 = buf_len2;
+		}
+
+		/* done with current mbuf, get the next one */
+		if (mbuf_avail == 0) {
+			m = m->next;
+
+			mbuf_offset = 0;
+			mbuf_avail = rte_pktmbuf_data_len(m);
+		}
+
+		if (hdr_addr1) {
+			virtio_enqueue_offload(hdr_mbuf, &hdr1->hdr);
+			if (rxvq_is_mergeable(dev))
+				ASSIGN_UNLESS_EQUAL(hdr1->num_buffers,
+						num_buffers);
+
+			if (unlikely(hdr1 == &tmp_hdr1)) {
+				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr1);
+			} else {
+				PRINT_PACKET(dev, (uintptr_t)hdr_addr1,
+						dev->vhost_hlen, 0);
+				vhost_log_cache_write_iova(dev, vq,
+						buf_vec[0].buf_iova,
+						dev->vhost_hlen);
+			}
+
+			hdr_addr1 = 0;
+		}
+
+		if (hdr_addr2) {
+			virtio_enqueue_offload(hdr_mbuf, &hdr2->hdr);
+			if (rxvq_is_mergeable(mirror_dev))
+				ASSIGN_UNLESS_EQUAL(hdr2->num_buffers,
+						mirror_num_buffers);
+
+			if (unlikely(hdr2 == &tmp_hdr2)) {
+				copy_vnet_hdr_to_desc(mirror_dev, mirror_vq, mirror_buf_vec, hdr2);
+			} else {
+				PRINT_PACKET(mirror_dev, (uintptr_t)hdr_addr2,
+						mirror_dev->vhost_hlen, 0);
+				vhost_log_cache_write_iova(mirror_dev, mirror_vq,
+						mirror_buf_vec[0].buf_iova,
+						mirror_dev->vhost_hlen);
+			}
+
+			hdr_addr2 = 0;
+		}
+
+		cpy_len = RTE_MIN(buf_avail1, mbuf_avail);
+		cpy_len = RTE_MIN(buf_avail2, cpy_len);
+
+		while (unlikely(cpy_len)) {
+			hpa1 = (void *)(uintptr_t)gpa_to_first_hpa(dev,
+					buf_iova1 + buf_offset1,
+					cpy_len, &mapped_len1);
+			if (unlikely(!hpa1)) {
+				VHOST_LOG_DATA(ERR, "(%d) %s: failed to get hpa1.\n",
+				dev->vid, __func__);
+				error = -1;
+				goto out;
+			}
+
+			hpa2 = (void *)(uintptr_t)gpa_to_first_hpa(mirror_dev,
+					buf_iova2 + buf_offset2,
+					cpy_len, &mapped_len2);
+			if (unlikely(!hpa2)) {
+				VHOST_LOG_DATA(ERR, "(%d) %s: failed to get hpa2.\n",
+				mirror_dev->vid, __func__);
+				error = -1;
+				goto out;
+			}
+
+			if ((((uint64_t)hpa1 & 0xFFF) ^ ((uint64_t)hpa2 & 0xFFF)) != 0 && flag == 0)
+			{
+				total++;
+				VHOST_LOG_DATA(ERR, "%lu....................... hpa1=%p hpa2=%p.\n", total, hpa1, hpa2);
+			}
+
+			if (unlikely(tvec_idx >= nr_iovec)) {
+				VHOST_LOG_DATA(ERR, "iovec is not enough for offloading\n");
+				return -1;
+			}
+
+			mapped_len = RTE_MIN(mapped_len1, mapped_len2);
+
+			async_fill_vec(src_iovec + tvec_idx,
+				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
+				mbuf_offset), (size_t)mapped_len);
+			async_fill_vec(dst_iovec + tvec_idx,
+					hpa1, (size_t)mapped_len);
+			async_fill_vec(mirror_dst_iovec + tvec_idx,
+					hpa2, (size_t)mapped_len);
+
+			tlen += (uint32_t)mapped_len;
+			cpy_len -= (uint32_t)mapped_len;
+			mbuf_avail -= (uint32_t)mapped_len;
+			mbuf_offset += (uint32_t)mapped_len;
+			buf_avail1 -= (uint32_t)mapped_len;
+			buf_offset1 += (uint32_t)mapped_len;
+			buf_avail2 -= (uint32_t)mapped_len;
+			buf_offset2 += (uint32_t)mapped_len;
+			tvec_idx++;
+		}
+		flag++;
+	}
+
+	async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
+	async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
+	async_fill_iter(mirror_dst_it, tlen, mirror_dst_iovec, tvec_idx);
+out:
+	return error;
+}
+
+static __rte_noinline uint32_t
+virtio_dev_ingress_async_submit_split(struct virtio_net *dev,
+	struct vhost_virtqueue *vq, uint16_t queue_id,
+	struct virtio_net *mirror_dev,
+	struct vhost_virtqueue *mirror_vq, uint16_t mirror_queue_id,
+	struct rte_mbuf **pkts, uint32_t count)
+{
+	struct buf_vector buf_vec[BUF_VECTOR_MAX];
+	struct buf_vector mirror_buf_vec[BUF_VECTOR_MAX];
+	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
+	uint16_t num_buffers;
+	uint16_t mirror_num_buffers;
+	uint16_t avail_head1;
+	uint16_t avail_head2;
+
+	struct rte_vhost_iov_iter *it_pool1 = vq->it_pool;
+	struct rte_vhost_iov_iter *it_pool2 = mirror_vq->it_pool;
+	struct iovec *vec_pool1 = vq->vec_pool;
+	struct iovec *vec_pool2 = mirror_vq->vec_pool;
+	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
+	struct iovec *src_iovec = vec_pool1;
+	struct iovec *dst_iovec = vec_pool1 + (VHOST_MAX_ASYNC_VEC >> 1);
+	struct iovec *mirror_dst_iovec = vec_pool2;
+	struct async_inflight_info *pkts_info1 = vq->async_pkts_info;
+
+	uint32_t n_pkts = 0, pkt_err = 0;
+	int32_t n_xfer;
+	uint16_t slot_idx1 = 0;
+	uint16_t iovec_idx1 = 0, iovec_idx2 = 0, it_idx1 = 0, it_idx2 = 0;
+
+	/*
+	 * The ordering between avail index and desc reads need to be enforced.
+	 */
+	avail_head1 = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
+	avail_head2 = __atomic_load_n(&mirror_vq->avail->idx, __ATOMIC_ACQUIRE);
+
+	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
+	rte_prefetch0(&mirror_vq->avail->ring[mirror_vq->last_avail_idx & (mirror_vq->size - 1)]);
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint16_t vhost_hlen = dev->vhost_hlen > mirror_dev->vhost_hlen ?
+					dev->vhost_hlen : mirror_dev->vhost_hlen;
+		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vhost_hlen;
+		uint16_t nr_vec = 0;
+		uint16_t mirror_nr_vec = 0;
+
+		if (unlikely(reserve_avail_buf_split(dev, vq,
+						pkt_len, buf_vec, &num_buffers,
+						avail_head1, &nr_vec) < 0)) {
+			VHOST_LOG_DATA(DEBUG,
+				"(%d) failed to get enough desc from vring1\n",
+				dev->vid);
+			vq->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		if (unlikely(reserve_avail_buf_split(mirror_dev, mirror_vq,
+						pkt_len, mirror_buf_vec, &mirror_num_buffers,
+						avail_head2, &mirror_nr_vec) < 0)) {
+			VHOST_LOG_DATA(DEBUG,
+				"(%d) failed to get enough desc from vring2\n",
+				mirror_dev->vid);
+			mirror_vq->shadow_used_idx -= mirror_num_buffers;
+			break;
+		}
+
+		if (ingress_async_mbuf_to_desc(dev, vq, mirror_dev, mirror_vq, pkts[pkt_idx],
+				buf_vec, nr_vec, num_buffers,
+				mirror_buf_vec, mirror_nr_vec, mirror_num_buffers,
+				&src_iovec[iovec_idx1], &dst_iovec[iovec_idx1],
+				&mirror_dst_iovec[iovec_idx2],
+				&it_pool1[it_idx1], &it_pool1[it_idx1 + 1],
+				&it_pool2[it_idx2],
+				(VHOST_MAX_ASYNC_VEC >> 1) - iovec_idx1) < 0) {
+			vq->shadow_used_idx -= num_buffers;
+			mirror_vq->shadow_used_idx -= mirror_num_buffers;
+			break;
+		}
+
+		ingress_async_fill_desc(&tdes[pkt_burst_idx++], &it_pool1[it_idx1],
+				&it_pool1[it_idx1 + 1], &it_pool2[it_idx2]);
+
+		slot_idx1 = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
+		pkts_info1[slot_idx1].descs = num_buffers;
+		pkts_info1[slot_idx1].mbuf = pkts[pkt_idx];
+
+		iovec_idx1 += it_pool1[it_idx1].nr_segs;
+		it_idx1 += 2;
+
+		iovec_idx2 += it_pool2[it_idx2].nr_segs;
+		it_idx2 += 1;
+
+		vq->last_avail_idx += num_buffers;
+		mirror_vq->last_avail_idx += mirror_num_buffers;
+
+		/*
+		 * conditions to trigger async device transfer:
+		 * - buffered packet number reaches transfer threshold
+		 * - unused async iov number is less than max vhost vector
+		 */
+		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+			((VHOST_MAX_ASYNC_VEC >> 1) - iovec_idx1 <
+			BUF_VECTOR_MAX) ||
+			((VHOST_MAX_ASYNC_VEC >> 1) - iovec_idx2 <
+			BUF_VECTOR_MAX))) {
+
+			n_xfer = vq->async_ops.transfer_data(dev->vid,
+					queue_id, tdes, 0, pkt_burst_idx, true);
+			if (likely(n_xfer >= 0)) {
+				n_pkts = n_xfer;
+			} else {
+				VHOST_LOG_DATA(ERR,
+					"(%d) %s: failed to transfer data for queue id %d.\n",
+					mirror_dev->vid, __func__, mirror_queue_id);
+				n_pkts = 0;
+			}
+
+			iovec_idx1 = 0;
+			iovec_idx2 = 0;
+			it_idx1 = 0;
+			it_idx2 = 0;
+
+			if (unlikely(n_pkts < pkt_burst_idx)) {
+				/*
+				 * log error packets number here and do actual
+				 * error processing when applications poll
+				 * completion
+				 */
+				pkt_err = pkt_burst_idx - n_pkts;
+				pkt_idx++;
+				pkt_burst_idx = 0;
+				break;
+			}
+
+			pkt_burst_idx = 0;
+		}
+	}
+
+	if (pkt_burst_idx) {
+		n_xfer = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, pkt_burst_idx, true);
+		if (likely(n_xfer >= 0)) {
+			n_pkts = n_xfer;
+		} else {
+			VHOST_LOG_DATA(ERR, "(%d) %s: failed to transfer data for queue id %d.\n",
+				dev->vid, __func__, queue_id);
+			n_pkts = 0;
+		}
+
+		if (unlikely(n_pkts < pkt_burst_idx))
+			pkt_err = pkt_burst_idx - n_pkts;
+	}
+
+	if (unlikely(pkt_err)) {
+		uint16_t num_descs1 = 0;
+		uint16_t num_descs2 = 0;
+		/* update number of completed packets */
+		pkt_idx -= pkt_err;
+
+		/* calculate the sum of descriptors to revert */
+		while (pkt_err-- > 0) {
+			num_descs1 += pkts_info1[slot_idx1 & (vq->size - 1)].descs;
+			slot_idx1--;
+		}
+
+		/* recover shadow used ring and available ring */
+		vq->shadow_used_idx -= num_descs1;
+		vq->last_avail_idx -= num_descs1;
+
+		mirror_vq->shadow_used_idx -= num_descs2;
+		mirror_vq->last_avail_idx -= num_descs2;
+	}
+
+	/* keep used descriptors */
+	if (likely(vq->shadow_used_idx)) {
+		uint16_t to = vq->async_desc_idx_split & (vq->size - 1);
+
+		store_dma_desc_info_split(vq->shadow_used_split,
+		vq->async_descs_split, vq->size, 0, to, vq->shadow_used_idx);
+
+		vq->async_desc_idx_split += vq->shadow_used_idx;
+		vq->async_pkts_idx += pkt_idx;
+		vq->async_pkts_inflight_n += pkt_idx;
+		vq->shadow_used_idx = 0;
+	}
+
+	if (likely(mirror_vq->shadow_used_idx)) {
+		uint16_t to = mirror_vq->async_desc_idx_split & (mirror_vq->size - 1);
+
+		store_dma_desc_info_split(mirror_vq->shadow_used_split,
+		mirror_vq->async_descs_split, mirror_vq->size, 0, to, mirror_vq->shadow_used_idx);
+
+		mirror_vq->async_desc_idx_split += mirror_vq->shadow_used_idx;
+		mirror_vq->async_pkts_idx += pkt_idx;
+		mirror_vq->async_pkts_inflight_n += pkt_idx;
+		mirror_vq->shadow_used_idx = 0;
+	}
+
+	return pkt_idx;
+}
+
+
+static __rte_always_inline uint32_t
+virtio_dev_ingress_async_submit(struct virtio_net *dev, uint16_t queue_id,
+	struct virtio_net *mirror_dev, uint16_t mirror_queue_id, struct rte_mbuf **pkts, uint32_t count)
+{
+	VHOST_LOG_DATA(DEBUG, "(%d) (%d) %s\n", dev->vid, mirror_dev->vid, __func__);
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	if (unlikely(!is_valid_virt_queue_idx(mirror_queue_id, 0, mirror_dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
+			mirror_dev->vid, __func__, mirror_queue_id);
+		return 0;
+	}
+
+	struct vhost_virtqueue *vq;
+	struct vhost_virtqueue *mirror_vq;
+	uint32_t nb_tx = 0;
+
+	vq = dev->virtqueue[queue_id];
+	mirror_vq = mirror_dev->virtqueue[mirror_queue_id];
+
+	rte_spinlock_lock(&vq->access_lock);
+
+	if (unlikely(!vq->enabled || !vq->async_registered))
+		goto out_access_unlock;
+
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_lock(vq);
+
+	if (unlikely(!vq->access_ok))
+		if (unlikely(vring_translate(dev, vq) < 0))
+			goto out;
+
+	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
+	if (count == 0)
+		goto out;
+
+	nb_tx = virtio_dev_ingress_async_submit_split(dev, vq, queue_id,
+				mirror_dev, mirror_vq, mirror_queue_id, pkts, count);
+
+out:
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_unlock(vq);
+
+out_access_unlock:
+	rte_spinlock_unlock(&vq->access_lock);
+
+	return nb_tx;
+}
+
+
+uint16_t
+rte_vhost_submit_ingress_mirroring_burst(int vid, uint16_t queue_id,
+		int mirror_vid, uint16_t mirror_queue_id, struct rte_mbuf **pkts, uint16_t count)
+{
+	struct virtio_net *dev = get_device(vid);
+	struct virtio_net *mirror_dev = get_device(mirror_vid);
+
+	if (!dev || !mirror_dev)
+		return 0;
+
+	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
+		VHOST_LOG_DATA(ERR,
+			"(%d) %s: built-in vhost net backend is disabled.\n",
+			dev->vid, __func__);
+		return 0;
+	}
+
+	if (unlikely(!(mirror_dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
+		VHOST_LOG_DATA(ERR,
+			"(%d) %s: built-in vhost net backend is disabled.\n",
+			mirror_dev->vid, __func__);
+		return 0;
+	}
+
+	return virtio_dev_ingress_async_submit(dev, queue_id,
+				mirror_dev, mirror_queue_id, pkts, count);
+}
+
 static inline bool
 virtio_net_with_host_offload(struct virtio_net *dev)
 {