[V6,08/11] examples/pipeline: rework the thread configuration updates

Message ID 20230126141256.380415-9-cristian.dumitrescu@intel.com (mailing list archive)
State Accepted, archived
Delegated to: Thomas Monjalon
Headers
Series pipeline: add IPsec support |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Cristian Dumitrescu Jan. 26, 2023, 2:12 p.m. UTC
  Previously, the configuration updates for the data plane threads were
performed through message queues. Now, this mechanism is replaced by
the control thread updating the mirror copy of the data plane thread
configuration followed by pointer swapping.

Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
Signed-off-by: Kamalakannan R <kamalakannan.r@intel.com>
---
 examples/pipeline/cli.c                       | 154 ++---
 examples/pipeline/examples/fib.cli            |   2 +-
 examples/pipeline/examples/hash_func.cli      |   2 +-
 examples/pipeline/examples/l2fwd.cli          |   2 +-
 examples/pipeline/examples/l2fwd_macswp.cli   |   2 +-
 .../pipeline/examples/l2fwd_macswp_pcap.cli   |   2 +-
 examples/pipeline/examples/l2fwd_pcap.cli     |   2 +-
 examples/pipeline/examples/learner.cli        |   2 +-
 examples/pipeline/examples/meter.cli          |   2 +-
 examples/pipeline/examples/mirroring.cli      |   2 +-
 examples/pipeline/examples/recirculation.cli  |   2 +-
 examples/pipeline/examples/registers.cli      |   2 +-
 examples/pipeline/examples/selector.cli       |   2 +-
 examples/pipeline/examples/varbit.cli         |   2 +-
 examples/pipeline/examples/vxlan.cli          |   2 +-
 examples/pipeline/examples/vxlan_pcap.cli     |   2 +-
 examples/pipeline/thread.c                    | 586 ++++--------------
 examples/pipeline/thread.h                    |  17 +-
 18 files changed, 217 insertions(+), 570 deletions(-)
  

Patch

diff --git a/examples/pipeline/cli.c b/examples/pipeline/cli.c
index bcb3e54fb0..d9c325c89c 100644
--- a/examples/pipeline/cli.c
+++ b/examples/pipeline/cli.c
@@ -3171,119 +3171,86 @@  cmd_ipsec_sa_delete(char **tokens,
 	rte_swx_ipsec_sa_delete(ipsec, sa_id);
 }
 
-static const char cmd_thread_pipeline_enable_help[] =
-"thread <thread_id> pipeline <pipeline_name> enable [ period <timer_period_ms> ]\n";
-
-#ifndef TIMER_PERIOD_MS_DEFAULT
-#define TIMER_PERIOD_MS_DEFAULT 10
-#endif
+static const char cmd_pipeline_enable_help[] =
+"pipeline <pipeline_name> enable thread <thread_id>\n";
 
 static void
