diff mbox series

[v1,1/2] eventdev/crypto_adapter: move crypto ops to circular buffer

Message ID 20211221113125.719189-1-ganapati.kundapura@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Jerin Jacob
Headers show
Series [v1,1/2] eventdev/crypto_adapter: move crypto ops to circular buffer | expand

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Kundapura, Ganapati Dec. 21, 2021, 11:31 a.m. UTC
Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
diff mbox series

Patch

diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..4469a89 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@ 
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+	/* index of head element in circular buffer */
+	uint16_t head;
+	/* index of tail element in circular buffer */
+	uint16_t tail;
+	/* number elements in buffer */
+	uint16_t count;
+	/* size of circular buffer */
+	uint16_t size;
+	/* Pointer to hold rte_crypto_ops for batching */
+	struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
 	/* Event device identifier */
 	uint8_t eventdev_id;
@@ -47,6 +63,8 @@  struct event_crypto_adapter {
 	struct crypto_device_info *cdevs;
 	/* Loop counter to flush crypto ops */
 	uint16_t transmit_loop_count;
+	/* Circular buffer for batching crypto ops to eventdev */
+	struct crypto_ops_circular_buffer ebuf;
 	/* Per instance stats structure */
 	struct rte_event_crypto_adapter_stats crypto_stats;
 	/* Configuration callback for rte_service configuration */
@@ -93,8 +111,8 @@  struct crypto_device_info {
 struct crypto_queue_pair_info {
 	/* Set to indicate queue pair is enabled */
 	bool qp_enabled;
-	/* Pointer to hold rte_crypto_ops for batching */
-	struct rte_crypto_op **op_buffer;
+	/* Circular buffer for batching crypto ops to cdev */
+	struct crypto_ops_circular_buffer cbuf;
 	/* No of crypto ops accumulated */
 	uint8_t len;
 } __rte_cache_aligned;
@@ -141,6 +159,77 @@  eca_init(void)
 	return 0;
 }
 
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+	return bufp->count >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+	rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+			 struct crypto_ops_circular_buffer *bufp,
+			 uint16_t sz)
+{
+	bufp->op_buffer = rte_zmalloc(name,
+				      sizeof(struct rte_crypto_op *) * sz,
+				      0);
+	if (bufp->op_buffer == NULL)
+		return -ENOMEM;
+
+	bufp->size = sz;
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+			struct rte_crypto_op *op)
+{
+	uint16_t *tailp = &bufp->tail;
+
+	bufp->op_buffer[*tailp] = op;
+	*tailp = (*tailp + 1) % bufp->size;
+	bufp->count++;
+
+	return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+				  uint8_t cdev_id, uint16_t qp_id,
+				  uint16_t *nb_ops_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else {
+		*nb_ops_flushed = 0;
+		return 0;  /* buffer empty */
+	}
+
+	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+						      &ops[*headp], n);
+	bufp->count -= *nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+	return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +326,19 @@  rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 		return -ENOMEM;
 	}
 
+	if (eca_circular_buffer_init("eca_edev_circular_buffer",
+				     &adapter->ebuf,
+				     CRYPTO_ADAPTER_BUFFER_SZ)) {
+		RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
+		rte_free(adapter);
+		return -ENOMEM;
+	}
+
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	if (ret < 0) {
 		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
 				 dev_id, dev_info.driver_name);
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return ret;
 	}
@@ -259,6 +357,7 @@  rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
 					socket_id);
 	if (adapter->cdevs == NULL) {
 		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+		eca_circular_buffer_free(&adapter->ebuf);
 		rte_free(adapter);
 		return -ENOMEM;
 	}
@@ -337,11 +436,10 @@  eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 	struct crypto_queue_pair_info *qp_info = NULL;
 	struct rte_crypto_op *crypto_op;
 	unsigned int i, n;
-	uint16_t qp_id, len, ret;
+	uint16_t qp_id, len, nb_enqueued = 0;
 	uint8_t cdev_id;
 
 	len = 0;
-	ret = 0;
 	n = 0;
 	stats->event_deq_count += cnt;
 
@@ -367,7 +465,7 @@  eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
 				crypto_op->private_data_offset) {
@@ -383,7 +481,7 @@  eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 				continue;
 			}
 			len = qp_info->len;
-			qp_info->op_buffer[len] = crypto_op;
+			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
 			len++;
 		} else {
 			rte_pktmbuf_free(crypto_op->sym->m_src);
@@ -391,18 +489,16 @@  eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 			continue;
 		}
 
-		if (len == BATCH_SIZE) {
-			struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+			eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+							  cdev_id,
 							  qp_id,
-							  op_buffer,
-							  BATCH_SIZE);
+							  &nb_enqueued);
+			stats->crypto_enq_count += nb_enqueued;
 
-			stats->crypto_enq_count += ret;
-
-			while (ret < len) {
+			while (nb_enqueued < len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = qp_info->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
@@ -413,7 +509,8 @@  eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
 
 		if (qp_info)
 			qp_info->len = len;
-		n += ret;
+
+		n += nb_enqueued;
 	}
 
 	return n;
@@ -425,14 +522,12 @@  eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
-	struct rte_crypto_op **op_buffer;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp;
-	uint16_t ret;
+	uint16_t nb_enqueued = 0, nb = 0;
 	uint16_t num_cdev = rte_cryptodev_count();
 
-	ret = 0;
 	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
 		curr_dev = &adapter->cdevs[cdev_id];
 		dev = curr_dev->dev;
@@ -444,25 +539,26 @@  eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
 			if (!curr_queue->qp_enabled)
 				continue;
 
-			op_buffer = curr_queue->op_buffer;
-			ret = rte_cryptodev_enqueue_burst(cdev_id,
+			eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+							  cdev_id,
 							  qp,
-							  op_buffer,
-							  curr_queue->len);
-			stats->crypto_enq_count += ret;
+							  &nb_enqueued);
+
+			stats->crypto_enq_count += nb_enqueued;
 
