diff mbox series

[v4] net/vhost: support asynchronous data path

Message ID 20220929194705.1753793-1-yuanx.wang@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Headers show
Series [v4] net/vhost: support asynchronous data path | expand

Checks

Context Check Description
ci/intel-Testing success Testing PASS
ci/iol-aarch64-unit-testing success Testing PASS
ci/iol-x86_64-unit-testing success Testing PASS
ci/iol-aarch64-compile-testing success Testing PASS
ci/iol-x86_64-compile-testing success Testing PASS
ci/iol-intel-Performance success Performance Testing PASS
ci/iol-intel-Functional success Functional Testing PASS
ci/iol-mellanox-Performance success Performance Testing PASS
ci/github-robot: build success github build: passed
ci/Intel-compilation success Compilation OK
ci/checkpatch warning coding style issues

Commit Message

Yuan Wang Sept. 29, 2022, 7:47 p.m. UTC
Vhost asynchronous data-path offloads packet copy from the CPU
to the DMA engine. As a result, large packet copy can be accelerated
by the DMA engine, and vhost can free CPU cycles for higher level
functions.

In this patch, we enable asynchronous data-path for vhostpmd.
Asynchronous data path is enabled per tx/rx queue, and users need
to specify the DMA device used by the tx/rx queue. Each tx/rx queue
only supports to use one DMA device, but one DMA device can be shared
among multiple tx/rx queues of different vhost PMD ports.

Two PMD parameters are added:
- dmas:	specify the used DMA device for a tx/rx queue.
	(Default: no queues enable asynchronous data path)
- dma-ring-size: DMA ring size.
	(Default: 4096).

Here is an example:
--vdev 'eth_vhost0,iface=./s0,dmas=[txq0@0000:00.01.0;rxq0@0000:00.01.1],dma-ring-size=4096'

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
---
v4:
- add txq allow_queuing check on poll completed
- unregister dma channel in destroy_device
- add dma ring size minimum limit
- fix serveral code style and logging issues

v3:
- add the API to version.map
 
v2:
- add missing file
- hide async_tx_poll_completed
- change default DMA ring size to 4096
---
 drivers/net/vhost/meson.build     |   1 +
 drivers/net/vhost/rte_eth_vhost.c | 512 ++++++++++++++++++++++++++++--
 drivers/net/vhost/rte_eth_vhost.h |  15 +
 drivers/net/vhost/version.map     |   7 +
 drivers/net/vhost/vhost_testpmd.c |  65 ++++
 5 files changed, 567 insertions(+), 33 deletions(-)
 create mode 100644 drivers/net/vhost/vhost_testpmd.c

Comments

Xia, Chenbo Oct. 19, 2022, 2:10 p.m. UTC | #1
Hi Yuan,

Overall it looks good to me, two inline comments below.

