From patchwork Tue Mar  9 10:19:56 2021
X-Patchwork-Submitter: "Loftus, Ciara" <ciara.loftus@intel.com>
X-Patchwork-Id: 88745
X-Patchwork-Delegate: ferruh.yigit@amd.com
From: Ciara Loftus <ciara.loftus@intel.com>
To: dev@dpdk.org
Cc: Ciara Loftus <ciara.loftus@intel.com>
Date: Tue, 9 Mar 2021 10:19:56 +0000
Message-Id: <20210309101958.27355-2-ciara.loftus@intel.com>
In-Reply-To: <20210309101958.27355-1-ciara.loftus@intel.com>
References: <20210224111852.11947-1-ciara.loftus@intel.com>
 <20210309101958.27355-1-ciara.loftus@intel.com>
Subject: [dpdk-dev] [PATCH v2 1/3] net/af_xdp: allow bigger batch sizes

Prior to this commit, the maximum batch sizes for zero-copy and
copy-mode rx and copy-mode tx were set to 32. Apart from zero-copy
tx, the user could never rx/tx more than 32 packets at a time, and
without inspecting the code the user would not be aware of this
limit.

This commit removes these upper limits placed on the user and
instead sets an internal batch size equal to the default ring size
(2048). Batches larger than this are still processed, however they
are split into smaller batches, similar to how it is done in other
drivers. This is necessary because some arrays used during rx/tx
need to be sized at compile time. A larger internal batch size
permits fewer batches and thus larger bulk operations, fewer ring
accesses and fewer syscalls, all of which should improve
performance.
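For illustration only, not part of the patch: a minimal sketch of an
application poll function that benefits from this change. The
function name, port/queue ids and burst size below are hypothetical;
the PMD now serves the full requested burst, splitting it internally
into batches of at most ETH_AF_XDP_RX_BATCH_SIZE.

	#include <rte_ethdev.h>
	#include <rte_mbuf.h>

	#define BURST_SIZE 512	/* larger than the old 32-packet cap */

	static void
	poll_queue(uint16_t port_id, uint16_t queue_id)
	{
		struct rte_mbuf *bufs[BURST_SIZE];
		uint16_t nb_rx, i;

		/* Previously the af_xdp PMD silently capped this burst
		 * at 32; with this patch up to BURST_SIZE packets can
		 * be returned in one call.
		 */
		nb_rx = rte_eth_rx_burst(port_id, queue_id, bufs,
					 BURST_SIZE);

		for (i = 0; i < nb_rx; i++)
			rte_pktmbuf_free(bufs[i]);
	}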
Signed-off-by: Ciara Loftus <ciara.loftus@intel.com>
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 67 ++++++++++++++++++++++++-----
 1 file changed, 57 insertions(+), 10 deletions(-)

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index 3957227bf0..be524e4784 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -66,8 +66,8 @@ RTE_LOG_REGISTER(af_xdp_logtype, pmd.net.af_xdp, NOTICE);
 #define ETH_AF_XDP_DFLT_START_QUEUE_IDX	0
 #define ETH_AF_XDP_DFLT_QUEUE_COUNT	1
 
-#define ETH_AF_XDP_RX_BATCH_SIZE	32
-#define ETH_AF_XDP_TX_BATCH_SIZE	32
+#define ETH_AF_XDP_RX_BATCH_SIZE	XSK_RING_CONS__DEFAULT_NUM_DESCS
+#define ETH_AF_XDP_TX_BATCH_SIZE	XSK_RING_CONS__DEFAULT_NUM_DESCS
 
 struct xsk_umem_info {
@@ -329,8 +329,7 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
 	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
-		(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE,
-					 NULL, fq);
+		(void)reserve_fill_queue(umem, nb_pkts, NULL, fq);
 
 	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
 	if (nb_pkts == 0) {
@@ -379,10 +378,8 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #endif
 
 static uint16_t
-eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
-	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);
-
 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
 	return af_xdp_rx_zc(queue, bufs, nb_pkts);
 #else
@@ -390,6 +387,32 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #endif
 }
 
+static uint16_t
+eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+	uint16_t nb_rx;
+
+	if (likely(nb_pkts <= ETH_AF_XDP_RX_BATCH_SIZE))
+		return af_xdp_rx(queue, bufs, nb_pkts);
+
+	/* Split larger batch into smaller batches of size
+	 * ETH_AF_XDP_RX_BATCH_SIZE or less.
+	 */
+	nb_rx = 0;
+	while (nb_pkts) {
+		uint16_t ret, n;
+
+		n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);
+		ret = af_xdp_rx(queue, &bufs[nb_rx], n);
+		nb_rx = (uint16_t)(nb_rx + ret);
+		nb_pkts = (uint16_t)(nb_pkts - ret);
+		if (ret < n)
+			break;
+	}
+
+	return nb_rx;
+}
+
 static void
 pull_umem_cq(struct xsk_umem_info *umem, int size, struct xsk_ring_cons *cq)
 {
@@ -535,8 +558,6 @@ af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	uint32_t idx_tx;
 	struct xsk_ring_cons *cq = &txq->pair->cq;
 
-	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
-
 	pull_umem_cq(umem, nb_pkts, cq);
 
 	nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
@@ -575,6 +596,32 @@ af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 	return nb_pkts;
 }
+
+static uint16_t
+af_xdp_tx_cp_batch(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+	uint16_t nb_tx;
+
+	if (likely(nb_pkts <= ETH_AF_XDP_TX_BATCH_SIZE))
+		return af_xdp_tx_cp(queue, bufs, nb_pkts);
+
+	nb_tx = 0;
+	while (nb_pkts) {
+		uint16_t ret, n;
+
+		/* Split larger batch into smaller batches of size
+		 * ETH_AF_XDP_TX_BATCH_SIZE or less.
+		 */
+		n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
+		ret = af_xdp_tx_cp(queue, &bufs[nb_tx], n);
+		nb_tx = (uint16_t)(nb_tx + ret);
+		nb_pkts = (uint16_t)(nb_pkts - ret);
+		if (ret < n)
+			break;
+	}
+
+	return nb_tx;
+}
 #endif
 
 static uint16_t
@@ -583,7 +630,7 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
 	return af_xdp_tx_zc(queue, bufs, nb_pkts);
 #else
-	return af_xdp_tx_cp(queue, bufs, nb_pkts);
+	return af_xdp_tx_cp_batch(queue, bufs, nb_pkts);
 #endif
 }
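
Not part of the patch: the rx and tx wrappers added above share the
same shape, so here is a generic sketch of that split loop for
reference. The names (chunked_burst, burst_fn) are hypothetical and
purely illustrative.

	#include <stdint.h>
	#include <rte_common.h>	/* RTE_MIN */
	#include <rte_mbuf.h>

	/* Process a burst of any size in chunks of at most max_batch,
	 * stopping early when the backend handles a chunk only
	 * partially (e.g. ring full or no packets available).
	 */
	static uint16_t
	chunked_burst(void *queue, struct rte_mbuf **bufs,
		      uint16_t nb_pkts,
		      uint16_t (*burst_fn)(void *, struct rte_mbuf **,
					   uint16_t),
		      uint16_t max_batch)
	{
		uint16_t done = 0;

		while (nb_pkts) {
			uint16_t n = (uint16_t)RTE_MIN(nb_pkts, max_batch);
			uint16_t ret = burst_fn(queue, &bufs[done], n);

			done = (uint16_t)(done + ret);
			nb_pkts = (uint16_t)(nb_pkts - ret);
			if (ret < n)
				break;
		}

		return done;
	}

The early exit on a partial chunk (ret < n) matters: once the ring
cannot absorb a full chunk, retrying the remainder in the same call
would only burn cycles, so the loop returns the count handled so far
and lets the caller retry later.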