-cmd_thread_pipeline_enable(char **tokens,
-	uint32_t n_tokens,
-	char *out,
-	size_t out_size,
-	void *obj __rte_unused)
+cmd_pipeline_enable(char **tokens,
+		    uint32_t n_tokens,
+		    char *out,
+		    size_t out_size,
+		    void *obj __rte_unused)
 {
 	char *pipeline_name;
 	struct rte_swx_pipeline *p;
-	uint32_t thread_id, timer_period_ms = TIMER_PERIOD_MS_DEFAULT;
+	uint32_t thread_id;
 	int status;
 
-	if ((n_tokens != 5) && (n_tokens != 7)) {
+	if (n_tokens != 5) {
 		snprintf(out, out_size, MSG_ARG_MISMATCH, tokens[0]);
 		return;
 	}
 
-	if (parser_read_uint32(&thread_id, tokens[1]) != 0) {
-		snprintf(out, out_size, MSG_ARG_INVALID, "thread_id");
-		return;
-	}
-
-	if (strcmp(tokens[2], "pipeline") != 0) {
-		snprintf(out, out_size, MSG_ARG_NOT_FOUND, "pipeline");
-		return;
-	}
-
-	pipeline_name = tokens[3];
+	pipeline_name = tokens[1];
 	p = rte_swx_pipeline_find(pipeline_name);
 	if (!p) {
 		snprintf(out, out_size, MSG_ARG_INVALID, "pipeline_name");
 		return;
 	}
 
-	if (strcmp(tokens[4], "enable") != 0) {
+	if (strcmp(tokens[2], "enable") != 0) {
 		snprintf(out, out_size, MSG_ARG_NOT_FOUND, "enable");
 		return;
 	}
 
-	if (n_tokens == 7) {
-		if (strcmp(tokens[5], "period") != 0) {
-			snprintf(out, out_size, MSG_ARG_NOT_FOUND, "period");
-			return;
-		}
+	if (strcmp(tokens[3], "thread") != 0) {
+		snprintf(out, out_size, MSG_ARG_NOT_FOUND, "thread");
+		return;
+	}
 
-		if (parser_read_uint32(&timer_period_ms, tokens[6]) != 0) {
-			snprintf(out, out_size, MSG_ARG_INVALID, "timer_period_ms");
-			return;
-		}
+	if (parser_read_uint32(&thread_id, tokens[4]) != 0) {
+		snprintf(out, out_size, MSG_ARG_INVALID, "thread_id");
+		return;
 	}
 
-	status = thread_pipeline_enable(thread_id, p, timer_period_ms);
+	status = pipeline_enable(p, thread_id);
 	if (status) {
-		snprintf(out, out_size, MSG_CMD_FAIL, "thread pipeline enable");
+		snprintf(out, out_size, MSG_CMD_FAIL, "pipeline enable");
 		return;
 	}
 }
 
-static const char cmd_thread_pipeline_disable_help[] =
-"thread <thread_id> pipeline <pipeline_name> disable\n";
+static const char cmd_pipeline_disable_help[] =
+"pipeline <pipeline_name> disable\n";
 
 static void
-cmd_thread_pipeline_disable(char **tokens,
-	uint32_t n_tokens,
-	char *out,
-	size_t out_size,
-	void *obj __rte_unused)
+cmd_pipeline_disable(char **tokens,
+		     uint32_t n_tokens,
+		     char *out,
+		     size_t out_size,
+		     void *obj __rte_unused)
 {
 	struct rte_swx_pipeline *p;
 	char *pipeline_name;
-	uint32_t thread_id;
-	int status;
 
-	if (n_tokens != 5) {
+	if (n_tokens != 3) {
 		snprintf(out, out_size, MSG_ARG_MISMATCH, tokens[0]);
 		return;
 	}
 
-	if (parser_read_uint32(&thread_id, tokens[1]) != 0) {
-		snprintf(out, out_size, MSG_ARG_INVALID, "thread_id");
-		return;
-	}
-
-	if (strcmp(tokens[2], "pipeline") != 0) {
-		snprintf(out, out_size, MSG_ARG_NOT_FOUND, "pipeline");
-		return;
-	}
-
-	pipeline_name = tokens[3];
+	pipeline_name = tokens[1];
 	p = rte_swx_pipeline_find(pipeline_name);
 	if (!p) {
 		snprintf(out, out_size, MSG_ARG_INVALID, "pipeline_name");
 		return;
 	}
 
-	if (strcmp(tokens[4], "disable") != 0) {
+	if (strcmp(tokens[2], "disable") != 0) {
 		snprintf(out, out_size, MSG_ARG_NOT_FOUND, "disable");
 		return;
 	}
 
-	status = thread_pipeline_disable(thread_id, p);
-	if (status) {
-		snprintf(out, out_size, MSG_CMD_FAIL,
-			"thread pipeline disable");
-		return;
-	}
+	pipeline_disable(p);
 }
 
 static void
@@ -3329,11 +3296,12 @@  cmd_help(char **tokens,
 			"\tpipeline meter stats\n"
 			"\tpipeline stats\n"
 			"\tpipeline mirror session\n"
+			"\tpipeline enable\n"
+			"\tpipeline disable\n\n"
 			"\tipsec create\n"
 			"\tipsec sa add\n"
 			"\tipsec sa delete\n"
-			"\tthread pipeline enable\n"
-			"\tthread pipeline disable\n\n");
+			);
 		return;
 	}
 
@@ -3556,6 +3524,18 @@  cmd_help(char **tokens,
 		return;
 	}
 
+	if (!strcmp(tokens[0], "pipeline") &&
+		(n_tokens == 2) && !strcmp(tokens[1], "enable")) {
+		snprintf(out, out_size, "\n%s\n", cmd_pipeline_enable_help);
+		return;
+	}
+
+	if (!strcmp(tokens[0], "pipeline") &&
+		(n_tokens == 2) && !strcmp(tokens[1], "disable")) {
+		snprintf(out, out_size, "\n%s\n", cmd_pipeline_disable_help);
+		return;
+	}
+
 	if (!strcmp(tokens[0], "ipsec") &&
 		(n_tokens == 2) && !strcmp(tokens[1], "create")) {
 		snprintf(out, out_size, "\n%s\n", cmd_ipsec_create_help);
@@ -3576,22 +3556,6 @@  cmd_help(char **tokens,
 		return;
 	}
 
-	if ((n_tokens == 3) &&
-		(strcmp(tokens[0], "thread") == 0) &&
-		(strcmp(tokens[1], "pipeline") == 0)) {
-		if (strcmp(tokens[2], "enable") == 0) {
-			snprintf(out, out_size, "\n%s\n",
-				cmd_thread_pipeline_enable_help);
-			return;
-		}
-
-		if (strcmp(tokens[2], "disable") == 0) {
-			snprintf(out, out_size, "\n%s\n",
-				cmd_thread_pipeline_disable_help);
-			return;
-		}
-	}
-
 	snprintf(out, out_size, "Invalid command\n");
 }
 
@@ -3822,6 +3786,16 @@  cli_process(char *in, char *out, size_t out_size, void *obj)
 			cmd_pipeline_mirror_session(tokens, n_tokens, out, out_size, obj);
 			return;
 		}
+
+		if (n_tokens >= 3 && !strcmp(tokens[2], "enable")) {
+			cmd_pipeline_enable(tokens, n_tokens, out, out_size, obj);
+			return;
+		}
+
+		if (n_tokens >= 3 && !strcmp(tokens[2], "disable")) {
+			cmd_pipeline_disable(tokens, n_tokens, out, out_size, obj);
+			return;
+		}
 	}
 
 	if (!strcmp(tokens[0], "ipsec")) {
@@ -3841,22 +3815,6 @@  cli_process(char *in, char *out, size_t out_size, void *obj)
 		}
 	}
 
