diff mbox series

[v5,10/16] dma/idxd: add data-path job completion functions

Message ID 20210917152437.3270330-11-kevin.laatz@intel.com (mailing list archive)
State Superseded
Delegated to: Thomas Monjalon
Headers show
Series add dmadev driver for idxd devices | expand

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Kevin Laatz Sept. 17, 2021, 3:24 p.m. UTC
Add the data path functions for gathering completed operations.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Kevin Laatz <kevin.laatz@intel.com>
Reviewed-by: Conor Walsh <conor.walsh@intel.com>

---
v2:
   - fixed typo in docs
   - add completion status for invalid opcode
---
 doc/guides/dmadevs/idxd.rst      |  32 +++++
 drivers/dma/idxd/idxd_common.c   | 235 +++++++++++++++++++++++++++++++
 drivers/dma/idxd/idxd_internal.h |   5 +
 3 files changed, 272 insertions(+)

Comments

Bruce Richardson Sept. 20, 2021, 10:36 a.m. UTC | #1
On Fri, Sep 17, 2021 at 03:24:31PM +0000, Kevin Laatz wrote:
> Add the data path functions for gathering completed operations.
> 
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Signed-off-by: Kevin Laatz <kevin.laatz@intel.com>
> Reviewed-by: Conor Walsh <conor.walsh@intel.com>
> 
> ---
> v2:
>    - fixed typo in docs
>    - add completion status for invalid opcode
> ---
>  doc/guides/dmadevs/idxd.rst      |  32 +++++
>  drivers/dma/idxd/idxd_common.c   | 235 +++++++++++++++++++++++++++++++
>  drivers/dma/idxd/idxd_internal.h |   5 +
>  3 files changed, 272 insertions(+)
> 
> diff --git a/doc/guides/dmadevs/idxd.rst b/doc/guides/dmadevs/idxd.rst
> index 7835461a22..f942a8aa44 100644
> --- a/doc/guides/dmadevs/idxd.rst
> +++ b/doc/guides/dmadevs/idxd.rst
> @@ -209,6 +209,38 @@ device and start the hardware processing of them:
>     }
>     rte_dma_submit(dev_id, vchan);
>  
> +To retrieve information about completed copies, ``rte_dma_completed()`` and
> +``rte_dma_completed_status()`` APIs should be used. ``rte_dma_completed()``
> +will return the number of completed operations, along with the index of the last
> +successful completed operation and whether or not an error was encountered. If an
> +error was encountered, ``rte_dma_completed_status()`` must be used to kick the
> +device off to continue processing operations and also to gather the status of each
> +individual operations which is filled in to the ``status`` array provided as
> +parameter by the application.
> +
> +The following status codes are supported by IDXD:
> +* ``RTE_DMA_STATUS_SUCCESSFUL``: The operation was successful.
> +* ``RTE_DMA_STATUS_INVALID_OPCODE``: The operation failed due to an invalid operation code.
> +* ``RTE_DMA_STATUS_INVALID_LENGTH``: The operation failed due to an invalid data length.
> +* ``RTE_DMA_STATUS_NOT_ATTEMPTED``: The operation was not attempted.
> +* ``RTE_DMA_STATUS_ERROR_UNKNOWN``: The operation failed due to an unspecified error.
> +
> +The following code shows how to retrieve the number of successfully completed
> +copies within a burst and then using ``rte_dma_completed_status()`` to check
> +which operation failed and kick off the device to continue processing operations:
> +
> +.. code-block:: C
> +
> +   enum rte_dma_status_code status[COMP_BURST_SZ];
> +   uint16_t count, idx, status_count;
> +   bool error = 0;
> +
> +   count = rte_dma_completed(dev_id, vchan, COMP_BURST_SZ, &idx, &error);
> +
> +   if (error){
> +      status_count = rte_dma_completed_status(dev_id, vchan, COMP_BURST_SZ, &idx, status);
> +   }
> +
As with some of the other documentation text, it should be checked for
overlap with the dmadev documentation, and merged with that if appropriate.