> -----Original Message-----
> From: Wang, YuanX <yuanx.wang@intel.com>
> Sent: Friday, September 30, 2022 3:47 AM
> To: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Jiang, Cheng1
> <cheng1.jiang@intel.com>; Ma, WenwuX <wenwux.ma@intel.com>; He, Xingguang
> <xingguang.he@intel.com>; Wang, YuanX <yuanx.wang@intel.com>
> Subject: [PATCH v4] net/vhost: support asynchronous data path
> 
> Vhost asynchronous data-path offloads packet copy from the CPU
> to the DMA engine. As a result, large packet copy can be accelerated
> by the DMA engine, and vhost can free CPU cycles for higher level
> functions.
> 
> In this patch, we enable asynchronous data-path for vhostpmd.
> Asynchronous data path is enabled per tx/rx queue, and users need
> to specify the DMA device used by the tx/rx queue. Each tx/rx queue
> only supports to use one DMA device, but one DMA device can be shared
> among multiple tx/rx queues of different vhost PMD ports.
> 
> Two PMD parameters are added:
> - dmas:	specify the used DMA device for a tx/rx queue.
> 	(Default: no queues enable asynchronous data path)
> - dma-ring-size: DMA ring size.
> 	(Default: 4096).
> 
> Here is an example:
> --vdev
> 'eth_vhost0,iface=./s0,dmas=[txq0@0000:00.01.0;rxq0@0000:00.01.1],dma-
> ring-size=4096'
> 
> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
> Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
> ---
> v4:
> - add txq allow_queuing check on poll completed
> - unregister dma channel in destroy_device
> - add dma ring size minimum limit
> - fix serveral code style and logging issues
> 
> v3:
> - add the API to version.map
> 
> v2:
> - add missing file
> - hide async_tx_poll_completed
> - change default DMA ring size to 4096
> ---
>  drivers/net/vhost/meson.build     |   1 +
>  drivers/net/vhost/rte_eth_vhost.c | 512 ++++++++++++++++++++++++++++--
>  drivers/net/vhost/rte_eth_vhost.h |  15 +
>  drivers/net/vhost/version.map     |   7 +
>  drivers/net/vhost/vhost_testpmd.c |  65 ++++
>  5 files changed, 567 insertions(+), 33 deletions(-)
>  create mode 100644 drivers/net/vhost/vhost_testpmd.c
> 
> diff --git a/drivers/net/vhost/meson.build b/drivers/net/vhost/meson.build
> index f481a3a4b8..22a0ab3a58 100644
> --- a/drivers/net/vhost/meson.build
> +++ b/drivers/net/vhost/meson.build
> @@ -9,4 +9,5 @@ endif
> 
>  deps += 'vhost'
>  sources = files('rte_eth_vhost.c')
> +testpmd_sources = files('vhost_testpmd.c')
>  headers = files('rte_eth_vhost.h')
> diff --git a/drivers/net/vhost/rte_eth_vhost.c
> b/drivers/net/vhost/rte_eth_vhost.c
> index b152279fac..80cd9c8d92 100644
> --- a/drivers/net/vhost/rte_eth_vhost.c
> +++ b/drivers/net/vhost/rte_eth_vhost.c
> @@ -7,6 +7,7 @@
>  #include <pthread.h>
>  #include <stdbool.h>
>  #include <sys/epoll.h>
> +#include <ctype.h>
> 
>  #include <rte_mbuf.h>
>  #include <ethdev_driver.h>
> @@ -18,6 +19,8 @@
>  #include <rte_kvargs.h>
>  #include <rte_vhost.h>
>  #include <rte_spinlock.h>
> +#include <rte_vhost_async.h>
> +#include <rte_dmadev.h>
> 
>  #include "rte_eth_vhost.h"
> 
> @@ -37,8 +40,15 @@ enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM};
>  #define ETH_VHOST_LINEAR_BUF		"linear-buffer"
>  #define ETH_VHOST_EXT_BUF		"ext-buffer"
>  #define ETH_VHOST_LEGACY_OL_FLAGS	"legacy-ol-flags"
> +#define ETH_VHOST_DMA_ARG		"dmas"
> +#define ETH_VHOST_DMA_RING_SIZE		"dma-ring-size"
>  #define VHOST_MAX_PKT_BURST 32
> 
> +#define INVALID_DMA_ID		-1
> +#define DEFAULT_DMA_RING_SIZE	4096
> +/* Minimum package size is 64, dma ring size should be greater than 32 */
> +#define MINIMUM_DMA_RING_SIZE	64
> +
>  static const char *valid_arguments[] = {
>  	ETH_VHOST_IFACE_ARG,
>  	ETH_VHOST_QUEUES_ARG,
> @@ -49,6 +59,8 @@ static const char *valid_arguments[] = {
>  	ETH_VHOST_LINEAR_BUF,
>  	ETH_VHOST_EXT_BUF,
>  	ETH_VHOST_LEGACY_OL_FLAGS,
> +	ETH_VHOST_DMA_ARG,
> +	ETH_VHOST_DMA_RING_SIZE,
>  	NULL
>  };
> 
> @@ -80,8 +92,39 @@ struct vhost_queue {
>  	struct vhost_stats stats;
>  	int intr_enable;
>  	rte_spinlock_t intr_lock;
> +
> +	/* Flag of enabling async data path */
> +	bool async_register;
> +	/* DMA device ID */
> +	int16_t dma_id;
> +	/**
> +	 * For a Rx queue, "txq" points to its peer Tx queue.
> +	 * For a Tx queue, "txq" is never used.
> +	 */
> +	struct vhost_queue *txq;
> +	/* Array to keep DMA completed packets */
> +	struct rte_mbuf *cmpl_pkts[VHOST_MAX_PKT_BURST];
>  };
> 
> +struct dma_input_info {
> +	int16_t dmas[RTE_MAX_QUEUES_PER_PORT * 2];
> +	uint16_t dma_ring_size;
> +};
> +
> +static int16_t configured_dmas[RTE_DMADEV_DEFAULT_MAX];
> +static int dma_count;
> +
> +/**
> + * By default, its Rx path to call rte_vhost_poll_enqueue_completed() for
> enqueue operations.
> + * However, Rx function is never been called in testpmd "txonly" mode,
> thus causing virtio
> + * cannot receive DMA completed packets. To make txonly mode work
> correctly, we provide a
> + * command in testpmd to call rte_vhost_poll_enqueue_completed() in Tx
> path.
> + *
> + * When set async_tx_poll_completed to true, Tx path calls
> rte_vhost_poll_enqueue_completed();
> + * otherwise, Rx path calls it.
> + */
> +bool async_tx_poll_completed;
> +
>  struct pmd_internal {
>  	rte_atomic32_t dev_attached;
>  	char *iface_name;
> @@ -94,6 +137,10 @@ struct pmd_internal {
>  	bool vlan_strip;
>  	bool rx_sw_csum;
>  	bool tx_sw_csum;
> +	struct {
> +		int16_t dma_id;
> +		bool async_register;
> +	} queue_dmas[RTE_MAX_QUEUES_PER_PORT * 2];
>  };
> 
>  struct internal_list {
> @@ -124,6 +171,17 @@ struct rte_vhost_vring_state {
> 
>  static struct rte_vhost_vring_state *vring_states[RTE_MAX_ETHPORTS];
> 
> +static bool
> +dma_is_configured(int16_t dma_id)
> +{
> +	int i;
> +
> +	for (i = 0; i < dma_count; i++)
> +		if (configured_dmas[i] == dma_id)
> +			return true;
> +	return false;
> +}
> +
>  static int
>  vhost_dev_xstats_reset(struct rte_eth_dev *dev)
>  {
> @@ -396,15 +454,27 @@ vhost_dev_rx_sw_csum(struct rte_mbuf *mbuf)
>  	mbuf->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_GOOD;
>  }
> 
> +static inline void
> +vhost_tx_free_completed(int vid, uint16_t virtqueue_id, int16_t dma_id,
> +		struct rte_mbuf **pkts, uint16_t count)
> +{
> +	uint16_t i, ret;
> +
> +	ret = rte_vhost_poll_enqueue_completed(vid, virtqueue_id, pkts,
> count, dma_id, 0);
> +	for (i = 0; likely(i < ret); i++)
> +		rte_pktmbuf_free(pkts[i]);
> +}
> +
>  static uint16_t
>  eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
>  {
>  	struct vhost_queue *r = q;
>  	uint16_t i, nb_rx = 0;
>  	uint16_t nb_receive = nb_bufs;
> +	uint16_t nb_pkts, num;
> 
>  	if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
> -		return 0;
> +		goto tx_poll;
> 
>  	rte_atomic32_set(&r->while_queuing, 1);
> 
> @@ -412,19 +482,32 @@ eth_vhost_rx(void *q, struct rte_mbuf **bufs,
> uint16_t nb_bufs)
>  		goto out;
> 
>  	/* Dequeue packets from guest TX queue */
> -	while (nb_receive) {
> -		uint16_t nb_pkts;
> -		uint16_t num = (uint16_t)RTE_MIN(nb_receive,
> -						 VHOST_MAX_PKT_BURST);
> -
> -		nb_pkts = rte_vhost_dequeue_burst(r->vid, r->virtqueue_id,
> -						  r->mb_pool, &bufs[nb_rx],
> -						  num);
> -
> -		nb_rx += nb_pkts;
> -		nb_receive -= nb_pkts;
> -		if (nb_pkts < num)
> -			break;
> +	if (!r->async_register) {
> +		while (nb_receive) {
> +			num = (uint16_t)RTE_MIN(nb_receive, VHOST_MAX_PKT_BURST);
> +			nb_pkts = rte_vhost_dequeue_burst(r->vid, r-
> >virtqueue_id,
> +						r->mb_pool, &bufs[nb_rx],
> +						num);
> +
> +			nb_rx += nb_pkts;
> +			nb_receive -= nb_pkts;
> +			if (nb_pkts < num)
> +				break;
> +		}
> +	} else {
> +		int nr_inflight;
> +
> +		while (nb_receive) {
> +			num = (uint16_t)RTE_MIN(nb_receive, VHOST_MAX_PKT_BURST);
> +			nb_pkts = rte_vhost_async_try_dequeue_burst(r->vid, r-
> >virtqueue_id,
> +						r->mb_pool, &bufs[nb_rx], num,
> &nr_inflight,
> +						r->dma_id, 0);
> +
> +			nb_rx += nb_pkts;
> +			nb_receive -= nb_pkts;
> +			if (nb_pkts < num)
> +				break;
> +		}
>  	}
> 
>  	r->stats.pkts += nb_rx;
> @@ -445,6 +528,17 @@ eth_vhost_rx(void *q, struct rte_mbuf **bufs,
> uint16_t nb_bufs)
>  out:
>  	rte_atomic32_set(&r->while_queuing, 0);
> 
> +tx_poll:
> +	/**
> +	 * Poll and free completed packets for the virtqueue of Tx queue.
> +	 * Note that we access Tx queue's virtqueue, which is protected
> +	 * by vring lock.
> +	 */
> +	if (r->txq->async_register && !async_tx_poll_completed &&
> +			rte_atomic32_read(&r->txq->allow_queuing) == 1)
> +		vhost_tx_free_completed(r->vid, r->txq->virtqueue_id, r->txq-
> >dma_id,
> +				r->cmpl_pkts, VHOST_MAX_PKT_BURST);
> +
>  	return nb_rx;
>  }
> 
> @@ -456,6 +550,7 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t
> nb_bufs)
>  	uint16_t nb_send = 0;
>  	uint64_t nb_bytes = 0;
>  	uint64_t nb_missed = 0;
> +	uint16_t nb_pkts, num;
> 
>  	if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
>  		return 0;
> @@ -486,31 +581,49 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs,
> uint16_t nb_bufs)
>  	}
> 
>  	/* Enqueue packets to guest RX queue */
> -	while (nb_send) {
> -		uint16_t nb_pkts;
> -		uint16_t num = (uint16_t)RTE_MIN(nb_send,
> -						 VHOST_MAX_PKT_BURST);
> +	if (!r->async_register) {
> +		while (nb_send) {
> +			num = (uint16_t)RTE_MIN(nb_send, VHOST_MAX_PKT_BURST);
> +			nb_pkts = rte_vhost_enqueue_burst(r->vid, r-
> >virtqueue_id,
> +						&bufs[nb_tx], num);
> +
> +			nb_tx += nb_pkts;
> +			nb_send -= nb_pkts;
> +			if (nb_pkts < num)
> +				break;
> +		}
> 
> -		nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id,
> -						  &bufs[nb_tx], num);
> +		for (i = 0; likely(i < nb_tx); i++) {
> +			nb_bytes += bufs[i]->pkt_len;
> +			rte_pktmbuf_free(bufs[i]);
> +		}
> 
> -		nb_tx += nb_pkts;
> -		nb_send -= nb_pkts;
> -		if (nb_pkts < num)
> -			break;
> -	}
> +	} else {
> +		while (nb_send) {
> +			num = (uint16_t)RTE_MIN(nb_send, VHOST_MAX_PKT_BURST);
> +			nb_pkts = rte_vhost_submit_enqueue_burst(r->vid, r-
> >virtqueue_id,
> +							&bufs[nb_tx], num, r->dma_id, 0);
> +
> +			nb_tx += nb_pkts;
> +			nb_send -= nb_pkts;
> +			if (nb_pkts < num)
> +				break;
> +		}
> 
> -	for (i = 0; likely(i < nb_tx); i++)
> -		nb_bytes += bufs[i]->pkt_len;
> +		for (i = 0; likely(i < nb_tx); i++)
> +			nb_bytes += bufs[i]->pkt_len;
> 
> -	nb_missed = nb_bufs - nb_tx;
> +		if (unlikely(async_tx_poll_completed)) {
> +			vhost_tx_free_completed(r->vid, r->virtqueue_id, r-
> >dma_id, r->cmpl_pkts,
> +					VHOST_MAX_PKT_BURST);
> +		}
> +	}

About the stats, should we update them when packet is completed?
Anyway in vhost lib we have inflight xstats, user can know how many packets
are finished and how many are in-flight. 

> 
> +	nb_missed = nb_bufs - nb_tx;
>  	r->stats.pkts += nb_tx;
>  	r->stats.bytes += nb_bytes;
>  	r->stats.missed_pkts += nb_missed;
> 
> -	for (i = 0; likely(i < nb_tx); i++)
> -		rte_pktmbuf_free(bufs[i]);
>  out:
>  	rte_atomic32_set(&r->while_queuing, 0);
> 
> @@ -798,6 +911,8 @@ queue_setup(struct rte_eth_dev *eth_dev, struct
> pmd_internal *internal)
>  		vq->vid = internal->vid;
>  		vq->internal = internal;
>  		vq->port = eth_dev->data->port_id;
> +		if (i < eth_dev->data->nb_tx_queues)
> +			vq->txq = eth_dev->data->tx_queues[i];
>  	}
>  	for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
>  		vq = eth_dev->data->tx_queues[i];
> @@ -878,6 +993,30 @@ new_device(int vid)
>  	return 0;
>  }
> 
> +static inline void
> +async_clear_virtqueue(int vid, uint16_t virtqueue_id, int16_t dma_id)
> +{
> +	struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST];
> +	uint16_t i, ret, nr_done = 0;
> +
> +	if (vid == -1)
> +		return;
> +
> +	while (rte_vhost_async_get_inflight(vid, virtqueue_id) > 0) {
> +		ret = rte_vhost_clear_queue(vid, virtqueue_id, pkts,
> VHOST_MAX_PKT_BURST, dma_id,
> +				0);
> +		for (i = 0; i < ret ; i++)
> +			rte_pktmbuf_free(pkts[i]);
> +
> +		nr_done += ret;
> +	}
> +	VHOST_LOG(INFO, "Completed %u pkts for vid-%u vring-%u\n", nr_done,
> vid, virtqueue_id);
> +
> +	if (rte_vhost_async_channel_unregister(vid, virtqueue_id))
> +		VHOST_LOG(ERR, "Failed to unregister async for vid-%u vring-
> %u\n", vid,
> +				virtqueue_id);
> +}
> +
>  static void
>  destroy_device(int vid)
>  {
> @@ -908,13 +1047,27 @@ destroy_device(int vid)
>  			vq = eth_dev->data->rx_queues[i];
>  			if (!vq)
>  				continue;
> +			if (vq->async_register) {
> +				async_clear_virtqueue(vq->vid, vq->virtqueue_id,
> +							vq->dma_id);
> +				vq->async_register = false;
> +				internal->queue_dmas[vq-
> >virtqueue_id].async_register = false;
> +			}
>  			vq->vid = -1;
> +
>  		}
>  		for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
>  			vq = eth_dev->data->tx_queues[i];
>  			if (!vq)
>  				continue;
> +			if (vq->async_register) {
> +				async_clear_virtqueue(vq->vid, vq->virtqueue_id,
> +							vq->dma_id);
> +				vq->async_register = false;
> +				internal->queue_dmas[vq-
> >virtqueue_id].async_register = false;
> +			}
>  			vq->vid = -1;
> +
>  		}
>  	}
> 
> @@ -983,6 +1136,9 @@ vring_state_changed(int vid, uint16_t vring, int
> enable)
>  	struct rte_vhost_vring_state *state;
>  	struct rte_eth_dev *eth_dev;
>  	struct internal_list *list;
> +	struct vhost_queue *queue;
> +	struct pmd_internal *internal;
> +	int qid;
>  	char ifname[PATH_MAX];
> 
>  	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
> @@ -1011,6 +1167,65 @@ vring_state_changed(int vid, uint16_t vring, int
> enable)
> 
>  	update_queuing_status(eth_dev, false);
> 
> +	qid = vring / VIRTIO_QNUM;
> +	if (vring % VIRTIO_QNUM == VIRTIO_RXQ)
> +		queue = eth_dev->data->tx_queues[qid];
> +	else
> +		queue = eth_dev->data->rx_queues[qid];
> +
> +	if (!queue)
> +		goto skip;
> +
> +	internal = eth_dev->data->dev_private;
> +
> +	/* Register async data path for the queue assigned valid DMA device
> */
> +	if (internal->queue_dmas[queue->virtqueue_id].dma_id ==
> INVALID_DMA_ID)
> +		goto skip;
> +
> +	if (enable && !queue->async_register) {
> +		if (rte_vhost_async_channel_register_thread_unsafe(vid, vring))
> {
> +			VHOST_LOG(ERR, "Failed to register async for vid-%u
> vring-%u!\n", vid,
> +					vring);
> +			return -1;
> +		}
> +
> +		queue->async_register = true;
> +		internal->queue_dmas[vring].async_register = true;
> +
> +		VHOST_LOG(INFO, "Succeed to register async for vid-%u vring-
> %u\n", vid, vring);
> +	}
> +
> +	if (!enable && queue->async_register) {
> +		struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST];
> +		uint16_t ret, i, nr_done = 0;
> +		uint16_t dma_id = queue->dma_id;
> +
> +		while (rte_vhost_async_get_inflight_thread_unsafe(vid, vring) >
> 0) {
> +			ret = rte_vhost_clear_queue_thread_unsafe(vid, vring,
> pkts,
> +					VHOST_MAX_PKT_BURST, dma_id, 0);
> +
> +			for (i = 0; i < ret ; i++)
> +				rte_pktmbuf_free(pkts[i]);
> +
> +			nr_done += ret;
> +		}
> +
> +		VHOST_LOG(INFO, "Completed %u in-flight pkts for vid-%u vring-
> %u\n", nr_done, vid,
> +				vring);
> +
> +		if (rte_vhost_async_channel_unregister_thread_unsafe(vid,
> vring)) {
> +			VHOST_LOG(ERR, "Failed to unregister async for vid-%u
> vring-%u\n", vid,
> +					vring);
> +			return -1;
> +		}
> +
> +		queue->async_register = false;
> +		internal->queue_dmas[vring].async_register = false;
> +
> +		VHOST_LOG(INFO, "Succeed to unregister async for vid-%u vring-
> %u\n", vid, vring);
> +	}
> +
> +skip:
>  	VHOST_LOG(INFO, "vring%u is %s\n",
>  			vring, enable ? "enabled" : "disabled");
> 
> @@ -1159,6 +1374,12 @@ rte_eth_vhost_get_vid_from_port_id(uint16_t port_id)
>  	return vid;
>  }
> 
> +void
> +rte_eth_vhost_async_tx_poll_completed(bool enable)
> +{
> +	async_tx_poll_completed = enable;
> +}
> +
>  static int
>  eth_dev_configure(struct rte_eth_dev *dev)
>  {
> @@ -1220,6 +1441,7 @@ eth_dev_close(struct rte_eth_dev *dev)
>  {
>  	struct pmd_internal *internal;
>  	struct internal_list *list;
> +	struct vhost_queue *queue;
>  	unsigned int i, ret;
> 
>  	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
> @@ -1233,6 +1455,25 @@ eth_dev_close(struct rte_eth_dev *dev)
> 
>  	list = find_internal_resource(internal->iface_name);
>  	if (list) {
> +		/* Make sure all in-flight packets are completed before
> destroy vhost */
> +		if (dev->data->rx_queues) {
> +			for (i = 0; i < dev->data->nb_rx_queues; i++) {
> +				queue = dev->data->rx_queues[i];
> +				if (queue->async_register)
> +					async_clear_virtqueue(queue->vid, queue-
> >virtqueue_id,
> +							queue->dma_id);
> +			}
> +		}
> +
> +		if (dev->data->tx_queues) {
> +			for (i = 0; i < dev->data->nb_tx_queues; i++) {
> +				queue = dev->data->tx_queues[i];
> +				if (queue->async_register)
> +					async_clear_virtqueue(queue->vid, queue-
> >virtqueue_id,
> +							queue->dma_id);
> +			}
> +		}
> +
>  		rte_vhost_driver_unregister(internal->iface_name);
>  		pthread_mutex_lock(&internal_list_lock);
>  		TAILQ_REMOVE(&internal_list, list, next);
> @@ -1267,6 +1508,7 @@ eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t
> rx_queue_id,
>  		   struct rte_mempool *mb_pool)
>  {
>  	struct vhost_queue *vq;
> +	struct pmd_internal *internal = dev->data->dev_private;
> 
>  	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
>  			RTE_CACHE_LINE_SIZE, socket_id);
> @@ -1277,6 +1519,8 @@ eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t
> rx_queue_id,
> 
>  	vq->mb_pool = mb_pool;
>  	vq->virtqueue_id = rx_queue_id * VIRTIO_QNUM + VIRTIO_TXQ;
> +	vq->async_register = internal->queue_dmas[vq-
> >virtqueue_id].async_register;
> +	vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id;
>  	rte_spinlock_init(&vq->intr_lock);
>  	dev->data->rx_queues[rx_queue_id] = vq;
> 
> @@ -1290,6 +1534,7 @@ eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t
> tx_queue_id,
>  		   const struct rte_eth_txconf *tx_conf __rte_unused)
>  {
>  	struct vhost_queue *vq;
> +	struct pmd_internal *internal = dev->data->dev_private;
> 
>  	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
>  			RTE_CACHE_LINE_SIZE, socket_id);
> @@ -1299,6 +1544,8 @@ eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t
> tx_queue_id,
>  	}
> 
>  	vq->virtqueue_id = tx_queue_id * VIRTIO_QNUM + VIRTIO_RXQ;
> +	vq->async_register = internal->queue_dmas[vq-
> >virtqueue_id].async_register;
> +	vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id;
>  	rte_spinlock_init(&vq->intr_lock);
>  	dev->data->tx_queues[tx_queue_id] = vq;
> 
> @@ -1509,13 +1756,14 @@ static const struct eth_dev_ops ops = {
>  static int
>  eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
>  	int16_t queues, const unsigned int numa_node, uint64_t flags,
> -	uint64_t disable_flags)
> +	uint64_t disable_flags, struct dma_input_info *dma_input)
>  {
>  	const char *name = rte_vdev_device_name(dev);
>  	struct rte_eth_dev_data *data;
>  	struct pmd_internal *internal = NULL;
>  	struct rte_eth_dev *eth_dev = NULL;
>  	struct rte_ether_addr *eth_addr = NULL;
> +	int i;
> 
>  	VHOST_LOG(INFO, "Creating VHOST-USER backend on numa socket %u\n",
>  		numa_node);
> @@ -1564,6 +1812,12 @@ eth_dev_vhost_create(struct rte_vdev_device *dev,
> char *iface_name,
>  	eth_dev->rx_pkt_burst = eth_vhost_rx;
>  	eth_dev->tx_pkt_burst = eth_vhost_tx;
> 
> +	for (i = 0; i < RTE_MAX_QUEUES_PER_PORT * 2; i++) {
> +		/* Invalid DMA ID indicates the queue does not want to enable
> async data path */
> +		internal->queue_dmas[i].dma_id = dma_input->dmas[i];
> +		internal->queue_dmas[i].async_register = false;
> +	}
> +
>  	rte_eth_dev_probing_finish(eth_dev);
>  	return 0;
> 
> @@ -1603,6 +1857,155 @@ open_int(const char *key __rte_unused, const char
> *value, void *extra_args)
>  	return 0;
>  }
> 
> +static int
> +init_dma(int16_t dma_id, uint16_t ring_size)
> +{
> +	struct rte_dma_info info;
> +	struct rte_dma_conf dev_config = { .nb_vchans = 1 };
> +	struct rte_dma_vchan_conf qconf = {
> +		.direction = RTE_DMA_DIR_MEM_TO_MEM,
> +	};
> +	int ret = 0;
> +
> +	if (dma_is_configured(dma_id))
> +		goto out;
> +
> +	if (rte_dma_info_get(dma_id, &info) != 0) {
> +		VHOST_LOG(ERR, "dma %u get info failed\n", dma_id);
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	if (info.max_vchans < 1) {
> +		VHOST_LOG(ERR, "No channels available on dma %d\n", dma_id);
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	if (rte_dma_configure(dma_id, &dev_config) != 0) {
> +		VHOST_LOG(ERR, "dma %u configure failed\n", dma_id);
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	rte_dma_info_get(dma_id, &info);
> +	if (info.nb_vchans != 1) {
> +		VHOST_LOG(ERR, "dma %u has no queues\n", dma_id);
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	ring_size = RTE_MAX(ring_size, MINIMUM_DMA_RING_SIZE);
> +	ring_size = RTE_MAX(ring_size, info.min_desc);
> +	qconf.nb_desc = RTE_MIN(ring_size, info.max_desc);
> +	if (rte_dma_vchan_setup(dma_id, 0, &qconf) != 0) {
> +		VHOST_LOG(ERR, "dma %u queue setup failed\n", dma_id);
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	if (rte_dma_start(dma_id) != 0) {
> +		VHOST_LOG(ERR, "dma %u start failed\n", dma_id);
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	configured_dmas[dma_count++] = dma_id;
> +
> +out:
> +	return ret;
> +}
> +
> +static int
> +open_dma(const char *key __rte_unused, const char *value, void
> *extra_args)
> +{
> +	struct dma_input_info *dma_input = extra_args;
> +	char *input = strndup(value, strlen(value) + 1);
> +	char *addrs = input;
> +	char *ptrs[2];
> +	char *start, *end, *substr;
> +	uint16_t qid, virtqueue_id;
> +	int16_t dma_id;
> +	int i, ret = 0;
> +
> +	while (isblank(*addrs))
> +		addrs++;
> +	if (*addrs == '\0') {
> +		VHOST_LOG(ERR, "No input DMA addresses\n");
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	/* process DMA devices within bracket. */
> +	addrs++;
> +	substr = strtok(addrs, ";]");
> +	if (!substr) {
> +		VHOST_LOG(ERR, "No input DMA addresse\n");
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	do {
> +		rte_strsplit(substr, strlen(substr), ptrs, 2, '@');
> +
> +		char *txq, *rxq;
> +		bool is_txq;
> +
> +		txq = strstr(ptrs[0], "txq");
> +		rxq = strstr(ptrs[0], "rxq");
> +		if (txq == NULL && rxq == NULL) {
> +			VHOST_LOG(ERR, "Illegal queue\n");
> +			ret = -1;
> +			goto out;
> +		} else if (txq) {
> +			is_txq = true;
> +			start = txq;
> +		} else {
> +			is_txq = false;
> +			start = rxq;
> +		}
> +
> +		start += 3;
> +		qid = strtol(start, &end, 0);
> +		if (end == start) {
> +			VHOST_LOG(ERR, "No input queue ID\n");
> +			ret = -1;
> +			goto out;
> +		}
> +
> +		virtqueue_id = is_txq ? qid * 2 + VIRTIO_RXQ : qid * 2 +
> VIRTIO_TXQ;
> +
> +		dma_id = rte_dma_get_dev_id_by_name(ptrs[1]);
> +		if (dma_id < 0) {
> +			VHOST_LOG(ERR, "Fail to find DMA device %s.\n", ptrs[1]);
> +			ret = -1;
> +			goto out;
> +		}
> +
> +		ret = init_dma(dma_id, dma_input->dma_ring_size);
> +		if (ret != 0) {
> +			VHOST_LOG(ERR, "Fail to initialize DMA %u\n", dma_id);
> +			ret = -1;
> +			break;
> +		}
> +
> +		dma_input->dmas[virtqueue_id] = dma_id;
> +
> +		substr = strtok(NULL, ";]");
> +	} while (substr);
> +
> +	for (i = 0; i < dma_count; i++) {
> +		if (rte_vhost_async_dma_configure(configured_dmas[i], 0) < 0)
> {
> +			VHOST_LOG(ERR, "Fail to configure DMA %u to vhost\n",
> configured_dmas[i]);
> +			ret = -1;
> +		}
> +	}
> +
> +out:
> +	free(input);
> +	return ret;
> +}
> +
>  static int
>  rte_pmd_vhost_probe(struct rte_vdev_device *dev)
>  {
> @@ -1621,6 +2024,10 @@ rte_pmd_vhost_probe(struct rte_vdev_device *dev)
>  	int legacy_ol_flags = 0;
>  	struct rte_eth_dev *eth_dev;
>  	const char *name = rte_vdev_device_name(dev);
> +	struct dma_input_info dma_input;
> +
> +	memset(dma_input.dmas, INVALID_DMA_ID, sizeof(dma_input.dmas));
> +	dma_input.dma_ring_size = DEFAULT_DMA_RING_SIZE;
> 
>  	VHOST_LOG(INFO, "Initializing pmd_vhost for %s\n", name);
> 
> @@ -1736,6 +2143,43 @@ rte_pmd_vhost_probe(struct rte_vdev_device *dev)
>  			goto out_free;
>  	}
> 
> +	if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) == 1) {
> +		if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
> +			ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_RING_SIZE,
> +					&open_int, &dma_input.dma_ring_size);
> +			if (ret < 0)
> +				goto out_free;
> +
> +			if (!rte_is_power_of_2(dma_input.dma_ring_size)) {
> +				dma_input.dma_ring_size =
> rte_align32pow2(dma_input.dma_ring_size);
> +				VHOST_LOG(INFO, "Convert dma_ring_size to the
> power of two %u\n",
> +						dma_input.dma_ring_size);
> +			}
> +		} else {
> +			VHOST_LOG(WARNING, "%s is not specified, skip to
> parse %s\n",
> +					ETH_VHOST_DMA_ARG, ETH_VHOST_DMA_RING_SIZE);
> +		}
> +	}
> +
> +	if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
> +		if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) == 0)
> +			VHOST_LOG(INFO, "Use default dma ring size %d\n",
> DEFAULT_DMA_RING_SIZE);
> +
> +		ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_ARG,
> +					 &open_dma, &dma_input);
> +		if (ret < 0) {
> +			VHOST_LOG(ERR, "Failed to parse %s\n",
> ETH_VHOST_DMA_ARG);
> +			goto out_free;
> +		}
> +
> +		flags |= RTE_VHOST_USER_ASYNC_COPY;
> +		/**
> +		 * Don't support live migration when enable
> +		 * DMA acceleration.
> +		 */
> +		disable_flags |= (1ULL << VHOST_F_LOG_ALL);
> +	}
> +
>  	if (legacy_ol_flags == 0)
>  		flags |= RTE_VHOST_USER_NET_COMPLIANT_OL_FLAGS;
> 
> @@ -1743,7 +2187,7 @@ rte_pmd_vhost_probe(struct rte_vdev_device *dev)
>  		dev->device.numa_node = rte_socket_id();
> 
>  	ret = eth_dev_vhost_create(dev, iface_name, queues,
> -				   dev->device.numa_node, flags, disable_flags);
> +			dev->device.numa_node, flags, disable_flags, &dma_input);
>  	if (ret == -1)
>  		VHOST_LOG(ERR, "Failed to create %s\n", name);
> 
> @@ -1787,4 +2231,6 @@ RTE_PMD_REGISTER_PARAM_STRING(net_vhost,
>  	"postcopy-support=<0|1> "
>  	"tso=<0|1> "
>  	"linear-buffer=<0|1> "
> -	"ext-buffer=<0|1>");
> +	"ext-buffer=<0|1> "
> +	"dma-ring-size=<int> "
> +	"dmas=[txq0@dma_addr;rxq0@dma_addr]");
> diff --git a/drivers/net/vhost/rte_eth_vhost.h
> b/drivers/net/vhost/rte_eth_vhost.h
> index 0e68b9f668..146c98803d 100644
> --- a/drivers/net/vhost/rte_eth_vhost.h
> +++ b/drivers/net/vhost/rte_eth_vhost.h
> @@ -52,6 +52,21 @@ int rte_eth_vhost_get_queue_event(uint16_t port_id,
>   */
>  int rte_eth_vhost_get_vid_from_port_id(uint16_t port_id);
> 
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change, or be removed, without prior
> notice
> + *
> + * By default, rte_vhost_poll_enqueue_completed() is called in Rx path.
> + * This function enables Tx path, rather than Rx path, to poll completed
> + * packets for vhost async enqueue operations. Note that virtio may never
> + * receive DMA completed packets if there are no more Tx operations.
> + *
> + * @param enable
> + *  True indicates Tx path to call rte_vhost_poll_enqueue_completed().
> + */
> +__rte_experimental
> +void rte_eth_vhost_async_tx_poll_completed(bool enable);
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/drivers/net/vhost/version.map b/drivers/net/vhost/version.map
> index e42c89f1eb..0a40441227 100644
> --- a/drivers/net/vhost/version.map
> +++ b/drivers/net/vhost/version.map
> @@ -6,3 +6,10 @@ DPDK_23 {
> 
>  	local: *;
>  };
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	# added in 22.11
> +	rte_eth_vhost_async_tx_poll_completed;
> +};
> diff --git a/drivers/net/vhost/vhost_testpmd.c
> b/drivers/net/vhost/vhost_testpmd.c
> new file mode 100644
> index 0000000000..b8227d1086
> --- /dev/null
> +++ b/drivers/net/vhost/vhost_testpmd.c
> @@ -0,0 +1,65 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2022 Intel Corporation.
> + */
> +#include <rte_eth_vhost.h>
> +#include <cmdline_parse_num.h>
> +#include <cmdline_parse_string.h>
> +
> +#include "testpmd.h"
> +
> +struct cmd_tx_poll_result {
> +	cmdline_fixed_string_t async_vhost;
> +	cmdline_fixed_string_t tx;
> +	cmdline_fixed_string_t poll;
> +	cmdline_fixed_string_t completed;
> +	cmdline_fixed_string_t what;
> +};
> +
> +static cmdline_parse_token_string_t cmd_tx_async_vhost =
> +	TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, async_vhost,
> "async_vhost");
> +static cmdline_parse_token_string_t cmd_tx_tx =
> +	TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, tx, "tx");
> +static cmdline_parse_token_string_t cmd_tx_poll =
> +	TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, poll, "poll");
> +static cmdline_parse_token_string_t cmd_tx_completed =
> +	TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, completed,
> "completed");
> +static cmdline_parse_token_string_t cmd_tx_what =
> +	TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, what, "on#off");
> +
> +static void
> +cmd_tx_poll_parsed(void *parsed_result, __rte_unused struct cmdline *cl,
> __rte_unused void *data)
> +{
> +	struct cmd_tx_poll_result *res = parsed_result;
> +
> +	if (!strcmp(res->what, "on"))
> +		rte_eth_vhost_async_tx_poll_completed(true);
> +	else if (!strcmp(res->what, "off"))
> +		rte_eth_vhost_async_tx_poll_completed(false);
> +}

