[v2,2/3] net/mlx5: fix sync meter processing in HWS setup

Message ID 20240307101910.1135720-3-getelson@nvidia.com (mailing list archive)
State Accepted, archived
Delegated to: Raslan Darawsheh
Headers
Series net/mlx5: fix sync meter processing in HWS setup |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Gregory Etelson March 7, 2024, 10:19 a.m. UTC
  Synchronous calls for meter ASO try to pull pending completions
from CQ, submit WR and return to caller. That avoids delays between
WR post and  HW response.
If the template API was activated, PMD will use control queue for
sync operations.

PMD has different formats for the `user_data` context in sync and
async meter ASO calls.
PMD port destruction procedure submits async operations to the port
control queue and polls the queue CQs to clean HW responses.

Port destruction can pull a meter ASO completion from control CQ.
Such completion has sync format, but was processed by async handler.

The patch implements sync meter ASO interface with async calls
in the template API environment.

Fixes: 48fbb0e93d06 ("net/mlx5: support flow meter mark indirect action with HWS")

Cc: stable@dpdk.org

Signed-off-by: Gregory Etelson <getelson@nvidia.com>
Acked-by: Dariusz Sosnowski <dsosnowski@nvidia.com>
---
 drivers/net/mlx5/mlx5.h            |  35 +++++-
 drivers/net/mlx5/mlx5_flow_aso.c   | 178 ++++++++++++++++++-----------
 drivers/net/mlx5/mlx5_flow_hw.c    |  99 ++++++++--------
 drivers/net/mlx5/mlx5_flow_meter.c |  27 +++--
 4 files changed, 216 insertions(+), 123 deletions(-)
  

Comments

Raslan Darawsheh March 12, 2024, 8:03 a.m. UTC | #1
Hi,

