@@ -23,15 +23,20 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
struct rte_event_ring *in_ring;
struct rte_ring *ctl_in_ring;
char ring_name[RTE_RING_NAMESIZE];
+ bool implicit_release;
port = &dsw->ports[port_id];
+ implicit_release =
+ !(conf->event_port_cfg & RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
+
*port = (struct dsw_port) {
.id = port_id,
.dsw = dsw,
.dequeue_depth = conf->dequeue_depth,
.enqueue_depth = conf->enqueue_depth,
- .new_event_threshold = conf->new_event_threshold
+ .new_event_threshold = conf->new_event_threshold,
+ .implicit_release = implicit_release
};
snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
@@ -221,6 +226,7 @@ dsw_info_get(struct rte_eventdev *dev __rte_unused,
.max_profiles_per_port = 1,
.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED|
+ RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE|
RTE_EVENT_DEV_CAP_NONSEQ_MODE|
RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT|
RTE_EVENT_DEV_CAP_CARRY_FLOW_ID
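
The RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE capability advertised above is acted upon in dsw_port_setup(), which now honours RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL. For reference, a minimal application-side sketch (not part of this patch) of requesting explicit-release mode on a port; the helper name and error handling are illustrative only:

#include <errno.h>
#include <rte_eventdev.h>

/* Illustrative helper, assuming rte_event_dev_configure() has already
 * been called for dev_id.
 */
static int
setup_explicit_release_port(uint8_t dev_id, uint8_t port_id)
{
	struct rte_event_dev_info info;
	struct rte_event_port_conf conf;
	int rc;

	rc = rte_event_dev_info_get(dev_id, &info);
	if (rc != 0)
		return rc;

	/* Only opt out of implicit release if the PMD supports it. */
	if (!(info.event_dev_cap & RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE))
		return -ENOTSUP;

	rc = rte_event_port_default_conf_get(dev_id, port_id, &conf);
	if (rc != 0)
		return rc;

	conf.event_port_cfg |= RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL;

	return rte_event_port_setup(dev_id, port_id, &conf);
}
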
@@ -128,6 +128,7 @@ struct dsw_queue_flow {
enum dsw_migration_state {
DSW_MIGRATION_STATE_IDLE,
DSW_MIGRATION_STATE_PAUSING,
+ DSW_MIGRATION_STATE_FINISH_PENDING,
DSW_MIGRATION_STATE_UNPAUSING
};
@@ -148,6 +149,8 @@ struct dsw_port {
int32_t new_event_threshold;
+ bool implicit_release;
+
uint16_t pending_releases;
uint16_t next_parallel_flow_id;
@@ -1141,6 +1141,15 @@ dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
}
+static void
+dsw_port_try_finish_pending(struct dsw_evdev *dsw, struct dsw_port *source_port)
+{
+ if (unlikely(source_port->migration_state ==
+ DSW_MIGRATION_STATE_FINISH_PENDING &&
+ source_port->pending_releases == 0))
+ dsw_port_move_emigrating_flows(dsw, source_port);
+}
+
static void
dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
{
@@ -1149,14 +1158,15 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
if (port->cfm_cnt == (dsw->num_ports-1)) {
switch (port->migration_state) {
case DSW_MIGRATION_STATE_PAUSING:
- dsw_port_move_emigrating_flows(dsw, port);
+ port->migration_state =
+ DSW_MIGRATION_STATE_FINISH_PENDING;
break;
case DSW_MIGRATION_STATE_UNPAUSING:
dsw_port_end_emigration(dsw, port,
RTE_SCHED_TYPE_ATOMIC);
break;
default:
- RTE_ASSERT(0);
+ RTE_VERIFY(0);
break;
}
}
@@ -1195,19 +1205,18 @@ dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
static void
dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
- /* For simplicity (in the migration logic), avoid all
- * background processing in case event processing is in
- * progress.
- */
- if (port->pending_releases > 0)
- return;
-
/* Polling the control ring is relatively inexpensive, and
* polling it often helps bringing down migration latency, so
* do this for every iteration.
*/
dsw_port_ctl_process(dsw, port);
+ /* Always check if a migration is waiting for pending releases
+ * to arrive, to keep the time during which dequeuing new events
+ * from the port is disabled to a minimum.
+ */
+ dsw_port_try_finish_pending(dsw, port);
+
/* To avoid considering migration and flushing output buffers
* on every dequeue/enqueue call, the scheduler only performs
* such 'background' tasks every nth
@@ -1252,8 +1261,8 @@ static __rte_always_inline uint16_t
dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
const struct rte_event events[],
uint16_t events_len, bool op_types_known,
- uint16_t num_new, uint16_t num_release,
- uint16_t num_non_release)
+ uint16_t num_new, uint16_t num_forward,
+ uint16_t num_release)
{
struct dsw_evdev *dsw = source_port->dsw;
bool enough_credits;
@@ -1287,14 +1296,14 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
if (!op_types_known)
for (i = 0; i < events_len; i++) {
switch (events[i].op) {
- case RTE_EVENT_OP_RELEASE:
- num_release++;
- break;
case RTE_EVENT_OP_NEW:
num_new++;
- /* Falls through. */
- default:
- num_non_release++;
+ break;
+ case RTE_EVENT_OP_FORWARD:
+ num_forward++;
+ break;
+ case RTE_EVENT_OP_RELEASE:
+ num_release++;
break;
}
}
@@ -1309,15 +1318,15 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
source_port->new_event_threshold))
return 0;
- enough_credits = dsw_port_acquire_credits(dsw, source_port,
- num_non_release);
+ enough_credits = dsw_port_acquire_credits(dsw, source_port, num_new);
if (unlikely(!enough_credits))
return 0;
- source_port->pending_releases -= num_release;
+ dsw_port_return_credits(dsw, source_port, num_release);
+
+ source_port->pending_releases -= (num_forward + num_release);
- dsw_port_enqueue_stats(source_port, num_new,
- num_non_release-num_new, num_release);
+ dsw_port_enqueue_stats(source_port, num_new, num_forward, num_release);
for (i = 0; i < events_len; i++) {
const struct rte_event *event = &events[i];
@@ -1329,9 +1338,9 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
}
DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
- "accepted.\n", num_non_release);
+ "accepted.\n", num_new + num_forward);
- return (num_non_release + num_release);
+ return (num_new + num_forward + num_release);
}
uint16_t
@@ -1358,7 +1367,7 @@ dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
return dsw_event_enqueue_burst_generic(source_port, events,
events_len, true, events_len,
- 0, events_len);
+ 0, 0);
}
uint16_t
@@ -1371,8 +1380,8 @@ dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
events_len = source_port->enqueue_depth;
return dsw_event_enqueue_burst_generic(source_port, events,
- events_len, true, 0, 0,
- events_len);
+ events_len, true, 0,
+ events_len, 0);
}
uint16_t
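
To illustrate the enqueue-side accounting above (both forward and release operations decrement pending_releases, and releases return credits immediately), here is a hedged sketch of a worker loop on a port configured with implicit release disabled. process_event() and next_queue_id() are hypothetical application hooks, not DPDK APIs:

#include <stdbool.h>
#include <rte_eventdev.h>
#include <rte_pause.h>

/* Hypothetical application hooks; not part of DPDK or of this patch. */
static bool process_event(struct rte_event *ev);
static uint8_t next_queue_id(const struct rte_event *ev);

static void
worker_loop(uint8_t dev_id, uint8_t port_id)
{
	struct rte_event ev;

	for (;;) {
		if (rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0) == 0)
			continue;

		if (process_event(&ev)) {
			/* Done with this event: release it explicitly.
			 * With implicit release disabled, this enqueue
			 * (like a FORWARD) is what lets the port's
			 * pending_releases count drop back to zero.
			 */
			ev.op = RTE_EVENT_OP_RELEASE;
		} else {
			ev.op = RTE_EVENT_OP_FORWARD;
			ev.queue_id = next_queue_id(&ev);
		}

		while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) == 0)
			rte_pause();
	}
}
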
@@ -1484,21 +1493,34 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
struct dsw_evdev *dsw = source_port->dsw;
uint16_t dequeued;
- source_port->pending_releases = 0;
+ if (source_port->implicit_release) {
+ dsw_port_return_credits(dsw, port,
+ source_port->pending_releases);
+
+ source_port->pending_releases = 0;
+ }
dsw_port_bg_process(dsw, source_port);
if (unlikely(num > source_port->dequeue_depth))
num = source_port->dequeue_depth;
- dequeued = dsw_port_dequeue_burst(source_port, events, num);
+ if (unlikely(source_port->migration_state ==
+ DSW_MIGRATION_STATE_FINISH_PENDING))
+ /* Do not take on new work - only finish outstanding
+ * (unreleased) events, to allow the migration
+ * procedure to complete.
+ */
+ dequeued = 0;
+ else
+ dequeued = dsw_port_dequeue_burst(source_port, events, num);
if (unlikely(source_port->migration_state ==
DSW_MIGRATION_STATE_PAUSING))
dsw_port_stash_migrating_events(source_port, events,
&dequeued);
- source_port->pending_releases = dequeued;
+ source_port->pending_releases += dequeued;
dsw_port_load_record(source_port, dequeued);
@@ -1508,8 +1530,6 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
dequeued);
- dsw_port_return_credits(dsw, source_port, dequeued);
-
/* One potential optimization one might think of is to
* add a migration state (prior to 'pausing'), and
* only record seen events when the port is in this