[dpdk-dev,v6,13/23] eventtimer: add adapter service definition
Checks
Commit Message
Define the callback function for the service that corresponds to an
adapter instance, as well as the callback for expired timers that the
service manages.
Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
---
lib/librte_eventdev/rte_event_timer_adapter.c | 198 +++++++++++++++++++++++++-
lib/librte_eventdev/rte_event_timer_adapter.h | 2 +-
2 files changed, 198 insertions(+), 2 deletions(-)
Comments
On Wed, Jan 10, 2018 at 06:21:04PM -0600, Erik Gabriel Carrillo wrote:
> Define the callback function for the service that corresponds to an
> adapter instance, as well as the callback for expired timers that the
> service manages.
>
> Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
> ---
> lib/librte_eventdev/rte_event_timer_adapter.c | 198 +++++++++++++++++++++++++-
> lib/librte_eventdev/rte_event_timer_adapter.h | 2 +-
> 2 files changed, 198 insertions(+), 2 deletions(-)
>
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
> index 38e52cb..0266ad5 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.c
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
> @@ -40,8 +40,10 @@
> #include <rte_malloc.h>
> #include <rte_ring.h>
> #include <rte_mempool.h>
> +#include <rte_common.h>
> #include <rte_timer.h>
> #include <rte_service_component.h>
> +#include <rte_cycles.h>
>
> #include "rte_eventdev.h"
> #include "rte_eventdev_pmd.h"
> @@ -460,10 +462,198 @@ struct msg {
> struct rte_event_timer *evtim;
> };
<snip>
> + if (n != 1 && rte_errno == -ENOSPC) {
> + /* If we couldn't enqueue because the event port was
> + * backpressured, put the timer back in the skiplist with an
> + * immediate expiry value so we can process it again on the
> + * next iteration.
> + */
> + rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(),
> + sw_event_timer_cb, evtim);
> + } else {
> + sw_data->nb_armed_evtims--;
> + rte_wmb();
Any reason for using barrier here?. IMO smp_wmb() would be more than sufficient
or use atomics.
> + evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
> + rte_mempool_put(sw_data->tim_pool, (void **)&tim);
> + }
> +}
> +
> +static __rte_always_inline uint64_t
> +get_timeout_cycles(struct rte_event_timer *evtim,
> + struct rte_event_timer_adapter *adapter)
> +{
> + uint64_t timeout_ns;
> +
> + timeout_ns = evtim->timeout_ticks * adapter->data->conf.timer_tick_ns;
> +#define NSECPERSEC 1E9
> + return timeout_ns * rte_get_timer_hz() / NSECPERSEC;
> +
> +}
> +
> +/* Check that event timer timeout value is in range */
> +static __rte_always_inline int
> +check_timeout(struct rte_event_timer *evtim,
> + const struct rte_event_timer_adapter *adapter)
> +{
> + uint64_t tmo_nsec = evtim->timeout_ticks *
> + adapter->data->conf.timer_tick_ns;
> +
> + return (tmo_nsec > adapter->data->conf.max_tmo_ns) ? -1
> + : (tmo_nsec < adapter->data->conf.timer_tick_ns) ? -2
> + : 0;
Consider simplifying this for readability.
> +}
> +
> +/* Check that event timer event queue sched type matches destination event queue
> + * sched type
> + */
> +static __rte_always_inline int
> +check_destination_event_queue(struct rte_event_timer *evtim,
> + const struct rte_event_timer_adapter *adapter)
<snip>
> +
> +#define NB_OBJS 32
> static int
> sw_event_timer_adapter_service_func(void *arg)
> {
> - RTE_SET_USED(arg);
> + int i, num_msgs, ret;
> + uint64_t cycles;
> + uint16_t nb_events;
> + struct rte_event_timer_adapter *adapter;
> + struct rte_event_timer_adapter_sw_data *sw_data;
> + struct rte_event_timer *evtim = NULL;
> + struct rte_timer *tim = NULL;
> + struct msg *msg, *msgs[NB_OBJS];
> +
> + adapter = arg;
> + sw_data = adapter->data->adapter_priv;
> +
> + while (!rte_ring_empty(sw_data->msg_ring)) {
> + num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,
> + (void **)msgs, NB_OBJS, NULL);
> +
> + for (i = 0; i < num_msgs; i++) {
> + msg = msgs[i];
> + evtim = msg->evtim;
> +
> + tim = (struct rte_timer *)evtim->impl_opaque[0];
> + RTE_ASSERT(tim != NULL);
> +
> + switch (msg->type) {
> + case MSG_TYPE_ARM:
> + if (validate_event_timer(evtim, adapter) < 0) {
> + rte_mempool_put(sw_data->tim_pool,
> + (void **)&tim);
> + continue;
> + }
> +
> + /* Checks passed; set an rte_timer */
> + cycles = get_timeout_cycles(msg->evtim,
> + adapter);
> + rte_timer_reset_sync(tim, cycles, SINGLE,
> + rte_lcore_id(),
> + sw_event_timer_cb,
> + msg->evtim);
> +
> + sw_data->nb_armed_evtims++;
> + rte_wmb();
Same as above comment.
> + evtim->state = RTE_EVENT_TIMER_ARMED;
> + break;
> + case MSG_TYPE_CANCEL:
> + /* The event timer was either not armed or it
> + * fired after this cancel request was queued
> + * and before the request was processed.
> + */
> + if (evtim->state != RTE_EVENT_TIMER_ARMED)
> + continue;
> +
> + rte_timer_stop_sync(tim);
> + rte_mempool_put(sw_data->tim_pool,
> + (void **)&tim);
> + sw_data->nb_armed_evtims--;
> + rte_wmb();
Same as above comment.
> + msg->evtim->state = RTE_EVENT_TIMER_CANCELED;
> + break;
> + }
> + }
> +
> + rte_mempool_put_bulk(sw_data->msg_pool, (void **)msgs,
> + num_msgs);
> + }
> +
> + rte_timer_manage();
Consider calling rte_timer_manage() before ARM new set of timers also, poll it
based on the timeout interval configured.
> +
> + /* Could use for stats */
> + RTE_SET_USED(nb_events);
> + RTE_SET_USED(ret);
> +
> return 0;
> }
>
<snip>
@@ -40,8 +40,10 @@
#include <rte_malloc.h>
#include <rte_ring.h>
#include <rte_mempool.h>
+#include <rte_common.h>
#include <rte_timer.h>
#include <rte_service_component.h>
+#include <rte_cycles.h>
#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
@@ -460,10 +462,198 @@ struct msg {
struct rte_event_timer *evtim;
};
+static void
+sw_event_timer_cb(struct rte_timer *tim, void *arg)
+{
+ uint16_t n;
+ struct rte_event_timer *evtim;
+ struct rte_event_timer_adapter *adapter;
+ struct rte_event_timer_adapter_sw_data *sw_data;
+
+ evtim = arg;
+ adapter = (struct rte_event_timer_adapter *)evtim->impl_opaque[1];
+ sw_data = adapter->data->adapter_priv;
+
+ n = rte_event_enqueue_burst(adapter->data->event_dev_id,
+ adapter->data->event_port_id,
+ &evtim->ev,
+ 1);
+ if (n != 1 && rte_errno == -ENOSPC) {
+ /* If we couldn't enqueue because the event port was
+ * backpressured, put the timer back in the skiplist with an
+ * immediate expiry value so we can process it again on the
+ * next iteration.
+ */
+ rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(),
+ sw_event_timer_cb, evtim);
+ } else {
+ sw_data->nb_armed_evtims--;
+ rte_wmb();
+ evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
+ rte_mempool_put(sw_data->tim_pool, (void **)&tim);
+ }
+}
+
+static __rte_always_inline uint64_t
+get_timeout_cycles(struct rte_event_timer *evtim,
+ struct rte_event_timer_adapter *adapter)
+{
+ uint64_t timeout_ns;
+
+ timeout_ns = evtim->timeout_ticks * adapter->data->conf.timer_tick_ns;
+#define NSECPERSEC 1E9
+ return timeout_ns * rte_get_timer_hz() / NSECPERSEC;
+
+}
+
+/* Check that event timer timeout value is in range */
+static __rte_always_inline int
+check_timeout(struct rte_event_timer *evtim,
+ const struct rte_event_timer_adapter *adapter)
+{
+ uint64_t tmo_nsec = evtim->timeout_ticks *
+ adapter->data->conf.timer_tick_ns;
+
+ return (tmo_nsec > adapter->data->conf.max_tmo_ns) ? -1
+ : (tmo_nsec < adapter->data->conf.timer_tick_ns) ? -2
+ : 0;
+}
+
+/* Check that event timer event queue sched type matches destination event queue
+ * sched type
+ */
+static __rte_always_inline int
+check_destination_event_queue(struct rte_event_timer *evtim,
+ const struct rte_event_timer_adapter *adapter)
+{
+ int ret;
+ uint32_t sched_type;
+
+ ret = rte_event_queue_attr_get(adapter->data->event_dev_id,
+ evtim->ev.queue_id,
+ RTE_EVENT_QUEUE_ATTR_SCHEDULE_TYPE,
+ &sched_type);
+
+ if (ret < 0 || evtim->ev.sched_type != sched_type)
+ return -1;
+
+ return 0;
+}
+
+/* We can't correctly block on the state of a timer that is currently armed,
+ * so disallow it.
+ */
+static __rte_always_inline int
+check_state_for_arm(struct rte_event_timer *evtim)
+{
+ return evtim->state != RTE_EVENT_TIMER_ARMED ? 0 : -1;
+}
+
+static inline int
+validate_event_timer(struct rte_event_timer *evtim,
+ struct rte_event_timer_adapter *adapter)
+{
+ int ret;
+
+ if (check_state_for_arm(evtim) < 0) {
+ evtim->state = RTE_EVENT_TIMER_ERROR;
+ return -1;
+ }
+
+ ret = check_timeout(evtim, adapter);
+ switch (ret) {
+ case -1:
+ evtim->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+ return -1;
+ case -2:
+ evtim->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
+ return -1;
+ }
+
+ if (check_destination_event_queue(evtim, adapter) < 0) {
+ evtim->state = RTE_EVENT_TIMER_ERROR;
+ return -1;
+ }
+
+ return 0;
+}
+
+
+#define NB_OBJS 32
static int
sw_event_timer_adapter_service_func(void *arg)
{
- RTE_SET_USED(arg);
+ int i, num_msgs, ret;
+ uint64_t cycles;
+ uint16_t nb_events;
+ struct rte_event_timer_adapter *adapter;
+ struct rte_event_timer_adapter_sw_data *sw_data;
+ struct rte_event_timer *evtim = NULL;
+ struct rte_timer *tim = NULL;
+ struct msg *msg, *msgs[NB_OBJS];
+
+ adapter = arg;
+ sw_data = adapter->data->adapter_priv;
+
+ while (!rte_ring_empty(sw_data->msg_ring)) {
+ num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,
+ (void **)msgs, NB_OBJS, NULL);
+
+ for (i = 0; i < num_msgs; i++) {
+ msg = msgs[i];
+ evtim = msg->evtim;
+
+ tim = (struct rte_timer *)evtim->impl_opaque[0];
+ RTE_ASSERT(tim != NULL);
+
+ switch (msg->type) {
+ case MSG_TYPE_ARM:
+ if (validate_event_timer(evtim, adapter) < 0) {
+ rte_mempool_put(sw_data->tim_pool,
+ (void **)&tim);
+ continue;
+ }
+
+ /* Checks passed; set an rte_timer */
+ cycles = get_timeout_cycles(msg->evtim,
+ adapter);
+ rte_timer_reset_sync(tim, cycles, SINGLE,
+ rte_lcore_id(),
+ sw_event_timer_cb,
+ msg->evtim);
+
+ sw_data->nb_armed_evtims++;
+ rte_wmb();
+ evtim->state = RTE_EVENT_TIMER_ARMED;
+ break;
+ case MSG_TYPE_CANCEL:
+ /* The event timer was either not armed or it
+ * fired after this cancel request was queued
+ * and before the request was processed.
+ */
+ if (evtim->state != RTE_EVENT_TIMER_ARMED)
+ continue;
+
+ rte_timer_stop_sync(tim);
+ rte_mempool_put(sw_data->tim_pool,
+ (void **)&tim);
+ sw_data->nb_armed_evtims--;
+ rte_wmb();
+ msg->evtim->state = RTE_EVENT_TIMER_CANCELED;
+ break;
+ }
+ }
+
+ rte_mempool_put_bulk(sw_data->msg_pool, (void **)msgs,
+ num_msgs);
+ }
+
+ rte_timer_manage();
+
+ /* Could use for stats */
+ RTE_SET_USED(nb_events);
+ RTE_SET_USED(ret);
+
return 0;
}
@@ -474,6 +664,7 @@ sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
struct rte_event_timer_adapter_sw_data *sw_data;
uint64_t nb_timers;
struct rte_service_spec service;
+ static bool timer_subsystem_inited; // static initialized to false
/* Allocate storage for SW implementation data */
char priv_data_name[RTE_RING_NAMESIZE];
@@ -541,6 +732,11 @@ sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
adapter->data->service_id = sw_data->service_id;
adapter->data->service_inited = 1;
+ if (!timer_subsystem_inited) {
+ rte_timer_subsystem_init();
+ timer_subsystem_inited = true;
+ }
+
return 0;
}
@@ -461,7 +461,7 @@ struct rte_event_timer {
* - op: RTE_EVENT_OP_NEW
* - event_type: RTE_EVENT_TYPE_TIMER
*/
- enum rte_event_timer_state state;
+ volatile enum rte_event_timer_state state;
/**< State of the event timer. */
uint64_t timeout_ticks;
/**< Expiry timer ticks expressed in number of *timer_ticks_ns* from