/Bruce
fengchengwen Sept. 22, 2021, 3:47 a.m. UTC | #2
On 2021/9/17 23:24, Kevin Laatz wrote:
> Add the data path functions for gathering completed operations.
> 
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Signed-off-by: Kevin Laatz <kevin.laatz@intel.com>
> Reviewed-by: Conor Walsh <conor.walsh@intel.com>
> 
> ---
> v2:
>    - fixed typo in docs
>    - add completion status for invalid opcode
> ---
>  doc/guides/dmadevs/idxd.rst      |  32 +++++
>  drivers/dma/idxd/idxd_common.c   | 235 +++++++++++++++++++++++++++++++
>  drivers/dma/idxd/idxd_internal.h |   5 +
>  3 files changed, 272 insertions(+)
> 
> diff --git a/doc/guides/dmadevs/idxd.rst b/doc/guides/dmadevs/idxd.rst
> index 7835461a22..f942a8aa44 100644
> --- a/doc/guides/dmadevs/idxd.rst
> +++ b/doc/guides/dmadevs/idxd.rst
> @@ -209,6 +209,38 @@ device and start the hardware processing of them:
>     }
>     rte_dma_submit(dev_id, vchan);
>  
> +To retrieve information about completed copies, ``rte_dma_completed()`` and
> +``rte_dma_completed_status()`` APIs should be used. ``rte_dma_completed()``
> +will return the number of completed operations, along with the index of the last
> +successful completed operation and whether or not an error was encountered. If an
> +error was encountered, ``rte_dma_completed_status()`` must be used to kick the
> +device off to continue processing operations and also to gather the status of each
> +individual operations which is filled in to the ``status`` array provided as
> +parameter by the application.
> +
> +The following status codes are supported by IDXD:
> +* ``RTE_DMA_STATUS_SUCCESSFUL``: The operation was successful.
> +* ``RTE_DMA_STATUS_INVALID_OPCODE``: The operation failed due to an invalid operation code.
> +* ``RTE_DMA_STATUS_INVALID_LENGTH``: The operation failed due to an invalid data length.
> +* ``RTE_DMA_STATUS_NOT_ATTEMPTED``: The operation was not attempted.
> +* ``RTE_DMA_STATUS_ERROR_UNKNOWN``: The operation failed due to an unspecified error.
> +
> +The following code shows how to retrieve the number of successfully completed
> +copies within a burst and then using ``rte_dma_completed_status()`` to check
> +which operation failed and kick off the device to continue processing operations:
> +
> +.. code-block:: C
> +
> +   enum rte_dma_status_code status[COMP_BURST_SZ];
> +   uint16_t count, idx, status_count;
> +   bool error = 0;
> +
> +   count = rte_dma_completed(dev_id, vchan, COMP_BURST_SZ, &idx, &error);
> +
> +   if (error){
> +      status_count = rte_dma_completed_status(dev_id, vchan, COMP_BURST_SZ, &idx, status);
> +   }

Because it's a sequential scan to get the completion status, so the count may >0 even error was true. e.g.
   request 1 successful
   request 2 successful
   request 3 fail
so the rte_dma_completed(dev_id, vchan, 3, &idx, &error) will return 2 and error was mark with true.
If return 0 and error with true in above situation, this means that the driver must scans all completions first. which
will lead to low performance.

Therefore, the recommended handling method is as follows:
  count = rte_dma_completed(dev_id, vchan, COMP_BURST_SZ, &idx, &error);
  if (likely(count > 0)) {
    // process success completed
  }
  if (unlikely(error)) {
    //
  }

