diff mbox series

[06/10] port: rework the ring output port behavior to non-blocking

Message ID 20220805220029.1096212-7-cristian.dumitrescu@intel.com (mailing list archive)
State Accepted
Delegated to: Thomas Monjalon
Headers show
Series port: implement output port non-blocking behavior | expand

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Dumitrescu, Cristian Aug. 5, 2022, 10 p.m. UTC
Drop packets that cannot be sent instead of retry sending the same
packets potentially forever when the ring consumer that is down.

Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
---
 lib/port/rte_swx_port_ring.c | 99 ++++++++++++++++++++++++------------
 1 file changed, 66 insertions(+), 33 deletions(-)
diff mbox series

Patch

diff --git a/lib/port/rte_swx_port_ring.c b/lib/port/rte_swx_port_ring.c
index c62fb3d8c8..72ec7209bf 100644
--- a/lib/port/rte_swx_port_ring.c
+++ b/lib/port/rte_swx_port_ring.c
@@ -172,6 +172,7 @@  struct writer {
 
 	struct rte_mbuf **pkts;
 	int n_pkts;
+	uint32_t n_bytes;
 };
 
 static void *
@@ -218,31 +219,55 @@  writer_create(void *args)
 	return NULL;
 }
 
-static void
+static inline void
 __writer_flush(struct writer *p)
 {
-	int n_pkts;
-
-	for (n_pkts = 0; ; ) {
-		n_pkts += rte_ring_sp_enqueue_burst(p->params.ring,
-						    (void **)p->pkts + n_pkts,
-						    p->n_pkts - n_pkts,
-						    NULL);
-
-		TRACE("[Ring %s] %d packets out\n", p->params.name, n_pkts);
-
-		if (n_pkts == p->n_pkts)
-			break;
+	struct rte_mbuf **pkts = p->pkts;
+	uint64_t n_pkts_total = p->stats.n_pkts;
+	uint64_t n_bytes_total = p->stats.n_bytes;
+	uint64_t n_pkts_drop_total = p->stats.n_pkts_drop;
+	uint64_t n_bytes_drop_total = p->stats.n_bytes_drop;
+	int n_pkts = p->n_pkts, n_pkts_drop, n_pkts_tx;
+	uint32_t n_bytes = p->n_bytes, n_bytes_drop = 0;
+
+	/* Packet TX. */
+	n_pkts_tx = rte_ring_sp_enqueue_burst(p->params.ring,
+					      (void **)pkts,
+					      n_pkts,
+					      NULL);
+
+	/* Packet drop. */
+	n_pkts_drop = n_pkts - n_pkts_tx;
+
+	for ( ; n_pkts_tx < n_pkts; n_pkts_tx++) {
+		struct rte_mbuf *m = pkts[n_pkts_tx];
+
+		n_bytes_drop += m->pkt_len;
+		rte_pktmbuf_free(m);
 	}
 
+	/* Port update. */
+	p->stats.n_pkts = n_pkts_total + n_pkts - n_pkts_drop;
+	p->stats.n_bytes = n_bytes_total + n_bytes - n_bytes_drop;
+	p->stats.n_pkts_drop = n_pkts_drop_total + n_pkts_drop;
+	p->stats.n_bytes_drop = n_bytes_drop_total + n_bytes_drop;
 	p->n_pkts = 0;
+	p->n_bytes = 0;
+
+	TRACE("[Ring %s] Buffered packets flushed: %d out, %d dropped\n",
+	      p->params.name,
+	      n_pkts - n_pkts_drop,
+	      n_pkts_drop);
 }
 
 static void
 writer_pkt_tx(void *port, struct rte_swx_pkt *pkt)
 {
 	struct writer *p = port;
+	int n_pkts = p->n_pkts;
+	uint32_t n_bytes = p->n_bytes;
 	struct rte_mbuf *m = pkt->handle;
+	uint32_t pkt_length = pkt->length;
 
 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
 	      p->params.name,
@@ -252,15 +277,15 @@  writer_pkt_tx(void *port, struct rte_swx_pkt *pkt)
 	if (TRACE_LEVEL)
 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
 
-	m->data_len = (uint16_t)(pkt->length + m->data_len - m->pkt_len);
-	m->pkt_len = pkt->length;
+	m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
+	m->pkt_len = pkt_length;
 	m->data_off = (uint16_t)pkt->offset;
 
-	p->stats.n_pkts++;
-	p->stats.n_bytes += pkt->length;
+	p->pkts[n_pkts++] = m;
+	p->n_pkts = n_pkts;
+	p->n_bytes = n_bytes + pkt_length;
 
-	p->pkts[p->n_pkts++] = m;
-	if (p->n_pkts ==  (int)p->params.burst_size)
+	if (n_pkts == (int)p->params.burst_size)
 		__writer_flush(p);
 }
 
@@ -268,7 +293,11 @@  static void
 writer_pkt_fast_clone_tx(void *port, struct rte_swx_pkt *pkt)
 {
 	struct writer *p = port;
+	int n_pkts = p->n_pkts;
+	uint32_t n_bytes = p->n_bytes;
+	uint64_t n_pkts_clone = p->stats.n_pkts_clone;
 	struct rte_mbuf *m = pkt->handle;
+	uint32_t pkt_length = pkt->length;
 
 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u) (fast clone)\n",
 	      p->params.name,
@@ -278,17 +307,17 @@  writer_pkt_fast_clone_tx(void *port, struct rte_swx_pkt *pkt)
 	if (TRACE_LEVEL)
 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
 
-	m->data_len = (uint16_t)(pkt->length + m->data_len - m->pkt_len);
-	m->pkt_len = pkt->length;
+	m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
+	m->pkt_len = pkt_length;
 	m->data_off = (uint16_t)pkt->offset;
 	rte_pktmbuf_refcnt_update(m, 1);
 
-	p->stats.n_pkts++;
-	p->stats.n_bytes += pkt->length;
-	p->stats.n_pkts_clone++;
+	p->pkts[n_pkts++] = m;
+	p->n_pkts = n_pkts;
+	p->n_bytes = n_bytes + pkt_length;
+	p->stats.n_pkts_clone = n_pkts_clone + 1;
 
-	p->pkts[p->n_pkts++] = m;
-	if (p->n_pkts == (int)p->params.burst_size)
+	if (n_pkts == (int)p->params.burst_size)
 		__writer_flush(p);
 }
 
