[v10,10/16] graph: introduce stream moving cross cores

Message ID 20230608095759.1800617-11-zhirun.yan@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Thomas Monjalon
Headers
Series graph enhancement for multi-core dispatch |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Yan, Zhirun June 8, 2023, 9:57 a.m. UTC
  This patch introduces key functions to allow a worker thread to
enable enqueue and move streams of objects to the next nodes over
different cores for mcore dispatch model.

Signed-off-by: Haiyue Wang <haiyue.wang@intel.com>
Signed-off-by: Cunming Liang <cunming.liang@intel.com>
Signed-off-by: Zhirun Yan <zhirun.yan@intel.com>
---
 lib/graph/graph.c                          |   6 +-
 lib/graph/graph_private.h                  |  31 ++++
 lib/graph/meson.build                      |   2 +-
 lib/graph/rte_graph.h                      |  15 +-
 lib/graph/rte_graph_model_mcore_dispatch.c | 158 +++++++++++++++++++++
 lib/graph/rte_graph_model_mcore_dispatch.h |  45 ++++++
 lib/graph/version.map                      |   2 +
 7 files changed, 254 insertions(+), 5 deletions(-)
  

Comments

Pavan Nikhilesh Bhagavatula June 8, 2023, 1:39 p.m. UTC | #1
> This patch introduces key functions to allow a worker thread to
> enable enqueue and move streams of objects to the next nodes over
> different cores for mcore dispatch model.
> 
> Signed-off-by: Haiyue Wang <haiyue.wang@intel.com>
> Signed-off-by: Cunming Liang <cunming.liang@intel.com>
> Signed-off-by: Zhirun Yan <zhirun.yan@intel.com>

Acked-by: Pavan Nikhilesh <pbhagavatula@marvell.com>