> +
>  Filling an Area of Memory
>  ~~~~~~~~~~~~~~~~~~~~~~~~~~
>  
> diff --git a/drivers/dma/idxd/idxd_common.c b/drivers/dma/idxd/idxd_common.c
> index b01edeab07..a061a956c2 100644
> --- a/drivers/dma/idxd/idxd_common.c
> +++ b/drivers/dma/idxd/idxd_common.c
> @@ -140,6 +140,241 @@ idxd_submit(struct rte_dma_dev *dev, uint16_t qid __rte_unused)
>  	return 0;
>  }
>  
> +static enum rte_dma_status_code
> +get_comp_status(struct idxd_completion *c)
> +{
> +	uint8_t st = c->status;
> +	switch (st) {
> +	/* successful descriptors are not written back normally */
> +	case IDXD_COMP_STATUS_INCOMPLETE:
> +	case IDXD_COMP_STATUS_SUCCESS:
> +		return RTE_DMA_STATUS_SUCCESSFUL;
> +	case IDXD_COMP_STATUS_INVALID_OPCODE:
> +		return RTE_DMA_STATUS_INVALID_OPCODE;
> +	case IDXD_COMP_STATUS_INVALID_SIZE:
> +		return RTE_DMA_STATUS_INVALID_LENGTH;
> +	case IDXD_COMP_STATUS_SKIPPED:
> +		return RTE_DMA_STATUS_NOT_ATTEMPTED;
> +	default:
> +		return RTE_DMA_STATUS_ERROR_UNKNOWN;
> +	}
> +}
> +
> +static __rte_always_inline int
> +batch_ok(struct idxd_dmadev *idxd, uint8_t max_ops, enum rte_dma_status_code *status)

uint8_t max_ops -> uint16_t max_ops ?

> +{
> +	uint16_t ret;
> +	uint8_t bstatus;
> +
> +	if (max_ops == 0)
> +		return 0;
> +
> +	/* first check if there are any unreturned handles from last time */
> +	if (idxd->ids_avail != idxd->ids_returned) {
> +		ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
> +		idxd->ids_returned += ret;
> +		if (status)
> +			memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
> +		return ret;
> +	}
> +
> +	if (idxd->batch_idx_read == idxd->batch_idx_write)
> +		return 0;
> +
> +	bstatus = idxd->batch_comp_ring[idxd->batch_idx_read].status;
> +	/* now check if next batch is complete and successful */
> +	if (bstatus == IDXD_COMP_STATUS_SUCCESS) {
> +		/* since the batch idx ring stores the start of each batch, pre-increment to lookup
> +		 * start of next batch.
> +		 */
> +		if (++idxd->batch_idx_read > idxd->max_batches)
> +			idxd->batch_idx_read = 0;
> +		idxd->ids_avail = idxd->batch_idx_ring[idxd->batch_idx_read];
> +
> +		ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
> +		idxd->ids_returned += ret;
> +		if (status)
> +			memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
> +		return ret;
> +	}
> +	/* check if batch is incomplete */
> +	else if (bstatus == IDXD_COMP_STATUS_INCOMPLETE)
> +		return 0;
> +
> +	return -1; /* error case */
> +}
> +
> +static inline uint16_t
> +batch_completed(struct idxd_dmadev *idxd, uint8_t max_ops, bool *has_error)

uint8_t max_ops -> uint16_t max_ops ?

