[dpdk-dev,01/13] examples/eventdev: add Rx adapter support
Checks
Commit Message
Use event Rx adapter for packets Rx instead of explicit producer logic.
Use service run iter function for granular control instead of using
dedicated service lcore.
Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
examples/eventdev_pipeline_sw_pmd/main.c | 149 +++++++++++++++++--------------
1 file changed, 82 insertions(+), 67 deletions(-)
Comments
Hi Pavan,
</snip>
> static inline void
> schedule_devices(unsigned int lcore_id) {
> if (fdata->rx_core[lcore_id] && (fdata->rx_single ||
> rte_atomic32_cmpset(&(fdata->rx_lock), 0, 1))) {
> - producer();
> + rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id,
> 1);
> rte_atomic32_clear((rte_atomic32_t *)&(fdata->rx_lock));
> }
The (rx_single || cmpset(rx_lock)) check should no longer be needed -- this logic is provided in rte_service_run_iter_on_app_lcore() and service_run(). The rx_lock can be dropped in general.
</snip>
> + if (port_needed)
> + prod_data.port_id = cons_data.port_id + 1;
> + prod_data.dev_id = evdev_id;
> + prod_data.qid = cdata.qid[0];
> +
Is prod_data still needed? Looks like we're only using it in main() to print the port ID (which may not be valid, depending on if port_needed is true).
Thanks,
Gage
On Mon, Dec 11, 2017 at 04:15:41PM +0000, Eads, Gage wrote:
> Hi Pavan,
>
> </snip>
>
> > static inline void
> > schedule_devices(unsigned int lcore_id) {
> > if (fdata->rx_core[lcore_id] && (fdata->rx_single ||
> > rte_atomic32_cmpset(&(fdata->rx_lock), 0, 1))) {
> > - producer();
> > + rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id,
> > 1);
> > rte_atomic32_clear((rte_atomic32_t *)&(fdata->rx_lock));
> > }
>
> The (rx_single || cmpset(rx_lock)) check should no longer be needed -- this logic is provided in rte_service_run_iter_on_app_lcore() and service_run(). The rx_lock can be dropped in general.
>
We could either remove the example-level locks, or keep the locks at the
application level and disable them in the service API through
rte_service_run_iter_on_app_lcore(<id>, 0).
If we choose to remove the example-level locks, we could do something like
rte_service_run_iter_on_app_lcore(id, !rx_single).
> </snip>
>
> > + if (port_needed)
> > + prod_data.port_id = cons_data.port_id + 1;
> > + prod_data.dev_id = evdev_id;
> > + prod_data.qid = cdata.qid[0];
> > +
>
> Is prod_data still needed? Looks like we're only using it in main() to print the port ID (which may not be valid, depending on if port_needed is true).
Prod data is not needed; I left it there to be consistent with the old example.
I will clean it up in the next version.
>
> Thanks,
> Gage
Thanks,
Pavan
> -----Original Message-----
> From: Pavan Nikhilesh Bhagavatula
> [mailto:pbhagavatula@caviumnetworks.com]
> Sent: Tuesday, December 12, 2017 2:18 AM
> To: Eads, Gage <gage.eads@intel.com>;
> jerin.jacobkollanukkaran@cavium.com; Van Haaren, Harry
> <harry.van.haaren@intel.com>; Rao, Nikhil <nikhil.rao@intel.com>;
> hemant.agrawal@nxp.com; Ma, Liang J <liang.j.ma@intel.com>
> Cc: dev@dpdk.org
> Subject: Re: [PATCH 01/13] examples/eventdev: add Rx adapter support
>
> On Mon, Dec 11, 2017 at 04:15:41PM +0000, Eads, Gage wrote:
> > Hi Pavan,
> >
> > </snip>
> >
> > > static inline void
> > > schedule_devices(unsigned int lcore_id) {
> > > if (fdata->rx_core[lcore_id] && (fdata->rx_single ||
> > > rte_atomic32_cmpset(&(fdata->rx_lock), 0, 1))) {
> > > - producer();
> > > + rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id,
> > > 1);
> > > rte_atomic32_clear((rte_atomic32_t *)&(fdata->rx_lock));
> > > }
> >
> > The (rx_single || cmpset(rx_lock)) check should no longer be needed -- this
> logic is provided in rte_service_run_iter_on_app_lcore() and service_run(). The
> rx_lock can be dropped in general.
> >
>
> we could either remove the example level locks (or) keep the locks at application
> level and disable them in service api through
> rte_service_run_iter_on_app_lcore(<id>, 0).
>
> If we choose to remove example level locks we could do something like
> rte_service_run_iter_on_app_lcore(id, !rx_single)
>
That sounds good. No need to duplicate code that the EAL provides, and it simplifies the example.
Thanks,
Gage
@@ -46,6 +46,7 @@
#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_eventdev.h>
+#include <rte_event_eth_rx_adapter.h>
#include <rte_service.h>
#define MAX_NUM_STAGES 8
@@ -78,6 +79,7 @@ struct fastpath_data {
uint32_t tx_lock;
uint32_t sched_lock;
uint32_t evdev_service_id;
+ uint32_t rxadptr_service_id;
bool rx_single;
bool tx_single;
bool sched_single;
@@ -105,6 +107,7 @@ struct config_data {
unsigned int worker_cq_depth;
int16_t next_qid[MAX_NUM_STAGES+2];
int16_t qid[MAX_NUM_STAGES];
+ uint8_t rx_adapter_id;
};
static struct config_data cdata = {
@@ -193,53 +196,12 @@ consumer(void)
return 0;
}
-static int
-producer(void)
-{
- static uint8_t eth_port;
- struct rte_mbuf *mbufs[BATCH_SIZE+2];
- struct rte_event ev[BATCH_SIZE+2];
- uint32_t i, num_ports = prod_data.num_nic_ports;
- int32_t qid = prod_data.qid;
- uint8_t dev_id = prod_data.dev_id;
- uint8_t port_id = prod_data.port_id;
- uint32_t prio_idx = 0;
-
- const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
- if (++eth_port == num_ports)
- eth_port = 0;
- if (nb_rx == 0) {
- rte_pause();
- return 0;
- }
-
- for (i = 0; i < nb_rx; i++) {
- ev[i].flow_id = mbufs[i]->hash.rss;
- ev[i].op = RTE_EVENT_OP_NEW;
- ev[i].sched_type = cdata.queue_type;
- ev[i].queue_id = qid;
- ev[i].event_type = RTE_EVENT_TYPE_ETHDEV;
- ev[i].sub_event_type = 0;
- ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
- ev[i].mbuf = mbufs[i];
- RTE_SET_USED(prio_idx);
- }
-
- const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx);
- if (nb_tx != nb_rx) {
- for (i = nb_tx; i < nb_rx; i++)
- rte_pktmbuf_free(mbufs[i]);
- }
-
- return 0;
-}
-
static inline void
schedule_devices(unsigned int lcore_id)
{
if (fdata->rx_core[lcore_id] && (fdata->rx_single ||
rte_atomic32_cmpset(&(fdata->rx_lock), 0, 1))) {
- producer();
+ rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id, 1);
rte_atomic32_clear((rte_atomic32_t *)&(fdata->rx_lock));
}
@@ -553,6 +515,79 @@ parse_app_args(int argc, char **argv)
}
}
+static inline void
+init_rx_adapter(uint16_t nb_ports)
+{
+ int i;
+ int ret;
+ uint8_t evdev_id = 0;
+ uint8_t port_needed = 0;
+ struct rte_event_dev_info dev_info;
+
+ ret = rte_event_dev_info_get(evdev_id, &dev_info);
+
+ struct rte_event_port_conf rx_p_conf = {
+ .dequeue_depth = 8,
+ .enqueue_depth = 8,
+ .new_event_threshold = 1200,
+ };
+
+ if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+ rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
+ if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+ rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+
+ ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
+ &rx_p_conf);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
+ cdata.rx_adapter_id);
+
+ struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
+ .ev.sched_type = cdata.queue_type,
+ .ev.queue_id = cdata.qid[0],
+ };
+
+ for (i = 0; i < nb_ports; i++) {
+ uint32_t cap;
+
+ ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "failed to get event rx adapter "
+ "capabilities");
+ /* Producer needs port. */
+ port_needed |= !(cap &
+ RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
+
+ ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
+ -1, &queue_conf);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Failed to add queues to Rx adapter");
+ }
+
+ if (port_needed)
+ prod_data.port_id = cons_data.port_id + 1;
+ prod_data.dev_id = evdev_id;
+ prod_data.qid = cdata.qid[0];
+
+ ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
+ &fdata->rxadptr_service_id);
+ if (ret != -ESRCH && ret != 0) {
+ rte_exit(EXIT_FAILURE,
+ "Error getting the service ID for Rx adapter\n");
+ }
+ rte_service_runstate_set(fdata->rxadptr_service_id, 1);
+ rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);
+
+ ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
+ cdata.rx_adapter_id);
+
+}
+
/*
* Initializes a given port using global settings and with the RX buffers
* coming from the mbuf_pool passed as a parameter.
@@ -663,15 +698,14 @@ struct port_link {
};
static int
-setup_eventdev(struct prod_data *prod_data,
- struct cons_data *cons_data,
+setup_eventdev(struct cons_data *cons_data,
struct worker_data *worker_data)
{
const uint8_t dev_id = 0;
/* +1 stages is for a SINGLE_LINK TX stage */
const uint8_t nb_queues = cdata.num_stages + 1;
- /* + 2 is one port for producer and one for consumer */
- const uint8_t nb_ports = cdata.num_workers + 2;
+ /* + 1 for consumer */
+ const uint8_t nb_ports = cdata.num_workers + 1;
struct rte_event_dev_config config = {
.nb_event_queues = nb_queues,
.nb_event_ports = nb_ports,
@@ -818,26 +852,6 @@ setup_eventdev(struct prod_data *prod_data,
__LINE__, i);
return -1;
}
- /* port for producer, no links */
- struct rte_event_port_conf rx_p_conf = {
- .dequeue_depth = 8,
- .enqueue_depth = 8,
- .new_event_threshold = 1200,
- };
-
- if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
- rx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
- if (rx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
- rx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
- if (rte_event_port_setup(dev_id, i + 1, &rx_p_conf) < 0) {
- printf("Error setting up port %d\n", i);
- return -1;
- }
-
- *prod_data = (struct prod_data){.dev_id = dev_id,
- .port_id = i + 1,
- .qid = cdata.qid[0] };
*cons_data = (struct cons_data){.dev_id = dev_id,
.port_id = i };
@@ -940,12 +954,13 @@ main(int argc, char **argv)
if (worker_data == NULL)
rte_panic("rte_calloc failed\n");
- int dev_id = setup_eventdev(&prod_data, &cons_data, worker_data);
+ int dev_id = setup_eventdev(&cons_data, worker_data);
if (dev_id < 0)
rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
prod_data.num_nic_ports = num_ports;
init_ports(num_ports);
+ init_rx_adapter(num_ports);
int worker_idx = 0;
RTE_LCORE_FOREACH_SLAVE(lcore_id) {