> ---
>  lib/graph/graph.c                          |   6 +-
>  lib/graph/graph_private.h                  |  31 ++++
>  lib/graph/meson.build                      |   2 +-
>  lib/graph/rte_graph.h                      |  15 +-
>  lib/graph/rte_graph_model_mcore_dispatch.c | 158
> +++++++++++++++++++++
>  lib/graph/rte_graph_model_mcore_dispatch.h |  45 ++++++
>  lib/graph/version.map                      |   2 +
>  7 files changed, 254 insertions(+), 5 deletions(-)
> 
> diff --git a/lib/graph/graph.c b/lib/graph/graph.c
> index 968cbbf86c..41251e3435 100644
> --- a/lib/graph/graph.c
> +++ b/lib/graph/graph.c
> @@ -473,7 +473,7 @@ rte_graph_destroy(rte_graph_t id)
>  }
> 
>  static rte_graph_t
> -graph_clone(struct graph *parent_graph, const char *name)
> +graph_clone(struct graph *parent_graph, const char *name, struct
> rte_graph_param *prm)
>  {
>  	struct graph_node *graph_node;
>  	struct graph *graph;
> @@ -547,14 +547,14 @@ graph_clone(struct graph *parent_graph, const
> char *name)
>  }
> 
>  rte_graph_t
> -rte_graph_clone(rte_graph_t id, const char *name)
> +rte_graph_clone(rte_graph_t id, const char *name, struct
> rte_graph_param *prm)
>  {
>  	struct graph *graph;
> 
>  	GRAPH_ID_CHECK(id);
>  	STAILQ_FOREACH(graph, &graph_list, next)
>  		if (graph->id == id)
> -			return graph_clone(graph, name);
> +			return graph_clone(graph, name, prm);
> 
>  fail:
>  	return RTE_GRAPH_ID_INVALID;
> diff --git a/lib/graph/graph_private.h b/lib/graph/graph_private.h
> index d84174b667..d0ef13b205 100644
> --- a/lib/graph/graph_private.h
> +++ b/lib/graph/graph_private.h
> @@ -414,4 +414,35 @@ void graph_dump(FILE *f, struct graph *g);
>   */
>  void node_dump(FILE *f, struct node *n);
> 
> +/**
> + * @internal
> + *
> + * Create the graph schedule work queue for mcore dispatch model.
> + * All cloned graphs attached to the parent graph MUST be destroyed
> together
> + * for fast schedule design limitation.
> + *
> + * @param _graph
> + *   The graph object
> + * @param _parent_graph
> + *   The parent graph object which holds the run-queue head.
> + * @param prm
> + *   Graph parameter, includes model-specific parameters in this graph.
> + *
> + * @return
> + *   - 0: Success.
> + *   - <0: Graph schedule work queue related error.
> + */
> +int graph_sched_wq_create(struct graph *_graph, struct graph
> *_parent_graph,
> +			   struct rte_graph_param *prm);
> +
> +/**
> + * @internal
> + *
> + * Destroy the graph schedule work queue for mcore dispatch model.
> + *
> + * @param _graph
> + *   The graph object
> + */
> +void graph_sched_wq_destroy(struct graph *_graph);
> +
>  #endif /* _RTE_GRAPH_PRIVATE_H_ */
> diff --git a/lib/graph/meson.build b/lib/graph/meson.build
> index 0685cf9e72..9d51eabe33 100644
> --- a/lib/graph/meson.build
> +++ b/lib/graph/meson.build
> @@ -20,4 +20,4 @@ sources = files(
>  )
>  headers = files('rte_graph.h', 'rte_graph_worker.h')
> 
> -deps += ['eal', 'pcapng']
> +deps += ['eal', 'pcapng', 'mempool', 'ring']
> diff --git a/lib/graph/rte_graph.h b/lib/graph/rte_graph.h
> index 998cade200..2ffee520b1 100644
> --- a/lib/graph/rte_graph.h
> +++ b/lib/graph/rte_graph.h
> @@ -169,6 +169,17 @@ struct rte_graph_param {
>  	bool pcap_enable; /**< Pcap enable. */
>  	uint64_t num_pkt_to_capture; /**< Number of packets to capture.
> */
>  	char *pcap_filename; /**< Filename in which packets to be
> captured.*/
> +
> +	RTE_STD_C11
> +	union {
> +		struct {
> +			uint64_t rsvd; /**< Reserved for rtc model. */
> +		} rtc;
> +		struct {
> +			uint32_t wq_size_max; /**< Maximum size of
> workqueue for dispatch model. */
> +			uint32_t mp_capacity; /**< Capacity of memory pool
> for dispatch model. */
> +		} dispatch;
> +	};
>  };
> 
>  /**
> @@ -260,12 +271,14 @@ int rte_graph_destroy(rte_graph_t id);
>   *   Name of the new graph. The library prepends the parent graph name to
> the
>   * user-specified name. The final graph name will be,
>   * "parent graph name" + "-" + name.
> + * @param prm
> + *   Graph parameter, includes model-specific parameters in this graph.
>   *
>   * @return
>   *   Valid graph id on success, RTE_GRAPH_ID_INVALID otherwise.
>   */
>  __rte_experimental
> -rte_graph_t rte_graph_clone(rte_graph_t id, const char *name);
> +rte_graph_t rte_graph_clone(rte_graph_t id, const char *name, struct
> rte_graph_param *prm);
> 
>  /**
>   * Get graph id from graph name.
> diff --git a/lib/graph/rte_graph_model_mcore_dispatch.c
> b/lib/graph/rte_graph_model_mcore_dispatch.c
> index 9df2479a10..8f4bc860ab 100644
> --- a/lib/graph/rte_graph_model_mcore_dispatch.c
> +++ b/lib/graph/rte_graph_model_mcore_dispatch.c
> @@ -5,6 +5,164 @@
>  #include "graph_private.h"
>  #include "rte_graph_model_mcore_dispatch.h"
> 
> +int
> +graph_sched_wq_create(struct graph *_graph, struct graph
> *_parent_graph,
> +		       struct rte_graph_param *prm)
> +{
> +	struct rte_graph *parent_graph = _parent_graph->graph;
> +	struct rte_graph *graph = _graph->graph;
> +	unsigned int wq_size;
> +	unsigned int flags = RING_F_SC_DEQ;
> +
> +	wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes);
> +	wq_size = rte_align32pow2(wq_size + 1);
> +
> +	if (prm->dispatch.wq_size_max > 0)
> +		wq_size = wq_size <= (prm->dispatch.wq_size_max) ?
> wq_size :
> +			prm->dispatch.wq_size_max;
> +
> +	if (!rte_is_power_of_2(wq_size))
> +		flags |= RING_F_EXACT_SZ;
> +
> +	graph->dispatch.wq = rte_ring_create(graph->name, wq_size,
> graph->socket,
> +					     flags);
> +	if (graph->dispatch.wq == NULL)
> +		SET_ERR_JMP(EIO, fail, "Failed to allocate graph WQ");
> +
> +	if (prm->dispatch.mp_capacity > 0)
> +		wq_size = (wq_size <= prm->dispatch.mp_capacity) ?
> wq_size :
> +			prm->dispatch.mp_capacity;
> +
> +	graph->dispatch.mp = rte_mempool_create(graph->name, wq_size,
> +						sizeof(struct
> graph_mcore_dispatch_wq_node),
> +						0, 0, NULL, NULL, NULL, NULL,
> +						graph->socket,
> MEMPOOL_F_SP_PUT);
> +	if (graph->dispatch.mp == NULL)
> +		SET_ERR_JMP(EIO, fail_mp,
> +			    "Failed to allocate graph WQ schedule entry");
> +
> +	graph->dispatch.lcore_id = _graph->lcore_id;
> +
> +	if (parent_graph->dispatch.rq == NULL) {
> +		parent_graph->dispatch.rq = &parent_graph-
> >dispatch.rq_head;
> +		SLIST_INIT(parent_graph->dispatch.rq);
> +	}
> +
> +	graph->dispatch.rq = parent_graph->dispatch.rq;
> +	SLIST_INSERT_HEAD(graph->dispatch.rq, graph, next);
> +
> +	return 0;
> +
> +fail_mp:
> +	rte_ring_free(graph->dispatch.wq);
> +	graph->dispatch.wq = NULL;
> +fail:
> +	return -rte_errno;
> +}
> +
> +void
> +graph_sched_wq_destroy(struct graph *_graph)
> +{
> +	struct rte_graph *graph = _graph->graph;
> +
> +	if (graph == NULL)
> +		return;
> +
> +	rte_ring_free(graph->dispatch.wq);
> +	graph->dispatch.wq = NULL;
> +
> +	rte_mempool_free(graph->dispatch.mp);
> +	graph->dispatch.mp = NULL;
> +}
> +
> +static __rte_always_inline bool
> +__graph_sched_node_enqueue(struct rte_node *node, struct rte_graph
> *graph)
> +{
> +	struct graph_mcore_dispatch_wq_node *wq_node;
> +	uint16_t off = 0;
> +	uint16_t size;
> +
> +submit_again:
> +	if (rte_mempool_get(graph->dispatch.mp, (void **)&wq_node) < 0)
> +		goto fallback;
> +
> +	size = RTE_MIN(node->idx, RTE_DIM(wq_node->objs));
> +	wq_node->node_off = node->off;
> +	wq_node->nb_objs = size;
> +	rte_memcpy(wq_node->objs, &node->objs[off], size * sizeof(void
> *));
> +
> +	while (rte_ring_mp_enqueue_bulk_elem(graph->dispatch.wq, (void
> *)&wq_node,
> +					     sizeof(wq_node), 1, NULL) == 0)
> +		rte_pause();
> +
> +	off += size;
> +	node->idx -= size;
> +	if (node->idx > 0)
> +		goto submit_again;
> +
> +	return true;
> +
> +fallback:
> +	if (off != 0)
> +		memmove(&node->objs[0], &node->objs[off],
> +			node->idx * sizeof(void *));
> +
> +	return false;
> +}
> +
> +bool __rte_noinline
> +__rte_graph_mcore_dispatch_sched_node_enqueue(struct rte_node
> *node,
> +					      struct rte_graph_rq_head *rq)
> +{
> +	const unsigned int lcore_id = node->dispatch.lcore_id;
> +	struct rte_graph *graph;
> +
> +	SLIST_FOREACH(graph, rq, next)
> +		if (graph->dispatch.lcore_id == lcore_id)
> +			break;
> +
> +	return graph != NULL ? __graph_sched_node_enqueue(node,
> graph) : false;
> +}
> +
> +void
> +__rte_graph_mcore_dispatch_sched_wq_process(struct rte_graph
> *graph)
> +{
> +#define WQ_SZ 32
> +	struct graph_mcore_dispatch_wq_node *wq_node;
> +	struct rte_mempool *mp = graph->dispatch.mp;
> +	struct rte_ring *wq = graph->dispatch.wq;
> +	uint16_t idx, free_space;
> +	struct rte_node *node;
> +	unsigned int i, n;
> +	struct graph_mcore_dispatch_wq_node *wq_nodes[WQ_SZ];
> +
> +	n = rte_ring_sc_dequeue_burst_elem(wq, wq_nodes,
> sizeof(wq_nodes[0]),
> +					   RTE_DIM(wq_nodes), NULL);
> +	if (n == 0)
> +		return;
> +
> +	for (i = 0; i < n; i++) {
> +		wq_node = wq_nodes[i];
> +		node = RTE_PTR_ADD(graph, wq_node->node_off);
> +		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
> +		idx = node->idx;
> +		free_space = node->size - idx;
> +
> +		if (unlikely(free_space < wq_node->nb_objs))
> +			__rte_node_stream_alloc_size(graph, node, node-
> >size + wq_node->nb_objs);
> +
> +		memmove(&node->objs[idx], wq_node->objs, wq_node-
> >nb_objs * sizeof(void *));
> +		node->idx = idx + wq_node->nb_objs;
> +
> +		__rte_node_process(graph, node);
> +
> +		wq_node->nb_objs = 0;
> +		node->idx = 0;
> +	}
> +
> +	rte_mempool_put_bulk(mp, (void **)wq_nodes, n);
> +}
> +
>  int
>  rte_graph_model_mcore_dispatch_node_lcore_affinity_set(const char
> *name, unsigned int lcore_id)
>  {
> diff --git a/lib/graph/rte_graph_model_mcore_dispatch.h
> b/lib/graph/rte_graph_model_mcore_dispatch.h
> index 7da0483d13..6163f96c37 100644
> --- a/lib/graph/rte_graph_model_mcore_dispatch.h
> +++ b/lib/graph/rte_graph_model_mcore_dispatch.h
> @@ -20,8 +20,53 @@
>  extern "C" {
>  #endif
> 
> +#include <rte_errno.h>
> +#include <rte_mempool.h>
> +#include <rte_memzone.h>
> +#include <rte_ring.h>
> +
>  #include "rte_graph_worker_common.h"
> 
> +#define GRAPH_SCHED_WQ_SIZE_MULTIPLIER  8
> +#define GRAPH_SCHED_WQ_SIZE(nb_nodes)   \
> +	((typeof(nb_nodes))((nb_nodes) *
> GRAPH_SCHED_WQ_SIZE_MULTIPLIER))
> +
> +/**
> + * @internal
> + *
> + * Schedule the node to the right graph's work queue for mcore dispatch
> model.
> + *
> + * @param node
> + *   Pointer to the scheduled node object.
> + * @param rq
> + *   Pointer to the scheduled run-queue for all graphs.
> + *
> + * @return
> + *   True on success, false otherwise.
> + *
> + * @note
> + * This implementation is used by mcore dispatch model only and user
> application
> + * should not call it directly.
> + */
> +__rte_experimental
> +bool __rte_noinline
> __rte_graph_mcore_dispatch_sched_node_enqueue(struct rte_node
> *node,
> +								  struct
> rte_graph_rq_head *rq);
> +
> +/**
> + * @internal
> + *
> + * Process all nodes (streams) in the graph's work queue for mcore dispatch
> model.
> + *
> + * @param graph
> + *   Pointer to the graph object.
> + *
> + * @note
> + * This implementation is used by mcore dispatch model only and user
> application
> + * should not call it directly.
> + */
> +__rte_experimental
> +void __rte_graph_mcore_dispatch_sched_wq_process(struct rte_graph
> *graph);
> +
>  /**
>   * Set lcore affinity with the node used for mcore dispatch model.
>   *
> diff --git a/lib/graph/version.map b/lib/graph/version.map
> index dbb3507687..f95a6b0fb5 100644
> --- a/lib/graph/version.map
> +++ b/lib/graph/version.map
> @@ -50,6 +50,8 @@ EXPERIMENTAL {
>  	rte_graph_worker_model_set;
>  	rte_graph_worker_model_get;
>  	rte_graph_worker_model_no_check_get;
> +	__rte_graph_mcore_dispatch_sched_wq_process;
> +	__rte_graph_mcore_dispatch_sched_node_enqueue;
> 
>  	rte_graph_model_mcore_dispatch_node_lcore_affinity_set;
> 
> --
> 2.37.2
  

Patch

diff --git a/lib/graph/graph.c b/lib/graph/graph.c
index 968cbbf86c..41251e3435 100644
--- a/lib/graph/graph.c
+++ b/lib/graph/graph.c
@@ -473,7 +473,7 @@  rte_graph_destroy(rte_graph_t id)
 }
 
 static rte_graph_t
-graph_clone(struct graph *parent_graph, const char *name)
+graph_clone(struct graph *parent_graph, const char *name, struct rte_graph_param *prm)
 {
 	struct graph_node *graph_node;
 	struct graph *graph;
@@ -547,14 +547,14 @@  graph_clone(struct graph *parent_graph, const char *name)
 }
 
 rte_graph_t
-rte_graph_clone(rte_graph_t id, const char *name)
+rte_graph_clone(rte_graph_t id, const char *name, struct rte_graph_param *prm)
 {
 	struct graph *graph;
 
 	GRAPH_ID_CHECK(id);
 	STAILQ_FOREACH(graph, &graph_list, next)
 		if (graph->id == id)
-			return graph_clone(graph, name);
+			return graph_clone(graph, name, prm);
 
 fail:
 	return RTE_GRAPH_ID_INVALID;
diff --git a/lib/graph/graph_private.h b/lib/graph/graph_private.h
index d84174b667..d0ef13b205 100644
--- a/lib/graph/graph_private.h
+++ b/lib/graph/graph_private.h
@@ -414,4 +414,35 @@  void graph_dump(FILE *f, struct graph *g);
  */
 void node_dump(FILE *f, struct node *n);
 
+/**
+ * @internal
+ *
+ * Create the graph schedule work queue for mcore dispatch model.
+ * All cloned graphs attached to the parent graph MUST be destroyed together
+ * for fast schedule design limitation.
+ *
+ * @param _graph
+ *   The graph object
+ * @param _parent_graph
+ *   The parent graph object which holds the run-queue head.
+ * @param prm
+ *   Graph parameter, includes model-specific parameters in this graph.
+ *
+ * @return
+ *   - 0: Success.
+ *   - <0: Graph schedule work queue related error.
+ */
+int graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph,
+			   struct rte_graph_param *prm);
+
+/**
+ * @internal
+ *
+ * Destroy the graph schedule work queue for mcore dispatch model.
+ *
+ * @param _graph
+ *   The graph object
+ */
+void graph_sched_wq_destroy(struct graph *_graph);
+
 #endif /* _RTE_GRAPH_PRIVATE_H_ */
