[v1,11/17] vdpa/mlx5: add task ring for MT management

Message ID 20220606112109.208873-22-lizh@nvidia.com (mailing list archive)
State Superseded, archived
Series: None

Commit Message

Li Zhang June 6, 2022, 11:20 a.m. UTC
  The configuration threads' tasks need a container to
support multiple tasks assigned to a thread in parallel.
Use an rte_ring per thread to manage
the thread's tasks without locks.
The caller thread from the user context opens a task to
a thread and enqueues it to the thread's ring.
The thread polls its ring and dequeues tasks.
That is why the ring should be in multi-producer
and single-consumer mode.
An atomic counter manages the task completion notification.
The threads report errors to the caller through
a dedicated error counter per task.

Signed-off-by: Li Zhang <lizh@nvidia.com>
---
 drivers/vdpa/mlx5/mlx5_vdpa.h         |  17 ++++
 drivers/vdpa/mlx5/mlx5_vdpa_cthread.c | 115 +++++++++++++++++++++++++-
 2 files changed, 130 insertions(+), 2 deletions(-)
  

Patch

diff --git a/drivers/vdpa/mlx5/mlx5_vdpa.h b/drivers/vdpa/mlx5/mlx5_vdpa.h
index 4e7c2557b7..2bbb868ec6 100644
--- a/drivers/vdpa/mlx5/mlx5_vdpa.h
+++ b/drivers/vdpa/mlx5/mlx5_vdpa.h
@@ -74,10 +74,22 @@  enum {
 };
 
 #define MLX5_VDPA_MAX_C_THRD 256
+#define MLX5_VDPA_MAX_TASKS_PER_THRD 4096
+#define MLX5_VDPA_TASKS_PER_DEV 64
+
+/* Generic task information and size must be multiple of 4B. */
+struct mlx5_vdpa_task {
+	struct mlx5_vdpa_priv *priv;
+	uint32_t *remaining_cnt;
+	uint32_t *err_cnt;
+	uint32_t idx;
+} __rte_packed __rte_aligned(4);
 
 /* Generic mlx5_vdpa_c_thread information. */
 struct mlx5_vdpa_c_thread {
 	pthread_t tid;
+	struct rte_ring *rng;
+	pthread_cond_t c_cond;
 };
 
 struct mlx5_vdpa_conf_thread_mng {
@@ -532,4 +544,9 @@  mlx5_vdpa_mult_threads_create(int cpu_core);
  */
 void
 mlx5_vdpa_mult_threads_destroy(bool need_unlock);
+
+bool
+mlx5_vdpa_task_add(struct mlx5_vdpa_priv *priv,
+		uint32_t thrd_idx,
+		uint32_t num);
 #endif /* RTE_PMD_MLX5_VDPA_H_ */
diff --git a/drivers/vdpa/mlx5/mlx5_vdpa_cthread.c b/drivers/vdpa/mlx5/mlx5_vdpa_cthread.c
index ba7d8b63b3..1fdc92d3ad 100644
--- a/drivers/vdpa/mlx5/mlx5_vdpa_cthread.c
+++ b/drivers/vdpa/mlx5/mlx5_vdpa_cthread.c
@@ -11,17 +11,103 @@ 
 #include <rte_alarm.h>
 #include <rte_tailq.h>
 #include <rte_ring_elem.h>
+#include <rte_ring_peek.h>
 
 #include <mlx5_common.h>
 
 #include "mlx5_vdpa_utils.h"
 #include "mlx5_vdpa.h"
 
+static inline uint32_t
+mlx5_vdpa_c_thrd_ring_dequeue_bulk(struct rte_ring *r,
+	void **obj, uint32_t n, uint32_t *avail)
+{
+	uint32_t m;
+
+	m = rte_ring_dequeue_bulk_elem_start(r, obj,
+		sizeof(struct mlx5_vdpa_task), n, avail);
+	n = (m == n) ? n : 0;
+	rte_ring_dequeue_elem_finish(r, n);
+	return n;
+}
+
+static inline uint32_t
+mlx5_vdpa_c_thrd_ring_enqueue_bulk(struct rte_ring *r,
+	void * const *obj, uint32_t n, uint32_t *free)
+{
+	uint32_t m;
+
+	m = rte_ring_enqueue_bulk_elem_start(r, n, free);
+	n = (m == n) ? n : 0;
+	rte_ring_enqueue_elem_finish(r, obj,
+		sizeof(struct mlx5_vdpa_task), n);
+	return n;
+}
+
+bool
+mlx5_vdpa_task_add(struct mlx5_vdpa_priv *priv,
+		uint32_t thrd_idx,
+		uint32_t num)
+{
+	struct rte_ring *rng = conf_thread_mng.cthrd[thrd_idx].rng;
+	struct mlx5_vdpa_task task[MLX5_VDPA_TASKS_PER_DEV];
+	uint32_t i;
+
+	MLX5_ASSERT(num <= MLX5_VDPA_TASKS_PER_DEV);
+	for (i = 0 ; i < num; i++) {
+		task[i].priv = priv;
+		/* To be added later. */
+	}
+	if (!mlx5_vdpa_c_thrd_ring_enqueue_bulk(rng, (void **)&task, num, NULL))
+		return -1;
+	for (i = 0 ; i < num; i++)
+		if (task[i].remaining_cnt)
+			__atomic_fetch_add(task[i].remaining_cnt, 1,
+				__ATOMIC_RELAXED);
+	/* wake up conf thread. */
+	pthread_mutex_lock(&conf_thread_mng.cthrd_lock);
+	pthread_cond_signal(&conf_thread_mng.cthrd[thrd_idx].c_cond);
+	pthread_mutex_unlock(&conf_thread_mng.cthrd_lock);
+	return 0;
+}
+
 static void *
 mlx5_vdpa_c_thread_handle(void *arg)
 {
-	/* To be added later. */
-	return arg;
+	struct mlx5_vdpa_conf_thread_mng *multhrd = arg;
+	pthread_t thread_id = pthread_self();
+	struct mlx5_vdpa_priv *priv;
+	struct mlx5_vdpa_task task;
+	struct rte_ring *rng;
+	uint32_t thrd_idx;
+	uint32_t task_num;
+
+	for (thrd_idx = 0; thrd_idx < multhrd->max_thrds;
+		thrd_idx++)
+		if (multhrd->cthrd[thrd_idx].tid == thread_id)
+			break;
+	if (thrd_idx >= multhrd->max_thrds)
+		return NULL;
+	rng = multhrd->cthrd[thrd_idx].rng;
+	while (1) {
+		task_num = mlx5_vdpa_c_thrd_ring_dequeue_bulk(rng,
+			(void **)&task, 1, NULL);
+		if (!task_num) {
+			/* No task and condition wait. */
+			pthread_mutex_lock(&multhrd->cthrd_lock);
+			pthread_cond_wait(
+				&multhrd->cthrd[thrd_idx].c_cond,
+				&multhrd->cthrd_lock);
+			pthread_mutex_unlock(&multhrd->cthrd_lock);
+		}
+		priv = task.priv;
+		if (priv == NULL)
+			continue;
+		__atomic_fetch_sub(task.remaining_cnt,
+			1, __ATOMIC_RELAXED);
+		/* To be added later. */
+	}
+	return NULL;
 }
 
 static void
@@ -34,6 +120,10 @@  mlx5_vdpa_c_thread_destroy(uint32_t thrd_idx, bool need_unlock)
 		if (need_unlock)
 			pthread_mutex_init(&conf_thread_mng.cthrd_lock, NULL);
 	}
+	if (conf_thread_mng.cthrd[thrd_idx].rng) {
+		rte_ring_free(conf_thread_mng.cthrd[thrd_idx].rng);
+		conf_thread_mng.cthrd[thrd_idx].rng = NULL;
+	}
 }
 
 static int
@@ -45,6 +135,7 @@  mlx5_vdpa_c_thread_create(int cpu_core)
 	rte_cpuset_t cpuset;
 	pthread_attr_t attr;
 	uint32_t thrd_idx;
+	uint32_t ring_num;
 	char name[32];
 	int ret;
 
@@ -60,8 +151,26 @@  mlx5_vdpa_c_thread_create(int cpu_core)
 		DRV_LOG(ERR, "Failed to set thread priority.");
 		goto c_thread_err;
 	}
+	ring_num = MLX5_VDPA_MAX_TASKS_PER_THRD / conf_thread_mng.max_thrds;
+	if (!ring_num) {
+		DRV_LOG(ERR, "Invalid ring number for thread.");
+		goto c_thread_err;
+	}
 	for (thrd_idx = 0; thrd_idx < conf_thread_mng.max_thrds;
 		thrd_idx++) {
+		snprintf(name, sizeof(name), "vDPA-mthread-ring-%d",
+			thrd_idx);
+		conf_thread_mng.cthrd[thrd_idx].rng = rte_ring_create_elem(name,
+			sizeof(struct mlx5_vdpa_task), ring_num,
+			rte_socket_id(),
+			RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ |
+			RING_F_EXACT_SZ);
+		if (!conf_thread_mng.cthrd[thrd_idx].rng) {
+			DRV_LOG(ERR,
+			"Failed to create vdpa multi-threads %d ring.",
+			thrd_idx);
+			goto c_thread_err;
+		}
 		ret = pthread_create(&conf_thread_mng.cthrd[thrd_idx].tid,
 				&attr, mlx5_vdpa_c_thread_handle,
 				(void *)&conf_thread_mng);
@@ -91,6 +200,8 @@  mlx5_vdpa_c_thread_create(int cpu_core)
 					name);
 		else
 			DRV_LOG(DEBUG, "Thread name: %s.", name);
+		pthread_cond_init(&conf_thread_mng.cthrd[thrd_idx].c_cond,
+			NULL);
 	}
 	pthread_mutex_unlock(&conf_thread_mng.cthrd_lock);
 	return 0;
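
For completeness, a hedged caller-side sketch of how a per-device
configuration path might use mlx5_vdpa_task_add(). The actual callers are
added in later patches of this series, so the thread-selection policy
(round-robin by a hypothetical device index) and the helper name below are
assumptions, not part of this patch; conf_thread_mng is the driver's global
configuration-thread management structure.

	/* Illustrative only, not part of the patch. */
	static int
	example_dispatch_tasks(struct mlx5_vdpa_priv *priv, uint32_t dev_idx,
			uint32_t num)
	{
		/* Pick a worker ring, e.g. round-robin by device index. */
		uint32_t thrd_idx = dev_idx % conf_thread_mng.max_thrds;

		if (num > MLX5_VDPA_TASKS_PER_DEV)
			return -1;
		/* Enqueue the batch and wake the chosen configuration thread. */
		if (mlx5_vdpa_task_add(priv, thrd_idx, num))
			return -1;
		return 0;
	}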