-	if (strcmp(tokens[0], "thread") == 0) {
-		if ((n_tokens >= 5) &&
-			(strcmp(tokens[4], "enable") == 0)) {
-			cmd_thread_pipeline_enable(tokens, n_tokens,
-				out, out_size, obj);
-			return;
-		}
-
-		if ((n_tokens >= 5) &&
-			(strcmp(tokens[4], "disable") == 0)) {
-			cmd_thread_pipeline_disable(tokens, n_tokens,
-				out, out_size, obj);
-			return;
-		}
-	}
-
 	snprintf(out, out_size, MSG_CMD_UNKNOWN, tokens[0]);
 }
 
diff --git a/examples/pipeline/examples/fib.cli b/examples/pipeline/examples/fib.cli
index 2450dc9ca4..d2485cefe0 100644
--- a/examples/pipeline/examples/fib.cli
+++ b/examples/pipeline/examples/fib.cli
@@ -54,4 +54,4 @@  pipeline PIPELINE0 commit
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/hash_func.cli b/examples/pipeline/examples/hash_func.cli
index 3840e47a2b..2c20f7a7a0 100644
--- a/examples/pipeline/examples/hash_func.cli
+++ b/examples/pipeline/examples/hash_func.cli
@@ -32,4 +32,4 @@  pipeline PIPELINE0 build lib /tmp/hash_func.so io ./examples/pipeline/examples/e
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/l2fwd.cli b/examples/pipeline/examples/l2fwd.cli
index c5505df296..383fe346c0 100644
--- a/examples/pipeline/examples/l2fwd.cli
+++ b/examples/pipeline/examples/l2fwd.cli
@@ -32,4 +32,4 @@  pipeline PIPELINE0 build lib /tmp/l2fwd.so io ./examples/pipeline/examples/ethde
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/l2fwd_macswp.cli b/examples/pipeline/examples/l2fwd_macswp.cli
index bd700707f5..d02cb2c470 100644
--- a/examples/pipeline/examples/l2fwd_macswp.cli
+++ b/examples/pipeline/examples/l2fwd_macswp.cli
@@ -32,4 +32,4 @@  pipeline PIPELINE0 build lib /tmp/l2fwd_macswp.so io ./examples/pipeline/example
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/l2fwd_macswp_pcap.cli b/examples/pipeline/examples/l2fwd_macswp_pcap.cli
index aa64b32ab2..d4ea449e03 100644
--- a/examples/pipeline/examples/l2fwd_macswp_pcap.cli
+++ b/examples/pipeline/examples/l2fwd_macswp_pcap.cli
@@ -28,4 +28,4 @@  pipeline PIPELINE0 build lib /tmp/l2fwd_macswp.so io ./examples/pipeline/example
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/l2fwd_pcap.cli b/examples/pipeline/examples/l2fwd_pcap.cli
index 619d17474f..8a25c98096 100644
--- a/examples/pipeline/examples/l2fwd_pcap.cli
+++ b/examples/pipeline/examples/l2fwd_pcap.cli
@@ -28,4 +28,4 @@  pipeline PIPELINE0 build lib /tmp/l2fwd.so io ./examples/pipeline/examples/pcap.
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/learner.cli b/examples/pipeline/examples/learner.cli
index 42184be1c8..04c7b4d26f 100644
--- a/examples/pipeline/examples/learner.cli
+++ b/examples/pipeline/examples/learner.cli
@@ -32,4 +32,4 @@  pipeline PIPELINE0 build lib /tmp/learner.so io ./examples/pipeline/examples/eth
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/meter.cli b/examples/pipeline/examples/meter.cli
index 0fa38b16b8..9e064dbb52 100644
--- a/examples/pipeline/examples/meter.cli
+++ b/examples/pipeline/examples/meter.cli
@@ -40,4 +40,4 @@  pipeline PIPELINE0 meter meters set profile platinum index from 0 to 15
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/mirroring.cli b/examples/pipeline/examples/mirroring.cli
index e7391de74b..da6bd338fb 100644
--- a/examples/pipeline/examples/mirroring.cli
+++ b/examples/pipeline/examples/mirroring.cli
@@ -42,4 +42,4 @@  pipeline PIPELINE0 mirror session 3 port 0 clone slow truncate 128
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/recirculation.cli b/examples/pipeline/examples/recirculation.cli
index 965c870fd4..f925179bf7 100644
--- a/examples/pipeline/examples/recirculation.cli
+++ b/examples/pipeline/examples/recirculation.cli
@@ -32,4 +32,4 @@  pipeline PIPELINE0 build lib /tmp/recirculation.so io ./examples/pipeline/exampl
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/registers.cli b/examples/pipeline/examples/registers.cli
index aa775f71e5..58dde5fdf1 100644
--- a/examples/pipeline/examples/registers.cli
+++ b/examples/pipeline/examples/registers.cli
@@ -32,4 +32,4 @@  pipeline PIPELINE0 build lib /tmp/registers.so io ./examples/pipeline/examples/e
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/selector.cli b/examples/pipeline/examples/selector.cli
index f0608de184..9af56cd0f1 100644
--- a/examples/pipeline/examples/selector.cli
+++ b/examples/pipeline/examples/selector.cli
@@ -42,4 +42,4 @@  pipeline PIPELINE0 selector s show
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/varbit.cli b/examples/pipeline/examples/varbit.cli
index 648b8882c3..621e7f5627 100644
--- a/examples/pipeline/examples/varbit.cli
+++ b/examples/pipeline/examples/varbit.cli
@@ -32,4 +32,4 @@  pipeline PIPELINE0 build lib /tmp/varbit.so io ./examples/pipeline/examples/ethd
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/vxlan.cli b/examples/pipeline/examples/vxlan.cli
index 4565adfaea..73eb29a6b2 100644
--- a/examples/pipeline/examples/vxlan.cli
+++ b/examples/pipeline/examples/vxlan.cli
@@ -40,4 +40,4 @@  pipeline PIPELINE0 commit
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/examples/vxlan_pcap.cli b/examples/pipeline/examples/vxlan_pcap.cli
index 510c2ad870..828342408b 100644
--- a/examples/pipeline/examples/vxlan_pcap.cli
+++ b/examples/pipeline/examples/vxlan_pcap.cli
@@ -36,4 +36,4 @@  pipeline PIPELINE0 commit
 ;
 ; Pipelines-to-threads mapping.
 ;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