> -----Original Message-----
> From: Gregory Etelson <getelson@nvidia.com>
> Sent: Thursday, March 7, 2024 12:19 PM
> To: dev@dpdk.org
> Cc: Gregory Etelson <getelson@nvidia.com>; Maayan Kashani
> <mkashani@nvidia.com>; Raslan Darawsheh <rasland@nvidia.com>;
> stable@dpdk.org; Dariusz Sosnowski <dsosnowski@nvidia.com>; Slava
> Ovsiienko <viacheslavo@nvidia.com>; Ori Kam <orika@nvidia.com>;
> Suanming Mou <suanmingm@nvidia.com>; Matan Azrad
> <matan@nvidia.com>; Alexander Kozyrev <akozyrev@nvidia.com>
> Subject: [PATCH v2 2/3] net/mlx5: fix sync meter processing in HWS setup
> 
> Synchronous calls for meter ASO try to pull pending completions from CQ,
> submit WR and return to caller. That avoids delays between WR post and  HW
> response.
> If the template API was activated, PMD will use control queue for sync
> operations.
> 
> PMD has different formats for the `user_data` context in sync and async meter
> ASO calls.
> PMD port destruction procedure submits async operations to the port control
> queue and polls the queue CQs to clean HW responses.
> 
> Port destruction can pull a meter ASO completion from control CQ.
> Such completion has sync format, but was processed by async handler.
> 
> The patch implements sync meter ASO interface with async calls in the
> template API environment.
> 
> Fixes: 48fbb0e93d06 ("net/mlx5: support flow meter mark indirect action
> with HWS")
> 
> Cc: stable@dpdk.org
> 
> Signed-off-by: Gregory Etelson <getelson@nvidia.com>
> Acked-by: Dariusz Sosnowski <dsosnowski@nvidia.com>
> ---
>  drivers/net/mlx5/mlx5.h            |  35 +++++-
>  drivers/net/mlx5/mlx5_flow_aso.c   | 178 ++++++++++++++++++-----------
>  drivers/net/mlx5/mlx5_flow_hw.c    |  99 ++++++++--------
>  drivers/net/mlx5/mlx5_flow_meter.c |  27 +++--
>  4 files changed, 216 insertions(+), 123 deletions(-)
Series applied to next-net-mlx,

Kindest regards
Raslan Darawsheh
  

Patch

diff --git a/drivers/net/mlx5/mlx5.h b/drivers/net/mlx5/mlx5.h
index 2fb3bb65cc..6ff8f322e0 100644
--- a/drivers/net/mlx5/mlx5.h
+++ b/drivers/net/mlx5/mlx5.h
@@ -2033,6 +2033,30 @@  enum dr_dump_rec_type {
 	DR_DUMP_REC_TYPE_PMD_COUNTER = 4430,
 };
 
+#if defined(HAVE_MLX5_HWS_SUPPORT)
+static __rte_always_inline struct mlx5_hw_q_job *
+flow_hw_job_get(struct mlx5_priv *priv, uint32_t queue)
+{
+	MLX5_ASSERT(priv->hw_q[queue].job_idx <= priv->hw_q[queue].size);
+	return priv->hw_q[queue].job_idx ?
+	       priv->hw_q[queue].job[--priv->hw_q[queue].job_idx] : NULL;
+}
+
+static __rte_always_inline void
+flow_hw_job_put(struct mlx5_priv *priv, struct mlx5_hw_q_job *job, uint32_t queue)
+{
+	MLX5_ASSERT(priv->hw_q[queue].job_idx < priv->hw_q[queue].size);
+	priv->hw_q[queue].job[priv->hw_q[queue].job_idx++] = job;
+}
+
+struct mlx5_hw_q_job *
+mlx5_flow_action_job_init(struct mlx5_priv *priv, uint32_t queue,
+			  const struct rte_flow_action_handle *handle,
+			  void *user_data, void *query_data,
+			  enum mlx5_hw_job_type type,
+			  struct rte_flow_error *error);
+#endif
+
 /**
  * Indicates whether HW objects operations can be created by DevX.
  *
@@ -2443,11 +2467,12 @@  int mlx5_aso_flow_hit_queue_poll_start(struct mlx5_dev_ctx_shared *sh);
 int mlx5_aso_flow_hit_queue_poll_stop(struct mlx5_dev_ctx_shared *sh);
 void mlx5_aso_queue_uninit(struct mlx5_dev_ctx_shared *sh,
 			   enum mlx5_access_aso_opc_mod aso_opc_mod);
-int mlx5_aso_meter_update_by_wqe(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
-		struct mlx5_aso_mtr *mtr, struct mlx5_mtr_bulk *bulk,
-		void *user_data, bool push);
-int mlx5_aso_mtr_wait(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
-		struct mlx5_aso_mtr *mtr);
+int mlx5_aso_meter_update_by_wqe(struct mlx5_priv *priv, uint32_t queue,
+				 struct mlx5_aso_mtr *mtr,
+				 struct mlx5_mtr_bulk *bulk,
+				 struct mlx5_hw_q_job *job, bool push);
+int mlx5_aso_mtr_wait(struct mlx5_priv *priv,
+		      struct mlx5_aso_mtr *mtr, bool is_tmpl_api);
 int mlx5_aso_ct_update_by_wqe(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
 			      struct mlx5_aso_ct_action *ct,
 			      const struct rte_flow_action_conntrack *profile,
diff --git a/drivers/net/mlx5/mlx5_flow_aso.c b/drivers/net/mlx5/mlx5_flow_aso.c
index f311443472..ab9eb21e01 100644
--- a/drivers/net/mlx5/mlx5_flow_aso.c
+++ b/drivers/net/mlx5/mlx5_flow_aso.c
@@ -792,7 +792,7 @@  mlx5_aso_mtr_sq_enqueue_single(struct mlx5_dev_ctx_shared *sh,
 			       struct mlx5_aso_mtr *aso_mtr,
 			       struct mlx5_mtr_bulk *bulk,
 			       bool need_lock,
-			       void *user_data,
+			       struct mlx5_hw_q_job *job,
 			       bool push)
 {
 	volatile struct mlx5_aso_wqe *wqe = NULL;
@@ -819,7 +819,7 @@  mlx5_aso_mtr_sq_enqueue_single(struct mlx5_dev_ctx_shared *sh,
 	rte_prefetch0(&sq->sq_obj.aso_wqes[(sq->head + 1) & mask]);
 	/* Fill next WQE. */
 	fm = &aso_mtr->fm;
-	sq->elts[sq->head & mask].mtr = user_data ? user_data : aso_mtr;
+	sq->elts[sq->head & mask].user_data = job ? job : (void *)aso_mtr;
 	if (aso_mtr->type == ASO_METER_INDIRECT) {
 		if (likely(sh->config.dv_flow_en == 2))
 			pool = aso_mtr->pool;
@@ -897,24 +897,6 @@  mlx5_aso_mtr_sq_enqueue_single(struct mlx5_dev_ctx_shared *sh,
 	return 1;
 }
 
-static void
-mlx5_aso_mtrs_status_update(struct mlx5_aso_sq *sq, uint16_t aso_mtrs_nums)
-{
-	uint16_t size = 1 << sq->log_desc_n;
-	uint16_t mask = size - 1;
-	uint16_t i;
-	struct mlx5_aso_mtr *aso_mtr = NULL;
-	uint8_t exp_state = ASO_METER_WAIT;
-
-	for (i = 0; i < aso_mtrs_nums; ++i) {
-		aso_mtr = sq->elts[(sq->tail + i) & mask].mtr;
-		MLX5_ASSERT(aso_mtr);
-		(void)__atomic_compare_exchange_n(&aso_mtr->state,
-				&exp_state, ASO_METER_READY,
-				false, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
-	}
-}
-
 static void
 mlx5_aso_mtr_completion_handle(struct mlx5_aso_sq *sq, bool need_lock)
 {
@@ -925,7 +907,7 @@  mlx5_aso_mtr_completion_handle(struct mlx5_aso_sq *sq, bool need_lock)
 	uint32_t idx;
 	uint32_t next_idx = cq->cq_ci & mask;
 	uint16_t max;
-	uint16_t n = 0;
+	uint16_t i, n = 0;
 	int ret;
 
 	if (need_lock)
@@ -957,7 +939,19 @@  mlx5_aso_mtr_completion_handle(struct mlx5_aso_sq *sq, bool need_lock)
 		cq->cq_ci++;
 	} while (1);
 	if (likely(n)) {
-		mlx5_aso_mtrs_status_update(sq, n);
+		uint8_t exp_state = ASO_METER_WAIT;
+		struct mlx5_aso_mtr *aso_mtr;
+		__rte_unused bool verdict;
+
+		for (i = 0; i < n; ++i) {
+			aso_mtr = sq->elts[(sq->tail + i) & mask].mtr;
+			MLX5_ASSERT(aso_mtr);
+			verdict = __atomic_compare_exchange_n(&aso_mtr->state,
+						    &exp_state, ASO_METER_READY,
+						    false, __ATOMIC_RELAXED,
+						    __ATOMIC_RELAXED);
+			MLX5_ASSERT(verdict);
+		}
 		sq->tail += n;
 		rte_io_wmb();
 		cq->cq_obj.db_rec[0] = rte_cpu_to_be_32(cq->cq_ci);
@@ -966,6 +960,82 @@  mlx5_aso_mtr_completion_handle(struct mlx5_aso_sq *sq, bool need_lock)
 		rte_spinlock_unlock(&sq->sqsl);
 }
 
+static __rte_always_inline struct mlx5_aso_sq *
+mlx5_aso_mtr_select_sq(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
+		       struct mlx5_aso_mtr *mtr, bool *need_lock)
+{
+	struct mlx5_aso_sq *sq;
+
+	if (likely(sh->config.dv_flow_en == 2) &&
+	    mtr->type == ASO_METER_INDIRECT) {
+		if (queue == MLX5_HW_INV_QUEUE) {
+			sq = &mtr->pool->sq[mtr->pool->nb_sq - 1];
+			*need_lock = true;
+		} else {
+			sq = &mtr->pool->sq[queue];
+			*need_lock = false;
+		}
+	} else {
+		sq = &sh->mtrmng->pools_mng.sq;
+		*need_lock = true;
+	}
+	return sq;
+}
+
+#if defined(HAVE_MLX5_HWS_SUPPORT)
+static void
+mlx5_aso_poll_cq_mtr_hws(struct mlx5_priv *priv, struct mlx5_aso_sq *sq)
+{
+#define MLX5_HWS_MTR_CMPL_NUM 4
+
+	int i, ret;
+	struct mlx5_aso_mtr *mtr;
+	uint8_t exp_state = ASO_METER_WAIT;
+	struct rte_flow_op_result res[MLX5_HWS_MTR_CMPL_NUM];
+	__rte_unused bool verdict;
+
+	rte_spinlock_lock(&sq->sqsl);
+repeat:
+	ret = mlx5_aso_pull_completion(sq, res, MLX5_HWS_MTR_CMPL_NUM);
+	if (ret) {
+		for (i = 0; i < ret; i++) {
+			struct mlx5_hw_q_job *job = res[i].user_data;
+
+			MLX5_ASSERT(job);
+			mtr = mlx5_ipool_get(priv->hws_mpool->idx_pool,
+					     MLX5_INDIRECT_ACTION_IDX_GET(job->action));
+			MLX5_ASSERT(mtr);
+			verdict = __atomic_compare_exchange_n(&mtr->state,
+						    &exp_state, ASO_METER_READY,
+						    false, __ATOMIC_RELAXED,
+						    __ATOMIC_RELAXED);
+			MLX5_ASSERT(verdict);
+			flow_hw_job_put(priv, job, CTRL_QUEUE_ID(priv));
+		}
+		if (ret == MLX5_HWS_MTR_CMPL_NUM)
+			goto repeat;
+	}
+	rte_spinlock_unlock(&sq->sqsl);
+
+#undef MLX5_HWS_MTR_CMPL_NUM
+}
+#else
+static void
+mlx5_aso_poll_cq_mtr_hws(__rte_unused struct mlx5_priv *priv, __rte_unused struct mlx5_aso_sq *sq)
+{
+	MLX5_ASSERT(false);
+}
+#endif
+
+static void
+mlx5_aso_poll_cq_mtr_sws(__rte_unused struct mlx5_priv *priv,
+			 struct mlx5_aso_sq *sq)
+{
+	mlx5_aso_mtr_completion_handle(sq, true);
+}
+
+typedef void (*poll_cq_t)(struct mlx5_priv *, struct mlx5_aso_sq *);
+
 /**
  * Update meter parameter by send WQE.
  *
@@ -980,39 +1050,29 @@  mlx5_aso_mtr_completion_handle(struct mlx5_aso_sq *sq, bool need_lock)
  *   0 on success, a negative errno value otherwise and rte_errno is set.
  */
 int
-mlx5_aso_meter_update_by_wqe(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
-			struct mlx5_aso_mtr *mtr,
-			struct mlx5_mtr_bulk *bulk,
-			void *user_data,
-			bool push)
+mlx5_aso_meter_update_by_wqe(struct mlx5_priv *priv, uint32_t queue,
+			     struct mlx5_aso_mtr *mtr,
+			     struct mlx5_mtr_bulk *bulk,
+			     struct mlx5_hw_q_job *job, bool push)
 {
-	struct mlx5_aso_sq *sq;
-	uint32_t poll_wqe_times = MLX5_MTR_POLL_WQE_CQE_TIMES;
 	bool need_lock;
+	struct mlx5_dev_ctx_shared *sh = priv->sh;
+	struct mlx5_aso_sq *sq =
+		mlx5_aso_mtr_select_sq(sh, queue, mtr, &need_lock);
+	uint32_t poll_wqe_times = MLX5_MTR_POLL_WQE_CQE_TIMES;
+	poll_cq_t poll_mtr_cq =
+		job ? mlx5_aso_poll_cq_mtr_hws : mlx5_aso_poll_cq_mtr_sws;
 	int ret;
 
-	if (likely(sh->config.dv_flow_en == 2) &&
-	    mtr->type == ASO_METER_INDIRECT) {
-		if (queue == MLX5_HW_INV_QUEUE) {
-			sq = &mtr->pool->sq[mtr->pool->nb_sq - 1];
-			need_lock = true;
-		} else {
-			sq = &mtr->pool->sq[queue];
-			need_lock = false;
-		}
-	} else {
-		sq = &sh->mtrmng->pools_mng.sq;
-		need_lock = true;
-	}
 	if (queue != MLX5_HW_INV_QUEUE) {
 		ret = mlx5_aso_mtr_sq_enqueue_single(sh, sq, mtr, bulk,
-						     need_lock, user_data, push);
+						     need_lock, job, push);
 		return ret > 0 ? 0 : -1;
 	}
 	do {
-		mlx5_aso_mtr_completion_handle(sq, need_lock);
+		poll_mtr_cq(priv, sq);
 		if (mlx5_aso_mtr_sq_enqueue_single(sh, sq, mtr, bulk,
-						   need_lock, NULL, true))
+						   need_lock, job, true))
 			return 0;
 		/* Waiting for wqe resource. */
 		rte_delay_us_sleep(MLX5_ASO_WQE_CQE_RESPONSE_DELAY);
@@ -1036,32 +1096,22 @@  mlx5_aso_meter_update_by_wqe(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
  *   0 on success, a negative errno value otherwise and rte_errno is set.
  */
 int
-mlx5_aso_mtr_wait(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
-			struct mlx5_aso_mtr *mtr)
+mlx5_aso_mtr_wait(struct mlx5_priv *priv,
+		  struct mlx5_aso_mtr *mtr, bool is_tmpl_api)
 {
+	bool need_lock;
 	struct mlx5_aso_sq *sq;
+	struct mlx5_dev_ctx_shared *sh = priv->sh;
 	uint32_t poll_cqe_times = MLX5_MTR_POLL_WQE_CQE_TIMES;
-	uint8_t state;
-	bool need_lock;
+	uint8_t state = __atomic_load_n(&mtr->state, __ATOMIC_RELAXED);
+	poll_cq_t poll_mtr_cq =
+		is_tmpl_api ? mlx5_aso_poll_cq_mtr_hws : mlx5_aso_poll_cq_mtr_sws;
 
-	if (likely(sh->config.dv_flow_en == 2) &&
-	    mtr->type == ASO_METER_INDIRECT) {
-		if (queue == MLX5_HW_INV_QUEUE) {
-			sq = &mtr->pool->sq[mtr->pool->nb_sq - 1];
-			need_lock = true;
-		} else {
-			sq = &mtr->pool->sq[queue];
-			need_lock = false;
-		}
-	} else {
-		sq = &sh->mtrmng->pools_mng.sq;
-		need_lock = true;
-	}
-	state = __atomic_load_n(&mtr->state, __ATOMIC_RELAXED);
 	if (state == ASO_METER_READY || state == ASO_METER_WAIT_ASYNC)
 		return 0;
+	sq = mlx5_aso_mtr_select_sq(sh, MLX5_HW_INV_QUEUE, mtr, &need_lock);
 	do {
-		mlx5_aso_mtr_completion_handle(sq, need_lock);
+		poll_mtr_cq(priv, sq);
 		if (__atomic_load_n(&mtr->state, __ATOMIC_RELAXED) ==
 					    ASO_METER_READY)
 			return 0;
diff --git a/drivers/net/mlx5/mlx5_flow_hw.c b/drivers/net/mlx5/mlx5_flow_hw.c
index c1b09c9c03..8f004b5435 100644
--- a/drivers/net/mlx5/mlx5_flow_hw.c
+++ b/drivers/net/mlx5/mlx5_flow_hw.c
@@ -183,6 +183,12 @@  mlx5_flow_hw_aux_get_mtr_id(struct rte_flow_hw *flow, struct rte_flow_hw_aux *au
 		return aux->orig.mtr_id;
 }
 
+static __rte_always_inline struct mlx5_hw_q_job *
+flow_hw_action_job_init(struct mlx5_priv *priv, uint32_t queue,
+			const struct rte_flow_action_handle *handle,
+			void *user_data, void *query_data,
+			enum mlx5_hw_job_type type,
+			struct rte_flow_error *error);
 static int
 mlx5_tbl_multi_pattern_process(struct rte_eth_dev *dev,
 			       struct rte_flow_template_table *tbl,
@@ -384,21 +390,6 @@  flow_hw_q_dec_flow_ops(struct mlx5_priv *priv, uint32_t queue)
 	q->ongoing_flow_ops--;
 }
 
-static __rte_always_inline struct mlx5_hw_q_job *
-flow_hw_job_get(struct mlx5_priv *priv, uint32_t queue)
-{
-	MLX5_ASSERT(priv->hw_q[queue].job_idx <= priv->hw_q[queue].size);
-	return priv->hw_q[queue].job_idx ?
-	       priv->hw_q[queue].job[--priv->hw_q[queue].job_idx] : NULL;
-}
-
-static __rte_always_inline void
-flow_hw_job_put(struct mlx5_priv *priv, struct mlx5_hw_q_job *job, uint32_t queue)
-{
-	MLX5_ASSERT(priv->hw_q[queue].job_idx < priv->hw_q[queue].size);
-	priv->hw_q[queue].job[priv->hw_q[queue].job_idx++] = job;
-}
-
 static inline enum mlx5dr_matcher_insert_mode
 flow_hw_matcher_insert_mode_get(enum rte_flow_table_insertion_type insert_type)
 {
@@ -1560,7 +1551,7 @@  flow_hw_meter_compile(struct rte_eth_dev *dev,
 	acts->rule_acts[jump_pos].action = (!!group) ?
 				    acts->jump->hws_action :
 				    acts->jump->root_action;
-	if (mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr))
+	if (mlx5_aso_mtr_wait(priv, aso_mtr, true))
 		return -ENOMEM;
 	return 0;
 }
@@ -1637,7 +1628,7 @@  static rte_be32_t vlan_hdr_to_be32(const struct rte_flow_action *actions)
 static __rte_always_inline struct mlx5_aso_mtr *
 flow_hw_meter_mark_alloc(struct rte_eth_dev *dev, uint32_t queue,
 			 const struct rte_flow_action *action,
-			 void *user_data, bool push,
+			 struct mlx5_hw_q_job *job, bool push,
 			 struct rte_flow_error *error)
 {
 	struct mlx5_priv *priv = dev->data->dev_private;
@@ -1646,6 +1637,8 @@  flow_hw_meter_mark_alloc(struct rte_eth_dev *dev, uint32_t queue,
 	struct mlx5_aso_mtr *aso_mtr;
 	struct mlx5_flow_meter_info *fm;
 	uint32_t mtr_id;
+	uintptr_t handle = (uintptr_t)MLX5_INDIRECT_ACTION_TYPE_METER_MARK <<
+					MLX5_INDIRECT_ACTION_TYPE_OFFSET;
 
 	if (priv->shared_host) {
 		rte_flow_error_set(error, ENOTSUP, RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
@@ -1669,15 +1662,16 @@  flow_hw_meter_mark_alloc(struct rte_eth_dev *dev, uint32_t queue,
 			  ASO_METER_WAIT : ASO_METER_WAIT_ASYNC;
 	aso_mtr->offset = mtr_id - 1;
 	aso_mtr->init_color = fm->color_aware ? RTE_COLORS : RTE_COLOR_GREEN;
+	job->action = (void *)(handle | mtr_id);
 	/* Update ASO flow meter by wqe. */
-	if (mlx5_aso_meter_update_by_wqe(priv->sh, queue, aso_mtr,
-					 &priv->mtr_bulk, user_data, push)) {
+	if (mlx5_aso_meter_update_by_wqe(priv, queue, aso_mtr,
+					 &priv->mtr_bulk, job, push)) {
 		mlx5_ipool_free(pool->idx_pool, mtr_id);
 		return NULL;
 	}
 	/* Wait for ASO object completion. */
 	if (queue == MLX5_HW_INV_QUEUE &&
-	    mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr)) {
+	    mlx5_aso_mtr_wait(priv, aso_mtr, true)) {
 		mlx5_ipool_free(pool->idx_pool, mtr_id);
 		return NULL;
 	}
@@ -1696,10 +1690,18 @@  flow_hw_meter_mark_compile(struct rte_eth_dev *dev,
 	struct mlx5_priv *priv = dev->data->dev_private;
 	struct mlx5_aso_mtr_pool *pool = priv->hws_mpool;
 	struct mlx5_aso_mtr *aso_mtr;
+	struct mlx5_hw_q_job *job =
+		flow_hw_action_job_init(priv, queue, NULL, NULL, NULL,
+					MLX5_HW_Q_JOB_TYPE_CREATE, NULL);
 
-	aso_mtr = flow_hw_meter_mark_alloc(dev, queue, action, NULL, true, error);
-	if (!aso_mtr)
+	if (!job)
+		return -1;
+	aso_mtr = flow_hw_meter_mark_alloc(dev, queue, action, job,
+					   true, error);
+	if (!aso_mtr) {
+		flow_hw_job_put(priv, job, queue);
 		return -1;
+	}
 
 	/* Compile METER_MARK action */
 	acts[aso_mtr_pos].action = pool->action;
@@ -3275,7 +3277,7 @@  flow_hw_actions_construct(struct rte_eth_dev *dev,
 							 jump->root_action;
 			flow->jump = jump;
 			flow->flags |= MLX5_FLOW_HW_FLOW_FLAG_FATE_JUMP;
-			if (mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr))
+			if (mlx5_aso_mtr_wait(priv, aso_mtr, true))
 				return -1;
 			break;
 		case RTE_FLOW_ACTION_TYPE_AGE:
@@ -4009,13 +4011,6 @@  flow_hw_pull_legacy_indirect_comp(struct rte_eth_dev *dev, struct mlx5_hw_q_job
 						job->query.hw);
 			aso_ct->state = ASO_CONNTRACK_READY;
 		}
-	} else {
-		/*
-		 * rte_flow_op_result::user data can point to
-		 * struct mlx5_aso_mtr object as well
-		 */
-		if (queue != CTRL_QUEUE_ID(priv))
-			MLX5_ASSERT(false);
 	}
 }
 
@@ -11007,7 +11002,8 @@  flow_hw_action_job_init(struct mlx5_priv *priv, uint32_t queue,
 {
 	struct mlx5_hw_q_job *job;
 
-	MLX5_ASSERT(queue != MLX5_HW_INV_QUEUE);
+	if (queue == MLX5_HW_INV_QUEUE)
+		queue = CTRL_QUEUE_ID(priv);
 	job = flow_hw_job_get(priv, queue);
 	if (!job) {
 		rte_flow_error_set(error, ENOMEM,
@@ -11022,6 +11018,17 @@  flow_hw_action_job_init(struct mlx5_priv *priv, uint32_t queue,
 	return job;
 }
 
+struct mlx5_hw_q_job *
+mlx5_flow_action_job_init(struct mlx5_priv *priv, uint32_t queue,
+			  const struct rte_flow_action_handle *handle,
+			  void *user_data, void *query_data,
+			  enum mlx5_hw_job_type type,
+			  struct rte_flow_error *error)
+{
+	return flow_hw_action_job_init(priv, queue, handle, user_data, query_data,
+				       type, error);
+}
+
 static __rte_always_inline void
 flow_hw_action_finalize(struct rte_eth_dev *dev, uint32_t queue,
 			struct mlx5_hw_q_job *job,
@@ -11081,12 +11088,12 @@  flow_hw_action_handle_create(struct rte_eth_dev *dev, uint32_t queue,
 	const struct rte_flow_action_age *age;
 	struct mlx5_aso_mtr *aso_mtr;
 	cnt_id_t cnt_id;
-	uint32_t mtr_id;
 	uint32_t age_idx;
 	bool push = flow_hw_action_push(attr);
 	bool aso = false;
+	bool force_job = action->type == RTE_FLOW_ACTION_TYPE_METER_MARK;
 
-	if (attr) {
+	if (attr || force_job) {
 		job = flow_hw_action_job_init(priv, queue, NULL, user_data,
 					      NULL, MLX5_HW_Q_JOB_TYPE_CREATE,
 					      error);
@@ -11141,9 +11148,7 @@  flow_hw_action_handle_create(struct rte_eth_dev *dev, uint32_t queue,
 		aso_mtr = flow_hw_meter_mark_alloc(dev, queue, action, job, push, error);
 		if (!aso_mtr)
 			break;
-		mtr_id = (MLX5_INDIRECT_ACTION_TYPE_METER_MARK <<
-			MLX5_INDIRECT_ACTION_TYPE_OFFSET) | (aso_mtr->fm.meter_id);
-		handle = (struct rte_flow_action_handle *)(uintptr_t)mtr_id;
+		handle = (void *)(uintptr_t)job->action;
 		break;
 	case RTE_FLOW_ACTION_TYPE_RSS:
 		handle = flow_dv_action_create(dev, conf, action, error);
@@ -11158,7 +11163,7 @@  flow_hw_action_handle_create(struct rte_eth_dev *dev, uint32_t queue,
 				   NULL, "action type not supported");
 		break;
 	}
-	if (job) {
+	if (job && !force_job) {
 		job->action = handle;
 		job->indirect_type = MLX5_HW_INDIRECT_TYPE_LEGACY;
 		flow_hw_action_finalize(dev, queue, job, push, aso,
@@ -11191,15 +11196,17 @@  mlx5_flow_update_meter_mark(struct rte_eth_dev *dev, uint32_t queue,
 		fm->color_aware = meter_mark->color_mode;
 	if (upd_meter_mark->state_valid)
 		fm->is_enable = meter_mark->state;
+	aso_mtr->state = (queue == MLX5_HW_INV_QUEUE) ?
+			 ASO_METER_WAIT : ASO_METER_WAIT_ASYNC;
 	/* Update ASO flow meter by wqe. */
-	if (mlx5_aso_meter_update_by_wqe(priv->sh, queue,
+	if (mlx5_aso_meter_update_by_wqe(priv, queue,
 					 aso_mtr, &priv->mtr_bulk, job, push))
 		return rte_flow_error_set(error, EINVAL,
 					  RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
 					  NULL, "Unable to update ASO meter WQE");
 	/* Wait for ASO object completion. */
 	if (queue == MLX5_HW_INV_QUEUE &&
-	    mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr))
+	    mlx5_aso_mtr_wait(priv, aso_mtr, true))
 		return rte_flow_error_set(error, EINVAL,
 					  RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
 					  NULL, "Unable to wait for ASO meter CQE");
@@ -11245,8 +11252,9 @@  flow_hw_action_handle_update(struct rte_eth_dev *dev, uint32_t queue,
 	int ret = 0;
 	bool push = flow_hw_action_push(attr);
 	bool aso = false;
+	bool force_job = type == MLX5_INDIRECT_ACTION_TYPE_METER_MARK;
 
-	if (attr) {
+	if (attr || force_job) {
 		job = flow_hw_action_job_init(priv, queue, handle, user_data,
 					      NULL, MLX5_HW_Q_JOB_TYPE_UPDATE,
 					      error);
@@ -11283,7 +11291,7 @@  flow_hw_action_handle_update(struct rte_eth_dev *dev, uint32_t queue,
 					  "action type not supported");
 		break;
 	}
-	if (job)
+	if (job && !force_job)
 		flow_hw_action_finalize(dev, queue, job, push, aso, ret == 0);
 	return ret;
 }
@@ -11326,8 +11334,9 @@  flow_hw_action_handle_destroy(struct rte_eth_dev *dev, uint32_t queue,
 	bool push = flow_hw_action_push(attr);
 	bool aso = false;
 	int ret = 0;
+	bool force_job = type == MLX5_INDIRECT_ACTION_TYPE_METER_MARK;
 
-	if (attr) {
+	if (attr || force_job) {
 		job = flow_hw_action_job_init(priv, queue, handle, user_data,
 					      NULL, MLX5_HW_Q_JOB_TYPE_DESTROY,
 					      error);
@@ -11363,7 +11372,7 @@  flow_hw_action_handle_destroy(struct rte_eth_dev *dev, uint32_t queue,
 		fm = &aso_mtr->fm;
 		fm->is_enable = 0;
 		/* Update ASO flow meter by wqe. */
-		if (mlx5_aso_meter_update_by_wqe(priv->sh, queue, aso_mtr,
+		if (mlx5_aso_meter_update_by_wqe(priv, queue, aso_mtr,
 						 &priv->mtr_bulk, job, push)) {
 			ret = -EINVAL;
 			rte_flow_error_set(error, EINVAL,
@@ -11373,7 +11382,7 @@  flow_hw_action_handle_destroy(struct rte_eth_dev *dev, uint32_t queue,
 		}
 		/* Wait for ASO object completion. */
 		if (queue == MLX5_HW_INV_QUEUE &&
-		    mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr)) {
+		    mlx5_aso_mtr_wait(priv, aso_mtr, true)) {
 			ret = -EINVAL;
 			rte_flow_error_set(error, EINVAL,
 				RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
@@ -11397,7 +11406,7 @@  flow_hw_action_handle_destroy(struct rte_eth_dev *dev, uint32_t queue,
 					  "action type not supported");
 		break;
 	}
-	if (job)
+	if (job && !force_job)
 		flow_hw_action_finalize(dev, queue, job, push, aso, ret == 0);
 	return ret;
 }
diff --git a/drivers/net/mlx5/mlx5_flow_meter.c b/drivers/net/mlx5/mlx5_flow_meter.c
index 57de95b4b9..4045c4c249 100644
--- a/drivers/net/mlx5/mlx5_flow_meter.c
+++ b/drivers/net/mlx5/mlx5_flow_meter.c
@@ -1897,12 +1897,12 @@  mlx5_flow_meter_action_modify(struct mlx5_priv *priv,
 	if (sh->meter_aso_en) {
 		fm->is_enable = !!is_enable;
 		aso_mtr = container_of(fm, struct mlx5_aso_mtr, fm);
-		ret = mlx5_aso_meter_update_by_wqe(sh, MLX5_HW_INV_QUEUE,
+		ret = mlx5_aso_meter_update_by_wqe(priv, MLX5_HW_INV_QUEUE,
 						   aso_mtr, &priv->mtr_bulk,
 						   NULL, true);
 		if (ret)
 			return ret;
-		ret = mlx5_aso_mtr_wait(sh, MLX5_HW_INV_QUEUE, aso_mtr);
+		ret = mlx5_aso_mtr_wait(priv, aso_mtr, false);
 		if (ret)
 			return ret;
 	} else {
@@ -2148,7 +2148,7 @@  mlx5_flow_meter_create(struct rte_eth_dev *dev, uint32_t meter_id,
 	/* If ASO meter supported, update ASO flow meter by wqe. */
 	if (priv->sh->meter_aso_en) {
 		aso_mtr = container_of(fm, struct mlx5_aso_mtr, fm);
-		ret = mlx5_aso_meter_update_by_wqe(priv->sh, MLX5_HW_INV_QUEUE,
+		ret = mlx5_aso_meter_update_by_wqe(priv, MLX5_HW_INV_QUEUE,
 						   aso_mtr, &priv->mtr_bulk, NULL, true);
 		if (ret)
 			goto error;
@@ -2210,6 +2210,7 @@  mlx5_flow_meter_hws_create(struct rte_eth_dev *dev, uint32_t meter_id,
 	struct mlx5_flow_meter_info *fm;
 	struct mlx5_flow_meter_policy *policy = NULL;
 	struct mlx5_aso_mtr *aso_mtr;
+	struct mlx5_hw_q_job *job;
 	int ret;
 
 	if (!priv->mtr_profile_arr ||
@@ -2255,12 +2256,20 @@  mlx5_flow_meter_hws_create(struct rte_eth_dev *dev, uint32_t meter_id,
 	fm->shared = !!shared;
 	fm->initialized = 1;
 	/* Update ASO flow meter by wqe. */
-	ret = mlx5_aso_meter_update_by_wqe(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr,
-					   &priv->mtr_bulk, NULL, true);
-	if (ret)
+	job = mlx5_flow_action_job_init(priv, MLX5_HW_INV_QUEUE, NULL, NULL,
+					NULL, MLX5_HW_Q_JOB_TYPE_CREATE, NULL);
+	if (!job)
+		return -rte_mtr_error_set(error, ENOMEM,
+					  RTE_MTR_ERROR_TYPE_MTR_ID,
+					  NULL, "No job context.");
+	ret = mlx5_aso_meter_update_by_wqe(priv, MLX5_HW_INV_QUEUE, aso_mtr,
+					   &priv->mtr_bulk, job, true);
+	if (ret) {
+		flow_hw_job_put(priv, job, MLX5_HW_INV_QUEUE);
 		return -rte_mtr_error_set(error, ENOTSUP,
-			RTE_MTR_ERROR_TYPE_UNSPECIFIED,
-			NULL, "Failed to create devx meter.");
+					  RTE_MTR_ERROR_TYPE_UNSPECIFIED,
+					  NULL, "Failed to create devx meter.");
+	}
 	fm->active_state = params->meter_enable;
 	__atomic_fetch_add(&fm->profile->ref_cnt, 1, __ATOMIC_RELAXED);
 	__atomic_fetch_add(&policy->ref_cnt, 1, __ATOMIC_RELAXED);
@@ -2911,7 +2920,7 @@  mlx5_flow_meter_attach(struct mlx5_priv *priv,
 		struct mlx5_aso_mtr *aso_mtr;
 
 		aso_mtr = container_of(fm, struct mlx5_aso_mtr, fm);
-		if (mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr)) {
+		if (mlx5_aso_mtr_wait(priv, aso_mtr, false)) {
 			return rte_flow_error_set(error, ENOENT,
 					RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
 					NULL,