-			while (ret < curr_queue->len) {
+			while (nb_enqueued < curr_queue->len) {
 				struct rte_crypto_op *op;
-				op = op_buffer[ret++];
+				op = curr_queue->cbuf.op_buffer[nb_enqueued++];
 				stats->crypto_enq_fail++;
 				rte_pktmbuf_free(op->sym->m_src);
 				rte_crypto_op_free(op);
 			}
 			curr_queue->len = 0;
+			nb += nb_enqueued;
 		}
 	}
 
-	return ret;
+	return nb;
 }
 
 static int
@@ -499,9 +595,9 @@  eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
 	return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-		      struct rte_crypto_op **ops, uint16_t num)
+		  struct rte_crypto_op **ops, uint16_t num)
 {
 	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
 	union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +614,8 @@  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	num = RTE_MIN(num, BATCH_SIZE);
 	for (i = 0; i < num; i++) {
 		struct rte_event *ev = &events[nb_ev++];
+
+		m_data = NULL;
 		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
 			m_data = rte_cryptodev_sym_session_get_user_data(
 					ops[i]->sym->session);
@@ -548,6 +646,7 @@  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 						  event_port_id,
 						  &events[nb_enqueued],
 						  nb_ev - nb_enqueued);
+
 	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
 		 nb_enqueued < nb_ev);
 
@@ -561,8 +660,52 @@  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
 	stats->event_enq_fail_count += nb_ev - nb_enqueued;
 	stats->event_enq_count += nb_enqueued;
 	stats->event_enq_retry_count += retry - 1;
+
+	return nb_enqueued;
 }
 
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+				   struct crypto_ops_circular_buffer *bufp)
+{
+	uint16_t n = 0, nb_ops_flushed;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_crypto_op **ops = bufp->op_buffer;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = bufp->size - *headp;
+	else
+		return 0;  /* buffer empty */
+
+	nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
+	bufp->count -= nb_ops_flushed;
+	if (!bufp->count) {
+		*headp = 0;
+		*tailp = 0;
+	} else
+		*headp = (*headp + nb_ops_flushed) % bufp->size;
+
+	/* Stop further enqueue to eventdev, if nb_ops_flushed < n */
+	if (nb_ops_flushed < n)
+		return 0;
+
+	return 1;
+}
+
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+	if (adapter->ebuf.count == 0)
+		return;
+
+	while (eca_circular_buffer_flush_to_evdev(adapter,
+						  &adapter->ebuf))
+		;
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 			   unsigned int max_deq)
@@ -571,7 +714,7 @@  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	struct crypto_device_info *curr_dev;
 	struct crypto_queue_pair_info *curr_queue;
 	struct rte_crypto_op *ops[BATCH_SIZE];
-	uint16_t n, nb_deq;
+	uint16_t n, nb_deq, nb_enqueued, i;
 	struct rte_cryptodev *dev;
 	uint8_t cdev_id;
 	uint16_t qp, dev_qps;
@@ -579,16 +722,20 @@  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 	uint16_t num_cdev = rte_cryptodev_count();
 
 	nb_deq = 0;
+	eca_ops_buffer_flush(adapter);
+
 	do {
-		uint16_t queues = 0;
 		done = true;
 
 		for (cdev_id = adapter->next_cdev_id;
 			cdev_id < num_cdev; cdev_id++) {
+			uint16_t queues = 0;
+
 			curr_dev = &adapter->cdevs[cdev_id];
 			dev = curr_dev->dev;
 			if (dev == NULL)
 				continue;
+
 			dev_qps = dev->data->nb_queue_pairs;
 
 			for (qp = curr_dev->next_queue_pair_id;
@@ -605,8 +752,24 @@  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
 					continue;
 
 				done = false;
+				nb_enqueued = 0;
+
 				stats->crypto_deq_count += n;
-				eca_ops_enqueue_burst(adapter, ops, n);
+
+				if (unlikely(!adapter->ebuf.count))
+					nb_enqueued = eca_ops_enqueue_burst(
+							adapter, ops, n);
+
+				if (nb_enqueued == n)
+					goto check;
+
+				/* Failed to enqueue events case */
+				for (i = nb_enqueued; i < n; i++)
+					eca_circular_buffer_add(
+						&adapter->ebuf,
+						ops[nb_enqueued]);
+
+check:
 				nb_deq += n;
 
 				if (nb_deq > max_deq) {
@@ -751,11 +914,10 @@  eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
 			return -ENOMEM;
 
 		qpairs = dev_info->qpairs;
-		qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-					BATCH_SIZE *
-					sizeof(struct rte_crypto_op *),
-					0, adapter->socket_id);
-		if (!qpairs->op_buffer) {
+
+		if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+					     &qpairs->cbuf,
+					     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
 			rte_free(qpairs);
 			return -ENOMEM;
 		}