@@ -34,7 +34,7 @@ static int evtim_buffer_logtype;
static struct rte_event_timer_adapter adapters[RTE_EVENT_TIMER_ADAPTER_NUM_MAX];
-static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops;
+static const struct rte_event_timer_adapter_ops swtim_ops;
#define EVTIM_LOG(level, logtype, ...) \
rte_log(RTE_LOG_ ## level, logtype, \
@@ -211,7 +211,7 @@ rte_event_timer_adapter_create_ext(
* implementation.
*/
if (adapter->ops == NULL)
- adapter->ops = &sw_event_adapter_timer_ops;
+ adapter->ops = &swtim_ops;
/* Allow driver to do some setup */
FUNC_PTR_OR_NULL_RET_WITH_ERRNO(adapter->ops->init, -ENOTSUP);
@@ -340,7 +340,7 @@ rte_event_timer_adapter_lookup(uint16_t adapter_id)
* implementation.
*/
if (adapter->ops == NULL)
- adapter->ops = &sw_event_adapter_timer_ops;
+ adapter->ops = &swtim_ops;
/* Set fast-path function pointers */
adapter->arm_burst = adapter->ops->arm_burst;
@@ -426,10 +426,11 @@ rte_event_timer_adapter_stats_reset(struct rte_event_timer_adapter *adapter)
#define EVENT_BUFFER_SZ 4096
#define EVENT_BUFFER_BATCHSZ 32
#define EVENT_BUFFER_MASK (EVENT_BUFFER_SZ - 1)
+#define EXP_TIM_BUFFER_SZ 128
struct event_buffer {
- uint16_t head;
- uint16_t tail;
+ size_t head;
+ size_t tail;
struct rte_event events[EVENT_BUFFER_SZ];
} __rte_cache_aligned;
@@ -455,7 +456,7 @@ event_buffer_init(struct event_buffer *bufp)
static int
event_buffer_add(struct event_buffer *bufp, struct rte_event *eventp)
{
- uint16_t head_idx;
+ size_t head_idx;
struct rte_event *buf_eventp;
if (event_buffer_full(bufp))
@@ -477,13 +478,16 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
uint16_t *nb_events_flushed,
uint16_t *nb_events_inv)
{
- uint16_t head_idx, tail_idx, n = 0;
struct rte_event *events = bufp->events;
+ size_t head_idx, tail_idx;
+ uint16_t n = 0;
/* Instead of modulus, bitwise AND with mask to get index. */
head_idx = bufp->head & EVENT_BUFFER_MASK;
tail_idx = bufp->tail & EVENT_BUFFER_MASK;
+ RTE_ASSERT(head_idx < EVENT_BUFFER_SZ && tail_idx < EVENT_BUFFER_SZ);
+
/* Determine the largest contigous run we can attempt to enqueue to the
* event device.
*/
@@ -491,150 +495,166 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
n = head_idx - tail_idx;
else if (head_idx < tail_idx)
n = EVENT_BUFFER_SZ - tail_idx;
+ else if (event_buffer_full(bufp))
+ n = EVENT_BUFFER_SZ - tail_idx;
else {
*nb_events_flushed = 0;
return;
}
+ n = RTE_MIN(EVENT_BUFFER_BATCHSZ, n);
*nb_events_inv = 0;
+
*nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,
&events[tail_idx], n);
- if (*nb_events_flushed != n && rte_errno == -EINVAL) {
- EVTIM_LOG_ERR("failed to enqueue invalid event - dropping it");
- (*nb_events_inv)++;
+ if (*nb_events_flushed != n) {
+ if (rte_errno == -EINVAL) {
+ EVTIM_LOG_ERR("failed to enqueue invalid event - "
+ "dropping it");
+ (*nb_events_inv)++;
+ } else if (rte_errno == -ENOSPC)
+ rte_pause();
}
+ if (*nb_events_flushed > 0)
+ EVTIM_BUF_LOG_DBG("enqueued %"PRIu16" timer events to event "
+ "device", *nb_events_flushed);
+
bufp->tail = bufp->tail + *nb_events_flushed + *nb_events_inv;
}
/*
* Software event timer adapter implementation
*/
-
-struct rte_event_timer_adapter_sw_data {
- /* List of messages for outstanding timers */
- TAILQ_HEAD(, msg) msgs_tailq_head;
- /* Lock to guard tailq and armed count */
- rte_spinlock_t msgs_tailq_sl;
+struct swtim {
/* Identifier of service executing timer management logic. */
uint32_t service_id;
/* The cycle count at which the adapter should next tick */
uint64_t next_tick_cycles;
- /* Incremented as the service moves through phases of an iteration */
- volatile int service_phase;
/* The tick resolution used by adapter instance. May have been
* adjusted from what user requested
*/
uint64_t timer_tick_ns;
/* Maximum timeout in nanoseconds allowed by adapter instance. */
uint64_t max_tmo_ns;
- /* Ring containing messages to arm or cancel event timers */
- struct rte_ring *msg_ring;
- /* Mempool containing msg objects */
- struct rte_mempool *msg_pool;
/* Buffered timer expiry events to be enqueued to an event device. */
struct event_buffer buffer;
/* Statistics */
struct rte_event_timer_adapter_stats stats;
- /* The number of threads currently adding to the message ring */
- rte_atomic16_t message_producer_count;
+ /* Mempool of timer objects */
+ struct rte_mempool *tim_pool;
+ /* Back pointer for convenience */
+ struct rte_event_timer_adapter *adapter;
+ /* Identifier of timer data instance */
+ uint32_t timer_data_id;
+ /* Track which cores have actually armed a timer */
+ struct {
+ rte_atomic16_t v;
+ } __rte_cache_aligned in_use[RTE_MAX_LCORE];
+ /* Track which cores' timer lists should be polled */
+ unsigned int poll_lcores[RTE_MAX_LCORE];
+ /* The number of lists that should be polled */
+ int n_poll_lcores;
+ /* Lock to atomically access the above two variables */
+ rte_spinlock_t poll_lcores_sl;
+
+ struct rte_timer *expired_timers[EXP_TIM_BUFFER_SZ];
+ size_t expired_timers_idx;
};
-enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
-
-struct msg {
- enum msg_type type;
- struct rte_event_timer *evtim;
- struct rte_timer tim;
- TAILQ_ENTRY(msg) msgs;
-};
+static inline struct swtim *
+swtim_pmd_priv(const struct rte_event_timer_adapter *adapter)
+{
+ return adapter->data->adapter_priv;
+}
static void
-sw_event_timer_cb(struct rte_timer *tim, void *arg)
+swtim_callback(struct rte_timer *tim)
{
- int ret;
+ struct rte_event_timer *evtim = tim->arg;
+ struct rte_event_timer_adapter *adapter;
+ unsigned int lcore = rte_lcore_id();
+ struct swtim *sw;
uint16_t nb_evs_flushed = 0;
uint16_t nb_evs_invalid = 0;
uint64_t opaque;
- struct rte_event_timer *evtim;
- struct rte_event_timer_adapter *adapter;
- struct rte_event_timer_adapter_sw_data *sw_data;
+ int ret;
- evtim = arg;
opaque = evtim->impl_opaque[1];
adapter = (struct rte_event_timer_adapter *)(uintptr_t)opaque;
- sw_data = adapter->data->adapter_priv;
+ sw = swtim_pmd_priv(adapter);
- ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
+ ret = event_buffer_add(&sw->buffer, &evtim->ev);
if (ret < 0) {
/* If event buffer is full, put timer back in list with
* immediate expiry value, so that we process it again on the
* next iteration.
*/
- rte_timer_reset_sync(tim, 0, SINGLE, rte_lcore_id(),
- sw_event_timer_cb, evtim);
+ ret = rte_timer_alt_reset(sw->timer_data_id, tim, 0, SINGLE,
+ lcore, NULL, evtim);
+ if (ret < 0) {
+ EVTIM_LOG_DBG("event buffer full, failed to reset "
+ "timer with immediate expiry value");
+ } else {
+ sw->stats.evtim_retry_count++;
+ EVTIM_LOG_DBG("event buffer full, resetting rte_timer "
+ "with immediate expiry value");
+ }
- sw_data->stats.evtim_retry_count++;
- EVTIM_LOG_DBG("event buffer full, resetting rte_timer with "
- "immediate expiry value");
+ if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore].v)))
+ sw->poll_lcores[sw->n_poll_lcores++] = lcore;
} else {
- struct msg *m = container_of(tim, struct msg, tim);
- TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, msgs);
EVTIM_BUF_LOG_DBG("buffered an event timer expiry event");
- evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
- /* Free the msg object containing the rte_timer now that
- * we've buffered its event successfully.
+ /* Empty the buffer here, if necessary, to free older expired
+ * timers only
*/
- rte_mempool_put(sw_data->msg_pool, m);
+ if (unlikely(sw->expired_timers_idx == EXP_TIM_BUFFER_SZ)) {
+ rte_mempool_put_bulk(sw->tim_pool,
+ (void **)sw->expired_timers,
+ sw->expired_timers_idx);
+ sw->expired_timers_idx = 0;
+ }
- /* Bump the count when we successfully add an expiry event to
- * the buffer.
- */
- sw_data->stats.evtim_exp_count++;
+ sw->expired_timers[sw->expired_timers_idx++] = tim;
+ sw->stats.evtim_exp_count++;
+
+ evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
}
- if (event_buffer_batch_ready(&sw_data->buffer)) {
- event_buffer_flush(&sw_data->buffer,
+ if (event_buffer_batch_ready(&sw->buffer)) {
+ event_buffer_flush(&sw->buffer,
adapter->data->event_dev_id,
adapter->data->event_port_id,
&nb_evs_flushed,
&nb_evs_invalid);
- sw_data->stats.ev_enq_count += nb_evs_flushed;
- sw_data->stats.ev_inv_count += nb_evs_invalid;
+ sw->stats.ev_enq_count += nb_evs_flushed;
+ sw->stats.ev_inv_count += nb_evs_invalid;
}
}
static __rte_always_inline uint64_t
get_timeout_cycles(struct rte_event_timer *evtim,
- struct rte_event_timer_adapter *adapter)
+ const struct rte_event_timer_adapter *adapter)
{
- uint64_t timeout_ns;
- struct rte_event_timer_adapter_sw_data *sw_data;
-
- sw_data = adapter->data->adapter_priv;
- timeout_ns = evtim->timeout_ticks * sw_data->timer_tick_ns;
+ struct swtim *sw = swtim_pmd_priv(adapter);
+ uint64_t timeout_ns = evtim->timeout_ticks * sw->timer_tick_ns;
return timeout_ns * rte_get_timer_hz() / NSECPERSEC;
-
}
/* This function returns true if one or more (adapter) ticks have occurred since
* the last time it was called.
*/
static inline bool
-adapter_did_tick(struct rte_event_timer_adapter *adapter)
+swtim_did_tick(struct swtim *sw)
{
uint64_t cycles_per_adapter_tick, start_cycles;
uint64_t *next_tick_cyclesp;
- struct rte_event_timer_adapter_sw_data *sw_data;
- sw_data = adapter->data->adapter_priv;
- next_tick_cyclesp = &sw_data->next_tick_cycles;
-
- cycles_per_adapter_tick = sw_data->timer_tick_ns *
+ next_tick_cyclesp = &sw->next_tick_cycles;
+ cycles_per_adapter_tick = sw->timer_tick_ns *
(rte_get_timer_hz() / NSECPERSEC);
-
start_cycles = rte_get_timer_cycles();
/* Note: initially, *next_tick_cyclesp == 0, so the clause below will
@@ -646,7 +666,6 @@ adapter_did_tick(struct rte_event_timer_adapter *adapter)
* boundary.
*/
start_cycles -= start_cycles % cycles_per_adapter_tick;
-
*next_tick_cyclesp = start_cycles + cycles_per_adapter_tick;
return true;
@@ -661,15 +680,12 @@ check_timeout(struct rte_event_timer *evtim,
const struct rte_event_timer_adapter *adapter)
{
uint64_t tmo_nsec;
- struct rte_event_timer_adapter_sw_data *sw_data;
-
- sw_data = adapter->data->adapter_priv;
- tmo_nsec = evtim->timeout_ticks * sw_data->timer_tick_ns;
+ struct swtim *sw = swtim_pmd_priv(adapter);
- if (tmo_nsec > sw_data->max_tmo_ns)
+ tmo_nsec = evtim->timeout_ticks * sw->timer_tick_ns;
+ if (tmo_nsec > sw->max_tmo_ns)
return -1;
-
- if (tmo_nsec < sw_data->timer_tick_ns)
+ if (tmo_nsec < sw->timer_tick_ns)
return -2;
return 0;
@@ -697,110 +713,41 @@ check_destination_event_queue(struct rte_event_timer *evtim,
return 0;
}
-#define NB_OBJS 32
static int
-sw_event_timer_adapter_service_func(void *arg)
+swtim_service_func(void *arg)
{
- int i, num_msgs;
- uint64_t cycles, opaque;
+ struct rte_event_timer_adapter *adapter = arg;
+ struct swtim *sw = swtim_pmd_priv(adapter);
uint16_t nb_evs_flushed = 0;
uint16_t nb_evs_invalid = 0;
- struct rte_event_timer_adapter *adapter;
- struct rte_event_timer_adapter_sw_data *sw_data;
- struct rte_event_timer *evtim = NULL;
- struct rte_timer *tim = NULL;
- struct msg *msg, *msgs[NB_OBJS];
-
- adapter = arg;
- sw_data = adapter->data->adapter_priv;
-
- sw_data->service_phase = 1;
- rte_smp_wmb();
-
- while (rte_atomic16_read(&sw_data->message_producer_count) > 0 ||
- !rte_ring_empty(sw_data->msg_ring)) {
-
- num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,
- (void **)msgs, NB_OBJS, NULL);
-
- for (i = 0; i < num_msgs; i++) {
- int ret = 0;
-
- RTE_SET_USED(ret);
-
- msg = msgs[i];
- evtim = msg->evtim;
-
- switch (msg->type) {
- case MSG_TYPE_ARM:
- EVTIM_SVC_LOG_DBG("dequeued ARM message from "
- "ring");
- tim = &msg->tim;
- rte_timer_init(tim);
- cycles = get_timeout_cycles(evtim,
- adapter);
- ret = rte_timer_reset(tim, cycles, SINGLE,
- rte_lcore_id(),
- sw_event_timer_cb,
- evtim);
- RTE_ASSERT(ret == 0);
-
- evtim->impl_opaque[0] = (uintptr_t)tim;
- evtim->impl_opaque[1] = (uintptr_t)adapter;
-
- TAILQ_INSERT_TAIL(&sw_data->msgs_tailq_head,
- msg,
- msgs);
- break;
- case MSG_TYPE_CANCEL:
- EVTIM_SVC_LOG_DBG("dequeued CANCEL message "
- "from ring");
- opaque = evtim->impl_opaque[0];
- tim = (struct rte_timer *)(uintptr_t)opaque;
- RTE_ASSERT(tim != NULL);
-
- ret = rte_timer_stop(tim);
- RTE_ASSERT(ret == 0);
-
- /* Free the msg object for the original arm
- * request.
- */
- struct msg *m;
- m = container_of(tim, struct msg, tim);
- TAILQ_REMOVE(&sw_data->msgs_tailq_head, m,
- msgs);
- rte_mempool_put(sw_data->msg_pool, m);
-
- /* Free the msg object for the current msg */
- rte_mempool_put(sw_data->msg_pool, msg);
-
- evtim->impl_opaque[0] = 0;
- evtim->impl_opaque[1] = 0;
-
- break;
- }
- }
- }
- sw_data->service_phase = 2;
- rte_smp_wmb();
+ if (swtim_did_tick(sw)) {
+ /* This lock is seldom acquired on the arm side */
+ rte_spinlock_lock(&sw->poll_lcores_sl);
+
+ rte_timer_alt_manage(sw->timer_data_id,
+ sw->poll_lcores,
+ sw->n_poll_lcores,
+ swtim_callback);
- if (adapter_did_tick(adapter)) {
- rte_timer_manage();
+ rte_spinlock_unlock(&sw->poll_lcores_sl);
- event_buffer_flush(&sw_data->buffer,
+ /* Return expired timer objects back to mempool */
+ rte_mempool_put_bulk(sw->tim_pool, (void **)sw->expired_timers,
+ sw->expired_timers_idx);
+ sw->expired_timers_idx = 0;
+
+ event_buffer_flush(&sw->buffer,
adapter->data->event_dev_id,
adapter->data->event_port_id,
- &nb_evs_flushed, &nb_evs_invalid);
+ &nb_evs_flushed,
+ &nb_evs_invalid);
- sw_data->stats.ev_enq_count += nb_evs_flushed;
- sw_data->stats.ev_inv_count += nb_evs_invalid;
- sw_data->stats.adapter_tick_count++;
+ sw->stats.ev_enq_count += nb_evs_flushed;
+ sw->stats.ev_inv_count += nb_evs_invalid;
+ sw->stats.adapter_tick_count++;
}
- sw_data->service_phase = 0;
- rte_smp_wmb();
-
return 0;
}
@@ -820,7 +767,7 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual)
int size;
int cache_size = 0;
- for (i = 0; ; i++) {
+ for (i = 0;; i++) {
size = 1 << i;
if (RTE_MAX_LCORE * size < (int)(nb_actual - nb_requested) &&
@@ -834,168 +781,145 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual)
return cache_size;
}
-#define SW_MIN_INTERVAL 1E5
-
static int
-sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
+swtim_init(struct rte_event_timer_adapter *adapter)
{
- int ret;
- struct rte_event_timer_adapter_sw_data *sw_data;
- uint64_t nb_timers;
+ int i, ret;
+ struct swtim *sw;
unsigned int flags;
struct rte_service_spec service;
- static bool timer_subsystem_inited; // static initialized to false
- /* Allocate storage for SW implementation data */
- char priv_data_name[RTE_RING_NAMESIZE];
- snprintf(priv_data_name, RTE_RING_NAMESIZE, "sw_evtim_adap_priv_%"PRIu8,
- adapter->data->id);
- adapter->data->adapter_priv = rte_zmalloc_socket(
- priv_data_name,
- sizeof(struct rte_event_timer_adapter_sw_data),
- RTE_CACHE_LINE_SIZE,
- adapter->data->socket_id);
- if (adapter->data->adapter_priv == NULL) {
+ /* Allocate storage for private data area */
+#define SWTIM_NAMESIZE 32
+ char swtim_name[SWTIM_NAMESIZE];
+ snprintf(swtim_name, SWTIM_NAMESIZE, "swtim_%"PRIu8,
+ adapter->data->id);
+ sw = rte_zmalloc_socket(swtim_name, sizeof(*sw), RTE_CACHE_LINE_SIZE,
+ adapter->data->socket_id);
+ if (sw == NULL) {
EVTIM_LOG_ERR("failed to allocate space for private data");
rte_errno = ENOMEM;
return -1;
}
- if (adapter->data->conf.timer_tick_ns < SW_MIN_INTERVAL) {
- EVTIM_LOG_ERR("failed to create adapter with requested tick "
- "interval");
- rte_errno = EINVAL;
- return -1;
- }
-
- sw_data = adapter->data->adapter_priv;
-
- sw_data->timer_tick_ns = adapter->data->conf.timer_tick_ns;
- sw_data->max_tmo_ns = adapter->data->conf.max_tmo_ns;
-
- TAILQ_INIT(&sw_data->msgs_tailq_head);
- rte_spinlock_init(&sw_data->msgs_tailq_sl);
- rte_atomic16_init(&sw_data->message_producer_count);
+ /* Connect storage to adapter instance */
+ adapter->data->adapter_priv = sw;
+ sw->adapter = adapter;
- /* Rings require power of 2, so round up to next such value */
- nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
+ sw->timer_tick_ns = adapter->data->conf.timer_tick_ns;
+ sw->max_tmo_ns = adapter->data->conf.max_tmo_ns;
- char msg_ring_name[RTE_RING_NAMESIZE];
- snprintf(msg_ring_name, RTE_RING_NAMESIZE,
- "sw_evtim_adap_msg_ring_%"PRIu8, adapter->data->id);
- flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
- RING_F_SP_ENQ | RING_F_SC_DEQ :
- RING_F_SC_DEQ;
- sw_data->msg_ring = rte_ring_create(msg_ring_name, nb_timers,
- adapter->data->socket_id, flags);
- if (sw_data->msg_ring == NULL) {
- EVTIM_LOG_ERR("failed to create message ring");
- rte_errno = ENOMEM;
- goto free_priv_data;
- }
-
- char pool_name[RTE_RING_NAMESIZE];
- snprintf(pool_name, RTE_RING_NAMESIZE, "sw_evtim_adap_msg_pool_%"PRIu8,
+ /* Create a timer pool */
+ char pool_name[SWTIM_NAMESIZE];
+ snprintf(pool_name, SWTIM_NAMESIZE, "swtim_pool_%"PRIu8,
adapter->data->id);
-
- /* Both the arming/canceling thread and the service thread will do puts
- * to the mempool, but if the SP_PUT flag is enabled, we can specify
- * single-consumer get for the mempool.
- */
- flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
- MEMPOOL_F_SC_GET : 0;
-
- /* The usable size of a ring is count - 1, so subtract one here to
- * make the counts agree.
- */
+ /* Optimal mempool size is a power of 2 minus one */
+ uint64_t nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
int pool_size = nb_timers - 1;
int cache_size = compute_msg_mempool_cache_size(
adapter->data->conf.nb_timers, nb_timers);
- sw_data->msg_pool = rte_mempool_create(pool_name, pool_size,
- sizeof(struct msg), cache_size,
- 0, NULL, NULL, NULL, NULL,
- adapter->data->socket_id, flags);
- if (sw_data->msg_pool == NULL) {
- EVTIM_LOG_ERR("failed to create message object mempool");
+ flags = 0; /* pool is multi-producer, multi-consumer */
+ sw->tim_pool = rte_mempool_create(pool_name, pool_size,
+ sizeof(struct rte_timer), cache_size, 0, NULL, NULL,
+ NULL, NULL, adapter->data->socket_id, flags);
+ if (sw->tim_pool == NULL) {
+ EVTIM_LOG_ERR("failed to create timer object mempool");
rte_errno = ENOMEM;
- goto free_msg_ring;
+ goto free_alloc;
}
- event_buffer_init(&sw_data->buffer);
+ /* Initialize the variables that track in-use timer lists */
+ rte_spinlock_init(&sw->poll_lcores_sl);
+ for (i = 0; i < RTE_MAX_LCORE; i++)
+ rte_atomic16_init(&sw->in_use[i].v);
+
+ /* Initialize the timer subsystem and allocate timer data instance */
+ ret = rte_timer_subsystem_init();
+ if (ret < 0) {
+ if (ret != -EALREADY) {
+ EVTIM_LOG_ERR("failed to initialize timer subsystem");
+ rte_errno = ret;
+ goto free_mempool;
+ }
+ }
+
+ ret = rte_timer_data_alloc(&sw->timer_data_id);
+ if (ret < 0) {
+ EVTIM_LOG_ERR("failed to allocate timer data instance");
+ rte_errno = ret;
+ goto free_mempool;
+ }
+
+ /* Initialize timer event buffer */
+ event_buffer_init(&sw->buffer);
+
+ sw->adapter = adapter;
/* Register a service component to run adapter logic */
memset(&service, 0, sizeof(service));
snprintf(service.name, RTE_SERVICE_NAME_MAX,
- "sw_evimer_adap_svc_%"PRIu8, adapter->data->id);
+ "swtim_svc_%"PRIu8, adapter->data->id);
service.socket_id = adapter->data->socket_id;
- service.callback = sw_event_timer_adapter_service_func;
+ service.callback = swtim_service_func;
service.callback_userdata = adapter;
service.capabilities &= ~(RTE_SERVICE_CAP_MT_SAFE);
- ret = rte_service_component_register(&service, &sw_data->service_id);
+ ret = rte_service_component_register(&service, &sw->service_id);
if (ret < 0) {
EVTIM_LOG_ERR("failed to register service %s with id %"PRIu32
- ": err = %d", service.name, sw_data->service_id,
+ ": err = %d", service.name, sw->service_id,
ret);
rte_errno = ENOSPC;
- goto free_msg_pool;
+ goto free_mempool;
}
EVTIM_LOG_DBG("registered service %s with id %"PRIu32, service.name,
- sw_data->service_id);
+ sw->service_id);
- adapter->data->service_id = sw_data->service_id;
+ adapter->data->service_id = sw->service_id;
adapter->data->service_inited = 1;
- if (!timer_subsystem_inited) {
- rte_timer_subsystem_init();
- timer_subsystem_inited = true;
- }
-
return 0;
-
-free_msg_pool:
- rte_mempool_free(sw_data->msg_pool);
-free_msg_ring:
- rte_ring_free(sw_data->msg_ring);
-free_priv_data:
- rte_free(sw_data);
+free_mempool:
+ rte_mempool_free(sw->tim_pool);
+free_alloc:
+ rte_free(sw);
return -1;
}
-static int
-sw_event_timer_adapter_uninit(struct rte_event_timer_adapter *adapter)
+static void
+swtim_free_tim(struct rte_timer *tim, void *arg)
{
- int ret;
- struct msg *m1, *m2;
- struct rte_event_timer_adapter_sw_data *sw_data =
- adapter->data->adapter_priv;
-
- rte_spinlock_lock(&sw_data->msgs_tailq_sl);
+ struct swtim *sw = arg;
- /* Cancel outstanding rte_timers and free msg objects */
- m1 = TAILQ_FIRST(&sw_data->msgs_tailq_head);
- while (m1 != NULL) {
- EVTIM_LOG_DBG("freeing outstanding timer");
- m2 = TAILQ_NEXT(m1, msgs);
-
- rte_timer_stop_sync(&m1->tim);
- rte_mempool_put(sw_data->msg_pool, m1);
+ rte_mempool_put(sw->tim_pool, tim);
+}
- m1 = m2;
- }
+/* Traverse the list of outstanding timers and put them back in the mempool
+ * before freeing the adapter to avoid leaking the memory.
+ */
+static int
+swtim_uninit(struct rte_event_timer_adapter *adapter)
+{
+ int ret;
+ struct swtim *sw = swtim_pmd_priv(adapter);
- rte_spinlock_unlock(&sw_data->msgs_tailq_sl);
+ /* Free outstanding timers */
+ rte_timer_stop_all(sw->timer_data_id,
+ sw->poll_lcores,
+ sw->n_poll_lcores,
+ swtim_free_tim,
+ sw);
- ret = rte_service_component_unregister(sw_data->service_id);
+ ret = rte_service_component_unregister(sw->service_id);
if (ret < 0) {
EVTIM_LOG_ERR("failed to unregister service component");
return ret;
}
- rte_ring_free(sw_data->msg_ring);
- rte_mempool_free(sw_data->msg_pool);
- rte_free(adapter->data->adapter_priv);
+ rte_mempool_free(sw->tim_pool);
+ rte_free(sw);
+ adapter->data->adapter_priv = NULL;
return 0;
}
@@ -1016,88 +940,79 @@ get_mapped_count_for_service(uint32_t service_id)
}
static int
-sw_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter)
+swtim_start(const struct rte_event_timer_adapter *adapter)
{
int mapped_count;
- struct rte_event_timer_adapter_sw_data *sw_data;
-
- sw_data = adapter->data->adapter_priv;
+ struct swtim *sw = swtim_pmd_priv(adapter);
/* Mapping the service to more than one service core can introduce
* delays while one thread is waiting to acquire a lock, so only allow
* one core to be mapped to the service.
+ *
+ * Note: the service could be modified such that it spreads cores to
+ * poll over multiple service instances.
*/
- mapped_count = get_mapped_count_for_service(sw_data->service_id);
+ mapped_count = get_mapped_count_for_service(sw->service_id);
- if (mapped_count == 1)
- return rte_service_component_runstate_set(sw_data->service_id,
- 1);
+ if (mapped_count != 1)
+ return mapped_count < 1 ? -ENOENT : -ENOTSUP;
- return mapped_count < 1 ? -ENOENT : -ENOTSUP;
+ return rte_service_component_runstate_set(sw->service_id, 1);
}
static int
-sw_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter)
+swtim_stop(const struct rte_event_timer_adapter *adapter)
{
int ret;
- struct rte_event_timer_adapter_sw_data *sw_data =
- adapter->data->adapter_priv;
+ struct swtim *sw = swtim_pmd_priv(adapter);
- ret = rte_service_component_runstate_set(sw_data->service_id, 0);
+ ret = rte_service_component_runstate_set(sw->service_id, 0);
if (ret < 0)
return ret;
- /* Wait for the service to complete its final iteration before
- * stopping.
- */
- while (sw_data->service_phase != 0)
+ /* Wait for the service to complete its final iteration */
+ while (rte_service_may_be_active(sw->service_id))
rte_pause();
- rte_smp_rmb();
-
return 0;
}
static void
-sw_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter,
+swtim_get_info(const struct rte_event_timer_adapter *adapter,
struct rte_event_timer_adapter_info *adapter_info)
{
- struct rte_event_timer_adapter_sw_data *sw_data;
- sw_data = adapter->data->adapter_priv;
-
- adapter_info->min_resolution_ns = sw_data->timer_tick_ns;
- adapter_info->max_tmo_ns = sw_data->max_tmo_ns;
+ struct swtim *sw = swtim_pmd_priv(adapter);
+ adapter_info->min_resolution_ns = sw->timer_tick_ns;
+ adapter_info->max_tmo_ns = sw->max_tmo_ns;
}
static int
-sw_event_timer_adapter_stats_get(const struct rte_event_timer_adapter *adapter,
- struct rte_event_timer_adapter_stats *stats)
+swtim_stats_get(const struct rte_event_timer_adapter *adapter,
+ struct rte_event_timer_adapter_stats *stats)
{
- struct rte_event_timer_adapter_sw_data *sw_data;
- sw_data = adapter->data->adapter_priv;
- *stats = sw_data->stats;
+ struct swtim *sw = swtim_pmd_priv(adapter);
+ *stats = sw->stats; /* structure copy */
return 0;
}
static int
-sw_event_timer_adapter_stats_reset(
- const struct rte_event_timer_adapter *adapter)
+swtim_stats_reset(const struct rte_event_timer_adapter *adapter)
{
- struct rte_event_timer_adapter_sw_data *sw_data;
- sw_data = adapter->data->adapter_priv;
- memset(&sw_data->stats, 0, sizeof(sw_data->stats));
+ struct swtim *sw = swtim_pmd_priv(adapter);
+ memset(&sw->stats, 0, sizeof(sw->stats));
return 0;
}
-static __rte_always_inline uint16_t
-__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
- struct rte_event_timer **evtims,
- uint16_t nb_evtims)
+static uint16_t
+__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
+ struct rte_event_timer **evtims,
+ uint16_t nb_evtims)
{
- uint16_t i;
- int ret;
- struct rte_event_timer_adapter_sw_data *sw_data;
- struct msg *msgs[nb_evtims];
+ int i, ret;
+ struct swtim *sw = swtim_pmd_priv(adapter);
+ uint32_t lcore_id = rte_lcore_id();
+ struct rte_timer *tim, *tims[nb_evtims];
+ uint64_t cycles;
#ifdef RTE_LIBRTE_EVENTDEV_DEBUG
/* Check that the service is running. */
@@ -1107,101 +1022,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
}
#endif
- sw_data = adapter->data->adapter_priv;
+ /* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of
+ * the highest lcore to insert such timers into
+ */
+ if (lcore_id == LCORE_ID_ANY)
+ lcore_id = RTE_MAX_LCORE - 1;
+
+ /* If this is the first time we're arming an event timer on this lcore,
+ * mark this lcore as "in use"; this will cause the service
+ * function to process the timer list that corresponds to this lcore.
+ */
+ if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id].v))) {
+ rte_spinlock_lock(&sw->poll_lcores_sl);
+ EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll",
+ lcore_id);
+ sw->poll_lcores[sw->n_poll_lcores++] = lcore_id;
+ rte_spinlock_unlock(&sw->poll_lcores_sl);
+ }
- ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
+ ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,
+ nb_evtims);
if (ret < 0) {
rte_errno = ENOSPC;
return 0;
}
- /* Let the service know we're producing messages for it to process */
- rte_atomic16_inc(&sw_data->message_producer_count);
-
- /* If the service is managing timers, wait for it to finish */
- while (sw_data->service_phase == 2)
- rte_pause();
-
- rte_smp_rmb();
-
for (i = 0; i < nb_evtims; i++) {
/* Don't modify the event timer state in these cases */
if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
rte_errno = EALREADY;
break;
} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
- evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
+ evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
rte_errno = EINVAL;
break;
}
ret = check_timeout(evtims[i], adapter);
- if (ret == -1) {
+ if (unlikely(ret == -1)) {
evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
rte_errno = EINVAL;
break;
- }
- if (ret == -2) {
+ } else if (unlikely(ret == -2)) {
evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
rte_errno = EINVAL;
break;
}
- if (check_destination_event_queue(evtims[i], adapter) < 0) {
+ if (unlikely(check_destination_event_queue(evtims[i],
+ adapter) < 0)) {
evtims[i]->state = RTE_EVENT_TIMER_ERROR;
rte_errno = EINVAL;
break;
}
- /* Checks passed, set up a message to enqueue */
- msgs[i]->type = MSG_TYPE_ARM;
- msgs[i]->evtim = evtims[i];
+ tim = tims[i];
+ rte_timer_init(tim);
- /* Set the payload pointer if not set. */
- if (evtims[i]->ev.event_ptr == NULL)
- evtims[i]->ev.event_ptr = evtims[i];
+ evtims[i]->impl_opaque[0] = (uintptr_t)tim;
+ evtims[i]->impl_opaque[1] = (uintptr_t)adapter;
- /* msg objects that get enqueued successfully will be freed
- * either by a future cancel operation or by the timer
- * expiration callback.
- */
- if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
- rte_errno = ENOSPC;
+ cycles = get_timeout_cycles(evtims[i], adapter);
+ ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,
+ SINGLE, lcore_id, NULL, evtims[i]);
+ if (ret < 0) {
+ /* tim was in RUNNING or CONFIG state */
+ evtims[i]->state = RTE_EVENT_TIMER_ERROR;
break;
}
- EVTIM_LOG_DBG("enqueued ARM message to ring");
-
+ rte_smp_wmb();
+ EVTIM_LOG_DBG("armed an event timer");
evtims[i]->state = RTE_EVENT_TIMER_ARMED;
}
- /* Let the service know we're done producing messages */
- rte_atomic16_dec(&sw_data->message_producer_count);
-
if (i < nb_evtims)
- rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
- nb_evtims - i);
+ rte_mempool_put_bulk(sw->tim_pool,
+ (void **)&tims[i], nb_evtims - i);
return i;
}
static uint16_t
-sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
- struct rte_event_timer **evtims,
- uint16_t nb_evtims)
+swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
+ struct rte_event_timer **evtims,
+ uint16_t nb_evtims)
{
- return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
+ return __swtim_arm_burst(adapter, evtims, nb_evtims);
}
static uint16_t
-sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
- struct rte_event_timer **evtims,
- uint16_t nb_evtims)
+swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
+ struct rte_event_timer **evtims,
+ uint16_t nb_evtims)
{
- uint16_t i;
- int ret;
- struct rte_event_timer_adapter_sw_data *sw_data;
- struct msg *msgs[nb_evtims];
+ int i, ret;
+ struct rte_timer *timp;
+ uint64_t opaque;
+ struct swtim *sw = swtim_pmd_priv(adapter);
#ifdef RTE_LIBRTE_EVENTDEV_DEBUG
/* Check that the service is running. */
@@ -1211,23 +1129,6 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
}
#endif
- sw_data = adapter->data->adapter_priv;
-
- ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
- if (ret < 0) {
- rte_errno = ENOSPC;
- return 0;
- }
-
- /* Let the service know we're producing messages for it to process */
- rte_atomic16_inc(&sw_data->message_producer_count);
-
- /* If the service could be modifying event timer states, wait */
- while (sw_data->service_phase == 2)
- rte_pause();
-
- rte_smp_rmb();
-
for (i = 0; i < nb_evtims; i++) {
/* Don't modify the event timer state in these cases */
if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
@@ -1238,54 +1139,56 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
break;
}
- msgs[i]->type = MSG_TYPE_CANCEL;
- msgs[i]->evtim = evtims[i];
+ rte_smp_rmb();
+
+ opaque = evtims[i]->impl_opaque[0];
+ timp = (struct rte_timer *)(uintptr_t)opaque;
+ RTE_ASSERT(timp != NULL);
- if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
- rte_errno = ENOSPC;
+ ret = rte_timer_alt_stop(sw->timer_data_id, timp);
+ if (ret < 0) {
+ /* Timer is running or being configured */
+ rte_errno = EAGAIN;
break;
}
- EVTIM_LOG_DBG("enqueued CANCEL message to ring");
+ rte_mempool_put(sw->tim_pool, (void **)timp);
evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
- }
+ evtims[i]->impl_opaque[0] = 0;
+ evtims[i]->impl_opaque[1] = 0;
- /* Let the service know we're done producing messages */
- rte_atomic16_dec(&sw_data->message_producer_count);
-
- if (i < nb_evtims)
- rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
- nb_evtims - i);
+ rte_smp_wmb();
+ }
return i;
}
static uint16_t
-sw_event_timer_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
- struct rte_event_timer **evtims,
- uint64_t timeout_ticks,
- uint16_t nb_evtims)
+swtim_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
+ struct rte_event_timer **evtims,
+ uint64_t timeout_ticks,
+ uint16_t nb_evtims)
{
int i;
for (i = 0; i < nb_evtims; i++)
evtims[i]->timeout_ticks = timeout_ticks;
- return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
+ return __swtim_arm_burst(adapter, evtims, nb_evtims);
}
-static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops = {
- .init = sw_event_timer_adapter_init,
- .uninit = sw_event_timer_adapter_uninit,
- .start = sw_event_timer_adapter_start,
- .stop = sw_event_timer_adapter_stop,
- .get_info = sw_event_timer_adapter_get_info,
- .stats_get = sw_event_timer_adapter_stats_get,
- .stats_reset = sw_event_timer_adapter_stats_reset,
- .arm_burst = sw_event_timer_arm_burst,
- .arm_tmo_tick_burst = sw_event_timer_arm_tmo_tick_burst,
- .cancel_burst = sw_event_timer_cancel_burst,
+static const struct rte_event_timer_adapter_ops swtim_ops = {
+ .init = swtim_init,
+ .uninit = swtim_uninit,
+ .start = swtim_start,
+ .stop = swtim_stop,
+ .get_info = swtim_get_info,
+ .stats_get = swtim_stats_get,
+ .stats_reset = swtim_stats_reset,
+ .arm_burst = swtim_arm_burst,
+ .arm_tmo_tick_burst = swtim_arm_tmo_tick_burst,
+ .cancel_burst = swtim_cancel_burst,
};
RTE_INIT(event_timer_adapter_init_log)