Sorry I forgot to reply v3. I think it's better to do something like

fprintf(stderr, "Unknown parameter\n");

Thanks,
Chenbo

> +
> +static cmdline_parse_inst_t async_vhost_cmd_tx_poll = {
> +	.f = cmd_tx_poll_parsed,
> +	.data = NULL,
> +	.help_str = "async-vhost tx poll completed on|off",
> +	.tokens = {
> +		(void *)&cmd_tx_async_vhost,
> +		(void *)&cmd_tx_tx,
> +		(void *)&cmd_tx_poll,
> +		(void *)&cmd_tx_completed,
> +		(void *)&cmd_tx_what,
> +		NULL,
> +	},
> +};
> +
> +static struct testpmd_driver_commands async_vhost_cmds = {
> +	.commands = {
> +	{
> +		&async_vhost_cmd_tx_poll,
> +		"async_vhost tx poll completed (on|off)\n"
> +		"    Poll and free DMA completed packets in Tx path.\n",
> +	},
> +	{ NULL, NULL },
> +	},
> +};
> +
> +TESTPMD_ADD_DRIVER_COMMANDS(async_vhost_cmds)
> --
> 2.25.1
Yuan Wang Oct. 20, 2022, 2 p.m. UTC | #2
Hi Chenbo,

Thanks for your review. Please see replies inline.

