@@ -252,45 +252,51 @@ mana_ring_doorbell(void *db_page, enum gdma_queue_types queue_type,
/*
* Poll completion queue for completions.
*/
-int
-gdma_poll_completion_queue(struct mana_gdma_queue *cq, struct gdma_comp *comp)
+uint32_t
+gdma_poll_completion_queue(struct mana_gdma_queue *cq,
+ struct gdma_comp *gdma_comp, uint32_t max_comp)
{
struct gdma_hardware_completion_entry *cqe;
- uint32_t head = cq->head % cq->count;
uint32_t new_owner_bits, old_owner_bits;
uint32_t cqe_owner_bits;
+ uint32_t num_comp = 0;
struct gdma_hardware_completion_entry *buffer = cq->buffer;
- cqe = &buffer[head];
- new_owner_bits = (cq->head / cq->count) & COMPLETION_QUEUE_OWNER_MASK;
- old_owner_bits = (cq->head / cq->count - 1) &
- COMPLETION_QUEUE_OWNER_MASK;
- cqe_owner_bits = cqe->owner_bits;
+ while (num_comp < max_comp) {
+ cqe = &buffer[cq->head % cq->count];
+ new_owner_bits = (cq->head / cq->count) &
+ COMPLETION_QUEUE_OWNER_MASK;
+ old_owner_bits = (cq->head / cq->count - 1) &
+ COMPLETION_QUEUE_OWNER_MASK;
+ cqe_owner_bits = cqe->owner_bits;
+
+ DRV_LOG(DEBUG, "comp cqe bits 0x%x owner bits 0x%x",
+ cqe_owner_bits, old_owner_bits);
+
+ /* No new entry */
+ if (cqe_owner_bits == old_owner_bits)
+ break;
+
+ if (cqe_owner_bits != new_owner_bits) {
+ DRV_LOG(ERR, "CQ overflowed, ID %u cqe 0x%x new 0x%x",
+ cq->id, cqe_owner_bits, new_owner_bits);
+ break;
+ }
- DRV_LOG(DEBUG, "comp cqe bits 0x%x owner bits 0x%x",
- cqe_owner_bits, old_owner_bits);
+ gdma_comp[num_comp].cqe_data = cqe->dma_client_data;
+ num_comp++;
- if (cqe_owner_bits == old_owner_bits)
- return 0; /* No new entry */
+ cq->head++;
- if (cqe_owner_bits != new_owner_bits) {
- DRV_LOG(ERR, "CQ overflowed, ID %u cqe 0x%x new 0x%x",
- cq->id, cqe_owner_bits, new_owner_bits);
- return -1;
+ DRV_LOG(DEBUG, "comp new 0x%x old 0x%x cqe 0x%x wq %u sq %u head %u",
+ new_owner_bits, old_owner_bits, cqe_owner_bits,
+ cqe->wq_num, cqe->is_sq, cq->head);
}
- /* Ensure checking owner bits happens before reading from CQE */
+ /* Make sure the CQE owner bits are checked before we access the
+ * data in the CQE.
+ */
rte_rmb();
- comp->work_queue_number = cqe->wq_num;
- comp->send_work_queue = cqe->is_sq;
-
- memcpy(comp->completion_data, cqe->dma_client_data, GDMA_COMP_DATA_SIZE);
-
- cq->head++;
-
- DRV_LOG(DEBUG, "comp new 0x%x old 0x%x cqe 0x%x wq %u sq %u head %u",
- new_owner_bits, old_owner_bits, cqe_owner_bits,
- comp->work_queue_number, comp->send_work_queue, cq->head);
- return 1;
+ return num_comp;
}
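
The batched poll above harvests up to max_comp CQEs per call and issues a single rte_rmb() for the whole batch, instead of one barrier per completion as the removed code did: the barrier orders the owner-bit checks against any later reads through cqe_data. A minimal caller sketch, assuming a hypothetical helper name (example_drain_cq is not part of this patch):

static void
example_drain_cq(struct mana_gdma_queue *cq, struct gdma_comp *buf,
		 uint32_t buf_len)
{
	/* Harvest up to buf_len completions in one pass. */
	uint32_t n = gdma_poll_completion_queue(cq, buf, buf_len);

	for (uint32_t i = 0; i < n; i++) {
		/* Safe to dereference here: the rte_rmb() inside the
		 * poll ran after every owner-bit check. The pointer
		 * targets the live CQ ring, so consume it before the
		 * hardware wraps and reuses the entry.
		 */
		const char *data = buf[i].cqe_data;

		(void)data; /* e.g. cast to a struct mana_*_comp_oob */
	}
}
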
@@ -487,6 +487,15 @@ mana_dev_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_idx,
goto fail;
}
+ txq->gdma_comp_buf = rte_malloc_socket("mana_txq_comp",
+ sizeof(*txq->gdma_comp_buf) * nb_desc,
+ RTE_CACHE_LINE_SIZE, socket_id);
+ if (!txq->gdma_comp_buf) {
+ DRV_LOG(ERR, "failed to allocate txq comp");
+ ret = -ENOMEM;
+ goto fail;
+ }
+
ret = mana_mr_btree_init(&txq->mr_btree,
MANA_MR_BTREE_PER_QUEUE_N, socket_id);
if (ret) {
@@ -506,6 +515,7 @@ mana_dev_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_idx,
return 0;
fail:
+ rte_free(txq->gdma_comp_buf);
rte_free(txq->desc_ring);
rte_free(txq);
return ret;
@@ -518,6 +528,7 @@ mana_dev_tx_queue_release(struct rte_eth_dev *dev, uint16_t qid)
mana_mr_btree_free(&txq->mr_btree);
+ rte_free(txq->gdma_comp_buf);
rte_free(txq->desc_ring);
rte_free(txq);
}
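
Both setup hunks size the completion buffer at one struct gdma_comp per descriptor, the worst case since a queue never has more completions outstanding than posted WQEs, and the shared fail: label frees it unconditionally. That is safe because rte_free() is documented as a no-op on a NULL pointer, as the sketch below spells out; the Rx hunks that follow rely on the same idiom.

fail:
	/* Free in reverse order of allocation. gdma_comp_buf is still
	 * NULL if its rte_malloc_socket() never ran or failed, and
	 * rte_free(NULL) does nothing, so no step needs to track how
	 * far setup got.
	 */
	rte_free(txq->gdma_comp_buf);
	rte_free(txq->desc_ring);
	rte_free(txq);
	return ret;
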
@@ -557,6 +568,15 @@ mana_dev_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_idx,
rxq->desc_ring_head = 0;
rxq->desc_ring_tail = 0;
+ rxq->gdma_comp_buf = rte_malloc_socket("mana_rxq_comp",
+ sizeof(*rxq->gdma_comp_buf) * nb_desc,
+ RTE_CACHE_LINE_SIZE, socket_id);
+ if (!rxq->gdma_comp_buf) {
+ DRV_LOG(ERR, "failed to allocate rxq comp");
+ ret = -ENOMEM;
+ goto fail;
+ }
+
ret = mana_mr_btree_init(&rxq->mr_btree,
MANA_MR_BTREE_PER_QUEUE_N, socket_id);
if (ret) {
@@ -572,6 +592,7 @@ mana_dev_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_idx,
return 0;
fail:
+ rte_free(rxq->gdma_comp_buf);
rte_free(rxq->desc_ring);
rte_free(rxq);
return ret;
@@ -584,6 +605,7 @@ mana_dev_rx_queue_release(struct rte_eth_dev *dev, uint16_t qid)
mana_mr_btree_free(&rxq->mr_btree);
+ rte_free(rxq->gdma_comp_buf);
rte_free(rxq->desc_ring);
rte_free(rxq);
}
@@ -142,19 +142,6 @@ struct gdma_header {
#define COMPLETION_QUEUE_OWNER_MASK \
((1 << (COMPLETION_QUEUE_ENTRY_OWNER_BITS_SIZE)) - 1)
-struct gdma_comp {
- struct gdma_header gdma_header;
-
- /* Filled by GDMA core */
- uint32_t completion_data[GDMA_COMP_DATA_SIZE_IN_UINT32];
-
- /* Filled by GDMA core */
- uint32_t work_queue_number;
-
- /* Filled by GDMA core */
- bool send_work_queue;
-};
-
struct gdma_hardware_completion_entry {
char dma_client_data[GDMA_COMP_DATA_SIZE];
union {
@@ -391,6 +378,11 @@ struct mana_gdma_queue {
#define MANA_MR_BTREE_PER_QUEUE_N 64
+struct gdma_comp {
+ /* Filled by GDMA core */
+ char *cqe_data;
+};
+
struct mana_txq {
struct mana_priv *priv;
uint32_t num_desc;
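
Replacing the old gdma_comp, which carried a gdma_header, a GDMA_COMP_DATA_SIZE-byte copy of the completion data, and the wq_num/is_sq fields, with a single pointer makes the per-queue gdma_comp_buf arrays cheap and removes one memcpy per completion. A sketch of the old flow versus the new one (the memcpy line is reproduced from the removed code; comp stands in for the buffer entry being filled):

/* Old flow (removed): copy the client data out of the CQE.
 *	memcpy(comp->completion_data, cqe->dma_client_data,
 *	       GDMA_COMP_DATA_SIZE);
 * New flow: publish the pointer and defer the read.
 */
comp->cqe_data = cqe->dma_client_data;

The trade-off is lifetime: cqe_data stays valid only until the hardware wraps the CQ and rewrites that entry, which is why the burst functions below consume the entire batch before returning.
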
@@ -399,6 +391,7 @@ struct mana_txq {
struct mana_gdma_queue gdma_sq;
struct mana_gdma_queue gdma_cq;
+ struct gdma_comp *gdma_comp_buf;
uint32_t tx_vp_offset;
@@ -433,6 +426,7 @@ struct mana_rxq {
struct mana_gdma_queue gdma_rq;
struct mana_gdma_queue gdma_cq;
+ struct gdma_comp *gdma_comp_buf;
struct mana_stats stats;
struct mana_mr_btree mr_btree;
@@ -473,8 +467,9 @@ uint16_t mana_rx_burst_removed(void *dpdk_rxq, struct rte_mbuf **pkts,
uint16_t mana_tx_burst_removed(void *dpdk_rxq, struct rte_mbuf **pkts,
uint16_t pkts_n);
-int gdma_poll_completion_queue(struct mana_gdma_queue *cq,
- struct gdma_comp *comp);
+uint32_t gdma_poll_completion_queue(struct mana_gdma_queue *cq,
+ struct gdma_comp *gdma_comp,
+ uint32_t max_comp);
int mana_start_rx_queues(struct rte_eth_dev *dev);
int mana_start_tx_queues(struct rte_eth_dev *dev);
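
The prototype change also changes the contract: the old function returned a tri-state int (1 = one completion harvested, 0 = none pending, -1 = CQ overflow), while the new one returns a count and folds the overflow case into an early break plus an ERR log. Both call shapes sketched below; consume(), buf, and max are hypothetical placeholders:

/* Old contract (removed): one CQE per call, check the tri-state.
 *	while (gdma_poll_completion_queue(cq, &comp) == 1)
 *		consume(&comp);
 * New contract: one call per burst; a short (or zero) count covers
 * both "nothing pending" and "overflow already logged".
 */
uint32_t n = gdma_poll_completion_queue(cq, buf, max);
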
@@ -383,24 +383,17 @@ mana_rx_burst(void *dpdk_rxq, struct rte_mbuf **pkts, uint16_t pkts_n)
uint8_t wqe_posted = 0;
struct mana_rxq *rxq = dpdk_rxq;
struct mana_priv *priv = rxq->priv;
- struct gdma_comp comp;
struct rte_mbuf *mbuf;
int ret;
+ uint32_t num_pkts;
- while (pkt_received < pkts_n &&
- gdma_poll_completion_queue(&rxq->gdma_cq, &comp) == 1) {
- struct mana_rxq_desc *desc;
- struct mana_rx_comp_oob *oob =
- (struct mana_rx_comp_oob *)&comp.completion_data[0];
-
- if (comp.work_queue_number != rxq->gdma_rq.id) {
- DRV_LOG(ERR, "rxq comp id mismatch wqid=0x%x rcid=0x%x",
- comp.work_queue_number, rxq->gdma_rq.id);
- rxq->stats.errors++;
- break;
- }
+ num_pkts = gdma_poll_completion_queue(&rxq->gdma_cq,
+ rxq->gdma_comp_buf, pkts_n);
+ for (uint32_t i = 0; i < num_pkts; i++) {
+ struct mana_rx_comp_oob *oob = (struct mana_rx_comp_oob *)
+ rxq->gdma_comp_buf[i].cqe_data;
+ struct mana_rxq_desc *desc =
+ &rxq->desc_ring[rxq->desc_ring_tail];
- desc = &rxq->desc_ring[rxq->desc_ring_tail];
rxq->gdma_rq.tail += desc->wqe_size_in_bu;
mbuf = desc->pkt;
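
Note the asymmetric caps: the Rx path polls at most pkts_n completions because each one produces a packet that must fit in the caller's pkts[] array, while the Tx path below passes txq->num_desc, since every Tx completion simply frees a descriptor. Completions left unharvested keep their owner bits and surface on the next burst. A sketch of the Rx invariant, with an assertion added purely for illustration (not in the patch):

num_pkts = gdma_poll_completion_queue(&rxq->gdma_cq,
				      rxq->gdma_comp_buf, pkts_n);
/* The poll can return fewer, never more: every harvested CQE has a
 * guaranteed slot in pkts[].
 */
RTE_ASSERT(num_pkts <= pkts_n);
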
@@ -170,17 +170,20 @@ mana_tx_burst(void *dpdk_txq, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
{
struct mana_txq *txq = dpdk_txq;
struct mana_priv *priv = txq->priv;
- struct gdma_comp comp;
int ret;
void *db_page;
uint16_t pkt_sent = 0;
+ uint32_t num_comp;
/* Process send completions from GDMA */
- while (gdma_poll_completion_queue(&txq->gdma_cq, &comp) == 1) {
+ num_comp = gdma_poll_completion_queue(&txq->gdma_cq,
+ txq->gdma_comp_buf, txq->num_desc);
+
+ for (uint32_t i = 0; i < num_comp; i++) {
struct mana_txq_desc *desc =
&txq->desc_ring[txq->desc_ring_tail];
- struct mana_tx_comp_oob *oob =
- (struct mana_tx_comp_oob *)&comp.completion_data[0];
+ struct mana_tx_comp_oob *oob = (struct mana_tx_comp_oob *)
+ txq->gdma_comp_buf[i].cqe_data;
if (oob->cqe_hdr.cqe_type != CQE_TX_OKAY) {
DRV_LOG(ERR,