[v1,08/13] graph: introduce stream moving cross cores

Message ID: 20221117050926.136974-9-zhirun.yan@intel.com (mailing list archive)
State: Superseded, archived
Delegated to: Thomas Monjalon
Series: graph enhancement for multi-core dispatch

Checks

Context        Check    Description
ci/checkpatch  success  coding style OK
Commit Message

Yan, Zhirun Nov. 17, 2022, 5:09 a.m. UTC
This patch introduces the key functions that allow a worker thread to
enqueue streams of objects and move them to next nodes running on
different cores.

1. Add graph_sched_wq_node to hold a stream for the graph scheduling
   workqueue.
2. Add workqueue helper functions to create/destroy/enqueue/dequeue.
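[Editor's note] For context, a minimal sketch of how a dispatch-aware worker
walk could use these helpers. The loop skeleton mirrors the existing
rte_graph_walk(); the function below is illustrative only and is not part of
this patch:

#include <rte_branch_prediction.h>
#include <rte_common.h>

#include "rte_graph_worker_common.h"
#include "rte_graph_model_generic.h"

/* Illustrative only: walk the circular buffer and, when a node is bound
 * to another lcore, hand its stream over via the scheduling work queue
 * instead of processing it locally.
 */
static inline void
example_walk_with_dispatch(struct rte_graph *graph)
{
	const rte_graph_off_t *cir_start = graph->cir_start;
	const rte_node_t mask = graph->cir_mask;
	uint32_t head = graph->head;
	struct rte_node *node;

	/* Drain streams that other cores queued to this graph. */
	__rte_graph_sched_wq_process(graph);

	while (likely(head != graph->tail)) {
		node = (struct rte_node *)RTE_PTR_ADD(graph,
						      cir_start[(int32_t)head++]);

		/* Node affinitized to another lcore: enqueue its objects
		 * to that graph's work queue and skip local processing.
		 */
		if (node->lcore_id != graph->lcore_id && graph->rq != NULL &&
		    __rte_graph_sched_node_enqueue(node, graph->rq))
			continue;

		__rte_node_process(graph, node);

		head = likely((int32_t)head > 0) ? head & mask : head;
	}
	graph->tail = 0;
}

graph_sched_wq_create()/graph_sched_wq_destroy() themselves are, per the
private header below, meant to be called when a graph clone is created or
destroyed, not on this per-burst path.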

Signed-off-by: Haiyue Wang <haiyue.wang@intel.com>
Signed-off-by: Cunming Liang <cunming.liang@intel.com>
Signed-off-by: Zhirun Yan <zhirun.yan@intel.com>
---
 lib/graph/graph.c                   |   1 +
 lib/graph/graph_populate.c          |   1 +
 lib/graph/graph_private.h           |  39 ++++++++
 lib/graph/meson.build               |   2 +-
 lib/graph/rte_graph_model_generic.c | 145 ++++++++++++++++++++++++++++
 lib/graph/rte_graph_model_generic.h |  35 +++++++
 lib/graph/rte_graph_worker_common.h |  18 ++++
 7 files changed, 240 insertions(+), 1 deletion(-)
  

Comments

Jerin Jacob Feb. 20, 2023, 2:17 p.m. UTC | #1
On Thu, Nov 17, 2022 at 10:40 AM Zhirun Yan <zhirun.yan@intel.com> wrote:
>
> This patch introduces key functions to allow a worker thread to
> enable enqueue and move streams of objects to the next nodes over
> different cores.
>
> 1. add graph_sched_wq_node to hold graph scheduling workqueue node
> stream
> 2. add workqueue help functions to create/destroy/enqueue/dequeue

Two things, make as two patches


> @@ -39,6 +46,15 @@ struct rte_graph {
>         uint32_t cir_mask;           /**< Circular buffer wrap around mask. */
>         rte_node_t nb_nodes;         /**< Number of nodes in the graph. */
>         rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
> +       /* Graph schedule */
> +       struct rte_graph_rq_head *rq __rte_cache_aligned; /* The run-queue */
> +       struct rte_graph_rq_head rq_head; /* The head for run-queue list */
> +
> +       SLIST_ENTRY(rte_graph) rq_next;   /* The next for run-queue list */
> +       unsigned int lcore_id;  /**< The graph running Lcore. */
> +       struct rte_ring *wq;    /**< The work-queue for pending streams. */
> +       struct rte_mempool *mp; /**< The mempool for scheduling streams. */
> +       /* Graph schedule area */
>         rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
>         rte_graph_t id; /**< Graph identifier. */
>         int socket;     /**< Socket ID where memory is allocated. */
> @@ -63,6 +79,8 @@ struct rte_node {
>         char parent[RTE_NODE_NAMESIZE]; /**< Parent node name. */
>         char name[RTE_NODE_NAMESIZE];   /**< Name of the node. */
>
> +       /* Fast schedule area */
> +       unsigned int lcore_id __rte_cache_aligned;  /**< Node running Lcore. */

Do we need __rte_cache_aligned here? I am wondering whether we can add
a union for the different model-specific areas ONLY for the fast path,
so that we can save memory and keep the fast-path data warmer.

>         /* Fast path area  */
>  #define RTE_NODE_CTX_SZ 16
>         uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */
> --
> 2.25.1
>
  
Yan, Zhirun Feb. 24, 2023, 6:48 a.m. UTC | #2
> -----Original Message-----
> From: Jerin Jacob <jerinjacobk@gmail.com>
> Sent: Monday, February 20, 2023 10:17 PM
> To: Yan, Zhirun <zhirun.yan@intel.com>
> Cc: dev@dpdk.org; jerinj@marvell.com; kirankumark@marvell.com;
> ndabilpuram@marvell.com; Liang, Cunming <cunming.liang@intel.com>; Wang,
> Haiyue <haiyue.wang@intel.com>
> Subject: Re: [PATCH v1 08/13] graph: introduce stream moving cross cores
> 
> On Thu, Nov 17, 2022 at 10:40 AM Zhirun Yan <zhirun.yan@intel.com> wrote:
> >
> > This patch introduces key functions to allow a worker thread to enable
> > enqueue and move streams of objects to the next nodes over different
> > cores.
> >
> > 1. add graph_sched_wq_node to hold graph scheduling workqueue node
> > stream 2. add workqueue help functions to
> > create/destroy/enqueue/dequeue
> 
> Two things, make as two patches
> 
I will do that in the next version.

> 
> > @@ -39,6 +46,15 @@ struct rte_graph {
> >         uint32_t cir_mask;           /**< Circular buffer wrap around mask. */
> >         rte_node_t nb_nodes;         /**< Number of nodes in the graph. */
> >         rte_graph_off_t *cir_start;  /**< Pointer to circular buffer.
> > */
> > +       /* Graph schedule */
> > +       struct rte_graph_rq_head *rq __rte_cache_aligned; /* The run-queue */
> > +       struct rte_graph_rq_head rq_head; /* The head for run-queue
> > + list */
> > +
> > +       SLIST_ENTRY(rte_graph) rq_next;   /* The next for run-queue list */
> > +       unsigned int lcore_id;  /**< The graph running Lcore. */
> > +       struct rte_ring *wq;    /**< The work-queue for pending streams. */
> > +       struct rte_mempool *mp; /**< The mempool for scheduling streams. */
> > +       /* Graph schedule area */
> >         rte_graph_off_t nodes_start; /**< Offset at which node memory starts.
> */
> >         rte_graph_t id; /**< Graph identifier. */
> >         int socket;     /**< Socket ID where memory is allocated. */
> > @@ -63,6 +79,8 @@ struct rte_node {
> >         char parent[RTE_NODE_NAMESIZE]; /**< Parent node name. */
> >         char name[RTE_NODE_NAMESIZE];   /**< Name of the node. */
> >
> > +       /* Fast schedule area */
> > +       unsigned int lcore_id __rte_cache_aligned;  /**< Node running
> > + Lcore. */
> 
> Do we need __rte_cache_aligned here? I am wondering can we add union for
> different model specific area ONLY for fast path so that we can save memory
> and fast path data will be more warm.

Maybe it is not necessary. I agree with you, and I can use a union to cover the model-specific fields.
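
[Editor's note] For reference, one possible shape for that union. This is a
rough sketch; the struct and member names below are illustrative and not
taken from this series:

#include <stdint.h>
#include <rte_common.h>

/* Sketch: group the per-model scheduling fields in a union so only the
 * active model's data occupies the fast-path cache line, instead of
 * giving lcore_id its own cache-aligned slot.
 */
struct example_node_fastpath_area {
	/* Model-specific schedule area: one member per worker model. */
	union {
		struct {
			unsigned int lcore_id; /* generic/dispatch model */
		} dispatch;
		struct {
			uint32_t reserved;     /* placeholder for other models */
		} rtc;
	} sched __rte_cache_aligned;

	/* Shared fast-path data (e.g. the node context) follows. */
	uint8_t ctx[16] __rte_cache_aligned;
};

With a layout along these lines, the dispatch model reads its lcore_id
through the union while other models are free to reuse the same bytes.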

> 
> >         /* Fast path area  */
> >  #define RTE_NODE_CTX_SZ 16
> >         uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node
> > Context. */
> > --
> > 2.25.1
> >
  

Patch

diff --git a/lib/graph/graph.c b/lib/graph/graph.c
index 17a9c87032..8ea0daaa35 100644
--- a/lib/graph/graph.c
+++ b/lib/graph/graph.c
@@ -275,6 +275,7 @@  rte_graph_bind_core(rte_graph_t id, int lcore)
 			break;
 
 	graph->lcore_id = lcore;
+	graph->graph->lcore_id = graph->lcore_id;
 	graph->socket = rte_lcore_to_socket_id(lcore);
 
 	/* check the availability of source node */
diff --git a/lib/graph/graph_populate.c b/lib/graph/graph_populate.c
index 102fd6c29b..26f9670406 100644
--- a/lib/graph/graph_populate.c
+++ b/lib/graph/graph_populate.c
@@ -84,6 +84,7 @@  graph_nodes_populate(struct graph *_graph)
 		}
 		node->id = graph_node->node->id;
 		node->parent_id = pid;
+		node->lcore_id = graph_node->node->lcore_id;
 		nb_edges = graph_node->node->nb_edges;
 		node->nb_edges = nb_edges;
 		off += sizeof(struct rte_node);
diff --git a/lib/graph/graph_private.h b/lib/graph/graph_private.h
index c1f2aadd42..f58d0d1d63 100644
--- a/lib/graph/graph_private.h
+++ b/lib/graph/graph_private.h
@@ -59,6 +59,18 @@  struct node {
 	char next_nodes[][RTE_NODE_NAMESIZE]; /**< Names of next nodes. */
 };
 
+/**
+ * @internal
+ *
+ * Structure that holds the graph scheduling workqueue node stream.
+ * Used for generic worker model.
+ */
+struct graph_sched_wq_node {
+	rte_graph_off_t node_off;
+	uint16_t nb_objs;
+	void *objs[RTE_GRAPH_BURST_SIZE];
+} __rte_cache_aligned;
+
 /**
  * @internal
  *
@@ -349,4 +361,31 @@  void graph_dump(FILE *f, struct graph *g);
  */
 void node_dump(FILE *f, struct node *n);
 
+/**
+ * @internal
+ *
+ * Create the graph schedule work queue. And all cloned graphs attached to the
+ * parent graph MUST be destroyed together for fast schedule design limitation.
+ *
+ * @param _graph
+ *   The graph object
+ * @param _parent_graph
+ *   The parent graph object which holds the run-queue head.
+ *
+ * @return
+ *   - 0: Success.
+ *   - <0: Graph schedule work queue related error.
+ */
+int graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph);
+
+/**
+ * @internal
+ *
+ * Destroy the graph schedule work queue.
+ *
+ * @param _graph
+ *   The graph object
+ */
+void graph_sched_wq_destroy(struct graph *_graph);
+
 #endif /* _RTE_GRAPH_PRIVATE_H_ */
diff --git a/lib/graph/meson.build b/lib/graph/meson.build
index 8c8b11ed27..f93ab6fdcb 100644
--- a/lib/graph/meson.build
+++ b/lib/graph/meson.build
@@ -18,4 +18,4 @@  sources = files(
 )
 headers = files('rte_graph.h', 'rte_graph_worker.h')
 
-deps += ['eal']
+deps += ['eal', 'mempool', 'ring']
diff --git a/lib/graph/rte_graph_model_generic.c b/lib/graph/rte_graph_model_generic.c
index 54ff659c7b..c862237432 100644
--- a/lib/graph/rte_graph_model_generic.c
+++ b/lib/graph/rte_graph_model_generic.c
@@ -5,6 +5,151 @@ 
 #include "graph_private.h"
 #include "rte_graph_model_generic.h"
 
+int
+graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph)
+{
+	struct rte_graph *parent_graph = _parent_graph->graph;
+	struct rte_graph *graph = _graph->graph;
+	unsigned int wq_size;
+
+	wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes);
+	wq_size = rte_align32pow2(wq_size + 1);
+
+	graph->wq = rte_ring_create(graph->name, wq_size, graph->socket,
+				    RING_F_SC_DEQ);
+	if (graph->wq == NULL)
+		SET_ERR_JMP(EIO, fail, "Failed to allocate graph WQ");
+
+	graph->mp = rte_mempool_create(graph->name, wq_size,
+				       sizeof(struct graph_sched_wq_node),
+				       0, 0, NULL, NULL, NULL, NULL,
+				       graph->socket, MEMPOOL_F_SP_PUT);
+	if (graph->mp == NULL)
+		SET_ERR_JMP(EIO, fail_mp,
+			    "Failed to allocate graph WQ schedule entry");
+
+	graph->lcore_id = _graph->lcore_id;
+
+	if (parent_graph->rq == NULL) {
+		parent_graph->rq = &parent_graph->rq_head;
+		SLIST_INIT(parent_graph->rq);
+	}
+
+	graph->rq = parent_graph->rq;
+	SLIST_INSERT_HEAD(graph->rq, graph, rq_next);
+
+	return 0;
+
+fail_mp:
+	rte_ring_free(graph->wq);
+	graph->wq = NULL;
+fail:
+	return -rte_errno;
+}
+
+void
+graph_sched_wq_destroy(struct graph *_graph)
+{
+	struct rte_graph *graph = _graph->graph;
+
+	if (graph == NULL)
+		return;
+
+	rte_ring_free(graph->wq);
+	graph->wq = NULL;
+
+	rte_mempool_free(graph->mp);
+	graph->mp = NULL;
+}
+
+static __rte_always_inline bool
+__graph_sched_node_enqueue(struct rte_node *node, struct rte_graph *graph)
+{
+	struct graph_sched_wq_node *wq_node;
+	uint16_t off = 0;
+	uint16_t size;
+
+submit_again:
+	if (rte_mempool_get(graph->mp, (void **)&wq_node) < 0)
+		goto fallback;
+
+	size = RTE_MIN(node->idx, RTE_DIM(wq_node->objs));
+	wq_node->node_off = node->off;
+	wq_node->nb_objs = size;
+	rte_memcpy(wq_node->objs, &node->objs[off], size * sizeof(void *));
+
+	while (rte_ring_mp_enqueue_bulk_elem(graph->wq, (void *)&wq_node,
+					  sizeof(wq_node), 1, NULL) == 0)
+		rte_pause();
+
+	off += size;
+	node->idx -= size;
+	if (node->idx > 0)
+		goto submit_again;
+
+	return true;
+
+fallback:
+	if (off != 0)
+		memmove(&node->objs[0], &node->objs[off],
+			node->idx * sizeof(void *));
+
+	return false;
+}
+
+bool __rte_noinline
+__rte_graph_sched_node_enqueue(struct rte_node *node,
+			       struct rte_graph_rq_head *rq)
+{
+	const unsigned int lcore_id = node->lcore_id;
+	struct rte_graph *graph;
+
+	SLIST_FOREACH(graph, rq, rq_next)
+		if (graph->lcore_id == lcore_id)
+			break;
+
+	return graph != NULL ? __graph_sched_node_enqueue(node, graph) : false;
+}
+
+void __rte_noinline
+__rte_graph_sched_wq_process(struct rte_graph *graph)
+{
+	struct graph_sched_wq_node *wq_node;
+	struct rte_mempool *mp = graph->mp;
+	struct rte_ring *wq = graph->wq;
+	uint16_t idx, free_space;
+	struct rte_node *node;
+	unsigned int i, n;
+	struct graph_sched_wq_node *wq_nodes[32];
+
+	n = rte_ring_sc_dequeue_burst_elem(wq, wq_nodes, sizeof(wq_nodes[0]),
+					   RTE_DIM(wq_nodes), NULL);
+	if (n == 0)
+		return;
+
+	for (i = 0; i < n; i++) {
+		wq_node = wq_nodes[i];
+		node = RTE_PTR_ADD(graph, wq_node->node_off);
+		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
+		idx = node->idx;
+		free_space = node->size - idx;
+
+		if (unlikely(free_space < wq_node->nb_objs))
+			__rte_node_stream_alloc_size(graph, node, node->size + wq_node->nb_objs);
+
+		memmove(&node->objs[idx], wq_node->objs, wq_node->nb_objs * sizeof(void *));
+		memset(wq_node->objs, 0, wq_node->nb_objs * sizeof(void *));
+		node->idx = idx + wq_node->nb_objs;
+
+		__rte_node_process(graph, node);
+
+		wq_node->nb_objs = 0;
+		node->idx = 0;
+	}
+
+	rte_mempool_put_bulk(mp, (void **)wq_nodes, n);
+}
+
 int
 rte_node_model_generic_set_lcore_affinity(const char *name, unsigned int lcore_id)
 {
diff --git a/lib/graph/rte_graph_model_generic.h b/lib/graph/rte_graph_model_generic.h
index 20ca48a9e3..5715fc8ffb 100644
--- a/lib/graph/rte_graph_model_generic.h
+++ b/lib/graph/rte_graph_model_generic.h
@@ -15,12 +15,47 @@ 
  * This API allows a worker thread to walk over a graph and nodes to create,
  * process, enqueue and move streams of objects to the next nodes.
  */
+#include <rte_errno.h>
+#include <rte_mempool.h>
+#include <rte_memzone.h>
+#include <rte_ring.h>
+
 #include "rte_graph_worker_common.h"
 
 #ifdef __cplusplus
 extern "C" {
 #endif
 
+#define GRAPH_SCHED_WQ_SIZE_MULTIPLIER  8
+#define GRAPH_SCHED_WQ_SIZE(nb_nodes)   \
+	((typeof(nb_nodes))((nb_nodes) * GRAPH_SCHED_WQ_SIZE_MULTIPLIER))
+
+/**
+ * @internal
+ *
+ * Schedule the node to the right graph's work queue.
+ *
+ * @param node
+ *   Pointer to the scheduled node object.
+ * @param rq
+ *   Pointer to the scheduled run-queue for all graphs.
+ *
+ * @return
+ *   True on success, false otherwise.
+ */
+bool __rte_graph_sched_node_enqueue(struct rte_node *node,
+				    struct rte_graph_rq_head *rq);
+
+/**
+ * @internal
+ *
+ * Process all nodes (streams) in the graph's work queue.
+ *
+ * @param graph
+ *   Pointer to the graph object.
+ */
+void __rte_noinline __rte_graph_sched_wq_process(struct rte_graph *graph);
+
 /**
  * Set lcore affinity to the node.
  *
diff --git a/lib/graph/rte_graph_worker_common.h b/lib/graph/rte_graph_worker_common.h
index 507a344afd..cf38a03f44 100644
--- a/lib/graph/rte_graph_worker_common.h
+++ b/lib/graph/rte_graph_worker_common.h
@@ -28,6 +28,13 @@ 
 extern "C" {
 #endif
 
+/**
+ * @internal
+ *
+ * Singly-linked list head for graph schedule run-queue.
+ */
+SLIST_HEAD(rte_graph_rq_head, rte_graph);
+
 /**
  * @internal
  *
@@ -39,6 +46,15 @@  struct rte_graph {
 	uint32_t cir_mask;	     /**< Circular buffer wrap around mask. */
 	rte_node_t nb_nodes;	     /**< Number of nodes in the graph. */
 	rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
+	/* Graph schedule */
+	struct rte_graph_rq_head *rq __rte_cache_aligned; /* The run-queue */
+	struct rte_graph_rq_head rq_head; /* The head for run-queue list */
+
+	SLIST_ENTRY(rte_graph) rq_next;   /* The next for run-queue list */
+	unsigned int lcore_id;  /**< The graph running Lcore. */
+	struct rte_ring *wq;    /**< The work-queue for pending streams. */
+	struct rte_mempool *mp; /**< The mempool for scheduling streams. */
+	/* Graph schedule area */
 	rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
 	rte_graph_t id;	/**< Graph identifier. */
 	int socket;	/**< Socket ID where memory is allocated. */
@@ -63,6 +79,8 @@  struct rte_node {
 	char parent[RTE_NODE_NAMESIZE];	/**< Parent node name. */
 	char name[RTE_NODE_NAMESIZE];	/**< Name of the node. */
 
+	/* Fast schedule area */
+	unsigned int lcore_id __rte_cache_aligned;  /**< Node running Lcore. */
 	/* Fast path area  */
 #define RTE_NODE_CTX_SZ 16
 	uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */