diff mbox series

[4/7] eventdev: add Rx adapter event vector support

Message ID 20210220220957.4583-5-pbhagavatula@marvell.com (mailing list archive)
State New
Delegated to: Jerin Jacob
Headers show
Series Introduce event vectorization | expand

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Pavan Nikhilesh Bhagavatula Feb. 20, 2021, 10:09 p.m. UTC
From: Pavan Nikhilesh <pbhagavatula@marvell.com>

Add event vector support for event eth Rx adapter, the implementation
creates vector flows based on port and queue identifier of the received
mbufs.

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
 lib/librte_eventdev/eventdev_pmd.h            |  31 +-
 .../rte_event_eth_rx_adapter.c                | 305 +++++++++++++++++-
 lib/librte_eventdev/rte_eventdev.c            |   6 +-
 3 files changed, 324 insertions(+), 18 deletions(-)
diff mbox series

Patch

diff --git a/lib/librte_eventdev/eventdev_pmd.h b/lib/librte_eventdev/eventdev_pmd.h
index 7eb9a7739..60bfaebc0 100644
--- a/lib/librte_eventdev/eventdev_pmd.h
+++ b/lib/librte_eventdev/eventdev_pmd.h
@@ -69,9 +69,10 @@  extern "C" {
 	} \
 } while (0)
 
