[v6,08/10] examples/l2fwd-event: add eventdev main loop
Checks
Commit Message
From: Pavan Nikhilesh <pbhagavatula@marvell.com>
Add event dev main loop based on enabled l2fwd options and eventdev
capabilities.
Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
examples/l2fwd-event/l2fwd_common.c | 6 +
examples/l2fwd-event/l2fwd_common.h | 1 +
examples/l2fwd-event/l2fwd_event.c | 276 ++++++++++++++++++++++++++++
examples/l2fwd-event/l2fwd_event.h | 2 +
examples/l2fwd-event/main.c | 6 +-
5 files changed, 290 insertions(+), 1 deletion(-)
Comments
HI Pavan,
snipped
> Add event dev main loop based on enabled l2fwd options and eventdev
> capabilities.
>
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
> ---
> + if (rsrc->event_mode) {
> + port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
> + port_conf.rx_adv_conf.rss_conf.rss_key = NULL;
> + port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_IP;
> + }
Question: is the RSS hash configured in order to generate the flow ID for eventdev? As per my understanding, a single Rx port-queue pair does not require RSS.
snipped
> + if (is_master && timer_period > 0) {
> + cur_tsc = rte_rdtsc();
> + diff_tsc = cur_tsc - prev_tsc;
> +
> + /* advance the timer */
> + timer_tsc += diff_tsc;
> +
> + /* if timer has reached its timeout */
> + if (unlikely(timer_tsc >= timer_period)) {
> + print_stats(rsrc);
> + /* reset the timer */
> + timer_tsc = 0;
> + }
> + prev_tsc = cur_tsc;
> + }
Is it possible to move the print_stats to service core, as 'CALL_MASTER' is enabled in remote_launch making this a potential worker?
> +
> + /* Read packet from eventdev */
> + if (!rte_event_dequeue_burst(event_d_id, port_id, &ev, 1, 0))
> + continue;
Isn't `nb_burst == 0` the unlikely case here?
> +
> + l2fwd_event_fwd(rsrc, &ev, tx_q_id, timer_period, flags);
> +
> + if (flags & L2FWD_EVENT_TX_ENQ) {
> + while (rte_event_enqueue_burst(event_d_id, port_id,
> + &ev, 1) &&
> + !rsrc->force_quit)
> + ;
Can we place a `continue` as we are not expecting ` L2FWD_EVENT_TX_DIRECT`?
> + }
> +
> + if (flags & L2FWD_EVENT_TX_DIRECT) {
> + while
> (!rte_event_eth_tx_adapter_enqueue(event_d_id,
> + port_id,
> + &ev, 1, 0) &&
> + !rsrc->force_quit)
> + ;
> + }
> + }
snipped
> +
> + while (!rsrc->force_quit) {
> + /* if timer is enabled */
> + if (is_master && timer_period > 0) {
> + cur_tsc = rte_rdtsc();
> + diff_tsc = cur_tsc - prev_tsc;
> +
> + /* advance the timer */
> + timer_tsc += diff_tsc;
> +
> + /* if timer has reached its timeout */
> + if (unlikely(timer_tsc >= timer_period)) {
> + print_stats(rsrc);
> + /* reset the timer */
> + timer_tsc = 0;
> + }
> + prev_tsc = cur_tsc;
> + }
Can we move `print_stats` logic to service core?
> +
> + /* Read packet from eventdev */
> + nb_rx = rte_event_dequeue_burst(event_d_id, port_id, ev,
> + deq_len, 0);
> + if (nb_rx == 0)
Can we use `unlikely`?
> + continue;
> +
> + for (i = 0; i < nb_rx; i++) {
> + l2fwd_event_fwd(rsrc, &ev[i], tx_q_id, timer_period,
> + flags);
> + }
> +
> + if (flags & L2FWD_EVENT_TX_ENQ) {
> + nb_tx = rte_event_enqueue_burst(event_d_id, port_id,
> + ev, nb_rx);
> + while (nb_tx < nb_rx && !rsrc->force_quit)
> + nb_tx +=
> rte_event_enqueue_burst(event_d_id,
> + port_id, ev + nb_tx,
> + nb_rx - nb_tx);
Can we use `continue`, as we do not transmit from the same worker in this case?
> + }
> +
snipped
Hi Vipin,
>HI Pavan,
>
>snipped
>> Add event dev main loop based on enabled l2fwd options and
>eventdev
>> capabilities.
>>
>> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
>> ---
>> + if (rsrc->event_mode) {
>> + port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
>> + port_conf.rx_adv_conf.rss_conf.rss_key = NULL;
>> + port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_IP;
>> + }
>Question, is RSS hash configured for generating flow id for Eventdev?
>As my understanding. RSS for single RX port-queue pair does not
>require the same.
>snipped
In the case of a SW event device, i.e. vdev=event_sw0, the software Rx adapter requires mbuf::hash::rss
to distribute packets; otherwise it has to calculate the RSS hash itself.
@see lib/librte_eventdev/rte_event_eth_rx_adapter.c +817
'
rss = do_rss ?
rxa_do_softrss(m, rx_adapter->rss_key_be) :
m->hash.rss;
'
>> + if (is_master && timer_period > 0) {
>> + cur_tsc = rte_rdtsc();
>> + diff_tsc = cur_tsc - prev_tsc;
>> +
>> + /* advance the timer */
>> + timer_tsc += diff_tsc;
>> +
>> + /* if timer has reached its timeout */
>> + if (unlikely(timer_tsc >= timer_period)) {
>> + print_stats(rsrc);
>> + /* reset the timer */
>> + timer_tsc = 0;
>> + }
>> + prev_tsc = cur_tsc;
>> + }
>Is it possible to move the print_stats to service core, as 'CALL_MASTER'
>is enabled in remote_launch making this a potential worker?
>
Since not every event device requires a service core, the user might not pass a service core mask.
Instead we could use SKIP_MASTER and print the stats here.
>> +
>> + /* Read packet from eventdev */
>> + if (!rte_event_dequeue_burst(event_d_id, port_id,
>&ev, 1, 0))
>> + continue;
>Is not this unlikely `nb_burst == 0`
>
Not necessarily; please refer to
https://mails.dpdk.org/archives/dev/2018-July/108610.html
>> +
>> + l2fwd_event_fwd(rsrc, &ev, tx_q_id, timer_period,
>flags);
>> +
>> + if (flags & L2FWD_EVENT_TX_ENQ) {
>> + while (rte_event_enqueue_burst(event_d_id,
>port_id,
>> + &ev, 1) &&
>> + !rsrc->force_quit)
>> + ;
>Can we place a `continue` as we are not expecting `
>L2FWD_EVENT_TX_DIRECT`?
>> + }
>
>> +
>> + if (flags & L2FWD_EVENT_TX_DIRECT) {
>> + while
>> (!rte_event_eth_tx_adapter_enqueue(event_d_id,
>> + port_id,
>> + &ev, 1,
>0) &&
>> + !rsrc->force_quit)
>> + ;
>> + }
>> + }
>snipped
>> +
>> + while (!rsrc->force_quit) {
>> + /* if timer is enabled */
>> + if (is_master && timer_period > 0) {
>> + cur_tsc = rte_rdtsc();
>> + diff_tsc = cur_tsc - prev_tsc;
>> +
>> + /* advance the timer */
>> + timer_tsc += diff_tsc;
>> +
>> + /* if timer has reached its timeout */
>> + if (unlikely(timer_tsc >= timer_period)) {
>> + print_stats(rsrc);
>> + /* reset the timer */
>> + timer_tsc = 0;
>> + }
>> + prev_tsc = cur_tsc;
>> + }
>Can we move `print_stats` logic to service core?
>
>> +
>> + /* Read packet from eventdev */
>> + nb_rx = rte_event_dequeue_burst(event_d_id,
>port_id, ev,
>> + deq_len, 0);
>> + if (nb_rx == 0)
>Can we use `unlikely`?
Not necessarily; please refer to
https://mails.dpdk.org/archives/dev/2018-July/108610.html
>> + continue;
>> +
>> + for (i = 0; i < nb_rx; i++) {
>> + l2fwd_event_fwd(rsrc, &ev[i], tx_q_id,
>timer_period,
>> + flags);
>> + }
>> +
>> + if (flags & L2FWD_EVENT_TX_ENQ) {
>> + nb_tx =
>rte_event_enqueue_burst(event_d_id, port_id,
>> + ev, nb_rx);
>> + while (nb_tx < nb_rx && !rsrc->force_quit)
>> + nb_tx +=
>> rte_event_enqueue_burst(event_d_id,
>> + port_id, ev + nb_tx,
>> + nb_rx - nb_tx);
>Can we use `continue` as we do not transmit from the same worker int
>his case?
I'm not sure I follow what you meant here. We are trying to transmit the work
present on the worker until we succeed; if we do a continue then we would lose
the untransmitted packets.
@see examples/eventdev_pipeline/pipeline_worker_generic.c +109
>> + }
>> +
>snipped
Hi Pavan,
snipped
> >> Add event dev main loop based on enabled l2fwd options and
> >eventdev
> >> capabilities.
> >>
> >> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
> >> ---
> >> + if (rsrc->event_mode) {
> >> + port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
> >> + port_conf.rx_adv_conf.rss_conf.rss_key = NULL;
> >> + port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_IP;
> >> + }
> >Question, is RSS hash configured for generating flow id for Eventdev?
> >As my understanding. RSS for single RX port-queue pair does not require
> >the same.
Thanks for the confirmation.
> >snipped
>
> In case of SW event device i.e. vdev=event_sw0 the software Rx adapter requires
> mbuf::rss:hash to distribute packets else it has to calculate rss.
>
> @see lib/librte_eventdev/rte_event_eth_rx_adapter.c +817 '
> rss = do_rss ?
> rxa_do_softrss(m, rx_adapter->rss_key_be) :
> m->hash.rss;
> '
>
> >> + if (is_master && timer_period > 0) {
> >> + cur_tsc = rte_rdtsc();
> >> + diff_tsc = cur_tsc - prev_tsc;
> >> +
> >> + /* advance the timer */
> >> + timer_tsc += diff_tsc;
> >> +
> >> + /* if timer has reached its timeout */
> >> + if (unlikely(timer_tsc >= timer_period)) {
> >> + print_stats(rsrc);
> >> + /* reset the timer */
> >> + timer_tsc = 0;
> >> + }
> >> + prev_tsc = cur_tsc;
> >> + }
> >Is it possible to move the print_stats to service core, as 'CALL_MASTER'
> >is enabled in remote_launch making this a potential worker?
> >
>
> Since not every eventdevice requires Service core user might not pass service
> core mask.
> Instead we could do SKIP_MASTER and prints stats here.
Thanks
>
> >> +
> >> + /* Read packet from eventdev */
> >> + if (!rte_event_dequeue_burst(event_d_id, port_id,
> >&ev, 1, 0))
> >> + continue;
> >Is not this unlikely `nb_burst == 0`
> >
>
> Not necessarily refer
> https://mails.dpdk.org/archives/dev/2018-July/108610.html
Thanks for the article and links, they are helpful. Following the article's suggestion, shouldn't the code be restructured to execute the processing path rather than `continue`?
snipped
> >> + /* Read packet from eventdev */
> >> + nb_rx = rte_event_dequeue_burst(event_d_id,
> >port_id, ev,
> >> + deq_len, 0);
> >> + if (nb_rx == 0)
> >Can we use `unlikely`?
>
> Not necessarily refer
> https://mails.dpdk.org/archives/dev/2018-July/108610.html
Same as above.
>
> >> + continue;
> >> +
> >> + for (i = 0; i < nb_rx; i++) {
> >> + l2fwd_event_fwd(rsrc, &ev[i], tx_q_id,
> >timer_period,
> >> + flags);
> >> + }
> >> +
> >> + if (flags & L2FWD_EVENT_TX_ENQ) {
> >> + nb_tx =
> >rte_event_enqueue_burst(event_d_id, port_id,
> >> + ev, nb_rx);
> >> + while (nb_tx < nb_rx && !rsrc->force_quit)
> >> + nb_tx +=
> >> rte_event_enqueue_burst(event_d_id,
> >> + port_id, ev + nb_tx,
> >> + nb_rx - nb_tx);
> >Can we use `continue` as we do not transmit from the same worker int
> >his case?
>
> I'm not sure I follow what you meant here. We are trying to transmit the work
> present on the worker till we succeed, if we do a continue then we would loose
> the untransmitted packets.
Maybe I mistook the L2FWD flags for the ETHDEV_EVENT flags. As per my current understanding, L2FWD_EVENT_TX_ENQ uses an extra queue stage for single-event enqueue to the TX adapter, while L2FWD_EVENT_TX_DIRECT allows the worker to transmit via the port. Maybe this is a wrong expectation.
>
> @see examples/eventdev_pipeline/pipeline_worker_generic.c +109
I will check the same and get back as required.
>
> >> + }
> >> +
> >snipped
<snip>
>> >> + /* Read packet from eventdev */
>> >> + if (!rte_event_dequeue_burst(event_d_id, port_id,
>> >&ev, 1, 0))
>> >> + continue;
>> >Is not this unlikely `nb_burst == 0`
>> >
>>
>> Not necessarily refer
>> https://urldefense.proofpoint.com/v2/url?u=https-
>3A__mails.dpdk.org_archives_dev_2018-
>2DJuly_108610.html&d=DwIFAg&c=nKjWec2b6R0mOyPaz7xtfQ&r=E3Sg
>YMjtKCMVsB-fmvgGV3o-
>g_fjLhk5Pupi9ijohpc&m=HyS43be9AB6KIroLm8d2EK4M6_lE_fZua3CTxY
>5vbiU&s=FcZlWlkbhYkrQ3HApkxAX6A17gyHQZ0cYoDeG3KkwTw&e=
>
>Thanks for the article and links it is helpful. Following the article
>suggestion, should not the code be enhanced for execute code rather
>than continue?
>
Do you mean there would be a difference between the below cases?
void case1() {
while (1) {
if (!(random() & 0x1))
continue;
}
}
void case2() {
while (1) {
uint8_t is_one = random() & 0x1;
if (is_one == 0)
continue;
}
}
AFAIK both the above cases compile to the same asm.
>snipped
>> >> + /* Read packet from eventdev */
>> >> + nb_rx = rte_event_dequeue_burst(event_d_id,
>> >port_id, ev,
>> >> + deq_len, 0);
>> >> + if (nb_rx == 0)
>> >Can we use `unlikely`?
>>
>> Not necessarily refer
>> https://urldefense.proofpoint.com/v2/url?u=https-
>3A__mails.dpdk.org_archives_dev_2018-
>2DJuly_108610.html&d=DwIFAg&c=nKjWec2b6R0mOyPaz7xtfQ&r=E3Sg
>YMjtKCMVsB-fmvgGV3o-
>g_fjLhk5Pupi9ijohpc&m=HyS43be9AB6KIroLm8d2EK4M6_lE_fZua3CTxY
>5vbiU&s=FcZlWlkbhYkrQ3HApkxAX6A17gyHQZ0cYoDeG3KkwTw&e=
>Same as above.
>
>>
>> >> + continue;
>> >> +
>> >> + for (i = 0; i < nb_rx; i++) {
>> >> + l2fwd_event_fwd(rsrc, &ev[i], tx_q_id,
>> >timer_period,
>> >> + flags);
>> >> + }
>> >> +
>> >> + if (flags & L2FWD_EVENT_TX_ENQ) {
>> >> + nb_tx =
>> >rte_event_enqueue_burst(event_d_id, port_id,
>> >> + ev, nb_rx);
>> >> + while (nb_tx < nb_rx && !rsrc->force_quit)
>> >> + nb_tx +=
>> >> rte_event_enqueue_burst(event_d_id,
>> >> + port_id, ev + nb_tx,
>> >> + nb_rx - nb_tx);
>> >Can we use `continue` as we do not transmit from the same worker
>int
>> >his case?
>>
>> I'm not sure I follow what you meant here. We are trying to transmit
>the work
>> present on the worker till we succeed, if we do a continue then we
>would loose
>> the untransmitted packets.
>Maybe I mistook the L2FWD against ETHDEV_EVENT flags. As per my
>current understanding L2FWD_EVENT_TX_ENQ uses extra queue stage
>for single-event-enqueue for TX adapter, while
>L2FWD_EVENT_TX_DIRECT allows the worker to do transmit via port.
>May be this is wrong expectation.
>
The above understanding is correct. I just don't see how a continue out of
the while loop would help.
In the above case we are going to retry enqueue(TX_ENQ) or transmit(TX_DIRECT)
until we succeed.
>>
>> @see examples/eventdev_pipeline/pipeline_worker_generic.c +109
>I will check the same and get back as required.
>
>>
>> >> + }
>> >> +
>> >snipped
@@ -65,6 +65,12 @@ l2fwd_event_init_ports(struct l2fwd_resources *rsrc)
uint16_t port_id;
int ret;
+ if (rsrc->event_mode) {
+ port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
+ port_conf.rx_adv_conf.rss_conf.rss_key = NULL;
+ port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_IP;
+ }
+
/* Initialise each port */
RTE_ETH_FOREACH_DEV(port_id) {
struct rte_eth_conf local_port_conf = port_conf;
@@ -114,6 +114,7 @@ l2fwd_get_rsrc(void)
memset(rsrc, 0, sizeof(struct l2fwd_resources));
rsrc->mac_updating = true;
+ rsrc->event_mode = true;
rsrc->rx_queue_per_lcore = 1;
rsrc->sched_type = RTE_SCHED_TYPE_ATOMIC;
rsrc->timer_period = 10 * rte_get_timer_hz();
@@ -17,6 +17,12 @@
#include "l2fwd_event.h"
+#define L2FWD_EVENT_SINGLE 0x1
+#define L2FWD_EVENT_BURST 0x2
+#define L2FWD_EVENT_TX_DIRECT 0x4
+#define L2FWD_EVENT_TX_ENQ 0x8
+#define L2FWD_EVENT_UPDT_MAC 0x10
+
static inline int
l2fwd_event_service_enable(uint32_t service_id)
{
@@ -122,11 +128,271 @@ l2fwd_event_capability_setup(struct l2fwd_event_resources *evt_rsrc)
l2fwd_event_set_internal_port_ops(&evt_rsrc->ops);
}
+/*
+ * Hand out the next unused event port id from evt_rsrc->evp.event_p_id.
+ *
+ * A function-local static cursor, updated only while evt_rsrc->evp.lock is
+ * held, walks the port table so that each successful call returns the next
+ * entry.
+ *
+ * Returns the event port id, or -1 (after printing a message) once all
+ * evt_rsrc->evp.nb_ports entries have been handed out.
+ */
+static __rte_noinline int
+l2fwd_get_free_event_port(struct l2fwd_event_resources *evt_rsrc)
+{
+	static int index;
+	int port_id;
+
+	rte_spinlock_lock(&evt_rsrc->evp.lock);
+	if (index >= evt_rsrc->evp.nb_ports) {
+		/*
+		 * Release the lock before bailing out; returning with it held
+		 * would leave every later caller spinning on it forever.
+		 */
+		rte_spinlock_unlock(&evt_rsrc->evp.lock);
+		printf("No free event port is available\n");
+		return -1;
+	}
+
+	port_id = evt_rsrc->evp.event_p_id[index];
+	index++;
+	rte_spinlock_unlock(&evt_rsrc->evp.lock);
+
+	return port_id;
+}
+
+/*
+ * Prepare a single dequeued event for forwarding.
+ *
+ * Looks up the destination port for the packet's current port in
+ * rsrc->dst_ports, rewrites mbuf->port to it and, depending on flags:
+ *   L2FWD_EVENT_UPDT_MAC  - calls l2fwd_mac_updating() for the new port
+ *   L2FWD_EVENT_TX_ENQ    - retargets the event to tx_q_id as a FORWARD op
+ *   L2FWD_EVENT_TX_DIRECT - sets Tx queue 0 on the mbuf for the Tx adapter
+ * When timer_period is non-zero the per-port rx/tx counters in
+ * rsrc->port_stats are bumped with relaxed atomic adds.
+ */
+static __rte_always_inline void
+l2fwd_event_fwd(struct l2fwd_resources *rsrc, struct rte_event *ev,
+		const uint8_t tx_q_id, const uint64_t timer_period,
+		const uint32_t flags)
+{
+	struct rte_mbuf *pkt = ev->mbuf;
+	const int count_stats = timer_period > 0;
+	uint16_t out_port;
+
+	rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
+	out_port = rsrc->dst_ports[pkt->port];
+
+	/* The rx counter is indexed by the port the packet arrived on. */
+	if (count_stats)
+		__atomic_fetch_add(&rsrc->port_stats[pkt->port].rx,
+				   1, __ATOMIC_RELAXED);
+
+	pkt->port = out_port;
+
+	if (flags & L2FWD_EVENT_UPDT_MAC)
+		l2fwd_mac_updating(pkt, out_port, &rsrc->eth_addr[out_port]);
+
+	if (flags & L2FWD_EVENT_TX_ENQ) {
+		ev->op = RTE_EVENT_OP_FORWARD;
+		ev->queue_id = tx_q_id;
+	}
+
+	if (flags & L2FWD_EVENT_TX_DIRECT)
+		rte_event_eth_tx_adapter_txq_set(pkt, 0);
+
+	/* pkt->port now holds the destination, so this counts against it. */
+	if (count_stats)
+		__atomic_fetch_add(&rsrc->port_stats[pkt->port].tx,
+				   1, __ATOMIC_RELAXED);
+}
+
+/*
+ * Worker loop that handles one event per iteration.
+ *
+ * Claims an event port via l2fwd_get_free_event_port() and returns at once
+ * if none is left. Until rsrc->force_quit is set, it dequeues a single
+ * event, runs l2fwd_event_fwd() on it and hands it on according to flags:
+ *   L2FWD_EVENT_TX_ENQ    - rte_event_enqueue_burst()
+ *   L2FWD_EVENT_TX_DIRECT - rte_event_eth_tx_adapter_enqueue()
+ * On the master lcore, when rsrc->timer_period is non-zero, print_stats()
+ * is called each time timer_period TSC cycles have accumulated.
+ */
+static __rte_always_inline void
+l2fwd_event_loop_single(struct l2fwd_resources *rsrc,
+			const uint32_t flags)
+{
+	const uint8_t is_master = rte_get_master_lcore() == rte_lcore_id();
+	struct l2fwd_event_resources *evt_rsrc = rsrc->evt_rsrc;
+	const int port_id = l2fwd_get_free_event_port(evt_rsrc);
+	uint64_t prev_tsc = 0, diff_tsc, cur_tsc, timer_tsc = 0;
+	const uint64_t timer_period = rsrc->timer_period;
+	/* The last configured event queue is used as the Tx stage. */
+	const uint8_t tx_q_id = evt_rsrc->evq.event_q_id[
+					evt_rsrc->evq.nb_queues - 1];
+	const uint8_t event_d_id = evt_rsrc->event_d_id;
+	struct rte_event ev;
+
+	if (port_id < 0)
+		return;
+
+	printf("%s(): entering eventdev main loop on lcore %u\n", __func__,
+		rte_lcore_id());
+
+	while (!rsrc->force_quit) {
+		/* if timer is enabled */
+		if (is_master && timer_period > 0) {
+			cur_tsc = rte_rdtsc();
+			diff_tsc = cur_tsc - prev_tsc;
+
+			/* advance the timer */
+			timer_tsc += diff_tsc;
+
+			/* if timer has reached its timeout */
+			if (unlikely(timer_tsc >= timer_period)) {
+				print_stats(rsrc);
+				/* reset the timer */
+				timer_tsc = 0;
+			}
+			prev_tsc = cur_tsc;
+		}
+
+		/* Read packet from eventdev */
+		if (!rte_event_dequeue_burst(event_d_id, port_id, &ev, 1, 0))
+			continue;
+
+		l2fwd_event_fwd(rsrc, &ev, tx_q_id, timer_period, flags);
+
+		if (flags & L2FWD_EVENT_TX_ENQ) {
+			/*
+			 * rte_event_enqueue_burst() returns the number of
+			 * events enqueued, so retry only while it reports 0.
+			 * Looping on a non-zero return would re-enqueue an
+			 * event that was already accepted and drop one that
+			 * was rejected.
+			 */
+			while (!rte_event_enqueue_burst(event_d_id, port_id,
+							&ev, 1) &&
+					!rsrc->force_quit)
+				;
+		}
+
+		if (flags & L2FWD_EVENT_TX_DIRECT) {
+			/* Same retry-until-accepted policy for the Tx adapter. */
+			while (!rte_event_eth_tx_adapter_enqueue(event_d_id,
+								 port_id,
+								 &ev, 1, 0) &&
+					!rsrc->force_quit)
+				;
+		}
+	}
+}
+
+/*
+ * Worker loop that handles up to evt_rsrc->deq_depth events per iteration.
+ *
+ * Claims an event port via l2fwd_get_free_event_port() and returns at once
+ * if none is left. Until rsrc->force_quit is set, it dequeues a burst, runs
+ * l2fwd_event_fwd() on every event and then keeps submitting the remainder
+ * of the burst until all of it is accepted (or force_quit is raised):
+ *   L2FWD_EVENT_TX_ENQ    - rte_event_enqueue_burst()
+ *   L2FWD_EVENT_TX_DIRECT - rte_event_eth_tx_adapter_enqueue()
+ * On the master lcore, when rsrc->timer_period is non-zero, print_stats()
+ * is called each time timer_period TSC cycles have accumulated.
+ */
+static __rte_always_inline void
+l2fwd_event_loop_burst(struct l2fwd_resources *rsrc,
+		       const uint32_t flags)
+{
+	const uint8_t on_master = rte_get_master_lcore() == rte_lcore_id();
+	struct l2fwd_event_resources *evt_rsrc = rsrc->evt_rsrc;
+	const int port_id = l2fwd_get_free_event_port(evt_rsrc);
+	const uint64_t timer_period = rsrc->timer_period;
+	const uint8_t stats_on = on_master && timer_period > 0;
+	const uint8_t tx_q_id = evt_rsrc->evq.event_q_id[
+					evt_rsrc->evq.nb_queues - 1];
+	const uint8_t event_d_id = evt_rsrc->event_d_id;
+	const uint8_t deq_len = evt_rsrc->deq_depth;
+	struct rte_event ev[MAX_PKT_BURST];
+	uint64_t last_tsc = 0;
+	uint64_t elapsed = 0;
+	uint16_t nb_rx, nb_tx;
+	uint8_t idx;
+
+	if (port_id < 0)
+		return;
+
+	printf("%s(): entering eventdev main loop on lcore %u\n", __func__,
+		rte_lcore_id());
+
+	while (!rsrc->force_quit) {
+		/* Periodic statistics, master lcore only. */
+		if (stats_on) {
+			const uint64_t now = rte_rdtsc();
+
+			elapsed += now - last_tsc;
+			if (unlikely(elapsed >= timer_period)) {
+				print_stats(rsrc);
+				elapsed = 0;
+			}
+			last_tsc = now;
+		}
+
+		/* Pull the next burst of events from the event device. */
+		nb_rx = rte_event_dequeue_burst(event_d_id, port_id, ev,
+						deq_len, 0);
+		if (nb_rx == 0)
+			continue;
+
+		for (idx = 0; idx < nb_rx; idx++)
+			l2fwd_event_fwd(rsrc, &ev[idx], tx_q_id, timer_period,
+					flags);
+
+		if (flags & L2FWD_EVENT_TX_ENQ) {
+			/* First attempt is unconditional, then retry the tail. */
+			nb_tx = 0;
+			do {
+				nb_tx += rte_event_enqueue_burst(event_d_id,
+						port_id, ev + nb_tx,
+						nb_rx - nb_tx);
+			} while (nb_tx < nb_rx && !rsrc->force_quit);
+		}
+
+		if (flags & L2FWD_EVENT_TX_DIRECT) {
+			/* First attempt is unconditional, then retry the tail. */
+			nb_tx = 0;
+			do {
+				nb_tx += rte_event_eth_tx_adapter_enqueue(
+						event_d_id, port_id,
+						ev + nb_tx, nb_rx - nb_tx, 0);
+			} while (nb_tx < nb_rx && !rsrc->force_quit);
+		}
+	}
+}
+
+/*
+ * Dispatch to the single-event and/or burst worker loop according to the
+ * L2FWD_EVENT_SINGLE and L2FWD_EVENT_BURST bits in flags. The two bits are
+ * tested independently, single first.
+ */
+static __rte_always_inline void
+l2fwd_event_loop(struct l2fwd_resources *rsrc,
+		 const uint32_t flags)
+{
+	const uint32_t want_single = flags & L2FWD_EVENT_SINGLE;
+	const uint32_t want_burst = flags & L2FWD_EVENT_BURST;
+
+	if (want_single)
+		l2fwd_event_loop_single(rsrc, flags);
+
+	if (want_burst)
+		l2fwd_event_loop_burst(rsrc, flags);
+}
+
+/* Event loop entry point: single-event dequeue, Tx adapter enqueue, no MAC update. */
+static void __rte_noinline
+l2fwd_event_main_loop_tx_d(struct l2fwd_resources *rsrc)
+{
+	const uint32_t flags = L2FWD_EVENT_TX_DIRECT | L2FWD_EVENT_SINGLE;
+
+	l2fwd_event_loop(rsrc, flags);
+}
+
+/* Event loop entry point: burst dequeue, Tx adapter enqueue, no MAC update. */
+static void __rte_noinline
+l2fwd_event_main_loop_tx_d_brst(struct l2fwd_resources *rsrc)
+{
+	const uint32_t flags = L2FWD_EVENT_TX_DIRECT | L2FWD_EVENT_BURST;
+
+	l2fwd_event_loop(rsrc, flags);
+}
+
+/* Event loop entry point: single-event dequeue, forward to the Tx event queue, no MAC update. */
+static void __rte_noinline
+l2fwd_event_main_loop_tx_q(struct l2fwd_resources *rsrc)
+{
+	const uint32_t flags = L2FWD_EVENT_TX_ENQ | L2FWD_EVENT_SINGLE;
+
+	l2fwd_event_loop(rsrc, flags);
+}
+
+/* Event loop entry point: burst dequeue, forward to the Tx event queue, no MAC update. */
+static void __rte_noinline
+l2fwd_event_main_loop_tx_q_brst(struct l2fwd_resources *rsrc)
+{
+	const uint32_t flags = L2FWD_EVENT_TX_ENQ | L2FWD_EVENT_BURST;
+
+	l2fwd_event_loop(rsrc, flags);
+}
+
+/* Event loop entry point: single-event dequeue, Tx adapter enqueue, with MAC update. */
+static void __rte_noinline
+l2fwd_event_main_loop_tx_d_mac(struct l2fwd_resources *rsrc)
+{
+	const uint32_t flags = L2FWD_EVENT_UPDT_MAC |
+			       L2FWD_EVENT_TX_DIRECT | L2FWD_EVENT_SINGLE;
+
+	l2fwd_event_loop(rsrc, flags);
+}
+
+/* Event loop entry point: burst dequeue, Tx adapter enqueue, with MAC update. */
+static void __rte_noinline
+l2fwd_event_main_loop_tx_d_brst_mac(struct l2fwd_resources *rsrc)
+{
+	const uint32_t flags = L2FWD_EVENT_UPDT_MAC |
+			       L2FWD_EVENT_TX_DIRECT | L2FWD_EVENT_BURST;
+
+	l2fwd_event_loop(rsrc, flags);
+}
+
+/* Event loop entry point: single-event dequeue, forward to the Tx event queue, with MAC update. */
+static void __rte_noinline
+l2fwd_event_main_loop_tx_q_mac(struct l2fwd_resources *rsrc)
+{
+	const uint32_t flags = L2FWD_EVENT_UPDT_MAC |
+			       L2FWD_EVENT_TX_ENQ | L2FWD_EVENT_SINGLE;
+
+	l2fwd_event_loop(rsrc, flags);
+}
+
+/* Event loop entry point: burst dequeue, forward to the Tx event queue, with MAC update. */
+static void __rte_noinline
+l2fwd_event_main_loop_tx_q_brst_mac(struct l2fwd_resources *rsrc)
+{
+	const uint32_t flags = L2FWD_EVENT_UPDT_MAC |
+			       L2FWD_EVENT_TX_ENQ | L2FWD_EVENT_BURST;
+
+	l2fwd_event_loop(rsrc, flags);
+}
+
void
l2fwd_event_resource_setup(struct l2fwd_resources *rsrc)
{
+ /* [MAC_UPDT][TX_MODE][BURST] */
+ const event_loop_cb event_loop[2][2][2] = {
+ [0][0][0] = l2fwd_event_main_loop_tx_d,
+ [0][0][1] = l2fwd_event_main_loop_tx_d_brst,
+ [0][1][0] = l2fwd_event_main_loop_tx_q,
+ [0][1][1] = l2fwd_event_main_loop_tx_q_brst,
+ [1][0][0] = l2fwd_event_main_loop_tx_d_mac,
+ [1][0][1] = l2fwd_event_main_loop_tx_d_brst_mac,
+ [1][1][0] = l2fwd_event_main_loop_tx_q_mac,
+ [1][1][1] = l2fwd_event_main_loop_tx_q_brst_mac,
+ };
struct l2fwd_event_resources *evt_rsrc;
uint32_t event_queue_cfg;
+ int ret;
if (!rte_event_dev_count())
rte_panic("No Eventdev found\n");
@@ -152,4 +418,14 @@ l2fwd_event_resource_setup(struct l2fwd_resources *rsrc)
/* Rx/Tx adapters configuration */
evt_rsrc->ops.adapter_setup(rsrc);
+
+ /* Start event device */
+ ret = rte_event_dev_start(evt_rsrc->event_d_id);
+ if (ret < 0)
+ rte_panic("Error in starting eventdev\n");
+
+ evt_rsrc->ops.l2fwd_event_loop = event_loop
+ [rsrc->mac_updating]
+ [evt_rsrc->tx_mode_q]
+ [evt_rsrc->has_burst];
}
@@ -18,6 +18,7 @@ typedef void (*event_port_setup_cb)(struct l2fwd_resources *rsrc);
typedef void (*event_queue_setup_cb)(struct l2fwd_resources *rsrc,
uint32_t event_queue_cfg);
typedef void (*adapter_setup_cb)(struct l2fwd_resources *rsrc);
+typedef void (*event_loop_cb)(struct l2fwd_resources *rsrc);
struct event_queues {
uint8_t *event_q_id;
@@ -47,6 +48,7 @@ struct event_setup_ops {
event_queue_setup_cb event_queue_setup;
event_port_setup_cb event_port_setup;
adapter_setup_cb adapter_setup;
+ event_loop_cb l2fwd_event_loop;
};
struct l2fwd_event_resources {
@@ -214,8 +214,12 @@ l2fwd_launch_one_lcore(void *args)
{
struct l2fwd_resources *rsrc = args;
struct l2fwd_poll_resources *poll_rsrc = rsrc->poll_rsrc;
+ struct l2fwd_event_resources *evt_rsrc = rsrc->evt_rsrc;
- poll_rsrc->poll_main_loop(rsrc);
+ if (rsrc->event_mode)
+ evt_rsrc->ops.l2fwd_event_loop(rsrc);
+ else
+ poll_rsrc->poll_main_loop(rsrc);
return 0;
}