[v5,12/29] graph: implement fastpath API routines

Message ID 20200411141428.1987768-13-jerinj@marvell.com (mailing list archive)
State Accepted, archived
Delegated to: Thomas Monjalon
Headers
Series graph: introduce graph subsystem |

Checks

Context Check Description
ci/Intel-compilation success Compilation OK
ci/checkpatch success coding style OK

Commit Message

Jerin Jacob Kollanukkaran April 11, 2020, 2:14 p.m. UTC
  From: Jerin Jacob <jerinj@marvell.com>

Adding implementation for rte_graph_walk() API. This will perform a walk
on the circular buffer and call the process function of each node
and collect the stats if stats collection is enabled.

Signed-off-by: Jerin Jacob <jerinj@marvell.com>
Signed-off-by: Kiran Kumar K <kirankumark@marvell.com>
Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
Signed-off-by: Nithin Dabilpuram <ndabilpuram@marvell.com>
---
 doc/api/doxy-api-index.md              |   1 +
 lib/librte_graph/graph.c               |  16 +
 lib/librte_graph/rte_graph_version.map |  10 +
 lib/librte_graph/rte_graph_worker.h    | 404 +++++++++++++++++++++++++
 4 files changed, 431 insertions(+)
  

Patch

diff --git a/doc/api/doxy-api-index.md b/doc/api/doxy-api-index.md
index 5cc50f750..fd2ff64d7 100644
--- a/doc/api/doxy-api-index.md
+++ b/doc/api/doxy-api-index.md
@@ -160,6 +160,7 @@  The public API headers are grouped by topics:
     [port_in_action]   (@ref rte_port_in_action.h)
     [table_action]     (@ref rte_table_action.h)
   * [graph]            (@ref rte_graph.h):
+    [graph_worker]     (@ref rte_graph_worker.h)
 
 - **basic**:
   [approx fraction]    (@ref rte_approx.h),
diff --git a/lib/librte_graph/graph.c b/lib/librte_graph/graph.c
index e811a7b38..0ea83df3d 100644
--- a/lib/librte_graph/graph.c
+++ b/lib/librte_graph/graph.c
@@ -473,6 +473,22 @@  __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node)
 	node->realloc_count++;
 }
 