-#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
-		((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
-			(RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
+#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP                                        \
+	((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |                     \
+	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |                         \
+	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
 
 #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
 		RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
@@ -645,6 +646,27 @@  typedef int (*eventdev_eth_rx_adapter_stats_reset)
  */
 typedef int (*eventdev_selftest)(void);
 
+struct rte_event_eth_rx_adapter_vector_limits;
+/**
+ * Get event vector limits for a given event, etherner device pair.
+ *
+ * @param dev
+ *   Event device pointer
+ *
+ * @param eth_dev
+ *   Ethernet device pointer
+ *
+ * @param[out] limits
+ *   Pointer to the limits structure to be filled.
+ *
+ * @return
+ *   - 0: Success.
+ *   - <0: Error code returned by the driver function.
+ */
+typedef int (*eventdev_eth_rx_adapter_vector_limits_get_t)(
+	const struct rte_eventdev *dev, const struct rte_eth_dev *eth_dev,
+	struct rte_event_eth_rx_adapter_vector_limits *limits);
+
 typedef uint32_t rte_event_pmd_selftest_seqn_t;
 extern int rte_event_pmd_selftest_seqn_dynfield_offset;
 
@@ -1067,6 +1089,9 @@  struct rte_eventdev_ops {
 	/**< Get ethernet Rx stats */
 	eventdev_eth_rx_adapter_stats_reset eth_rx_adapter_stats_reset;
 	/**< Reset ethernet Rx stats */
+	eventdev_eth_rx_adapter_vector_limits_get_t
+		eth_rx_adapter_vector_limits_get;
+	/**< Get event vector limits for the Rx adapter */
 
 	eventdev_timer_adapter_caps_get_t timer_adapter_caps_get;
 	/**< Get timer adapter capabilities */
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index d8c635e99..a1990637f 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -26,6 +26,10 @@ 
 #define BATCH_SIZE		32
 #define BLOCK_CNT_THRESHOLD	10
 #define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
+#define MAX_VECTOR_SIZE		1024
+#define MIN_VECTOR_SIZE		4
+#define MAX_VECTOR_NS		1E9
+#define MIN_VECTOR_NS		1E5
 
 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
@@ -59,6 +63,20 @@  struct eth_rx_poll_entry {
 	uint16_t eth_rx_qid;
 };
 
+struct eth_rx_vector_data {
+	TAILQ_ENTRY(eth_rx_vector_data) next;
+	uint16_t port;
+	uint16_t queue;
+	uint16_t max_vector_count;
+	uint64_t event;
+	uint64_t ts;
+	uint64_t vector_timeout_ticks;
+	struct rte_mempool *vector_pool;
+	struct rte_event_vector *vector_ev;
+} __rte_cache_aligned;
+
+TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
+
 /* Instance per adapter */
 struct rte_eth_event_enqueue_buffer {
 	/* Count of events in this buffer */
@@ -92,6 +110,14 @@  struct rte_event_eth_rx_adapter {
 	uint32_t wrr_pos;
 	/* Event burst buffer */
 	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
+	/* Vector enable flag */
+	uint8_t ena_vector;
+	/* Timestamp of previous vector expiry list traversal */
+	uint64_t prev_expiry_ts;
+	/* Minimum ticks to wait before traversing expiry list */
+	uint64_t vector_tmo_ticks;
+	/* vector list */
+	struct eth_rx_vector_data_list vector_list;
 	/* Per adapter stats */
 	struct rte_event_eth_rx_adapter_stats stats;
 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
@@ -198,9 +224,11 @@  struct eth_device_info {
 struct eth_rx_queue_info {
 	int queue_enabled;	/* True if added */
 	int intr_enabled;
+	uint8_t ena_vector;
 	uint16_t wt;		/* Polling weight */
 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
 	uint64_t event;
+	struct eth_rx_vector_data vector_data;
 };
 
 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
@@ -722,6 +750,9 @@  rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 	    &rx_adapter->event_enqueue_buffer;
 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
 
+	if (!buf->count)
+		return 0;
+
 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
 					rx_adapter->event_port_id,
 					buf->events,
@@ -742,6 +773,72 @@  rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 	return n;
 }
 
+static inline uint16_t
+rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_rx_queue_info *queue_info,
+			struct rte_eth_event_enqueue_buffer *buf,
+			struct rte_mbuf **mbufs, uint16_t num)
+{
+	struct rte_event *ev = &buf->events[buf->count];
+	struct eth_rx_vector_data *vec;
+	uint16_t filled, space, sz;
+
+	filled = 0;
+	vec = &queue_info->vector_data;
+	while (num) {
+		if (vec->vector_ev == NULL) {
+			if (rte_mempool_get(vec->vector_pool,
+					    (void **)&vec->vector_ev) < 0) {
+				rte_pktmbuf_free_bulk(mbufs, num);
+				return 0;
+			}
+			vec->vector_ev->nb_elem = 0;
+			vec->vector_ev->port = vec->port;
+			vec->vector_ev->queue = vec->queue;
+			vec->vector_ev->attr_valid = true;
+			TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+		} else if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+			/* Event ready. */
+			ev->event = vec->event;
+			ev->vec = vec->vector_ev;
+			ev++;
+			filled++;
+			vec->vector_ev = NULL;
+			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+			if (rte_mempool_get(vec->vector_pool,
+					    (void **)&vec->vector_ev) < 0) {
+				rte_pktmbuf_free_bulk(mbufs, num);
+				return 0;
+			}
+			vec->vector_ev->nb_elem = 0;
+			vec->vector_ev->port = vec->port;
+			vec->vector_ev->queue = vec->queue;
+			vec->vector_ev->attr_valid = true;
+			TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+		}
+
+		space = vec->max_vector_count - vec->vector_ev->nb_elem;
+		sz = num > space ? space : num;
+		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
+		       sizeof(void *) * sz);
+		vec->vector_ev->nb_elem += sz;
+		num -= sz;
+		mbufs += sz;
+		vec->ts = rte_rdtsc();
+	}
+
+	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+		ev->event = vec->event;
+		ev->vec = vec->vector_ev;
+		ev++;
+		filled++;
+		vec->vector_ev = NULL;
+		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+	}
+
+	return filled;
+}
+
 static inline void
 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 		uint16_t eth_dev_id,
@@ -770,25 +867,30 @@  rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
 	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
 
-	for (i = 0; i < num; i++) {
-		m = mbufs[i];
-
-		rss = do_rss ?
-			rxa_do_softrss(m, rx_adapter->rss_key_be) :
-			m->hash.rss;
-		ev->event = event;
-		ev->flow_id = (rss & ~flow_id_mask) |
-				(ev->flow_id & flow_id_mask);
-		ev->mbuf = m;
-		ev++;
+	if (!eth_rx_queue_info->ena_vector) {
+		for (i = 0; i < num; i++) {
+			m = mbufs[i];
+
+			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
+				     : m->hash.rss;
+			ev->event = event;
+			ev->flow_id = (rss & ~flow_id_mask) |
+				      (ev->flow_id & flow_id_mask);
+			ev->mbuf = m;
+			ev++;
+		}
+	} else {
+		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
+					      buf, mbufs, num);
 	}
 
-	if (dev_info->cb_fn) {
+	if (num && dev_info->cb_fn) {
 
 		dropped = 0;
 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
-					ETH_EVENT_BUFFER_SIZE, buf->count, ev,
-					num, dev_info->cb_arg, &dropped);
+					ETH_EVENT_BUFFER_SIZE, buf->count,
+					&buf->events[buf->count], num,
+					dev_info->cb_arg, &dropped);
 		if (unlikely(nb_cb > num))
 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
 				nb_cb, num);
@@ -1124,6 +1226,30 @@  rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 	return nb_rx;
 }
 
+static void
+rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter = arg;
+	struct rte_eth_event_enqueue_buffer *buf =
+		&rx_adapter->event_enqueue_buffer;
+	struct rte_event *ev;
+
+	if (buf->count)
+		rxa_flush_event_buffer(rx_adapter);
+
+	if (vec->vector_ev->nb_elem == 0)
+		return;
+	ev = &buf->events[buf->count];
+
+	/* Event ready. */
+	ev->event = vec->event;
+	ev->vec = vec->vector_ev;
+	buf->count++;
+
+	vec->vector_ev = NULL;
+	vec->ts = 0;
+}
+
 static int
 rxa_service_func(void *args)
 {
@@ -1137,6 +1263,24 @@  rxa_service_func(void *args)
 		return 0;
 	}
 
+	if (rx_adapter->ena_vector) {
+		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
+		    rx_adapter->vector_tmo_ticks) {
+			struct eth_rx_vector_data *vec;
+
+			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
+				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
+
+				if (elapsed_time >= vec->vector_timeout_ticks) {
+					rxa_vector_expire(vec, rx_adapter);
+					TAILQ_REMOVE(&rx_adapter->vector_list,
+						     vec, next);
+				}
+			}
+			rx_adapter->prev_expiry_ts = rte_rdtsc();
+		}
+	}
+
 	stats = &rx_adapter->stats;
 	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
 	stats->rx_packets += rxa_poll(rx_adapter);