diff --git a/lib/graph/meson.build b/lib/graph/meson.build
index 0685cf9e72..9d51eabe33 100644
--- a/lib/graph/meson.build
+++ b/lib/graph/meson.build
@@ -20,4 +20,4 @@  sources = files(
 )
 headers = files('rte_graph.h', 'rte_graph_worker.h')
 
-deps += ['eal', 'pcapng']
+deps += ['eal', 'pcapng', 'mempool', 'ring']
diff --git a/lib/graph/rte_graph.h b/lib/graph/rte_graph.h
index 998cade200..2ffee520b1 100644
--- a/lib/graph/rte_graph.h
+++ b/lib/graph/rte_graph.h
@@ -169,6 +169,17 @@  struct rte_graph_param {
 	bool pcap_enable; /**< Pcap enable. */
 	uint64_t num_pkt_to_capture; /**< Number of packets to capture. */
 	char *pcap_filename; /**< Filename in which packets to be captured.*/
+
+	RTE_STD_C11
+	union {
+		struct {
+			uint64_t rsvd; /**< Reserved for rtc model. */
+		} rtc;
+		struct {
+			uint32_t wq_size_max; /**< Maximum size of workqueue for dispatch model. */
+			uint32_t mp_capacity; /**< Capacity of memory pool for dispatch model. */
+		} dispatch;
+	};
 };
 
 /**
@@ -260,12 +271,14 @@  int rte_graph_destroy(rte_graph_t id);
  *   Name of the new graph. The library prepends the parent graph name to the
  * user-specified name. The final graph name will be,
  * "parent graph name" + "-" + name.
+ * @param prm
+ *   Graph parameter, includes model-specific parameters in this graph.
  *
  * @return
  *   Valid graph id on success, RTE_GRAPH_ID_INVALID otherwise.
  */
 __rte_experimental