> -----Original Message-----
> From: Xia, Chenbo <chenbo.xia@intel.com>
> Sent: Wednesday, October 19, 2022 10:11 PM
> To: Wang, YuanX <yuanx.wang@intel.com>; maxime.coquelin@redhat.com
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Jiang, Cheng1
> <cheng1.jiang@intel.com>; Ma, WenwuX <wenwux.ma@intel.com>; He,
> Xingguang <xingguang.he@intel.com>
> Subject: RE: [PATCH v4] net/vhost: support asynchronous data path
> 
> Hi Yuan,
> 
> Overall it looks good to me, two inline comments below.
> 
> > -----Original Message-----
> > From: Wang, YuanX <yuanx.wang@intel.com>
> > Sent: Friday, September 30, 2022 3:47 AM
> > To: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>
> > Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Jiang, Cheng1
> > <cheng1.jiang@intel.com>; Ma, WenwuX <wenwux.ma@intel.com>; He,
> > Xingguang <xingguang.he@intel.com>; Wang, YuanX
> <yuanx.wang@intel.com>
> > Subject: [PATCH v4] net/vhost: support asynchronous data path
> >
> > Vhost asynchronous data-path offloads packet copy from the CPU to the
> > DMA engine. As a result, large packet copy can be accelerated by the
> > DMA engine, and vhost can free CPU cycles for higher level functions.
> >

