[1/4] vhost: support async dequeue for split ring

Message ID 20210906204837.112466-2-wenwux.ma@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Series: support async dequeue for split ring

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/iol-testing warning apply patch failure

Commit Message

Ma, WenwuX Sept. 6, 2021, 8:48 p.m. UTC
  From: Yuan Wang <yuanx.wang@intel.com>

This patch implements the asynchronous dequeue data path for the split
ring. A new asynchronous dequeue function is introduced. With this
function, the application can try to receive packets from the guest
while offloading the copies to the async channel, thus saving precious
CPU cycles.

Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
Tested-by: Yinan Wang <yinan.wang@intel.com>
---
 doc/guides/prog_guide/vhost_lib.rst |   9 +
 lib/vhost/rte_vhost_async.h         |  36 +-
 lib/vhost/version.map               |   3 +
 lib/vhost/vhost.h                   |   3 +-
 lib/vhost/virtio_net.c              | 531 ++++++++++++++++++++++++++++
 5 files changed, 579 insertions(+), 3 deletions(-)
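
For orientation, here is a minimal sketch of how an application might call the new API once this patch is applied. The function and variable names (poll_guest_tx, virtqueue_id) and the burst size are placeholders, error handling is elided, and since the symbol is experimental the application must be built with ALLOW_EXPERIMENTAL_API defined.

#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_vhost_async.h>

/* Poll one guest TX virtqueue through the async channel and return the
 * number of packets whose copies have completed. Packets still owned by
 * the async channel are reported via nr_inflight. */
static uint16_t
poll_guest_tx(int vid, uint16_t virtqueue_id, struct rte_mempool *mbuf_pool)
{
	struct rte_mbuf *pkts[32];
	int nr_inflight = 0;
	uint16_t nb_rx;

	nb_rx = rte_vhost_async_try_dequeue_burst(vid, virtqueue_id,
			mbuf_pool, pkts, 32, &nr_inflight);

	/* pkts[0..nb_rx-1] are now safe to forward or free. */
	return nb_rx;
}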
  

Comments

Yang, YvonneX Sept. 10, 2021, 7:36 a.m. UTC | #1
> -----Original Message-----
> From: Ma, WenwuX <wenwux.ma@intel.com>
> Sent: Tuesday, September 7, 2021 4:49 AM
> To: dev@dpdk.org
> Cc: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>;
> Jiang, Cheng1 <cheng1.jiang@intel.com>; Hu, Jiayu <jiayu.hu@intel.com>;
> Pai G, Sunil <sunil.pai.g@intel.com>; Yang, YvonneX
> <yvonnex.yang@intel.com>; Wang, YuanX <yuanx.wang@intel.com>; Ma,
> WenwuX <wenwux.ma@intel.com>; Wang, Yinan <yinan.wang@intel.com>
> Subject: [PATCH 1/4] vhost: support async dequeue for split ring
> 
> From: Yuan Wang <yuanx.wang@intel.com>
> 
> This patch implements asynchronous dequeue data path for split ring.
> A new asynchronous dequeue function is introduced. With this function, the
> application can try to receive packets from the guest with offloading copies
> to the async channel, thus saving precious CPU cycles.
> 
> Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
> Tested-by: Yinan Wang <yinan.wang@intel.com>
> ---
>  doc/guides/prog_guide/vhost_lib.rst |   9 +
>  lib/vhost/rte_vhost_async.h         |  36 +-
>  lib/vhost/version.map               |   3 +
>  lib/vhost/vhost.h                   |   3 +-
>  lib/vhost/virtio_net.c              | 531 ++++++++++++++++++++++++++++
>  5 files changed, 579 insertions(+), 3 deletions(-)
> 

Tested-by: Yvonne Yang <yvonnex.yang@intel.com>
  
Chenbo Xia Sept. 15, 2021, 2:51 a.m. UTC | #2
Hi,

> -----Original Message-----
> From: Ma, WenwuX <wenwux.ma@intel.com>
> Sent: Tuesday, September 7, 2021 4:49 AM
> To: dev@dpdk.org
> Cc: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>; Jiang,
> Cheng1 <cheng1.jiang@intel.com>; Hu, Jiayu <jiayu.hu@intel.com>; Pai G, Sunil
> <sunil.pai.g@intel.com>; Yang, YvonneX <yvonnex.yang@intel.com>; Wang, YuanX
> <yuanx.wang@intel.com>; Ma, WenwuX <wenwux.ma@intel.com>; Wang, Yinan
> <yinan.wang@intel.com>
> Subject: [PATCH 1/4] vhost: support async dequeue for split ring
> 
> From: Yuan Wang <yuanx.wang@intel.com>
> 
> This patch implements asynchronous dequeue data path for split ring.
> A new asynchronous dequeue function is introduced. With this function,
> the application can try to receive packets from the guest with
> offloading copies to the async channel, thus saving precious CPU
> cycles.
> 
> Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
> Tested-by: Yinan Wang <yinan.wang@intel.com>
> ---
>  doc/guides/prog_guide/vhost_lib.rst |   9 +
>  lib/vhost/rte_vhost_async.h         |  36 +-
>  lib/vhost/version.map               |   3 +
>  lib/vhost/vhost.h                   |   3 +-
>  lib/vhost/virtio_net.c              | 531 ++++++++++++++++++++++++++++
>  5 files changed, 579 insertions(+), 3 deletions(-)
> 
> diff --git a/doc/guides/prog_guide/vhost_lib.rst
> b/doc/guides/prog_guide/vhost_lib.rst
> index 171e0096f6..9ed544db7a 100644
> --- a/doc/guides/prog_guide/vhost_lib.rst
> +++ b/doc/guides/prog_guide/vhost_lib.rst
> @@ -303,6 +303,15 @@ The following is an overview of some key Vhost API
> functions:
>    Clear inflight packets which are submitted to DMA engine in vhost async
> data
>    path. Completed packets are returned to applications through ``pkts``.
> 
> +* ``rte_vhost_async_try_dequeue_burst(vid, queue_id, mbuf_pool, pkts, count,
> nr_inflight)``
> +
> +  This function tries to receive packets from the guest with offloading
> +  copies to the async channel. The packets that are transfer completed
> +  are returned in ``pkts``. The other packets that their copies are submitted
> +  to the async channel but not completed are called "in-flight packets".
> +  This function will not return in-flight packets until their copies are
> +  completed by the async channel.
> +
>  Vhost-user Implementations
>  --------------------------
> 
> diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
> index ad71555a7f..5e2429ab70 100644
> --- a/lib/vhost/rte_vhost_async.h
> +++ b/lib/vhost/rte_vhost_async.h
> @@ -83,12 +83,18 @@ struct rte_vhost_async_channel_ops {
>  		uint16_t max_packets);
>  };
> 
> +struct async_nethdr {
> +	struct virtio_net_hdr hdr;
> +	bool valid;
> +};
> +

As a struct exposed in public headers, it's better to prefix it with rte_.
In this case I would prefer rte_async_net_hdr.
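
For illustration only, the suggested renaming might look like this (a sketch, not part of the patch):

struct rte_async_net_hdr {
	struct virtio_net_hdr hdr;
	bool valid;
};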

>  /**
> - * inflight async packet information
> + * in-flight async packet information
>   */
>  struct async_inflight_info {

Could you help to rename it too? Like rte_async_inflight_info.

>  	struct rte_mbuf *mbuf;
> -	uint16_t descs; /* num of descs inflight */
> +	struct async_nethdr nethdr;
> +	uint16_t descs; /* num of descs in-flight */
>  	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
>  };
> 
> @@ -255,5 +261,31 @@ int rte_vhost_async_get_inflight(int vid, uint16_t
> queue_id);
>  __rte_experimental
>  uint16_t rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
>  		struct rte_mbuf **pkts, uint16_t count);
> +/**
> + * This function tries to receive packets from the guest with offloading
> + * copies to the async channel. The packets that are transfer completed
> + * are returned in "pkts". The other packets that their copies are submitted
> to
> + * the async channel but not completed are called "in-flight packets".
> + * This function will not return in-flight packets until their copies are
> + * completed by the async channel.
> + *
> + * @param vid
> + *  id of vhost device to dequeue data
> + * @param queue_id
> + *  queue id to dequeue data

The param mbuf_pool is missing.
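
For example, one possible wording for the missing entry, following the style of the surrounding comment (suggested text only):

 * @param mbuf_pool
 *  mbuf pool from which the host mbufs are allocated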

> + * @param pkts
> + *  blank array to keep successfully dequeued packets
> + * @param count
> + *  size of the packet array
> + * @param nr_inflight
> + *  the amount of in-flight packets. If error occurred, its value is set to -
> 1.
> + * @return
> + *  num of successfully dequeued packets
> + */
> +__rte_experimental
> +uint16_t
> +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
> +	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
> +	int *nr_inflight);
> 
>  #endif /* _RTE_VHOST_ASYNC_H_ */
> diff --git a/lib/vhost/version.map b/lib/vhost/version.map
> index c92a9d4962..1e033ad8e2 100644
> --- a/lib/vhost/version.map
> +++ b/lib/vhost/version.map
> @@ -85,4 +85,7 @@ EXPERIMENTAL {
>  	rte_vhost_async_channel_register_thread_unsafe;
>  	rte_vhost_async_channel_unregister_thread_unsafe;
>  	rte_vhost_clear_queue_thread_unsafe;
> +
> +	# added in 21.11
> +	rte_vhost_async_try_dequeue_burst;
>  };
> diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h
> index 1e56311725..89a31e4ca8 100644
> --- a/lib/vhost/vhost.h
> +++ b/lib/vhost/vhost.h
> @@ -49,7 +49,8 @@

[...]

> +uint16_t
> +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
> +	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
> +	int *nr_inflight)
> +{
> +	struct virtio_net *dev;
> +	struct rte_mbuf *rarp_mbuf = NULL;
> +	struct vhost_virtqueue *vq;
> +	int16_t success = 1;
> +
> +	*nr_inflight = -1;
> +
> +	dev = get_device(vid);
> +	if (!dev)
> +		return 0;
> +
> +	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
> +		VHOST_LOG_DATA(ERR,
> +			"(%d) %s: built-in vhost net backend is disabled.\n",
> +			dev->vid, __func__);
> +		return 0;
> +	}
> +
> +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
> +		VHOST_LOG_DATA(ERR,
> +			"(%d) %s: invalid virtqueue idx %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	vq = dev->virtqueue[queue_id];
> +
> +	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
> +		return 0;
> +
> +	if (unlikely(vq->enabled == 0)) {
> +		count = 0;
> +		goto out_access_unlock;
> +	}
> +
> +	if (unlikely(!vq->async_registered)) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue
> id %d.\n",
> +			dev->vid, __func__, queue_id);
> +		count = 0;
> +		goto out_access_unlock;
> +	}
> +
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_lock(vq);
> +
> +	if (unlikely(vq->access_ok == 0))
> +		if (unlikely(vring_translate(dev, vq) < 0)) {
> +			count = 0;
> +			goto out_access_unlock;
> +		}
> +
> +	/*
> +	 * Construct a RARP broadcast packet, and inject it to the "pkts"
> +	 * array, to looks like that guest actually send such packet.
> +	 *
> +	 * Check user_send_rarp() for more information.
> +	 *
> +	 * broadcast_rarp shares a cacheline in the virtio_net structure
> +	 * with some fields that are accessed during enqueue and
> +	 * __atomic_compare_exchange_n causes a write if performed compare
> +	 * and exchange. This could result in false sharing between enqueue
> +	 * and dequeue.
> +	 *
> +	 * Prevent unnecessary false sharing by reading broadcast_rarp first
> +	 * and only performing compare and exchange if the read indicates it
> +	 * is likely to be set.
> +	 */
> +	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
> +			__atomic_compare_exchange_n(&dev->broadcast_rarp,
> +			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
> +
> +		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
> +		if (rarp_mbuf == NULL) {
> +			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
> +			count = 0;
> +			goto out;
> +		}
> +		count -= 1;
> +	}
> +
> +	if (unlikely(vq_is_packed(dev)))
> +		return 0;

Should add a log here.

Thanks,
Chenbo
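
For reference, a sketch of the kind of log requested above, reusing the existing VHOST_LOG_DATA pattern of this function. The wording is only a suggestion, and the sketch jumps to the existing "out" label so the locks taken earlier in the function are released:

	if (unlikely(vq_is_packed(dev))) {
		VHOST_LOG_DATA(ERR,
			"(%d) %s: async dequeue does not support packed ring.\n",
			dev->vid, __func__);
		count = 0;
		goto out;
	}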
  
Chenbo Xia Sept. 15, 2021, 11:35 a.m. UTC | #3
Hi Maxime & Yuan,

> -----Original Message-----
> From: Wang, YuanX <yuanx.wang@intel.com>
> Sent: Wednesday, September 15, 2021 5:09 PM
> To: Xia, Chenbo <chenbo.xia@intel.com>; Ma, WenwuX <wenwux.ma@intel.com>;
> dev@dpdk.org
> Cc: maxime.coquelin@redhat.com; Jiang, Cheng1 <cheng1.jiang@intel.com>; Hu,
> Jiayu <jiayu.hu@intel.com>; Pai G, Sunil <sunil.pai.g@intel.com>; Yang,
> YvonneX <yvonnex.yang@intel.com>; Wang, Yinan <yinan.wang@intel.com>
> Subject: RE: [PATCH 1/4] vhost: support async dequeue for split ring
> 
> Hi Chenbo,
> 
> > -----Original Message-----
> > From: Xia, Chenbo <chenbo.xia@intel.com>
> > Sent: Wednesday, September 15, 2021 10:52 AM
> > To: Ma, WenwuX <wenwux.ma@intel.com>; dev@dpdk.org
> > Cc: maxime.coquelin@redhat.com; Jiang, Cheng1 <cheng1.jiang@intel.com>;
> > Hu, Jiayu <jiayu.hu@intel.com>; Pai G, Sunil <sunil.pai.g@intel.com>; Yang,
> > YvonneX <yvonnex.yang@intel.com>; Wang, YuanX
> > <yuanx.wang@intel.com>; Wang, Yinan <yinan.wang@intel.com>
> > Subject: RE: [PATCH 1/4] vhost: support async dequeue for split ring
> >
> > Hi,
> >
> > > -----Original Message-----
> > > From: Ma, WenwuX <wenwux.ma@intel.com>
> > > Sent: Tuesday, September 7, 2021 4:49 AM
> > > To: dev@dpdk.org
> > > Cc: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>;
> > > Jiang,
> > > Cheng1 <cheng1.jiang@intel.com>; Hu, Jiayu <jiayu.hu@intel.com>; Pai
> > > G, Sunil <sunil.pai.g@intel.com>; Yang, YvonneX
> > > <yvonnex.yang@intel.com>; Wang, YuanX <yuanx.wang@intel.com>; Ma,
> > > WenwuX <wenwux.ma@intel.com>; Wang, Yinan <yinan.wang@intel.com>
> > > Subject: [PATCH 1/4] vhost: support async dequeue for split ring
> > >
> > > From: Yuan Wang <yuanx.wang@intel.com>
> > >
> > > This patch implements asynchronous dequeue data path for split ring.
> > > A new asynchronous dequeue function is introduced. With this function,
> > > the application can try to receive packets from the guest with
> > > offloading copies to the async channel, thus saving precious CPU
> > > cycles.
> > >
> > > Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
> > > Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> > > Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
> > > Tested-by: Yinan Wang <yinan.wang@intel.com>
> > > ---
> > >  doc/guides/prog_guide/vhost_lib.rst |   9 +
> > >  lib/vhost/rte_vhost_async.h         |  36 +-
> > >  lib/vhost/version.map               |   3 +
> > >  lib/vhost/vhost.h                   |   3 +-
> > >  lib/vhost/virtio_net.c              | 531 ++++++++++++++++++++++++++++
> > >  5 files changed, 579 insertions(+), 3 deletions(-)
> > >
> > > diff --git a/doc/guides/prog_guide/vhost_lib.rst
> > > b/doc/guides/prog_guide/vhost_lib.rst
> > > index 171e0096f6..9ed544db7a 100644
> > > --- a/doc/guides/prog_guide/vhost_lib.rst
> > > +++ b/doc/guides/prog_guide/vhost_lib.rst
> > > @@ -303,6 +303,15 @@ The following is an overview of some key Vhost
> > > API
> > > functions:
> > >    Clear inflight packets which are submitted to DMA engine in vhost
> > > async data
> > >    path. Completed packets are returned to applications through ``pkts``.
> > >
> > > +* ``rte_vhost_async_try_dequeue_burst(vid, queue_id, mbuf_pool, pkts,
> > > +count,
> > > nr_inflight)``
> > > +
> > > +  This function tries to receive packets from the guest with
> > > + offloading  copies to the async channel. The packets that are
> > > + transfer completed  are returned in ``pkts``. The other packets that
> > > + their copies are submitted  to the async channel but not completed are
> > called "in-flight packets".
> > > +  This function will not return in-flight packets until their copies
> > > + are  completed by the async channel.
> > > +
> > >  Vhost-user Implementations
> > >  --------------------------
> > >
> > > diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
> > > index ad71555a7f..5e2429ab70 100644
> > > --- a/lib/vhost/rte_vhost_async.h
> > > +++ b/lib/vhost/rte_vhost_async.h
> > > @@ -83,12 +83,18 @@ struct rte_vhost_async_channel_ops {
> > >  		uint16_t max_packets);
> > >  };
> > >
> > > +struct async_nethdr {
> > > +	struct virtio_net_hdr hdr;
> > > +	bool valid;
> > > +};
> > > +
> >
> > As a struct exposed in public headers, it's better to prefix it with rte_.
> > In this case I would prefer rte_async_net_hdr.
> >
> > >  /**
> > > - * inflight async packet information
> > > + * in-flight async packet information
> > >   */
> > >  struct async_inflight_info {
> >
> > Could you help to rename it too? Like rte_async_inflight_info.
> 
> You are right, these two structs are for internal use and are not suitable
> for exposure in the public header, but since they are used by the async
> channel, I don't think it is suitable to place them in other headers.
> Could you give some advice on which file to put them in?

@Maxime, what do you think of this? I think changing, renaming, or moving it is an
ABI breakage either way. But since it's never used by any app, I guess it's not a big
problem. So what do you think we should do with the struct? I would vote for moving it
temporarily to a header like vhost.h. At some point, we can create a new internal async
header for structs like this. Or should we create it now?

@Yuan, thinking again about struct async_nethdr: do we really need to define it?
As of now, the header can only be invalid when virtio_net_with_host_offload(dev)
is false, right? So why not use that check to determine whether the hdr is valid
wherever it is needed?

Thanks,
Chenbo
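
As a rough sketch of the alternative suggested above (assuming the valid flag is dropped so that pkts_info[].nethdr becomes a plain struct virtio_net_hdr), the completion path could gate the offload handling on the device feature instead:

	/* The header can only be missing when host offloads are disabled,
	 * so checking the device feature is enough; no per-packet flag. */
	if (virtio_net_with_host_offload(dev))
		vhost_dequeue_offload(&pkts_info[from].nethdr,
				pkts[pkt_idx], legacy_ol_flags);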

> 
> >
> > >  	struct rte_mbuf *mbuf;
> > > -	uint16_t descs; /* num of descs inflight */
> > > +	struct async_nethdr nethdr;
> > > +	uint16_t descs; /* num of descs in-flight */
> > >  	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
> > > };
> > >
> > > @@ -255,5 +261,31 @@ int rte_vhost_async_get_inflight(int vid,
> > > uint16_t queue_id);  __rte_experimental  uint16_t
> > > rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
> > >  		struct rte_mbuf **pkts, uint16_t count);
> > > +/**
> > > + * This function tries to receive packets from the guest with
> > > +offloading
> > > + * copies to the async channel. The packets that are transfer
> > > +completed
> > > + * are returned in "pkts". The other packets that their copies are
> > > +submitted
> > > to
> > > + * the async channel but not completed are called "in-flight packets".
> > > + * This function will not return in-flight packets until their copies
> > > + are
> > > + * completed by the async channel.
> > > + *
> > > + * @param vid
> > > + *  id of vhost device to dequeue data
> > > + * @param queue_id
> > > + *  queue id to dequeue data
> >
> > Param mbuf_pool is missed.
> 
> Thanks, will fix it in next version.
> 
> Regards,
> Yuan
> 
> >
> > > + * @param pkts
> > > + *  blank array to keep successfully dequeued packets
> > > + * @param count
> > > + *  size of the packet array
> > > + * @param nr_inflight
> > > + *  the amount of in-flight packets. If error occurred, its value is
> > > + set to -
> > > 1.
> > > + * @return
> > > + *  num of successfully dequeued packets  */ __rte_experimental
> > > +uint16_t rte_vhost_async_try_dequeue_burst(int vid, uint16_t
> > > +queue_id,
> > > +	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t
> > count,
> > > +	int *nr_inflight);
> > >
> > >  #endif /* _RTE_VHOST_ASYNC_H_ */
> > > diff --git a/lib/vhost/version.map b/lib/vhost/version.map index
> > > c92a9d4962..1e033ad8e2 100644
> > > --- a/lib/vhost/version.map
> > > +++ b/lib/vhost/version.map
> > > @@ -85,4 +85,7 @@ EXPERIMENTAL {
> > >  	rte_vhost_async_channel_register_thread_unsafe;
> > >  	rte_vhost_async_channel_unregister_thread_unsafe;
> > >  	rte_vhost_clear_queue_thread_unsafe;
> > > +
> > > +	# added in 21.11
> > > +	rte_vhost_async_try_dequeue_burst;
> > >  };
> > > diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h index
> > > 1e56311725..89a31e4ca8 100644
> > > --- a/lib/vhost/vhost.h
> > > +++ b/lib/vhost/vhost.h
> > > @@ -49,7 +49,8 @@
> >
> > [...]
> >
> > > +uint16_t
> > > +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
> > > +	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t
> > count,
> > > +	int *nr_inflight)
> > > +{
> > > +	struct virtio_net *dev;
> > > +	struct rte_mbuf *rarp_mbuf = NULL;
> > > +	struct vhost_virtqueue *vq;
> > > +	int16_t success = 1;
> > > +
> > > +	*nr_inflight = -1;
> > > +
> > > +	dev = get_device(vid);
> > > +	if (!dev)
> > > +		return 0;
> > > +
> > > +	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
> > > +		VHOST_LOG_DATA(ERR,
> > > +			"(%d) %s: built-in vhost net backend is disabled.\n",
> > > +			dev->vid, __func__);
> > > +		return 0;
> > > +	}
> > > +
> > > +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
> > > +		VHOST_LOG_DATA(ERR,
> > > +			"(%d) %s: invalid virtqueue idx %d.\n",
> > > +			dev->vid, __func__, queue_id);
> > > +		return 0;
> > > +	}
> > > +
> > > +	vq = dev->virtqueue[queue_id];
> > > +
> > > +	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
> > > +		return 0;
> > > +
> > > +	if (unlikely(vq->enabled == 0)) {
> > > +		count = 0;
> > > +		goto out_access_unlock;
> > > +	}
> > > +
> > > +	if (unlikely(!vq->async_registered)) {
> > > +		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for
> > queue
> > > id %d.\n",
> > > +			dev->vid, __func__, queue_id);
> > > +		count = 0;
> > > +		goto out_access_unlock;
> > > +	}
> > > +
> > > +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> > > +		vhost_user_iotlb_rd_lock(vq);
> > > +
> > > +	if (unlikely(vq->access_ok == 0))
> > > +		if (unlikely(vring_translate(dev, vq) < 0)) {
> > > +			count = 0;
> > > +			goto out_access_unlock;
> > > +		}
> > > +
> > > +	/*
> > > +	 * Construct a RARP broadcast packet, and inject it to the "pkts"
> > > +	 * array, to looks like that guest actually send such packet.
> > > +	 *
> > > +	 * Check user_send_rarp() for more information.
> > > +	 *
> > > +	 * broadcast_rarp shares a cacheline in the virtio_net structure
> > > +	 * with some fields that are accessed during enqueue and
> > > +	 * __atomic_compare_exchange_n causes a write if performed
> > compare
> > > +	 * and exchange. This could result in false sharing between enqueue
> > > +	 * and dequeue.
> > > +	 *
> > > +	 * Prevent unnecessary false sharing by reading broadcast_rarp first
> > > +	 * and only performing compare and exchange if the read indicates it
> > > +	 * is likely to be set.
> > > +	 */
> > > +	if (unlikely(__atomic_load_n(&dev->broadcast_rarp,
> > __ATOMIC_ACQUIRE) &&
> > > +			__atomic_compare_exchange_n(&dev-
> > >broadcast_rarp,
> > > +			&success, 0, 0, __ATOMIC_RELEASE,
> > __ATOMIC_RELAXED))) {
> > > +
> > > +		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev-
> > >mac);
> > > +		if (rarp_mbuf == NULL) {
> > > +			VHOST_LOG_DATA(ERR, "Failed to make RARP
> > packet.\n");
> > > +			count = 0;
> > > +			goto out;
> > > +		}
> > > +		count -= 1;
> > > +	}
> > > +
> > > +	if (unlikely(vq_is_packed(dev)))
> > > +		return 0;
> >
> > Should add a log here.
> >
> > Thanks,
> > Chenbo
  

Patch

diff --git a/doc/guides/prog_guide/vhost_lib.rst b/doc/guides/prog_guide/vhost_lib.rst
index 171e0096f6..9ed544db7a 100644
--- a/doc/guides/prog_guide/vhost_lib.rst
+++ b/doc/guides/prog_guide/vhost_lib.rst
@@ -303,6 +303,15 @@  The following is an overview of some key Vhost API functions:
   Clear inflight packets which are submitted to DMA engine in vhost async data
   path. Completed packets are returned to applications through ``pkts``.
 
+* ``rte_vhost_async_try_dequeue_burst(vid, queue_id, mbuf_pool, pkts, count, nr_inflight)``
+
+  This function tries to receive packets from the guest with offloading
+  copies to the async channel. The packets that are transfer completed
+  are returned in ``pkts``. The other packets that their copies are submitted
+  to the async channel but not completed are called "in-flight packets".
+  This function will not return in-flight packets until their copies are
+  completed by the async channel.
+
 Vhost-user Implementations
 --------------------------
 
diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
index ad71555a7f..5e2429ab70 100644
--- a/lib/vhost/rte_vhost_async.h
+++ b/lib/vhost/rte_vhost_async.h
@@ -83,12 +83,18 @@  struct rte_vhost_async_channel_ops {
 		uint16_t max_packets);
 };
 
+struct async_nethdr {
+	struct virtio_net_hdr hdr;
+	bool valid;
+};
+
 /**
- * inflight async packet information
+ * in-flight async packet information
  */
 struct async_inflight_info {
 	struct rte_mbuf *mbuf;
-	uint16_t descs; /* num of descs inflight */
+	struct async_nethdr nethdr;
+	uint16_t descs; /* num of descs in-flight */
 	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
 };
 
@@ -255,5 +261,31 @@  int rte_vhost_async_get_inflight(int vid, uint16_t queue_id);
 __rte_experimental
 uint16_t rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count);
+/**
+ * This function tries to receive packets from the guest with offloading
+ * copies to the async channel. The packets that are transfer completed
+ * are returned in "pkts". The other packets that their copies are submitted to
+ * the async channel but not completed are called "in-flight packets".
+ * This function will not return in-flight packets until their copies are
+ * completed by the async channel.
+ *
+ * @param vid
+ *  id of vhost device to dequeue data
+ * @param queue_id
+ *  queue id to dequeue data
+ * @param pkts
+ *  blank array to keep successfully dequeued packets
+ * @param count
+ *  size of the packet array
+ * @param nr_inflight
+ *  the amount of in-flight packets. If error occurred, its value is set to -1.
+ * @return
+ *  num of successfully dequeued packets
+ */
+__rte_experimental
+uint16_t
+rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
+	int *nr_inflight);
 
 #endif /* _RTE_VHOST_ASYNC_H_ */
diff --git a/lib/vhost/version.map b/lib/vhost/version.map
index c92a9d4962..1e033ad8e2 100644
--- a/lib/vhost/version.map
+++ b/lib/vhost/version.map
@@ -85,4 +85,7 @@  EXPERIMENTAL {
 	rte_vhost_async_channel_register_thread_unsafe;
 	rte_vhost_async_channel_unregister_thread_unsafe;
 	rte_vhost_clear_queue_thread_unsafe;
+
+	# added in 21.11
+	rte_vhost_async_try_dequeue_burst;
 };
diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h
index 1e56311725..89a31e4ca8 100644
--- a/lib/vhost/vhost.h
+++ b/lib/vhost/vhost.h
@@ -49,7 +49,8 @@ 
 #define MAX_PKT_BURST 32
 
 #define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST * 2)
-#define VHOST_MAX_ASYNC_VEC (BUF_VECTOR_MAX * 4)
+#define MAX_ASYNC_COPY_VECTOR 1024
+#define VHOST_MAX_ASYNC_VEC (MAX_ASYNC_COPY_VECTOR * 2)
 
 #define PACKED_DESC_ENQUEUE_USED_FLAG(w)	\
 	((w) ? (VRING_DESC_F_AVAIL | VRING_DESC_F_USED | VRING_DESC_F_WRITE) : \
diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
index 0350f6fcce..67a8cd2c41 100644
--- a/lib/vhost/virtio_net.c
+++ b/lib/vhost/virtio_net.c
@@ -3170,3 +3170,534 @@  rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 
 	return count;
 }
+
+static __rte_always_inline int
+async_desc_to_mbuf(struct virtio_net *dev,
+		  struct buf_vector *buf_vec, uint16_t nr_vec,
+		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
+		  struct iovec *src_iovec, struct iovec *dst_iovec,
+		  struct rte_vhost_iov_iter *src_it,
+		  struct rte_vhost_iov_iter *dst_it,
+		  struct async_nethdr *nethdr,
+		  int nr_iovec)
+{
+	uint64_t buf_addr, buf_iova;
+	uint64_t mapped_len;
+	uint32_t tlen = 0;
+	uint32_t buf_avail, buf_offset, buf_len;
+	uint32_t mbuf_avail, mbuf_offset;
+	uint32_t cpy_len;
+	/* A counter to avoid desc dead loop chain */
+	uint16_t vec_idx = 0;
+	int tvec_idx = 0;
+	struct rte_mbuf *cur = m, *prev = m;
+	struct virtio_net_hdr tmp_hdr;
+	struct virtio_net_hdr *hdr = NULL;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_len = buf_vec[vec_idx].buf_len;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+
+	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
+		return -1;
+
+	if (virtio_net_with_host_offload(dev)) {
+		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
+			/*
+			 * No luck, the virtio-net header doesn't fit
+			 * in a contiguous virtual area.
+			 */
+			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
+			hdr = &tmp_hdr;
+		} else {
+			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
+		}
+	}
+
+	/*
+	 * A virtio driver normally uses at least 2 desc buffers
+	 * for Tx: the first for storing the header, and others
+	 * for storing the data.
+	 */
+	if (unlikely(buf_len < dev->vhost_hlen)) {
+		buf_offset = dev->vhost_hlen - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail  = buf_len - buf_offset;
+	} else if (buf_len == dev->vhost_hlen) {
+		if (unlikely(++vec_idx >= nr_vec))
+			return -1;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+
+		buf_offset = 0;
+		buf_avail = buf_len;
+	} else {
+		buf_offset = dev->vhost_hlen;
+		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
+	}
+
+	PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset), (uint32_t)buf_avail, 0);
+
+	mbuf_offset = 0;
+	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
+	while (1) {
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+
+		while (cpy_len) {
+			void *hpa = (void *)(uintptr_t)gpa_to_first_hpa(dev,
+						buf_iova + buf_offset, cpy_len,
+						&mapped_len);
+			if (unlikely(!hpa)) {
+				VHOST_LOG_DATA(ERR, "(%d) %s: failed to get hpa.\n",
+					dev->vid, __func__);
+				return -1;
+			}
+			if (unlikely(tvec_idx >= nr_iovec)) {
+				VHOST_LOG_DATA(ERR, "iovec is not enough for offloading\n");
+				return -1;
+			}
+
+			async_fill_vec(src_iovec + tvec_idx, hpa, (size_t)mapped_len);
+			async_fill_vec(dst_iovec + tvec_idx,
+				(void *)(uintptr_t)rte_pktmbuf_iova_offset(cur, mbuf_offset),
+				(size_t)mapped_len);
+
+			tvec_idx++;
+			tlen += (uint32_t)mapped_len;
+			cpy_len -= (uint32_t)mapped_len;
+			mbuf_avail -= (uint32_t)mapped_len;
+			mbuf_offset += (uint32_t)mapped_len;
+			buf_avail -= (uint32_t)mapped_len;
+			buf_offset += (uint32_t)mapped_len;
+		}
+
+		/* This buf reaches to its end, get the next one */
+		if (buf_avail == 0) {
+			if (++vec_idx >= nr_vec)
+				break;
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail = buf_len;
+
+			PRINT_PACKET(dev, (uintptr_t)buf_addr, (uint32_t)buf_avail, 0);
+		}
+
+		/*
+		 * This mbuf reaches to its end, get a new one
+		 * to hold more data.
+		 */
+		if (mbuf_avail == 0) {
+			cur = rte_pktmbuf_alloc(mbuf_pool);
+			if (unlikely(cur == NULL)) {
+				VHOST_LOG_DATA(ERR, "Failed to allocate memory for mbuf.\n");
+				return -1;
+			}
+
+			prev->next = cur;
+			prev->data_len = mbuf_offset;
+			m->nb_segs += 1;
+			m->pkt_len += mbuf_offset;
+			prev = cur;
+
+			mbuf_offset = 0;
+			mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+		}
+	}
+
+	prev->data_len = mbuf_offset;
+	m->pkt_len += mbuf_offset;
+
+	if (tlen) {
+		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
+		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
+		if (hdr) {
+			nethdr->valid = true;
+			nethdr->hdr = *hdr;
+		} else
+			nethdr->valid = false;
+	}
+
+	return 0;
+}
+
+static __rte_always_inline uint16_t
+async_poll_dequeue_completed_split(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count, bool legacy_ol_flags)
+{
+	uint16_t n_pkts_cpl = 0, n_pkts_put = 0;
+	uint16_t start_idx, pkt_idx, from;
+	struct async_inflight_info *pkts_info;
+
+	pkt_idx = vq->async_pkts_idx & (vq->size - 1);
+	pkts_info = vq->async_pkts_info;
+	start_idx = virtio_dev_rx_async_get_info_idx(pkt_idx, vq->size,
+			vq->async_pkts_inflight_n);
+
+	if (count > vq->async_last_pkts_n) {
+		int ret;
+
+		ret = vq->async_ops.check_completed_copies(dev->vid, queue_id,
+				0, count - vq->async_last_pkts_n);
+		if (unlikely(ret < 0)) {
+			VHOST_LOG_DATA(ERR, "(%d) async channel poll error\n", dev->vid);
+			ret = 0;
+		}
+		n_pkts_cpl = ret;
+	}
+
+	n_pkts_cpl += vq->async_last_pkts_n;
+	if (unlikely(n_pkts_cpl == 0))
+		return 0;
+
+	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
+
+	for (pkt_idx = 0; pkt_idx < n_pkts_put; pkt_idx++) {
+		from = (start_idx + pkt_idx) & (vq->size - 1);
+		pkts[pkt_idx] = pkts_info[from].mbuf;
+
+		if (pkts_info[from].nethdr.valid) {
+			vhost_dequeue_offload(&pkts_info[from].nethdr.hdr,
+					pkts[pkt_idx], legacy_ol_flags);
+		}
+	}
+
+	/* write back completed descs to used ring and update used idx */
+	write_back_completed_descs_split(vq, n_pkts_put);
+	__atomic_add_fetch(&vq->used->idx, n_pkts_put, __ATOMIC_RELEASE);
+	vhost_vring_call_split(dev, vq);
+
+	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
+	vq->async_pkts_inflight_n -= n_pkts_put;
+
+	return n_pkts_put;
+}
+
+static __rte_always_inline uint16_t
+virtio_dev_tx_async_split(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, bool legacy_ol_flags)
+{
+	static bool allocerr_warned;
+	bool dropped = false;
+	uint16_t free_entries;
+	uint16_t pkt_idx, slot_idx = 0;
+	uint16_t nr_done_pkts = 0;
+	uint16_t nr_async_burst = 0;
+	uint16_t pkt_err = 0;
+	uint16_t iovec_idx = 0, it_idx = 0;
+	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
+	struct iovec *vec_pool = vq->vec_pool;
+	struct iovec *src_iovec = vec_pool;
+	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
+	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
+	struct async_inflight_info *pkts_info = vq->async_pkts_info;
+	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
+
+	/**
+	 * The ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) - vq->last_avail_idx;
+	if (free_entries == 0)
+		goto out;
+
+	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
+
+	count = RTE_MIN(count, MAX_PKT_BURST);
+	count = RTE_MIN(count, free_entries);
+	VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n", dev->vid, count);
+
+	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
+		goto out;
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint16_t head_idx = 0;
+		uint16_t nr_vec = 0;
+		uint16_t to;
+		uint32_t buf_len;
+		int err;
+		struct buf_vector buf_vec[BUF_VECTOR_MAX];
+		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
+
+		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
+						&nr_vec, buf_vec,
+						&head_idx, &buf_len,
+						VHOST_ACCESS_RO) < 0)) {
+			dropped = true;
+			break;
+		}
+
+		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
+		if (unlikely(err)) {
+			/**
+			 * mbuf allocation fails for jumbo packets when external
+			 * buffer allocation is not allowed and linear buffer
+			 * is required. Drop this packet.
+			 */
+			if (!allocerr_warned) {
+				VHOST_LOG_DATA(ERR,
+					"Failed mbuf alloc of size %d from %s on %s.\n",
+					buf_len, mbuf_pool->name, dev->ifname);
+				allocerr_warned = true;
+			}
+			dropped = true;
+			break;
+		}
+
+		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
+		err = async_desc_to_mbuf(dev, buf_vec, nr_vec, pkt,
+				mbuf_pool, &src_iovec[iovec_idx],
+				&dst_iovec[iovec_idx], &it_pool[it_idx],
+				&it_pool[it_idx + 1],
+				&pkts_info[slot_idx].nethdr,
+				(VHOST_MAX_ASYNC_VEC >> 1) - iovec_idx);
+		if (unlikely(err)) {
+			if (!allocerr_warned) {
+				VHOST_LOG_DATA(ERR,
+					"Failed to offload copies to async channel %s.\n",
+					dev->ifname);
+				allocerr_warned = true;
+			}
+			dropped = true;
+			break;
+		}
+
+		async_fill_desc(&tdes[nr_async_burst], &it_pool[it_idx], &it_pool[it_idx + 1]);
+		pkts_info[slot_idx].mbuf = pkt;
+		nr_async_burst++;
+
+		iovec_idx += it_pool[it_idx].nr_segs;
+		it_idx += 2;
+
+		/* store used descs */
+		to = vq->async_desc_idx_split & (vq->size - 1);
+		vq->async_descs_split[to].id = head_idx;
+		vq->async_descs_split[to].len = 0;
+		vq->async_desc_idx_split++;
+
+		vq->last_avail_idx++;
+
+		if (unlikely(nr_async_burst >= VHOST_ASYNC_BATCH_THRESHOLD)) {
+			uint16_t nr_pkts;
+			int32_t ret;
+
+			ret = vq->async_ops.transfer_data(dev->vid, queue_id,
+					tdes, 0, nr_async_burst);
+			if (unlikely(ret < 0)) {
+				VHOST_LOG_DATA(ERR, "(%d) async channel submit error\n", dev->vid);
+				ret = 0;
+			}
+			nr_pkts = ret;
+
+			vq->async_pkts_inflight_n += nr_pkts;
+			it_idx = 0;
+			iovec_idx = 0;
+
+			if (unlikely(nr_pkts < nr_async_burst)) {
+				pkt_err = nr_async_burst - nr_pkts;
+				nr_async_burst = 0;
+				pkt_idx++;
+				break;
+			}
+			nr_async_burst = 0;
+		}
+	}
+
+	if (unlikely(dropped))
+		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
+
+	if (nr_async_burst) {
+		uint16_t nr_pkts;
+		int32_t ret;
+
+		ret = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, nr_async_burst);
+		if (unlikely(ret < 0)) {
+			VHOST_LOG_DATA(ERR, "(%d) async channel submit error\n", dev->vid);
+			ret = 0;
+		}
+		nr_pkts = ret;
+
+		vq->async_pkts_inflight_n += nr_pkts;
+
+		if (unlikely(nr_pkts < nr_async_burst))
+			pkt_err = nr_async_burst - nr_pkts;
+	}
+
+	if (unlikely(pkt_err)) {
+		uint16_t nr_err_dma = pkt_err;
+
+		pkt_idx -= nr_err_dma;
+
+		/**
+		 * recover async channel copy related structures and free pktmbufs
+		 * for error pkts.
+		 */
+		vq->async_desc_idx_split -= nr_err_dma;
+		while (nr_err_dma-- > 0) {
+			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
+			slot_idx--;
+		}
+
+		/* recover available ring */
+		vq->last_avail_idx -= pkt_err;
+	}
+
+	vq->async_pkts_idx += pkt_idx;
+
+out:
+	if (vq->async_pkts_inflight_n > 0) {
+		nr_done_pkts = async_poll_dequeue_completed_split(dev, vq,
+					queue_id, pkts, count, legacy_ol_flags);
+	}
+
+	return nr_done_pkts;
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count)
+{
+	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
+				pkts, count, true);
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count)
+{
+	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
+				pkts, count, false);
+}
+
+uint16_t
+rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
+	int *nr_inflight)
+{
+	struct virtio_net *dev;
+	struct rte_mbuf *rarp_mbuf = NULL;
+	struct vhost_virtqueue *vq;
+	int16_t success = 1;
+
+	*nr_inflight = -1;
+
+	dev = get_device(vid);
+	if (!dev)
+		return 0;
+
+	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
+		VHOST_LOG_DATA(ERR,
+			"(%d) %s: built-in vhost net backend is disabled.\n",
+			dev->vid, __func__);
+		return 0;
+	}
+
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR,
+			"(%d) %s: invalid virtqueue idx %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
+		return 0;
+
+	if (unlikely(vq->enabled == 0)) {
+		count = 0;
+		goto out_access_unlock;
+	}
+
+	if (unlikely(!vq->async_registered)) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
+			dev->vid, __func__, queue_id);
+		count = 0;
+		goto out_access_unlock;
+	}
+
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_lock(vq);
+
+	if (unlikely(vq->access_ok == 0))
+		if (unlikely(vring_translate(dev, vq) < 0)) {
+			count = 0;
+			goto out_access_unlock;
+		}
+
+	/*
+	 * Construct a RARP broadcast packet, and inject it to the "pkts"
+	 * array, to looks like that guest actually send such packet.
+	 *
+	 * Check user_send_rarp() for more information.
+	 *
+	 * broadcast_rarp shares a cacheline in the virtio_net structure
+	 * with some fields that are accessed during enqueue and
+	 * __atomic_compare_exchange_n causes a write if performed compare
+	 * and exchange. This could result in false sharing between enqueue
+	 * and dequeue.
+	 *
+	 * Prevent unnecessary false sharing by reading broadcast_rarp first
+	 * and only performing compare and exchange if the read indicates it
+	 * is likely to be set.
+	 */
+	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
+			__atomic_compare_exchange_n(&dev->broadcast_rarp,
+			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
+
+		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
+		if (rarp_mbuf == NULL) {
+			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
+			count = 0;
+			goto out;
+		}
+		count -= 1;
+	}
+
+	if (unlikely(vq_is_packed(dev)))
+		return 0;
+
+	if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
+		count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
+				mbuf_pool, pkts, count);
+	else
+		count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id,
+				mbuf_pool, pkts, count);
+
+out:
+	*nr_inflight = vq->async_pkts_inflight_n;
+
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_unlock(vq);
+
+out_access_unlock:
+	rte_spinlock_unlock(&vq->access_lock);
+
+	if (unlikely(rarp_mbuf != NULL)) {
+		/*
+		 * Inject it to the head of "pkts" array, so that switch's mac
+		 * learning table will get updated first.
+		 */
+		memmove(&pkts[1], pkts, count * sizeof(struct rte_mbuf *));
+		pkts[0] = rarp_mbuf;
+		count += 1;
+	}
+
+	return count;
+}
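
To complement the usage sketch shown earlier, the following is a minimal example (names are placeholders, and the mbufs are simply freed for brevity) of how an application might keep polling until the async channel reports no more in-flight packets, for instance before tearing a queue down:

#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_vhost_async.h>

/* Keep calling the async dequeue until no packets remain in flight.
 * Completed packets are freed here; a real application would forward them. */
static void
drain_async_dequeue(int vid, uint16_t virtqueue_id,
		struct rte_mempool *mbuf_pool)
{
	struct rte_mbuf *pkts[32];
	int nr_inflight;
	uint16_t nb_rx;

	do {
		nb_rx = rte_vhost_async_try_dequeue_burst(vid, virtqueue_id,
				mbuf_pool, pkts, 32, &nr_inflight);
		if (nb_rx)
			rte_pktmbuf_free_bulk(pkts, nb_rx);
	} while (nr_inflight > 0);
}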