[v8,13/17] test/distributor: add test with packets marking

Message ID: 20201017030701.16134-14-l.wojciechow@partner.samsung.com
State: Accepted
Delegated to: David Marchand
Series: fix distributor synchronization issues

Checks

Context        Check    Description
ci/checkpatch  warning  coding style issues

Commit Message

Lukasz Wojciechowski Oct. 17, 2020, 3:06 a.m. UTC
All of the former tests analyzed only the statistics
of packets processed by all workers.
The new test also verifies that packets are processed
by the expected workers.
Every packet processed by a worker is marked
and analyzed after it is returned to the distributor.

This test allows finding issues in the matching algorithms.

Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Acked-by: David Hunt <david.hunt@intel.com>
---
 app/test/test_distributor.c | 141 ++++++++++++++++++++++++++++++++++++
 1 file changed, 141 insertions(+)
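
As a quick aid to reading the patch, below is a minimal standalone sketch of the udata64 tagging scheme the test relies on: the sequence number lives in the bits above seq_shift, and the low bits accumulate the mark (worker id + 1) added by the handling worker. The SEQ_SHIFT macro and the main() driver are illustrative assumptions, not part of the patch.

#include <assert.h>
#include <stdint.h>

#define SEQ_SHIFT 10	/* matches seq_shift in the test */

int main(void)
{
	/* encode: packet with sequence number 5, then marked by
	 * worker 2 (workers add id + 1 to the low bits)
	 */
	uint64_t tag = ((uint64_t)5 << SEQ_SHIFT) + 2 + 1;

	/* decode, the same way sanity_mark_test does */
	unsigned int seq = tag >> SEQ_SHIFT;
	unsigned int id = tag - ((uint64_t)seq << SEQ_SHIFT);

	assert(seq == 5);	/* send order recovered */
	assert(id == 3);	/* mark equals worker id + 1 */
	return 0;
}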

Patch

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index fdb6ea9ce..cfae5a1ac 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -543,6 +543,141 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
 	return 0;
 }
 
+static int
+handle_and_mark_work(void *arg)
+{
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *db = wp->dist;
+	unsigned int num, i;
+	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
+	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
+	while (!quit) {
+		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+				__ATOMIC_RELAXED);
+		for (i = 0; i < num; i++)
+			buf[i]->udata64 += id + 1;
+		num = rte_distributor_get_pkt(db, id,
+				buf, buf, num);
+	}
+	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+			__ATOMIC_RELAXED);
+	rte_distributor_return_pkt(db, id, buf, num);
+	return 0;
+}
+
+/* sanity_mark_test sends packets to workers, which mark them.
+ * Every packet also carries an encoded sequence number.
+ * The returned packets are sorted and verified to have been
+ * handled by the proper workers.
+ */
+static int
+sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
+{
+	const unsigned int buf_count = 24;
+	const unsigned int burst = 8;
+	const unsigned int shift = 12;
+	const unsigned int seq_shift = 10;
+
+	struct rte_distributor *db = wp->dist;
+	struct rte_mbuf *bufs[buf_count];
+	struct rte_mbuf *returns[buf_count];
+	unsigned int i, count, id;
+	unsigned int sorted[buf_count], seq;
+	unsigned int failed = 0;
+
+	printf("=== Marked packets test ===\n");
+	clear_packet_count();
+	if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
+		printf("line %d: Error getting mbufs from pool\n", __LINE__);
+		return -1;
+	}
+
+	/* bufs' hashes will be like those below, but shifted left.
+	 * The shift avoids collisions with backlog and in-flight
+	 * tags left by previous tests.
+	 * [1, 1, 1, 1, 1, 1, 1, 1
+	 *  1, 1, 1, 1, 2, 2, 2, 2
+	 *  2, 2, 2, 2, 1, 1, 1, 1]
+	 */
+	for (i = 0; i < burst; i++) {
+		bufs[0 * burst + i]->hash.usr = 1 << shift;
+		bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
+			<< shift;
+		bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
+			<< shift;
+	}
+	/* Assign a sequence number to each packet. The sequence is shifted,
+	 * so that the lower bits of udata64 will hold the mark from a worker.
+	 */
+	for (i = 0; i < buf_count; i++)
+		bufs[i]->udata64 = i << seq_shift;
+
+	count = 0;
+	for (i = 0; i < buf_count/burst; i++) {
+		rte_distributor_process(db, &bufs[i * burst], burst);
+		count += rte_distributor_returned_pkts(db, &returns[count],
+			buf_count - count);
+	}
+
+	do {
+		rte_distributor_flush(db);
+		count += rte_distributor_returned_pkts(db, &returns[count],
+			buf_count - count);
+	} while (count < buf_count);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+			__atomic_load_n(&worker_stats[i].handled_packets,
+					__ATOMIC_RELAXED));
+
+	/* Sort returned packets by sent order (sequence numbers). */
+	for (i = 0; i < buf_count; i++) {
+		seq = returns[i]->udata64 >> seq_shift;
+		id = returns[i]->udata64 - (seq << seq_shift);
+		sorted[seq] = id;
+	}
+
+	/* Verify that packets [0-11] and [20-23] were processed
+	 * by the same worker
+	 */
+	for (i = 1; i < 12; i++) {
+		if (sorted[i] != sorted[0]) {
+			printf("Packet number %u processed by worker %u,"
+				" but should be processes by worker %u\n",
+				i, sorted[i], sorted[0]);
+			failed = 1;
+		}
+	}
+	for (i = 20; i < 24; i++) {
+		if (sorted[i] != sorted[0]) {
+			printf("Packet number %u processed by worker %u,"
+				" but should be processes by worker %u\n",
+				i, sorted[i], sorted[0]);
+			failed = 1;
+		}
+	}
+	/* And verify that packets [12-19] were processed
+	 * by another worker
+	 */
+	for (i = 13; i < 20; i++) {
+		if (sorted[i] != sorted[12]) {
+			printf("Packet number %u processed by worker %u,"
+				" but should be processes by worker %u\n",
+				i, sorted[i], sorted[12]);
+			failed = 1;
+		}
+	}
+
+	rte_mempool_put_bulk(p, (void *)bufs, buf_count);
+
+	if (failed)
+		return -1;
+
+	printf("Marked packets test passed\n");
+	return 0;
+}
+
 static
 int test_error_distributor_create_name(void)
 {
@@ -727,6 +862,12 @@ test_distributor(void)
 				goto err;
 			quit_workers(&worker_params, p);
 
+			rte_eal_mp_remote_launch(handle_and_mark_work,
+					&worker_params, SKIP_MASTER);
+			if (sanity_mark_test(&worker_params, p) < 0)
+				goto err;
+			quit_workers(&worker_params, p);
+
 		} else {
 			printf("Too few cores to run worker shutdown test\n");
 		}
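
For completeness, here is a self-contained sketch of the sort-and-verify step that runs after all packets have returned: tags are decoded back into (sequence, mark) pairs, reordered by sequence number, and the three groups are checked for consistent marks. BUF_COUNT, SEQ_SHIFT, verify() and the simulated tags in main() are assumptions made for this sketch; the real test reads the tags from the returned mbufs.

#include <stdint.h>
#include <stdio.h>

#define BUF_COUNT 24
#define SEQ_SHIFT 10

/* Decode each tag, reorder by sequence number, and check that
 * packets 0-11 and 20-23 carry one worker's mark while packets
 * 12-19 carry another's.
 */
static int verify(const uint64_t *tags)
{
	unsigned int sorted[BUF_COUNT], seq, id, i, failed = 0;

	for (i = 0; i < BUF_COUNT; i++) {
		seq = tags[i] >> SEQ_SHIFT;
		id = tags[i] - ((uint64_t)seq << SEQ_SHIFT);
		sorted[seq] = id;
	}
	for (i = 1; i < 12; i++)
		failed |= (sorted[i] != sorted[0]);
	for (i = 20; i < 24; i++)
		failed |= (sorted[i] != sorted[0]);
	for (i = 13; i < 20; i++)
		failed |= (sorted[i] != sorted[12]);
	return failed ? -1 : 0;
}

int main(void)
{
	uint64_t tags[BUF_COUNT];
	unsigned int i;

	/* simulate worker 0 handling packets 0-11 and 20-23, and
	 * worker 1 handling packets 12-19 (marks are id + 1)
	 */
	for (i = 0; i < BUF_COUNT; i++)
		tags[i] = ((uint64_t)i << SEQ_SHIFT) +
				((i >= 12 && i < 20) ? 2 : 1);

	printf("verify: %d\n", verify(tags));	/* expect 0 */
	return 0;
}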