diff --git a/examples/pipeline/thread.c b/examples/pipeline/thread.c
index 6d15f51fb2..3001bc0858 100644
--- a/examples/pipeline/thread.c
+++ b/examples/pipeline/thread.c
@@ -3,17 +3,11 @@ 
  */
 
 #include <stdlib.h>
+#include <errno.h>
 
+#include <rte_atomic.h>
 #include <rte_common.h>
-#include <rte_cycles.h>
 #include <rte_lcore.h>
-#include <rte_ring.h>
-
-#include <rte_table_acl.h>
-#include <rte_table_array.h>
-#include <rte_table_hash.h>
-#include <rte_table_lpm.h>
-#include <rte_table_lpm_ipv6.h>
 
 #include "obj.h"
 #include "thread.h"
@@ -22,14 +16,6 @@ 
 #define THREAD_PIPELINES_MAX                               256
 #endif
 
-#ifndef THREAD_MSGQ_SIZE
-#define THREAD_MSGQ_SIZE                                   64
-#endif
-
-#ifndef THREAD_TIMER_PERIOD_MS
-#define THREAD_TIMER_PERIOD_MS                             100
-#endif
-
 /* Pipeline instruction quanta: Needs to be big enough to do some meaningful
  * work, but not too big to avoid starving any other pipelines mapped to the
  * same thread. For a pipeline that executes 10 instructions per packet, a
@@ -40,509 +26,209 @@ 
 #endif
 
 /**
- * Control thread: data plane thread context
+ * In this design, there is a single control plane (CP) thread and one or multiple data plane (DP)
+ * threads. Each DP thread can run up to THREAD_PIPELINES_MAX pipelines and up to THREAD_BLOCKS_MAX
+ * blocks.
+ *
+ * The pipelines and blocks are single threaded, meaning that a given pipeline/block can be run by a
+ * single thread at any given time, so the same pipeline/block cannot show up in the list of
+ * pipelines/blocks of more than one thread at any specific moment.
+ *
+ * Each DP thread has its own context (struct thread instance), which it shares with the CP thread:
+ *  - Read-write by the CP thread;
+ *  - Read-only by the DP thread.
  */
 struct thread {
-	struct rte_ring *msgq_req;
-	struct rte_ring *msgq_rsp;
-
-	uint32_t enabled;
-};
-
-static struct thread thread[RTE_MAX_LCORE];
-
-/**
- * Data plane threads: context
- */
-struct pipeline_data {
-	struct rte_swx_pipeline *p;
-	uint64_t timer_period; /* Measured in CPU cycles. */
-	uint64_t time_next;
-};
-
-struct thread_data {
-	struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
-	uint32_t n_pipelines;
-
-	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
-	struct rte_ring *msgq_req;
-	struct rte_ring *msgq_rsp;
-	uint64_t timer_period; /* Measured in CPU cycles. */
-	uint64_t time_next;
-	uint64_t time_next_min;
+	struct rte_swx_pipeline *pipelines[THREAD_PIPELINES_MAX];
+	volatile uint64_t n_pipelines;
+	int enabled;
 } __rte_cache_aligned;
 
