@@ -13,7 +13,8 @@ sources = files('rte_eventdev.c',
'rte_event_eth_rx_adapter.c',
'rte_event_timer_adapter.c',
'rte_event_crypto_adapter.c',
- 'rte_event_eth_tx_adapter.c')
+ 'rte_event_eth_tx_adapter.c',
+ 'rte_event_dispatcher.c')
headers = files('rte_eventdev.h',
'rte_eventdev_trace.h',
'rte_eventdev_trace_fp.h',
@@ -22,6 +23,7 @@ headers = files('rte_eventdev.h',
'rte_event_timer_adapter.h',
'rte_event_timer_adapter_pmd.h',
'rte_event_crypto_adapter.h',
- 'rte_event_eth_tx_adapter.h')
+ 'rte_event_eth_tx_adapter.h',
+ 'rte_event_dispatcher.h')
deps += ['ring', 'ethdev', 'hash', 'mempool', 'mbuf', 'timer', 'cryptodev']
deps += ['telemetry']
new file mode 100644
@@ -0,0 +1,436 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2021 Ericsson AB
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <string.h>
+
+#include <rte_lcore.h>
+#include <rte_malloc.h>
+#include <rte_service_component.h>
+
+#include "eventdev_pmd.h"
+
+#include <rte_event_dispatcher.h>
+
+#define RED_MAX_PORTS_PER_LCORE (4)
+#define RED_MAX_CBS (16)
+
+/* Dequeue parameters for one event port bound to one lcore. */
+struct rte_event_dispatcher_lcore_port {
+	uint8_t port_id;     /* event device port dequeued on this lcore */
+	uint16_t batch_size; /* max events per rte_event_dequeue_burst() call */
+	uint64_t timeout;    /* dequeue timeout passed to the burst call */
+};
+
+/* The set of event ports a particular lcore dequeues from. */
+struct rte_event_dispatcher_lcore {
+	uint8_t num_ports; /* number of valid entries in ports[] */
+	struct rte_event_dispatcher_lcore_port ports[RED_MAX_PORTS_PER_LCORE];
+};
+
+/* One registered match/deliver callback pair. */
+struct rte_event_dispatcher_cb {
+	rte_event_dispatcher_match_cb_t match_fun; /* selects events */
+	void *match_data;    /* opaque user data passed to match_fun */
+	rte_event_dispatcher_deliver_cb_t deliver_fun; /* delivers bursts */
+	void *deliver_data;  /* opaque user data passed to deliver_fun */
+};
+
+/* Per-instance dispatcher state. */
+struct rte_event_dispatcher {
+	uint8_t id;           /* application-chosen dispatcher id */
+	uint8_t event_dev_id; /* event device this dispatcher dequeues from */
+	int socket_id;        /* NUMA socket of the event device */
+	uint32_t service_id;  /* id of the registered service component */
+	struct rte_event_dispatcher_lcore lcores[RTE_MAX_LCORE];
+	int16_t num_cbs;      /* number of valid entries in cbs[] */
+	struct rte_event_dispatcher_cb cbs[RED_MAX_CBS];
+};
+
+static struct rte_event_dispatcher *dispatchers[UINT8_MAX];
+
+/* Returns true if a dispatcher with the given id has been created. */
+static bool
+red_has_dispatcher(uint8_t id)
+{
+	return dispatchers[id] != NULL;
+}
+
+/* Looks up a dispatcher by id; returns NULL if none was created. */
+static struct rte_event_dispatcher *
+red_get_dispatcher(uint8_t id)
+{
+	return dispatchers[id];
+}
+
+/* Installs (or, with NULL, removes) the dispatcher for an id. */
+static void
+red_set_dispatcher(uint8_t id, struct rte_event_dispatcher *dispatcher)
+{
+	dispatchers[id] = dispatcher;
+}
+
+/* Returns -EINVAL from the calling function if no dispatcher exists
+ * for the given id.
+ */
+#define RED_VALID_ID_OR_RET_EINVAL(id) \
+	do { \
+		if (unlikely(!red_has_dispatcher(id))) { \
+			RTE_EDEV_LOG_ERR("Invalid dispatcher id %d\n", id); \
+			return -EINVAL; \
+		} \
+	} while (0)
+
+/* Return the index of the first registered callback whose match
+ * function accepts the event, or -1 if no callback matches.
+ */
+static int
+red_lookup_cb_idx(struct rte_event_dispatcher *dispatcher,
+		  const struct rte_event *e)
+{
+	int16_t idx;
+
+	for (idx = 0; idx < dispatcher->num_cbs; idx++) {
+		struct rte_event_dispatcher_cb *candidate =
+			&dispatcher->cbs[idx];
+
+		if (candidate->match_fun(e, candidate->match_data))
+			return idx;
+	}
+
+	return -1;
+}
+
+/* Sort dequeued events into per-callback bursts (preserving dequeue
+ * order within each burst) and deliver each non-empty burst to its
+ * callback. Events matching no callback are dropped, with an error
+ * logged.
+ */
+static void
+red_dispatch_events(struct rte_event_dispatcher *dispatcher,
+		    struct rte_event *events, uint16_t num_events)
+{
+	int i;
+
+	/* Bug fix: a zero-length VLA is undefined behavior, and an
+	 * empty dequeue is the common idle case — bail out early.
+	 */
+	if (num_events == 0)
+		return;
+
+	struct rte_event bursts[RED_MAX_CBS][num_events];
+	uint16_t burst_lens[RED_MAX_CBS] = { 0 };
+
+	for (i = 0; i < num_events; i++) {
+		struct rte_event *e = &events[i];
+		int cb_idx;
+
+		cb_idx = red_lookup_cb_idx(dispatcher, e);
+
+		if (unlikely(cb_idx < 0)) {
+			RTE_EDEV_LOG_ERR("No matching callback found for "
+					 "event\n");
+			continue;
+		}
+
+		bursts[cb_idx][burst_lens[cb_idx]] = *e;
+		burst_lens[cb_idx]++;
+	}
+
+	for (i = 0; i < dispatcher->num_cbs; i++) {
+		struct rte_event_dispatcher_cb *cb = &dispatcher->cbs[i];
+		uint16_t len = burst_lens[i];
+
+		if (len == 0)
+			continue;
+
+		cb->deliver_fun(bursts[i], len, cb->deliver_data);
+	}
+}
+
+/* Dequeue one burst of events from the port and hand them over to
+ * the dispatch logic.
+ */
+static void
+red_port_dequeue(struct rte_event_dispatcher *dispatcher,
+		 struct rte_event_dispatcher_lcore_port *port)
+{
+	uint16_t max_events = port->batch_size;
+	struct rte_event events[max_events];
+	uint16_t num_deq;
+
+	num_deq = rte_event_dequeue_burst(dispatcher->event_dev_id,
+					  port->port_id, events, max_events,
+					  port->timeout);
+
+	red_dispatch_events(dispatcher, events, num_deq);
+}
+
+/* Service callback: dequeue and dispatch one burst from each event
+ * port bound to the lcore the service is currently running on.
+ * Always reports success to the service framework.
+ */
+static int32_t
+red_lcore_process(void *userdata)
+{
+	uint16_t i;
+	struct rte_event_dispatcher *dispatcher = userdata;
+	unsigned int lcore_id = rte_lcore_id();
+	struct rte_event_dispatcher_lcore *lcore =
+		&dispatcher->lcores[lcore_id];
+
+	/* With no ports bound for this lcore, this loop is a no-op. */
+	for (i = 0; i < lcore->num_ports; i++) {
+		struct rte_event_dispatcher_lcore_port *port =
+			&lcore->ports[i];
+
+		red_port_dequeue(dispatcher, port);
+	}
+
+	return 0;
+}
+
+/* Thin wrapper around rte_service_component_runstate_set() that logs
+ * any failure. Returns the underlying return code.
+ */
+static int
+red_service_runstate_set(uint32_t service_id, int state)
+{
+	int rc = rte_service_component_runstate_set(service_id, state);
+
+	if (rc != 0)
+		RTE_EDEV_LOG_ERR("Error %d occurred while setting service "
+				 "component run state to %d\n", rc, state);
+
+	return rc;
+}
+
+/* Register the dispatcher's service component and set it running.
+ * Returns 0 on success, or a negative error code on failure (in
+ * which case no service component is left registered).
+ */
+static int
+red_service_register(struct rte_event_dispatcher *dispatcher)
+{
+	struct rte_service_spec service = {
+		.callback = red_lcore_process,
+		.callback_userdata = dispatcher,
+		.capabilities = RTE_SERVICE_CAP_MT_SAFE,
+		.socket_id = dispatcher->socket_id
+	};
+	int rc;
+
+	/* snprintf() always NUL-terminates (for size > 0); pass the
+	 * full buffer size rather than size - 1.
+	 */
+	snprintf(service.name, RTE_SERVICE_NAME_MAX, "red_%d",
+		 dispatcher->id);
+
+	rc = rte_service_component_register(&service, &dispatcher->service_id);
+
+	if (rc) {
+		RTE_EDEV_LOG_ERR("Registration of event dispatcher service "
+				 "%s failed with error code %d\n",
+				 service.name, rc);
+		/* Bug fix: the original fell through here and used an
+		 * unassigned service_id; bail out instead.
+		 */
+		return rc;
+	}
+
+	rc = red_service_runstate_set(dispatcher->service_id, 1);
+
+	if (rc)
+		rte_service_component_unregister(dispatcher->service_id);
+
+	return rc;
+}
+
+/* Stop the dispatcher's service component and unregister it.
+ * Returns 0 on success, or a negative error code on failure.
+ */
+static int
+red_service_unregister(struct rte_event_dispatcher *dispatcher)
+{
+	int rc = red_service_runstate_set(dispatcher->service_id, 0);
+
+	if (rc != 0)
+		return rc;
+
+	rc = rte_service_component_unregister(dispatcher->service_id);
+	if (rc != 0)
+		RTE_EDEV_LOG_ERR("Unregistration of event dispatcher service "
+				 "failed with error code %d\n", rc);
+
+	return rc;
+}
+
+int
+rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id)
+{
+	int socket_id;
+	struct rte_event_dispatcher *dispatcher;
+	int rc;
+
+	if (red_has_dispatcher(id)) {
+		RTE_EDEV_LOG_ERR("Dispatcher with id %d already exists\n",
+				 id);
+		return -EEXIST;
+	}
+
+	socket_id = rte_event_dev_socket_id(event_dev_id);
+
+	/* Bug fix: rte_event_dev_socket_id() returns a negative value
+	 * for an invalid device id; reject it instead of passing it on
+	 * to the allocator and service spec.
+	 */
+	if (socket_id < 0) {
+		RTE_EDEV_LOG_ERR("Invalid event device id %d\n",
+				 event_dev_id);
+		return -EINVAL;
+	}
+
+	dispatcher =
+		rte_malloc_socket("event dispatcher",
+				  sizeof(struct rte_event_dispatcher),
+				  RTE_CACHE_LINE_SIZE, socket_id);
+
+	if (dispatcher == NULL) {
+		RTE_EDEV_LOG_ERR("Unable to allocate memory for event "
+				 "dispatcher\n");
+		return -ENOMEM;
+	}
+
+	/* Zero-initializes all lcore/callback tables as well. */
+	*dispatcher = (struct rte_event_dispatcher) {
+		.id = id,
+		.event_dev_id = event_dev_id,
+		.socket_id = socket_id
+	};
+
+	rc = red_service_register(dispatcher);
+
+	if (rc < 0) {
+		rte_free(dispatcher);
+		return rc;
+	}
+
+	red_set_dispatcher(id, dispatcher);
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_free(uint8_t id)
+{
+	struct rte_event_dispatcher *dispatcher;
+	int rc;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	/* Stop and tear down the service first; on failure the
+	 * dispatcher is kept intact so the caller can retry.
+	 */
+	rc = red_service_unregister(dispatcher);
+
+	if (rc)
+		return rc;
+
+	/* Unpublish the id before freeing the backing memory. */
+	red_set_dispatcher(id, NULL);
+
+	rte_free(dispatcher);
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id)
+{
+	struct rte_event_dispatcher *dispatcher;
+
+	/* Robustness: reject a NULL out-parameter rather than
+	 * dereferencing it.
+	 */
+	if (service_id == NULL)
+		return -EINVAL;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	*service_id = dispatcher->service_id;
+
+	return 0;
+}
+
+/* Return the index of event_port_id in the lcore's port table, or -1
+ * if the port is not bound to this lcore.
+ */
+static int16_t
+lcore_port_index(struct rte_event_dispatcher_lcore *lcore,
+		 uint8_t event_port_id)
+{
+	int16_t idx;
+
+	for (idx = 0; idx < lcore->num_ports; idx++)
+		if (lcore->ports[idx].port_id == event_port_id)
+			return idx;
+
+	return -1;
+}
+
+int
+rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
+					uint16_t batch_size, uint64_t timeout,
+					unsigned int lcore_id)
+{
+	struct rte_event_dispatcher *dispatcher;
+	struct rte_event_dispatcher_lcore *lcore;
+	struct rte_event_dispatcher_lcore_port *port;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	/* Bug fix: lcore_id is caller-supplied; validate it before
+	 * indexing the lcores[RTE_MAX_LCORE] array.
+	 */
+	if (lcore_id >= RTE_MAX_LCORE)
+		return -EINVAL;
+
+	lcore = &dispatcher->lcores[lcore_id];
+
+	/* The per-lcore port table is fixed-size. */
+	if (lcore->num_ports == RED_MAX_PORTS_PER_LCORE)
+		return -ENOMEM;
+
+	/* A port may be bound to a particular lcore only once. */
+	if (lcore_port_index(lcore, event_port_id) >= 0)
+		return -EEXIST;
+
+	port = &lcore->ports[lcore->num_ports];
+
+	*port = (struct rte_event_dispatcher_lcore_port) {
+		.port_id = event_port_id,
+		.batch_size = batch_size,
+		.timeout = timeout
+	};
+
+	lcore->num_ports++;
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
+					    unsigned int lcore_id)
+{
+	struct rte_event_dispatcher *dispatcher;
+	struct rte_event_dispatcher_lcore *lcore;
+	int16_t port_idx;
+	struct rte_event_dispatcher_lcore_port *port;
+	struct rte_event_dispatcher_lcore_port *last;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	/* Bug fix: lcore_id is caller-supplied; validate it before
+	 * indexing the lcores[RTE_MAX_LCORE] array.
+	 */
+	if (lcore_id >= RTE_MAX_LCORE)
+		return -EINVAL;
+
+	lcore = &dispatcher->lcores[lcore_id];
+
+	port_idx = lcore_port_index(lcore, event_port_id);
+
+	if (port_idx < 0)
+		return -ENOENT;
+
+	/* Swap-with-last removal; port order within an lcore is not
+	 * significant.
+	 */
+	port = &lcore->ports[port_idx];
+	last = &lcore->ports[lcore->num_ports - 1];
+
+	if (port != last)
+		*port = *last;
+
+	lcore->num_ports--;
+
+	return 0;
+}
+
+int16_t
+rte_event_dispatcher_register(uint8_t id,
+			      rte_event_dispatcher_match_cb_t match_fun,
+			      void *match_data,
+			      rte_event_dispatcher_deliver_cb_t deliver_fun,
+			      void *deliver_data)
+{
+	struct rte_event_dispatcher *dispatcher;
+	struct rte_event_dispatcher_cb *cb;
+	int16_t cb_idx;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	/* The callback table is fixed-size. */
+	if (dispatcher->num_cbs == RED_MAX_CBS)
+		return -ENOMEM;
+
+	/* The new entry goes at the end of the table; its index
+	 * doubles as the registration id returned to the caller.
+	 */
+	cb_idx = dispatcher->num_cbs;
+
+	cb = &dispatcher->cbs[cb_idx];
+
+	*cb = (struct rte_event_dispatcher_cb) {
+		.match_fun = match_fun,
+		.match_data = match_data,
+		.deliver_fun = deliver_fun,
+		.deliver_data = deliver_data
+	};
+
+	dispatcher->num_cbs++;
+
+	return cb_idx;
+}
+
+int
+rte_event_dispatcher_unregister(uint8_t id, int16_t unreg_idx)
+{
+	struct rte_event_dispatcher *dispatcher;
+	uint16_t last_idx;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	if (unreg_idx < 0 || unreg_idx >= dispatcher->num_cbs)
+		return -EINVAL;
+
+	last_idx = dispatcher->num_cbs - 1;
+
+	if (unreg_idx != last_idx) {
+		/* Compact the table: shift entries after the removed
+		 * slot down one position. NOTE(review): this renumbers
+		 * later registrations, invalidating registration ids
+		 * previously returned for them — confirm this is the
+		 * intended API contract.
+		 */
+		struct rte_event_dispatcher_cb *unreg_cb =
+			&dispatcher->cbs[unreg_idx];
+		int16_t n = last_idx - unreg_idx;
+		memmove(unreg_cb, unreg_cb + 1, sizeof(*unreg_cb) * n);
+	}
+
+	dispatcher->num_cbs--;
+
+	return 0;
+}
new file mode 100644
@@ -0,0 +1,247 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2021 Ericsson AB
+ */
+
+#ifndef __RTE_EVENT_DISPATCHER_H__
+#define __RTE_EVENT_DISPATCHER_H__
+
+/**
+ * @file
+ *
+ * RTE Event Dispatcher
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_eventdev.h>
+
+/**
+ * Function prototype for match callbacks.
+ *
+ * With the match callbacks, the application chooses which events it
+ * wants delivered, among the events dequeued by the dispatcher, from
+ * the event device.
+ *
+ * @param event
+ * Pointer to event
+ *
+ * @param cb_data
+ * The pointer supplied by the application in
+ * rte_event_dispatcher_register().
+ *
+ * @return
+ * Returns true in case this event should be delivered, and false
+ * otherwise.
+ */
+typedef bool (*rte_event_dispatcher_match_cb_t)(const struct rte_event *event,
+ void *cb_data);
+/**
+ * Function prototype for deliver callbacks.
+ *
+ * @param events
+ * Pointer to an array of events.
+ *
+ * @param num
+ * The number of events in the @p events array.
+ *
+ * @param cb_data
+ * The pointer supplied by the application in
+ * rte_event_dispatcher_register().
+ */
+
+typedef void (*rte_event_dispatcher_deliver_cb_t)(struct rte_event *events,
+ uint16_t num, void *cb_data);
+
+/**
+ * Create an event dispatcher with the specified id.
+ *
+ * @param id
+ * An application-specified, unique (across all event dispatcher
+ * instances) identifier.
+ *
+ * @param event_dev_id
+ * The identifier of the event device from which this event dispatcher
+ * will dequeue events.
+ *
+ * @return
+ * - 0: Success
+ * - <0: Error code on failure
+ */
+__rte_experimental
+int
+rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id);
+
+/**
+ * Frees an event dispatcher with the specified id.
+ *
+ * @param id
+ * The event dispatcher identifier.
+ *
+ * @return
+ * - 0: Success
+ * - <0: Error code on failure
+ */
+__rte_experimental
+int
+rte_event_dispatcher_free(uint8_t id);
+
+/**
+ * Retrieve the service identifier of the event dispatcher.
+ *
+ * @param id
+ * The event dispatcher identifier.
+ *
+ * @param [out] service_id
+ * A pointer to a caller-supplied buffer where the event dispatcher's
+ * service id will be stored.
+ *
+ * @return
+ * - 0: Success
+ * - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id);
+
+/**
+ * Binds an event device port to a specific lcore on the specified
+ * event dispatcher.
+ *
+ * This function configures an event dispatcher to dequeue events from
+ * an event device port (as specified by @p event_port_id), in case
+ * its service function is run on particular lcore (as specified by @p
+ * lcore_id).
+ *
+ * Multiple event device ports may be bound to the same lcore. A
+ * particular port may only be bound to one lcore.
+ *
+ * If the event dispatcher service is mapped (with
+ * rte_service_map_lcore_set()) to a lcore for which no ports are
+ * bound, the service function will be a no-operation.
+ *
+ * @param id
+ * The event dispatcher identifier.
+ *
+ * @param event_port_id
+ * The event device port identifier.
+ *
+ * @param batch_size
+ * The batch size to use in rte_event_dequeue_burst(), for the
+ * configured event device port and lcore.
+ *
+ * @param timeout
+ * The timeout parameter to use in rte_event_dequeue_burst(), for the
+ * configured event device port and lcore.
+ *
+ * @param lcore_id
+ * The lcore by which this event port will be used.
+ *
+ * @return
+ * - 0: Success
+ * - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
+ uint16_t batch_size, uint64_t timeout,
+ unsigned int lcore_id);
+
+/**
+ * Unbind an event device port from a specific lcore.
+ *
+ * @param id
+ * The event dispatcher identifier.
+ *
+ * @param event_port_id
+ * The event device port identifier.
+ *
+ * @param lcore_id
+ * The lcore which was using this event port.
+ *
+ * @return
+ * - 0: Success
+ * - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
+ unsigned int lcore_id);
+
+/**
+ * Registers callback functions.
+ *
+ * The match callback function is used to select if a particular event
+ * should be delivered, using the corresponding deliver callback function.
+ *
+ * The reason for dividing the delivery into two distinct steps is to
+ * allow the dispatcher to deliver all events as a batch. This in turn
+ * will cause processing of a particular kind of events to happen in a
+ * back-to-back manner, which in the typical case improves cache locality.
+ *
+ * Ordering is not guaranteed between different deliver callbacks. For
+ * example, suppose there are two callbacks registered, matching
+ * different subsets of events on an atomic queue. A batch of events
+ * [ev0, ev1, ev2] are dequeued on a particular port, all pertaining
+ * to the same flow. The match callback for registration A returns
+ * true for ev0 and ev2, and the matching function for registration B
+ * for ev1. In that scenario, the event dispatcher may choose to
+ * deliver first [ev0, ev2] using A's deliver function, and then [ev1]
+ * to B - or vice versa.
+ *
+ * @param id
+ * The event dispatcher identifier.
+ *
+ * @param match_fun
+ * The match callback function.
+ *
+ * @param match_cb_data
+ * A pointer to some application-specific opaque data (or NULL),
+ * which is supplied back to the application when match_fun is
+ * called.
+ *
+ * @param deliver_fun
+ * The deliver callback function.
+ *
+ * @param deliver_cb_data
+ * A pointer to some application-specific opaque data (or NULL),
+ * which is supplied back to the application when deliver_fun is
+ * called.
+ *
+ * @return
+ * - >= 0: The identifier for this registration.
+ * - <0: Error code on failure.
+ */
+__rte_experimental
+int16_t
+rte_event_dispatcher_register(uint8_t id,
+ rte_event_dispatcher_match_cb_t match_fun,
+ void *match_cb_data,
+ rte_event_dispatcher_deliver_cb_t deliver_fun,
+ void *deliver_cb_data);
+
+/**
+ * Unregister callback functions.
+ *
+ * @param id
+ * The event dispatcher identifier.
+ *
+ * @param reg_id
+ * The callback registration id returned by the original
+ * rte_event_dispatcher_register() call.
+ *
+ * @return
+ * - 0: Success
+ * - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_unregister(uint8_t id, int16_t reg_id);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __RTE_EVENT_DISPATCHER_H__ */
@@ -133,11 +133,18 @@ EXPERIMENTAL {
__rte_eventdev_trace_crypto_adapter_queue_pair_del;
__rte_eventdev_trace_crypto_adapter_start;
__rte_eventdev_trace_crypto_adapter_stop;
-
# changed in 20.11
__rte_eventdev_trace_port_setup;
# added in 20.11
rte_event_pmd_pci_probe_named;
+
+ rte_event_dispatcher_create;
+ rte_event_dispatcher_free;
+ rte_event_dispatcher_service_id_get;
+ rte_event_dispatcher_bind_port_to_lcore;
+ rte_event_dispatcher_unbind_port_from_lcore;
+ rte_event_dispatcher_register;
+ rte_event_dispatcher_unregister;
};
INTERNAL {