> +{
> +	uint16_t i;
> +	uint16_t b_start, b_end, next_batch;
> +
> +	int ret = batch_ok(idxd, max_ops, NULL);
> +	if (ret >= 0)
> +		return ret;
> +
> +	/* ERROR case, not successful, not incomplete */
> +	/* Get the batch size, and special case size 1.
> +	 * once we identify the actual failure job, return other jobs, then update
> +	 * the batch ring indexes to make it look like the first job of the batch has failed.
> +	 * Subsequent calls here will always return zero packets, and the error must be cleared by
> +	 * calling the completed_status() function.
> +	 */
> +	next_batch = (idxd->batch_idx_read + 1);
> +	if (next_batch > idxd->max_batches)
> +		next_batch = 0;
> +	b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
> +	b_end = idxd->batch_idx_ring[next_batch];
> +
> +	if (b_end - b_start == 1) { /* not a batch */
> +		*has_error = true;
> +		return 0;
> +	}
> +
> +	for (i = b_start; i < b_end; i++) {
> +		struct idxd_completion *c = (void *)&idxd->desc_ring[i & idxd->desc_ring_mask];
> +		if (c->status > IDXD_COMP_STATUS_SUCCESS) /* ignore incomplete(0) and success(1) */
> +			break;
> +	}
> +	ret = RTE_MIN((uint16_t)(i - idxd->ids_returned), max_ops);
> +	if (ret < max_ops)
> +		*has_error = true; /* we got up to the point of error */
> +	idxd->ids_avail = idxd->ids_returned += ret;
> +
> +	/* to ensure we can call twice and just return 0, set start of batch to where we finished */
> +	idxd->batch_comp_ring[idxd->batch_idx_read].completed_size -= ret;
> +	idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
> +	if (idxd->batch_idx_ring[next_batch] - idxd->batch_idx_ring[idxd->batch_idx_read] == 1) {
> +		/* copy over the descriptor status to the batch ring as if no batch */
> +		uint16_t d_idx = idxd->batch_idx_ring[idxd->batch_idx_read] & idxd->desc_ring_mask;
> +		struct idxd_completion *desc_comp = (void *)&idxd->desc_ring[d_idx];
> +		idxd->batch_comp_ring[idxd->batch_idx_read].status = desc_comp->status;
> +	}
> +
> +	return ret;
> +}
> +
> +static uint16_t
> +batch_completed_status(struct idxd_dmadev *idxd, uint16_t max_ops, enum rte_dma_status_code *status)
> +{
> +	uint16_t next_batch;
> +
> +	int ret = batch_ok(idxd, max_ops, status);
> +	if (ret >= 0)
> +		return ret;
> +
> +	/* ERROR case, not successful, not incomplete */
> +	/* Get the batch size, and special case size 1.
> +	 */
> +	next_batch = (idxd->batch_idx_read + 1);
> +	if (next_batch > idxd->max_batches)
> +		next_batch = 0;
> +	const uint16_t b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
> +	const uint16_t b_end = idxd->batch_idx_ring[next_batch];
> +	const uint16_t b_len = b_end - b_start;
> +	if (b_len == 1) {/* not a batch */
> +		*status = get_comp_status(&idxd->batch_comp_ring[idxd->batch_idx_read]);
> +		idxd->ids_avail++;
> +		idxd->ids_returned++;
> +		idxd->batch_idx_read = next_batch;
> +		return 1;
> +	}
> +
> +	/* not a single-element batch, need to process more.
> +	 * Scenarios:
> +	 * 1. max_ops >= batch_size - can fit everything, simple case
> +	 *   - loop through completed ops and then add on any not-attempted ones
> +	 * 2. max_ops < batch_size - can't fit everything, more complex case
> +	 *   - loop through completed/incomplete and stop when hit max_ops
> +	 *   - adjust the batch descriptor to update where we stopped, with appropriate bcount
> +	 *   - if bcount is to be exactly 1, update the batch descriptor as it will be treated as
> +	 *     non-batch next time.
> +	 */
> +	const uint16_t bcount = idxd->batch_comp_ring[idxd->batch_idx_read].completed_size;
> +	for (ret = 0; ret < b_len && ret < max_ops; ret++) {
> +		struct idxd_completion *c = (void *)
> +				&idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
> +		status[ret] = (ret < bcount) ? get_comp_status(c) : RTE_DMA_STATUS_NOT_ATTEMPTED;
> +	}
> +	idxd->ids_avail = idxd->ids_returned += ret;
> +
> +	/* everything fit */
> +	if (ret == b_len) {
> +		idxd->batch_idx_read = next_batch;
> +		return ret;
> +	}
> +
> +	/* set up for next time, update existing batch descriptor & start idx at batch_idx_read */
> +	idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
> +	if (ret > bcount) {
> +		/* we have only incomplete ones - set batch completed size to 0 */
> +		struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
> +		comp->completed_size = 0;
> +		/* if there is only one descriptor left, job skipped so set flag appropriately */
> +		if (b_len - ret == 1)
> +			comp->status = IDXD_COMP_STATUS_SKIPPED;
> +	} else {
> +		struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
> +		comp->completed_size -= ret;
> +		/* if there is only one descriptor left, copy status info straight to desc */
> +		if (comp->completed_size == 1) {
> +			struct idxd_completion *c = (void *)
> +					&idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
> +			comp->status = c->status;
> +			/* individual descs can be ok without writeback, but not batches */
> +			if (comp->status == IDXD_COMP_STATUS_INCOMPLETE)
> +				comp->status = IDXD_COMP_STATUS_SUCCESS;
> +		} else if (bcount == b_len) {
> +			/* check if we still have an error, and clear flag if not */
> +			uint16_t i;
> +			for (i = b_start + ret; i < b_end; i++) {
> +				struct idxd_completion *c = (void *)
> +						&idxd->desc_ring[i & idxd->desc_ring_mask];
> +				if (c->status > IDXD_COMP_STATUS_SUCCESS)
> +					break;
> +			}
> +			if (i == b_end) /* no errors */
> +				comp->status = IDXD_COMP_STATUS_SUCCESS;
> +		}
> +	}
> +
> +	return ret;
> +}
> +
> +uint16_t
> +idxd_completed(struct rte_dma_dev *dev, uint16_t qid __rte_unused, uint16_t max_ops,
> +		uint16_t *last_idx, bool *has_error)
> +{
> +	struct idxd_dmadev *idxd = dev->dev_private;
> +	uint16_t batch, ret = 0;
> +
> +	do {
> +		batch = batch_completed(idxd, max_ops - ret, has_error);
> +		ret += batch;
> +	} while (batch > 0 && *has_error == false);
> +
> +	*last_idx = idxd->ids_returned - 1;
> +	return ret;
> +}
> +
> +uint16_t
> +idxd_completed_status(struct rte_dma_dev *dev, uint16_t qid __rte_unused, uint16_t max_ops,
> +		uint16_t *last_idx, enum rte_dma_status_code *status)
> +{
> +	struct idxd_dmadev *idxd = dev->dev_private;
> +
> +	uint16_t batch, ret = 0;
> +
> +	do {
> +		batch = batch_completed_status(idxd, max_ops - ret, &status[ret]);
> +		ret += batch;
> +	} while (batch > 0);
> +
> +	*last_idx = idxd->ids_returned - 1;
> +	return ret;
> +}
> +
>  int
>  idxd_dump(const struct rte_dma_dev *dev, FILE *f)
>  {
> diff --git a/drivers/dma/idxd/idxd_internal.h b/drivers/dma/idxd/idxd_internal.h
> index b66c2d0182..15115a0966 100644
> --- a/drivers/dma/idxd/idxd_internal.h
> +++ b/drivers/dma/idxd/idxd_internal.h
> @@ -93,5 +93,10 @@ int idxd_enqueue_copy(struct rte_dma_dev *dev, uint16_t qid, rte_iova_t src,
>  int idxd_enqueue_fill(struct rte_dma_dev *dev, uint16_t qid, uint64_t pattern,
>  		rte_iova_t dst, unsigned int length, uint64_t flags);
>  int idxd_submit(struct rte_dma_dev *dev, uint16_t qid);
> +uint16_t idxd_completed(struct rte_dma_dev *dev, uint16_t qid, uint16_t max_ops,
> +		uint16_t *last_idx, bool *has_error);
> +uint16_t idxd_completed_status(struct rte_dma_dev *dev, uint16_t qid __rte_unused,
> +		uint16_t max_ops, uint16_t *last_idx,
> +		enum rte_dma_status_code *status);
>  
>  #endif /* _IDXD_INTERNAL_H_ */
>
diff mbox series

Patch

diff --git a/doc/guides/dmadevs/idxd.rst b/doc/guides/dmadevs/idxd.rst
index 7835461a22..f942a8aa44 100644
--- a/doc/guides/dmadevs/idxd.rst
+++ b/doc/guides/dmadevs/idxd.rst
@@ -209,6 +209,38 @@  device and start the hardware processing of them:
    }
    rte_dma_submit(dev_id, vchan);
 
+To retrieve information about completed copies, ``rte_dma_completed()`` and
+``rte_dma_completed_status()`` APIs should be used. ``rte_dma_completed()``
+will return the number of completed operations, along with the index of the last
+successful completed operation and whether or not an error was encountered. If an
+error was encountered, ``rte_dma_completed_status()`` must be used to kick the
+device off to continue processing operations and also to gather the status of each
+individual operations which is filled in to the ``status`` array provided as
+parameter by the application.
+
+The following status codes are supported by IDXD:
+* ``RTE_DMA_STATUS_SUCCESSFUL``: The operation was successful.
+* ``RTE_DMA_STATUS_INVALID_OPCODE``: The operation failed due to an invalid operation code.
+* ``RTE_DMA_STATUS_INVALID_LENGTH``: The operation failed due to an invalid data length.
+* ``RTE_DMA_STATUS_NOT_ATTEMPTED``: The operation was not attempted.
+* ``RTE_DMA_STATUS_ERROR_UNKNOWN``: The operation failed due to an unspecified error.
+
+The following code shows how to retrieve the number of successfully completed
+copies within a burst and then using ``rte_dma_completed_status()`` to check
+which operation failed and kick off the device to continue processing operations:
+
+.. code-block:: C
+
+   enum rte_dma_status_code status[COMP_BURST_SZ];
+   uint16_t count, idx, status_count;
+   bool error = 0;
+
+   count = rte_dma_completed(dev_id, vchan, COMP_BURST_SZ, &idx, &error);
+
+   if (error){
+      status_count = rte_dma_completed_status(dev_id, vchan, COMP_BURST_SZ, &idx, status);
+   }
+
 Filling an Area of Memory
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/drivers/dma/idxd/idxd_common.c b/drivers/dma/idxd/idxd_common.c
index b01edeab07..a061a956c2 100644
--- a/drivers/dma/idxd/idxd_common.c
+++ b/drivers/dma/idxd/idxd_common.c
@@ -140,6 +140,241 @@  idxd_submit(struct rte_dma_dev *dev, uint16_t qid __rte_unused)
 	return 0;
 }
 
