[v9,7/9] net/virtio: add vectorized packed ring Tx path

Message ID 20200424092445.44693-8-yong.liu@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Series: add packed ring vectorized path

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail Compilation issues

Commit Message

Marvin Liu April 24, 2020, 9:24 a.m. UTC
  Optimize packed ring Tx path alike Rx path. Split Tx path into batch and
single Tx functions. Batch function is further optimized by AVX512
instructions.

Signed-off-by: Marvin Liu <yong.liu@intel.com>
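
For context, the new virtio_xmit_pkts_packed_vec() entry point is meant to be installed as the device Tx burst callback when the vectorized packed-ring datapath is enabled; that hookup happens elsewhere in the series, not in this patch. A minimal sketch of what the selection could look like, where the hw->use_vec_tx flag and the helper name are illustrative assumptions:

/* Sketch only: install the vectorized packed Tx path as the burst callback.
 * Assumes the driver-internal headers (virtio_ethdev.h, virtio_pci.h) are
 * included; hw->use_vec_tx is a hypothetical flag, the real selection logic
 * lives in another patch of this series.
 */
static void
virtio_set_tx_function(struct rte_eth_dev *dev, struct virtio_hw *hw)
{
	if (vtpci_packed_queue(hw) && hw->use_vec_tx)
		dev->tx_pkt_burst = virtio_xmit_pkts_packed_vec;
	else if (vtpci_packed_queue(hw))
		dev->tx_pkt_burst = virtio_xmit_pkts_packed;
	else
		dev->tx_pkt_burst = virtio_xmit_pkts;
}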
  

Comments

Maxime Coquelin April 24, 2020, 12:29 p.m. UTC | #1
On 4/24/20 11:24 AM, Marvin Liu wrote:
> Optimize packed ring Tx path alike Rx path. Split Tx path into batch and

s/alike/like/ ?

> single Tx functions. Batch function is further optimized by AVX512
> instructions.
> 
> Signed-off-by: Marvin Liu <yong.liu@intel.com>
> 
> diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
> index 5c112cac7..b7d52d497 100644
> --- a/drivers/net/virtio/virtio_ethdev.h
> +++ b/drivers/net/virtio/virtio_ethdev.h
> @@ -108,6 +108,9 @@ uint16_t virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
>  uint16_t virtio_recv_pkts_packed_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
>  		uint16_t nb_pkts);
>  
> +uint16_t virtio_xmit_pkts_packed_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
> +		uint16_t nb_pkts);
> +
>  int eth_virtio_dev_init(struct rte_eth_dev *eth_dev);
>  
>  void virtio_interrupt_handler(void *param);
> diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
> index cf18fe564..f82fe8d64 100644
> --- a/drivers/net/virtio/virtio_rxtx.c
> +++ b/drivers/net/virtio/virtio_rxtx.c
> @@ -2175,3 +2175,11 @@ virtio_recv_pkts_packed_vec(void *rx_queue __rte_unused,
>  {
>  	return 0;
>  }
> +
> +__rte_weak uint16_t
> +virtio_xmit_pkts_packed_vec(void *tx_queue __rte_unused,
> +			    struct rte_mbuf **tx_pkts __rte_unused,
> +			    uint16_t nb_pkts __rte_unused)
> +{
> +	return 0;
> +}
> diff --git a/drivers/net/virtio/virtio_rxtx_packed_avx.c b/drivers/net/virtio/virtio_rxtx_packed_avx.c
> index 8a7b459eb..c023ace4e 100644
> --- a/drivers/net/virtio/virtio_rxtx_packed_avx.c
> +++ b/drivers/net/virtio/virtio_rxtx_packed_avx.c
> @@ -23,6 +23,24 @@
>  #define PACKED_FLAGS_MASK ((0ULL | VRING_PACKED_DESC_F_AVAIL_USED) << \
>  	FLAGS_BITS_OFFSET)
>  
> +/* reference count offset in mbuf rearm data */
> +#define REFCNT_BITS_OFFSET ((offsetof(struct rte_mbuf, refcnt) - \
> +	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
> +/* segment number offset in mbuf rearm data */
> +#define SEG_NUM_BITS_OFFSET ((offsetof(struct rte_mbuf, nb_segs) - \
> +	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
> +
> +/* default rearm data */
> +#define DEFAULT_REARM_DATA (1ULL << SEG_NUM_BITS_OFFSET | \
> +	1ULL << REFCNT_BITS_OFFSET)
> +
> +/* id bits offset in packed ring desc higher 64bits */
> +#define ID_BITS_OFFSET ((offsetof(struct vring_packed_desc, id) - \
> +	offsetof(struct vring_packed_desc, len)) * BYTE_SIZE)
> +
> +/* net hdr short size mask */
> +#define NET_HDR_MASK 0x3F
> +
>  #define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
>  	sizeof(struct vring_packed_desc))
>  #define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
> @@ -47,6 +65,48 @@
>  	for (iter = val; iter < num; iter++)
>  #endif
>  
> +static inline void
> +virtio_xmit_cleanup_packed_vec(struct virtqueue *vq)
> +{
> +	struct vring_packed_desc *desc = vq->vq_packed.ring.desc;
> +	struct vq_desc_extra *dxp;
> +	uint16_t used_idx, id, curr_id, free_cnt = 0;
> +	uint16_t size = vq->vq_nentries;
> +	struct rte_mbuf *mbufs[size];
> +	uint16_t nb_mbuf = 0, i;
> +
> +	used_idx = vq->vq_used_cons_idx;
> +
> +	if (!desc_is_used(&desc[used_idx], vq))
> +		return;
> +
> +	id = desc[used_idx].id;
> +
> +	do {
> +		curr_id = used_idx;
> +		dxp = &vq->vq_descx[used_idx];
> +		used_idx += dxp->ndescs;
> +		free_cnt += dxp->ndescs;
> +
> +		if (dxp->cookie != NULL) {
> +			mbufs[nb_mbuf] = dxp->cookie;
> +			dxp->cookie = NULL;
> +			nb_mbuf++;
> +		}
> +
> +		if (used_idx >= size) {
> +			used_idx -= size;
> +			vq->vq_packed.used_wrap_counter ^= 1;
> +		}
> +	} while (curr_id != id);
> +
> +	for (i = 0; i < nb_mbuf; i++)
> +		rte_pktmbuf_free(mbufs[i]);
> +
> +	vq->vq_used_cons_idx = used_idx;
> +	vq->vq_free_cnt += free_cnt;
> +}
> +


I think you can re-use the inlined non-vectorized cleanup function here,
or use your implementation in the non-vectorized path.
BTW, do you know why we have to pass the num argument in the
non-vectorized case? I don't remember for sure.

Maxime
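
For reference, a cleanup bounded by a num argument, in the spirit of the non-vectorized packed Tx cleanup mentioned above, would look roughly like the sketch below. The function name and the exact bookkeeping are illustrative (the descriptor id free list is not maintained here); unlike the patch, which reads the id of the first used descriptor once and walks up to it, this variant re-checks desc_is_used() per chain and stops after num chains:

/* Sketch only: free at most num used descriptor chains.
 * Assumes the virtqueue.h definitions (desc_is_used, vq_desc_extra) and
 * rte_mbuf.h; id handling for out-of-order backends is left out.
 */
static inline void
xmit_cleanup_bounded_packed(struct virtqueue *vq, int num)
{
	struct vring_packed_desc *desc = vq->vq_packed.ring.desc;
	uint16_t used_idx = vq->vq_used_cons_idx;
	uint16_t size = vq->vq_nentries;
	struct vq_desc_extra *dxp;

	while (num-- > 0 && desc_is_used(&desc[used_idx], vq)) {
		dxp = &vq->vq_descx[desc[used_idx].id];

		used_idx += dxp->ndescs;
		vq->vq_free_cnt += dxp->ndescs;
		if (used_idx >= size) {
			used_idx -= size;
			vq->vq_packed.used_wrap_counter ^= 1;
		}

		if (dxp->cookie != NULL) {
			rte_pktmbuf_free(dxp->cookie);
			dxp->cookie = NULL;
		}
	}

	vq->vq_used_cons_idx = used_idx;
}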
  
Marvin Liu April 24, 2020, 1:33 p.m. UTC | #2
> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Friday, April 24, 2020 8:30 PM
> To: Liu, Yong <yong.liu@intel.com>; Ye, Xiaolong <xiaolong.ye@intel.com>;
> Wang, Zhihong <zhihong.wang@intel.com>
> Cc: dev@dpdk.org; Van Haaren, Harry <harry.van.haaren@intel.com>
> Subject: Re: [PATCH v9 7/9] net/virtio: add vectorized packed ring Tx path
> 
> 
> 
> On 4/24/20 11:24 AM, Marvin Liu wrote:
> > Optimize packed ring Tx path alike Rx path. Split Tx path into batch and
> 
> s/alike/like/ ?
> 
> > single Tx functions. Batch function is further optimized by AVX512
> > instructions.
> >
> > Signed-off-by: Marvin Liu <yong.liu@intel.com>
> >
> > diff --git a/drivers/net/virtio/virtio_ethdev.h
> b/drivers/net/virtio/virtio_ethdev.h
> > index 5c112cac7..b7d52d497 100644
> > --- a/drivers/net/virtio/virtio_ethdev.h
> > +++ b/drivers/net/virtio/virtio_ethdev.h
> > @@ -108,6 +108,9 @@ uint16_t virtio_recv_pkts_vec(void *rx_queue,
> struct rte_mbuf **rx_pkts,
> >  uint16_t virtio_recv_pkts_packed_vec(void *rx_queue, struct rte_mbuf
> **rx_pkts,
> >  		uint16_t nb_pkts);
> >
> > +uint16_t virtio_xmit_pkts_packed_vec(void *tx_queue, struct rte_mbuf
> **tx_pkts,
> > +		uint16_t nb_pkts);
> > +
> >  int eth_virtio_dev_init(struct rte_eth_dev *eth_dev);
> >
> >  void virtio_interrupt_handler(void *param);
> > diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
> > index cf18fe564..f82fe8d64 100644
> > --- a/drivers/net/virtio/virtio_rxtx.c
> > +++ b/drivers/net/virtio/virtio_rxtx.c
> > @@ -2175,3 +2175,11 @@ virtio_recv_pkts_packed_vec(void *rx_queue
> __rte_unused,
> >  {
> >  	return 0;
> >  }
> > +
> > +__rte_weak uint16_t
> > +virtio_xmit_pkts_packed_vec(void *tx_queue __rte_unused,
> > +			    struct rte_mbuf **tx_pkts __rte_unused,
> > +			    uint16_t nb_pkts __rte_unused)
> > +{
> > +	return 0;
> > +}
> > diff --git a/drivers/net/virtio/virtio_rxtx_packed_avx.c
> b/drivers/net/virtio/virtio_rxtx_packed_avx.c
> > index 8a7b459eb..c023ace4e 100644
> > --- a/drivers/net/virtio/virtio_rxtx_packed_avx.c
> > +++ b/drivers/net/virtio/virtio_rxtx_packed_avx.c
> > @@ -23,6 +23,24 @@
> >  #define PACKED_FLAGS_MASK ((0ULL |
> VRING_PACKED_DESC_F_AVAIL_USED) << \
> >  	FLAGS_BITS_OFFSET)
> >
> > +/* reference count offset in mbuf rearm data */
> > +#define REFCNT_BITS_OFFSET ((offsetof(struct rte_mbuf, refcnt) - \
> > +	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
> > +/* segment number offset in mbuf rearm data */
> > +#define SEG_NUM_BITS_OFFSET ((offsetof(struct rte_mbuf, nb_segs) - \
> > +	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
> > +
> > +/* default rearm data */
> > +#define DEFAULT_REARM_DATA (1ULL << SEG_NUM_BITS_OFFSET | \
> > +	1ULL << REFCNT_BITS_OFFSET)
> > +
> > +/* id bits offset in packed ring desc higher 64bits */
> > +#define ID_BITS_OFFSET ((offsetof(struct vring_packed_desc, id) - \
> > +	offsetof(struct vring_packed_desc, len)) * BYTE_SIZE)
> > +
> > +/* net hdr short size mask */
> > +#define NET_HDR_MASK 0x3F
> > +
> >  #define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
> >  	sizeof(struct vring_packed_desc))
> >  #define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
> > @@ -47,6 +65,48 @@
> >  	for (iter = val; iter < num; iter++)
> >  #endif
> >
> > +static inline void
> > +virtio_xmit_cleanup_packed_vec(struct virtqueue *vq)
> > +{
> > +	struct vring_packed_desc *desc = vq->vq_packed.ring.desc;
> > +	struct vq_desc_extra *dxp;
> > +	uint16_t used_idx, id, curr_id, free_cnt = 0;
> > +	uint16_t size = vq->vq_nentries;
> > +	struct rte_mbuf *mbufs[size];
> > +	uint16_t nb_mbuf = 0, i;
> > +
> > +	used_idx = vq->vq_used_cons_idx;
> > +
> > +	if (!desc_is_used(&desc[used_idx], vq))
> > +		return;
> > +
> > +	id = desc[used_idx].id;
> > +
> > +	do {
> > +		curr_id = used_idx;
> > +		dxp = &vq->vq_descx[used_idx];
> > +		used_idx += dxp->ndescs;
> > +		free_cnt += dxp->ndescs;
> > +
> > +		if (dxp->cookie != NULL) {
> > +			mbufs[nb_mbuf] = dxp->cookie;
> > +			dxp->cookie = NULL;
> > +			nb_mbuf++;
> > +		}
> > +
> > +		if (used_idx >= size) {
> > +			used_idx -= size;
> > +			vq->vq_packed.used_wrap_counter ^= 1;
> > +		}
> > +	} while (curr_id != id);
> > +
> > +	for (i = 0; i < nb_mbuf; i++)
> > +		rte_pktmbuf_free(mbufs[i]);
> > +
> > +	vq->vq_used_cons_idx = used_idx;
> > +	vq->vq_free_cnt += free_cnt;
> > +}
> > +
> 
> 
> I think you can re-use the inlined non-vectorized cleanup function here,
> or use your implementation in the non-vectorized path.
> BTW, do you know why we have to pass the num argument in the
> non-vectorized case? I don't remember for sure.
> 

Maxime,
This is a simplified version of the xmit cleanup function. It is based on the concept that the backend updates used ids in bursts, which also matches the frontend's requirement.
I just found that the original version works better in the loopback case. Will adapt it in the next version.

Thanks,
Marvin

> Maxime
  
Maxime Coquelin April 24, 2020, 1:35 p.m. UTC | #3
On 4/24/20 3:33 PM, Liu, Yong wrote:
> 
> 
>> -----Original Message-----
>> From: Maxime Coquelin <maxime.coquelin@redhat.com>
>> Sent: Friday, April 24, 2020 8:30 PM
>> To: Liu, Yong <yong.liu@intel.com>; Ye, Xiaolong <xiaolong.ye@intel.com>;
>> Wang, Zhihong <zhihong.wang@intel.com>
>> Cc: dev@dpdk.org; Van Haaren, Harry <harry.van.haaren@intel.com>
>> Subject: Re: [PATCH v9 7/9] net/virtio: add vectorized packed ring Tx path
>>
>>
>>
>> On 4/24/20 11:24 AM, Marvin Liu wrote:
>>> Optimize packed ring Tx path alike Rx path. Split Tx path into batch and
>>
>> s/alike/like/ ?
>>
>>> single Tx functions. Batch function is further optimized by AVX512
>>> instructions.
>>>
>>> Signed-off-by: Marvin Liu <yong.liu@intel.com>
>>>
>>> diff --git a/drivers/net/virtio/virtio_ethdev.h
>> b/drivers/net/virtio/virtio_ethdev.h
>>> index 5c112cac7..b7d52d497 100644
>>> --- a/drivers/net/virtio/virtio_ethdev.h
>>> +++ b/drivers/net/virtio/virtio_ethdev.h
>>> @@ -108,6 +108,9 @@ uint16_t virtio_recv_pkts_vec(void *rx_queue,
>> struct rte_mbuf **rx_pkts,
>>>  uint16_t virtio_recv_pkts_packed_vec(void *rx_queue, struct rte_mbuf
>> **rx_pkts,
>>>  		uint16_t nb_pkts);
>>>
>>> +uint16_t virtio_xmit_pkts_packed_vec(void *tx_queue, struct rte_mbuf
>> **tx_pkts,
>>> +		uint16_t nb_pkts);
>>> +
>>>  int eth_virtio_dev_init(struct rte_eth_dev *eth_dev);
>>>
>>>  void virtio_interrupt_handler(void *param);
>>> diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
>>> index cf18fe564..f82fe8d64 100644
>>> --- a/drivers/net/virtio/virtio_rxtx.c
>>> +++ b/drivers/net/virtio/virtio_rxtx.c
>>> @@ -2175,3 +2175,11 @@ virtio_recv_pkts_packed_vec(void *rx_queue
>> __rte_unused,
>>>  {
>>>  	return 0;
>>>  }
>>> +
>>> +__rte_weak uint16_t
>>> +virtio_xmit_pkts_packed_vec(void *tx_queue __rte_unused,
>>> +			    struct rte_mbuf **tx_pkts __rte_unused,
>>> +			    uint16_t nb_pkts __rte_unused)
>>> +{
>>> +	return 0;
>>> +}
>>> diff --git a/drivers/net/virtio/virtio_rxtx_packed_avx.c
>> b/drivers/net/virtio/virtio_rxtx_packed_avx.c
>>> index 8a7b459eb..c023ace4e 100644
>>> --- a/drivers/net/virtio/virtio_rxtx_packed_avx.c
>>> +++ b/drivers/net/virtio/virtio_rxtx_packed_avx.c
>>> @@ -23,6 +23,24 @@
>>>  #define PACKED_FLAGS_MASK ((0ULL |
>> VRING_PACKED_DESC_F_AVAIL_USED) << \
>>>  	FLAGS_BITS_OFFSET)
>>>
>>> +/* reference count offset in mbuf rearm data */
>>> +#define REFCNT_BITS_OFFSET ((offsetof(struct rte_mbuf, refcnt) - \
>>> +	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
>>> +/* segment number offset in mbuf rearm data */
>>> +#define SEG_NUM_BITS_OFFSET ((offsetof(struct rte_mbuf, nb_segs) - \
>>> +	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
>>> +
>>> +/* default rearm data */
>>> +#define DEFAULT_REARM_DATA (1ULL << SEG_NUM_BITS_OFFSET | \
>>> +	1ULL << REFCNT_BITS_OFFSET)
>>> +
>>> +/* id bits offset in packed ring desc higher 64bits */
>>> +#define ID_BITS_OFFSET ((offsetof(struct vring_packed_desc, id) - \
>>> +	offsetof(struct vring_packed_desc, len)) * BYTE_SIZE)
>>> +
>>> +/* net hdr short size mask */
>>> +#define NET_HDR_MASK 0x3F
>>> +
>>>  #define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
>>>  	sizeof(struct vring_packed_desc))
>>>  #define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
>>> @@ -47,6 +65,48 @@
>>>  	for (iter = val; iter < num; iter++)
>>>  #endif
>>>
>>> +static inline void
>>> +virtio_xmit_cleanup_packed_vec(struct virtqueue *vq)
>>> +{
>>> +	struct vring_packed_desc *desc = vq->vq_packed.ring.desc;
>>> +	struct vq_desc_extra *dxp;
>>> +	uint16_t used_idx, id, curr_id, free_cnt = 0;
>>> +	uint16_t size = vq->vq_nentries;
>>> +	struct rte_mbuf *mbufs[size];
>>> +	uint16_t nb_mbuf = 0, i;
>>> +
>>> +	used_idx = vq->vq_used_cons_idx;
>>> +
>>> +	if (!desc_is_used(&desc[used_idx], vq))
>>> +		return;
>>> +
>>> +	id = desc[used_idx].id;
>>> +
>>> +	do {
>>> +		curr_id = used_idx;
>>> +		dxp = &vq->vq_descx[used_idx];
>>> +		used_idx += dxp->ndescs;
>>> +		free_cnt += dxp->ndescs;
>>> +
>>> +		if (dxp->cookie != NULL) {
>>> +			mbufs[nb_mbuf] = dxp->cookie;
>>> +			dxp->cookie = NULL;
>>> +			nb_mbuf++;
>>> +		}
>>> +
>>> +		if (used_idx >= size) {
>>> +			used_idx -= size;
>>> +			vq->vq_packed.used_wrap_counter ^= 1;
>>> +		}
>>> +	} while (curr_id != id);
>>> +
>>> +	for (i = 0; i < nb_mbuf; i++)
>>> +		rte_pktmbuf_free(mbufs[i]);
>>> +
>>> +	vq->vq_used_cons_idx = used_idx;
>>> +	vq->vq_free_cnt += free_cnt;
>>> +}
>>> +
>>
>>
>> I think you can re-use the inlined non-vectorized cleanup function here,
>> or use your implementation in the non-vectorized path.
>> BTW, do you know why we have to pass the num argument in the
>> non-vectorized case? I don't remember for sure.
>>
> 
> Maxime,
> This is a simplified version of the xmit cleanup function. It is based on the concept that the backend updates used ids in bursts, which also matches the frontend's requirement.

And what if the backend doesn't follow that concept?
Is it just slower, or broken?

> I just found that the original version works better in the loopback case. Will adapt it in the next version.
> 
> Thanks,
> Marvin
> 
>> Maxime
>
  
Marvin Liu April 24, 2020, 1:47 p.m. UTC | #4
> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Friday, April 24, 2020 9:36 PM
> To: Liu, Yong <yong.liu@intel.com>; Ye, Xiaolong <xiaolong.ye@intel.com>;
> Wang, Zhihong <zhihong.wang@intel.com>
> Cc: dev@dpdk.org; Van Haaren, Harry <harry.van.haaren@intel.com>
> Subject: Re: [PATCH v9 7/9] net/virtio: add vectorized packed ring Tx path
> 
> 
> 
> On 4/24/20 3:33 PM, Liu, Yong wrote:
> >
> >
> >> -----Original Message-----
> >> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> >> Sent: Friday, April 24, 2020 8:30 PM
> >> To: Liu, Yong <yong.liu@intel.com>; Ye, Xiaolong <xiaolong.ye@intel.com>;
> >> Wang, Zhihong <zhihong.wang@intel.com>
> >> Cc: dev@dpdk.org; Van Haaren, Harry <harry.van.haaren@intel.com>
> >> Subject: Re: [PATCH v9 7/9] net/virtio: add vectorized packed ring Tx path
> >>
> >>
> >>
> >> On 4/24/20 11:24 AM, Marvin Liu wrote:
> >>> Optimize packed ring Tx path alike Rx path. Split Tx path into batch and
> >>
> >> s/alike/like/ ?
> >>
> >>> single Tx functions. Batch function is further optimized by AVX512
> >>> instructions.
> >>>
> >>> Signed-off-by: Marvin Liu <yong.liu@intel.com>
> >>>
> >>> diff --git a/drivers/net/virtio/virtio_ethdev.h
> >> b/drivers/net/virtio/virtio_ethdev.h
> >>> index 5c112cac7..b7d52d497 100644
> >>> --- a/drivers/net/virtio/virtio_ethdev.h
> >>> +++ b/drivers/net/virtio/virtio_ethdev.h
> >>> @@ -108,6 +108,9 @@ uint16_t virtio_recv_pkts_vec(void *rx_queue,
> >> struct rte_mbuf **rx_pkts,
> >>>  uint16_t virtio_recv_pkts_packed_vec(void *rx_queue, struct rte_mbuf
> >> **rx_pkts,
> >>>  		uint16_t nb_pkts);
> >>>
> >>> +uint16_t virtio_xmit_pkts_packed_vec(void *tx_queue, struct rte_mbuf
> >> **tx_pkts,
> >>> +		uint16_t nb_pkts);
> >>> +
> >>>  int eth_virtio_dev_init(struct rte_eth_dev *eth_dev);
> >>>
> >>>  void virtio_interrupt_handler(void *param);
> >>> diff --git a/drivers/net/virtio/virtio_rxtx.c
> b/drivers/net/virtio/virtio_rxtx.c
> >>> index cf18fe564..f82fe8d64 100644
> >>> --- a/drivers/net/virtio/virtio_rxtx.c
> >>> +++ b/drivers/net/virtio/virtio_rxtx.c
> >>> @@ -2175,3 +2175,11 @@ virtio_recv_pkts_packed_vec(void *rx_queue
> >> __rte_unused,
> >>>  {
> >>>  	return 0;
> >>>  }
> >>> +
> >>> +__rte_weak uint16_t
> >>> +virtio_xmit_pkts_packed_vec(void *tx_queue __rte_unused,
> >>> +			    struct rte_mbuf **tx_pkts __rte_unused,
> >>> +			    uint16_t nb_pkts __rte_unused)
> >>> +{
> >>> +	return 0;
> >>> +}
> >>> diff --git a/drivers/net/virtio/virtio_rxtx_packed_avx.c
> >> b/drivers/net/virtio/virtio_rxtx_packed_avx.c
> >>> index 8a7b459eb..c023ace4e 100644
> >>> --- a/drivers/net/virtio/virtio_rxtx_packed_avx.c
> >>> +++ b/drivers/net/virtio/virtio_rxtx_packed_avx.c
> >>> @@ -23,6 +23,24 @@
> >>>  #define PACKED_FLAGS_MASK ((0ULL |
> >> VRING_PACKED_DESC_F_AVAIL_USED) << \
> >>>  	FLAGS_BITS_OFFSET)
> >>>
> >>> +/* reference count offset in mbuf rearm data */
> >>> +#define REFCNT_BITS_OFFSET ((offsetof(struct rte_mbuf, refcnt) - \
> >>> +	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
> >>> +/* segment number offset in mbuf rearm data */
> >>> +#define SEG_NUM_BITS_OFFSET ((offsetof(struct rte_mbuf, nb_segs) - \
> >>> +	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
> >>> +
> >>> +/* default rearm data */
> >>> +#define DEFAULT_REARM_DATA (1ULL << SEG_NUM_BITS_OFFSET | \
> >>> +	1ULL << REFCNT_BITS_OFFSET)
> >>> +
> >>> +/* id bits offset in packed ring desc higher 64bits */
> >>> +#define ID_BITS_OFFSET ((offsetof(struct vring_packed_desc, id) - \
> >>> +	offsetof(struct vring_packed_desc, len)) * BYTE_SIZE)
> >>> +
> >>> +/* net hdr short size mask */
> >>> +#define NET_HDR_MASK 0x3F
> >>> +
> >>>  #define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
> >>>  	sizeof(struct vring_packed_desc))
> >>>  #define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
> >>> @@ -47,6 +65,48 @@
> >>>  	for (iter = val; iter < num; iter++)
> >>>  #endif
> >>>
> >>> +static inline void
> >>> +virtio_xmit_cleanup_packed_vec(struct virtqueue *vq)
> >>> +{
> >>> +	struct vring_packed_desc *desc = vq->vq_packed.ring.desc;
> >>> +	struct vq_desc_extra *dxp;
> >>> +	uint16_t used_idx, id, curr_id, free_cnt = 0;
> >>> +	uint16_t size = vq->vq_nentries;
> >>> +	struct rte_mbuf *mbufs[size];
> >>> +	uint16_t nb_mbuf = 0, i;
> >>> +
> >>> +	used_idx = vq->vq_used_cons_idx;
> >>> +
> >>> +	if (!desc_is_used(&desc[used_idx], vq))
> >>> +		return;
> >>> +
> >>> +	id = desc[used_idx].id;
> >>> +
> >>> +	do {
> >>> +		curr_id = used_idx;
> >>> +		dxp = &vq->vq_descx[used_idx];
> >>> +		used_idx += dxp->ndescs;
> >>> +		free_cnt += dxp->ndescs;
> >>> +
> >>> +		if (dxp->cookie != NULL) {
> >>> +			mbufs[nb_mbuf] = dxp->cookie;
> >>> +			dxp->cookie = NULL;
> >>> +			nb_mbuf++;
> >>> +		}
> >>> +
> >>> +		if (used_idx >= size) {
> >>> +			used_idx -= size;
> >>> +			vq->vq_packed.used_wrap_counter ^= 1;
> >>> +		}
> >>> +	} while (curr_id != id);
> >>> +
> >>> +	for (i = 0; i < nb_mbuf; i++)
> >>> +		rte_pktmbuf_free(mbufs[i]);
> >>> +
> >>> +	vq->vq_used_cons_idx = used_idx;
> >>> +	vq->vq_free_cnt += free_cnt;
> >>> +}
> >>> +
> >>
> >>
> >> I think you can re-use the inlined non-vectorized cleanup function here,
> >> or use your implementation in the non-vectorized path.
> >> BTW, do you know why we have to pass the num argument in the
> >> non-vectorized case? I don't remember for sure.
> >>
> >
> > Maxime,
> > This is a simplified version of the xmit cleanup function. It is based on the
> concept that the backend updates used ids in bursts, which also matches the
> frontend's requirement.
> 
> And what if the backend doesn't follow that concept?
> Is it just slower, or broken?

It is just slower. More packets may be dropped due to no free room in the ring.
I will replace the vectorized cleanup with the non-vectorized version since it showed good numbers.

> 
> > I just found that the original version works better in the loopback case. Will
> adapt it in the next version.
> >
> > Thanks,
> > Marvin
> >
> >> Maxime
> >
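
From the application side, a full ring on this path just shows up as a short return from the burst call; whether to retry or drop the remaining mbufs is the caller's choice. A minimal usage sketch, with port_id, queue_id and the drop policy purely illustrative:

/* Usage sketch: the vectorized Tx path is reached through the normal burst
 * API once installed as the Tx callback. Assumes rte_ethdev.h and rte_mbuf.h.
 */
static void
send_burst(uint16_t port_id, uint16_t queue_id,
	   struct rte_mbuf **pkts, uint16_t nb_pkts)
{
	uint16_t sent = 0;

	while (sent < nb_pkts) {
		uint16_t n = rte_eth_tx_burst(port_id, queue_id,
					      &pkts[sent], nb_pkts - sent);
		if (n == 0) {
			uint16_t i;

			/* Ring full and cleanup freed nothing: drop the rest. */
			for (i = sent; i < nb_pkts; i++)
				rte_pktmbuf_free(pkts[i]);
			return;
		}
		sent += n;
	}
}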
  

Patch

diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
index 5c112cac7..b7d52d497 100644
--- a/drivers/net/virtio/virtio_ethdev.h
+++ b/drivers/net/virtio/virtio_ethdev.h
@@ -108,6 +108,9 @@  uint16_t virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 uint16_t virtio_recv_pkts_packed_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 		uint16_t nb_pkts);
 
+uint16_t virtio_xmit_pkts_packed_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
+		uint16_t nb_pkts);
+
 int eth_virtio_dev_init(struct rte_eth_dev *eth_dev);
 
 void virtio_interrupt_handler(void *param);
diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index cf18fe564..f82fe8d64 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -2175,3 +2175,11 @@  virtio_recv_pkts_packed_vec(void *rx_queue __rte_unused,
 {
 	return 0;
 }
+
+__rte_weak uint16_t
+virtio_xmit_pkts_packed_vec(void *tx_queue __rte_unused,
+			    struct rte_mbuf **tx_pkts __rte_unused,
+			    uint16_t nb_pkts __rte_unused)
+{
+	return 0;
+}
diff --git a/drivers/net/virtio/virtio_rxtx_packed_avx.c b/drivers/net/virtio/virtio_rxtx_packed_avx.c
index 8a7b459eb..c023ace4e 100644
--- a/drivers/net/virtio/virtio_rxtx_packed_avx.c
+++ b/drivers/net/virtio/virtio_rxtx_packed_avx.c
@@ -23,6 +23,24 @@ 
 #define PACKED_FLAGS_MASK ((0ULL | VRING_PACKED_DESC_F_AVAIL_USED) << \
 	FLAGS_BITS_OFFSET)
 
+/* reference count offset in mbuf rearm data */
+#define REFCNT_BITS_OFFSET ((offsetof(struct rte_mbuf, refcnt) - \
+	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
+/* segment number offset in mbuf rearm data */
+#define SEG_NUM_BITS_OFFSET ((offsetof(struct rte_mbuf, nb_segs) - \
+	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
+
+/* default rearm data */
+#define DEFAULT_REARM_DATA (1ULL << SEG_NUM_BITS_OFFSET | \
+	1ULL << REFCNT_BITS_OFFSET)
+
+/* id bits offset in packed ring desc higher 64bits */
+#define ID_BITS_OFFSET ((offsetof(struct vring_packed_desc, id) - \
+	offsetof(struct vring_packed_desc, len)) * BYTE_SIZE)
+
+/* net hdr short size mask */
+#define NET_HDR_MASK 0x3F
+
 #define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
 	sizeof(struct vring_packed_desc))
 #define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
@@ -47,6 +65,48 @@ 
 	for (iter = val; iter < num; iter++)
 #endif
 
+static inline void
+virtio_xmit_cleanup_packed_vec(struct virtqueue *vq)
+{
+	struct vring_packed_desc *desc = vq->vq_packed.ring.desc;
+	struct vq_desc_extra *dxp;
+	uint16_t used_idx, id, curr_id, free_cnt = 0;
+	uint16_t size = vq->vq_nentries;
+	struct rte_mbuf *mbufs[size];
+	uint16_t nb_mbuf = 0, i;
+
+	used_idx = vq->vq_used_cons_idx;
+
+	if (!desc_is_used(&desc[used_idx], vq))
+		return;
+
+	id = desc[used_idx].id;
+
+	do {
+		curr_id = used_idx;
+		dxp = &vq->vq_descx[used_idx];
+		used_idx += dxp->ndescs;
+		free_cnt += dxp->ndescs;
+
+		if (dxp->cookie != NULL) {
+			mbufs[nb_mbuf] = dxp->cookie;
+			dxp->cookie = NULL;
+			nb_mbuf++;
+		}
+
+		if (used_idx >= size) {
+			used_idx -= size;
+			vq->vq_packed.used_wrap_counter ^= 1;
+		}
+	} while (curr_id != id);
+
+	for (i = 0; i < nb_mbuf; i++)
+		rte_pktmbuf_free(mbufs[i]);
+
+	vq->vq_used_cons_idx = used_idx;
+	vq->vq_free_cnt += free_cnt;
+}
+
 static inline void
 virtio_update_batch_stats(struct virtnet_stats *stats,
 			  uint16_t pkt_len1,
@@ -60,6 +120,237 @@  virtio_update_batch_stats(struct virtnet_stats *stats,
 	stats->bytes += pkt_len4;
 }
 
+static inline int
+virtqueue_enqueue_batch_packed_vec(struct virtnet_tx *txvq,
+				   struct rte_mbuf **tx_pkts)
+{
+	struct virtqueue *vq = txvq->vq;
+	uint16_t head_size = vq->hw->vtnet_hdr_size;
+	uint16_t idx = vq->vq_avail_idx;
+	struct virtio_net_hdr *hdr;
+	uint16_t i, cmp;
+
+	if (vq->vq_avail_idx & PACKED_BATCH_MASK)
+		return -1;
+
+	if (unlikely((idx + PACKED_BATCH_SIZE) > vq->vq_nentries))
+		return -1;
+
+	/* Load four mbufs rearm data */
+	RTE_BUILD_BUG_ON(REFCNT_BITS_OFFSET >= 64);
+	RTE_BUILD_BUG_ON(SEG_NUM_BITS_OFFSET >= 64);
+	__m256i mbufs = _mm256_set_epi64x(*tx_pkts[3]->rearm_data,
+					  *tx_pkts[2]->rearm_data,
+					  *tx_pkts[1]->rearm_data,
+					  *tx_pkts[0]->rearm_data);
+
+	/* refcnt=1 and nb_segs=1 */
+	__m256i mbuf_ref = _mm256_set1_epi64x(DEFAULT_REARM_DATA);
+	__m256i head_rooms = _mm256_set1_epi16(head_size);
+
+	/* Check refcnt and nb_segs */
+	const __mmask16 mask = 0x6 | 0x6 << 4 | 0x6 << 8 | 0x6 << 12;
+	cmp = _mm256_mask_cmpneq_epu16_mask(mask, mbufs, mbuf_ref);
+	if (unlikely(cmp))
+		return -1;
+
+	/* Check headroom is enough */
+	const __mmask16 data_mask = 0x1 | 0x1 << 4 | 0x1 << 8 | 0x1 << 12;
+	RTE_BUILD_BUG_ON(offsetof(struct rte_mbuf, data_off) !=
+		offsetof(struct rte_mbuf, rearm_data));
+	cmp = _mm256_mask_cmplt_epu16_mask(data_mask, mbufs, head_rooms);
+	if (unlikely(cmp))
+		return -1;
+
+	__m512i v_descx = _mm512_set_epi64(0x1, (uint64_t)tx_pkts[3],
+					   0x1, (uint64_t)tx_pkts[2],
+					   0x1, (uint64_t)tx_pkts[1],
+					   0x1, (uint64_t)tx_pkts[0]);
+
+	_mm512_storeu_si512((void *)&vq->vq_descx[idx], v_descx);
+
+	virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		tx_pkts[i]->data_off -= head_size;
+		tx_pkts[i]->data_len += head_size;
+	}
+
+#ifdef RTE_VIRTIO_USER
+	__m512i descs_base = _mm512_set_epi64(tx_pkts[3]->data_len,
+			(uint64_t)(*(uintptr_t *)((uintptr_t)tx_pkts[3])),
+			tx_pkts[2]->data_len,
+			(uint64_t)(*(uintptr_t *)((uintptr_t)tx_pkts[2])),
+			tx_pkts[1]->data_len,
+			(uint64_t)(*(uintptr_t *)((uintptr_t)tx_pkts[1])),
+			tx_pkts[0]->data_len,
+			(uint64_t)(*(uintptr_t *)((uintptr_t)tx_pkts[0])));
+#else
+	__m512i descs_base = _mm512_set_epi64(tx_pkts[3]->data_len,
+					      tx_pkts[3]->buf_iova,
+					      tx_pkts[2]->data_len,
+					      tx_pkts[2]->buf_iova,
+					      tx_pkts[1]->data_len,
+					      tx_pkts[1]->buf_iova,
+					      tx_pkts[0]->data_len,
+					      tx_pkts[0]->buf_iova);
+#endif
+
+	/* id offset and data offset */
+	__m512i data_offsets = _mm512_set_epi64((uint64_t)3 << ID_BITS_OFFSET,
+						tx_pkts[3]->data_off,
+						(uint64_t)2 << ID_BITS_OFFSET,
+						tx_pkts[2]->data_off,
+						(uint64_t)1 << ID_BITS_OFFSET,
+						tx_pkts[1]->data_off,
+						0, tx_pkts[0]->data_off);
+
+	__m512i new_descs = _mm512_add_epi64(descs_base, data_offsets);
+
+	uint64_t flags_temp = (uint64_t)idx << ID_BITS_OFFSET |
+		(uint64_t)vq->vq_packed.cached_flags << FLAGS_BITS_OFFSET;
+
+	/* flags offset and guest virtual address offset */
+#ifdef RTE_VIRTIO_USER
+	__m128i flag_offset = _mm_set_epi64x(flags_temp, (uint64_t)vq->offset);
+#else
+	__m128i flag_offset = _mm_set_epi64x(flags_temp, 0);
+#endif
+	__m512i v_offset = _mm512_broadcast_i32x4(flag_offset);
+
+	__m512i v_desc = _mm512_add_epi64(new_descs, v_offset);
+
+	if (!vq->hw->has_tx_offload) {
+		__m128i all_mask = _mm_set1_epi16(0xFFFF);
+		virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+			hdr = rte_pktmbuf_mtod_offset(tx_pkts[i],
+					struct virtio_net_hdr *, -head_size);
+			__m128i v_hdr = _mm_loadu_si128((void *)hdr);
+			if (unlikely(_mm_mask_test_epi16_mask(NET_HDR_MASK,
+							v_hdr, all_mask))) {
+				__m128i all_zero = _mm_setzero_si128();
+				_mm_mask_storeu_epi16((void *)hdr,
+						NET_HDR_MASK, all_zero);
+			}
+		}
+	} else {
+		virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+			hdr = rte_pktmbuf_mtod_offset(tx_pkts[i],
+					struct virtio_net_hdr *, -head_size);
+			virtqueue_xmit_offload(hdr, tx_pkts[i], true);
+		}
+	}
+
+	/* Enqueue Packet buffers */
+	_mm512_storeu_si512((void *)&vq->vq_packed.ring.desc[idx], v_desc);
+
+	virtio_update_batch_stats(&txvq->stats, tx_pkts[0]->pkt_len,
+			tx_pkts[1]->pkt_len, tx_pkts[2]->pkt_len,
+			tx_pkts[3]->pkt_len);
+
+	vq->vq_avail_idx += PACKED_BATCH_SIZE;
+	vq->vq_free_cnt -= PACKED_BATCH_SIZE;
+
+	if (vq->vq_avail_idx >= vq->vq_nentries) {
+		vq->vq_avail_idx -= vq->vq_nentries;
+		vq->vq_packed.cached_flags ^=
+			VRING_PACKED_DESC_F_AVAIL_USED;
+	}
+
+	return 0;
+}
+
+static inline int
+virtqueue_enqueue_single_packed_vec(struct virtnet_tx *txvq,
+				    struct rte_mbuf *txm)
+{
+	struct virtqueue *vq = txvq->vq;
+	struct virtio_hw *hw = vq->hw;
+	uint16_t hdr_size = hw->vtnet_hdr_size;
+	uint16_t slots, can_push;
+	int16_t need;
+
+	/* How many main ring entries are needed to this Tx?
+	 * any_layout => number of segments
+	 * default    => number of segments + 1
+	 */
+	can_push = rte_mbuf_refcnt_read(txm) == 1 &&
+		   RTE_MBUF_DIRECT(txm) &&
+		   txm->nb_segs == 1 &&
+		   rte_pktmbuf_headroom(txm) >= hdr_size;
+
+	slots = txm->nb_segs + !can_push;
+	need = slots - vq->vq_free_cnt;
+
+	/* Positive value indicates it need free vring descriptors */
+	if (unlikely(need > 0)) {
+		virtio_xmit_cleanup_packed_vec(vq);
+		need = slots - vq->vq_free_cnt;
+		if (unlikely(need > 0)) {
+			PMD_TX_LOG(ERR,
+				   "No free tx descriptors to transmit");
+			return -1;
+		}
+	}
+
+	/* Enqueue Packet buffers */
+	virtqueue_enqueue_xmit_packed(txvq, txm, slots, can_push, 1);
+
+	txvq->stats.bytes += txm->pkt_len;
+	return 0;
+}
+
+uint16_t
+virtio_xmit_pkts_packed_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
+			uint16_t nb_pkts)
+{
+	struct virtnet_tx *txvq = tx_queue;
+	struct virtqueue *vq = txvq->vq;
+	struct virtio_hw *hw = vq->hw;
+	uint16_t nb_tx = 0;
+	uint16_t remained;
+
+	if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
+		return nb_tx;
+
+	if (unlikely(nb_pkts < 1))
+		return nb_pkts;
+
+	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
+
+	if (vq->vq_free_cnt <= vq->vq_nentries - vq->vq_free_thresh)
+		virtio_xmit_cleanup_packed_vec(vq);
+
+	remained = RTE_MIN(nb_pkts, vq->vq_free_cnt);
+
+	while (remained) {
+		if (remained >= PACKED_BATCH_SIZE) {
+			if (!virtqueue_enqueue_batch_packed_vec(txvq,
+						&tx_pkts[nb_tx])) {
+				nb_tx += PACKED_BATCH_SIZE;
+				remained -= PACKED_BATCH_SIZE;
+				continue;
+			}
+		}
+		if (!virtqueue_enqueue_single_packed_vec(txvq,
+					tx_pkts[nb_tx])) {
+			nb_tx++;
+			remained--;
+			continue;
+		}
+		break;
+	};
+
+	txvq->stats.packets += nb_tx;
+
+	if (likely(nb_tx)) {
+		if (unlikely(virtqueue_kick_prepare_packed(vq))) {
+			virtqueue_notify(vq);
+			PMD_TX_LOG(DEBUG, "Notified backend after xmit");
+		}
+	}
+
+	return nb_tx;
+}
+
 /* Optionally fill offload information in structure */
 static inline int
 virtio_vec_rx_offload(struct rte_mbuf *m, struct virtio_net_hdr *hdr)