[...]

> > @@ -486,31 +581,49 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs,
> > uint16_t nb_bufs)
> >  	}
> >
> >  	/* Enqueue packets to guest RX queue */
> > -	while (nb_send) {
> > -		uint16_t nb_pkts;
> > -		uint16_t num = (uint16_t)RTE_MIN(nb_send,
> > -						 VHOST_MAX_PKT_BURST);
> > +	if (!r->async_register) {
> > +		while (nb_send) {
> > +			num = (uint16_t)RTE_MIN(nb_send,
> VHOST_MAX_PKT_BURST);
> > +			nb_pkts = rte_vhost_enqueue_burst(r->vid, r-
> > >virtqueue_id,
> > +						&bufs[nb_tx], num);
> > +
> > +			nb_tx += nb_pkts;
> > +			nb_send -= nb_pkts;
> > +			if (nb_pkts < num)
> > +				break;
> > +		}
> >
> > -		nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id,
> > -						  &bufs[nb_tx], num);
> > +		for (i = 0; likely(i < nb_tx); i++) {
> > +			nb_bytes += bufs[i]->pkt_len;
> > +			rte_pktmbuf_free(bufs[i]);
> > +		}
> >
> > -		nb_tx += nb_pkts;
> > -		nb_send -= nb_pkts;
> > -		if (nb_pkts < num)
> > -			break;
> > -	}
> > +	} else {
> > +		while (nb_send) {
> > +			num = (uint16_t)RTE_MIN(nb_send,
> VHOST_MAX_PKT_BURST);
> > +			nb_pkts = rte_vhost_submit_enqueue_burst(r->vid,
> r-
> > >virtqueue_id,
> > +							&bufs[nb_tx], num,
> r->dma_id, 0);
> > +
> > +			nb_tx += nb_pkts;
> > +			nb_send -= nb_pkts;
> > +			if (nb_pkts < num)
> > +				break;
> > +		}
> >
> > -	for (i = 0; likely(i < nb_tx); i++)
> > -		nb_bytes += bufs[i]->pkt_len;
> > +		for (i = 0; likely(i < nb_tx); i++)
> > +			nb_bytes += bufs[i]->pkt_len;
> >
> > -	nb_missed = nb_bufs - nb_tx;
> > +		if (unlikely(async_tx_poll_completed)) {
> > +			vhost_tx_free_completed(r->vid, r->virtqueue_id, r-
> > >dma_id, r->cmpl_pkts,
> > +					VHOST_MAX_PKT_BURST);
> > +		}
> > +	}
> 
> About the stats, should we update them when packet is completed?
> Anyway in vhost lib we have inflight xstats, user can know how many packets
> are finished and how many are in-flight.
> 

We think the way it is now makes more sense.

For asynchronous path, the completed packets from rte_vhost_poll_enqueue_completed() may not be the packets of this burst, but from the previous one. 
Using this value for statistics does not accurately reflect the statistics of this burst, 
and makes the missed_pkts definition inconsistent between the synchronous and asynchronous paths.

After rte_vhost_submit_enqueue_burst(), most enqueue process is completed, leaving the DMA copy and update vring index,
while the probability of error in the DMA copy is extremely low (If an error occurs, it can be considered a serious problem,
then the error log recorded, and the data plane should be stopped).
Therefore, the packages submitted to the DMA this burst can be used to represent the completed packages. This statistic is also more in line with the original meaning.

> >

[...]

> > +
> > +static void
> > +cmd_tx_poll_parsed(void *parsed_result, __rte_unused struct cmdline
> > +*cl,
> > __rte_unused void *data)
> > +{
> > +	struct cmd_tx_poll_result *res = parsed_result;
> > +
> > +	if (!strcmp(res->what, "on"))
> > +		rte_eth_vhost_async_tx_poll_completed(true);
> > +	else if (!strcmp(res->what, "off"))
> > +		rte_eth_vhost_async_tx_poll_completed(false);
> > +}
> 
> Sorry I forgot to reply v3. I think it's better to do something like
> 
> fprintf(stderr, "Unknown parameter\n");

Thanks for the suggestion. Will do in v5.

Thanks,
Yuan

> 
> Thanks,
> Chenbo
> 
> > +
> > +static cmdline_parse_inst_t async_vhost_cmd_tx_poll = {
> > +	.f = cmd_tx_poll_parsed,
> > +	.data = NULL,
> > +	.help_str = "async-vhost tx poll completed on|off",
> > +	.tokens = {
> > +		(void *)&cmd_tx_async_vhost,
> > +		(void *)&cmd_tx_tx,
> > +		(void *)&cmd_tx_poll,
> > +		(void *)&cmd_tx_completed,
> > +		(void *)&cmd_tx_what,
> > +		NULL,
> > +	},
> > +};
> > +
> > +static struct testpmd_driver_commands async_vhost_cmds = {
> > +	.commands = {
> > +	{
> > +		&async_vhost_cmd_tx_poll,
> > +		"async_vhost tx poll completed (on|off)\n"
> > +		"    Poll and free DMA completed packets in Tx path.\n",
> > +	},
> > +	{ NULL, NULL },
> > +	},
> > +};
> > +
> > +TESTPMD_ADD_DRIVER_COMMANDS(async_vhost_cmds)
> > --
> > 2.25.1
diff mbox series

Patch

diff --git a/drivers/net/vhost/meson.build b/drivers/net/vhost/meson.build
index f481a3a4b8..22a0ab3a58 100644
--- a/drivers/net/vhost/meson.build
+++ b/drivers/net/vhost/meson.build
@@ -9,4 +9,5 @@  endif
 
 deps += 'vhost'
 sources = files('rte_eth_vhost.c')
+testpmd_sources = files('vhost_testpmd.c')
 headers = files('rte_eth_vhost.h')
diff --git a/drivers/net/vhost/rte_eth_vhost.c b/drivers/net/vhost/rte_eth_vhost.c
index b152279fac..80cd9c8d92 100644
--- a/drivers/net/vhost/rte_eth_vhost.c
+++ b/drivers/net/vhost/rte_eth_vhost.c
@@ -7,6 +7,7 @@ 
 #include <pthread.h>
 #include <stdbool.h>
 #include <sys/epoll.h>
+#include <ctype.h>
 
 #include <rte_mbuf.h>
 #include <ethdev_driver.h>
@@ -18,6 +19,8 @@ 
 #include <rte_kvargs.h>
 #include <rte_vhost.h>
 #include <rte_spinlock.h>
+#include <rte_vhost_async.h>
+#include <rte_dmadev.h>
 
 #include "rte_eth_vhost.h"
 
@@ -37,8 +40,15 @@  enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM};
 #define ETH_VHOST_LINEAR_BUF		"linear-buffer"
 #define ETH_VHOST_EXT_BUF		"ext-buffer"
 #define ETH_VHOST_LEGACY_OL_FLAGS	"legacy-ol-flags"
+#define ETH_VHOST_DMA_ARG		"dmas"
+#define ETH_VHOST_DMA_RING_SIZE		"dma-ring-size"
 #define VHOST_MAX_PKT_BURST 32
 
