@@ -140,5 +140,27 @@ schedule_devices(unsigned int lcore_id)
}
}
+static inline void
+worker_cleanup(uint8_t dev_id, uint8_t port_id, struct rte_event events[],
+ uint16_t nb_enq, uint16_t nb_deq)
+{
+ int i;
+
+ if (!(nb_deq - nb_enq))
+ return;
+
+ if (nb_deq) {
+ for (i = nb_enq; i < nb_deq; i++) {
+ if (events[i].op == RTE_EVENT_OP_RELEASE)
+ continue;
+ rte_pktmbuf_free(events[i].mbuf);
+ }
+
+ for (i = 0; i < nb_deq; i++)
+ events[i].op = RTE_EVENT_OP_RELEASE;
+ rte_event_enqueue_burst(dev_id, port_id, events, nb_deq);
+ }
+}
+
void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
void set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst);
@@ -16,6 +16,7 @@ worker_generic(void *arg)
uint8_t port_id = data->port_id;
size_t sent = 0, received = 0;
unsigned int lcore_id = rte_lcore_id();
+ uint16_t nb_rx = 0, nb_tx = 0;
while (!fdata->done) {
@@ -27,8 +28,7 @@ worker_generic(void *arg)
continue;
}
- const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
- &ev, 1, 0);
+ nb_rx = rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0);
if (nb_rx == 0) {
rte_pause();
@@ -47,11 +47,14 @@ worker_generic(void *arg)
work();
- while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1)
- rte_pause();
+ do {
+ nb_tx = rte_event_enqueue_burst(dev_id, port_id, &ev,
+ 1);
+ } while (!nb_tx && !fdata->done);
sent++;
}
+ worker_cleanup(dev_id, port_id, &ev, nb_tx, nb_rx);
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu TX=%zu\n",
rte_lcore_id(), received, sent);
@@ -69,10 +72,9 @@ worker_generic_burst(void *arg)
uint8_t port_id = data->port_id;
size_t sent = 0, received = 0;
unsigned int lcore_id = rte_lcore_id();
+ uint16_t i, nb_rx = 0, nb_tx = 0;
while (!fdata->done) {
- uint16_t i;
-
if (fdata->cap.scheduler)
fdata->cap.scheduler(lcore_id);
@@ -81,8 +83,8 @@ worker_generic_burst(void *arg)
continue;
}
- const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
- events, RTE_DIM(events), 0);
+ nb_rx = rte_event_dequeue_burst(dev_id, port_id, events,
+ RTE_DIM(events), 0);
if (nb_rx == 0) {
rte_pause();
@@ -103,8 +105,7 @@ worker_generic_burst(void *arg)
work();
}
- uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
- events, nb_rx);
+ nb_tx = rte_event_enqueue_burst(dev_id, port_id, events, nb_rx);
while (nb_tx < nb_rx && !fdata->done)
nb_tx += rte_event_enqueue_burst(dev_id, port_id,
events + nb_tx,
@@ -112,6 +113,8 @@ worker_generic_burst(void *arg)
sent += nb_tx;
}
+ worker_cleanup(dev_id, port_id, events, nb_tx, nb_rx);
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu TX=%zu\n",
rte_lcore_id(), received, sent);
@@ -18,21 +18,22 @@ static __rte_always_inline void
worker_event_enqueue(const uint8_t dev, const uint8_t port,
struct rte_event *ev)
{
- while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+ while (!rte_event_enqueue_burst(dev, port, ev, 1) && !fdata->done)
rte_pause();
}
-static __rte_always_inline void
+static __rte_always_inline uint16_t
worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
- struct rte_event *ev, const uint16_t nb_rx)
+ struct rte_event *ev, const uint16_t nb_rx)
{
uint16_t enq;
enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
- while (enq < nb_rx) {
+ while (enq < nb_rx && !fdata->done)
enq += rte_event_enqueue_burst(dev, port,
ev + enq, nb_rx - enq);
- }
+
+ return enq;
}
static __rte_always_inline void
@@ -40,7 +41,8 @@ worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
{
exchange_mac(ev->mbuf);
rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
- while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
+ while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0) &&
+ !fdata->done)
rte_pause();
}
@@ -76,6 +78,11 @@ worker_do_tx_single(void *arg)
}
}
+ if (ev.u64) {
+ ev.op = RTE_EVENT_OP_RELEASE;
+ rte_event_enqueue_burst(dev, port, &ev, 1);
+ }
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
@@ -111,6 +118,11 @@ worker_do_tx_single_atq(void *arg)
}
}
+ if (ev.u64) {
+ ev.op = RTE_EVENT_OP_RELEASE;
+ rte_event_enqueue_burst(dev, port, &ev, 1);
+ }
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
@@ -126,11 +138,10 @@ worker_do_tx_single_burst(void *arg)
const uint8_t dev = data->dev_id;
const uint8_t port = data->port_id;
size_t fwd = 0, received = 0, tx = 0;
+ uint16_t nb_tx = 0, nb_rx = 0, i;
while (!fdata->done) {
- uint16_t i;
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BATCH_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -153,10 +164,12 @@ worker_do_tx_single_burst(void *arg)
work();
}
- worker_event_enqueue_burst(dev, port, ev, nb_rx);
- fwd += nb_rx;
+ nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_tx;
}
+ worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
@@ -172,11 +185,10 @@ worker_do_tx_single_burst_atq(void *arg)
const uint8_t dev = data->dev_id;
const uint8_t port = data->port_id;
size_t fwd = 0, received = 0, tx = 0;
+ uint16_t i, nb_rx = 0, nb_tx = 0;
while (!fdata->done) {
- uint16_t i;
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BATCH_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -197,10 +209,12 @@ worker_do_tx_single_burst_atq(void *arg)
work();
}
- worker_event_enqueue_burst(dev, port, ev, nb_rx);
- fwd += nb_rx;
+ nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_tx;
}
+ worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
@@ -251,6 +265,11 @@ worker_do_tx(void *arg)
fwd++;
}
+ if (ev.u64) {
+ ev.op = RTE_EVENT_OP_RELEASE;
+ rte_event_enqueue_burst(dev, port, &ev, 1);
+ }
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
@@ -297,6 +316,11 @@ worker_do_tx_atq(void *arg)
fwd++;
}
+ if (ev.u64) {
+ ev.op = RTE_EVENT_OP_RELEASE;
+ rte_event_enqueue_burst(dev, port, &ev, 1);
+ }
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
@@ -314,11 +338,10 @@ worker_do_tx_burst(void *arg)
uint8_t port = data->port_id;
uint8_t lst_qid = cdata.num_stages - 1;
size_t fwd = 0, received = 0, tx = 0;
+ uint16_t i, nb_rx = 0, nb_tx = 0;
while (!fdata->done) {
- uint16_t i;
- const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
- ev, BATCH_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
if (nb_rx == 0) {
rte_pause();
@@ -347,11 +370,13 @@ worker_do_tx_burst(void *arg)
}
work();
}
- worker_event_enqueue_burst(dev, port, ev, nb_rx);
- fwd += nb_rx;
+ nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_tx;
}
+ worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
@@ -369,12 +394,10 @@ worker_do_tx_burst_atq(void *arg)
uint8_t port = data->port_id;
uint8_t lst_qid = cdata.num_stages - 1;
size_t fwd = 0, received = 0, tx = 0;
+ uint16_t i, nb_rx = 0, nb_tx = 0;
while (!fdata->done) {
- uint16_t i;
-
- const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
- ev, BATCH_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
if (nb_rx == 0) {
rte_pause();
@@ -402,10 +425,12 @@ worker_do_tx_burst_atq(void *arg)
work();
}
- worker_event_enqueue_burst(dev, port, ev, nb_rx);
- fwd += nb_rx;
+ nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_tx;
}
+ worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);