@@ -1640,6 +1784,28 @@  rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	}
 }
 
+static void
+rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
+		    uint64_t vector_ns, struct rte_mempool *mp, int32_t qid,
+		    uint16_t port_id)
+{
+#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
+	struct eth_rx_vector_data *vector_data;
+	uint32_t flow_id;
+
+	vector_data = &queue_info->vector_data;
+	vector_data->max_vector_count = vector_count;
+	vector_data->port = port_id;
+	vector_data->queue = qid;
+	vector_data->vector_pool = mp;
+	vector_data->vector_timeout_ticks =
+		NSEC2TICK(vector_ns, rte_get_timer_hz());
+	vector_data->ts = 0;
+	flow_id = queue_info->event & 0xFFFFF;
+	flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) : flow_id;
+	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
+}
+
 static void
 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct eth_device_info *dev_info,
@@ -1716,6 +1882,25 @@  rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	} else
 		qi_ev->flow_id = 0;
 
+	if (conf->rx_queue_flags &
+	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
+		queue_info->ena_vector = 1;
+		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
+		rxa_set_vector_data(queue_info, conf->vector_sz,
+				    conf->vector_timeout_ns, conf->vector_mp,
+				    rx_queue_id, dev_info->dev->data->port_id);
+		rx_adapter->ena_vector = 1;
+		rx_adapter->vector_tmo_ticks =
+			rx_adapter->vector_tmo_ticks
+				? RTE_MIN(queue_info->vector_data
+						  .vector_timeout_ticks,
+					  rx_adapter->vector_tmo_ticks)
+				: queue_info->vector_data.vector_timeout_ticks;
+		rx_adapter->vector_tmo_ticks <<= 1;
+		TAILQ_INIT(&rx_adapter->vector_list);
+		rx_adapter->prev_expiry_ts = 0;
+	}
+
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
 		rx_adapter->num_rx_polled += !pollq;
