[v4,12/29] graph: implement fastpath API routines

Message ID 20200405085613.1336841-13-jerinj@marvell.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Series graph: introduce graph subsystem

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Jerin Jacob Kollanukkaran April 5, 2020, 8:55 a.m. UTC
  From: Jerin Jacob <jerinj@marvell.com>

Adding implementation for rte_graph_walk() API. This will perform a walk
on the circular buffer and call the process function of each node
and collect the stats if stats collection is enabled.

Signed-off-by: Jerin Jacob <jerinj@marvell.com>
Signed-off-by: Kiran Kumar K <kirankumark@marvell.com>
Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
Signed-off-by: Nithin Dabilpuram <ndabilpuram@marvell.com>
---
 doc/api/doxy-api-index.md              |   1 +
 lib/librte_graph/graph.c               |  16 +
 lib/librte_graph/rte_graph_version.map |  10 +
 lib/librte_graph/rte_graph_worker.h    | 434 +++++++++++++++++++++++++
 4 files changed, 461 insertions(+)
  

Comments

Andrzej Ostruszka April 9, 2020, 11:07 p.m. UTC | #1
On 4/5/20 10:55 AM, jerinj@marvell.com wrote:
> From: Jerin Jacob <jerinj@marvell.com>
> 
> Adding implementation for rte_graph_walk() API. This will perform a walk
> on the circular buffer and call the process function of each node
> and collect the stats if stats collection is enabled.
> 
> Signed-off-by: Jerin Jacob <jerinj@marvell.com>
> Signed-off-by: Kiran Kumar K <kirankumark@marvell.com>
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
> Signed-off-by: Nithin Dabilpuram <ndabilpuram@marvell.com>
> ---
[...]
> +__rte_experimental
> +static inline void
> +rte_graph_walk(struct rte_graph *graph)
> +{
> +	const rte_graph_off_t *cir_start = graph->cir_start;
> +	const rte_node_t mask = graph->cir_mask;
> +	uint32_t head = graph->head;
> +	struct rte_node *node;
> +	uint64_t start;
> +	uint16_t rc;
> +	void **objs;
> +
> +	/*
> +	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
> +	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
> +	 * in a circular buffer fashion.
> +	 *
> +	 *	+-----+ <= cir_start - head [number of source nodes]
> +	 *	|     |
> +	 *	| ... | <= source nodes
> +	 *	|     |
> +	 *	+-----+ <= cir_start [head = 0] [tail = 0]
> +	 *	|     |
> +	 *	| ... | <= pending streams
> +	 *	|     |
> +	 *	+-----+ <= cir_start + mask
> +	 */
> +	while (likely(head != graph->tail)) {
> +		node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
> +		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
> +		objs = node->objs;
> +		rte_prefetch0(objs);
> +
> +		if (rte_graph_has_stats_feature()) {
> +			start = rte_rdtsc();
> +			rc = node->process(graph, node, objs, node->idx);
> +			node->total_cycles += rte_rdtsc() - start;
> +			node->total_calls++;
> +			node->total_objs += rc;
> +		} else {
> +			node->process(graph, node, objs, node->idx);
> +		}
> +		node->idx = 0;

So I guess it is the responsibility of the node's process function to handle
all objects.  What should it do if that is not possible?  E.g. after
tx_burst we usually drop packets, so how do you drop objects in a graph?  Do
you simply free them (does the node know how the object was allocated?) or
do you need to pass them to a "null" node?  The process function returns the
number of objects processed (e.g. in the later RX/TX nodes), so why is it not
used here?

> +		head = likely((int32_t)head > 0) ? head & mask : head;
> +	}
> +	graph->tail = 0;
> +}
[...]
> +__rte_experimental
> +static inline void
> +rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
> +		 rte_edge_t next, void **objs, uint16_t nb_objs)
> +{
> +	node = __rte_node_next_node_get(node, next);
> +	const uint16_t idx = node->idx;
> +
> +	__rte_node_enqueue_prologue(graph, node, idx, nb_objs);
> +
> +	rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
> +	node->idx = idx + nb_objs;
> +}

I see how it works for the usual scenario.  But is there some kind of
fork/join operation?  I am trying to imagine a scenario where I have a
bunch of co-processors doing different operations on a given
object/packet.  Then, to minimize latency, I'd like to use them in
parallel and, when all are done, enqueue the object to the next node.  Is
there support for that in the graph lib, or should that be handled in the
process function of a single node?

[...]
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Enqueue objs to multiple next nodes for further processing and
> + * set the next nodes to pending state in the circular buffer.
> + * objs[i] will be enqueued to nexts[i].
> + *
> + * @param graph
> + *   Graph pointer returned from rte_graph_lookup().
> + * @param node
> + *   Current node pointer.
> + * @param nexts
> + *   List of relative next node indices to enqueue objs.
> + * @param objs
> + *   List of objs to enqueue.
> + * @param nb_objs
> + *   Number of objs to enqueue.
> + */
> +__rte_experimental
> +static inline void
> +rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
> +		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
> +{
> +	uint16_t i;
> +
> +	for (i = 0; i < nb_objs; i++)
> +		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);

I have noticed the comments about the x1/2/4 functions, but since you defended
their performance there, why not make some use of them here (Duff's
device-like unrolling)?  Just like you have in your performance test.

[...]
> +__rte_experimental
> +static inline void **
> +rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
> +			 rte_edge_t next, uint16_t nb_objs)
> +{
> +	node = __rte_node_next_node_get(node, next);
> +	const uint16_t idx = node->idx;
> +	uint16_t free_space = node->size - idx;
> +
> +	if (unlikely(free_space < nb_objs))
> +		__rte_node_stream_alloc_size(graph, node, nb_objs);
> +
> +	return &node->objs[idx];
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Put the next stream to pending state in the circular buffer
> + * for further processing. Should be invoked followed by
> + * rte_node_next_stream_get().

Is the last sentence correct?

> + *
> + * @param graph
> + *   Graph pointer returned from rte_graph_lookup().
> + * @param node
> + *   Current node pointer.
> + * @param next
> + *   Relative next node index..
> + * @param idx
> + *   Number of objs updated in the stream after getting the stream using
> + *   rte_node_next_stream_get.
> + *
> + * @see rte_node_next_stream_get().
> + */
> +__rte_experimental
> +static inline void
> +rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
> +			 rte_edge_t next, uint16_t idx)

I understood stream_move as an optimized enqueue, but could you
describe how these _get/_put are meant to be used, and why not use
stream_move/enqueue?  I see in later (testing) patches that they are used
for source nodes, and I'm not sure I understand the difference between them.

With regards
Andrzej Ostruszka
  
Jerin Jacob April 10, 2020, 9:18 a.m. UTC | #2
On Fri, Apr 10, 2020 at 4:37 AM Andrzej Ostruszka <amo@semihalf.com> wrote:
>
> On 4/5/20 10:55 AM, jerinj@marvell.com wrote:
> > From: Jerin Jacob <jerinj@marvell.com>
> >
> > Adding implementation for rte_graph_walk() API. This will perform a walk
> > on the circular buffer and call the process function of each node
> > and collect the stats if stats collection is enabled.
> >
> > Signed-off-by: Jerin Jacob <jerinj@marvell.com>
> > Signed-off-by: Kiran Kumar K <kirankumark@marvell.com>
> > Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
> > Signed-off-by: Nithin Dabilpuram <ndabilpuram@marvell.com>
> > ---
> [...]
> > +__rte_experimental
> > +static inline void
> > +rte_graph_walk(struct rte_graph *graph)
> > +{
> > +     const rte_graph_off_t *cir_start = graph->cir_start;
> > +     const rte_node_t mask = graph->cir_mask;
> > +     uint32_t head = graph->head;
> > +     struct rte_node *node;
> > +     uint64_t start;
> > +     uint16_t rc;
> > +     void **objs;
> > +
> > +     /*
> > +      * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
> > +      * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
> > +      * in a circular buffer fashion.
> > +      *
> > +      *      +-----+ <= cir_start - head [number of source nodes]
> > +      *      |     |
> > +      *      | ... | <= source nodes
> > +      *      |     |
> > +      *      +-----+ <= cir_start [head = 0] [tail = 0]
> > +      *      |     |
> > +      *      | ... | <= pending streams
> > +      *      |     |
> > +      *      +-----+ <= cir_start + mask
> > +      */
> > +     while (likely(head != graph->tail)) {
> > +             node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
> > +             RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
> > +             objs = node->objs;
> > +             rte_prefetch0(objs);
> > +
> > +             if (rte_graph_has_stats_feature()) {
> > +                     start = rte_rdtsc();
> > +                     rc = node->process(graph, node, objs, node->idx);
> > +                     node->total_cycles += rte_rdtsc() - start;
> > +                     node->total_calls++;
> > +                     node->total_objs += rc;
> > +             } else {
> > +                     node->process(graph, node, objs, node->idx);
> > +             }
> > +             node->idx = 0;
>
> So I guess this is a responsibility of a node process function to handle
> all objects.  What should it do if it is not possible.  E.g. after
> tx_burst we usually drop packets, how do you drop objects in graph?  Do
> you simply free them (does the node knows how object was allocated?) or
> you need to pass it to "null" node.  Process function returns number of
> objects processed (e.g. later RX/TX nodes), why it is not used here?

The graph library deals with (void *) objects, not mbufs, so it can be used
with any data type.
For packet drop requirements, a "packet drop" node is added in libnode to
free the packets back to their mempool.
A downstream node can send objects to the "null" node if it needs to drop
them on the floor. So it is the downstream node's decision; the graph
library does not dictate any domain-specific details.
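
For illustration, here is a minimal sketch of such a downstream node's process
function (not part of the patch): untransmitted packets are not freed locally
but handed to a drop edge, whose node owns the freeing policy. The port/queue
constants and the TX_NEXT_PKT_DROP edge index are assumptions made for the
example.

#include <rte_ethdev.h>
#include <rte_graph_worker.h>
#include <rte_mbuf.h>

#define PORT_ID 0
#define QUEUE_ID 0
#define TX_NEXT_PKT_DROP 0 /* hypothetical edge to a "pkt_drop" node */

static uint16_t
tx_node_process(struct rte_graph *graph, struct rte_node *node,
		void **objs, uint16_t nb_objs)
{
	uint16_t sent;

	/* Try to transmit the whole stream */
	sent = rte_eth_tx_burst(PORT_ID, QUEUE_ID,
				(struct rte_mbuf **)objs, nb_objs);

	/* Leftovers go to the drop node instead of being freed here */
	if (sent != nb_objs)
		rte_node_enqueue(graph, node, TX_NEXT_PKT_DROP,
				 &objs[sent], nb_objs - sent);

	return nb_objs;
}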

>
> > +             head = likely((int32_t)head > 0) ? head & mask : head;
> > +     }
> > +     graph->tail = 0;
> > +}
> [...]
> > +__rte_experimental
> > +static inline void
> > +rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
> > +              rte_edge_t next, void **objs, uint16_t nb_objs)
> > +{
> > +     node = __rte_node_next_node_get(node, next);
> > +     const uint16_t idx = node->idx;
> > +
> > +     __rte_node_enqueue_prologue(graph, node, idx, nb_objs);
> > +
> > +     rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
> > +     node->idx = idx + nb_objs;
> > +}
>
> I see how it works for usual scenario.  But is there some kind of
> fork/join operation?  Just trying to imagine scenario where I have a
> bunch of co-processors doing different operations for given
> object/packet.  Then to to minimize latency I'd like to use them in
> parallel and when all done then enqueue it to next node.  Is there a
> support for that in terms of graph lib or should that be handled in the
> process function of a single node?

Yes. We can create multiple graphs (each attached to a core), which
can have multiple instances of source (SRC) nodes.
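
As a rough sketch of that model (not from the patch series), each worker lcore
gets its own graph clone built from the same node patterns and runs
rte_graph_walk() on it in a loop. The node pattern strings and the graph name
format below are assumptions for the example.

#include <stdio.h>
#include <string.h>

#include <rte_graph.h>
#include <rte_graph_worker.h>
#include <rte_launch.h>
#include <rte_lcore.h>

static int
graph_worker(void *arg)
{
	struct rte_graph *graph = rte_graph_lookup((const char *)arg);

	if (graph == NULL)
		return -1;

	while (1)
		rte_graph_walk(graph);

	return 0;
}

static void
launch_graphs(void)
{
	/* Hypothetical node names; real ones come from the registered nodes */
	static const char *patterns[] = { "example_src-*", "example_worker",
					  "pkt_drop" };
	static char names[RTE_MAX_LCORE][RTE_GRAPH_NAMESIZE];
	struct rte_graph_param prm;
	unsigned int lcore_id;

	memset(&prm, 0, sizeof(prm));
	prm.nb_node_patterns = RTE_DIM(patterns);
	prm.node_patterns = patterns;

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		prm.socket_id = rte_lcore_to_socket_id(lcore_id);
		snprintf(names[lcore_id], RTE_GRAPH_NAMESIZE, "wrk_%u",
			 lcore_id);
		/* One graph instance per worker core */
		if (rte_graph_create(names[lcore_id], &prm) ==
		    RTE_GRAPH_ID_INVALID)
			break;
		rte_eal_remote_launch(graph_worker, names[lcore_id], lcore_id);
	}
}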

>
> [...]
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Enqueue objs to multiple next nodes for further processing and
> > + * set the next nodes to pending state in the circular buffer.
> > + * objs[i] will be enqueued to nexts[i].
> > + *
> > + * @param graph
> > + *   Graph pointer returned from rte_graph_lookup().
> > + * @param node
> > + *   Current node pointer.
> > + * @param nexts
> > + *   List of relative next node indices to enqueue objs.
> > + * @param objs
> > + *   List of objs to enqueue.
> > + * @param nb_objs
> > + *   Number of objs to enqueue.
> > + */
> > +__rte_experimental
> > +static inline void
> > +rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
> > +                   rte_edge_t *nexts, void **objs, uint16_t nb_objs)
> > +{
> > +     uint16_t i;
> > +
> > +     for (i = 0; i < nb_objs; i++)
> > +             rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
>
> I have noticed comments about x1/2/4 functions but since you defended
> the performance there why not having some kind of use of them here
> (Duff's device like unrolling)?  Just like you have in your performance
> test.

This function will be replaced with more SIMD-style processing in the future,
based on performance needs.
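
Just to illustrate one possible shape such an optimization could take (this is
not the rework referred to above, only a sketch): consecutive objects that
happen to share the same next edge can be pushed as a run with
rte_node_enqueue() instead of one rte_node_enqueue_x1() call per object.

static inline void
enqueue_next_batched(struct rte_graph *graph, struct rte_node *node,
		     rte_edge_t *nexts, void **objs, uint16_t nb_objs)
{
	uint16_t i = 0;

	while (i < nb_objs) {
		uint16_t j = i + 1;

		/* Extend the run while the next edge stays the same */
		while (j < nb_objs && nexts[j] == nexts[i])
			j++;

		/* Push the whole run with a single enqueue */
		rte_node_enqueue(graph, node, nexts[i], &objs[i], j - i);
		i = j;
	}
}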

>
> [...]
> > +__rte_experimental
> > +static inline void **
> > +rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
> > +                      rte_edge_t next, uint16_t nb_objs)
> > +{
> > +     node = __rte_node_next_node_get(node, next);
> > +     const uint16_t idx = node->idx;
> > +     uint16_t free_space = node->size - idx;
> > +
> > +     if (unlikely(free_space < nb_objs))
> > +             __rte_node_stream_alloc_size(graph, node, nb_objs);
> > +
> > +     return &node->objs[idx];
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Put the next stream to pending state in the circular buffer
> > + * for further processing. Should be invoked followed by
> > + * rte_node_next_stream_get().
>
> Is the last sentence correct?

I will reword to "Should be invoked after rte_node_next_stream_get()"
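
For reference, the _get/_put pair is intended for the case where a node (for
example a source node) produces objects directly into the next node's stream
rather than copying them in with rte_node_enqueue(). A minimal sketch of that
pattern follows (illustration only; the port/queue constants and the RX_NEXT
edge index are assumptions):

#include <rte_ethdev.h>
#include <rte_graph_worker.h>
#include <rte_mbuf.h>

#define PORT_ID 0
#define QUEUE_ID 0
#define RX_NEXT 0 /* hypothetical edge to the first downstream node */

static uint16_t
rx_node_process(struct rte_graph *graph, struct rte_node *node,
		void **objs, uint16_t nb_objs)
{
	void **to_next;
	uint16_t n;

	RTE_SET_USED(objs);
	RTE_SET_USED(nb_objs);

	/* Reserve space directly in the next node's stream ... */
	to_next = rte_node_next_stream_get(graph, node, RX_NEXT,
					   RTE_GRAPH_BURST_SIZE);

	/* ... fill it in place ... */
	n = rte_eth_rx_burst(PORT_ID, QUEUE_ID, (struct rte_mbuf **)to_next,
			     RTE_GRAPH_BURST_SIZE);

	/* ... then mark the next node pending with the produced count */
	rte_node_next_stream_put(graph, node, RX_NEXT, n);

	return n;
}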
  

Patch

diff --git a/doc/api/doxy-api-index.md b/doc/api/doxy-api-index.md
index 5cc50f750..fd2ff64d7 100644
--- a/doc/api/doxy-api-index.md
+++ b/doc/api/doxy-api-index.md
@@ -160,6 +160,7 @@  The public API headers are grouped by topics:
     [port_in_action]   (@ref rte_port_in_action.h)
     [table_action]     (@ref rte_table_action.h)
   * [graph]            (@ref rte_graph.h):
+    [graph_worker]     (@ref rte_graph_worker.h)
 
 - **basic**:
   [approx fraction]    (@ref rte_approx.h),
diff --git a/lib/librte_graph/graph.c b/lib/librte_graph/graph.c
index e96363777..d5d816c71 100644
--- a/lib/librte_graph/graph.c
+++ b/lib/librte_graph/graph.c
@@ -474,6 +474,22 @@  __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node)
 	node->realloc_count++;
 }
 
+void __rte_noinline
+__rte_node_stream_alloc_size(struct rte_graph *graph, struct rte_node *node,
+			     uint16_t req_size)
+{
+	uint16_t size = node->size;
+
+	RTE_VERIFY(size != UINT16_MAX);
+	/* Allocate double amount of size to avoid immediate realloc */
+	size = RTE_MIN(UINT16_MAX, RTE_MAX(RTE_GRAPH_BURST_SIZE, req_size * 2));
+	node->objs = rte_realloc_socket(node->objs, size * sizeof(void *),
+					RTE_CACHE_LINE_SIZE, graph->socket);
+	RTE_VERIFY(node->objs);
+	node->size = size;
+	node->realloc_count++;
+}
+
 static int
 graph_to_dot(FILE *f, struct graph *graph)
 {
diff --git a/lib/librte_graph/rte_graph_version.map b/lib/librte_graph/rte_graph_version.map
index adf55d406..13b838752 100644
--- a/lib/librte_graph/rte_graph_version.map
+++ b/lib/librte_graph/rte_graph_version.map
@@ -3,6 +3,7 @@  EXPERIMENTAL {
 
 	__rte_node_register;
 	__rte_node_stream_alloc;
+	__rte_node_stream_alloc_size;
 
 	rte_graph_create;
 	rte_graph_destroy;
@@ -16,6 +17,7 @@  EXPERIMENTAL {
 	rte_graph_node_get;
 	rte_graph_node_get_by_name;
 	rte_graph_obj_dump;
+	rte_graph_walk;
 
 	rte_graph_cluster_stats_create;
 	rte_graph_cluster_stats_destroy;
@@ -28,10 +30,18 @@  EXPERIMENTAL {
 	rte_node_edge_get;
 	rte_node_edge_shrink;
 	rte_node_edge_update;
+	rte_node_enqueue;
+	rte_node_enqueue_x1;
+	rte_node_enqueue_x2;
+	rte_node_enqueue_x4;
+	rte_node_enqueue_next;
 	rte_node_from_name;
 	rte_node_id_to_name;
 	rte_node_list_dump;
 	rte_node_max_count;
+	rte_node_next_stream_get;
+	rte_node_next_stream_put;
+	rte_node_next_stream_move;
 
 	local: *;
 };
diff --git a/lib/librte_graph/rte_graph_worker.h b/lib/librte_graph/rte_graph_worker.h
index a8133739d..a1bfc498b 100644
--- a/lib/librte_graph/rte_graph_worker.h
+++ b/lib/librte_graph/rte_graph_worker.h
@@ -101,6 +101,440 @@  struct rte_node {
 __rte_experimental
 void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);
 
+/**
+ * @internal
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Allocate a stream with requested number of objects.
+ *
+ * If stream already exists then re-allocate it to a larger size.
+ *
+ * @param graph
+ *   Pointer to the graph object.
+ * @param node
+ *   Pointer to the node object.
+ * @param req_size
+ *   Number of objects to be allocated.
+ */
+__rte_experimental
+void __rte_node_stream_alloc_size(struct rte_graph *graph,
+				  struct rte_node *node, uint16_t req_size);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Perform graph walk on the circular buffer and invoke the process function
+ * of the nodes and collect the stats.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup function.
+ *
+ * @see rte_graph_lookup()
+ */
+__rte_experimental
+static inline void
+rte_graph_walk(struct rte_graph *graph)
+{
+	const rte_graph_off_t *cir_start = graph->cir_start;
+	const rte_node_t mask = graph->cir_mask;
+	uint32_t head = graph->head;
+	struct rte_node *node;
+	uint64_t start;
+	uint16_t rc;
+	void **objs;
+
+	/*
+	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
+	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
+	 * in a circular buffer fashion.
+	 *
+	 *	+-----+ <= cir_start - head [number of source nodes]
+	 *	|     |
+	 *	| ... | <= source nodes
+	 *	|     |
+	 *	+-----+ <= cir_start [head = 0] [tail = 0]
+	 *	|     |
+	 *	| ... | <= pending streams
+	 *	|     |
+	 *	+-----+ <= cir_start + mask
+	 */
+	while (likely(head != graph->tail)) {
+		node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
+		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
+		objs = node->objs;
+		rte_prefetch0(objs);
+
+		if (rte_graph_has_stats_feature()) {
+			start = rte_rdtsc();
+			rc = node->process(graph, node, objs, node->idx);
+			node->total_cycles += rte_rdtsc() - start;
+			node->total_calls++;
+			node->total_objs += rc;
+		} else {
+			node->process(graph, node, objs, node->idx);
+		}
+		node->idx = 0;
+		head = likely((int32_t)head > 0) ? head & mask : head;
+	}
+	graph->tail = 0;
+}
+
+/* Fast path helper functions */
+
+/**
+ * @internal
+ *
+ * Enqueue a given node to the tail of the graph reel.
+ *
+ * @param graph
+ *   Pointer Graph object.
+ * @param node
+ *   Pointer to node object to be enqueued.
+ */
+static __rte_always_inline void
+__rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
+{
+	uint32_t tail;
+
+	tail = graph->tail;
+	graph->cir_start[tail++] = node->off;
+	graph->tail = tail & graph->cir_mask;
+}
+
+/**
+ * @internal
+ *
+ * Enqueue sequence prologue function.
+ *
+ * Updates the node to tail of graph reel and resizes the number of objects
+ * available in the stream as needed.
+ *
+ * @param graph
+ *   Pointer to the graph object.
+ * @param node
+ *   Pointer to the node object.
+ * @param idx
+ *   Index at which the object enqueue starts from.
+ * @param space
+ *   Space required for the object enqueue.
+ */
+static __rte_always_inline void
+__rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
+			    const uint16_t idx, const uint16_t space)
+{
+
+	/* Add to the pending stream list if the node is new */
+	if (idx == 0)
+		__rte_node_enqueue_tail_update(graph, node);
+
+	if (unlikely(node->size < (idx + space)))
+		__rte_node_stream_alloc(graph, node);
+}
+
+/**
+ * @internal
+ *
+ * Get the node pointer from current node edge id.
+ *
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Edge id of the required node.
+ *
+ * @return
+ *   Pointer to the node denoted by the edge id.
+ */
+static __rte_always_inline struct rte_node *
+__rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
+{
+	RTE_ASSERT(next < node->nb_edges);
+	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
+	node = node->nodes[next];
+	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
+
+	return node;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue the objs to next node for further processing and set
+ * the next node to pending state in the circular buffer.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index to enqueue objs.
+ * @param objs
+ *   Objs to enqueue.
+ * @param nb_objs
+ *   Number of objs to enqueue.
+ */
+__rte_experimental
+static inline void
+rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
+		 rte_edge_t next, void **objs, uint16_t nb_objs)
+{
+	node = __rte_node_next_node_get(node, next);
+	const uint16_t idx = node->idx;
+
+	__rte_node_enqueue_prologue(graph, node, idx, nb_objs);
+
+	rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
+	node->idx = idx + nb_objs;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue only one obj to next node for further processing and
+ * set the next node to pending state in the circular buffer.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index to enqueue objs.
+ * @param obj
+ *   Obj to enqueue.
+ */
+__rte_experimental
+static inline void
+rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
+		    rte_edge_t next, void *obj)
+{
+	node = __rte_node_next_node_get(node, next);
+	uint16_t idx = node->idx;
+
+	__rte_node_enqueue_prologue(graph, node, idx, 1);
+
+	node->objs[idx++] = obj;
+	node->idx = idx;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue only two objs to next node for further processing and
+ * set the next node to pending state in the circular buffer.
+ * Same as rte_node_enqueue_x1 but enqueue two objs.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index to enqueue objs.
+ * @param obj0
+ *   Obj to enqueue.
+ * @param obj1
+ *   Obj to enqueue.
+ */
+__rte_experimental
+static inline void
+rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
+		    rte_edge_t next, void *obj0, void *obj1)
+{
+	node = __rte_node_next_node_get(node, next);
+	uint16_t idx = node->idx;
+
+	__rte_node_enqueue_prologue(graph, node, idx, 2);
+
+	node->objs[idx++] = obj0;
+	node->objs[idx++] = obj1;
+	node->idx = idx;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue only four objs to next node for further processing and
+ * set the next node to pending state in the circular buffer.
+ * Same as rte_node_enqueue_x1 but enqueue four objs.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index to enqueue objs.
+ * @param obj0
+ *   1st obj to enqueue.
+ * @param obj1
+ *   2nd obj to enqueue.
+ * @param obj2
+ *   3rd obj to enqueue.
+ * @param obj3
+ *   4th obj to enqueue.
+ */
+__rte_experimental
+static inline void
+rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
+		    rte_edge_t next, void *obj0, void *obj1, void *obj2,
+		    void *obj3)
+{
+	node = __rte_node_next_node_get(node, next);
+	uint16_t idx = node->idx;
+
+	__rte_node_enqueue_prologue(graph, node, idx, 4);
+
+	node->objs[idx++] = obj0;
+	node->objs[idx++] = obj1;
+	node->objs[idx++] = obj2;
+	node->objs[idx++] = obj3;
+	node->idx = idx;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue objs to multiple next nodes for further processing and
+ * set the next nodes to pending state in the circular buffer.
+ * objs[i] will be enqueued to nexts[i].
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param nexts
+ *   List of relative next node indices to enqueue objs.
+ * @param objs
+ *   List of objs to enqueue.
+ * @param nb_objs
+ *   Number of objs to enqueue.
+ */
+__rte_experimental
+static inline void
+rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
+		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
+{
+	uint16_t i;
+
+	for (i = 0; i < nb_objs; i++)
+		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Get the stream of next node to enqueue the objs.
+ * Once done with the updating the objs, needs to call
+ * rte_node_next_stream_put to put the next node to pending state.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index to get stream.
+ * @param nb_objs
+ *   Requested free size of the next stream.
+ *
+ * @return
+ *   Valid next stream on success.
+ *
+ * @see rte_node_next_stream_put().
+ */
+__rte_experimental
+static inline void **
+rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
+			 rte_edge_t next, uint16_t nb_objs)
+{
+	node = __rte_node_next_node_get(node, next);
+	const uint16_t idx = node->idx;
+	uint16_t free_space = node->size - idx;
+
+	if (unlikely(free_space < nb_objs))
+		__rte_node_stream_alloc_size(graph, node, nb_objs);
+
+	return &node->objs[idx];
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Put the next stream to pending state in the circular buffer
+ * for further processing. Should be invoked followed by
+ * rte_node_next_stream_get().
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index..
+ * @param idx
+ *   Number of objs updated in the stream after getting the stream using
+ *   rte_node_next_stream_get.
+ *
+ * @see rte_node_next_stream_get().
+ */
+__rte_experimental
+static inline void
+rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
+			 rte_edge_t next, uint16_t idx)
+{
+	if (unlikely(!idx))
+		return;
+
+	node = __rte_node_next_node_get(node, next);
+	if (node->idx == 0)
+		__rte_node_enqueue_tail_update(graph, node);
+
+	node->idx += idx;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Home run scenario, Enqueue all the objs of current node to next
+ * node in optimized way by swapping the streams of both nodes.
+ * Performs good when next node is already not in pending state.
+ * If next node is already in pending state then normal enqueue
+ * will be used.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param src
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index.
+ */
+__rte_experimental
+static inline void
+rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
+			  rte_edge_t next)
+{
+	struct rte_node *dst = __rte_node_next_node_get(src, next);
+
+	/* Let swap the pointers if dst don't have valid objs */
+	if (likely(dst->idx == 0)) {
+		void **dobjs = dst->objs;
+		uint16_t dsz = dst->size;
+		dst->objs = src->objs;
+		dst->size = src->size;
+		src->objs = dobjs;
+		src->size = dsz;
+		dst->idx = src->idx;
+		__rte_node_enqueue_tail_update(graph, dst);
+	} else { /* Move the objects from src node to dst node */
+		rte_node_enqueue(graph, src, next, src->objs, src->idx);
+	}
+}
+
 #ifdef __cplusplus
 }
 #endif