+#define INVALID_DMA_ID		-1
+#define DEFAULT_DMA_RING_SIZE	4096
+/* Minimum package size is 64, dma ring size should be greater than 32 */
+#define MINIMUM_DMA_RING_SIZE	64
+
 static const char *valid_arguments[] = {
 	ETH_VHOST_IFACE_ARG,
 	ETH_VHOST_QUEUES_ARG,
@@ -49,6 +59,8 @@  static const char *valid_arguments[] = {
 	ETH_VHOST_LINEAR_BUF,
 	ETH_VHOST_EXT_BUF,
 	ETH_VHOST_LEGACY_OL_FLAGS,
+	ETH_VHOST_DMA_ARG,
+	ETH_VHOST_DMA_RING_SIZE,
 	NULL
 };
 
@@ -80,8 +92,39 @@  struct vhost_queue {
 	struct vhost_stats stats;
 	int intr_enable;
 	rte_spinlock_t intr_lock;
+
+	/* Flag of enabling async data path */
+	bool async_register;
+	/* DMA device ID */
+	int16_t dma_id;
+	/**
+	 * For a Rx queue, "txq" points to its peer Tx queue.
+	 * For a Tx queue, "txq" is never used.
+	 */
+	struct vhost_queue *txq;
+	/* Array to keep DMA completed packets */
+	struct rte_mbuf *cmpl_pkts[VHOST_MAX_PKT_BURST];
 };
 
+struct dma_input_info {
+	int16_t dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+	uint16_t dma_ring_size;
+};
+
+static int16_t configured_dmas[RTE_DMADEV_DEFAULT_MAX];
+static int dma_count;
+
+/**
+ * By default, its Rx path to call rte_vhost_poll_enqueue_completed() for enqueue operations.
+ * However, Rx function is never been called in testpmd "txonly" mode, thus causing virtio
+ * cannot receive DMA completed packets. To make txonly mode work correctly, we provide a
+ * command in testpmd to call rte_vhost_poll_enqueue_completed() in Tx path.
+ *
+ * When set async_tx_poll_completed to true, Tx path calls rte_vhost_poll_enqueue_completed();
+ * otherwise, Rx path calls it.
+ */
+bool async_tx_poll_completed;
+
 struct pmd_internal {
 	rte_atomic32_t dev_attached;
 	char *iface_name;
@@ -94,6 +137,10 @@  struct pmd_internal {
 	bool vlan_strip;
 	bool rx_sw_csum;
 	bool tx_sw_csum;
+	struct {
+		int16_t dma_id;
+		bool async_register;
+	} queue_dmas[RTE_MAX_QUEUES_PER_PORT * 2];
 };
 
 struct internal_list {
@@ -124,6 +171,17 @@  struct rte_vhost_vring_state {
 
 static struct rte_vhost_vring_state *vring_states[RTE_MAX_ETHPORTS];
 
+static bool
+dma_is_configured(int16_t dma_id)
+{
+	int i;
+
+	for (i = 0; i < dma_count; i++)
+		if (configured_dmas[i] == dma_id)
+			return true;
+	return false;
+}
+
 static int
 vhost_dev_xstats_reset(struct rte_eth_dev *dev)
 {
@@ -396,15 +454,27 @@  vhost_dev_rx_sw_csum(struct rte_mbuf *mbuf)
 	mbuf->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_GOOD;
 }
 
+static inline void
+vhost_tx_free_completed(int vid, uint16_t virtqueue_id, int16_t dma_id,
+		struct rte_mbuf **pkts, uint16_t count)
+{
+	uint16_t i, ret;
+
+	ret = rte_vhost_poll_enqueue_completed(vid, virtqueue_id, pkts, count, dma_id, 0);
+	for (i = 0; likely(i < ret); i++)
+		rte_pktmbuf_free(pkts[i]);
+}
+
 static uint16_t
 eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 {
 	struct vhost_queue *r = q;
 	uint16_t i, nb_rx = 0;
 	uint16_t nb_receive = nb_bufs;
+	uint16_t nb_pkts, num;
 
 	if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
-		return 0;
+		goto tx_poll;
 
 	rte_atomic32_set(&r->while_queuing, 1);
 
@@ -412,19 +482,32 @@  eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 		goto out;
 
 	/* Dequeue packets from guest TX queue */
-	while (nb_receive) {
-		uint16_t nb_pkts;
-		uint16_t num = (uint16_t)RTE_MIN(nb_receive,
-						 VHOST_MAX_PKT_BURST);
-
-		nb_pkts = rte_vhost_dequeue_burst(r->vid, r->virtqueue_id,
-						  r->mb_pool, &bufs[nb_rx],
-						  num);
-
-		nb_rx += nb_pkts;
-		nb_receive -= nb_pkts;
-		if (nb_pkts < num)
-			break;
+	if (!r->async_register) {
+		while (nb_receive) {
+			num = (uint16_t)RTE_MIN(nb_receive, VHOST_MAX_PKT_BURST);
+			nb_pkts = rte_vhost_dequeue_burst(r->vid, r->virtqueue_id,
+						r->mb_pool, &bufs[nb_rx],
+						num);
+
+			nb_rx += nb_pkts;
+			nb_receive -= nb_pkts;
+			if (nb_pkts < num)
+				break;
+		}
+	} else {
+		int nr_inflight;
+
+		while (nb_receive) {
+			num = (uint16_t)RTE_MIN(nb_receive, VHOST_MAX_PKT_BURST);
+			nb_pkts = rte_vhost_async_try_dequeue_burst(r->vid, r->virtqueue_id,
+						r->mb_pool, &bufs[nb_rx], num, &nr_inflight,
+						r->dma_id, 0);
+
+			nb_rx += nb_pkts;
+			nb_receive -= nb_pkts;
+			if (nb_pkts < num)
+				break;
+		}
 	}
 
 	r->stats.pkts += nb_rx;
@@ -445,6 +528,17 @@  eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 out:
 	rte_atomic32_set(&r->while_queuing, 0);
 
+tx_poll:
+	/**
+	 * Poll and free completed packets for the virtqueue of Tx queue.
+	 * Note that we access Tx queue's virtqueue, which is protected
+	 * by vring lock.
+	 */
+	if (r->txq->async_register && !async_tx_poll_completed &&
+			rte_atomic32_read(&r->txq->allow_queuing) == 1)
+		vhost_tx_free_completed(r->vid, r->txq->virtqueue_id, r->txq->dma_id,
+				r->cmpl_pkts, VHOST_MAX_PKT_BURST);
+
 	return nb_rx;
 }
 
@@ -456,6 +550,7 @@  eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 	uint16_t nb_send = 0;
 	uint64_t nb_bytes = 0;
 	uint64_t nb_missed = 0;
+	uint16_t nb_pkts, num;
 
 	if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
 		return 0;
@@ -486,31 +581,49 @@  eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 	}
 
 	/* Enqueue packets to guest RX queue */
-	while (nb_send) {
-		uint16_t nb_pkts;
-		uint16_t num = (uint16_t)RTE_MIN(nb_send,
-						 VHOST_MAX_PKT_BURST);
+	if (!r->async_register) {
+		while (nb_send) {
+			num = (uint16_t)RTE_MIN(nb_send, VHOST_MAX_PKT_BURST);
+			nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id,
+						&bufs[nb_tx], num);
+
+			nb_tx += nb_pkts;
+			nb_send -= nb_pkts;
+			if (nb_pkts < num)
+				break;
+		}
 
-		nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id,
-						  &bufs[nb_tx], num);
+		for (i = 0; likely(i < nb_tx); i++) {
+			nb_bytes += bufs[i]->pkt_len;
+			rte_pktmbuf_free(bufs[i]);
+		}
 
-		nb_tx += nb_pkts;
-		nb_send -= nb_pkts;
-		if (nb_pkts < num)
-			break;
-	}
+	} else {
+		while (nb_send) {
+			num = (uint16_t)RTE_MIN(nb_send, VHOST_MAX_PKT_BURST);
+			nb_pkts = rte_vhost_submit_enqueue_burst(r->vid, r->virtqueue_id,
+							&bufs[nb_tx], num, r->dma_id, 0);
+
+			nb_tx += nb_pkts;
+			nb_send -= nb_pkts;
+			if (nb_pkts < num)
+				break;
+		}
 
-	for (i = 0; likely(i < nb_tx); i++)
-		nb_bytes += bufs[i]->pkt_len;
+		for (i = 0; likely(i < nb_tx); i++)
+			nb_bytes += bufs[i]->pkt_len;
 
-	nb_missed = nb_bufs - nb_tx;
+		if (unlikely(async_tx_poll_completed)) {
+			vhost_tx_free_completed(r->vid, r->virtqueue_id, r->dma_id, r->cmpl_pkts,
+					VHOST_MAX_PKT_BURST);
+		}
+	}
 
+	nb_missed = nb_bufs - nb_tx;
 	r->stats.pkts += nb_tx;
 	r->stats.bytes += nb_bytes;
 	r->stats.missed_pkts += nb_missed;
 
-	for (i = 0; likely(i < nb_tx); i++)
-		rte_pktmbuf_free(bufs[i]);
 out:
 	rte_atomic32_set(&r->while_queuing, 0);
 
@@ -798,6 +911,8 @@  queue_setup(struct rte_eth_dev *eth_dev, struct pmd_internal *internal)
 		vq->vid = internal->vid;
 		vq->internal = internal;
 		vq->port = eth_dev->data->port_id;
+		if (i < eth_dev->data->nb_tx_queues)
+			vq->txq = eth_dev->data->tx_queues[i];
 	}
 	for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
 		vq = eth_dev->data->tx_queues[i];
@@ -878,6 +993,30 @@  new_device(int vid)
 	return 0;
 }
 
+static inline void
+async_clear_virtqueue(int vid, uint16_t virtqueue_id, int16_t dma_id)
+{
+	struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST];
+	uint16_t i, ret, nr_done = 0;
+
+	if (vid == -1)
+		return;
+
+	while (rte_vhost_async_get_inflight(vid, virtqueue_id) > 0) {
+		ret = rte_vhost_clear_queue(vid, virtqueue_id, pkts, VHOST_MAX_PKT_BURST, dma_id,
+				0);
+		for (i = 0; i < ret ; i++)
+			rte_pktmbuf_free(pkts[i]);
+
+		nr_done += ret;
+	}
+	VHOST_LOG(INFO, "Completed %u pkts for vid-%u vring-%u\n", nr_done, vid, virtqueue_id);
+
+	if (rte_vhost_async_channel_unregister(vid, virtqueue_id))
+		VHOST_LOG(ERR, "Failed to unregister async for vid-%u vring-%u\n", vid,
+				virtqueue_id);
+}
+
 static void
 destroy_device(int vid)
 {
@@ -908,13 +1047,27 @@  destroy_device(int vid)
 			vq = eth_dev->data->rx_queues[i];
 			if (!vq)
 				continue;
+			if (vq->async_register) {
+				async_clear_virtqueue(vq->vid, vq->virtqueue_id,
+							vq->dma_id);
+				vq->async_register = false;
+				internal->queue_dmas[vq->virtqueue_id].async_register = false;
+			}
 			vq->vid = -1;
+
 		}
 		for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
 			vq = eth_dev->data->tx_queues[i];
 			if (!vq)
 				continue;