@@ -2054,6 +2239,7 @@  rte_event_eth_rx_adapter_queue_add(uint8_t id,
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct rte_eventdev *dev;
 	struct eth_device_info *dev_info;
+	struct rte_event_eth_rx_adapter_vector_limits limits;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -2081,6 +2267,48 @@  rte_event_eth_rx_adapter_queue_add(uint8_t id,
 		return -EINVAL;
 	}
 
+	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
+	    (queue_conf->rx_queue_flags &
+	     RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
+		RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
+				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
+				 eth_dev_id, id);
+		return -EINVAL;
+	}
+
+	if (queue_conf->rx_queue_flags &
+	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
+		ret = rte_event_eth_rx_adapter_vector_limits_get(
+			rx_adapter->eventdev_id, eth_dev_id, &limits);
+		if (ret < 0) {
+			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
+					 " eth port: %" PRIu16
+					 " adapter id: %" PRIu8,
+					 eth_dev_id, id);
+			return -EINVAL;
+		}
+		if (queue_conf->vector_sz < limits.min_sz ||
+		    queue_conf->vector_sz > limits.max_sz ||
+		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
+		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
+		    queue_conf->vector_mp == NULL) {
+			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
+					 " eth port: %" PRIu16
+					 " adapter id: %" PRIu8,
+					 eth_dev_id, id);
+			return -EINVAL;
+		}
+		if (queue_conf->vector_mp->elt_size <
+		    (sizeof(struct rte_event_vector) +
+		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
+			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
+					 " eth port: %" PRIu16
+					 " adapter id: %" PRIu8,
+					 eth_dev_id, id);
+			return -EINVAL;
+		}
+	}
+
 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
 		(rx_queue_id != -1)) {
 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
@@ -2143,6 +2371,17 @@  rte_event_eth_rx_adapter_queue_add(uint8_t id,
 	return 0;
 }
 
+static int
+rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
+{
+	limits->max_sz = MAX_VECTOR_SIZE;
+	limits->min_sz = MIN_VECTOR_SIZE;
+	limits->max_timeout_ns = MAX_VECTOR_NS;
+	limits->min_timeout_ns = MIN_VECTOR_NS;
+
+	return 0;
+}
+
 int
 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
 				int32_t rx_queue_id)
@@ -2263,6 +2502,44 @@  rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
 	return ret;
 }
 
+int
+rte_event_eth_rx_adapter_vector_limits_get(
+	uint8_t dev_id, uint16_t eth_port_id,
+	struct rte_event_eth_rx_adapter_vector_limits *limits)
+{
+	struct rte_eventdev *dev;
+	uint32_t cap;
+	int ret;
+
+	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
+	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
+
+	if (limits == NULL)
+		return -EINVAL;
+
+	dev = &rte_eventdevs[dev_id];
+
+	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
+				 "eth port %" PRIu16,
+				 dev_id, eth_port_id);
+		return ret;
+	}
+
+	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
+		RTE_FUNC_PTR_OR_ERR_RET(
+			*dev->dev_ops->eth_rx_adapter_vector_limits_get,
+			-ENOTSUP);
+		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
+			dev, &rte_eth_devices[eth_port_id], limits);
+	} else {
+		ret = rxa_sw_vector_limits(limits);
+	}
+
+	return ret;
+}
+
 int
 rte_event_eth_rx_adapter_start(uint8_t id)
 {
diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index b57363f80..2e6e367e0 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -122,7 +122,11 @@  rte_event_eth_rx_adapter_caps_get(uint8_t dev_id, uint16_t eth_port_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->eth_rx_adapter_caps_get ?
 				(*dev->dev_ops->eth_rx_adapter_caps_get)(dev,