@@ -472,6 +472,39 @@ A flush callback can be passed to the function to handle any outstanding events.
Invocation of this API does not affect the existing port configuration.
+Independent Enqueue Capability
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Some eventdev hardware devices, such as DLB2, expect all forwarded events to be
+enqueued in the same order as they were dequeued. For dropped events, a release
+must be enqueued at the position where the original event was expected.
+The hardware relies on this order to retrieve information about the original
+event that was sent to the CPU, such as the atomic flow ID needed to release
+the flow lock, or the sequence number of an ordered event needed to restore the
+original order.
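+
+As an illustration, a minimal sketch of the default (order-preserving) usage;
+``must_drop()`` and ``BURST_SIZE`` are placeholders, not part of the eventdev API:
+
+.. code-block:: c
+
+   struct rte_event evs[BURST_SIZE];
+   uint16_t i, n;
+
+   n = rte_event_dequeue_burst(dev_id, port_id, evs, BURST_SIZE, 0);
+   for (i = 0; i < n; i++) {
+           if (must_drop(&evs[i]))
+                   evs[i].op = RTE_EVENT_OP_RELEASE; /* release stays in place */
+           else
+                   evs[i].op = RTE_EVENT_OP_FORWARD;
+   }
+   /* enqueue in the same order as dequeued (retry handling omitted) */
+   rte_event_enqueue_burst(dev_id, port_id, evs, n);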
+
+Some applications, such as those based on the DPDK dispatcher library, require
+enqueue order independence. To support them, the DLB2 PMD exposes the
+``RTE_EVENT_DEV_CAP_INDEPENDENT_ENQ`` capability.
+
+This capability applies to eventdevs supporting burst mode. Ports on which the
+application intends to change the enqueue order should be configured with
+``RTE_EVENT_PORT_CFG_INDEPENDENT_ENQ``.
+
+Example code to inform the PMD that the application plans to use independent
+enqueue order on a port:
+
+ .. code-block:: c
+
+ if (capability & RTE_EVENT_DEV_CAP_INDEPENDENT_ENQ)
+ port_config = port_config | RTE_EVENT_PORT_CFG_INDEPENDENT_ENQ;
+
+This example enables event reordering inside the DLB2 PMD before the events are
+sent to the DLB2 hardware. If the application does not intend to change the
+enqueue order, the flag should be left disabled for better performance. The
+DLB2 PMD stores ordering information in the ``impl_opaque`` field of the event,
+so this field must be preserved in all FORWARD and RELEASE events.
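+
+For illustration, a minimal sketch of the same loop on a port configured with
+independent enqueue; ``BURST_SIZE`` is a placeholder and retry handling is
+omitted. The ``impl_opaque`` field of each dequeued event is carried over
+unmodified:
+
+.. code-block:: c
+
+   struct rte_event evs[BURST_SIZE];
+   uint16_t i, n;
+
+   n = rte_event_dequeue_burst(dev_id, port_id, evs, BURST_SIZE, 0);
+   for (i = 0; i < n; i++)
+           evs[i].op = RTE_EVENT_OP_FORWARD; /* impl_opaque left untouched */
+
+   /* with independent enqueue, the two halves may be sent in any order */
+   rte_event_enqueue_burst(dev_id, port_id, &evs[n / 2], n - n / 2);
+   rte_event_enqueue_burst(dev_id, port_id, evs, n / 2);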
+
Stopping the EventDev
~~~~~~~~~~~~~~~~~~~~~
@@ -72,6 +72,11 @@ New Features
* Added support for independent enqueue feature. Updated Event Device and
PMD feature list.
+ * Updated DLB2 driver for the independent enqueue feature. Applications
+ should set ``RTE_EVENT_PORT_CFG_INDEPENDENT_ENQ`` to enable the feature
+ when the ``RTE_EVENT_DEV_CAP_INDEPENDENT_ENQ`` capability is supported.
+
+
Removed Items
-------------
@@ -82,6 +82,7 @@ static struct rte_event_dev_info evdev_dlb2_default_info = {
RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
RTE_EVENT_DEV_CAP_RUNTIME_PORT_LINK |
RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT |
+ RTE_EVENT_DEV_CAP_INDEPENDENT_ENQ |
RTE_EVENT_DEV_CAP_MAINTENANCE_FREE),
.max_profiles_per_port = 1,
};
@@ -98,6 +99,11 @@ dlb2_free_qe_mem(struct dlb2_port *qm_port)
rte_free(qm_port->qe4);
qm_port->qe4 = NULL;
+ if (qm_port->order) {
+ rte_free(qm_port->order);
+ qm_port->order = NULL;
+ }
+
rte_free(qm_port->int_arm_qe);
qm_port->int_arm_qe = NULL;
@@ -304,7 +310,7 @@ set_max_cq_depth(const char *key __rte_unused,
if (*max_cq_depth < DLB2_MIN_CQ_DEPTH_OVERRIDE ||
*max_cq_depth > DLB2_MAX_CQ_DEPTH_OVERRIDE ||
!rte_is_power_of_2(*max_cq_depth)) {
- DLB2_LOG_ERR("dlb2: max_cq_depth %d and %d and a power of 2",
+ DLB2_LOG_ERR("dlb2: Allowed max_cq_depth range %d - %d and should be power of 2",
DLB2_MIN_CQ_DEPTH_OVERRIDE,
DLB2_MAX_CQ_DEPTH_OVERRIDE);
return -EINVAL;
@@ -1445,6 +1451,17 @@ dlb2_init_qe_mem(struct dlb2_port *qm_port, char *mz_name)
goto error_exit;
}
+ if (qm_port->reorder_en) {
+ sz = sizeof(struct dlb2_reorder);
+ qm_port->order = rte_zmalloc(mz_name, sz, RTE_CACHE_LINE_SIZE);
+
+ if (qm_port->order == NULL) {
+ DLB2_LOG_ERR("dlb2: no reorder memory");
+ ret = -ENOMEM;
+ goto error_exit;
+ }
+ }
+
ret = dlb2_init_int_arm_qe(qm_port, mz_name);
if (ret < 0) {
DLB2_LOG_ERR("dlb2: dlb2_init_int_arm_qe ret=%d", ret);
@@ -1541,13 +1558,6 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
return -EINVAL;
}
- if (dlb2->version == DLB2_HW_V2 && ev_port->cq_weight != 0 &&
- ev_port->cq_weight > dequeue_depth) {
- DLB2_LOG_ERR("dlb2: invalid cq dequeue depth %d, must be >= cq weight %d",
- dequeue_depth, ev_port->cq_weight);
- return -EINVAL;
- }
-
rte_spinlock_lock(&handle->resource_lock);
/* We round up to the next power of 2 if necessary */
@@ -1620,9 +1630,6 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
dlb2_error_strings[cfg.response.status]);
goto error_exit;
}
- qm_port->cq_weight = dequeue_depth;
- } else {
- qm_port->cq_weight = 0;
}
/* CQs with depth < 8 use an 8-entry queue, but withhold credits so
@@ -1947,6 +1954,13 @@ dlb2_eventdev_port_setup(struct rte_eventdev *dev,
evdev_dlb2_default_info.max_event_port_enqueue_depth)
return -EINVAL;
+ if ((port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_INDEPENDENT_ENQ) &&
+ port_conf->dequeue_depth > DLB2_MAX_CQ_DEPTH_REORDER) {
+ DLB2_LOG_ERR("evport %d: Max dequeue depth supported with reorder is %d",
+ ev_port_id, DLB2_MAX_CQ_DEPTH_REORDER);
+ return -EINVAL;
+ }
+
ev_port = &dlb2->ev_ports[ev_port_id];
/* configured? */
if (ev_port->setup_done) {
@@ -1988,7 +2002,11 @@ dlb2_eventdev_port_setup(struct rte_eventdev *dev,
hw_credit_quanta);
return -EINVAL;
}
- ev_port->enq_retries = port_conf->enqueue_depth / sw_credit_quanta;
+ ev_port->enq_retries = port_conf->enqueue_depth;
+
+ ev_port->qm_port.reorder_id = 0;
+ ev_port->qm_port.reorder_en = port_conf->event_port_cfg &
+ RTE_EVENT_PORT_CFG_INDEPENDENT_ENQ;
/* Save off port config for reconfig */
ev_port->conf = *port_conf;
@@ -2792,10 +2810,34 @@ dlb2_check_enqueue_hw_credits(struct dlb2_port *qm_port)
}
static __rte_always_inline void
-dlb2_pp_write(struct dlb2_enqueue_qe *qe4,
- struct process_local_port_data *port_data)
+dlb2_pp_write(struct process_local_port_data *port_data, struct dlb2_enqueue_qe *qe4)
+{
+ dlb2_movdir64b(port_data->pp_addr, qe4);
+}
+
+static __rte_always_inline void
+dlb2_pp_write_reorder(struct process_local_port_data *port_data,
+ struct dlb2_enqueue_qe *qe4)
+{
+ for (uint8_t i = 0; i < 4; i++) {
+ if (qe4[i].cmd_byte != DLB2_NOOP_CMD_BYTE) {
+ dlb2_movdir64b(port_data->pp_addr, qe4);
+ return;
+ }
+ }
+}
+
+static __rte_always_inline int
+dlb2_pp_check4_write(struct process_local_port_data *port_data,
+ struct dlb2_enqueue_qe *qe4)
{
+ for (uint8_t i = 0; i < DLB2_NUM_QES_PER_CACHE_LINE; i++)
+ if (((uint64_t *)&qe4[i])[1] == 0)
+ return 0;
+
dlb2_movdir64b(port_data->pp_addr, qe4);
+ memset(qe4, 0, DLB2_NUM_QES_PER_CACHE_LINE * sizeof(struct dlb2_enqueue_qe));
+ return DLB2_NUM_QES_PER_CACHE_LINE;
}
static inline int
@@ -2815,7 +2857,7 @@ dlb2_consume_qe_immediate(struct dlb2_port *qm_port, int num)
*/
port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
- dlb2_movntdq_single(port_data->pp_addr, qe);
+ dlb2_movdir64b_single(port_data->pp_addr, qe);
DLB2_LOG_LINE_DBG("dlb2: consume immediate - %d QEs", num);
@@ -2835,7 +2877,7 @@ dlb2_hw_do_enqueue(struct dlb2_port *qm_port,
if (do_sfence)
rte_wmb();
- dlb2_pp_write(qm_port->qe4, port_data);
+ dlb2_pp_write(port_data, qm_port->qe4);
}
static inline void
@@ -2986,6 +3028,166 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
return 0;
}
+static inline __m128i
+dlb2_event_to_qe(const struct rte_event *ev, uint8_t cmd, uint8_t sched_type, uint8_t qid)
+{
+ __m128i dlb2_to_qe_shuffle = _mm_set_epi8(
+ 0xFF, 0xFF, /* zero out cmd word */
+ 1, 0, /* low 16-bits of flow id */
+ 0xFF, 0xFF, /* zero QID, sched_type etc fields to be filled later */
+ 3, 2, /* top of flow id, event type and subtype */
+ 15, 14, 13, 12, 11, 10, 9, 8 /* data from end of event goes at start */
+ );
+
+ /* event may not be 16 byte aligned. Use 16 byte unaligned load */
+ __m128i tmp = _mm_lddqu_si128((const __m128i *)ev);
+ __m128i qe = _mm_shuffle_epi8(tmp, dlb2_to_qe_shuffle);
+ struct dlb2_enqueue_qe *dq = (struct dlb2_enqueue_qe *)&qe;
+ /* set the cmd field */
+ qe = _mm_insert_epi8(qe, cmd, 15);
+ /* insert missing 16-bits with qid, sched_type and priority */
+ uint16_t qid_stype_prio =
+ qid | (uint16_t)sched_type << 8 | ((uint16_t)ev->priority & 0xE0) << 5;
+ qe = _mm_insert_epi16(qe, qid_stype_prio, 5);
+ dq->weight = RTE_PMD_DLB2_GET_QE_WEIGHT(ev);
+ return qe;
+}
+
+static inline uint16_t
+__dlb2_event_enqueue_burst_reorder(void *event_port,
+ const struct rte_event events[],
+ uint16_t num,
+ bool use_delayed)
+{
+ struct dlb2_eventdev_port *ev_port = event_port;
+ struct dlb2_port *qm_port = &ev_port->qm_port;
+ struct dlb2_reorder *order = qm_port->order;
+ struct process_local_port_data *port_data;
+ bool is_directed = qm_port->is_directed;
+ uint8_t n = order->next_to_enqueue;
+ uint8_t p_cnt = 0;
+ int retries = ev_port->enq_retries;
+ __m128i new_qes[4], *from = NULL;
+ int num_new = 0;
+ int num_tx;
+ int i;
+
+ RTE_ASSERT(ev_port->enq_configured);
+ RTE_ASSERT(events != NULL);
+
+ port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
+
+ num_tx = RTE_MIN(num, ev_port->conf.enqueue_depth);
+#if DLB2_BYPASS_FENCE_ON_PP == 1
+ if (!qm_port->is_producer) /* Call memory fence once at the start */
+ rte_wmb(); /* calls _mm_sfence() */
+#else
+ rte_wmb(); /* calls _mm_sfence() */
+#endif
+ for (i = 0; i < num_tx; i++) {
+ uint8_t sched_type = 0;
+ uint8_t reorder_idx = events[i].impl_opaque;
+ int16_t thresh = qm_port->token_pop_thresh;
+ uint8_t qid = 0;
+ int ret;
+
+ while ((ret = dlb2_event_enqueue_prep(ev_port, qm_port, &events[i],
+ &sched_type, &qid)) != 0 &&
+ rte_errno == -ENOSPC && --retries > 0)
+ rte_pause();
+
+ if (ret != 0) /* Either there is an error or retries were exceeded */
+ break;
+
+ switch (events[i].op) {
+ case RTE_EVENT_OP_NEW:
+ new_qes[num_new++] = dlb2_event_to_qe(
+ &events[i], DLB2_NEW_CMD_BYTE, sched_type, qid);
+ if (num_new == RTE_DIM(new_qes)) {
+ dlb2_pp_write(port_data, (struct dlb2_enqueue_qe *)&new_qes);
+ num_new = 0;
+ }
+ break;
+ case RTE_EVENT_OP_FORWARD: {
+ order->enq_reorder[reorder_idx].m128 = dlb2_event_to_qe(
+ &events[i], is_directed ? DLB2_NEW_CMD_BYTE : DLB2_FWD_CMD_BYTE,
+ sched_type, qid);
+ n += dlb2_pp_check4_write(port_data, &order->enq_reorder[n].qe);
+ break;
+ }
+ case RTE_EVENT_OP_RELEASE: {
+ order->enq_reorder[reorder_idx].m128 = dlb2_event_to_qe(
+ &events[i], is_directed ? DLB2_NOOP_CMD_BYTE : DLB2_COMP_CMD_BYTE,
+ sched_type, 0xFF);
+ break;
+ }
+ }
+
+ if (use_delayed && qm_port->token_pop_mode == DELAYED_POP &&
+ (events[i].op == RTE_EVENT_OP_FORWARD ||
+ events[i].op == RTE_EVENT_OP_RELEASE) &&
+ qm_port->issued_releases >= thresh - 1) {
+
+ dlb2_consume_qe_immediate(qm_port, qm_port->owed_tokens);
+
+ /* Reset the releases for the next QE batch */
+ qm_port->issued_releases -= thresh;
+
+ /* When using delayed token pop mode, the
+ * initial token threshold is the full CQ
+ * depth. After the first token pop, we need to
+ * reset it to the dequeue_depth.
+ */
+ qm_port->token_pop_thresh =
+ qm_port->dequeue_depth;
+ }
+ }
+ while (order->enq_reorder[n].u64[1] != 0) {
+ __m128i tmp[4] = {0}, *send = NULL;
+ bool enq;
+
+ if (!p_cnt)
+ from = &order->enq_reorder[n].m128;
+
+ p_cnt++;
+ n++;
+
+ enq = !n || p_cnt == 4 || !order->enq_reorder[n].u64[1];
+ if (!enq)
+ continue;
+
+ if (p_cnt < 4) {
+ memcpy(tmp, from, p_cnt * sizeof(struct dlb2_enqueue_qe));
+ send = tmp;
+ } else {
+ send = from;
+ }
+
+ if (is_directed)
+ dlb2_pp_write_reorder(port_data, (struct dlb2_enqueue_qe *)send);
+ else
+ dlb2_pp_write(port_data, (struct dlb2_enqueue_qe *)send);
+ memset(from, 0, p_cnt * sizeof(struct dlb2_enqueue_qe));
+ p_cnt = 0;
+ }
+ order->next_to_enqueue = n;
+
+ if (num_new > 0) {
+ switch (num_new) {
+ case 1:
+ new_qes[1] = _mm_setzero_si128(); /* fall-through */
+ case 2:
+ new_qes[2] = _mm_setzero_si128(); /* fall-through */
+ case 3:
+ new_qes[3] = _mm_setzero_si128();
+ }
+ dlb2_pp_write(port_data, (struct dlb2_enqueue_qe *)&new_qes);
+ num_new = 0;
+ }
+
+ return i;
+}
+
static inline uint16_t
__dlb2_event_enqueue_burst(void *event_port,
const struct rte_event events[],
@@ -3002,6 +3204,9 @@ __dlb2_event_enqueue_burst(void *event_port,
RTE_ASSERT(ev_port->enq_configured);
RTE_ASSERT(events != NULL);
+ if (qm_port->reorder_en)
+ return __dlb2_event_enqueue_burst_reorder(event_port, events, num, use_delayed);
+
i = 0;
port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
@@ -3379,7 +3584,8 @@ dlb2_process_dequeue_qes(struct dlb2_eventdev_port *ev_port,
events[num].event_type = qe->u.event_type.major;
events[num].sub_event_type = qe->u.event_type.sub;
events[num].sched_type = sched_type_map[qe->sched_type];
- events[num].impl_opaque = qe->qid_depth;
+ events[num].impl_opaque = qm_port->reorder_id++;
+ RTE_PMD_DLB2_SET_QID_DEPTH(&events[num], qe->qid_depth);
/* qid not preserved for directed queues */
if (qm_port->is_directed)
@@ -3414,7 +3620,6 @@ dlb2_process_dequeue_four_qes(struct dlb2_eventdev_port *ev_port,
};
const int num_events = DLB2_NUM_QES_PER_CACHE_LINE;
uint8_t *qid_mappings = qm_port->qid_mappings;
- __m128i sse_evt[2];
/* In the unlikely case that any of the QE error bits are set, process
* them one at a time.
@@ -3423,153 +3628,33 @@ dlb2_process_dequeue_four_qes(struct dlb2_eventdev_port *ev_port,
qes[2].error || qes[3].error))
return dlb2_process_dequeue_qes(ev_port, qm_port, events,
qes, num_events);
+ const __m128i qe_to_ev_shuffle =
+ _mm_set_epi8(7, 6, 5, 4, 3, 2, 1, 0, /* last 8-bytes = data from first 8 */
+ 0xFF, 0xFF, 0xFF, 0xFF, /* fill in later as 32-bit value*/
+ 9, 8, /* event type and sub-event, + 4 zero bits */
+ 13, 12 /* flow id, 16 bits */);
+ for (int i = 0; i < 4; i++) {
+ const __m128i hw_qe = _mm_load_si128((void *)&qes[i]);
+ const __m128i event = _mm_shuffle_epi8(hw_qe, qe_to_ev_shuffle);
+ /* prepare missing 32-bits for op, sched_type, QID, Priority and
+ * sequence number in impl_opaque
+ */
+ const uint16_t qid_sched_prio = _mm_extract_epi16(hw_qe, 5);
+ /* Extract qid_depth and format it as per event header */
+ const uint8_t qid_depth = (_mm_extract_epi8(hw_qe, 15) & 0x6) << 1;
+ const uint32_t qid = (qm_port->is_directed) ? ev_port->link[0].queue_id :
+ qid_mappings[(uint8_t)qid_sched_prio];
+ const uint32_t sched_type = sched_type_map[(qid_sched_prio >> 8) & 0x3];
+ const uint32_t priority = (qid_sched_prio >> 5) & 0xE0;
- events[0].u64 = qes[0].data;
- events[1].u64 = qes[1].data;
- events[2].u64 = qes[2].data;
- events[3].u64 = qes[3].data;
-
- /* Construct the metadata portion of two struct rte_events
- * in one 128b SSE register. Event metadata is constructed in the SSE
- * registers like so:
- * sse_evt[0][63:0]: event[0]'s metadata
- * sse_evt[0][127:64]: event[1]'s metadata
- * sse_evt[1][63:0]: event[2]'s metadata
- * sse_evt[1][127:64]: event[3]'s metadata
- */
- sse_evt[0] = _mm_setzero_si128();
- sse_evt[1] = _mm_setzero_si128();
-
- /* Convert the hardware queue ID to an event queue ID and store it in
- * the metadata:
- * sse_evt[0][47:40] = qid_mappings[qes[0].qid]
- * sse_evt[0][111:104] = qid_mappings[qes[1].qid]
- * sse_evt[1][47:40] = qid_mappings[qes[2].qid]
- * sse_evt[1][111:104] = qid_mappings[qes[3].qid]
- */
-#define DLB_EVENT_QUEUE_ID_BYTE 5
- sse_evt[0] = _mm_insert_epi8(sse_evt[0],
- qid_mappings[qes[0].qid],
- DLB_EVENT_QUEUE_ID_BYTE);
- sse_evt[0] = _mm_insert_epi8(sse_evt[0],
- qid_mappings[qes[1].qid],
- DLB_EVENT_QUEUE_ID_BYTE + 8);
- sse_evt[1] = _mm_insert_epi8(sse_evt[1],
- qid_mappings[qes[2].qid],
- DLB_EVENT_QUEUE_ID_BYTE);
- sse_evt[1] = _mm_insert_epi8(sse_evt[1],
- qid_mappings[qes[3].qid],
- DLB_EVENT_QUEUE_ID_BYTE + 8);
-
- /* Convert the hardware priority to an event priority and store it in
- * the metadata, while also returning the queue depth status
- * value captured by the hardware, storing it in impl_opaque, which can
- * be read by the application but not modified
- * sse_evt[0][55:48] = DLB2_TO_EV_PRIO(qes[0].priority)
- * sse_evt[0][63:56] = qes[0].qid_depth
- * sse_evt[0][119:112] = DLB2_TO_EV_PRIO(qes[1].priority)
- * sse_evt[0][127:120] = qes[1].qid_depth
- * sse_evt[1][55:48] = DLB2_TO_EV_PRIO(qes[2].priority)
- * sse_evt[1][63:56] = qes[2].qid_depth
- * sse_evt[1][119:112] = DLB2_TO_EV_PRIO(qes[3].priority)
- * sse_evt[1][127:120] = qes[3].qid_depth
- */
-#define DLB_EVENT_PRIO_IMPL_OPAQUE_WORD 3
-#define DLB_BYTE_SHIFT 8
- sse_evt[0] =
- _mm_insert_epi16(sse_evt[0],
- DLB2_TO_EV_PRIO((uint8_t)qes[0].priority) |
- (qes[0].qid_depth << DLB_BYTE_SHIFT),
- DLB_EVENT_PRIO_IMPL_OPAQUE_WORD);
- sse_evt[0] =
- _mm_insert_epi16(sse_evt[0],
- DLB2_TO_EV_PRIO((uint8_t)qes[1].priority) |
- (qes[1].qid_depth << DLB_BYTE_SHIFT),
- DLB_EVENT_PRIO_IMPL_OPAQUE_WORD + 4);
- sse_evt[1] =
- _mm_insert_epi16(sse_evt[1],
- DLB2_TO_EV_PRIO((uint8_t)qes[2].priority) |
- (qes[2].qid_depth << DLB_BYTE_SHIFT),
- DLB_EVENT_PRIO_IMPL_OPAQUE_WORD);
- sse_evt[1] =
- _mm_insert_epi16(sse_evt[1],
- DLB2_TO_EV_PRIO((uint8_t)qes[3].priority) |
- (qes[3].qid_depth << DLB_BYTE_SHIFT),
- DLB_EVENT_PRIO_IMPL_OPAQUE_WORD + 4);
-
- /* Write the event type, sub event type, and flow_id to the event
- * metadata.
- * sse_evt[0][31:0] = qes[0].flow_id |
- * qes[0].u.event_type.major << 28 |
- * qes[0].u.event_type.sub << 20;
- * sse_evt[0][95:64] = qes[1].flow_id |
- * qes[1].u.event_type.major << 28 |
- * qes[1].u.event_type.sub << 20;
- * sse_evt[1][31:0] = qes[2].flow_id |
- * qes[2].u.event_type.major << 28 |
- * qes[2].u.event_type.sub << 20;
- * sse_evt[1][95:64] = qes[3].flow_id |
- * qes[3].u.event_type.major << 28 |
- * qes[3].u.event_type.sub << 20;
- */
-#define DLB_EVENT_EV_TYPE_DW 0
-#define DLB_EVENT_EV_TYPE_SHIFT 28
-#define DLB_EVENT_SUB_EV_TYPE_SHIFT 20
- sse_evt[0] = _mm_insert_epi32(sse_evt[0],
- qes[0].flow_id |
- qes[0].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
- qes[0].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
- DLB_EVENT_EV_TYPE_DW);
- sse_evt[0] = _mm_insert_epi32(sse_evt[0],
- qes[1].flow_id |
- qes[1].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
- qes[1].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
- DLB_EVENT_EV_TYPE_DW + 2);
- sse_evt[1] = _mm_insert_epi32(sse_evt[1],
- qes[2].flow_id |
- qes[2].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
- qes[2].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
- DLB_EVENT_EV_TYPE_DW);
- sse_evt[1] = _mm_insert_epi32(sse_evt[1],
- qes[3].flow_id |
- qes[3].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
- qes[3].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
- DLB_EVENT_EV_TYPE_DW + 2);
-
- /* Write the sched type to the event metadata. 'op' and 'rsvd' are not
- * set:
- * sse_evt[0][39:32] = sched_type_map[qes[0].sched_type] << 6
- * sse_evt[0][103:96] = sched_type_map[qes[1].sched_type] << 6
- * sse_evt[1][39:32] = sched_type_map[qes[2].sched_type] << 6
- * sse_evt[1][103:96] = sched_type_map[qes[3].sched_type] << 6
- */
-#define DLB_EVENT_SCHED_TYPE_BYTE 4
-#define DLB_EVENT_SCHED_TYPE_SHIFT 6
- sse_evt[0] = _mm_insert_epi8(sse_evt[0],
- sched_type_map[qes[0].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
- DLB_EVENT_SCHED_TYPE_BYTE);
- sse_evt[0] = _mm_insert_epi8(sse_evt[0],
- sched_type_map[qes[1].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
- DLB_EVENT_SCHED_TYPE_BYTE + 8);
- sse_evt[1] = _mm_insert_epi8(sse_evt[1],
- sched_type_map[qes[2].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
- DLB_EVENT_SCHED_TYPE_BYTE);
- sse_evt[1] = _mm_insert_epi8(sse_evt[1],
- sched_type_map[qes[3].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
- DLB_EVENT_SCHED_TYPE_BYTE + 8);
-
- /* Store the metadata to the event (use the double-precision
- * _mm_storeh_pd because there is no integer function for storing the
- * upper 64b):
- * events[0].event = sse_evt[0][63:0]
- * events[1].event = sse_evt[0][127:64]
- * events[2].event = sse_evt[1][63:0]
- * events[3].event = sse_evt[1][127:64]
- */
- _mm_storel_epi64((__m128i *)&events[0].event, sse_evt[0]);
- _mm_storeh_pd((double *)&events[1].event, (__m128d) sse_evt[0]);
- _mm_storel_epi64((__m128i *)&events[2].event, sse_evt[1]);
- _mm_storeh_pd((double *)&events[3].event, (__m128d) sse_evt[1]);
+ const uint32_t dword1 = qid_depth |
+ sched_type << 6 | qid << 8 | priority << 16 | (qm_port->reorder_id + i) << 24;
+
+ /* events[] may not be 16 byte aligned. So use separate load and store */
+ const __m128i tmpEv = _mm_insert_epi32(event, dword1, 1);
+ _mm_storeu_si128((__m128i *) &events[i], tmpEv);
+ }
+ qm_port->reorder_id += 4;
DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qes[0].sched_type], 1);
DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qes[1].sched_type], 1);
@@ -3722,6 +3807,15 @@ _process_deq_qes_vec_impl(struct dlb2_port *qm_port,
0x00, 0x00, 0x00, 0x03,
0x00, 0x00, 0x00, 0x03,
};
+
+ static const uint8_t qid_depth_mask[16] = {
+ 0x00, 0x00, 0x00, 0x06,
+ 0x00, 0x00, 0x00, 0x06,
+ 0x00, 0x00, 0x00, 0x06,
+ 0x00, 0x00, 0x00, 0x06,
+ };
+ const __m128i v_qid_depth_mask = _mm_loadu_si128(
+ (const __m128i *)qid_depth_mask);
const __m128i v_sched_map = _mm_loadu_si128(
(const __m128i *)sched_type_map);
__m128i v_sched_mask = _mm_loadu_si128(
@@ -3732,6 +3826,9 @@ _process_deq_qes_vec_impl(struct dlb2_port *qm_port,
__m128i v_preshift = _mm_and_si128(v_sched_remapped,
v_sched_mask);
v_sched_done = _mm_srli_epi32(v_preshift, 10);
+ __m128i v_qid_depth = _mm_and_si128(v_qe_status, v_qid_depth_mask);
+ v_qid_depth = _mm_srli_epi32(v_qid_depth, 15);
+ v_sched_done = _mm_or_si128(v_sched_done, v_qid_depth);
}
/* Priority handling
@@ -3784,9 +3881,10 @@ _process_deq_qes_vec_impl(struct dlb2_port *qm_port,
(const __m128i *)sub_event_mask);
__m128i v_flow_mask = _mm_loadu_si128(
(const __m128i *)flow_mask);
- __m128i v_sub = _mm_srli_epi32(v_qe_meta, 8);
+ __m128i v_sub = _mm_srli_epi32(v_qe_meta, 4);
v_sub = _mm_and_si128(v_sub, v_sub_event_mask);
- __m128i v_type = _mm_and_si128(v_qe_meta, v_event_mask);
+ __m128i v_type = _mm_srli_epi32(v_qe_meta, 12);
+ v_type = _mm_and_si128(v_type, v_event_mask);
v_type = _mm_slli_epi32(v_type, 8);
v_types_done = _mm_or_si128(v_type, v_sub);
v_types_done = _mm_slli_epi32(v_types_done, 20);
@@ -3814,12 +3912,14 @@ _process_deq_qes_vec_impl(struct dlb2_port *qm_port,
case 4:
v_ev_3 = _mm_blend_epi16(v_unpk_ev_23, v_qe_3, 0x0F);
v_ev_3 = _mm_alignr_epi8(v_ev_3, v_ev_3, 8);
+ v_ev_3 = _mm_insert_epi8(v_ev_3, qm_port->reorder_id + 3, 7);
_mm_storeu_si128((__m128i *)&events[3], v_ev_3);
DLB2_INC_STAT(qm_port->ev_port->stats.rx_sched_cnt[hw_sched3],
1);
/* fallthrough */
case 3:
v_ev_2 = _mm_unpacklo_epi64(v_unpk_ev_23, v_qe_2);
+ v_ev_2 = _mm_insert_epi8(v_ev_2, qm_port->reorder_id + 2, 7);
_mm_storeu_si128((__m128i *)&events[2], v_ev_2);
DLB2_INC_STAT(qm_port->ev_port->stats.rx_sched_cnt[hw_sched2],
1);
@@ -3827,16 +3927,19 @@ _process_deq_qes_vec_impl(struct dlb2_port *qm_port,
case 2:
v_ev_1 = _mm_blend_epi16(v_unpk_ev_01, v_qe_1, 0x0F);
v_ev_1 = _mm_alignr_epi8(v_ev_1, v_ev_1, 8);
+ v_ev_1 = _mm_insert_epi8(v_ev_1, qm_port->reorder_id + 1, 7);
_mm_storeu_si128((__m128i *)&events[1], v_ev_1);
DLB2_INC_STAT(qm_port->ev_port->stats.rx_sched_cnt[hw_sched1],
1);
/* fallthrough */
case 1:
v_ev_0 = _mm_unpacklo_epi64(v_unpk_ev_01, v_qe_0);
+ v_ev_0 = _mm_insert_epi8(v_ev_0, qm_port->reorder_id, 7);
_mm_storeu_si128((__m128i *)&events[0], v_ev_0);
DLB2_INC_STAT(qm_port->ev_port->stats.rx_sched_cnt[hw_sched0],
1);
}
+ qm_port->reorder_id += valid_events;
}
static __rte_always_inline int
@@ -4171,6 +4274,7 @@ dlb2_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
struct dlb2_eventdev_port *ev_port = event_port;
struct dlb2_port *qm_port = &ev_port->qm_port;
struct dlb2_eventdev *dlb2 = ev_port->dlb2;
+ struct dlb2_reorder *order = qm_port->order;
uint16_t cnt;
RTE_ASSERT(ev_port->setup_done);
@@ -4178,8 +4282,21 @@ dlb2_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
uint16_t out_rels = ev_port->outstanding_releases;
-
- dlb2_event_release(dlb2, ev_port->id, out_rels);
+ if (qm_port->reorder_en) {
+ /* for directed, no-op command-byte = 0, but set dsi field */
+ /* for load-balanced, set COMP */
+ uint64_t release_u64 =
+ qm_port->is_directed ? 0xFF : (uint64_t)DLB2_COMP_CMD_BYTE << 56;
+
+ for (uint8_t i = order->next_to_enqueue; i != qm_port->reorder_id; i++)
+ if (order->enq_reorder[i].u64[1] == 0)
+ order->enq_reorder[i].u64[1] = release_u64;
+
+ __dlb2_event_enqueue_burst_reorder(event_port, NULL, 0,
+ qm_port->token_pop_mode == DELAYED_POP);
+ } else {
+ dlb2_event_release(dlb2, ev_port->id, out_rels);
+ }
DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
}
@@ -4208,6 +4325,7 @@ dlb2_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
struct dlb2_eventdev_port *ev_port = event_port;
struct dlb2_port *qm_port = &ev_port->qm_port;
struct dlb2_eventdev *dlb2 = ev_port->dlb2;
+ struct dlb2_reorder *order = qm_port->order;
uint16_t cnt;
RTE_ASSERT(ev_port->setup_done);
@@ -4215,9 +4333,35 @@ dlb2_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
uint16_t out_rels = ev_port->outstanding_releases;
+ if (qm_port->reorder_en) {
+ struct rte_event release_burst[8];
+ int num_releases = 0;
+
+ /* go through reorder buffer looking for missing releases. */
+ for (uint8_t i = order->next_to_enqueue; i != qm_port->reorder_id; i++) {
+ if (order->enq_reorder[i].u64[1] == 0) {
+ release_burst[num_releases++] = (struct rte_event){
+ .op = RTE_EVENT_OP_RELEASE,
+ .impl_opaque = i,
+ };
+
+ if (num_releases == RTE_DIM(release_burst)) {
+ __dlb2_event_enqueue_burst_reorder(event_port,
+ release_burst, RTE_DIM(release_burst),
+ qm_port->token_pop_mode == DELAYED_POP);
+ num_releases = 0;
+ }
+ }
+ }
- dlb2_event_release(dlb2, ev_port->id, out_rels);
+ if (num_releases)
+ __dlb2_event_enqueue_burst_reorder(event_port, release_burst,
+ num_releases, qm_port->token_pop_mode == DELAYED_POP);
+ } else {
+ dlb2_event_release(dlb2, ev_port->id, out_rels);
+ }
+ RTE_ASSERT(ev_port->outstanding_releases == 0);
DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
}
@@ -4242,6 +4386,8 @@ static void
dlb2_flush_port(struct rte_eventdev *dev, int port_id)
{
struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
+ struct dlb2_eventdev_port *ev_port = &dlb2->ev_ports[port_id];
+ struct dlb2_reorder *order = ev_port->qm_port.order;
eventdev_stop_flush_t flush;
struct rte_event ev;
uint8_t dev_id;
@@ -4267,8 +4413,10 @@ dlb2_flush_port(struct rte_eventdev *dev, int port_id)
/* Enqueue any additional outstanding releases */
ev.op = RTE_EVENT_OP_RELEASE;
- for (i = dlb2->ev_ports[port_id].outstanding_releases; i > 0; i--)
+ for (i = dlb2->ev_ports[port_id].outstanding_releases; i > 0; i--) {
+ ev.impl_opaque = order ? order->next_to_enqueue : 0;
rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
+ }
}
static uint32_t
@@ -4939,6 +5087,8 @@ dlb2_parse_params(const char *params,
rte_kvargs_free(kvlist);
return ret;
}
+ if (version == DLB2_HW_V2 && dlb2_args->enable_cq_weight)
+ DLB2_LOG_INFO("Ignoring 'enable_cq_weight=y'. Only supported for 2.5 HW onwards");
rte_kvargs_free(kvlist);
}
@@ -151,20 +151,20 @@ dlb2_event_build_hcws(struct dlb2_port *qm_port,
*/
#define DLB2_QE_EV_TYPE_WORD 0
sse_qe[0] = _mm_insert_epi16(sse_qe[0],
- ev[0].sub_event_type << 8 |
- ev[0].event_type,
+ ev[0].sub_event_type << 4 |
+ ev[0].event_type << 12,
DLB2_QE_EV_TYPE_WORD);
sse_qe[0] = _mm_insert_epi16(sse_qe[0],
- ev[1].sub_event_type << 8 |
- ev[1].event_type,
+ ev[1].sub_event_type << 4 |
+ ev[1].event_type << 12,
DLB2_QE_EV_TYPE_WORD + 4);
sse_qe[1] = _mm_insert_epi16(sse_qe[1],
- ev[2].sub_event_type << 8 |
- ev[2].event_type,
+ ev[2].sub_event_type << 4 |
+ ev[2].event_type << 12,
DLB2_QE_EV_TYPE_WORD);
sse_qe[1] = _mm_insert_epi16(sse_qe[1],
- ev[3].sub_event_type << 8 |
- ev[3].event_type,
+ ev[3].sub_event_type << 4 |
+ ev[3].event_type << 12,
DLB2_QE_EV_TYPE_WORD + 4);
if (qm_port->use_avx512) {
@@ -238,11 +238,11 @@ dlb2_event_build_hcws(struct dlb2_port *qm_port,
}
/* will only be set for DLB 2.5 + */
- if (qm_port->cq_weight) {
- qe[0].weight = ev[0].impl_opaque & 3;
- qe[1].weight = ev[1].impl_opaque & 3;
- qe[2].weight = ev[2].impl_opaque & 3;
- qe[3].weight = ev[3].impl_opaque & 3;
+ if (qm_port->dlb2->enable_cq_weight) {
+ qe[0].weight = RTE_PMD_DLB2_GET_QE_WEIGHT(&ev[0]);
+ qe[1].weight = RTE_PMD_DLB2_GET_QE_WEIGHT(&ev[1]);
+ qe[2].weight = RTE_PMD_DLB2_GET_QE_WEIGHT(&ev[2]);
+ qe[3].weight = RTE_PMD_DLB2_GET_QE_WEIGHT(&ev[3]);
}
break;
@@ -267,6 +267,7 @@ dlb2_event_build_hcws(struct dlb2_port *qm_port,
}
qe[i].u.event_type.major = ev[i].event_type;
qe[i].u.event_type.sub = ev[i].sub_event_type;
+ qe[i].weight = RTE_PMD_DLB2_GET_QE_WEIGHT(&ev[i]);
}
break;
case 0:
@@ -32,4 +32,12 @@ dlb2_movdir64b(void *dest, void *src)
: "a" (dest), "d" (src));
}
+static inline void
+dlb2_movdir64b_single(void *pp_addr, void *qe4)
+{
+ asm volatile(".byte 0x66, 0x0f, 0x38, 0xf8, 0x02"
+ :
+ : "a" (pp_addr), "d" (qe4));
+}
+
#endif /* _DLB2_INLINE_FNS_H_ */
@@ -29,7 +29,8 @@
#define DLB2_SW_CREDIT_C_QUANTA_DEFAULT 256 /* Consumer */
#define DLB2_DEPTH_THRESH_DEFAULT 256
#define DLB2_MIN_CQ_DEPTH_OVERRIDE 32
-#define DLB2_MAX_CQ_DEPTH_OVERRIDE 128
+#define DLB2_MAX_CQ_DEPTH_OVERRIDE 1024
+#define DLB2_MAX_CQ_DEPTH_REORDER 128
#define DLB2_MIN_ENQ_DEPTH_OVERRIDE 32
#define DLB2_MAX_ENQ_DEPTH_OVERRIDE 1024
@@ -387,8 +388,23 @@ struct dlb2_port {
bool use_scalar; /* force usage of scalar code */
uint16_t hw_credit_quanta;
bool use_avx512;
- uint32_t cq_weight;
bool is_producer; /* True if port is of type producer */
+ uint8_t reorder_id; /* id used for reordering events coming back into the scheduler */
+ bool reorder_en;
+ struct dlb2_reorder *order; /* For ordering enqueues */
+};
+
+struct dlb2_reorder {
+ /* A reorder buffer for events that come back from dequeue in a different
+ * order. UINT8_MAX + 1 entries are used, plus three trailing no-op slots
+ * to make the final movdir64b writes easier.
+ */
+ union {
+ __m128i m128;
+ struct dlb2_enqueue_qe qe;
+ uint64_t u64[2];
+ } enq_reorder[UINT8_MAX + 4];
+ /* index of the next reorder ring entry to be enqueued to the device */
+ uint8_t next_to_enqueue;
};
/* Per-process per-port mmio and memory pointers */
@@ -642,10 +658,6 @@ struct dlb2_qid_depth_thresholds {
int val[DLB2_MAX_NUM_QUEUES_ALL];
};
-struct dlb2_cq_weight {
- int limit[DLB2_MAX_NUM_PORTS_ALL];
-};
-
struct dlb2_port_cos {
int cos_id[DLB2_MAX_NUM_PORTS_ALL];
};
@@ -667,7 +679,6 @@ struct dlb2_devargs {
bool vector_opts_enabled;
int max_cq_depth;
int max_enq_depth;
- struct dlb2_cq_weight cq_weight;
struct dlb2_port_cos port_cos;
struct dlb2_cos_bw cos_bw;
const char *producer_coremask;
@@ -19,6 +19,30 @@ extern "C" {
#include <rte_compat.h>
+/**
+ * Macro function to get the QID depth from rte_event metadata.
+ * Currently the lower 2 bits of the 'rsvd' field are used to store the QID depth.
+ */
+#define RTE_PMD_DLB2_GET_QID_DEPTH(x) ((x)->rsvd & 0x3)
+
+/**
+ * Macro function to set the QID depth in rte_event metadata.
+ * Currently the lower 2 bits of the 'rsvd' field are used to store the QID depth.
+ */
+#define RTE_PMD_DLB2_SET_QID_DEPTH(x, v) ((x)->rsvd = ((x)->rsvd & ~0x3) | (v & 0x3))
+
+/**
+ * Macro function to get the QE weight from rte_event metadata.
+ * Currently the upper 2 bits of the 'rsvd' field are used to store the QE weight.
+ */
+#define RTE_PMD_DLB2_GET_QE_WEIGHT(x) (((x)->rsvd >> 2) & 0x3)
+
+/**
+ * Macro function to set the QE weight in rte_event metadata.
+ * Currently the upper 2 bits of the 'rsvd' field are used to store the QE weight.
+ */
+#define RTE_PMD_DLB2_SET_QE_WEIGHT(x, v) ((x)->rsvd = ((x)->rsvd & 0x3) | ((v & 0x3) << 2))
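+
+/*
+ * Example usage (illustrative): tag an event with a QE weight before enqueue
+ * and read the queue depth hint reported by the PMD after dequeue:
+ *
+ *     RTE_PMD_DLB2_SET_QE_WEIGHT(&ev, 3);
+ *     ...
+ *     uint8_t qid_depth = RTE_PMD_DLB2_GET_QID_DEPTH(&ev);
+ */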
+
/**
* @warning
* @b EXPERIMENTAL: this API may change, or be removed, without prior notice