[RFC,02/13] add vhost packed ring fast enqueue function

Message ID 20190708171320.38802-3-yong.liu@intel.com
State New
Delegated to: Maxime Coquelin
Headers show
Series
  • [RFC,01/13] add vhost normal enqueue function
Related show

Checks

Context Check Description
ci/Intel-compilation fail Compilation issues
ci/checkpatch success coding style OK

Commit Message

Liu, Yong July 8, 2019, 5:13 p.m.
In fast enqueue function, will first check whether descriptors are
cache aligned. Fast enqueue function will check prerequisites in the
beginning. Fast enqueue function do not support chained mbufs, normal
function will handle that.

Signed-off-by: Marvin Liu <yong.liu@intel.com>

Comments

Ilya Maximets July 8, 2019, 11:37 a.m. | #1
On 08.07.2019 20:13, Marvin Liu wrote:
> In fast enqueue function, will first check whether descriptors are
> cache aligned. Fast enqueue function will check prerequisites in the
> beginning. Fast enqueue function do not support chained mbufs, normal
> function will handle that.
> 
> Signed-off-by: Marvin Liu <yong.liu@intel.com>
> 
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 884befa85..f24026acd 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -39,6 +39,8 @@
>  
>  #define VHOST_LOG_CACHE_NR 32
>  
> +/* Used in fast packed ring functions */
> +#define PACKED_DESC_PER_CACHELINE (RTE_CACHE_LINE_SIZE / sizeof(struct vring_packed_desc))
>  /**
>   * Structure contains buffer address, length and descriptor index
>   * from vring to do scatter RX.
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 003aec1d4..b877510da 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -897,6 +897,115 @@ virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
>  	return pkt_idx;
>  }
>  
> +static __rte_always_inline uint16_t
> +virtio_dev_rx_fast_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		struct rte_mbuf **pkts)
> +{
> +	bool wrap_counter = vq->avail_wrap_counter;
> +	struct vring_packed_desc *descs = vq->desc_packed;
> +	uint16_t avail_idx = vq->last_avail_idx;
> +	uint64_t desc_addr, desc_addr1, desc_addr2, desc_addr3, len, len1,
> +		len2, len3;
> +	struct virtio_net_hdr_mrg_rxbuf *hdr, *hdr1, *hdr2, *hdr3;
> +	uint32_t buf_offset = dev->vhost_hlen;
> +
> +	if (unlikely(avail_idx & 0x3))
> +		return -1;
> +
> +	if (unlikely(avail_idx < (vq->size - PACKED_DESC_PER_CACHELINE)))

Doe it makes sense to check this?
If this condition is not 'true', all the code below will access incorrect memory.

> +		rte_prefetch0((void *)(uintptr_t)&descs[avail_idx +
> +			PACKED_DESC_PER_CACHELINE]);
> +	else
> +		rte_prefetch0((void *)(uintptr_t)&descs[0]);
> +
> +	if (unlikely((pkts[0]->next != NULL) |
> +		(pkts[1]->next != NULL) |
> +		(pkts[2]->next != NULL) |
> +		(pkts[3]->next != NULL)))
> +		return -1;
> +
> +	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)) |
> +		unlikely(!desc_is_avail(&descs[avail_idx + 1], wrap_counter)) |
> +		unlikely(!desc_is_avail(&descs[avail_idx + 2], wrap_counter)) |
> +		unlikely(!desc_is_avail(&descs[avail_idx + 3], wrap_counter)))
> +		return 1;
> +
> +	rte_smp_rmb();
> +
> +	len = descs[avail_idx].len;
> +	len1 = descs[avail_idx + 1].len;
> +	len2 = descs[avail_idx + 2].len;
> +	len3 = descs[avail_idx + 3].len;
> +
> +	if (unlikely((pkts[0]->pkt_len > (len - buf_offset)) |
> +		(pkts[1]->pkt_len > (len1 - buf_offset)) |
> +		(pkts[2]->pkt_len > (len2 - buf_offset)) |
> +		(pkts[3]->pkt_len > (len3 - buf_offset))))
> +		return -1;
> +
> +	desc_addr = vhost_iova_to_vva(dev, vq,
> +			descs[avail_idx].addr,
> +			&len,
> +			VHOST_ACCESS_RW);
> +
> +	desc_addr1 = vhost_iova_to_vva(dev, vq,
> +			descs[avail_idx + 1].addr,
> +			&len1,
> +			VHOST_ACCESS_RW);
> +
> +	desc_addr2 = vhost_iova_to_vva(dev, vq,
> +			descs[avail_idx + 2].addr,
> +			&len2,
> +			VHOST_ACCESS_RW);
> +
> +	desc_addr3 = vhost_iova_to_vva(dev, vq,
> +			descs[avail_idx + 3].addr,
> +			&len3,
> +			VHOST_ACCESS_RW);
> +
> +	if (unlikely((len != descs[avail_idx].len) |
> +		(len1 != descs[avail_idx + 1].len) |
> +		(len2 != descs[avail_idx + 2].len) |
> +		(len3 != descs[avail_idx + 3].len)))
> +		return -1;
> +
> +	hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr;
> +	hdr1 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr1;
> +	hdr2 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr2;
> +	hdr3 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr3;
> +
> +	virtio_enqueue_offload(pkts[0], &hdr->hdr);
> +	virtio_enqueue_offload(pkts[1], &hdr1->hdr);
> +	virtio_enqueue_offload(pkts[2], &hdr2->hdr);
> +	virtio_enqueue_offload(pkts[3], &hdr3->hdr);
> +
> +	len = pkts[0]->pkt_len + dev->vhost_hlen;
> +	len1 = pkts[1]->pkt_len + dev->vhost_hlen;
> +	len2 = pkts[2]->pkt_len + dev->vhost_hlen;
> +	len3 = pkts[3]->pkt_len + dev->vhost_hlen;
> +
> +	vq->last_avail_idx += PACKED_DESC_PER_CACHELINE;

The whole function assumes that PACKED_DESC_PER_CACHELINE equals to 4,
but if it's not, it will not work correctly.

> +	if (vq->last_avail_idx >= vq->size) {
> +		vq->last_avail_idx -= vq->size;
> +		vq->avail_wrap_counter ^= 1;
> +	}
> +
> +	rte_memcpy((void *)(uintptr_t)(desc_addr + buf_offset),
> +		rte_pktmbuf_mtod_offset(pkts[0], void *, 0),
> +		pkts[0]->pkt_len);
> +	rte_memcpy((void *)(uintptr_t)(desc_addr1 + buf_offset),
> +		rte_pktmbuf_mtod_offset(pkts[1], void *, 0),
> +		pkts[1]->pkt_len);
> +	rte_memcpy((void *)(uintptr_t)(desc_addr2 + buf_offset),
> +		rte_pktmbuf_mtod_offset(pkts[2], void *, 0),
> +		pkts[2]->pkt_len);
> +	rte_memcpy((void *)(uintptr_t)(desc_addr3 + buf_offset),
> +		rte_pktmbuf_mtod_offset(pkts[3], void *, 0),
> +		pkts[3]->pkt_len);
> +
> +	return 0;
> +}
> +
>  static __rte_always_inline uint16_t
>  virtio_dev_rx_normal_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
>  			struct rte_mbuf *pkt)
>
Liu, Yong July 9, 2019, 1:15 a.m. | #2
Thank, Ilya.

> -----Original Message-----
> From: Ilya Maximets [mailto:i.maximets@samsung.com]
> Sent: Monday, July 08, 2019 7:38 PM
> To: Liu, Yong <yong.liu@intel.com>; Bie, Tiwei <tiwei.bie@intel.com>;
> maxime.coquelin@redhat.com; dev@dpdk.org
> Subject: Re: [dpdk-dev] [RFC PATCH 02/13] add vhost packed ring fast enqueue
> function
> 
> On 08.07.2019 20:13, Marvin Liu wrote:
> > In fast enqueue function, will first check whether descriptors are
> > cache aligned. Fast enqueue function will check prerequisites in the
> > beginning. Fast enqueue function do not support chained mbufs, normal
> > function will handle that.
> >
> > Signed-off-by: Marvin Liu <yong.liu@intel.com>
> >
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > index 884befa85..f24026acd 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -39,6 +39,8 @@
> >
> >  #define VHOST_LOG_CACHE_NR 32
> >
> > +/* Used in fast packed ring functions */
> > +#define PACKED_DESC_PER_CACHELINE (RTE_CACHE_LINE_SIZE / sizeof(struct
> vring_packed_desc))
> >  /**
> >   * Structure contains buffer address, length and descriptor index
> >   * from vring to do scatter RX.
> > diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> > index 003aec1d4..b877510da 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -897,6 +897,115 @@ virtio_dev_rx_split(struct virtio_net *dev, struct
> vhost_virtqueue *vq,
> >  	return pkt_idx;
> >  }
> >
> > +static __rte_always_inline uint16_t
> > +virtio_dev_rx_fast_packed(struct virtio_net *dev, struct vhost_virtqueue
> *vq,
> > +		struct rte_mbuf **pkts)
> > +{
> > +	bool wrap_counter = vq->avail_wrap_counter;
> > +	struct vring_packed_desc *descs = vq->desc_packed;
> > +	uint16_t avail_idx = vq->last_avail_idx;
> > +	uint64_t desc_addr, desc_addr1, desc_addr2, desc_addr3, len, len1,
> > +		len2, len3;
> > +	struct virtio_net_hdr_mrg_rxbuf *hdr, *hdr1, *hdr2, *hdr3;
> > +	uint32_t buf_offset = dev->vhost_hlen;
> > +
> > +	if (unlikely(avail_idx & 0x3))
> > +		return -1;
> > +
> > +	if (unlikely(avail_idx < (vq->size - PACKED_DESC_PER_CACHELINE)))
> 
> Doe it makes sense to check this?
> If this condition is not 'true', all the code below will access incorrect
> memory.
> 
Sorry, this condition should be likely:)

> > +		rte_prefetch0((void *)(uintptr_t)&descs[avail_idx +
> > +			PACKED_DESC_PER_CACHELINE]);
> > +	else
> > +		rte_prefetch0((void *)(uintptr_t)&descs[0]);
> > +
> > +	if (unlikely((pkts[0]->next != NULL) |
> > +		(pkts[1]->next != NULL) |
> > +		(pkts[2]->next != NULL) |
> > +		(pkts[3]->next != NULL)))
> > +		return -1;
> > +
> > +	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)) |
> > +		unlikely(!desc_is_avail(&descs[avail_idx + 1], wrap_counter)) |
> > +		unlikely(!desc_is_avail(&descs[avail_idx + 2], wrap_counter)) |
> > +		unlikely(!desc_is_avail(&descs[avail_idx + 3], wrap_counter)))
> > +		return 1;
> > +
> > +	rte_smp_rmb();
> > +
> > +	len = descs[avail_idx].len;
> > +	len1 = descs[avail_idx + 1].len;
> > +	len2 = descs[avail_idx + 2].len;
> > +	len3 = descs[avail_idx + 3].len;
> > +
> > +	if (unlikely((pkts[0]->pkt_len > (len - buf_offset)) |
> > +		(pkts[1]->pkt_len > (len1 - buf_offset)) |
> > +		(pkts[2]->pkt_len > (len2 - buf_offset)) |
> > +		(pkts[3]->pkt_len > (len3 - buf_offset))))
> > +		return -1;
> > +
> > +	desc_addr = vhost_iova_to_vva(dev, vq,
> > +			descs[avail_idx].addr,
> > +			&len,
> > +			VHOST_ACCESS_RW);
> > +
> > +	desc_addr1 = vhost_iova_to_vva(dev, vq,
> > +			descs[avail_idx + 1].addr,
> > +			&len1,
> > +			VHOST_ACCESS_RW);
> > +
> > +	desc_addr2 = vhost_iova_to_vva(dev, vq,
> > +			descs[avail_idx + 2].addr,
> > +			&len2,
> > +			VHOST_ACCESS_RW);
> > +
> > +	desc_addr3 = vhost_iova_to_vva(dev, vq,
> > +			descs[avail_idx + 3].addr,
> > +			&len3,
> > +			VHOST_ACCESS_RW);
> > +
> > +	if (unlikely((len != descs[avail_idx].len) |
> > +		(len1 != descs[avail_idx + 1].len) |
> > +		(len2 != descs[avail_idx + 2].len) |
> > +		(len3 != descs[avail_idx + 3].len)))
> > +		return -1;
> > +
> > +	hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr;
> > +	hdr1 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr1;
> > +	hdr2 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr2;
> > +	hdr3 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr3;
> > +
> > +	virtio_enqueue_offload(pkts[0], &hdr->hdr);
> > +	virtio_enqueue_offload(pkts[1], &hdr1->hdr);
> > +	virtio_enqueue_offload(pkts[2], &hdr2->hdr);
> > +	virtio_enqueue_offload(pkts[3], &hdr3->hdr);
> > +
> > +	len = pkts[0]->pkt_len + dev->vhost_hlen;
> > +	len1 = pkts[1]->pkt_len + dev->vhost_hlen;
> > +	len2 = pkts[2]->pkt_len + dev->vhost_hlen;
> > +	len3 = pkts[3]->pkt_len + dev->vhost_hlen;
> > +
> > +	vq->last_avail_idx += PACKED_DESC_PER_CACHELINE;
> 
> The whole function assumes that PACKED_DESC_PER_CACHELINE equals to 4,
> but if it's not, it will not work correctly.

Thanks for point out this, I just noticed that power8 has 128 bytes cache line.
Main idea here is to unroll the loop and check conditions first.
PACKED_DESC_PER_CACHELINE can be removed and renamed like FAST_BURST_NUM which hardcoded to four.

Best regards,
Marvin
Jason Wang July 10, 2019, 4:28 a.m. | #3
On 2019/7/9 上午1:13, Marvin Liu wrote:
> In fast enqueue function, will first check whether descriptors are
> cache aligned. Fast enqueue function will check prerequisites in the
> beginning. Fast enqueue function do not support chained mbufs, normal
> function will handle that.
>
> Signed-off-by: Marvin Liu <yong.liu@intel.com>
>
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 884befa85..f24026acd 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -39,6 +39,8 @@
>   
>   #define VHOST_LOG_CACHE_NR 32
>   
> +/* Used in fast packed ring functions */
> +#define PACKED_DESC_PER_CACHELINE (RTE_CACHE_LINE_SIZE / sizeof(struct vring_packed_desc))
>   /**
>    * Structure contains buffer address, length and descriptor index
>    * from vring to do scatter RX.
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 003aec1d4..b877510da 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -897,6 +897,115 @@ virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
>   	return pkt_idx;
>   }
>   
> +static __rte_always_inline uint16_t
> +virtio_dev_rx_fast_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		struct rte_mbuf **pkts)
> +{
> +	bool wrap_counter = vq->avail_wrap_counter;
> +	struct vring_packed_desc *descs = vq->desc_packed;
> +	uint16_t avail_idx = vq->last_avail_idx;
> +	uint64_t desc_addr, desc_addr1, desc_addr2, desc_addr3, len, len1,
> +		len2, len3;
> +	struct virtio_net_hdr_mrg_rxbuf *hdr, *hdr1, *hdr2, *hdr3;
> +	uint32_t buf_offset = dev->vhost_hlen;
> +
> +	if (unlikely(avail_idx & 0x3))
> +		return -1;
> +
> +	if (unlikely(avail_idx < (vq->size - PACKED_DESC_PER_CACHELINE)))
> +		rte_prefetch0((void *)(uintptr_t)&descs[avail_idx +
> +			PACKED_DESC_PER_CACHELINE]);
> +	else
> +		rte_prefetch0((void *)(uintptr_t)&descs[0]);
> +
> +	if (unlikely((pkts[0]->next != NULL) |
> +		(pkts[1]->next != NULL) |
> +		(pkts[2]->next != NULL) |
> +		(pkts[3]->next != NULL)))
> +		return -1;
> +
> +	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)) |
> +		unlikely(!desc_is_avail(&descs[avail_idx + 1], wrap_counter)) |
> +		unlikely(!desc_is_avail(&descs[avail_idx + 2], wrap_counter)) |
> +		unlikely(!desc_is_avail(&descs[avail_idx + 3], wrap_counter)))
> +		return 1;


Any reason for not letting compiler to unroll the loops?

Thanks
Liu, Yong July 10, 2019, 7:30 a.m. | #4
> -----Original Message-----
> From: Jason Wang [mailto:jasowang@redhat.com]
> Sent: Wednesday, July 10, 2019 12:28 PM
> To: Liu, Yong <yong.liu@intel.com>; Bie, Tiwei <tiwei.bie@intel.com>;
> maxime.coquelin@redhat.com; dev@dpdk.org
> Subject: Re: [dpdk-dev] [RFC PATCH 02/13] add vhost packed ring fast enqueue
> function
> 
> 
> On 2019/7/9 上午1:13, Marvin Liu wrote:
> > In fast enqueue function, will first check whether descriptors are
> > cache aligned. Fast enqueue function will check prerequisites in the
> > beginning. Fast enqueue function do not support chained mbufs, normal
> > function will handle that.
> >
> > Signed-off-by: Marvin Liu <yong.liu@intel.com>
> 
> Any reason for not letting compiler to unroll the loops?
> 

Hi Jason,
I'm not sure about how much compiler can help on unrolling loops as it can't know how much loops will create in one call.
After force not using unroll-loop optimization by "-fno-unroll-loops", virtio_dev_rx_packed function size remained the same.
So look like gcc unroll-loop optimization do not help here. 

And fast enqueue function not only did unroll loop, it also checked cache alignment which can help performance in another side.

Regards,
Marvin

> Thanks
>
Jason Wang July 11, 2019, 4:11 a.m. | #5
On 2019/7/10 下午3:30, Liu, Yong wrote:
>
>> -----Original Message-----
>> From: Jason Wang [mailto:jasowang@redhat.com]
>> Sent: Wednesday, July 10, 2019 12:28 PM
>> To: Liu, Yong <yong.liu@intel.com>; Bie, Tiwei <tiwei.bie@intel.com>;
>> maxime.coquelin@redhat.com; dev@dpdk.org
>> Subject: Re: [dpdk-dev] [RFC PATCH 02/13] add vhost packed ring fast enqueue
>> function
>>
>>
>> On 2019/7/9 上午1:13, Marvin Liu wrote:
>>> In fast enqueue function, will first check whether descriptors are
>>> cache aligned. Fast enqueue function will check prerequisites in the
>>> beginning. Fast enqueue function do not support chained mbufs, normal
>>> function will handle that.
>>>
>>> Signed-off-by: Marvin Liu <yong.liu@intel.com>
>> Any reason for not letting compiler to unroll the loops?
>>
> Hi Jason,
> I'm not sure about how much compiler can help on unrolling loops as it can't know how much loops will create in one call.
> After force not using unroll-loop optimization by "-fno-unroll-loops", virtio_dev_rx_packed function size remained the same.
> So look like gcc unroll-loop optimization do not help here.


I meant something like "pragma GCC unroll N" just before the loop you 
want unrolled.

Thanks


>
> And fast enqueue function not only did unroll loop, it also checked cache alignment which can help performance in another side.
>
> Regards,
> Marvin
>
>> Thanks
>>
Jason Wang July 11, 2019, 8:35 a.m. | #6
On 2019/7/9 上午1:13, Marvin Liu wrote:
> In fast enqueue function, will first check whether descriptors are
> cache aligned. Fast enqueue function will check prerequisites in the
> beginning. Fast enqueue function do not support chained mbufs, normal
> function will handle that.
>
> Signed-off-by: Marvin Liu<yong.liu@intel.com>
>
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 884befa85..f24026acd 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -39,6 +39,8 @@
>   
>   #define VHOST_LOG_CACHE_NR 32
>   
> +/* Used in fast packed ring functions */
> +#define PACKED_DESC_PER_CACHELINE (RTE_CACHE_LINE_SIZE / sizeof(struct vring_packed_desc))
>   /**
>    * Structure contains buffer address, length and descriptor index
>    * from vring to do scatter RX.
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 003aec1d4..b877510da 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -897,6 +897,115 @@ virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
>   	return pkt_idx;
>   }
>   
> +static __rte_always_inline uint16_t
> +virtio_dev_rx_fast_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		struct rte_mbuf **pkts)
> +{
> +	bool wrap_counter = vq->avail_wrap_counter;
> +	struct vring_packed_desc *descs = vq->desc_packed;
> +	uint16_t avail_idx = vq->last_avail_idx;
> +	uint64_t desc_addr, desc_addr1, desc_addr2, desc_addr3, len, len1,
> +		len2, len3;
> +	struct virtio_net_hdr_mrg_rxbuf *hdr, *hdr1, *hdr2, *hdr3;
> +	uint32_t buf_offset = dev->vhost_hlen;
> +
> +	if (unlikely(avail_idx & 0x3))
> +		return -1;
> +
> +	if (unlikely(avail_idx < (vq->size - PACKED_DESC_PER_CACHELINE)))
> +		rte_prefetch0((void *)(uintptr_t)&descs[avail_idx +
> +			PACKED_DESC_PER_CACHELINE]);
> +	else
> +		rte_prefetch0((void *)(uintptr_t)&descs[0]);
> +
> +	if (unlikely((pkts[0]->next != NULL) |
> +		(pkts[1]->next != NULL) |
> +		(pkts[2]->next != NULL) |
> +		(pkts[3]->next != NULL)))
> +		return -1;
> +
> +	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)) |
> +		unlikely(!desc_is_avail(&descs[avail_idx + 1], wrap_counter)) |
> +		unlikely(!desc_is_avail(&descs[avail_idx + 2], wrap_counter)) |
> +		unlikely(!desc_is_avail(&descs[avail_idx + 3], wrap_counter)))
> +		return 1;
> +
> +	rte_smp_rmb();
> +
> +	len = descs[avail_idx].len;
> +	len1 = descs[avail_idx + 1].len;
> +	len2 = descs[avail_idx + 2].len;
> +	len3 = descs[avail_idx + 3].len;
> +
> +	if (unlikely((pkts[0]->pkt_len > (len - buf_offset)) |
> +		(pkts[1]->pkt_len > (len1 - buf_offset)) |
> +		(pkts[2]->pkt_len > (len2 - buf_offset)) |
> +		(pkts[3]->pkt_len > (len3 - buf_offset))))
> +		return -1;
> +
> +	desc_addr = vhost_iova_to_vva(dev, vq,
> +			descs[avail_idx].addr,
> +			&len,
> +			VHOST_ACCESS_RW);
> +
> +	desc_addr1 = vhost_iova_to_vva(dev, vq,
> +			descs[avail_idx + 1].addr,
> +			&len1,
> +			VHOST_ACCESS_RW);
> +
> +	desc_addr2 = vhost_iova_to_vva(dev, vq,
> +			descs[avail_idx + 2].addr,
> +			&len2,
> +			VHOST_ACCESS_RW);
> +
> +	desc_addr3 = vhost_iova_to_vva(dev, vq,
> +			descs[avail_idx + 3].addr,
> +			&len3,
> +			VHOST_ACCESS_RW);


How can you guarantee that len3 is zero after this?

Thanks
Liu, Yong July 11, 2019, 9:37 a.m. | #7
> -----Original Message-----
> From: Jason Wang [mailto:jasowang@redhat.com]
> Sent: Thursday, July 11, 2019 4:35 PM
> To: Liu, Yong <yong.liu@intel.com>; Bie, Tiwei <tiwei.bie@intel.com>;
> maxime.coquelin@redhat.com; dev@dpdk.org
> Subject: Re: [dpdk-dev] [RFC PATCH 02/13] add vhost packed ring fast enqueue
> function
> 
> 
> On 2019/7/9 上午1:13, Marvin Liu wrote:
> > In fast enqueue function, will first check whether descriptors are
> > cache aligned. Fast enqueue function will check prerequisites in the
> > beginning. Fast enqueue function do not support chained mbufs, normal
> > function will handle that.
> >
> > Signed-off-by: Marvin Liu<yong.liu@intel.com>
> >
> > +	len = descs[avail_idx].len;
> > +	len1 = descs[avail_idx + 1].len;
> > +	len2 = descs[avail_idx + 2].len;
> > +	len3 = descs[avail_idx + 3].len;
> > +
> > +	if (unlikely((pkts[0]->pkt_len > (len - buf_offset)) |
> > +		(pkts[1]->pkt_len > (len1 - buf_offset)) |
> > +		(pkts[2]->pkt_len > (len2 - buf_offset)) |
> > +		(pkts[3]->pkt_len > (len3 - buf_offset))))
> > +		return -1;
> > +
> > +	desc_addr = vhost_iova_to_vva(dev, vq,
> > +			descs[avail_idx].addr,
> > +			&len,
> > +			VHOST_ACCESS_RW);
> > +
> > +	desc_addr1 = vhost_iova_to_vva(dev, vq,
> > +			descs[avail_idx + 1].addr,
> > +			&len1,
> > +			VHOST_ACCESS_RW);
> > +
> > +	desc_addr2 = vhost_iova_to_vva(dev, vq,
> > +			descs[avail_idx + 2].addr,
> > +			&len2,
> > +			VHOST_ACCESS_RW);
> > +
> > +	desc_addr3 = vhost_iova_to_vva(dev, vq,
> > +			descs[avail_idx + 3].addr,
> > +			&len3,
> > +			VHOST_ACCESS_RW);
> 
> 
> How can you guarantee that len3 is zero after this?
> 

Jason,
Here just guarantee host mapped length of desc is same as value in desc.
If value of len matched, data can be directly copied. 

If anything wrong in address conversion, value of len will be mismatched. 
This case will be handled by normal path.

Thanks,
Marvin

> Thanks
Jason Wang July 11, 2019, 9:44 a.m. | #8
On 2019/7/11 下午5:37, Liu, Yong wrote:
>
>> -----Original Message-----
>> From: Jason Wang [mailto:jasowang@redhat.com]
>> Sent: Thursday, July 11, 2019 4:35 PM
>> To: Liu, Yong <yong.liu@intel.com>; Bie, Tiwei <tiwei.bie@intel.com>;
>> maxime.coquelin@redhat.com; dev@dpdk.org
>> Subject: Re: [dpdk-dev] [RFC PATCH 02/13] add vhost packed ring fast enqueue
>> function
>>
>>
>> On 2019/7/9 上午1:13, Marvin Liu wrote:
>>> In fast enqueue function, will first check whether descriptors are
>>> cache aligned. Fast enqueue function will check prerequisites in the
>>> beginning. Fast enqueue function do not support chained mbufs, normal
>>> function will handle that.
>>>
>>> Signed-off-by: Marvin Liu<yong.liu@intel.com>
>>>
>>> +	len = descs[avail_idx].len;
>>> +	len1 = descs[avail_idx + 1].len;
>>> +	len2 = descs[avail_idx + 2].len;
>>> +	len3 = descs[avail_idx + 3].len;
>>> +
>>> +	if (unlikely((pkts[0]->pkt_len > (len - buf_offset)) |
>>> +		(pkts[1]->pkt_len > (len1 - buf_offset)) |
>>> +		(pkts[2]->pkt_len > (len2 - buf_offset)) |
>>> +		(pkts[3]->pkt_len > (len3 - buf_offset))))
>>> +		return -1;
>>> +
>>> +	desc_addr = vhost_iova_to_vva(dev, vq,
>>> +			descs[avail_idx].addr,
>>> +			&len,
>>> +			VHOST_ACCESS_RW);
>>> +
>>> +	desc_addr1 = vhost_iova_to_vva(dev, vq,
>>> +			descs[avail_idx + 1].addr,
>>> +			&len1,
>>> +			VHOST_ACCESS_RW);
>>> +
>>> +	desc_addr2 = vhost_iova_to_vva(dev, vq,
>>> +			descs[avail_idx + 2].addr,
>>> +			&len2,
>>> +			VHOST_ACCESS_RW);
>>> +
>>> +	desc_addr3 = vhost_iova_to_vva(dev, vq,
>>> +			descs[avail_idx + 3].addr,
>>> +			&len3,
>>> +			VHOST_ACCESS_RW);
>>
>> How can you guarantee that len3 is zero after this?
>>
> Jason,
> Here just guarantee host mapped length of desc is same as value in desc.
> If value of len matched, data can be directly copied.
>
> If anything wrong in address conversion, value of len will be mismatched.
> This case will be handled by normal path.
>
> Thanks,
> Marvin


Right, I miss-read the code.

Thanks


>
>> Thanks
Liu, Yong July 11, 2019, 9:49 a.m. | #9
> -----Original Message-----
> From: Jason Wang [mailto:jasowang@redhat.com]
> Sent: Thursday, July 11, 2019 12:11 PM
> To: Liu, Yong <yong.liu@intel.com>; Bie, Tiwei <tiwei.bie@intel.com>;
> maxime.coquelin@redhat.com; dev@dpdk.org
> Subject: Re: [dpdk-dev] [RFC PATCH 02/13] add vhost packed ring fast enqueue
> function
> 
> 
> On 2019/7/10 下午3:30, Liu, Yong wrote:
> >
> >> -----Original Message-----
> >> From: Jason Wang [mailto:jasowang@redhat.com]
> >> Sent: Wednesday, July 10, 2019 12:28 PM
> >> To: Liu, Yong <yong.liu@intel.com>; Bie, Tiwei <tiwei.bie@intel.com>;
> >> maxime.coquelin@redhat.com; dev@dpdk.org
> >> Subject: Re: [dpdk-dev] [RFC PATCH 02/13] add vhost packed ring fast
> enqueue
> >> function
> >>
> >>
> >> On 2019/7/9 上午1:13, Marvin Liu wrote:
> >>> In fast enqueue function, will first check whether descriptors are
> >>> cache aligned. Fast enqueue function will check prerequisites in the
> >>> beginning. Fast enqueue function do not support chained mbufs, normal
> >>> function will handle that.
> >>>
> >>> Signed-off-by: Marvin Liu <yong.liu@intel.com>
> >> Any reason for not letting compiler to unroll the loops?
> >>
> > Hi Jason,
> > I'm not sure about how much compiler can help on unrolling loops as it
> can't know how much loops will create in one call.
> > After force not using unroll-loop optimization by "-fno-unroll-loops",
> virtio_dev_rx_packed function size remained the same.
> > So look like gcc unroll-loop optimization do not help here.
> 
> 
> I meant something like "pragma GCC unroll N" just before the loop you
> want unrolled.
> 
> Thanks
> 

Hi Jason,
Just tired with gcc8.3.0 and master code, only 0.1Mpps performance gain with "#pragma GCC unroll". 
I think this compiler pragma is not helpful in the big loop which contained so much functions. 

Thanks,
Marvin

> 
> >
> > And fast enqueue function not only did unroll loop, it also checked cache
> alignment which can help performance in another side.
> >
> > Regards,
> > Marvin
> >
> >> Thanks
> >>
Jason Wang July 11, 2019, 9:54 a.m. | #10
On 2019/7/11 下午5:49, Liu, Yong wrote:
>
>> -----Original Message-----
>> From: Jason Wang [mailto:jasowang@redhat.com]
>> Sent: Thursday, July 11, 2019 12:11 PM
>> To: Liu, Yong <yong.liu@intel.com>; Bie, Tiwei <tiwei.bie@intel.com>;
>> maxime.coquelin@redhat.com; dev@dpdk.org
>> Subject: Re: [dpdk-dev] [RFC PATCH 02/13] add vhost packed ring fast enqueue
>> function
>>
>>
>> On 2019/7/10 下午3:30, Liu, Yong wrote:
>>>> -----Original Message-----
>>>> From: Jason Wang [mailto:jasowang@redhat.com]
>>>> Sent: Wednesday, July 10, 2019 12:28 PM
>>>> To: Liu, Yong <yong.liu@intel.com>; Bie, Tiwei <tiwei.bie@intel.com>;
>>>> maxime.coquelin@redhat.com; dev@dpdk.org
>>>> Subject: Re: [dpdk-dev] [RFC PATCH 02/13] add vhost packed ring fast
>> enqueue
>>>> function
>>>>
>>>>
>>>> On 2019/7/9 上午1:13, Marvin Liu wrote:
>>>>> In fast enqueue function, will first check whether descriptors are
>>>>> cache aligned. Fast enqueue function will check prerequisites in the
>>>>> beginning. Fast enqueue function do not support chained mbufs, normal
>>>>> function will handle that.
>>>>>
>>>>> Signed-off-by: Marvin Liu <yong.liu@intel.com>
>>>> Any reason for not letting compiler to unroll the loops?
>>>>
>>> Hi Jason,
>>> I'm not sure about how much compiler can help on unrolling loops as it
>> can't know how much loops will create in one call.
>>> After force not using unroll-loop optimization by "-fno-unroll-loops",
>> virtio_dev_rx_packed function size remained the same.
>>> So look like gcc unroll-loop optimization do not help here.
>>
>> I meant something like "pragma GCC unroll N" just before the loop you
>> want unrolled.
>>
>> Thanks
>>
> Hi Jason,
> Just tired with gcc8.3.0 and master code, only 0.1Mpps performance gain with "#pragma GCC unroll".
> I think this compiler pragma is not helpful in the big loop which contained so much functions.
>
> Thanks,
> Marvin


Yes, it probably need some trick e.g break the big loop into small ones. 
What I want do here is unroll the loop based on 
PACKED_DESC_PER_CACHELINE instead of a hard-coded 4.

Thanks


>>> And fast enqueue function not only did unroll loop, it also checked cache
>> alignment which can help performance in another side.
>>> Regards,
>>> Marvin
>>>
>>>> Thanks
>>>>

Patch

diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 884befa85..f24026acd 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -39,6 +39,8 @@ 
 
 #define VHOST_LOG_CACHE_NR 32
 
+/* Used in fast packed ring functions */
+#define PACKED_DESC_PER_CACHELINE (RTE_CACHE_LINE_SIZE / sizeof(struct vring_packed_desc))
 /**
  * Structure contains buffer address, length and descriptor index
  * from vring to do scatter RX.
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 003aec1d4..b877510da 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -897,6 +897,115 @@  virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return pkt_idx;
 }
 
+static __rte_always_inline uint16_t
+virtio_dev_rx_fast_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		struct rte_mbuf **pkts)
+{
+	bool wrap_counter = vq->avail_wrap_counter;
+	struct vring_packed_desc *descs = vq->desc_packed;
+	uint16_t avail_idx = vq->last_avail_idx;
+	uint64_t desc_addr, desc_addr1, desc_addr2, desc_addr3, len, len1,
+		len2, len3;
+	struct virtio_net_hdr_mrg_rxbuf *hdr, *hdr1, *hdr2, *hdr3;
+	uint32_t buf_offset = dev->vhost_hlen;
+
+	if (unlikely(avail_idx & 0x3))
+		return -1;
+
+	if (unlikely(avail_idx < (vq->size - PACKED_DESC_PER_CACHELINE)))
+		rte_prefetch0((void *)(uintptr_t)&descs[avail_idx +
+			PACKED_DESC_PER_CACHELINE]);
+	else
+		rte_prefetch0((void *)(uintptr_t)&descs[0]);
+
+	if (unlikely((pkts[0]->next != NULL) |
+		(pkts[1]->next != NULL) |
+		(pkts[2]->next != NULL) |
+		(pkts[3]->next != NULL)))
+		return -1;
+
+	if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)) |
+		unlikely(!desc_is_avail(&descs[avail_idx + 1], wrap_counter)) |
+		unlikely(!desc_is_avail(&descs[avail_idx + 2], wrap_counter)) |
+		unlikely(!desc_is_avail(&descs[avail_idx + 3], wrap_counter)))
+		return 1;
+
+	rte_smp_rmb();
+
+	len = descs[avail_idx].len;
+	len1 = descs[avail_idx + 1].len;
+	len2 = descs[avail_idx + 2].len;
+	len3 = descs[avail_idx + 3].len;
+
+	if (unlikely((pkts[0]->pkt_len > (len - buf_offset)) |
+		(pkts[1]->pkt_len > (len1 - buf_offset)) |
+		(pkts[2]->pkt_len > (len2 - buf_offset)) |
+		(pkts[3]->pkt_len > (len3 - buf_offset))))
+		return -1;
+
+	desc_addr = vhost_iova_to_vva(dev, vq,
+			descs[avail_idx].addr,
+			&len,
+			VHOST_ACCESS_RW);
+
+	desc_addr1 = vhost_iova_to_vva(dev, vq,
+			descs[avail_idx + 1].addr,
+			&len1,
+			VHOST_ACCESS_RW);
+
+	desc_addr2 = vhost_iova_to_vva(dev, vq,
+			descs[avail_idx + 2].addr,
+			&len2,
+			VHOST_ACCESS_RW);
+
+	desc_addr3 = vhost_iova_to_vva(dev, vq,
+			descs[avail_idx + 3].addr,
+			&len3,
+			VHOST_ACCESS_RW);
+
+	if (unlikely((len != descs[avail_idx].len) |
+		(len1 != descs[avail_idx + 1].len) |
+		(len2 != descs[avail_idx + 2].len) |
+		(len3 != descs[avail_idx + 3].len)))
+		return -1;
+
+	hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr;
+	hdr1 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr1;
+	hdr2 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr2;
+	hdr3 = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr3;
+
+	virtio_enqueue_offload(pkts[0], &hdr->hdr);
+	virtio_enqueue_offload(pkts[1], &hdr1->hdr);
+	virtio_enqueue_offload(pkts[2], &hdr2->hdr);
+	virtio_enqueue_offload(pkts[3], &hdr3->hdr);
+
+	len = pkts[0]->pkt_len + dev->vhost_hlen;
+	len1 = pkts[1]->pkt_len + dev->vhost_hlen;
+	len2 = pkts[2]->pkt_len + dev->vhost_hlen;
+	len3 = pkts[3]->pkt_len + dev->vhost_hlen;
+
+	vq->last_avail_idx += PACKED_DESC_PER_CACHELINE;
+	if (vq->last_avail_idx >= vq->size) {
+		vq->last_avail_idx -= vq->size;
+		vq->avail_wrap_counter ^= 1;
+	}
+
+	rte_memcpy((void *)(uintptr_t)(desc_addr + buf_offset),
+		rte_pktmbuf_mtod_offset(pkts[0], void *, 0),
+		pkts[0]->pkt_len);
+	rte_memcpy((void *)(uintptr_t)(desc_addr1 + buf_offset),
+		rte_pktmbuf_mtod_offset(pkts[1], void *, 0),
+		pkts[1]->pkt_len);
+	rte_memcpy((void *)(uintptr_t)(desc_addr2 + buf_offset),
+		rte_pktmbuf_mtod_offset(pkts[2], void *, 0),
+		pkts[2]->pkt_len);
+	rte_memcpy((void *)(uintptr_t)(desc_addr3 + buf_offset),
+		rte_pktmbuf_mtod_offset(pkts[3], void *, 0),
+		pkts[3]->pkt_len);
+
+	return 0;
+}
+
 static __rte_always_inline uint16_t
 virtio_dev_rx_normal_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 			struct rte_mbuf *pkt)