-static struct thread_data thread_data[RTE_MAX_LCORE];
+static struct thread threads[RTE_MAX_LCORE];
 
 /**
- * Control thread: data plane thread init
+ * Control plane (CP) thread.
  */
-static void
-thread_free(void)
-{
-	uint32_t i;
-
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		struct thread *t = &thread[i];
-
-		if (!rte_lcore_is_enabled(i))
-			continue;
-
-		/* MSGQs */
-		rte_ring_free(t->msgq_req);
-
-		rte_ring_free(t->msgq_rsp);
-	}
-}
-
 int
 thread_init(void)
 {
-	uint32_t i;
-
-	RTE_LCORE_FOREACH_WORKER(i) {
-		char name[NAME_MAX];
-		struct rte_ring *msgq_req, *msgq_rsp;
-		struct thread *t = &thread[i];
-		struct thread_data *t_data = &thread_data[i];
-		uint32_t cpu_id = rte_lcore_to_socket_id(i);
-
-		/* MSGQs */
-		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
-
-		msgq_req = rte_ring_create(name,
-			THREAD_MSGQ_SIZE,
-			cpu_id,
-			RING_F_SP_ENQ | RING_F_SC_DEQ);
-
-		if (msgq_req == NULL) {
-			thread_free();
-			return -1;
-		}
+	uint32_t thread_id;
 
-		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
+	RTE_LCORE_FOREACH_WORKER(thread_id) {
+		struct thread *t = &threads[thread_id];
 
-		msgq_rsp = rte_ring_create(name,
-			THREAD_MSGQ_SIZE,
-			cpu_id,
-			RING_F_SP_ENQ | RING_F_SC_DEQ);
-
-		if (msgq_rsp == NULL) {
-			thread_free();
-			return -1;
-		}
-
-		/* Control thread records */
-		t->msgq_req = msgq_req;
-		t->msgq_rsp = msgq_rsp;
 		t->enabled = 1;
-
-		/* Data plane thread records */
-		t_data->n_pipelines = 0;
-		t_data->msgq_req = msgq_req;
-		t_data->msgq_rsp = msgq_rsp;
-		t_data->timer_period =
-			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
-		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
-		t_data->time_next_min = t_data->time_next;
 	}
 
 	return 0;
 }
 
-static inline int
-thread_is_running(uint32_t thread_id)
+static uint32_t
+pipeline_find(struct rte_swx_pipeline *p)
 {
-	enum rte_lcore_state_t thread_state;
+	uint32_t thread_id;
 
-	thread_state = rte_eal_get_lcore_state(thread_id);
-	return (thread_state == RUNNING) ? 1 : 0;
-}
-
-/**
- * Control thread & data plane threads: message passing
- */
-enum thread_req_type {
-	THREAD_REQ_PIPELINE_ENABLE = 0,
-	THREAD_REQ_PIPELINE_DISABLE,
-	THREAD_REQ_MAX
-};
-
-struct thread_msg_req {
-	enum thread_req_type type;
-
-	union {
-		struct {
-			struct rte_swx_pipeline *p;
-			uint32_t timer_period_ms;
-		} pipeline_enable;
-
-		struct {
-			struct rte_swx_pipeline *p;
-		} pipeline_disable;
-	};
-};
-
-struct thread_msg_rsp {
-	int status;
-};
-
-/**
- * Control thread
- */
-static struct thread_msg_req *
-thread_msg_alloc(void)
-{
-	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
-		sizeof(struct thread_msg_rsp));
-
-	return calloc(1, size);
-}
-
-static void
-thread_msg_free(struct thread_msg_rsp *rsp)
-{
-	free(rsp);
-}
-
-static struct thread_msg_rsp *
-thread_msg_send_recv(uint32_t thread_id,
-	struct thread_msg_req *req)
-{
-	struct thread *t = &thread[thread_id];
-	struct rte_ring *msgq_req = t->msgq_req;
-	struct rte_ring *msgq_rsp = t->msgq_rsp;
-	struct thread_msg_rsp *rsp;
-	int status;
-
-	/* send */
-	do {
-		status = rte_ring_sp_enqueue(msgq_req, req);
-	} while (status == -ENOBUFS);
-
-	/* recv */
-	do {
-		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
-	} while (status != 0);
-
-	return rsp;
-}
-
-static int
-thread_is_pipeline_enabled(uint32_t thread_id, struct rte_swx_pipeline *p)
-{
-	struct thread *t = &thread[thread_id];
-	struct thread_data *td = &thread_data[thread_id];
-	uint32_t i;
-
-	if (!t->enabled)
-		return 0; /* Pipeline NOT enabled on this thread. */
-
-	for (i = 0; i < td->n_pipelines; i++)
-		if (td->p[i] == p)
-			return 1; /* Pipeline enabled on this thread. */
-
-	return 0 /* Pipeline NOT enabled on this thread. */;
-}
-
-int
-thread_pipeline_enable(uint32_t thread_id, struct rte_swx_pipeline *p, uint32_t timer_period_ms)
-{
-	struct thread *t;
-	struct thread_msg_req *req;
-	struct thread_msg_rsp *rsp;
-	int status;
-
-	/* Check input params */
-	if ((thread_id >= RTE_MAX_LCORE) || !p || !timer_period_ms)
-		return -1;
-
-	t = &thread[thread_id];
-	if (t->enabled == 0)
-		return -1;
-
-	if (!thread_is_running(thread_id)) {
-		struct thread_data *td = &thread_data[thread_id];
-		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
-
-		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
-			return -1;
-
-		/* Data plane thread */
-		td->p[td->n_pipelines] = p;
-
-		tdp->p = p;
-		tdp->timer_period = (rte_get_tsc_hz() * timer_period_ms) / 1000;
-		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
+	for (thread_id = 0; thread_id < RTE_MAX_LCORE; thread_id++) {
+		struct thread *t = &threads[thread_id];
+		uint32_t i;
 
-		td->n_pipelines++;
+		if (!t->enabled)
+			continue;
 
-		return 0;
+		for (i = 0; i < t->n_pipelines; i++)
+			if (t->pipelines[i] == p)
+				break;
 	}
 
-	/* Allocate request */
-	req = thread_msg_alloc();
-	if (req == NULL)
-		return -1;
-
-	/* Write request */
-	req->type = THREAD_REQ_PIPELINE_ENABLE;
-	req->pipeline_enable.p = p;
-	req->pipeline_enable.timer_period_ms = timer_period_ms;
-
-	/* Send request and wait for response */
-	rsp = thread_msg_send_recv(thread_id, req);
-
-	/* Read response */
-	status = rsp->status;
-
-	/* Free response */
-	thread_msg_free(rsp);
-
-	/* Request completion */
-	if (status)
-		return status;
-
-	return 0;
+	return thread_id;
 }
 
+/**
+ * Enable a given pipeline to run on a specific DP thread.
+ *
+ * CP thread:
+ *  - Adds a new pipeline to the end of the DP thread pipeline list (t->pipelines[]);
+ *  - Increments the DP thread number of pipelines (t->n_pipelines). It is important to make sure
+ *    that t->pipelines[] update is completed BEFORE the t->n_pipelines update, hence the memory
+ *    write barrier used below.
+ *
+ * DP thread:
+ *  - Reads t->n_pipelines before starting every new iteration through t->pipelines[]. It detects
+ *    the new pipeline when it sees the updated t->n_pipelines value;
+ *  - If somehow the above condition is not met, so t->n_pipelines update is incorrectly taking
+ *    place before the t->pipelines[] update is completed, then the DP thread will use an incorrect
+ *    handle for the new pipeline, which can result in memory corruption or segmentation fault.
+ */
 int
-thread_pipeline_disable(uint32_t thread_id, struct rte_swx_pipeline *p)
+pipeline_enable(struct rte_swx_pipeline *p, uint32_t thread_id)
 {
 	struct thread *t;
-	struct thread_msg_req *req;
-	struct thread_msg_rsp *rsp;
-	int status;
+	uint64_t n_pipelines;
 
 	/* Check input params */
-	if ((thread_id >= RTE_MAX_LCORE) || !p)
-		return -1;
-
-	t = &thread[thread_id];
-	if (t->enabled == 0)
-		return -1;
-
-	if (!thread_is_pipeline_enabled(thread_id, p))
-		return 0;
+	if (!p || thread_id >= RTE_MAX_LCORE)
+		return -EINVAL;
 
-	if (!thread_is_running(thread_id)) {
-		struct thread_data *td = &thread_data[thread_id];
-		uint32_t i;
-
-		for (i = 0; i < td->n_pipelines; i++) {
-			struct pipeline_data *tdp = &td->pipeline_data[i];
-
-			if (tdp->p != p)
-				continue;
-
-			/* Data plane thread */
-			if (i < td->n_pipelines - 1) {
-				struct rte_swx_pipeline *pipeline_last =
-					td->p[td->n_pipelines - 1];
-				struct pipeline_data *tdp_last =
-					&td->pipeline_data[td->n_pipelines - 1];
-
-				td->p[i] = pipeline_last;
-				memcpy(tdp, tdp_last, sizeof(*tdp));
-			}
-
-			td->n_pipelines--;
-
-			break;
-		}
+	if (pipeline_find(p) < RTE_MAX_LCORE)
+		return -EEXIST;
 
-		return 0;
-	}
-
-	/* Allocate request */
-	req = thread_msg_alloc();
-	if (req == NULL)
-		return -1;
-
-	/* Write request */
-	req->type = THREAD_REQ_PIPELINE_DISABLE;
-	req->pipeline_disable.p = p;
-
-	/* Send request and wait for response */
-	rsp = thread_msg_send_recv(thread_id, req);
+	t = &threads[thread_id];
+	if (!t->enabled)
+		return -EINVAL;
 
-	/* Read response */
-	status = rsp->status;
+	n_pipelines = t->n_pipelines;
 
-	/* Free response */
-	thread_msg_free(rsp);
+	/* Check there is room for at least one more pipeline. */
+	if (n_pipelines >= THREAD_PIPELINES_MAX)
+		return -ENOSPC;
 
-	/* Request completion */
-	if (status)
-		return status;
+	/* Install the new pipeline. */
+	t->pipelines[n_pipelines] = p;
+	rte_wmb();
+	t->n_pipelines = n_pipelines + 1;
 
 	return 0;
 }
 
 /**
- * Data plane threads: message handling
+ * Disable a given pipeline from running on any DP thread.
+ *
+ * CP thread:
+ *  - Detects the thread that is running the given pipeline, if any;
+ *  - Writes the last pipeline handle (pipeline_last = t->pipelines[t->n_pipelines - 1]) on the
+ *    position of the pipeline to be disabled (t->pipelines[i] = pipeline_last) and decrements the
+ *    number of pipelines running on the current thread (t->n_pipelines--). This approach makes sure
+ *    that no holes with invalid locations are ever developed within the t->pipelines[] array.
+ *  - If the memory barrier below is present, then t->n_pipelines update is guaranteed to take place
+ *    after the t->pipelines[] update is completed. The possible DP thread behaviors are detailed
+ *    below, which are all valid:
+ *     - Not run the removed pipeline at all, run all the other pipelines (including pipeline_last)
+ *       exactly one time during the current dispatch loop iteration. This takes place when the DP
+ *       thread sees the final value of t->n_pipelines;
+ *     - Not run the removed pipeline at all, run all the other pipelines, except pipeline_last,
+ *       exactly one time and the pipeline_last exactly two times during the current dispatch loop
+ *       iteration. This takes place when the DP thread sees the initial value of t->n_pipelines.
+ *  - If the memory barrier below is not present, then the t->n_pipelines update may be reordered by
+ *    the CPU, so that it takes place before the t->pipelines[] update. The possible DP thread
+ *    behaviors are detailed below, which are all valid:
+ *     - Not run the removed pipeline at all, run all the other pipelines (including pipeline_last)
+ *       exactly one time during the current dispatch loop iteration. This takes place when the DP
+ *       thread sees the final values of the t->pipeline[] array;
+ *     - Run the removed pipeline one last time, run all the other pipelines exactly one time, with
+ *       the exception of the pipeline_last, which is not run during the current dispatch loop
+ *       iteration. This takes place when the DP thread sees the initial values of t->pipeline[].
+ *
+ * DP thread:
+ *  - Reads t->n_pipelines before starting every new iteration through t->pipelines[].
  */
-static inline struct thread_msg_req *
-thread_msg_recv(struct rte_ring *msgq_req)
-{
-	struct thread_msg_req *req;
-
-	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
-
-	if (status != 0)
-		return NULL;
-
-	return req;
-}
-
-static inline void
-thread_msg_send(struct rte_ring *msgq_rsp,
-	struct thread_msg_rsp *rsp)
-{
-	int status;
-
-	do {
-		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
-	} while (status == -ENOBUFS);
-}
-
-static struct thread_msg_rsp *
-thread_msg_handle_pipeline_enable(struct thread_data *t,
-	struct thread_msg_req *req)
+void
+pipeline_disable(struct rte_swx_pipeline *p)
 {
-	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
-	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
-
-	/* Request */
-	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
-		rsp->status = -1;
-		return rsp;
-	}
-
-	t->p[t->n_pipelines] = req->pipeline_enable.p;
-
-	p->p = req->pipeline_enable.p;
-	p->timer_period = (rte_get_tsc_hz() *
-		req->pipeline_enable.timer_period_ms) / 1000;
-	p->time_next = rte_get_tsc_cycles() + p->timer_period;
+	struct thread *t;
+	uint64_t n_pipelines;
+	uint32_t thread_id, i;
 
-	t->n_pipelines++;
+	/* Check input params */
+	if (!p)
+		return;
 
-	/* Response */
-	rsp->status = 0;
-	return rsp;
-}
+	/* Find the thread that runs this pipeline. */
+	thread_id = pipeline_find(p);
+	if (thread_id == RTE_MAX_LCORE)
+		return;
 