@@ -296,7 +325,11 @@  static void
 writer_pkt_clone_tx(void *port, struct rte_swx_pkt *pkt, uint32_t truncation_length)
 {
 	struct writer *p = port;
+	int n_pkts = p->n_pkts;
+	uint32_t n_bytes = p->n_bytes;
+	uint64_t n_pkts_clone = p->stats.n_pkts_clone;
 	struct rte_mbuf *m = pkt->handle, *m_clone;
+	uint32_t pkt_length = pkt->length;
 
 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u) (clone)\n",
 	      p->params.name,
@@ -306,8 +339,8 @@  writer_pkt_clone_tx(void *port, struct rte_swx_pkt *pkt, uint32_t truncation_len
 	if (TRACE_LEVEL)
 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
 
-	m->data_len = (uint16_t)(pkt->length + m->data_len - m->pkt_len);
-	m->pkt_len = pkt->length;
+	m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
+	m->pkt_len = pkt_length;
 	m->data_off = (uint16_t)pkt->offset;
 
 	m_clone = rte_pktmbuf_copy(m, m->pool, 0, truncation_length);
@@ -316,12 +349,12 @@  writer_pkt_clone_tx(void *port, struct rte_swx_pkt *pkt, uint32_t truncation_len
 		return;
 	}
 
-	p->stats.n_pkts++;
-	p->stats.n_bytes += pkt->length;
-	p->stats.n_pkts_clone++;
+	p->pkts[n_pkts++] = m_clone;
+	p->n_pkts = n_pkts;
+	p->n_bytes = n_bytes + pkt_length;
+	p->stats.n_pkts_clone = n_pkts_clone + 1;
 
-	p->pkts[p->n_pkts++] = m_clone;
-	if (p->n_pkts == (int)p->params.burst_size)
+	if (n_pkts == (int)p->params.burst_size)
 		__writer_flush(p);
 }