On 2/19/19 11:59 AM, Tiwei Bie wrote:
> When the IN_ORDER feature is negotiated, the device may write out just a
> single used descriptor for a whole batch of buffers:
>
> """
> Some devices always use descriptors in the same order in which they
> have been made available. These devices can offer the VIRTIO_F_IN_ORDER
> feature. If negotiated, this knowledge allows devices to notify the
> use of a batch of buffers to the driver by only writing out a single
> used descriptor with the Buffer ID corresponding to the last descriptor
> in the batch.
>
> The device then skips forward in the ring according to the size of the
> batch. The driver needs to look up the used Buffer ID and calculate the
> batch size to be able to advance to where the next used descriptor will
> be written by the device.
> """
>
> But the Tx path of the packed ring can't handle this. With this patch,
> when IN_ORDER is negotiated, the driver manages the buffer IDs linearly,
> looks up the used buffer ID and advances to where the next used
> descriptor will be written by the device.
>
> Fixes: 892dc798fa9c ("net/virtio: implement Tx path for packed queues")
> Cc: stable@dpdk.org
>
> Signed-off-by: Tiwei Bie <tiwei.bie@intel.com>
> ---
>  drivers/net/virtio/virtio_ethdev.c |  4 +-
>  drivers/net/virtio/virtio_rxtx.c   | 69 ++++++++++++++++++++++++------
>  2 files changed, 59 insertions(+), 14 deletions(-)
>
Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
Thanks,
Maxime
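
For readers less familiar with VIRTIO_F_IN_ORDER, here is a minimal sketch of
the skip-ahead described in the commit message and the spec excerpt above. It
is an illustration only, not the driver code: the structures and the function
name are simplified stand-ins invented for the example, and it advances past a
single reported batch rather than looping like the real cleanup routine.

    /*
     * Illustration only -- simplified stand-ins, not the DPDK structures.
     * With VIRTIO_F_IN_ORDER the device may write a single used descriptor
     * whose ID is the buffer ID of the *last* buffer in a batch.  Because
     * the driver hands out IDs linearly, it can walk its own per-slot
     * bookkeeping (ndescs recorded at enqueue time) until it passes that
     * ID; that both yields the batch size and tells it where the next
     * used descriptor will be written.
     */
    #include <stdint.h>
    #include <stdbool.h>
    #include <stddef.h>

    struct slot_info {
    	uint16_t ndescs;        /* descriptors used by the buffer queued at this slot */
    	void *cookie;           /* e.g. the mbuf to free once the buffer is used */
    };

    struct ring_state {
    	uint16_t size;          /* number of descriptors in the ring */
    	uint16_t used_cons_idx; /* next slot the driver expects to consume */
    	bool used_wrap_counter; /* flips on each ring wrap-around */
    	struct slot_info *slots;
    };

    /* Advance past one batch that the device reported with a single used ID;
     * returns the number of descriptors reclaimed. */
    static uint16_t
    consume_in_order_batch(struct ring_state *r, uint16_t last_used_id)
    {
    	uint16_t idx = r->used_cons_idx;
    	uint16_t freed = 0;
    	uint16_t cur;

    	do {
    		cur = idx;                      /* slot being retired now */
    		freed += r->slots[cur].ndescs;  /* batch size grows by this buffer */
    		idx += r->slots[cur].ndescs;    /* skip its descriptors */
    		if (idx >= r->size) {           /* wrapped around the ring */
    			idx -= r->size;
    			r->used_wrap_counter = !r->used_wrap_counter;
    		}
    		r->slots[cur].cookie = NULL;    /* a real driver frees the mbuf here */
    	} while (cur != last_used_id);          /* stop once the reported ID is passed */

    	r->used_cons_idx = idx;                 /* where the next used descriptor lands */
    	return freed;
    }

The point the sketch shows is that the device's single used ID is enough:
walking the recorded ndescs values until that ID is passed yields both the
number of descriptors to reclaim and the slot where the next used descriptor
will appear. The patch hunks follow below.
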
@@ -1453,7 +1453,8 @@ set_rxtx_funcs(struct rte_eth_dev *eth_dev)
 	if (vtpci_packed_queue(hw)) {
 		PMD_INIT_LOG(INFO,
-			"virtio: using packed ring standard Tx path on port %u",
+			"virtio: using packed ring %s Tx path on port %u",
+			hw->use_inorder_tx ? "inorder" : "standard",
 			eth_dev->data->port_id);
 		eth_dev->tx_pkt_burst = virtio_xmit_pkts_packed;
 	} else {
@@ -2069,7 +2070,6 @@ virtio_dev_configure(struct rte_eth_dev *dev)
 	if (vtpci_packed_queue(hw)) {
 		hw->use_simple_rx = 0;
 		hw->use_inorder_rx = 0;
-		hw->use_inorder_tx = 0;
 	}
 #if defined RTE_ARCH_ARM64 || defined RTE_ARCH_ARM
@@ -224,9 +224,40 @@ virtqueue_dequeue_rx_inorder(struct virtqueue *vq,
 #define DEFAULT_TX_FREE_THRESH 32
 #endif
-/* Cleanup from completed transmits. */
 static void
-virtio_xmit_cleanup_packed(struct virtqueue *vq, int num)
+virtio_xmit_cleanup_inorder_packed(struct virtqueue *vq, int num)
+{
+	uint16_t used_idx, id, curr_id, free_cnt = 0;
+	uint16_t size = vq->vq_nentries;
+	struct vring_packed_desc *desc = vq->ring_packed.desc_packed;
+	struct vq_desc_extra *dxp;
+
+	used_idx = vq->vq_used_cons_idx;
+	while (num > 0 && desc_is_used(&desc[used_idx], vq)) {
+		virtio_rmb(vq->hw->weak_barriers);
+		id = desc[used_idx].id;
+		do {
+			curr_id = used_idx;
+			dxp = &vq->vq_descx[used_idx];
+			used_idx += dxp->ndescs;
+			free_cnt += dxp->ndescs;
+			num -= dxp->ndescs;
+			if (used_idx >= size) {
+				used_idx -= size;
+				vq->used_wrap_counter ^= 1;
+			}
+			if (dxp->cookie != NULL) {
+				rte_pktmbuf_free(dxp->cookie);
+				dxp->cookie = NULL;
+			}
+		} while (curr_id != id);
+	}
+	vq->vq_used_cons_idx = used_idx;
+	vq->vq_free_cnt += free_cnt;
+}
+
+static void
+virtio_xmit_cleanup_normal_packed(struct virtqueue *vq, int num)
 {
 	uint16_t used_idx, id;
 	uint16_t size = vq->vq_nentries;
@@ -252,6 +283,16 @@ virtio_xmit_cleanup_packed(struct virtqueue *vq, int num)
 	}
 }
+/* Cleanup from completed transmits. */
+static inline void
+virtio_xmit_cleanup_packed(struct virtqueue *vq, int num, int in_order)
+{
+	if (in_order)
+		virtio_xmit_cleanup_inorder_packed(vq, num);
+	else
+		virtio_xmit_cleanup_normal_packed(vq, num);
+}
+
 static void
 virtio_xmit_cleanup(struct virtqueue *vq, uint16_t num)
 {
@@ -582,7 +623,7 @@ virtqueue_enqueue_xmit_inorder(struct virtnet_tx *txvq,
 static inline void
 virtqueue_enqueue_xmit_packed(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
-			      uint16_t needed, int can_push)
+			      uint16_t needed, int can_push, int in_order)
 {
 	struct virtio_tx_region *txr = txvq->virtio_net_hdr_mz->addr;
 	struct vq_desc_extra *dxp;
@@ -593,7 +634,7 @@ virtqueue_enqueue_xmit_packed(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
 	struct virtio_net_hdr *hdr;
 	uint16_t prev;
-	id = vq->vq_desc_head_idx;
+	id = in_order ? vq->vq_avail_idx : vq->vq_desc_head_idx;
 	dxp = &vq->vq_descx[id];
 	dxp->ndescs = needed;
@@ -670,13 +711,14 @@ virtqueue_enqueue_xmit_packed(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
 	start_dp[prev].id = id;
 	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);
-
-	vq->vq_desc_head_idx = dxp->next;
-	if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
-		vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
-
 	vq->vq_avail_idx = idx;
+	if (!in_order) {
+		vq->vq_desc_head_idx = dxp->next;
+		if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
+			vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
+	}
+
 	virtio_wmb(vq->hw->weak_barriers);
 	head_dp->flags = head_flags;
 }
@@ -1889,6 +1931,7 @@ virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts,
 	struct virtio_hw *hw = vq->hw;
 	uint16_t hdr_size = hw->vtnet_hdr_size;
 	uint16_t nb_tx = 0;
+	bool in_order = hw->use_inorder_tx;
 	int error;
 	if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
@@ -1900,7 +1943,8 @@ virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts,
 	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
 	if (nb_pkts > vq->vq_free_cnt)
-		virtio_xmit_cleanup_packed(vq, nb_pkts - vq->vq_free_cnt);
+		virtio_xmit_cleanup_packed(vq, nb_pkts - vq->vq_free_cnt,
+					   in_order);
 	for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
 		struct rte_mbuf *txm = tx_pkts[nb_tx];
@@ -1935,7 +1979,7 @@ virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts,
 		/* Positive value indicates it need free vring descriptors */
 		if (unlikely(need > 0)) {
-			virtio_xmit_cleanup_packed(vq, need);
+			virtio_xmit_cleanup_packed(vq, need, in_order);
 			need = slots - vq->vq_free_cnt;
 			if (unlikely(need > 0)) {
 				PMD_TX_LOG(ERR,
@@ -1945,7 +1989,8 @@ virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts,
 		}
 		/* Enqueue Packet buffers */
-		virtqueue_enqueue_xmit_packed(txvq, txm, slots, can_push);
+		virtqueue_enqueue_xmit_packed(txvq, txm, slots, can_push,
+					      in_order);
 		virtio_update_packet_stats(&txvq->stats, txm);
 	}