-static struct thread_msg_rsp *
-thread_msg_handle_pipeline_disable(struct thread_data *t,
-	struct thread_msg_req *req)
-{
-	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
-	uint32_t n_pipelines = t->n_pipelines;
-	struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
-	uint32_t i;
+	t = &threads[thread_id];
+	n_pipelines = t->n_pipelines;
 
-	/* find pipeline */
 	for (i = 0; i < n_pipelines; i++) {
-		struct pipeline_data *p = &t->pipeline_data[i];
+		struct rte_swx_pipeline *pipeline = t->pipelines[i];
 
-		if (p->p != pipeline)
+		if (pipeline != p)
 			continue;
 
 		if (i < n_pipelines - 1) {
-			struct rte_swx_pipeline *pipeline_last =
-				t->p[n_pipelines - 1];
-			struct pipeline_data *p_last =
-				&t->pipeline_data[n_pipelines - 1];
+			struct rte_swx_pipeline *pipeline_last = t->pipelines[n_pipelines - 1];
 
-			t->p[i] = pipeline_last;
-			memcpy(p, p_last, sizeof(*p));
+			t->pipelines[i] = pipeline_last;
 		}
 
-		t->n_pipelines--;
+		rte_wmb();
+		t->n_pipelines = n_pipelines - 1;
 
-		rsp->status = 0;
-		return rsp;
+		return;
 	}
 
-	/* should not get here */
-	rsp->status = 0;
-	return rsp;
-}
-
-static void
-thread_msg_handle(struct thread_data *t)
-{
-	for ( ; ; ) {
-		struct thread_msg_req *req;
-		struct thread_msg_rsp *rsp;
-
-		req = thread_msg_recv(t->msgq_req);
-		if (req == NULL)
-			break;
-
-		switch (req->type) {
-		case THREAD_REQ_PIPELINE_ENABLE:
-			rsp = thread_msg_handle_pipeline_enable(t, req);
-			break;
-
-		case THREAD_REQ_PIPELINE_DISABLE:
-			rsp = thread_msg_handle_pipeline_disable(t, req);
-			break;
-
-		default:
-			rsp = (struct thread_msg_rsp *) req;
-			rsp->status = -1;
-		}
-
-		thread_msg_send(t->msgq_rsp, rsp);
-	}
+	return;
 }
 
 /**
- * Data plane threads: main
+ * Data plane (DP) threads.
+ *
+ * The t->n_pipelines variable is modified by the CP thread every time changes to the t->pipeline[]
+ * array are operated, so it is therefore very important that the latest value of t->n_pipelines is
+ * read by the DP thread at the beginning of every new dispatch loop iteration, otherwise a stale
+ * t->n_pipelines value may result in new pipelines not being detected, running pipelines that have
+ * been removed and are possibly no longer valid (e.g. when the pipeline_last is removed), running
+ * one pipeline (pipeline_last) twice as frequently than the rest of the pipelines (e.g. when a
+ * pipeline other than pipeline_last is removed), etc. This is the reason why t->n_pipelines is
+ * marked as volatile.
  */
 int
 thread_main(void *arg __rte_unused)
 {
-	struct thread_data *t;
-	uint32_t thread_id, i;
+	struct thread *t;
+	uint32_t thread_id;
 
 	thread_id = rte_lcore_id();
-	t = &thread_data[thread_id];
-
-	/* Dispatch loop */
-	for (i = 0; ; i++) {
-		uint32_t j;
-
-		/* Data Plane */
-		for (j = 0; j < t->n_pipelines; j++)
-			rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);
+	t = &threads[thread_id];
 
-		/* Control Plane */
-		if ((i & 0xF) == 0) {
-			uint64_t time = rte_get_tsc_cycles();
-			uint64_t time_next_min = UINT64_MAX;
-
-			if (time < t->time_next_min)
-				continue;
-
-			/* Thread message queues */
-			{
-				uint64_t time_next = t->time_next;
-
-				if (time_next <= time) {
-					thread_msg_handle(t);
-					time_next = time + t->timer_period;
-					t->time_next = time_next;
-				}
-
-				if (time_next < time_next_min)
-					time_next_min = time_next;
-			}
+	/* Dispatch loop. */
+	for ( ; ; ) {
+		uint32_t i;
 
-			t->time_next_min = time_next_min;
-		}
+		/* Pipelines. */
+		for (i = 0; i < t->n_pipelines; i++)
+			rte_swx_pipeline_run(t->pipelines[i], PIPELINE_INSTR_QUANTA);
 	}
 
 	return 0;
diff --git a/examples/pipeline/thread.h b/examples/pipeline/thread.h
index 712cb25bbb..338d480abb 100644
--- a/examples/pipeline/thread.h
+++ b/examples/pipeline/thread.h
@@ -9,18 +9,21 @@ 
 
 #include <rte_swx_pipeline.h>
 
+/**
+ * Control plane (CP) thread.
+ */
 int
-thread_pipeline_enable(uint32_t thread_id,
-		       struct rte_swx_pipeline *p,
-		       uint32_t timer_period_ms);
+thread_init(void);
 
 int
-thread_pipeline_disable(uint32_t thread_id,
-			struct rte_swx_pipeline *p);
+pipeline_enable(struct rte_swx_pipeline *p, uint32_t thread_id);
 
-int
-thread_init(void);
+void
+pipeline_disable(struct rte_swx_pipeline *p);
 
+/**
+ * Data plane (DP) threads.
+ */
 int
 thread_main(void *arg);