-rte_graph_t rte_graph_clone(rte_graph_t id, const char *name);
+rte_graph_t rte_graph_clone(rte_graph_t id, const char *name, struct rte_graph_param *prm);
 
 /**
  * Get graph id from graph name.
diff --git a/lib/graph/rte_graph_model_mcore_dispatch.c b/lib/graph/rte_graph_model_mcore_dispatch.c
index 9df2479a10..8f4bc860ab 100644
--- a/lib/graph/rte_graph_model_mcore_dispatch.c
+++ b/lib/graph/rte_graph_model_mcore_dispatch.c
@@ -5,6 +5,164 @@ 
 #include "graph_private.h"
 #include "rte_graph_model_mcore_dispatch.h"
 
+int
+graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph,
+		       struct rte_graph_param *prm)
+{
+	struct rte_graph *parent_graph = _parent_graph->graph;
+	struct rte_graph *graph = _graph->graph;
+	unsigned int wq_size;
+	unsigned int flags = RING_F_SC_DEQ;
+
+	wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes);
+	wq_size = rte_align32pow2(wq_size + 1);
+
+	if (prm->dispatch.wq_size_max > 0)
+		wq_size = wq_size <= (prm->dispatch.wq_size_max) ? wq_size :
+			prm->dispatch.wq_size_max;
+
+	if (!rte_is_power_of_2(wq_size))
+		flags |= RING_F_EXACT_SZ;
+
+	graph->dispatch.wq = rte_ring_create(graph->name, wq_size, graph->socket,
+					     flags);
+	if (graph->dispatch.wq == NULL)
+		SET_ERR_JMP(EIO, fail, "Failed to allocate graph WQ");
+
+	if (prm->dispatch.mp_capacity > 0)
+		wq_size = (wq_size <= prm->dispatch.mp_capacity) ? wq_size :
+			prm->dispatch.mp_capacity;
+
+	graph->dispatch.mp = rte_mempool_create(graph->name, wq_size,
+						sizeof(struct graph_mcore_dispatch_wq_node),
+						0, 0, NULL, NULL, NULL, NULL,
+						graph->socket, MEMPOOL_F_SP_PUT);
+	if (graph->dispatch.mp == NULL)
+		SET_ERR_JMP(EIO, fail_mp,
+			    "Failed to allocate graph WQ schedule entry");
+
+	graph->dispatch.lcore_id = _graph->lcore_id;
+
+	if (parent_graph->dispatch.rq == NULL) {
+		parent_graph->dispatch.rq = &parent_graph->dispatch.rq_head;
+		SLIST_INIT(parent_graph->dispatch.rq);
+	}
+
+	graph->dispatch.rq = parent_graph->dispatch.rq;
+	SLIST_INSERT_HEAD(graph->dispatch.rq, graph, next);
+
+	return 0;
+
+fail_mp:
+	rte_ring_free(graph->dispatch.wq);
+	graph->dispatch.wq = NULL;
+fail:
+	return -rte_errno;
+}
+
+void
+graph_sched_wq_destroy(struct graph *_graph)
+{
+	struct rte_graph *graph = _graph->graph;
+
+	if (graph == NULL)
+		return;
+
+	rte_ring_free(graph->dispatch.wq);
+	graph->dispatch.wq = NULL;
+
+	rte_mempool_free(graph->dispatch.mp);
+	graph->dispatch.mp = NULL;
+}
+
+static __rte_always_inline bool
+__graph_sched_node_enqueue(struct rte_node *node, struct rte_graph *graph)
+{
+	struct graph_mcore_dispatch_wq_node *wq_node;
+	uint16_t off = 0;
+	uint16_t size;
+
+submit_again:
+	if (rte_mempool_get(graph->dispatch.mp, (void **)&wq_node) < 0)
+		goto fallback;
+
+	size = RTE_MIN(node->idx, RTE_DIM(wq_node->objs));
+	wq_node->node_off = node->off;
+	wq_node->nb_objs = size;
+	rte_memcpy(wq_node->objs, &node->objs[off], size * sizeof(void *));
+
+	while (rte_ring_mp_enqueue_bulk_elem(graph->dispatch.wq, (void *)&wq_node,
+					     sizeof(wq_node), 1, NULL) == 0)
+		rte_pause();
+
+	off += size;
+	node->idx -= size;
+	if (node->idx > 0)
+		goto submit_again;
+
+	return true;
+
+fallback:
+	if (off != 0)
+		memmove(&node->objs[0], &node->objs[off],
+			node->idx * sizeof(void *));
+
+	return false;
+}
+
+bool __rte_noinline
+__rte_graph_mcore_dispatch_sched_node_enqueue(struct rte_node *node,
+					      struct rte_graph_rq_head *rq)
+{
+	const unsigned int lcore_id = node->dispatch.lcore_id;
+	struct rte_graph *graph;
+
+	SLIST_FOREACH(graph, rq, next)
+		if (graph->dispatch.lcore_id == lcore_id)
+			break;
+
+	return graph != NULL ? __graph_sched_node_enqueue(node, graph) : false;
+}
+
+void
+__rte_graph_mcore_dispatch_sched_wq_process(struct rte_graph *graph)
+{
+#define WQ_SZ 32
+	struct graph_mcore_dispatch_wq_node *wq_node;
+	struct rte_mempool *mp = graph->dispatch.mp;
+	struct rte_ring *wq = graph->dispatch.wq;
+	uint16_t idx, free_space;
+	struct rte_node *node;
+	unsigned int i, n;
+	struct graph_mcore_dispatch_wq_node *wq_nodes[WQ_SZ];
+
+	n = rte_ring_sc_dequeue_burst_elem(wq, wq_nodes, sizeof(wq_nodes[0]),
+					   RTE_DIM(wq_nodes), NULL);
+	if (n == 0)
+		return;
+
+	for (i = 0; i < n; i++) {
+		wq_node = wq_nodes[i];
+		node = RTE_PTR_ADD(graph, wq_node->node_off);
+		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
+		idx = node->idx;
+		free_space = node->size - idx;
+
+		if (unlikely(free_space < wq_node->nb_objs))
+			__rte_node_stream_alloc_size(graph, node, node->size + wq_node->nb_objs);
+
+		memmove(&node->objs[idx], wq_node->objs, wq_node->nb_objs * sizeof(void *));
+		node->idx = idx + wq_node->nb_objs;
+
+		__rte_node_process(graph, node);
+
+		wq_node->nb_objs = 0;
+		node->idx = 0;
+	}
+
+	rte_mempool_put_bulk(mp, (void **)wq_nodes, n);
+}
+
 int
 rte_graph_model_mcore_dispatch_node_lcore_affinity_set(const char *name, unsigned int lcore_id)
 {
diff --git a/lib/graph/rte_graph_model_mcore_dispatch.h b/lib/graph/rte_graph_model_mcore_dispatch.h
index 7da0483d13..6163f96c37 100644
--- a/lib/graph/rte_graph_model_mcore_dispatch.h
+++ b/lib/graph/rte_graph_model_mcore_dispatch.h
@@ -20,8 +20,53 @@ 
 extern "C" {
 #endif
 
+#include <rte_errno.h>
+#include <rte_mempool.h>
+#include <rte_memzone.h>
+#include <rte_ring.h>
+
 #include "rte_graph_worker_common.h"
 
+#define GRAPH_SCHED_WQ_SIZE_MULTIPLIER  8
+#define GRAPH_SCHED_WQ_SIZE(nb_nodes)   \
+	((typeof(nb_nodes))((nb_nodes) * GRAPH_SCHED_WQ_SIZE_MULTIPLIER))
+
+/**
+ * @internal
+ *
+ * Schedule the node to the right graph's work queue for mcore dispatch model.
+ *
+ * @param node
+ *   Pointer to the scheduled node object.
+ * @param rq
+ *   Pointer to the scheduled run-queue for all graphs.
+ *
+ * @return
+ *   True on success, false otherwise.
+ *
+ * @note
+ * This implementation is used by mcore dispatch model only and user application
+ * should not call it directly.
+ */
+__rte_experimental
+bool __rte_noinline __rte_graph_mcore_dispatch_sched_node_enqueue(struct rte_node *node,
+								  struct rte_graph_rq_head *rq);
+
+/**
+ * @internal
+ *
+ * Process all nodes (streams) in the graph's work queue for mcore dispatch model.
+ *
+ * @param graph
+ *   Pointer to the graph object.
+ *
+ * @note
+ * This implementation is used by mcore dispatch model only and user application
+ * should not call it directly.
+ */
+__rte_experimental
+void __rte_graph_mcore_dispatch_sched_wq_process(struct rte_graph *graph);
+
 /**
  * Set lcore affinity with the node used for mcore dispatch model.
  *
diff --git a/lib/graph/version.map b/lib/graph/version.map
index dbb3507687..f95a6b0fb5 100644
--- a/lib/graph/version.map
+++ b/lib/graph/version.map
@@ -50,6 +50,8 @@  EXPERIMENTAL {
 	rte_graph_worker_model_set;
 	rte_graph_worker_model_get;
 	rte_graph_worker_model_no_check_get;
+	__rte_graph_mcore_dispatch_sched_wq_process;
+	__rte_graph_mcore_dispatch_sched_node_enqueue;
 
 	rte_graph_model_mcore_dispatch_node_lcore_affinity_set;