+void __rte_noinline
+__rte_node_stream_alloc_size(struct rte_graph *graph, struct rte_node *node,
+			     uint16_t req_size)
+{
+	uint16_t size = node->size;
+
+	RTE_VERIFY(size != UINT16_MAX);
+	/* Allocate double amount of size to avoid immediate realloc */
+	size = RTE_MIN(UINT16_MAX, RTE_MAX(RTE_GRAPH_BURST_SIZE, req_size * 2));
+	node->objs = rte_realloc_socket(node->objs, size * sizeof(void *),
+					RTE_CACHE_LINE_SIZE, graph->socket);
+	RTE_VERIFY(node->objs);
+	node->size = size;
+	node->realloc_count++;
+}
+
 static int
 graph_to_dot(FILE *f, struct graph *graph)
 {
diff --git a/lib/librte_graph/rte_graph_version.map b/lib/librte_graph/rte_graph_version.map
index adf55d406..13b838752 100644
--- a/lib/librte_graph/rte_graph_version.map
+++ b/lib/librte_graph/rte_graph_version.map
@@ -3,6 +3,7 @@  EXPERIMENTAL {
 
 	__rte_node_register;
 	__rte_node_stream_alloc;
+	__rte_node_stream_alloc_size;
 
 	rte_graph_create;
 	rte_graph_destroy;
@@ -16,6 +17,7 @@  EXPERIMENTAL {
 	rte_graph_node_get;
 	rte_graph_node_get_by_name;
 	rte_graph_obj_dump;
+	rte_graph_walk;
 
 	rte_graph_cluster_stats_create;
 	rte_graph_cluster_stats_destroy;
@@ -28,10 +30,18 @@  EXPERIMENTAL {
 	rte_node_edge_get;
 	rte_node_edge_shrink;
 	rte_node_edge_update;
+	rte_node_enqueue;
+	rte_node_enqueue_x1;
+	rte_node_enqueue_x2;
+	rte_node_enqueue_x4;
+	rte_node_enqueue_next;
 	rte_node_from_name;
 	rte_node_id_to_name;
 	rte_node_list_dump;
 	rte_node_max_count;
+	rte_node_next_stream_get;
+	rte_node_next_stream_put;
+	rte_node_next_stream_move;
 
 	local: *;
 };
diff --git a/lib/librte_graph/rte_graph_worker.h b/lib/librte_graph/rte_graph_worker.h
index 42be8fd13..4c3ddcbde 100644
--- a/lib/librte_graph/rte_graph_worker.h
+++ b/lib/librte_graph/rte_graph_worker.h
@@ -99,6 +99,410 @@  struct rte_node {
 __rte_experimental
 void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);
 
+/**
+ * @internal
+ *
+ * Allocate a stream with requested number of objects.
+ *
+ * If stream already exists then re-allocate it to a larger size.
+ *
+ * @param graph
+ *   Pointer to the graph object.
+ * @param node
+ *   Pointer to the node object.
+ * @param req_size
+ *   Number of objects to be allocated.
+ */
+__rte_experimental
+void __rte_node_stream_alloc_size(struct rte_graph *graph,
+				  struct rte_node *node, uint16_t req_size);
+
+/**
+ * Perform graph walk on the circular buffer and invoke the process function
+ * of the nodes and collect the stats.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup function.
+ *
+ * @see rte_graph_lookup()
+ */
+__rte_experimental
+static inline void
+rte_graph_walk(struct rte_graph *graph)
+{
+	const rte_graph_off_t *cir_start = graph->cir_start;
+	const rte_node_t mask = graph->cir_mask;
+	uint32_t head = graph->head;
+	struct rte_node *node;
+	uint64_t start;
+	uint16_t rc;
+	void **objs;
+
+	/*
+	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
+	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
+	 * in a circular buffer fashion.
+	 *
+	 *	+-----+ <= cir_start - head [number of source nodes]
+	 *	|     |
+	 *	| ... | <= source nodes
+	 *	|     |
+	 *	+-----+ <= cir_start [head = 0] [tail = 0]
+	 *	|     |
+	 *	| ... | <= pending streams
+	 *	|     |
+	 *	+-----+ <= cir_start + mask
+	 */
+	while (likely(head != graph->tail)) {
+		node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
+		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
+		objs = node->objs;
+		rte_prefetch0(objs);
+
+		if (rte_graph_has_stats_feature()) {
+			start = rte_rdtsc();
+			rc = node->process(graph, node, objs, node->idx);
+			node->total_cycles += rte_rdtsc() - start;
+			node->total_calls++;
+			node->total_objs += rc;
+		} else {
+			node->process(graph, node, objs, node->idx);
+		}
+		node->idx = 0;
+		head = likely((int32_t)head > 0) ? head & mask : head;
+	}
+	graph->tail = 0;
+}
+
+/* Fast path helper functions */
+
+/**
+ * @internal
+ *
+ * Enqueue a given node to the tail of the graph reel.
+ *
+ * @param graph
+ *   Pointer Graph object.
+ * @param node
+ *   Pointer to node object to be enqueued.
+ */
+static __rte_always_inline void
+__rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
+{
+	uint32_t tail;
+
+	tail = graph->tail;
+	graph->cir_start[tail++] = node->off;
+	graph->tail = tail & graph->cir_mask;
+}
+
+/**
+ * @internal
+ *
+ * Enqueue sequence prologue function.
+ *
+ * Updates the node to tail of graph reel and resizes the number of objects
+ * available in the stream as needed.
+ *
+ * @param graph
+ *   Pointer to the graph object.
+ * @param node
+ *   Pointer to the node object.
+ * @param idx
+ *   Index at which the object enqueue starts from.
+ * @param space
+ *   Space required for the object enqueue.
+ */
+static __rte_always_inline void
+__rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
+			    const uint16_t idx, const uint16_t space)
+{
+
+	/* Add to the pending stream list if the node is new */
+	if (idx == 0)
+		__rte_node_enqueue_tail_update(graph, node);
+
+	if (unlikely(node->size < (idx + space)))
+		__rte_node_stream_alloc(graph, node);
+}
+
+/**
+ * @internal
+ *
+ * Get the node pointer from current node edge id.
+ *
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Edge id of the required node.
+ *
+ * @return
+ *   Pointer to the node denoted by the edge id.
+ */
+static __rte_always_inline struct rte_node *
+__rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
+{
+	RTE_ASSERT(next < node->nb_edges);
+	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
+	node = node->nodes[next];
+	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
+
+	return node;
+}
+
+/**
+ * Enqueue the objs to next node for further processing and set
+ * the next node to pending state in the circular buffer.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index to enqueue objs.
+ * @param objs
+ *   Objs to enqueue.
+ * @param nb_objs
+ *   Number of objs to enqueue.
+ */
+__rte_experimental
+static inline void
+rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
+		 rte_edge_t next, void **objs, uint16_t nb_objs)
+{
+	node = __rte_node_next_node_get(node, next);
+	const uint16_t idx = node->idx;
+
+	__rte_node_enqueue_prologue(graph, node, idx, nb_objs);
+
+	rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
+	node->idx = idx + nb_objs;
+}
+
+/**
+ * Enqueue only one obj to next node for further processing and
+ * set the next node to pending state in the circular buffer.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index to enqueue objs.
+ * @param obj
+ *   Obj to enqueue.
+ */
+__rte_experimental
+static inline void
+rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
+		    rte_edge_t next, void *obj)
+{
+	node = __rte_node_next_node_get(node, next);
+	uint16_t idx = node->idx;
+
+	__rte_node_enqueue_prologue(graph, node, idx, 1);
+
+	node->objs[idx++] = obj;
+	node->idx = idx;
+}
+
+/**
+ * Enqueue only two objs to next node for further processing and
+ * set the next node to pending state in the circular buffer.
+ * Same as rte_node_enqueue_x1 but enqueue two objs.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index to enqueue objs.
+ * @param obj0
+ *   Obj to enqueue.
+ * @param obj1
+ *   Obj to enqueue.
+ */
+__rte_experimental
+static inline void
+rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
+		    rte_edge_t next, void *obj0, void *obj1)
+{
+	node = __rte_node_next_node_get(node, next);
+	uint16_t idx = node->idx;
+
+	__rte_node_enqueue_prologue(graph, node, idx, 2);
+
+	node->objs[idx++] = obj0;
+	node->objs[idx++] = obj1;
+	node->idx = idx;
+}
+
+/**
+ * Enqueue only four objs to next node for further processing and
+ * set the next node to pending state in the circular buffer.
+ * Same as rte_node_enqueue_x1 but enqueue four objs.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index to enqueue objs.
+ * @param obj0
+ *   1st obj to enqueue.
+ * @param obj1
+ *   2nd obj to enqueue.
+ * @param obj2
+ *   3rd obj to enqueue.
+ * @param obj3
+ *   4th obj to enqueue.
+ */
+__rte_experimental
+static inline void
+rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
+		    rte_edge_t next, void *obj0, void *obj1, void *obj2,
+		    void *obj3)
+{
+	node = __rte_node_next_node_get(node, next);
+	uint16_t idx = node->idx;
+
+	__rte_node_enqueue_prologue(graph, node, idx, 4);
+
+	node->objs[idx++] = obj0;
+	node->objs[idx++] = obj1;
+	node->objs[idx++] = obj2;
+	node->objs[idx++] = obj3;
+	node->idx = idx;
+}
+
+/**
+ * Enqueue objs to multiple next nodes for further processing and
+ * set the next nodes to pending state in the circular buffer.
+ * objs[i] will be enqueued to nexts[i].
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param nexts
+ *   List of relative next node indices to enqueue objs.
+ * @param objs
+ *   List of objs to enqueue.
+ * @param nb_objs
+ *   Number of objs to enqueue.
+ */
+__rte_experimental
+static inline void
+rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
+		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
+{
+	uint16_t i;
+
+	for (i = 0; i < nb_objs; i++)
+		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
+}
+
+/**
+ * Get the stream of next node to enqueue the objs.
+ * Once done with the updating the objs, needs to call
+ * rte_node_next_stream_put to put the next node to pending state.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index to get stream.
+ * @param nb_objs
+ *   Requested free size of the next stream.
+ *
+ * @return
+ *   Valid next stream on success.
+ *
+ * @see rte_node_next_stream_put().
+ */
+__rte_experimental
+static inline void **
+rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
+			 rte_edge_t next, uint16_t nb_objs)
+{
+	node = __rte_node_next_node_get(node, next);
+	const uint16_t idx = node->idx;
+	uint16_t free_space = node->size - idx;
+
+	if (unlikely(free_space < nb_objs))
+		__rte_node_stream_alloc_size(graph, node, nb_objs);
+
+	return &node->objs[idx];
+}
+
+/**
+ * Put the next stream to pending state in the circular buffer
+ * for further processing. Should be invoked after rte_node_next_stream_get().
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index..
+ * @param idx
+ *   Number of objs updated in the stream after getting the stream using
+ *   rte_node_next_stream_get.
+ *
+ * @see rte_node_next_stream_get().
+ */
+__rte_experimental
+static inline void
+rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
+			 rte_edge_t next, uint16_t idx)
+{
+	if (unlikely(!idx))
+		return;
+
+	node = __rte_node_next_node_get(node, next);
+	if (node->idx == 0)
+		__rte_node_enqueue_tail_update(graph, node);
+
+	node->idx += idx;
+}
+
+/**
+ * Home run scenario, Enqueue all the objs of current node to next
+ * node in optimized way by swapping the streams of both nodes.
+ * Performs good when next node is already not in pending state.
+ * If next node is already in pending state then normal enqueue
+ * will be used.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param src
+ *   Current node pointer.
+ * @param next
+ *   Relative next node index.
+ */
+__rte_experimental
+static inline void
+rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
+			  rte_edge_t next)
+{
+	struct rte_node *dst = __rte_node_next_node_get(src, next);
+
+	/* Let swap the pointers if dst don't have valid objs */
+	if (likely(dst->idx == 0)) {
+		void **dobjs = dst->objs;
+		uint16_t dsz = dst->size;
+		dst->objs = src->objs;
+		dst->size = src->size;
+		src->objs = dobjs;
+		src->size = dsz;
+		dst->idx = src->idx;
+		__rte_node_enqueue_tail_update(graph, dst);
+	} else { /* Move the objects from src node to dst node */
+		rte_node_enqueue(graph, src, next, src->objs, src->idx);
+	}
+}
+
 #ifdef __cplusplus
 }
 #endif