+			if (vq->async_register) {
+				async_clear_virtqueue(vq->vid, vq->virtqueue_id,
+							vq->dma_id);
+				vq->async_register = false;
+				internal->queue_dmas[vq->virtqueue_id].async_register = false;
+			}
 			vq->vid = -1;
+
 		}
 	}
 
@@ -983,6 +1136,9 @@  vring_state_changed(int vid, uint16_t vring, int enable)
 	struct rte_vhost_vring_state *state;
 	struct rte_eth_dev *eth_dev;
 	struct internal_list *list;
+	struct vhost_queue *queue;
+	struct pmd_internal *internal;
+	int qid;
 	char ifname[PATH_MAX];
 
 	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
@@ -1011,6 +1167,65 @@  vring_state_changed(int vid, uint16_t vring, int enable)
 
 	update_queuing_status(eth_dev, false);
 
+	qid = vring / VIRTIO_QNUM;
+	if (vring % VIRTIO_QNUM == VIRTIO_RXQ)
+		queue = eth_dev->data->tx_queues[qid];
+	else
+		queue = eth_dev->data->rx_queues[qid];
+
+	if (!queue)
+		goto skip;
+
+	internal = eth_dev->data->dev_private;
+
+	/* Register async data path for the queue assigned valid DMA device */
+	if (internal->queue_dmas[queue->virtqueue_id].dma_id == INVALID_DMA_ID)
+		goto skip;
+
+	if (enable && !queue->async_register) {
+		if (rte_vhost_async_channel_register_thread_unsafe(vid, vring)) {
+			VHOST_LOG(ERR, "Failed to register async for vid-%u vring-%u!\n", vid,
+					vring);
+			return -1;
+		}
+
+		queue->async_register = true;
+		internal->queue_dmas[vring].async_register = true;
+
+		VHOST_LOG(INFO, "Succeed to register async for vid-%u vring-%u\n", vid, vring);
+	}
+
+	if (!enable && queue->async_register) {
+		struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST];
+		uint16_t ret, i, nr_done = 0;
+		uint16_t dma_id = queue->dma_id;
+
+		while (rte_vhost_async_get_inflight_thread_unsafe(vid, vring) > 0) {
+			ret = rte_vhost_clear_queue_thread_unsafe(vid, vring, pkts,
+					VHOST_MAX_PKT_BURST, dma_id, 0);
+
+			for (i = 0; i < ret ; i++)
+				rte_pktmbuf_free(pkts[i]);
+
+			nr_done += ret;
+		}
+
+		VHOST_LOG(INFO, "Completed %u in-flight pkts for vid-%u vring-%u\n", nr_done, vid,
+				vring);
+
+		if (rte_vhost_async_channel_unregister_thread_unsafe(vid, vring)) {
+			VHOST_LOG(ERR, "Failed to unregister async for vid-%u vring-%u\n", vid,
+					vring);
+			return -1;
+		}
+
+		queue->async_register = false;
+		internal->queue_dmas[vring].async_register = false;
+
+		VHOST_LOG(INFO, "Succeed to unregister async for vid-%u vring-%u\n", vid, vring);
+	}
+
+skip:
 	VHOST_LOG(INFO, "vring%u is %s\n",
 			vring, enable ? "enabled" : "disabled");
 
@@ -1159,6 +1374,12 @@  rte_eth_vhost_get_vid_from_port_id(uint16_t port_id)
 	return vid;
 }
 
+void
+rte_eth_vhost_async_tx_poll_completed(bool enable)
+{
+	async_tx_poll_completed = enable;
+}
+
 static int
 eth_dev_configure(struct rte_eth_dev *dev)
 {
@@ -1220,6 +1441,7 @@  eth_dev_close(struct rte_eth_dev *dev)
 {
 	struct pmd_internal *internal;
 	struct internal_list *list;
+	struct vhost_queue *queue;
 	unsigned int i, ret;
 
 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
@@ -1233,6 +1455,25 @@  eth_dev_close(struct rte_eth_dev *dev)
 
 	list = find_internal_resource(internal->iface_name);
 	if (list) {
+		/* Make sure all in-flight packets are completed before destroy vhost */
+		if (dev->data->rx_queues) {
+			for (i = 0; i < dev->data->nb_rx_queues; i++) {
+				queue = dev->data->rx_queues[i];
+				if (queue->async_register)
+					async_clear_virtqueue(queue->vid, queue->virtqueue_id,
+							queue->dma_id);
+			}
+		}
+
+		if (dev->data->tx_queues) {
+			for (i = 0; i < dev->data->nb_tx_queues; i++) {
+				queue = dev->data->tx_queues[i];
+				if (queue->async_register)
+					async_clear_virtqueue(queue->vid, queue->virtqueue_id,
+							queue->dma_id);
+			}
+		}
+
 		rte_vhost_driver_unregister(internal->iface_name);
 		pthread_mutex_lock(&internal_list_lock);
 		TAILQ_REMOVE(&internal_list, list, next);
@@ -1267,6 +1508,7 @@  eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
 		   struct rte_mempool *mb_pool)
 {
 	struct vhost_queue *vq;
+	struct pmd_internal *internal = dev->data->dev_private;
 
 	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
 			RTE_CACHE_LINE_SIZE, socket_id);
@@ -1277,6 +1519,8 @@  eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
 
 	vq->mb_pool = mb_pool;
 	vq->virtqueue_id = rx_queue_id * VIRTIO_QNUM + VIRTIO_TXQ;
+	vq->async_register = internal->queue_dmas[vq->virtqueue_id].async_register;
+	vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id;
 	rte_spinlock_init(&vq->intr_lock);
 	dev->data->rx_queues[rx_queue_id] = vq;
 
@@ -1290,6 +1534,7 @@  eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
 		   const struct rte_eth_txconf *tx_conf __rte_unused)
 {
 	struct vhost_queue *vq;
+	struct pmd_internal *internal = dev->data->dev_private;
 
 	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
 			RTE_CACHE_LINE_SIZE, socket_id);
@@ -1299,6 +1544,8 @@  eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
 	}
 
 	vq->virtqueue_id = tx_queue_id * VIRTIO_QNUM + VIRTIO_RXQ;
+	vq->async_register = internal->queue_dmas[vq->virtqueue_id].async_register;
+	vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id;
 	rte_spinlock_init(&vq->intr_lock);
 	dev->data->tx_queues[tx_queue_id] = vq;
 
@@ -1509,13 +1756,14 @@  static const struct eth_dev_ops ops = {
 static int
 eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
 	int16_t queues, const unsigned int numa_node, uint64_t flags,
-	uint64_t disable_flags)
+	uint64_t disable_flags, struct dma_input_info *dma_input)
 {
 	const char *name = rte_vdev_device_name(dev);
 	struct rte_eth_dev_data *data;
 	struct pmd_internal *internal = NULL;
 	struct rte_eth_dev *eth_dev = NULL;
 	struct rte_ether_addr *eth_addr = NULL;
+	int i;
 
 	VHOST_LOG(INFO, "Creating VHOST-USER backend on numa socket %u\n",
 		numa_node);
@@ -1564,6 +1812,12 @@  eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
 	eth_dev->rx_pkt_burst = eth_vhost_rx;
 	eth_dev->tx_pkt_burst = eth_vhost_tx;
 
+	for (i = 0; i < RTE_MAX_QUEUES_PER_PORT * 2; i++) {
+		/* Invalid DMA ID indicates the queue does not want to enable async data path */
+		internal->queue_dmas[i].dma_id = dma_input->dmas[i];
+		internal->queue_dmas[i].async_register = false;
+	}
+
 	rte_eth_dev_probing_finish(eth_dev);
 	return 0;
 
@@ -1603,6 +1857,155 @@  open_int(const char *key __rte_unused, const char *value, void *extra_args)
 	return 0;
 }
 
