From patchwork Tue Jul 18 10:35:38 2017 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Rybalchenko, Kirill" X-Patchwork-Id: 27018 X-Patchwork-Delegate: pablo.de.lara.guarch@intel.com Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [IPv6:::1]) by dpdk.org (Postfix) with ESMTP id E6AE23253; Tue, 18 Jul 2017 12:36:04 +0200 (CEST) Received: from mga06.intel.com (mga06.intel.com [134.134.136.31]) by dpdk.org (Postfix) with ESMTP id 7DEE52FDD for ; Tue, 18 Jul 2017 12:36:01 +0200 (CEST) Received: from fmsmga006.fm.intel.com ([10.253.24.20]) by orsmga104.jf.intel.com with ESMTP; 18 Jul 2017 03:36:00 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.40,377,1496127600"; d="scan'208";a="128681270" Received: from silpixa00389036.ir.intel.com (HELO silpixa00389036.ger.corp.intel.com) ([10.237.223.231]) by fmsmga006.fm.intel.com with ESMTP; 18 Jul 2017 03:35:59 -0700 From: Kirill Rybalchenko To: roy.fan.zhang@intel.com, declan.doherty@intel.com Cc: dev@dpdk.org, Kirill Rybalchenko Date: Tue, 18 Jul 2017 11:35:38 +0100 Message-Id: <1500374138-6262-1-git-send-email-kirill.rybalchenko@intel.com> X-Mailer: git-send-email 2.5.5 Subject: [dpdk-dev] [PATCH v1] crypto/scheduler: fix multicore scheduler reordering X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Operations can be dequeued from the reordering ring only after they were dequeued from the crypto pmd with rte_cryptodev_dequeue_burst() function. It is not correct to dequeue them when status just changed from RTE_CRYPTO_OP_STATUS_NOT_PROCESSED to any other value, as the operations still can be processed by crypto pmd internally. Now multicore scheduler workers mark status of all dequeued from crypto pmd operations with CRYPTO_OP_STATUS_BIT_COMPLETE bit set. Scheduler will dequeue crypto operations from reordering ring only when this status bit is set. Prior to put this operation to output buffer, scheduler clears this bit, so the application gets unmodified status from crypto pmd. Fixes: 4c07e0552f0a ("crypto/scheduler: add multicore scheduling mode") Signed-off-by: Kirill Rybalchenko Acked-by: Declan Doherty --- drivers/crypto/scheduler/scheduler_multicore.c | 94 ++++++++++++++++++-------- 1 file changed, 67 insertions(+), 27 deletions(-) diff --git a/drivers/crypto/scheduler/scheduler_multicore.c b/drivers/crypto/scheduler/scheduler_multicore.c index bed9a8f..0cd5bce 100644 --- a/drivers/crypto/scheduler/scheduler_multicore.c +++ b/drivers/crypto/scheduler/scheduler_multicore.c @@ -42,6 +42,8 @@ #define MC_SCHED_BUFFER_SIZE 32 +#define CRYPTO_OP_STATUS_BIT_COMPLETE 0x80 + /** multi-core scheduler context */ struct mc_scheduler_ctx { uint32_t num_workers; /**< Number of workers polling */ @@ -136,10 +138,31 @@ static uint16_t schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) { - struct rte_ring *order_ring = - ((struct scheduler_qp_ctx *)qp)->order_ring; + struct rte_ring *order_ring = ((struct scheduler_qp_ctx *)qp)->order_ring; + struct rte_crypto_op *op; + uint32_t nb_objs = rte_ring_count(order_ring); + uint32_t nb_ops_to_deq = 0; + uint32_t nb_ops_deqd = 0; + + if (nb_objs > nb_ops) + nb_objs = nb_ops; + + while (nb_ops_to_deq < nb_objs) { + SCHEDULER_GET_RING_OBJ(order_ring, nb_ops_to_deq, op); + + if (!(op->status & CRYPTO_OP_STATUS_BIT_COMPLETE)) + break; + + op->status &= ~CRYPTO_OP_STATUS_BIT_COMPLETE; + nb_ops_to_deq++; + } + + if (nb_ops_to_deq) { + nb_ops_deqd = rte_ring_sc_dequeue_bulk(order_ring, + (void **)ops, nb_ops_to_deq, NULL); + } - return scheduler_order_drain(order_ring, ops, nb_ops); + return nb_ops_deqd; } static int @@ -169,9 +192,12 @@ mc_scheduler_worker(struct rte_cryptodev *dev) struct rte_crypto_op *enq_ops[MC_SCHED_BUFFER_SIZE]; struct rte_crypto_op *deq_ops[MC_SCHED_BUFFER_SIZE]; uint16_t processed_ops; - uint16_t left_op = 0; - uint16_t left_op_idx = 0; + uint16_t pending_enq_ops = 0; + uint16_t pending_enq_ops_idx = 0; + uint16_t pending_deq_ops = 0; + uint16_t pending_deq_ops_idx = 0; uint16_t inflight_ops = 0; + const uint8_t reordering_enabled = sched_ctx->reordering_enabled; for (i = 0; i < (int)sched_ctx->nb_wc; i++) { if (sched_ctx->wc_pool[i] == core_id) { @@ -189,37 +215,51 @@ mc_scheduler_worker(struct rte_cryptodev *dev) deq_ring = mc_ctx->sched_deq_ring[worker_idx]; while (!mc_ctx->stop_signal) { - if (left_op) { + if (pending_enq_ops) { processed_ops = rte_cryptodev_enqueue_burst(slave->dev_id, - slave->qp_id, - &enq_ops[left_op_idx], left_op); - - left_op -= processed_ops; - left_op_idx += processed_ops; + slave->qp_id, &enq_ops[pending_enq_ops_idx], + pending_enq_ops); + pending_enq_ops -= processed_ops; + pending_enq_ops_idx += processed_ops; + inflight_ops += processed_ops; } else { - uint16_t nb_deq_ops = rte_ring_dequeue_burst(enq_ring, - (void *)enq_ops, MC_SCHED_BUFFER_SIZE, NULL); - if (nb_deq_ops) { - processed_ops = rte_cryptodev_enqueue_burst(slave->dev_id, - slave->qp_id, enq_ops, nb_deq_ops); - - if (unlikely(processed_ops < nb_deq_ops)) { - left_op = nb_deq_ops - processed_ops; - left_op_idx = processed_ops; - } - - inflight_ops += processed_ops; + processed_ops = rte_ring_dequeue_burst(enq_ring, (void *)enq_ops, + MC_SCHED_BUFFER_SIZE, NULL); + if (processed_ops) { + pending_enq_ops_idx = rte_cryptodev_enqueue_burst( + slave->dev_id, slave->qp_id, + enq_ops, processed_ops); + pending_enq_ops = processed_ops - pending_enq_ops_idx; + inflight_ops += pending_enq_ops_idx; } } - if (inflight_ops > 0) { + if (pending_deq_ops) { + processed_ops = rte_ring_enqueue_burst( + deq_ring, (void *)&deq_ops[pending_deq_ops_idx], + pending_deq_ops, NULL); + pending_deq_ops -= processed_ops; + pending_deq_ops_idx += processed_ops; + } else if (inflight_ops) { processed_ops = rte_cryptodev_dequeue_burst(slave->dev_id, slave->qp_id, deq_ops, MC_SCHED_BUFFER_SIZE); if (processed_ops) { - uint16_t nb_enq_ops = rte_ring_enqueue_burst(deq_ring, - (void *)deq_ops, processed_ops, NULL); - inflight_ops -= nb_enq_ops; + inflight_ops -= processed_ops; + if (reordering_enabled) { + uint16_t j; + + for (j = 0; j < processed_ops; j++) { + deq_ops[j]->status |= + CRYPTO_OP_STATUS_BIT_COMPLETE; + } + } else { + pending_deq_ops_idx = rte_ring_enqueue_burst( + deq_ring, (void *)deq_ops, processed_ops, + NULL); + pending_deq_ops = processed_ops - + pending_deq_ops_idx; + } } }