+static enum rte_dma_status_code
+get_comp_status(struct idxd_completion *c)
+{
+	uint8_t st = c->status;
+	switch (st) {
+	/* successful descriptors are not written back normally */
+	case IDXD_COMP_STATUS_INCOMPLETE:
+	case IDXD_COMP_STATUS_SUCCESS:
+		return RTE_DMA_STATUS_SUCCESSFUL;
+	case IDXD_COMP_STATUS_INVALID_OPCODE:
+		return RTE_DMA_STATUS_INVALID_OPCODE;
+	case IDXD_COMP_STATUS_INVALID_SIZE:
+		return RTE_DMA_STATUS_INVALID_LENGTH;
+	case IDXD_COMP_STATUS_SKIPPED:
+		return RTE_DMA_STATUS_NOT_ATTEMPTED;
+	default:
+		return RTE_DMA_STATUS_ERROR_UNKNOWN;
+	}
+}
+
+static __rte_always_inline int
+batch_ok(struct idxd_dmadev *idxd, uint8_t max_ops, enum rte_dma_status_code *status)
+{
+	uint16_t ret;
+	uint8_t bstatus;
+
+	if (max_ops == 0)
+		return 0;
+
+	/* first check if there are any unreturned handles from last time */
+	if (idxd->ids_avail != idxd->ids_returned) {
+		ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
+		idxd->ids_returned += ret;
+		if (status)
+			memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
+		return ret;
+	}
+
+	if (idxd->batch_idx_read == idxd->batch_idx_write)
+		return 0;
+
+	bstatus = idxd->batch_comp_ring[idxd->batch_idx_read].status;
+	/* now check if next batch is complete and successful */
+	if (bstatus == IDXD_COMP_STATUS_SUCCESS) {
+		/* since the batch idx ring stores the start of each batch, pre-increment to lookup
+		 * start of next batch.
+		 */
+		if (++idxd->batch_idx_read > idxd->max_batches)
+			idxd->batch_idx_read = 0;
+		idxd->ids_avail = idxd->batch_idx_ring[idxd->batch_idx_read];
+
+		ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
+		idxd->ids_returned += ret;
+		if (status)
+			memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
+		return ret;
+	}
+	/* check if batch is incomplete */
+	else if (bstatus == IDXD_COMP_STATUS_INCOMPLETE)
+		return 0;
+
+	return -1; /* error case */
+}
+
+static inline uint16_t
+batch_completed(struct idxd_dmadev *idxd, uint8_t max_ops, bool *has_error)
+{
+	uint16_t i;
+	uint16_t b_start, b_end, next_batch;
+
+	int ret = batch_ok(idxd, max_ops, NULL);
+	if (ret >= 0)
+		return ret;
+
+	/* ERROR case, not successful, not incomplete */
+	/* Get the batch size, and special case size 1.
+	 * once we identify the actual failure job, return other jobs, then update
+	 * the batch ring indexes to make it look like the first job of the batch has failed.
+	 * Subsequent calls here will always return zero packets, and the error must be cleared by
+	 * calling the completed_status() function.
+	 */
+	next_batch = (idxd->batch_idx_read + 1);
+	if (next_batch > idxd->max_batches)
+		next_batch = 0;
+	b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
+	b_end = idxd->batch_idx_ring[next_batch];
+
+	if (b_end - b_start == 1) { /* not a batch */
+		*has_error = true;
+		return 0;
+	}
+
+	for (i = b_start; i < b_end; i++) {
+		struct idxd_completion *c = (void *)&idxd->desc_ring[i & idxd->desc_ring_mask];
+		if (c->status > IDXD_COMP_STATUS_SUCCESS) /* ignore incomplete(0) and success(1) */
+			break;
+	}
+	ret = RTE_MIN((uint16_t)(i - idxd->ids_returned), max_ops);
+	if (ret < max_ops)
+		*has_error = true; /* we got up to the point of error */
+	idxd->ids_avail = idxd->ids_returned += ret;
+
+	/* to ensure we can call twice and just return 0, set start of batch to where we finished */
+	idxd->batch_comp_ring[idxd->batch_idx_read].completed_size -= ret;
+	idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
+	if (idxd->batch_idx_ring[next_batch] - idxd->batch_idx_ring[idxd->batch_idx_read] == 1) {
+		/* copy over the descriptor status to the batch ring as if no batch */
+		uint16_t d_idx = idxd->batch_idx_ring[idxd->batch_idx_read] & idxd->desc_ring_mask;
+		struct idxd_completion *desc_comp = (void *)&idxd->desc_ring[d_idx];
+		idxd->batch_comp_ring[idxd->batch_idx_read].status = desc_comp->status;
+	}
+
+	return ret;
+}
+
+static uint16_t
+batch_completed_status(struct idxd_dmadev *idxd, uint16_t max_ops, enum rte_dma_status_code *status)
+{
+	uint16_t next_batch;
+
+	int ret = batch_ok(idxd, max_ops, status);
+	if (ret >= 0)
+		return ret;
+
+	/* ERROR case, not successful, not incomplete */
+	/* Get the batch size, and special case size 1.
+	 */
+	next_batch = (idxd->batch_idx_read + 1);
+	if (next_batch > idxd->max_batches)
+		next_batch = 0;
+	const uint16_t b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
+	const uint16_t b_end = idxd->batch_idx_ring[next_batch];
+	const uint16_t b_len = b_end - b_start;
+	if (b_len == 1) {/* not a batch */
+		*status = get_comp_status(&idxd->batch_comp_ring[idxd->batch_idx_read]);
+		idxd->ids_avail++;
+		idxd->ids_returned++;
+		idxd->batch_idx_read = next_batch;
+		return 1;
+	}
+
+	/* not a single-element batch, need to process more.
+	 * Scenarios:
+	 * 1. max_ops >= batch_size - can fit everything, simple case
+	 *   - loop through completed ops and then add on any not-attempted ones
+	 * 2. max_ops < batch_size - can't fit everything, more complex case
+	 *   - loop through completed/incomplete and stop when hit max_ops
+	 *   - adjust the batch descriptor to update where we stopped, with appropriate bcount
+	 *   - if bcount is to be exactly 1, update the batch descriptor as it will be treated as
+	 *     non-batch next time.
+	 */
+	const uint16_t bcount = idxd->batch_comp_ring[idxd->batch_idx_read].completed_size;
+	for (ret = 0; ret < b_len && ret < max_ops; ret++) {
+		struct idxd_completion *c = (void *)
+				&idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
+		status[ret] = (ret < bcount) ? get_comp_status(c) : RTE_DMA_STATUS_NOT_ATTEMPTED;
+	}
+	idxd->ids_avail = idxd->ids_returned += ret;
+
+	/* everything fit */
+	if (ret == b_len) {
+		idxd->batch_idx_read = next_batch;
+		return ret;
+	}
+
+	/* set up for next time, update existing batch descriptor & start idx at batch_idx_read */
+	idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
+	if (ret > bcount) {
+		/* we have only incomplete ones - set batch completed size to 0 */
+		struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
+		comp->completed_size = 0;
+		/* if there is only one descriptor left, job skipped so set flag appropriately */
+		if (b_len - ret == 1)
+			comp->status = IDXD_COMP_STATUS_SKIPPED;
+	} else {
+		struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
+		comp->completed_size -= ret;
+		/* if there is only one descriptor left, copy status info straight to desc */
+		if (comp->completed_size == 1) {
+			struct idxd_completion *c = (void *)
+					&idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
+			comp->status = c->status;
+			/* individual descs can be ok without writeback, but not batches */
+			if (comp->status == IDXD_COMP_STATUS_INCOMPLETE)
+				comp->status = IDXD_COMP_STATUS_SUCCESS;
+		} else if (bcount == b_len) {
+			/* check if we still have an error, and clear flag if not */
+			uint16_t i;
+			for (i = b_start + ret; i < b_end; i++) {
+				struct idxd_completion *c = (void *)
+						&idxd->desc_ring[i & idxd->desc_ring_mask];
+				if (c->status > IDXD_COMP_STATUS_SUCCESS)
+					break;
+			}
+			if (i == b_end) /* no errors */
+				comp->status = IDXD_COMP_STATUS_SUCCESS;
+		}
+	}
+
+	return ret;
+}
+
+uint16_t
+idxd_completed(struct rte_dma_dev *dev, uint16_t qid __rte_unused, uint16_t max_ops,
+		uint16_t *last_idx, bool *has_error)
+{
+	struct idxd_dmadev *idxd = dev->dev_private;
+	uint16_t batch, ret = 0;
+
+	do {
+		batch = batch_completed(idxd, max_ops - ret, has_error);
+		ret += batch;
+	} while (batch > 0 && *has_error == false);
+
+	*last_idx = idxd->ids_returned - 1;
+	return ret;
+}
+
+uint16_t
+idxd_completed_status(struct rte_dma_dev *dev, uint16_t qid __rte_unused, uint16_t max_ops,
+		uint16_t *last_idx, enum rte_dma_status_code *status)
+{
+	struct idxd_dmadev *idxd = dev->dev_private;
+
+	uint16_t batch, ret = 0;
+
+	do {
+		batch = batch_completed_status(idxd, max_ops - ret, &status[ret]);
+		ret += batch;
+	} while (batch > 0);
+
+	*last_idx = idxd->ids_returned - 1;
+	return ret;
+}
+
 int
 idxd_dump(const struct rte_dma_dev *dev, FILE *f)
 {
diff --git a/drivers/dma/idxd/idxd_internal.h b/drivers/dma/idxd/idxd_internal.h
index b66c2d0182..15115a0966 100644
--- a/drivers/dma/idxd/idxd_internal.h
+++ b/drivers/dma/idxd/idxd_internal.h
@@ -93,5 +93,10 @@  int idxd_enqueue_copy(struct rte_dma_dev *dev, uint16_t qid, rte_iova_t src,
 int idxd_enqueue_fill(struct rte_dma_dev *dev, uint16_t qid, uint64_t pattern,
 		rte_iova_t dst, unsigned int length, uint64_t flags);
 int idxd_submit(struct rte_dma_dev *dev, uint16_t qid);
+uint16_t idxd_completed(struct rte_dma_dev *dev, uint16_t qid, uint16_t max_ops,
+		uint16_t *last_idx, bool *has_error);
+uint16_t idxd_completed_status(struct rte_dma_dev *dev, uint16_t qid __rte_unused,
+		uint16_t max_ops, uint16_t *last_idx,
+		enum rte_dma_status_code *status);
 
 #endif /* _IDXD_INTERNAL_H_ */