+static int
+init_dma(int16_t dma_id, uint16_t ring_size)
+{
+	struct rte_dma_info info;
+	struct rte_dma_conf dev_config = { .nb_vchans = 1 };
+	struct rte_dma_vchan_conf qconf = {
+		.direction = RTE_DMA_DIR_MEM_TO_MEM,
+	};
+	int ret = 0;
+
+	if (dma_is_configured(dma_id))
+		goto out;
+
+	if (rte_dma_info_get(dma_id, &info) != 0) {
+		VHOST_LOG(ERR, "dma %u get info failed\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	if (info.max_vchans < 1) {
+		VHOST_LOG(ERR, "No channels available on dma %d\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	if (rte_dma_configure(dma_id, &dev_config) != 0) {
+		VHOST_LOG(ERR, "dma %u configure failed\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	rte_dma_info_get(dma_id, &info);
+	if (info.nb_vchans != 1) {
+		VHOST_LOG(ERR, "dma %u has no queues\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	ring_size = RTE_MAX(ring_size, MINIMUM_DMA_RING_SIZE);
+	ring_size = RTE_MAX(ring_size, info.min_desc);
+	qconf.nb_desc = RTE_MIN(ring_size, info.max_desc);
+	if (rte_dma_vchan_setup(dma_id, 0, &qconf) != 0) {
+		VHOST_LOG(ERR, "dma %u queue setup failed\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	if (rte_dma_start(dma_id) != 0) {
+		VHOST_LOG(ERR, "dma %u start failed\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	configured_dmas[dma_count++] = dma_id;
+
+out:
+	return ret;
+}
+
+static int
+open_dma(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	struct dma_input_info *dma_input = extra_args;
+	char *input = strndup(value, strlen(value) + 1);
+	char *addrs = input;
+	char *ptrs[2];
+	char *start, *end, *substr;
+	uint16_t qid, virtqueue_id;
+	int16_t dma_id;
+	int i, ret = 0;
+
+	while (isblank(*addrs))
+		addrs++;
+	if (*addrs == '\0') {
+		VHOST_LOG(ERR, "No input DMA addresses\n");
+		ret = -1;
+		goto out;
+	}
+
+	/* process DMA devices within bracket. */
+	addrs++;
+	substr = strtok(addrs, ";]");
+	if (!substr) {
+		VHOST_LOG(ERR, "No input DMA addresse\n");
+		ret = -1;
+		goto out;
+	}
+
+	do {
+		rte_strsplit(substr, strlen(substr), ptrs, 2, '@');
+
+		char *txq, *rxq;
+		bool is_txq;
+
+		txq = strstr(ptrs[0], "txq");
+		rxq = strstr(ptrs[0], "rxq");
+		if (txq == NULL && rxq == NULL) {
+			VHOST_LOG(ERR, "Illegal queue\n");
+			ret = -1;
+			goto out;
+		} else if (txq) {
+			is_txq = true;
+			start = txq;
+		} else {
+			is_txq = false;
+			start = rxq;
+		}
+
+		start += 3;
+		qid = strtol(start, &end, 0);
+		if (end == start) {
+			VHOST_LOG(ERR, "No input queue ID\n");
+			ret = -1;
+			goto out;
+		}
+
+		virtqueue_id = is_txq ? qid * 2 + VIRTIO_RXQ : qid * 2 + VIRTIO_TXQ;
+
+		dma_id = rte_dma_get_dev_id_by_name(ptrs[1]);
+		if (dma_id < 0) {
+			VHOST_LOG(ERR, "Fail to find DMA device %s.\n", ptrs[1]);
+			ret = -1;
+			goto out;
+		}
+
+		ret = init_dma(dma_id, dma_input->dma_ring_size);
+		if (ret != 0) {
+			VHOST_LOG(ERR, "Fail to initialize DMA %u\n", dma_id);
+			ret = -1;
+			break;
+		}
+
+		dma_input->dmas[virtqueue_id] = dma_id;
+
+		substr = strtok(NULL, ";]");
+	} while (substr);
+
+	for (i = 0; i < dma_count; i++) {
+		if (rte_vhost_async_dma_configure(configured_dmas[i], 0) < 0) {
+			VHOST_LOG(ERR, "Fail to configure DMA %u to vhost\n", configured_dmas[i]);
+			ret = -1;
+		}
+	}
+
+out:
+	free(input);
+	return ret;
+}
+
 static int
 rte_pmd_vhost_probe(struct rte_vdev_device *dev)
 {
@@ -1621,6 +2024,10 @@  rte_pmd_vhost_probe(struct rte_vdev_device *dev)
 	int legacy_ol_flags = 0;
 	struct rte_eth_dev *eth_dev;
 	const char *name = rte_vdev_device_name(dev);
+	struct dma_input_info dma_input;
+
+	memset(dma_input.dmas, INVALID_DMA_ID, sizeof(dma_input.dmas));
+	dma_input.dma_ring_size = DEFAULT_DMA_RING_SIZE;
 
 	VHOST_LOG(INFO, "Initializing pmd_vhost for %s\n", name);
 
@@ -1736,6 +2143,43 @@  rte_pmd_vhost_probe(struct rte_vdev_device *dev)
 			goto out_free;
 	}
 
+	if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) == 1) {
+		if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
+			ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_RING_SIZE,
+					&open_int, &dma_input.dma_ring_size);
+			if (ret < 0)
+				goto out_free;
+
+			if (!rte_is_power_of_2(dma_input.dma_ring_size)) {
+				dma_input.dma_ring_size = rte_align32pow2(dma_input.dma_ring_size);
+				VHOST_LOG(INFO, "Convert dma_ring_size to the power of two %u\n",
+						dma_input.dma_ring_size);
+			}
+		} else {
+			VHOST_LOG(WARNING, "%s is not specified, skip to parse %s\n",
+					ETH_VHOST_DMA_ARG, ETH_VHOST_DMA_RING_SIZE);
+		}
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
+		if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) == 0)
+			VHOST_LOG(INFO, "Use default dma ring size %d\n", DEFAULT_DMA_RING_SIZE);
+
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_ARG,
+					 &open_dma, &dma_input);
+		if (ret < 0) {
+			VHOST_LOG(ERR, "Failed to parse %s\n", ETH_VHOST_DMA_ARG);
+			goto out_free;
+		}
+
+		flags |= RTE_VHOST_USER_ASYNC_COPY;
+		/**
+		 * Don't support live migration when enable
+		 * DMA acceleration.
+		 */
+		disable_flags |= (1ULL << VHOST_F_LOG_ALL);
+	}
+
 	if (legacy_ol_flags == 0)
 		flags |= RTE_VHOST_USER_NET_COMPLIANT_OL_FLAGS;
 
@@ -1743,7 +2187,7 @@  rte_pmd_vhost_probe(struct rte_vdev_device *dev)
 		dev->device.numa_node = rte_socket_id();
 
 	ret = eth_dev_vhost_create(dev, iface_name, queues,
-				   dev->device.numa_node, flags, disable_flags);
+			dev->device.numa_node, flags, disable_flags, &dma_input);
 	if (ret == -1)
 		VHOST_LOG(ERR, "Failed to create %s\n", name);
 
@@ -1787,4 +2231,6 @@  RTE_PMD_REGISTER_PARAM_STRING(net_vhost,
 	"postcopy-support=<0|1> "
 	"tso=<0|1> "
 	"linear-buffer=<0|1> "
-	"ext-buffer=<0|1>");
+	"ext-buffer=<0|1> "
+	"dma-ring-size=<int> "
+	"dmas=[txq0@dma_addr;rxq0@dma_addr]");
diff --git a/drivers/net/vhost/rte_eth_vhost.h b/drivers/net/vhost/rte_eth_vhost.h
index 0e68b9f668..146c98803d 100644
--- a/drivers/net/vhost/rte_eth_vhost.h
+++ b/drivers/net/vhost/rte_eth_vhost.h
@@ -52,6 +52,21 @@  int rte_eth_vhost_get_queue_event(uint16_t port_id,
  */
 int rte_eth_vhost_get_vid_from_port_id(uint16_t port_id);
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change, or be removed, without prior notice
+ *
+ * By default, rte_vhost_poll_enqueue_completed() is called in Rx path.
+ * This function enables Tx path, rather than Rx path, to poll completed
+ * packets for vhost async enqueue operations. Note that virtio may never
+ * receive DMA completed packets if there are no more Tx operations.
+ *
+ * @param enable
+ *  True indicates Tx path to call rte_vhost_poll_enqueue_completed().
+ */
+__rte_experimental
+void rte_eth_vhost_async_tx_poll_completed(bool enable);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/drivers/net/vhost/version.map b/drivers/net/vhost/version.map
index e42c89f1eb..0a40441227 100644
--- a/drivers/net/vhost/version.map
+++ b/drivers/net/vhost/version.map
@@ -6,3 +6,10 @@  DPDK_23 {
 
 	local: *;
 };
+
+EXPERIMENTAL {
+	global:
+
+	# added in 22.11
+	rte_eth_vhost_async_tx_poll_completed;
+};
diff --git a/drivers/net/vhost/vhost_testpmd.c b/drivers/net/vhost/vhost_testpmd.c
new file mode 100644
index 0000000000..b8227d1086
--- /dev/null
+++ b/drivers/net/vhost/vhost_testpmd.c
@@ -0,0 +1,65 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2022 Intel Corporation.
+ */
+#include <rte_eth_vhost.h>
+#include <cmdline_parse_num.h>
+#include <cmdline_parse_string.h>
+
+#include "testpmd.h"
+
+struct cmd_tx_poll_result {
+	cmdline_fixed_string_t async_vhost;
+	cmdline_fixed_string_t tx;
+	cmdline_fixed_string_t poll;
+	cmdline_fixed_string_t completed;
+	cmdline_fixed_string_t what;
+};
+
+static cmdline_parse_token_string_t cmd_tx_async_vhost =
+	TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, async_vhost, "async_vhost");
+static cmdline_parse_token_string_t cmd_tx_tx =
+	TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, tx, "tx");
+static cmdline_parse_token_string_t cmd_tx_poll =
+	TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, poll, "poll");
+static cmdline_parse_token_string_t cmd_tx_completed =
+	TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, completed, "completed");
+static cmdline_parse_token_string_t cmd_tx_what =
+	TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, what, "on#off");
+
+static void
+cmd_tx_poll_parsed(void *parsed_result, __rte_unused struct cmdline *cl, __rte_unused void *data)
+{
+	struct cmd_tx_poll_result *res = parsed_result;
+
+	if (!strcmp(res->what, "on"))
+		rte_eth_vhost_async_tx_poll_completed(true);
+	else if (!strcmp(res->what, "off"))
+		rte_eth_vhost_async_tx_poll_completed(false);
+}
+
+static cmdline_parse_inst_t async_vhost_cmd_tx_poll = {
+	.f = cmd_tx_poll_parsed,
+	.data = NULL,
+	.help_str = "async-vhost tx poll completed on|off",
+	.tokens = {
+		(void *)&cmd_tx_async_vhost,
+		(void *)&cmd_tx_tx,
+		(void *)&cmd_tx_poll,
+		(void *)&cmd_tx_completed,
+		(void *)&cmd_tx_what,
+		NULL,
+	},
+};
+
+static struct testpmd_driver_commands async_vhost_cmds = {
+	.commands = {
+	{
+		&async_vhost_cmd_tx_poll,
+		"async_vhost tx poll completed (on|off)\n"
+		"    Poll and free DMA completed packets in Tx path.\n",
+	},
+	{ NULL, NULL },
+	},
+};
+
+TESTPMD_ADD_DRIVER_COMMANDS(async_vhost_cmds)