[dpdk-dev,1/3] examples/eventdev_pipeline: added sample app

Message ID 1492768299-84016-2-git-send-email-harry.van.haaren@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Jerin Jacob

Checks

Context               Check    Description
ci/checkpatch         warning  coding style issues
ci/Intel-compilation  success  Compilation OK

Commit Message

Van Haaren, Harry April 21, 2017, 9:51 a.m. UTC
  This commit adds a sample app for the eventdev library.
The app has been tested with DPDK 17.05-rc2, hence this
release (or later) is recommended.

The sample app showcases a pipeline processing use-case,
with event scheduling and processing defined per stage.
The application receives traffic as normal, with each
packet traversing the pipeline. Once the packet has
been processed by each of the pipeline stages, it is
transmitted again.

The app provides a framework to utilize cores for a single
role or multiple roles. Examples of roles are the RX core,
TX core, Scheduling core (in the case of the event/sw PMD),
and worker cores.

Various flags are available to configure numbers of stages,
cycles of work at each stage, type of scheduling, number of
worker cores, queue depths, etc. For a full explanation,
please refer to the documentation.

Signed-off-by: Gage Eads <gage.eads@intel.com>
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
---
 examples/eventdev_pipeline/Makefile |  49 ++
 examples/eventdev_pipeline/main.c   | 975 ++++++++++++++++++++++++++++++++++++
 2 files changed, 1024 insertions(+)
 create mode 100644 examples/eventdev_pipeline/Makefile
 create mode 100644 examples/eventdev_pipeline/main.c
  

Comments

Jerin Jacob May 10, 2017, 2:12 p.m. UTC | #1
-----Original Message-----
> Date: Fri, 21 Apr 2017 10:51:37 +0100
> From: Harry van Haaren <harry.van.haaren@intel.com>
> To: dev@dpdk.org
> CC: jerin.jacob@caviumnetworks.com, Harry van Haaren
>  <harry.van.haaren@intel.com>, Gage Eads <gage.eads@intel.com>, Bruce
>  Richardson <bruce.richardson@intel.com>
> Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
> X-Mailer: git-send-email 2.7.4
> 
> This commit adds a sample app for the eventdev library.
> The app has been tested with DPDK 17.05-rc2, hence this
> release (or later) is recommended.
> 
> The sample app showcases a pipeline processing use-case,
> with event scheduling and processing defined per stage.
> The application recieves traffic as normal, with each
> packet traversing the pipeline. Once the packet has
> been processed by each of the pipeline stages, it is
> transmitted again.
> 
> The app provides a framework to utilize cores for a single
> role or multiple roles. Examples of roles are the RX core,
> TX core, Scheduling core (in the case of the event/sw PMD),
> and worker cores.
> 
> Various flags are available to configure numbers of stages,
> cycles of work at each stage, type of scheduling, number of
> worker cores, queue depths etc. For a full explaination,
> please refer to the documentation.
> 
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>

Thanks for the example application sharing the SW view.
I could make it run on HW after some tweaking (not optimized, though).

[...]
> +#define MAX_NUM_STAGES 8
> +#define BATCH_SIZE 16
> +#define MAX_NUM_CORE 64

How about RTE_MAX_LCORE?

> +
> +static unsigned int active_cores;
> +static unsigned int num_workers;
> +static unsigned long num_packets = (1L << 25); /* do ~32M packets */
> +static unsigned int num_fids = 512;
> +static unsigned int num_priorities = 1;

Looks like it's not used.

> +static unsigned int num_stages = 1;
> +static unsigned int worker_cq_depth = 16;
> +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
> +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
> +static int16_t qid[MAX_NUM_STAGES] = {-1};

Moving all the fastpath-related variables into a cache-aligned structure
will help.
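For illustration, something along these lines (just a sketch; the struct name
and field selection are illustrative, and __rte_cache_aligned comes from
rte_memory.h):

struct fastpath_data {
	volatile int done;
	unsigned int num_workers;
	unsigned int num_fids;
	int16_t next_qid[MAX_NUM_STAGES + 1];
	int16_t qid[MAX_NUM_STAGES];
	/* ... remaining fastpath globals ... */
} __rte_cache_aligned;

static struct fastpath_data fdata;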

> +static int worker_cycles;
> +static int enable_queue_priorities;
> +
> +struct prod_data {
> +	uint8_t dev_id;
> +	uint8_t port_id;
> +	int32_t qid;
> +	unsigned num_nic_ports;
> +};

cache aligned ?

> +
> +struct cons_data {
> +	uint8_t dev_id;
> +	uint8_t port_id;
> +};
> +

cache aligned ?

> +static struct prod_data prod_data;
> +static struct cons_data cons_data;
> +
> +struct worker_data {
> +	uint8_t dev_id;
> +	uint8_t port_id;
> +};

cache aligned ?

> +
> +static unsigned *enqueue_cnt;
> +static unsigned *dequeue_cnt;
> +
> +static volatile int done;
> +static volatile int prod_stop;

No one is updating prod_stop.

> +static int quiet;
> +static int dump_dev;
> +static int dump_dev_signal;
> +
> +static uint32_t rx_lock;
> +static uint32_t tx_lock;
> +static uint32_t sched_lock;
> +static bool rx_single;
> +static bool tx_single;
> +static bool sched_single;
> +
> +static unsigned rx_core[MAX_NUM_CORE];
> +static unsigned tx_core[MAX_NUM_CORE];
> +static unsigned sched_core[MAX_NUM_CORE];
> +static unsigned worker_core[MAX_NUM_CORE];
> +
> +static bool
> +core_in_use(unsigned lcore_id) {
> +	return (rx_core[lcore_id] || sched_core[lcore_id] ||
> +		tx_core[lcore_id] || worker_core[lcore_id]);
> +}
> +
> +static struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
> +
> +static void
> +rte_eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
> +			void *userdata)

IMO, it is better not to use rte_eth_* names for application functions.

> +{
> +	int port_id = (uintptr_t) userdata;
> +	unsigned _sent = 0;
> +
> +	do {
> +		/* Note: hard-coded TX queue */
> +		_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
> +					  unsent - _sent);
> +	} while (_sent != unsent);
> +}
> +
> +static int
> +consumer(void)
> +{
> +	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
> +	struct rte_event packets[BATCH_SIZE];
> +
> +	static uint64_t npackets;
> +	static uint64_t received;
> +	static uint64_t received_printed;
> +	static uint64_t time_printed;
> +	static uint64_t start_time;
> +	unsigned i, j;
> +	uint8_t dev_id = cons_data.dev_id;
> +	uint8_t port_id = cons_data.port_id;
> +
> +	if (!npackets)
> +		npackets = num_packets;
> +
> +	do {
> +		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
> +				packets, RTE_DIM(packets), 0);

		const uint16_t n =
> +
> +		if (n == 0) {
> +			for (j = 0; j < rte_eth_dev_count(); j++)
> +				rte_eth_tx_buffer_flush(j, 0, tx_buf[j]);
> +			return 0;
> +		}
> +		if (start_time == 0)
> +			time_printed = start_time = rte_get_timer_cycles();
> +
> +		received += n;
> +		for (i = 0; i < n; i++) {
> +			uint8_t outport = packets[i].mbuf->port;
> +			rte_eth_tx_buffer(outport, 0, tx_buf[outport],
> +					packets[i].mbuf);
> +		}
> +
> +		if (!quiet && received >= received_printed + (1<<22)) {
> +			const uint64_t now = rte_get_timer_cycles();
> +			const uint64_t delta_cycles = now - start_time;
> +			const uint64_t elapsed_ms = delta_cycles / freq_khz;
> +			const uint64_t interval_ms =
> +					(now - time_printed) / freq_khz;
> +
> +			uint64_t rx_noprint = received - received_printed;
> +			printf("# consumer RX=%"PRIu64", time %"PRIu64
> +				"ms, avg %.3f mpps [current %.3f mpps]\n",
> +					received, elapsed_ms,
> +					(received) / (elapsed_ms * 1000.0),
> +					rx_noprint / (interval_ms * 1000.0));
> +			received_printed = received;
> +			time_printed = now;
> +		}
> +
> +		dequeue_cnt[0] += n;
> +
> +		if (num_packets > 0 && npackets > 0) {
> +			npackets -= n;
> +			if (npackets == 0 || npackets > num_packets)
> +				done = 1;
> +		}

This looks like very complicated logic. I think we can simplify it.

> +	} while (0);

Is the do { ... } while (0); really required here?

> +
> +	return 0;
> +}
> +
> +static int
> +producer(void)
> +{
> +	static uint8_t eth_port;
> +	struct rte_mbuf *mbufs[BATCH_SIZE];
> +	struct rte_event ev[BATCH_SIZE];
> +	uint32_t i, num_ports = prod_data.num_nic_ports;
> +	int32_t qid = prod_data.qid;
> +	uint8_t dev_id = prod_data.dev_id;
> +	uint8_t port_id = prod_data.port_id;
> +	uint32_t prio_idx = 0;
> +
> +	const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
> +	if (++eth_port == num_ports)
> +		eth_port = 0;
> +	if (nb_rx == 0) {
> +		rte_pause();
> +		return 0;
> +	}
> +
> +	for (i = 0; i < nb_rx; i++) {
> +		ev[i].flow_id = mbufs[i]->hash.rss;

Prefetching mbufs[i+1] may help here.
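For example (sketch only; any gain would depend on the platform):

	for (i = 0; i < nb_rx; i++) {
		/* prefetch the next mbuf header while handling the current one */
		if (i + 1 < nb_rx)
			rte_prefetch0(mbufs[i + 1]);

		ev[i].flow_id = mbufs[i]->hash.rss;
		/* ... rest of the event setup as above ... */
	}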

> +		ev[i].op = RTE_EVENT_OP_NEW;
> +		ev[i].sched_type = queue_type;

The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY != RTE_SCHED_TYPE_ORDERED. So, we
cannot assign .sched_type as queue_type.

I think one option to avoid the translation in the application would be to:
- Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES and RTE_EVENT_QUEUE_CFG_*_ONLY
- Introduce a new RTE_EVENT_DEV_CAP_ flag to denote the
  RTE_EVENT_QUEUE_CFG_ALL_TYPES capability
- Add sched_type to struct rte_event_queue_conf. If the capability flag is
  not set, the implementation takes the sched_type value for the queue.

Any thoughts?
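Roughly, the proposal would look like this (sketch of the proposed change only,
not existing API; the capability name is a placeholder):

struct rte_event_queue_conf {
	uint32_t nb_atomic_flows;
	uint32_t nb_atomic_order_sequences;
	uint32_t event_queue_cfg;
	uint8_t sched_type;   /* proposed: used when the ALL_TYPES cap is not set */
	uint8_t priority;
};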


> +		ev[i].queue_id = qid;
> +		ev[i].event_type = RTE_EVENT_TYPE_CPU;

IMO, RTE_EVENT_TYPE_ETHERNET is the better option here as it is
producing the Ethernet packets/events.

> +		ev[i].sub_event_type = 0;
> +		ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
> +		ev[i].mbuf = mbufs[i];
> +		RTE_SET_USED(prio_idx);
> +	}
> +
> +	const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx);

For the producer pattern, i.e. a burst of RTE_EVENT_OP_NEW, OcteonTX can do a burst
operation, unlike the FORWARD case (which is one event at a time). Earlier, I
thought I could abstract the producer pattern in the PMD, but it looks like we
are going with an application-driven producer model based on the latest RFC. So I think
we can add one flag to rte_event_enqueue_burst as a hint that all the events
are of type RTE_EVENT_OP_NEW. The SW driver can ignore this.

I can send a patch for the same.

Any thoughts?


> +	if (nb_tx != nb_rx) {
> +		for (i = nb_tx; i < nb_rx; i++)
> +			rte_pktmbuf_free(mbufs[i]);
> +	}
> +	enqueue_cnt[0] += nb_tx;
> +
> +	if (unlikely(prod_stop))

I think no one is updating prod_stop.

> +		done = 1;
> +
> +	return 0;
> +}
> +
> +static inline void
> +schedule_devices(uint8_t dev_id, unsigned lcore_id)
> +{
> +	if (rx_core[lcore_id] && (rx_single ||
> +	    rte_atomic32_cmpset(&rx_lock, 0, 1))) {

This pattern (rte_atomic32_cmpset) means the application can inject only
"one core" worth of packets. That is not enough for low-end cores. Maybe we need
multiple producer options. I think the new RFC is addressing it.

> +		producer();
> +		rte_atomic32_clear((rte_atomic32_t *)&rx_lock);
> +	}
> +
> +	if (sched_core[lcore_id] && (sched_single ||
> +	    rte_atomic32_cmpset(&sched_lock, 0, 1))) {
> +		rte_event_schedule(dev_id);
> +		if (dump_dev_signal) {
> +			rte_event_dev_dump(0, stdout);
> +			dump_dev_signal = 0;
> +		}
> +		rte_atomic32_clear((rte_atomic32_t *)&sched_lock);
> +	}

This is a lot of unwanted code if RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED is set.

I think we can keep the common code and select the worker function at runtime
based on the flag,
i.e.
rte_eal_remote_launch(worker_x, &worker_data[worker_idx], lcore_id);
rte_eal_remote_launch(worker_y, &worker_data[worker_idx], lcore_id);

Maybe we can improve this after the initial version.
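For illustration, the launch could be gated on the capability at runtime
(sketch; worker_no_sched/worker_with_sched are placeholder names):

	struct rte_event_dev_info info;
	rte_event_dev_info_get(dev_id, &info);

	if (info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
		/* device schedules internally: no rte_event_schedule() in the loop */
		rte_eal_remote_launch(worker_no_sched, &worker_data[worker_idx], lcore_id);
	else
		rte_eal_remote_launch(worker_with_sched, &worker_data[worker_idx], lcore_id);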

> +
> +	if (tx_core[lcore_id] && (tx_single ||
> +	    rte_atomic32_cmpset(&tx_lock, 0, 1))) {
> +		consumer();

Does consumer() need to follow this pattern? I am thinking that
if the events are from the last stage, then consumer() could be called from worker().

I think the above scheme works better when the _same_ worker code needs to run in
both cases:
1) the ethdev HW is capable of enqueuing packets to the same txq from
   multiple threads
2) the ethdev is not capable of doing so.

Both cases can then be addressed at configuration time, where we link
the queues to ports (see the sketch below):
case 1) link all worker ports to the last queue
case 2) link only a single worker port to the last queue

while keeping the common worker code.

The HW implementation has functional and performance issues if "two" ports are
assigned to one lcore for dequeue. The above scheme fixes that problem too.
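A configuration-time sketch of the two cases (queue and port variables are
illustrative):

	uint8_t last_qid = qid[num_stages - 1]; /* or the dedicated TX queue id */

	/* case 1: txq is MT-safe - link every worker port to the last queue */
	for (i = 0; i < num_workers; i++)
		rte_event_port_link(dev_id, worker_data[i].port_id, &last_qid, NULL, 1);

	/* case 2: txq is not MT-safe - link only a single worker port */
	rte_event_port_link(dev_id, worker_data[0].port_id, &last_qid, NULL, 1);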

> +		rte_atomic32_clear((rte_atomic32_t *)&tx_lock);
> +	}
> +}
> +
> +static int
> +worker(void *arg)
> +{
> +	struct rte_event events[BATCH_SIZE];
> +
> +	struct worker_data *data = (struct worker_data *)arg;
> +	uint8_t dev_id = data->dev_id;
> +	uint8_t port_id = data->port_id;
> +	size_t sent = 0, received = 0;
> +	unsigned lcore_id = rte_lcore_id();
> +
> +	while (!done) {
> +		uint16_t i;
> +
> +		schedule_devices(dev_id, lcore_id);
> +
> +		if (!worker_core[lcore_id]) {
> +			rte_pause();
> +			continue;
> +		}
> +
> +		uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
> +				events, RTE_DIM(events), 0);
> +
> +		if (nb_rx == 0) {
> +			rte_pause();
> +			continue;
> +		}
> +		received += nb_rx;
> +
> +		for (i = 0; i < nb_rx; i++) {
> +			struct ether_hdr *eth;
> +			struct ether_addr addr;
> +			struct rte_mbuf *m = events[i].mbuf;
> +
> +			/* The first worker stage does classification */
> +			if (events[i].queue_id == qid[0])
> +				events[i].flow_id = m->hash.rss % num_fids;

I'm not sure why we need to do this (shrinking the flows) in worker() in a queue-based pipeline.
If a PMD has any specific requirement on num_fids, I think we
can move this to the configuration stage, or the PMD can choose an optimal fid internally to
avoid the modulus-operation tax in the fastpath in all PMDs.

Does struct rte_event_queue_conf.nb_atomic_flows help here?
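e.g. a sketch of pushing it into the queue configuration instead, based on the
wkr_q_conf already in the patch:

	struct rte_event_queue_conf wkr_q_conf = {
		.event_queue_cfg = queue_type,
		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
		.nb_atomic_flows = num_fids,   /* let the PMD size its flow state */
		.nb_atomic_order_sequences = 1024,
	};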

> +
> +			events[i].queue_id = next_qid[events[i].queue_id];
> +			events[i].op = RTE_EVENT_OP_FORWARD;

events[i].sched_type is missing here; the HW PMD does not work without it.
I think we can use a scheme similar to next_qid for a next_sched_type.
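e.g. (sketch, mirroring the existing next_qid[] handling; the array would be
filled at configuration time):

	static uint8_t next_sched_type[MAX_NUM_STAGES + 1];
	...
	const uint8_t cur_q = events[i].queue_id;
	events[i].queue_id = next_qid[cur_q];
	events[i].sched_type = next_sched_type[cur_q];
	events[i].op = RTE_EVENT_OP_FORWARD;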

> +
> +			/* change mac addresses on packet (to use mbuf data) */
> +			eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
> +			ether_addr_copy(&eth->d_addr, &addr);
> +			ether_addr_copy(&eth->s_addr, &eth->d_addr);
> +			ether_addr_copy(&addr, &eth->s_addr);

IMO, we can make the packet-processing code a "static inline" function so
different worker types can reuse it.
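e.g. a minimal sketch of pulling it out (body taken from the code above):

static inline void
work(struct rte_mbuf *m)
{
	struct ether_hdr *eth;
	struct ether_addr addr;

	/* change mac addresses on packet (to use mbuf data) */
	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
	ether_addr_copy(&eth->d_addr, &addr);
	ether_addr_copy(&eth->s_addr, &eth->d_addr);
	ether_addr_copy(&addr, &eth->s_addr);
}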

> +
> +			/* do a number of cycles of work per packet */
> +			volatile uint64_t start_tsc = rte_rdtsc();
> +			while (rte_rdtsc() < start_tsc + worker_cycles)
> +				rte_pause();

Ditto.

I think all the worker-specific variables like "worker_cycles" can be moved into
one structure and used from there.

> +		}
> +		uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
> +				events, nb_rx);
> +		while (nb_tx < nb_rx && !done)
> +			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
> +							events + nb_tx,
> +							nb_rx - nb_tx);
> +		sent += nb_tx;
> +	}
> +
> +	if (!quiet)
> +		printf("  worker %u thread done. RX=%zu TX=%zu\n",
> +				rte_lcore_id(), received, sent);
> +
> +	return 0;
> +}
> +
> +/*
> + * Parse the coremask given as argument (hexadecimal string) and fill
> + * the global configuration (core role and core count) with the parsed
> + * value.
> + */
> +static int xdigit2val(unsigned char c)

There are multiple instances of "xdigit2val" in the DPDK repo. Maybe we can push this
as common code.

> +{
> +	int val;
> +
> +	if (isdigit(c))
> +		val = c - '0';
> +	else if (isupper(c))
> +		val = c - 'A' + 10;
> +	else
> +		val = c - 'a' + 10;
> +	return val;
> +}
> +
> +
> +static void
> +usage(void)
> +{
> +	const char *usage_str =
> +		"  Usage: eventdev_demo [options]\n"
> +		"  Options:\n"
> +		"  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
> +		"  -f, --atomic-flows=N         Use N random flows from 1 to N (default 16)\n"

I think this parameter now affects the application fastpath code. I think
it should be an eventdev configuration parameter.

> +		"  -s, --num_stages=N           Use N atomic stages (default 1)\n"
> +		"  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
> +		"  -w, --worker-mask=core mask  Run worker on CPUs in core mask\n"
> +		"  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
> +		"  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
> +		"  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
> +		"  -W  --work-cycles=N          Worker cycles (default 0)\n"
> +		"  -P  --queue-priority         Enable scheduler queue prioritization\n"
> +		"  -o, --ordered                Use ordered scheduling\n"
> +		"  -p, --parallel               Use parallel scheduling\n"

IMO, all stages being "parallel" or "ordered" or "atomic" is only one mode of
operation. It is valid to have any combination. We need to be able to express that
on the command line, for example:
3 stages with
O->A->P
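For example, a hypothetical option such as --stage-types=OAP could map each
character to a per-stage sched type (sketch only; stage_type[] is an assumed
array):

	/* parse e.g. "OAP" -> ordered, atomic, parallel per stage */
	for (i = 0; i < num_stages && optarg[i] != '\0'; i++) {
		switch (optarg[i]) {
		case 'O': stage_type[i] = RTE_SCHED_TYPE_ORDERED;  break;
		case 'A': stage_type[i] = RTE_SCHED_TYPE_ATOMIC;   break;
		case 'P': stage_type[i] = RTE_SCHED_TYPE_PARALLEL; break;
		default:  usage();
		}
	}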

> +		"  -q, --quiet                  Minimize printed output\n"
> +		"  -D, --dump                   Print detailed statistics before exit"
> +		"\n";
> +	fprintf(stderr, "%s", usage_str);
> +	exit(1);
> +}
> +

[...]

> +			rx_single = (popcnt == 1);
> +			break;
> +		case 't':
> +			tx_lcore_mask = parse_coremask(optarg);
> +			popcnt = __builtin_popcountll(tx_lcore_mask);
> +			tx_single = (popcnt == 1);
> +			break;
> +		case 'e':
> +			sched_lcore_mask = parse_coremask(optarg);
> +			popcnt = __builtin_popcountll(sched_lcore_mask);
> +			sched_single = (popcnt == 1);
> +			break;
> +		default:
> +			usage();
> +		}
> +	}
> +
> +	if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
> +	    sched_lcore_mask == 0 || tx_lcore_mask == 0) {

This needs to honor RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED, i.e. a sched_lcore_mask
of zero can be a valid case.
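e.g. (sketch; the capability would have to be queried before this check):

	const int dist_sched = !!(dev_info.event_dev_cap &
				  RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);

	if (worker_lcore_mask == 0 || rx_lcore_mask == 0 || tx_lcore_mask == 0 ||
	    (sched_lcore_mask == 0 && !dist_sched)) {
		/* error path as before */
	}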


> +		printf("Core part of pipeline was not assigned any cores. "
> +			"This will stall the pipeline, please check core masks "
> +			"(use -h for details on setting core masks):\n"
> +			"\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
> +			"\n\tworkers: %"PRIu64"\n",
> +			rx_lcore_mask, tx_lcore_mask, sched_lcore_mask,
> +			worker_lcore_mask);
> +		rte_exit(-1, "Fix core masks\n");
> +	}
> +	if (num_stages == 0 || num_stages > MAX_NUM_STAGES)
> +		usage();
> +
> +	for (i = 0; i < MAX_NUM_CORE; i++) {
> +		rx_core[i] = !!(rx_lcore_mask & (1UL << i));
> +		tx_core[i] = !!(tx_lcore_mask & (1UL << i));
> +		sched_core[i] = !!(sched_lcore_mask & (1UL << i));
> +		worker_core[i] = !!(worker_lcore_mask & (1UL << i));
> +
> +		if (worker_core[i])
> +			num_workers++;
> +		if (core_in_use(i))
> +			active_cores++;
> +	}
> +}
> +
> +
> +struct port_link {
> +	uint8_t queue_id;
> +	uint8_t priority;
> +};
> +
> +static int
> +setup_eventdev(struct prod_data *prod_data,
> +		struct cons_data *cons_data,
> +		struct worker_data *worker_data)
> +{
> +	const uint8_t dev_id = 0;
> +	/* +1 stages is for a SINGLE_LINK TX stage */
> +	const uint8_t nb_queues = num_stages + 1;
> +	/* + 2 is one port for producer and one for consumer */
> +	const uint8_t nb_ports = num_workers + 2;

The selection of the number of ports is a function of rte_event_has_producer().
I think it will be addressed with the RFC.

> +	const struct rte_event_dev_config config = {
> +			.nb_event_queues = nb_queues,
> +			.nb_event_ports = nb_ports,
> +			.nb_events_limit  = 4096,
> +			.nb_event_queue_flows = 1024,
> +			.nb_event_port_dequeue_depth = 128,
> +			.nb_event_port_enqueue_depth = 128,

The OCTEONTX PMD has .nb_event_port_dequeue_depth = 1,
.nb_event_port_enqueue_depth = 1 and struct rte_event_dev_info.min_dequeue_timeout_ns
= 853.
I think we need to call rte_event_dev_info_get() first to get sane
values and take RTE_MIN or RTE_MAX based on the use case.

or

I can ignore this value in the OCTEONTX PMD, but I am not sure about the NXP case.
Any thoughts from the NXP folks?
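e.g. roughly (sketch; the config struct would need to be non-const for this):

	struct rte_event_dev_info info;
	rte_event_dev_info_get(dev_id, &info);

	config.nb_event_port_dequeue_depth =
		RTE_MIN(128U, (unsigned int)info.max_event_port_dequeue_depth);
	config.nb_event_port_enqueue_depth =
		RTE_MIN(128U, (unsigned int)info.max_event_port_enqueue_depth);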


> +	};
> +	const struct rte_event_port_conf wkr_p_conf = {
> +			.dequeue_depth = worker_cq_depth,

Same as above

> +			.enqueue_depth = 64,

Same as above

> +			.new_event_threshold = 4096,
> +	};
> +	struct rte_event_queue_conf wkr_q_conf = {
> +			.event_queue_cfg = queue_type,
> +			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
> +			.nb_atomic_flows = 1024,
> +			.nb_atomic_order_sequences = 1024,
> +	};
> +	const struct rte_event_port_conf tx_p_conf = {
> +			.dequeue_depth = 128,

Same as above
> +			.enqueue_depth = 128,

Same as above

> +			.new_event_threshold = 4096,
> +	};
> +	const struct rte_event_queue_conf tx_q_conf = {
> +			.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
> +			.event_queue_cfg =
> +					RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY |
> +					RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
> +			.nb_atomic_flows = 1024,
> +			.nb_atomic_order_sequences = 1024,
> +	};
> +
> +	struct port_link worker_queues[MAX_NUM_STAGES];
> +	struct port_link tx_queue;
> +	unsigned i;
> +
> +	int ret, ndev = rte_event_dev_count();
> +	if (ndev < 1) {
> +		printf("%d: No Eventdev Devices Found\n", __LINE__);
> +		return -1;
> +	}
> +
> +	struct rte_event_dev_info dev_info;
> +	ret = rte_event_dev_info_get(dev_id, &dev_info);
> +	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
> +
> +	ret = rte_event_dev_configure(dev_id, &config);
> +	if (ret < 0)
> +		printf("%d: Error configuring device\n", __LINE__)

Don't proceed further if the configure fails.
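i.e. something like:

	ret = rte_event_dev_configure(dev_id, &config);
	if (ret < 0) {
		printf("%d: Error configuring device\n", __LINE__);
		return -1;
	}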

> +
> +	/* Q creation - one load balanced per pipeline stage*/
> +
> +	/* set up one port per worker, linking to all stage queues */
> +	for (i = 0; i < num_workers; i++) {
> +		struct worker_data *w = &worker_data[i];
> +		w->dev_id = dev_id;
> +		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
> +			printf("Error setting up port %d\n", i);
> +			return -1;
> +		}
> +
> +		uint32_t s;
> +		for (s = 0; s < num_stages; s++) {
> +			if (rte_event_port_link(dev_id, i,
> +						&worker_queues[s].queue_id,
> +						&worker_queues[s].priority,
> +						1) != 1) {
> +				printf("%d: error creating link for port %d\n",
> +						__LINE__, i);
> +				return -1;
> +			}
> +		}
> +		w->port_id = i;
> +	}
> +	/* port for consumer, linked to TX queue */
> +	if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {

If the ethdev supports MT-safe txqs then this port can be linked to the
workers too; something to consider for the future.

> +		printf("Error setting up port %d\n", i);
> +		return -1;
> +	}
> +	if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
> +				&tx_queue.priority, 1) != 1) {
> +		printf("%d: error creating link for port %d\n",
> +				__LINE__, i);
> +		return -1;
> +	}
> +	/* port for producer, no links */
> +	const struct rte_event_port_conf rx_p_conf = {
> +			.dequeue_depth = 8,
> +			.enqueue_depth = 8,

Same issue as above. You could get the default config first and then configure.
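e.g. (sketch):

	struct rte_event_port_conf rx_p_conf;

	rte_event_port_default_conf_get(dev_id, i + 1, &rx_p_conf);
	/* optionally override fields within the PMD's advertised limits */
	rx_p_conf.new_event_threshold = 1200;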

> +			.new_event_threshold = 1200,
> +	};
> +	if (rte_event_port_setup(dev_id, i + 1, &rx_p_conf) < 0) {
> +		printf("Error setting up port %d\n", i);
> +		return -1;
> +	}
> +
> +	*prod_data = (struct prod_data){.dev_id = dev_id,
> +					.port_id = i + 1,
> +					.qid = qid[0] };
> +	*cons_data = (struct cons_data){.dev_id = dev_id,
> +					.port_id = i };
> +
> +	enqueue_cnt = rte_calloc(0,
> +			RTE_CACHE_LINE_SIZE/(sizeof(enqueue_cnt[0])),
> +			sizeof(enqueue_cnt[0]), 0);
> +	dequeue_cnt = rte_calloc(0,
> +			RTE_CACHE_LINE_SIZE/(sizeof(dequeue_cnt[0])),
> +			sizeof(dequeue_cnt[0]), 0);

Why an array? It looks like enqueue_cnt[1] and dequeue_cnt[1] are not used anywhere.

> +
> +	if (rte_event_dev_start(dev_id) < 0) {
> +		printf("Error starting eventdev\n");
> +		return -1;
> +	}
> +
> +	return dev_id;
> +}
> +
> +static void
> +signal_handler(int signum)
> +{
> +	if (done || prod_stop)

I think no one is updating prod_stop.

> +		rte_exit(1, "Exiting on signal %d\n", signum);
> +	if (signum == SIGINT || signum == SIGTERM) {
> +		printf("\n\nSignal %d received, preparing to exit...\n",
> +				signum);
> +		done = 1;
> +	}
> +	if (signum == SIGTSTP)
> +		rte_event_dev_dump(0, stdout);
> +}
> +
> +int
> +main(int argc, char **argv)

[...]

> +       RTE_LCORE_FOREACH_SLAVE(lcore_id) {
> +               if (lcore_id >= MAX_NUM_CORE)
> +                       break;
> +
> +               if (!rx_core[lcore_id] && !worker_core[lcore_id] &&
> +                   !tx_core[lcore_id] && !sched_core[lcore_id])
> +                       continue;
> +
> +               if (rx_core[lcore_id])
> +                       printf(
> +                                "[%s()] lcore %d executing NIC Rx, and using eventdev port %u\n",
> +                                __func__, lcore_id, prod_data.port_id);

These prints won't show if rx, tx or the scheduler are running on the master core (as we
are only iterating with RTE_LCORE_FOREACH_SLAVE).

> +
> +	if (!quiet) {
> +		printf("\nPort Workload distribution:\n");
> +		uint32_t i;
> +		uint64_t tot_pkts = 0;
> +		uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
> +		for (i = 0; i < num_workers; i++) {
> +			char statname[64];
> +			snprintf(statname, sizeof(statname), "port_%u_rx",
> +					worker_data[i].port_id);

Please check "port_%u_rx" xstat availability with PMD first.

> +			pkts_per_wkr[i] = rte_event_dev_xstats_by_name_get(
> +					dev_id, statname, NULL);
> +			tot_pkts += pkts_per_wkr[i];
> +		}
> +		for (i = 0; i < num_workers; i++) {
> +			float pc = pkts_per_wkr[i]  * 100 /
> +				((float)tot_pkts);
> +			printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
> +					i, pc, pkts_per_wkr[i]);
> +		}
> +
> +	}
> +
> +	return 0;
> +}

As a final note, considering the different fastpath options, I was
thinking of introducing an app/test-eventdev, like app/testpmd, with a
set of function pointers# for different modes (like "macswap" and "txonly"
in testpmd) to exercise the different options, plus a framework for adding new use
cases. I will work on that to check the feasibility.

##
struct fwd_engine {
        const char       *fwd_mode_name; /**< Forwarding mode name. */
        port_fwd_begin_t port_fwd_begin; /**< NULL if nothing special to do. */ 
        port_fwd_end_t   port_fwd_end;   /**< NULL if nothing special to do. */ 
        packet_fwd_t     packet_fwd;     /**< Mandatory. */
};

> -- 
> 2.7.4
>
  
Eads, Gage May 10, 2017, 4:40 p.m. UTC | #2
>  -----Original Message-----
>  From: Jerin Jacob [mailto:jerin.jacob@caviumnetworks.com]
>  Sent: Wednesday, May 10, 2017 9:12 AM
>  To: Van Haaren, Harry <harry.van.haaren@intel.com>
>  Cc: dev@dpdk.org; Eads, Gage <gage.eads@intel.com>; Richardson, Bruce
>  <bruce.richardson@intel.com>
>  Subject: Re: [PATCH 1/3] examples/eventdev_pipeline: added sample app
>  
>  -----Original Message-----
>  > Date: Fri, 21 Apr 2017 10:51:37 +0100
>  > From: Harry van Haaren <harry.van.haaren@intel.com>
>  > To: dev@dpdk.org
>  > CC: jerin.jacob@caviumnetworks.com, Harry van Haaren
>  > <harry.van.haaren@intel.com>, Gage Eads <gage.eads@intel.com>, Bruce
>  > Richardson <bruce.richardson@intel.com>
>  > Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
>  > X-Mailer: git-send-email 2.7.4
>  >
>  > This commit adds a sample app for the eventdev library.
>  > The app has been tested with DPDK 17.05-rc2, hence this release (or
>  > later) is recommended.
>  >

<snip>

>  
>  > +		ev[i].op = RTE_EVENT_OP_NEW;
>  > +		ev[i].sched_type = queue_type;
>  
>  The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY !=
>  RTE_SCHED_TYPE_ORDERED. So, we cannot assign .sched_type as
>  queue_type.
>  
>  I think, one option could be to avoid translation in application is to
>  - Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES,
>  RTE_EVENT_QUEUE_CFG_*_ONLY
>  - Introduce a new RTE_EVENT_DEV_CAP_ to denote
>  RTE_EVENT_QUEUE_CFG_ALL_TYPES cap ability
>  - add sched_type in struct rte_event_queue_conf. If capability flag is
>    not set then implementation takes sched_type value for the queue.
>  
>  Any thoughts?

I'm not sure this change is needed. We could create a sched_type[] array, indexed by queue ID, for assigning the event's sched type.

With the proposed approach, the sched_type assignment would still be needed for "all types"-capable PMDs, so I'm not sure it buys us anything in the application.
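Something like this (sketch; the array name is illustrative and would be filled
in at configuration time, one entry per event queue):

	static uint8_t queue_sched_type[MAX_NUM_STAGES + 1];
	...
	ev[i].sched_type = queue_sched_type[qid];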

>  
>  
>  > +		ev[i].queue_id = qid;
>  > +		ev[i].event_type = RTE_EVENT_TYPE_CPU;
>  
>  IMO, RTE_EVENT_TYPE_ETHERNET is the better option here as it is producing
>  the Ethernet packets/events.
>  
>  > +		ev[i].sub_event_type = 0;
>  > +		ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
>  > +		ev[i].mbuf = mbufs[i];
>  > +		RTE_SET_USED(prio_idx);
>  > +	}
>  > +
>  > +	const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev,
>  > +nb_rx);
>  
>  For producer pattern i.e a burst of RTE_EVENT_OP_NEW, OcteonTX can do
>  burst operation unlike FORWARD case(which is one event at a time).Earlier, I
>  thought I can abstract the producer pattern in PMD, but it looks like we are
>  going with application driven producer model based on latest RFC.So I think,
>  we can add one flag to rte_event_enqueue_burst to denote all the events are
>  of type RTE_EVENT_OP_NEW as hint.SW driver can ignore this.
>  
>  I can send a patch for the same.
>  
>  Any thoughts?
>  

Makes sense, though I'm a little hesitant about putting this sort of PMD-specific hint in the enqueue API. Perhaps we can use the impl_opaque field, or have the PMD inspect the event_type (if TYPE_ETHDEV, assume all packets in the burst are NEWs)?

>  
>  > +	if (nb_tx != nb_rx) {
>  > +		for (i = nb_tx; i < nb_rx; i++)
>  > +			rte_pktmbuf_free(mbufs[i]);
>  > +	}
>  > +	enqueue_cnt[0] += nb_tx;
>  > +
>  > +	if (unlikely(prod_stop))
>  
>  I think, No one updating the prod_stop
>  
>  > +		done = 1;
>  > +
>  > +	return 0;
>  > +}
>  > +
>  > +static inline void
>  > +schedule_devices(uint8_t dev_id, unsigned lcore_id) {
>  > +	if (rx_core[lcore_id] && (rx_single ||
>  > +	    rte_atomic32_cmpset(&rx_lock, 0, 1))) {
>  
>  This pattern(rte_atomic32_cmpset) makes application can inject only "one
>  core" worth of packets. Not enough for low-end cores. May be we need
>  multiple producer options. I think, new RFC is addressing it.
>  

Right, in the "wimpy" core case one can partition the Rx queues across multiple adapters, and assign the adapters to different service cores. The proposal doesn't explicitly state this, but rte_eth_rx_event_adapter_run() is not intended to be MT-safe -- so the service core implementation would need something along the lines of the cmpset if a service is affinitized to multiple service cores.

Thanks,
Gage
  
Eads, Gage May 10, 2017, 8:16 p.m. UTC | #3
>  -----Original Message-----
>  From: Eads, Gage
>  Sent: Wednesday, May 10, 2017 11:41 AM
>  To: 'Jerin Jacob' <jerin.jacob@caviumnetworks.com>; Van Haaren, Harry
>  <harry.van.haaren@intel.com>
>  Cc: dev@dpdk.org; Richardson, Bruce <bruce.richardson@intel.com>
>  Subject: RE: [PATCH 1/3] examples/eventdev_pipeline: added sample app
>  
>  
>  
>  >  -----Original Message-----
>  >  From: Jerin Jacob [mailto:jerin.jacob@caviumnetworks.com]
>  >  Sent: Wednesday, May 10, 2017 9:12 AM
>  >  To: Van Haaren, Harry <harry.van.haaren@intel.com>
>  >  Cc: dev@dpdk.org; Eads, Gage <gage.eads@intel.com>; Richardson, Bruce
>  > <bruce.richardson@intel.com>
>  >  Subject: Re: [PATCH 1/3] examples/eventdev_pipeline: added sample app
>  >
>  >  -----Original Message-----
>  >  > Date: Fri, 21 Apr 2017 10:51:37 +0100  > From: Harry van Haaren
>  > <harry.van.haaren@intel.com>  > To: dev@dpdk.org  > CC:
>  > jerin.jacob@caviumnetworks.com, Harry van Haaren  >
>  > <harry.van.haaren@intel.com>, Gage Eads <gage.eads@intel.com>, Bruce
>  > > Richardson <bruce.richardson@intel.com>  > Subject: [PATCH 1/3]
>  > examples/eventdev_pipeline: added sample app  > X-Mailer:
>  > git-send-email 2.7.4  >  > This commit adds a sample app for the
>  > eventdev library.
>  >  > The app has been tested with DPDK 17.05-rc2, hence this release (or
>  > > later) is recommended.
>  >  >
>  
>  <snip>
>  
>  >
>  >  > +		ev[i].op = RTE_EVENT_OP_NEW;
>  >  > +		ev[i].sched_type = queue_type;
>  >
>  >  The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY !=
>  > RTE_SCHED_TYPE_ORDERED. So, we cannot assign .sched_type as
>  > queue_type.
>  >
>  >  I think, one option could be to avoid translation in application is
>  > to
>  >  - Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES,
>  RTE_EVENT_QUEUE_CFG_*_ONLY
>  >  - Introduce a new RTE_EVENT_DEV_CAP_ to denote
>  > RTE_EVENT_QUEUE_CFG_ALL_TYPES cap ability
>  >  - add sched_type in struct rte_event_queue_conf. If capability flag is
>  >    not set then implementation takes sched_type value for the queue.
>  >
>  >  Any thoughts?
>  
>  I'm not sure this change is needed. We could create a sched_type[] array,
>  indexed by queue ID, for assigning the event's sched type.
>  
>  With the proposed approach, the sched_type assignment would still be needed
>  for "all types"-capable PMDs, so I'm not sure it buys us anything in the
>  application.
>  
>  >
>  >
>  >  > +		ev[i].queue_id = qid;
>  >  > +		ev[i].event_type = RTE_EVENT_TYPE_CPU;
>  >
>  >  IMO, RTE_EVENT_TYPE_ETHERNET is the better option here as it is
>  > producing  the Ethernet packets/events.
>  >
>  >  > +		ev[i].sub_event_type = 0;
>  >  > +		ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
>  >  > +		ev[i].mbuf = mbufs[i];
>  >  > +		RTE_SET_USED(prio_idx);
>  >  > +	}
>  >  > +
>  >  > +	const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev,
>  >  > +nb_rx);
>  >
>  >  For producer pattern i.e a burst of RTE_EVENT_OP_NEW, OcteonTX can do
>  > burst operation unlike FORWARD case(which is one event at a
>  > time).Earlier, I  thought I can abstract the producer pattern in PMD,
>  > but it looks like we are  going with application driven producer model
>  > based on latest RFC.So I think,  we can add one flag to
>  > rte_event_enqueue_burst to denote all the events are  of type
>  RTE_EVENT_OP_NEW as hint.SW driver can ignore this.
>  >
>  >  I can send a patch for the same.
>  >
>  >  Any thoughts?
>  >
>  
>  Makes sense, though I'm a little hesitant about putting this sort of PMD-specific
>  hint in the enqueue API. Perhaps we can use the impl_opaque field, or have the
>  PMD inspect the event_type (if TYPE_ETHDEV, assume all packets in the burst
>  are NEWs)?
>  

Another idea -- what if the enqueue burst argument had these four values: Mixed, all RTE_EVENT_OP_NEW, all RTE_EVENT_OP_FWD, all RTE_EVENT_OP_RELEASE?
In the "all _" cases, the app doesn't need to set the event's op field and the PMD can potentially optimize for that type of operation.
In the "mixed" case the PMD would inspect each event's op field and handle them accordingly.
  
Jerin Jacob May 17, 2017, 6:03 p.m. UTC | #4
-----Original Message-----
> Date: Fri, 21 Apr 2017 10:51:37 +0100
> From: Harry van Haaren <harry.van.haaren@intel.com>
> To: dev@dpdk.org
> CC: jerin.jacob@caviumnetworks.com, Harry van Haaren
>  <harry.van.haaren@intel.com>, Gage Eads <gage.eads@intel.com>, Bruce
>  Richardson <bruce.richardson@intel.com>
> Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
> X-Mailer: git-send-email 2.7.4
> 
> This commit adds a sample app for the eventdev library.
> The app has been tested with DPDK 17.05-rc2, hence this
> release (or later) is recommended.
> 
> The sample app showcases a pipeline processing use-case,
> with event scheduling and processing defined per stage.
> The application recieves traffic as normal, with each
> packet traversing the pipeline. Once the packet has
> been processed by each of the pipeline stages, it is
> transmitted again.
> 
> The app provides a framework to utilize cores for a single
> role or multiple roles. Examples of roles are the RX core,
> TX core, Scheduling core (in the case of the event/sw PMD),
> and worker cores.
> 
> Various flags are available to configure numbers of stages,
> cycles of work at each stage, type of scheduling, number of
> worker cores, queue depths etc. For a full explaination,
> please refer to the documentation.
> 
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> ---
> +
> +static inline void
> +schedule_devices(uint8_t dev_id, unsigned lcore_id)
> +{
> +	if (rx_core[lcore_id] && (rx_single ||
> +	    rte_atomic32_cmpset(&rx_lock, 0, 1))) {
> +		producer();
> +		rte_atomic32_clear((rte_atomic32_t *)&rx_lock);
> +	}
> +
> +	if (sched_core[lcore_id] && (sched_single ||
> +	    rte_atomic32_cmpset(&sched_lock, 0, 1))) {
> +		rte_event_schedule(dev_id);

One question here,

Is rte_event_schedule()'s SW PMD implementation capable of running
concurrently on multiple cores?

Context:
Currently I am writing a testpmd-like test framework to exercise
different use cases, along with performance test cases like throughput
and latency, and to make sure it works on the SW and HW drivers.

I currently see the following segfault when rte_event_schedule() is invoked on
multiple cores. Is it expected?

#0  0x000000000043e945 in __pull_port_lb (allow_reorder=0, port_id=2,
sw=0x7ff93f3cb540) at
/export/dpdk-thunderx/drivers/event/sw/sw_evdev_scheduler.c:406
/export/dpdk-thunderx/drivers/event/sw/sw_evdev_scheduler.c:406:11647:beg:0x43e945
[Current thread is 1 (Thread 0x7ff9fbd34700 (LWP 796))]
(gdb) bt
#0  0x000000000043e945 in __pull_port_lb (allow_reorder=0, port_id=2,
sw=0x7ff93f3cb540) at
/export/dpdk-thunderx/drivers/event/sw/sw_evdev_scheduler.c:406
#1  sw_schedule_pull_port_no_reorder (port_id=2, sw=0x7ff93f3cb540) at
/export/dpdk-thunderx/drivers/event/sw/sw_evdev_scheduler.c:495
#2  sw_event_schedule (dev=<optimized out>) at
/export/dpdk-thunderx/drivers/event/sw/sw_evdev_scheduler.c:566
#3  0x000000000040b4af in rte_event_schedule (dev_id=<optimized out>) at
/export/dpdk-thunderx/build/include/rte_eventdev.h:1092
#4  worker (arg=<optimized out>) at
/export/dpdk-thunderx/app/test-eventdev/test_queue_order.c:200
#5  0x000000000042d14b in eal_thread_loop (arg=<optimized out>) at
/export/dpdk-thunderx/lib/librte_eal/linuxapp/eal/eal_thread.c:184
#6  0x00007ff9fd8e32e7 in start_thread () from /usr/lib/libpthread.so.0
#7  0x00007ff9fd62454f in clone () from /usr/lib/libc.so.6
(gdb) list
401			 */
402			uint32_t iq_num = PRIO_TO_IQ(qe->priority);
403			struct sw_qid *qid = &sw->qids[qe->queue_id];
404
405			if ((flags & QE_FLAG_VALID) &&
406
iq_ring_free_count(qid->iq[iq_num]) == 0)
407				break;
408
409			/* now process based on flags. Note that for
directed
410			 * queues, the enqueue_flush masks off all but
the
(gdb) 





> +		if (dump_dev_signal) {
> +			rte_event_dev_dump(0, stdout);
> +			dump_dev_signal = 0;
> +		}
> +		rte_atomic32_clear((rte_atomic32_t *)&sched_lock);
> +	}
> +
> +	if (tx_core[lcore_id] && (tx_single ||
> +	    rte_atomic32_cmpset(&tx_lock, 0, 1))) {
> +		consumer();
> +		rte_atomic32_clear((rte_atomic32_t *)&tx_lock);
> +	}
> +}
> +
  
Bruce Richardson May 18, 2017, 10:13 a.m. UTC | #5
On Wed, May 17, 2017 at 11:33:16PM +0530, Jerin Jacob wrote:
> -----Original Message-----
> > Date: Fri, 21 Apr 2017 10:51:37 +0100
> > From: Harry van Haaren <harry.van.haaren@intel.com>
> > To: dev@dpdk.org
> > CC: jerin.jacob@caviumnetworks.com, Harry van Haaren
> >  <harry.van.haaren@intel.com>, Gage Eads <gage.eads@intel.com>, Bruce
> >  Richardson <bruce.richardson@intel.com>
> > Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
> > X-Mailer: git-send-email 2.7.4
> > 
> > This commit adds a sample app for the eventdev library.
> > The app has been tested with DPDK 17.05-rc2, hence this
> > release (or later) is recommended.
> > 
> > The sample app showcases a pipeline processing use-case,
> > with event scheduling and processing defined per stage.
> > The application recieves traffic as normal, with each
> > packet traversing the pipeline. Once the packet has
> > been processed by each of the pipeline stages, it is
> > transmitted again.
> > 
> > The app provides a framework to utilize cores for a single
> > role or multiple roles. Examples of roles are the RX core,
> > TX core, Scheduling core (in the case of the event/sw PMD),
> > and worker cores.
> > 
> > Various flags are available to configure numbers of stages,
> > cycles of work at each stage, type of scheduling, number of
> > worker cores, queue depths etc. For a full explaination,
> > please refer to the documentation.
> > 
> > Signed-off-by: Gage Eads <gage.eads@intel.com>
> > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> > ---
> > +
> > +static inline void
> > +schedule_devices(uint8_t dev_id, unsigned lcore_id)
> > +{
> > +	if (rx_core[lcore_id] && (rx_single ||
> > +	    rte_atomic32_cmpset(&rx_lock, 0, 1))) {
> > +		producer();
> > +		rte_atomic32_clear((rte_atomic32_t *)&rx_lock);
> > +	}
> > +
> > +	if (sched_core[lcore_id] && (sched_single ||
> > +	    rte_atomic32_cmpset(&sched_lock, 0, 1))) {
> > +		rte_event_schedule(dev_id);
> 
> One question here,
> 
> Does rte_event_schedule()'s SW PMD implementation capable of running
> concurrently on multiple cores?
>

No, it's not. It's designed to be called on a single (dedicated) core.

> Context:
> Currently I am writing a testpmd like test framework to realize
> different use cases along with with performance test cases like throughput
> and latency and making sure it works on SW and HW driver.
> 
> I see the following segfault problem when rte_event_schedule() invoked on
> multiple core currently. Is it expected?

Yes, pretty much.

/Bruce
  
Hunt, David June 26, 2017, 2:41 p.m. UTC | #6
This patchset introduces a sample application that demonstrates
a pipeline model for packet processing. Running this sample app
with 17.05-rc2 or later is recommended.

Changes in patch v2:
  * Re-work based on comments on mailing list. No major functional changes.
  * Checkpatch cleanup of a couple of typos

The sample app itself allows configuration of various pipelines using
command line arguments. Parameters like number of stages, number of
worker cores, which cores are assigned to specific tasks, and work-
cycles per-stage in the pipeline can be configured.

Documentation for eventdev is added for the programmers guide and
sample app user guide, providing sample commands to run the app with,
and expected output.

The sample app is presented here as an RFC to the next-eventdev tree
to work towards having eventdev PMD generic sample applications.

[1/3] examples/eventdev_pipeline: added sample app
[2/3] doc: add eventdev pipeline to sample app ug
[3/3] doc: add eventdev library to programmers guide
  
Hunt, David June 26, 2017, 2:46 p.m. UTC | #7
Hi Jerin,

I'm assisting Harry on the sample app, and have just pushed up a V2 
patch based on your feedback. I've addressed most of your suggestions; 
comments below. There may still be a couple of outstanding questions that 
need further discussion.

Regards,
Dave

On 10/5/2017 3:12 PM, Jerin Jacob wrote:
 > -----Original Message-----
 >> Date: Fri, 21 Apr 2017 10:51:37 +0100
 >> From: Harry van Haaren <harry.van.haaren@intel.com>
 >> To: dev@dpdk.org
 >> CC: jerin.jacob@caviumnetworks.com, Harry van Haaren
 >> <harry.van.haaren@intel.com>, Gage Eads <gage.eads@intel.com>, Bruce
 >>  Richardson <bruce.richardson@intel.com>
 >> Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
 >> X-Mailer: git-send-email 2.7.4
 >>
 >> This commit adds a sample app for the eventdev library.
 >> The app has been tested with DPDK 17.05-rc2, hence this
 >> release (or later) is recommended.
 >>
 >> The sample app showcases a pipeline processing use-case,
 >> with event scheduling and processing defined per stage.
 >> The application recieves traffic as normal, with each
 >> packet traversing the pipeline. Once the packet has
 >> been processed by each of the pipeline stages, it is
 >> transmitted again.
 >>
 >> The app provides a framework to utilize cores for a single
 >> role or multiple roles. Examples of roles are the RX core,
 >> TX core, Scheduling core (in the case of the event/sw PMD),
 >> and worker cores.
 >>
 >> Various flags are available to configure numbers of stages,
 >> cycles of work at each stage, type of scheduling, number of
 >> worker cores, queue depths etc. For a full explaination,
 >> please refer to the documentation.
 >>
 >> Signed-off-by: Gage Eads <gage.eads@intel.com>
 >> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
 >> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
 >
 > Thanks for the example application to share the SW view.
 > I could make it run on HW after some tweaking(not optimized though)
 >
 > [...]
 >> +#define MAX_NUM_STAGES 8
 >> +#define BATCH_SIZE 16
 >> +#define MAX_NUM_CORE 64
 >
 > How about RTE_MAX_LCORE?

Core usage in the sample app is held in a uint64_t. Adding arrays would 
be possible, but I feel that the extra effort would not give that much 
benefit. I've left it as is for the moment, unless you see a strong 
requirement to go beyond 64 cores?

 >
 >> +
 >> +static unsigned int active_cores;
 >> +static unsigned int num_workers;
 >> +static unsigned long num_packets = (1L << 25); /* do ~32M packets */
 >> +static unsigned int num_fids = 512;
 >> +static unsigned int num_priorities = 1;
 >
 > looks like its not used.

Yes, Removed.

 >
 >> +static unsigned int num_stages = 1;
 >> +static unsigned int worker_cq_depth = 16;
 >> +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
 >> +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
 >> +static int16_t qid[MAX_NUM_STAGES] = {-1};
 >
 > Moving all fastpath related variables under a structure with cache
 > aligned will help.

I tried a few different combinations of this, and saw no gains and some 
losses. So I will leave it as is for the moment, if that's OK.

 >
 >> +static int worker_cycles;
 >> +static int enable_queue_priorities;
 >> +
 >> +struct prod_data {
 >> +    uint8_t dev_id;
 >> +    uint8_t port_id;
 >> +    int32_t qid;
 >> +    unsigned num_nic_ports;
 >> +};

Yes, I saw a percent or two of gain when this and the following two data structs 
were cache aligned.

 >
 > cache aligned ?
 >
 >> +
 >> +struct cons_data {
 >> +    uint8_t dev_id;
 >> +    uint8_t port_id;
 >> +};
 >> +
 >
 > cache aligned ?

Yes, see comment above

 >
 >> +static struct prod_data prod_data;
 >> +static struct cons_data cons_data;
 >> +
 >> +struct worker_data {
 >> +    uint8_t dev_id;
 >> +    uint8_t port_id;
 >> +};
 >
 > cache aligned ?

Yes, see comment above


 >
 >> +
 >> +static unsigned *enqueue_cnt;
 >> +static unsigned *dequeue_cnt;
 >> +
 >> +static volatile int done;
 >> +static volatile int prod_stop;
 >
 > No one updating the prod_stop.

Old var, removed.

 >
 >> +static int quiet;
 >> +static int dump_dev;
 >> +static int dump_dev_signal;
 >> +
 >> +static uint32_t rx_lock;
 >> +static uint32_t tx_lock;
 >> +static uint32_t sched_lock;
 >> +static bool rx_single;
 >> +static bool tx_single;
 >> +static bool sched_single;
 >> +
 >> +static unsigned rx_core[MAX_NUM_CORE];
 >> +static unsigned tx_core[MAX_NUM_CORE];
 >> +static unsigned sched_core[MAX_NUM_CORE];
 >> +static unsigned worker_core[MAX_NUM_CORE];
 >> +
 >> +static bool
 >> +core_in_use(unsigned lcore_id) {
 >> +    return (rx_core[lcore_id] || sched_core[lcore_id] ||
 >> +        tx_core[lcore_id] || worker_core[lcore_id]);
 >> +}
 >> +
 >> +static struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
 >> +
 >> +static void
 >> +rte_eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
 >> +            void *userdata)
 >
 > IMO, It is better to not use rte_eth_* for application functions.

Sure, removed the 'rte_' part of the function name.

 >
 >> +{
 >> +    int port_id = (uintptr_t) userdata;
 >> +    unsigned _sent = 0;
 >> +
 >> +    do {
 >> +        /* Note: hard-coded TX queue */
 >> +        _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
 >> +                      unsent - _sent);
 >> +    } while (_sent != unsent);
 >> +}
 >> +
 >> +static int
 >> +consumer(void)
 >> +{
 >> +    const uint64_t freq_khz = rte_get_timer_hz() / 1000;
 >> +    struct rte_event packets[BATCH_SIZE];
 >> +
 >> +    static uint64_t npackets;
 >> +    static uint64_t received;
 >> +    static uint64_t received_printed;
 >> +    static uint64_t time_printed;
 >> +    static uint64_t start_time;
 >> +    unsigned i, j;
 >> +    uint8_t dev_id = cons_data.dev_id;
 >> +    uint8_t port_id = cons_data.port_id;
 >> +
 >> +    if (!npackets)
 >> +        npackets = num_packets;
 >> +
 >> +    do {
 >> +        uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
 >> +                packets, RTE_DIM(packets), 0);
 >
 >         const uint16_t n =

sure.

 >
 >> +
 >> +        if (n == 0) {
 >> +            for (j = 0; j < rte_eth_dev_count(); j++)
 >> +                rte_eth_tx_buffer_flush(j, 0, tx_buf[j]);
 >> +            return 0;
 >> +        }
 >> +        if (start_time == 0)
 >> +            time_printed = start_time = rte_get_timer_cycles();
 >> +
 >> +        received += n;
 >> +        for (i = 0; i < n; i++) {
 >> +            uint8_t outport = packets[i].mbuf->port;
 >> +            rte_eth_tx_buffer(outport, 0, tx_buf[outport],
 >> +                    packets[i].mbuf);
 >> +        }
 >> +
 >> +        if (!quiet && received >= received_printed + (1<<22)) {
 >> +            const uint64_t now = rte_get_timer_cycles();
 >> +            const uint64_t delta_cycles = now - start_time;
 >> +            const uint64_t elapsed_ms = delta_cycles / freq_khz;
 >> +            const uint64_t interval_ms =
 >> +                    (now - time_printed) / freq_khz;
 >> +
 >> +            uint64_t rx_noprint = received - received_printed;
 >> +            printf("# consumer RX=%"PRIu64", time %"PRIu64
 >> +                "ms, avg %.3f mpps [current %.3f mpps]\n",
 >> +                    received, elapsed_ms,
 >> +                    (received) / (elapsed_ms * 1000.0),
 >> +                    rx_noprint / (interval_ms * 1000.0));
 >> +            received_printed = received;
 >> +            time_printed = now;
 >> +        }
 >> +
 >> +        dequeue_cnt[0] += n;
 >> +
 >> +        if (num_packets > 0 && npackets > 0) {
 >> +            npackets -= n;
 >> +            if (npackets == 0 || npackets > num_packets)
 >> +                done = 1;
 >> +        }
 >
 > Looks like very complicated logic.I think we can simplify it.

I've simplified this.


 >
 >> +    } while (0);
 >
 > do while(0); really required here?

Removed.

 >
 >> +
 >> +    return 0;
 >> +}
 >> +
 >> +static int
 >> +producer(void)
 >> +{
 >> +    static uint8_t eth_port;
 >> +    struct rte_mbuf *mbufs[BATCH_SIZE];
 >> +    struct rte_event ev[BATCH_SIZE];
 >> +    uint32_t i, num_ports = prod_data.num_nic_ports;
 >> +    int32_t qid = prod_data.qid;
 >> +    uint8_t dev_id = prod_data.dev_id;
 >> +    uint8_t port_id = prod_data.port_id;
 >> +    uint32_t prio_idx = 0;
 >> +
 >> +    const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, 
BATCH_SIZE);
 >> +    if (++eth_port == num_ports)
 >> +        eth_port = 0;
 >> +    if (nb_rx == 0) {
 >> +        rte_pause();
 >> +        return 0;
 >> +    }
 >> +
 >> +    for (i = 0; i < nb_rx; i++) {
 >> +        ev[i].flow_id = mbufs[i]->hash.rss;
 >
 > prefetching the buff[i+1] may help here?

I tried, didn't make much difference.

 >
 >> +        ev[i].op = RTE_EVENT_OP_NEW;
 >> +        ev[i].sched_type = queue_type;
 >
 > The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY != 
RTE_SCHED_TYPE_ORDERED. So, we
 > cannot assign .sched_type as queue_type.
 >
 > I think, one option could be to avoid translation in application is to
 > - Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES, RTE_EVENT_QUEUE_CFG_*_ONLY
 > - Introduce a new RTE_EVENT_DEV_CAP_ to denote 
RTE_EVENT_QUEUE_CFG_ALL_TYPES cap
 > ability
 > - add sched_type in struct rte_event_queue_conf. If capability flag is
 >   not set then implementation takes sched_type value for the queue.
 >
 > Any thoughts?


I'm not sure here. Would it be OK to leave it for the moment, and we can work on a patch 
in the future?

 >
 >
 >> +        ev[i].queue_id = qid;
 >> +        ev[i].event_type = RTE_EVENT_TYPE_CPU;
 >
 > IMO, RTE_EVENT_TYPE_ETHERNET is the better option here as it is
 > producing the Ethernet packets/events.

Changed to RTE_EVENT_TYPE_ETHDEV.


 >
 >> +        ev[i].sub_event_type = 0;
 >> +        ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 >> +        ev[i].mbuf = mbufs[i];
 >> +        RTE_SET_USED(prio_idx);
 >> +    }
 >> +
 >> +    const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, 
nb_rx);
 >
 > For producer pattern i.e a burst of RTE_EVENT_OP_NEW, OcteonTX can do 
burst
 > operation unlike FORWARD case(which is one event at a time).Earlier, I
 > thought I can abstract the producer pattern in PMD, but it looks like we
 > are going with application driven producer model based on latest 
RFC.So I think,
 > we can add one flag to rte_event_enqueue_burst to denote all the events
 > are of type RTE_EVENT_OP_NEW as hint.SW driver can ignore this.
 >
 > I can send a patch for the same.
 >
 > Any thoughts?

I think this comment is closed now, as your patch is upstreamed, afaik?

 >
 >
 >> +    if (nb_tx != nb_rx) {
 >> +        for (i = nb_tx; i < nb_rx; i++)
 >> +            rte_pktmbuf_free(mbufs[i]);
 >> +    }
 >> +    enqueue_cnt[0] += nb_tx;
 >> +
 >> +    if (unlikely(prod_stop))
 >
 > I think, No one updating the prod_stop

Removed.

 >
 >> +        done = 1;
 >> +
 >> +    return 0;
 >> +}
 >> +
 >> +static inline void
 >> +schedule_devices(uint8_t dev_id, unsigned lcore_id)
 >> +{
 >> +    if (rx_core[lcore_id] && (rx_single ||
 >> +        rte_atomic32_cmpset(&rx_lock, 0, 1))) {
 >
 > This pattern(rte_atomic32_cmpset) makes application can inject only
 > "one core" worth of packets. Not enough for low-end cores. May be we need
 > multiple producer options. I think, new RFC is addressing it.

OK, will leave this to the RFC.

 >
 >> +        producer();
 >> +        rte_atomic32_clear((rte_atomic32_t *)&rx_lock);
 >> +    }
 >> +
 >> +    if (sched_core[lcore_id] && (sched_single ||
 >> +        rte_atomic32_cmpset(&sched_lock, 0, 1))) {
 >> +        rte_event_schedule(dev_id);
 >> +        if (dump_dev_signal) {
 >> +            rte_event_dev_dump(0, stdout);
 >> +            dump_dev_signal = 0;
 >> +        }
 >> +        rte_atomic32_clear((rte_atomic32_t *)&sched_lock);
 >> +    }
 >
 > Lot of unwanted code if RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED set.
 >
 > I think, We can make common code with compile time aware and make
 > runtime workers based on the flag..
 > i.e
 > rte_eal_remote_launch(worker_x, &worker_data[worker_idx], lcore_id);
 > rte_eal_remote_launch(worker_y, &worker_data[worker_idx], lcore_id);
 >
 > May we can improve after initial version.

Yes, we can clean up after initial version.


 >
 >> +
 >> +    if (tx_core[lcore_id] && (tx_single ||
 >> +        rte_atomic32_cmpset(&tx_lock, 0, 1))) {
 >> +        consumer();
 >
 > Should consumer() need to come in this pattern? I am thinking like
 > if events is from last stage then call consumer() in worker()
 >
 > I think, above scheme works better when the _same_ worker code need 
to run the
 > case where
 > 1) ethdev HW is capable to enqueuing the packets to same txq from
 >   multiple thread
 > 2) ethdev is not capable to do so.
 >
 > So, The above cases can be addressed in configuration time where we link
 > the queues to port
 > case 1) Link all workers to last queue
 > case 2) Link only worker to last queue
 >
 > and keeping the common worker code.
 >
 > HW implementation has functional and performance issue if "two" ports are
 > assigned to one lcore for dequeue. The above scheme fixes that 
problem too.


Can we have a bit more discussion on this item? Is this needed for this 
sample app, or can we perhaps work a patch for this later? Harry?


 >
 >> +        rte_atomic32_clear((rte_atomic32_t *)&tx_lock);
 >> +    }
 >> +}
 >> +
 >> +static int
 >> +worker(void *arg)
 >> +{
 >> +    struct rte_event events[BATCH_SIZE];
 >> +
 >> +    struct worker_data *data = (struct worker_data *)arg;
 >> +    uint8_t dev_id = data->dev_id;
 >> +    uint8_t port_id = data->port_id;
 >> +    size_t sent = 0, received = 0;
 >> +    unsigned lcore_id = rte_lcore_id();
 >> +
 >> +    while (!done) {
 >> +        uint16_t i;
 >> +
 >> +        schedule_devices(dev_id, lcore_id);
 >> +
 >> +        if (!worker_core[lcore_id]) {
 >> +            rte_pause();
 >> +            continue;
 >> +        }
 >> +
 >> +        uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
 >> +                events, RTE_DIM(events), 0);
 >> +
 >> +        if (nb_rx == 0) {
 >> +            rte_pause();
 >> +            continue;
 >> +        }
 >> +        received += nb_rx;
 >> +
 >> +        for (i = 0; i < nb_rx; i++) {
 >> +            struct ether_hdr *eth;
 >> +            struct ether_addr addr;
 >> +            struct rte_mbuf *m = events[i].mbuf;
 >> +
 >> +            /* The first worker stage does classification */
 >> +            if (events[i].queue_id == qid[0])
 >> +                events[i].flow_id = m->hash.rss % num_fids;
 >
 > Not sure why we need do(shrinking the flows) this in worker() in 
queue based pipeline.
 > If an PMD has any specific requirement on num_fids,I think, we
 > can move this configuration stage or PMD can choose optimum fid 
internally to
 > avoid modulus operation tax in fastpath in all PMD.
 >
 > Does struct rte_event_queue_conf.nb_atomic_flows help here?

In my tests the modulus makes very little difference in the throughput. 
And I think it's good to have a way of varying the number of flows for 
testing different scenarios, even if it's not the most performant.

 >
 >> +
 >> +            events[i].queue_id = next_qid[events[i].queue_id];
 >> +            events[i].op = RTE_EVENT_OP_FORWARD;
 >
 > missing events[i].sched_type.HW PMD does not work with this.
 > I think, we can use similar scheme like next_qid for next_sched_type.

Done. added events[i].sched_type = queue_type.

 >
 >> +
 >> +            /* change mac addresses on packet (to use mbuf data) */
 >> +            eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
 >> +            ether_addr_copy(&eth->d_addr, &addr);
 >> +            ether_addr_copy(&eth->s_addr, &eth->d_addr);
 >> +            ether_addr_copy(&addr, &eth->s_addr);
 >
 > IMO, We can make packet processing code code as "static inline function" so
 > different worker types can reuse.

Done. moved out to a work() function.

 >
 >> +
 >> +            /* do a number of cycles of work per packet */
 >> +            volatile uint64_t start_tsc = rte_rdtsc();
 >> +            while (rte_rdtsc() < start_tsc + worker_cycles)
 >> +                rte_pause();
 >
 > Ditto.

Done. moved out to a work() function.
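
For what it's worth, the helper is roughly the quoted per-packet code pulled
out into a static inline (a sketch; the exact v3 signature may differ):

    static inline void
    work(struct rte_mbuf *m)
    {
            struct ether_hdr *eth;
            struct ether_addr addr;

            /* change mac addresses on packet (to use mbuf data) */
            eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
            ether_addr_copy(&eth->d_addr, &addr);
            ether_addr_copy(&eth->s_addr, &eth->d_addr);
            ether_addr_copy(&addr, &eth->s_addr);

            /* do a number of cycles of work per packet */
            uint64_t start_tsc = rte_rdtsc();
            while (rte_rdtsc() < start_tsc + worker_cycles)
                    rte_pause();
    }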

 >
 > I think, All worker specific variables like "worker_cycles" can moved into
 > one structure and use.
 >
 >> +        }
 >> +        uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
 >> +                events, nb_rx);
 >> +        while (nb_tx < nb_rx && !done)
 >> +            nb_tx += rte_event_enqueue_burst(dev_id, port_id,
 >> +                            events + nb_tx,
 >> +                            nb_rx - nb_tx);
 >> +        sent += nb_tx;
 >> +    }
 >> +
 >> +    if (!quiet)
 >> +        printf("  worker %u thread done. RX=%zu TX=%zu\n",
 >> +                rte_lcore_id(), received, sent);
 >> +
 >> +    return 0;
 >> +}
 >> +
 >> +/*
 >> + * Parse the coremask given as argument (hexadecimal string) and fill
 >> + * the global configuration (core role and core count) with the parsed
 >> + * value.
 >> + */
 >> +static int xdigit2val(unsigned char c)
 >
 > multiple instance of "xdigit2val" in DPDK repo. May be we can push this
 > as common code.

Sure, that's something we can look at in a separate patch, now that it's 
being used more and more.

 >
 >> +{
 >> +    int val;
 >> +
 >> +    if (isdigit(c))
 >> +        val = c - '0';
 >> +    else if (isupper(c))
 >> +        val = c - 'A' + 10;
 >> +    else
 >> +        val = c - 'a' + 10;
 >> +    return val;
 >> +}
 >> +
 >> +
 >> +static void
 >> +usage(void)
 >> +{
 >> +    const char *usage_str =
 >> +        "  Usage: eventdev_demo [options]\n"
 >> +        "  Options:\n"
 >> +        "  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
 >> +        "  -f, --atomic-flows=N         Use N random flows from 1 to N (default 16)\n"
 >
 > I think, this parameter now, effects the application fast path code.I think,
 > it should eventdev configuration para-mater.

See above comment on num_fids

 >
 >> +        "  -s, --num_stages=N           Use N atomic stages (default 1)\n"
 >> +        "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
 >> +        "  -w, --worker-mask=core mask  Run worker on CPUs in core mask\n"
 >> +        "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
 >> +        "  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
 >> +        "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
 >> +        "  -W  --work-cycles=N          Worker cycles (default 0)\n"
 >> +        "  -P  --queue-priority         Enable scheduler queue prioritization\n"
 >> +        "  -o, --ordered                Use ordered scheduling\n"
 >> +        "  -p, --parallel               Use parallel scheduling\n"
 >
 > IMO, all stage being "parallel" or "ordered" or "atomic" is one mode of
 > operation. It is valid have to any combination. We need to express 
that in
 > command like
 > example:
 > 3 stage with
 > O->A->P

How about we add an option that specifies the mode of operation for each 
stage in a string? Maybe have a '-m' option (modes) e.g. '-m appo' for 4 
stages with atomic, parallel, parallel, ordered. Or maybe reuse your 
test-eventdev parameter style?
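
For example, the getopt handler could be as simple as this (purely a sketch;
stage_type[] would be a new per-stage array, it is not in the current patch):

    case 'm':
            num_stages = strlen(optarg);
            if (num_stages == 0 || num_stages > MAX_NUM_STAGES)
                    usage();
            for (i = 0; i < num_stages; i++) {
                    switch (optarg[i]) {
                    case 'a': stage_type[i] = RTE_SCHED_TYPE_ATOMIC;   break;
                    case 'o': stage_type[i] = RTE_SCHED_TYPE_ORDERED;  break;
                    case 'p': stage_type[i] = RTE_SCHED_TYPE_PARALLEL; break;
                    default:  usage();
                    }
            }
            break;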

 >
 >> +        "  -q, --quiet                  Minimize printed output\n"
 >> +        "  -D, --dump                   Print detailed statistics before exit"
 >> +        "\n";
 >> +    fprintf(stderr, "%s", usage_str);
 >> +    exit(1);
 >> +}
 >> +
 >
 > [...]
 >
 >> +            rx_single = (popcnt == 1);
 >> +            break;
 >> +        case 't':
 >> +            tx_lcore_mask = parse_coremask(optarg);
 >> +            popcnt = __builtin_popcountll(tx_lcore_mask);
 >> +            tx_single = (popcnt == 1);
 >> +            break;
 >> +        case 'e':
 >> +            sched_lcore_mask = parse_coremask(optarg);
 >> +            popcnt = __builtin_popcountll(sched_lcore_mask);
 >> +            sched_single = (popcnt == 1);
 >> +            break;
 >> +        default:
 >> +            usage();
 >> +        }
 >> +    }
 >> +
 >> +    if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
 >> +        sched_lcore_mask == 0 || tx_lcore_mask == 0) {
 >
 > Need to honor RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED i.e sched_lcore_mask
 > is zero can be valid case.

I'll separate this out to a check for RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED.
Need to do it later in eventdev_setup(), after the eventdev instance is created.
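
i.e. something along these lines once the device info is available (a sketch
of the intended check, not the final code):

    struct rte_event_dev_info info;
    rte_event_dev_info_get(dev_id, &info);
    if (!(info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED) &&
                    sched_lcore_mask == 0)
            rte_exit(-1, "This eventdev requires a scheduler core, "
                            "please set --sched-mask\n");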

 >
 >> +        printf("Core part of pipeline was not assigned any cores. "
 >> +            "This will stall the pipeline, please check core masks "
 >> +            "(use -h for details on setting core masks):\n"
 >> +            "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
 >> +            "\n\tworkers: %"PRIu64"\n",
 >> +            rx_lcore_mask, tx_lcore_mask, sched_lcore_mask,
 >> +            worker_lcore_mask);
 >> +        rte_exit(-1, "Fix core masks\n");
 >> +    }
 >> +    if (num_stages == 0 || num_stages > MAX_NUM_STAGES)
 >> +        usage();
 >> +
 >> +    for (i = 0; i < MAX_NUM_CORE; i++) {
 >> +        rx_core[i] = !!(rx_lcore_mask & (1UL << i));
 >> +        tx_core[i] = !!(tx_lcore_mask & (1UL << i));
 >> +        sched_core[i] = !!(sched_lcore_mask & (1UL << i));
 >> +        worker_core[i] = !!(worker_lcore_mask & (1UL << i));
 >> +
 >> +        if (worker_core[i])
 >> +            num_workers++;
 >> +        if (core_in_use(i))
 >> +            active_cores++;
 >> +    }
 >> +}
 >> +
 >> +
 >> +struct port_link {
 >> +    uint8_t queue_id;
 >> +    uint8_t priority;
 >> +};
 >> +
 >> +static int
 >> +setup_eventdev(struct prod_data *prod_data,
 >> +        struct cons_data *cons_data,
 >> +        struct worker_data *worker_data)
 >> +{
 >> +    const uint8_t dev_id = 0;
 >> +    /* +1 stages is for a SINGLE_LINK TX stage */
 >> +    const uint8_t nb_queues = num_stages + 1;
 >> +    /* + 2 is one port for producer and one for consumer */
 >> +    const uint8_t nb_ports = num_workers + 2;
 >
 > selection of number of ports is a function of rte_event_has_producer().
 > I think, it will be addressed with RFC.
 >
 >> +    const struct rte_event_dev_config config = {
 >> +            .nb_event_queues = nb_queues,
 >> +            .nb_event_ports = nb_ports,
 >> +            .nb_events_limit  = 4096,
 >> +            .nb_event_queue_flows = 1024,
 >> +            .nb_event_port_dequeue_depth = 128,
 >> +            .nb_event_port_enqueue_depth = 128,
 >
 > OCTEONTX PMD driver has .nb_event_port_dequeue_depth = 1 and
 > .nb_event_port_enqueue_depth = 1 and  struct rte_event_dev_info.min_dequeue_timeout_ns = 853 value.
 > I think, we need to check the rte_event_dev_info_get() first to get the sane
 > values and take RTE_MIN or RTE_MAX based on the use case.
 >
 > or
 >
 > I can ignore this value in OCTEONTX PMD. But I am not sure NXP case,
 > Any thoughts from NXP folks.
 >
 >

Added code to do the relevant checks. It now limits the port enqueue/dequeue
depths to the device's configured depth where that is lower.
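
The clamp is essentially the following (a sketch; in the updated code the port
confs are no longer const so they can be adjusted):

    if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
            wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
    if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
            wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;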



 >> +    };
 >> +    const struct rte_event_port_conf wkr_p_conf = {
 >> +            .dequeue_depth = worker_cq_depth,
 >
 > Same as above

See previous comment.

 >
 >> +            .enqueue_depth = 64,
 >
 > Same as above

See previous comment.

 >
 >> +            .new_event_threshold = 4096,
 >> +    };
 >> +    struct rte_event_queue_conf wkr_q_conf = {
 >> +            .event_queue_cfg = queue_type,
 >> +            .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 >> +            .nb_atomic_flows = 1024,
 >> +            .nb_atomic_order_sequences = 1024,
 >> +    };
 >> +    const struct rte_event_port_conf tx_p_conf = {
 >> +            .dequeue_depth = 128,
 >
 > Same as above

See previous comment.

 >> +            .enqueue_depth = 128,
 >
 > Same as above

See previous comment.

 >> +            .new_event_threshold = 4096,
 >> +    };
 >> +    const struct rte_event_queue_conf tx_q_conf = {
 >> +            .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
 >> +            .event_queue_cfg =
 >> +                    RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY |
 >> +                    RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
 >> +            .nb_atomic_flows = 1024,
 >> +            .nb_atomic_order_sequences = 1024,
 >> +    };
 >> +
 >> +    struct port_link worker_queues[MAX_NUM_STAGES];
 >> +    struct port_link tx_queue;
 >> +    unsigned i;
 >> +
 >> +    int ret, ndev = rte_event_dev_count();
 >> +    if (ndev < 1) {
 >> +        printf("%d: No Eventdev Devices Found\n", __LINE__);
 >> +        return -1;
 >> +    }
 >> +
 >> +    struct rte_event_dev_info dev_info;
 >> +    ret = rte_event_dev_info_get(dev_id, &dev_info);
 >> +    printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
 >> +
 >> +    ret = rte_event_dev_configure(dev_id, &config);
 >> +    if (ret < 0)
 >> +        printf("%d: Error configuring device\n", __LINE__)
 >
 > Don't process further with failed configure.
 >

Done.
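
It now bails out on failure, along these lines (a sketch, not the exact v3
diff):

    ret = rte_event_dev_configure(dev_id, &config);
    if (ret < 0) {
            printf("%d: Error configuring device\n", __LINE__);
            return -1;
    }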

 >> +
 >> +    /* Q creation - one load balanced per pipeline stage*/
 >> +
 >> +    /* set up one port per worker, linking to all stage queues */
 >> +    for (i = 0; i < num_workers; i++) {
 >> +        struct worker_data *w = &worker_data[i];
 >> +        w->dev_id = dev_id;
 >> +        if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
 >> +            printf("Error setting up port %d\n", i);
 >> +            return -1;
 >> +        }
 >> +
 >> +        uint32_t s;
 >> +        for (s = 0; s < num_stages; s++) {
 >> +            if (rte_event_port_link(dev_id, i,
 >> +                        &worker_queues[s].queue_id,
 >> +                        &worker_queues[s].priority,
 >> +                        1) != 1) {
 >> +                printf("%d: error creating link for port %d\n",
 >> +                        __LINE__, i);
 >> +                return -1;
 >> +            }
 >> +        }
 >> +        w->port_id = i;
 >> +    }
 >> +    /* port for consumer, linked to TX queue */
 >> +    if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
 >
 > If ethdev supports MT txq queue support then this port can be linked to
 > worker too. something to consider for future.
 >

Sure. No change for now.

 >> +        printf("Error setting up port %d\n", i);
 >> +        return -1;
 >> +    }
 >> +    if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
 >> +                &tx_queue.priority, 1) != 1) {
 >> +        printf("%d: error creating link for port %d\n",
 >> +                __LINE__, i);
 >> +        return -1;
 >> +    }
 >> +    /* port for producer, no links */
 >> +    const struct rte_event_port_conf rx_p_conf = {
 >> +            .dequeue_depth = 8,
 >> +            .enqueue_depth = 8,
 >
 > same as above issue.You could get default config first and configure.
 >

Done. Checking config.nb_event_port_dequeue_depth and reducing if less.

 >> +            .new_event_threshold = 1200,
 >> +    };
 >> +    if (rte_event_port_setup(dev_id, i + 1, &rx_p_conf) < 0) {
 >> +        printf("Error setting up port %d\n", i);
 >> +        return -1;
 >> +    }
 >> +
 >> +    *prod_data = (struct prod_data){.dev_id = dev_id,
 >> +                    .port_id = i + 1,
 >> +                    .qid = qid[0] };
 >> +    *cons_data = (struct cons_data){.dev_id = dev_id,
 >> +                    .port_id = i };
 >> +
 >> +    enqueue_cnt = rte_calloc(0,
 >> +            RTE_CACHE_LINE_SIZE/(sizeof(enqueue_cnt[0])),
 >> +            sizeof(enqueue_cnt[0]), 0);
 >> +    dequeue_cnt = rte_calloc(0,
 >> +            RTE_CACHE_LINE_SIZE/(sizeof(dequeue_cnt[0])),
 >> +            sizeof(dequeue_cnt[0]), 0);
 >
 > Why array? looks like enqueue_cnt[1] and dequeue_cnt[1] not used anywhere.
 >

Looks like there was an intention to extend this further, but all the app
does is increment the counters without ever using them. I've removed these
two vars to clean up.

 >> +
 >> +    if (rte_event_dev_start(dev_id) < 0) {
 >> +        printf("Error starting eventdev\n");
 >> +        return -1;
 >> +    }
 >> +
 >> +    return dev_id;
 >> +}
 >> +
 >> +static void
 >> +signal_handler(int signum)
 >> +{
 >> +    if (done || prod_stop)
 >
 > I think, No one updating the prod_stop
 >

Removed.


 >> +        rte_exit(1, "Exiting on signal %d\n", signum);
 >> +    if (signum == SIGINT || signum == SIGTERM) {
 >> +        printf("\n\nSignal %d received, preparing to exit...\n",
 >> +                signum);
 >> +        done = 1;
 >> +    }
 >> +    if (signum == SIGTSTP)
 >> +        rte_event_dev_dump(0, stdout);
 >> +}
 >> +
 >> +int
 >> +main(int argc, char **argv)
 >
 > [...]
 >
 >> +       RTE_LCORE_FOREACH_SLAVE(lcore_id) {
 >> +               if (lcore_id >= MAX_NUM_CORE)
 >> +                       break;
 >> +
 >> +               if (!rx_core[lcore_id] && !worker_core[lcore_id] &&
 >> +                   !tx_core[lcore_id] && !sched_core[lcore_id])
 >> +                       continue;
 >> +
 >> +               if (rx_core[lcore_id])
 >> +                       printf(
 >> +                                "[%s()] lcore %d executing NIC Rx, and using eventdev port %u\n",
 >> +                                __func__, lcore_id, prod_data.port_id);
 >
 > These prints wont show if rx,tx, scheduler running on master core(as we
 > are browsing through RTE_LCORE_FOREACH_SLAVE)

OK, changed to RTE_LCORE_FOREACH, which also includes the master core.

 >
 >> +
 >> +    if (!quiet) {
 >> +        printf("\nPort Workload distribution:\n");
 >> +        uint32_t i;
 >> +        uint64_t tot_pkts = 0;
 >> +        uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
 >> +        for (i = 0; i < num_workers; i++) {
 >> +            char statname[64];
 >> +            snprintf(statname, sizeof(statname), "port_%u_rx",
 >> +                    worker_data[i].port_id);
 >
 > Please check "port_%u_rx" xstat availability with PMD first.

Added a check after rte_event_dev_xstats_by_name_get() to see if it's 
not -ENOTSUP.
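
Roughly as follows (a sketch, assuming the stat value reads back as
(uint64_t)-ENOTSUP when the PMD does not implement that counter):

    uint64_t val = rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
    if (val != (uint64_t)-ENOTSUP) {
            pkts_per_wkr[i] = val;
            tot_pkts += pkts_per_wkr[i];
    }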

 >> +            pkts_per_wkr[i] = rte_event_dev_xstats_by_name_get(
 >> +                    dev_id, statname, NULL);
 >> +            tot_pkts += pkts_per_wkr[i];
 >> +        }
 >> +        for (i = 0; i < num_workers; i++) {
 >> +            float pc = pkts_per_wkr[i]  * 100 /
 >> +                ((float)tot_pkts);
 >> +            printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
 >> +                    i, pc, pkts_per_wkr[i]);
 >> +        }
 >> +
 >> +    }
 >> +
 >> +    return 0;
 >> +}
 >
 > As final note, considering the different options in fastpath, I was
 > thinking like introducing app/test-eventdev like app/testpmd and have
 > set of function pointers# for different modes like "macswap", "txonly"
 > in testpmd to exercise different options and framework for adding new use
 > cases.I will work on that to check the feasibility.
 >
 > ##
 > struct fwd_engine {
 >         const char       *fwd_mode_name; /**< Forwarding mode name. */
 >         port_fwd_begin_t port_fwd_begin; /**< NULL if nothing special to do. */
 >         port_fwd_end_t   port_fwd_end;   /**< NULL if nothing special to do. */
 >         packet_fwd_t     packet_fwd;     /**< Mandatory. */
 > };
 >
 >> --
 >> 2.7.4
 >>
  
Jerin Jacob June 27, 2017, 9:35 a.m. UTC | #8
-----Original Message-----
> Date: Mon, 26 Jun 2017 15:46:47 +0100
> From: "Hunt, David" <david.hunt@intel.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>, Harry van Haaren
>  <harry.van.haaren@intel.com>
> CC: dev@dpdk.org, Gage Eads <gage.eads@intel.com>, Bruce Richardson
>  <bruce.richardson@intel.com>
> Subject: Re: [dpdk-dev] [PATCH 1/3] examples/eventdev_pipeline: added
>  sample app
> User-Agent: Mozilla/5.0 (Windows NT 6.3; WOW64; rv:45.0) Gecko/20100101
>  Thunderbird/45.8.0
> 
> Hi Jerin,

Hi David,

Looks like you have sent the old version. The below mentioned comments
are not addressed in v2.
> 
> I'm assisting Harry on the sample app, and have just pushed up a V2 patch
> based on your feedback. I've addressed most of your suggestions, comments
> below. There may still a couple of outstanding questions that need further
> discussion.

A few general comments:
1) Nikhil/Gage's proposal for an ethdev rx to eventdev adapter will change the major
portion (eventdev setup and producer()) of this application.
2) Producing packets with only one lcore really can't showcase an example
eventdev application, as it will be pretty bad on a low-end machine.
At least the application infrastructure should not be the limiting factor.

Considering the above points, should we wait for the rx adapter to be completed
first? I would like to show this as a real-world application of eventdev.

Thoughts?

On the same note:
Can we finalize the rx adapter proposal? I can work on the v1 patch and the
common code if Nikhil or Gage don't have the bandwidth. Let me know?

last followup:
http://dpdk.org/ml/archives/dev/2017-June/068776.html

> 
> Regards,
> Dave
> 
> On 10/5/2017 3:12 PM, Jerin Jacob wrote:
> > -----Original Message-----
> >> Date: Fri, 21 Apr 2017 10:51:37 +0100
> >> From: Harry van Haaren <harry.van.haaren@intel.com>
> >> To: dev@dpdk.org
> >> CC: jerin.jacob@caviumnetworks.com, Harry van Haaren
> >> <harry.van.haaren@intel.com>, Gage Eads <gage.eads@intel.com>, Bruce
> >>  Richardson <bruce.richardson@intel.com>
> >> Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
> >> X-Mailer: git-send-email 2.7.4
> >>
> >> This commit adds a sample app for the eventdev library.
> >> The app has been tested with DPDK 17.05-rc2, hence this
> >> release (or later) is recommended.
> >>
> >> The sample app showcases a pipeline processing use-case,
> >> with event scheduling and processing defined per stage.
> >> The application recieves traffic as normal, with each
> >> packet traversing the pipeline. Once the packet has
> >> been processed by each of the pipeline stages, it is
> >> transmitted again.
> >>
> >> The app provides a framework to utilize cores for a single
> >> role or multiple roles. Examples of roles are the RX core,
> >> TX core, Scheduling core (in the case of the event/sw PMD),
> >> and worker cores.
> >>
> >> Various flags are available to configure numbers of stages,
> >> cycles of work at each stage, type of scheduling, number of
> >> worker cores, queue depths etc. For a full explaination,
> >> please refer to the documentation.
> >>
> >> Signed-off-by: Gage Eads <gage.eads@intel.com>
> >> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> >> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> >
> > Thanks for the example application to share the SW view.
> > I could make it run on HW after some tweaking(not optimized though)
> >
> > [...]
> >> +#define MAX_NUM_STAGES 8
> >> +#define BATCH_SIZE 16
> >> +#define MAX_NUM_CORE 64
> >
> > How about RTE_MAX_LCORE?
> 
> Core usage in the sample app is held in a uint64_t. Adding arrays would be
> possible, but I feel that the extra effort would not give that much benefit.
> I've left as is for the moment, unless you see any strong requirement to go
> beyond 64 cores?

I think, it is OK. Again with service core infrastructure this will change.

> 
> >
> >> +
> >> +static unsigned int active_cores;
> >> +static unsigned int num_workers;
> >> +static unsigned long num_packets = (1L << 25); /* do ~32M packets */
> >> +static unsigned int num_fids = 512;
> >> +static unsigned int num_priorities = 1;
> >
> > looks like its not used.
> 
> Yes, Removed.
> 
> >
> >> +static unsigned int num_stages = 1;
> >> +static unsigned int worker_cq_depth = 16;
> >> +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
> >> +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
> >> +static int16_t qid[MAX_NUM_STAGES] = {-1};
> >
> > Moving all fastpath related variables under a structure with cache
> > aligned will help.
> 
> I tried a few different combinations of this, and saw no gains, some losses.
> So will leave as is for the moment, if that's OK.

I think the ones used in the fastpath are better allocated from huge pages
using rte_malloc().
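
Something like this, for example (just a sketch; the struct name and exact
fields are only an illustration):

    struct fastpath_data {
            volatile int done;
            uint32_t rx_lock;
            uint32_t tx_lock;
            uint32_t sched_lock;
            uint8_t rx_core[MAX_NUM_CORE];
            uint8_t worker_core[MAX_NUM_CORE];
            uint8_t tx_core[MAX_NUM_CORE];
            uint8_t sched_core[MAX_NUM_CORE];
    } __rte_cache_aligned;

    /* file-scope pointer, allocated from hugepage memory early in main() */
    fdata = rte_zmalloc("fastpath_data", sizeof(*fdata), RTE_CACHE_LINE_SIZE);
    if (fdata == NULL)
            rte_exit(-1, "fastpath data allocation failed\n");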

> 
> >
> >> +static int worker_cycles;
> >> +static int enable_queue_priorities;
> >> +
> >> +struct prod_data {
> >> +    uint8_t dev_id;
> >> +    uint8_t port_id;
> >> +    int32_t qid;
> >> +    unsigned num_nic_ports;
> >> +};
> 
> Yes, saw a percent or two gain when this plus following two data structs
> cache aligned.

Looks like it's not fixed in v2. Looks like you have sent the old
version.

> 
> >
> >> +
> >> +    return 0;
> >> +}
> >> +
> >> +static int
> >> +producer(void)
> >> +{
> >> +    static uint8_t eth_port;
> >> +    struct rte_mbuf *mbufs[BATCH_SIZE];
> >> +    struct rte_event ev[BATCH_SIZE];
> >> +    uint32_t i, num_ports = prod_data.num_nic_ports;
> >> +    int32_t qid = prod_data.qid;
> >> +    uint8_t dev_id = prod_data.dev_id;
> >> +    uint8_t port_id = prod_data.port_id;
> >> +    uint32_t prio_idx = 0;
> >> +
> >> +    const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs,
> BATCH_SIZE);
> >> +    if (++eth_port == num_ports)
> >> +        eth_port = 0;
> >> +    if (nb_rx == 0) {
> >> +        rte_pause();
> >> +        return 0;
> >> +    }
> >> +
> >> +    for (i = 0; i < nb_rx; i++) {
> >> +        ev[i].flow_id = mbufs[i]->hash.rss;
> >
> > prefetching the buff[i+1] may help here?
> 
> I tried, didn't make much difference.

OK.

> 
> >
> >> +        ev[i].op = RTE_EVENT_OP_NEW;
> >> +        ev[i].sched_type = queue_type;
> >
> > The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY != RTE_SCHED_TYPE_ORDERED.
> So, we
> > cannot assign .sched_type as queue_type.
> >
> > I think, one option could be to avoid translation in application is to
> > - Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES, RTE_EVENT_QUEUE_CFG_*_ONLY
> > - Introduce a new RTE_EVENT_DEV_CAP_ to denote
> RTE_EVENT_QUEUE_CFG_ALL_TYPES cap
> > ability
> > - add sched_type in struct rte_event_queue_conf. If capability flag is
> >   not set then implementation takes sched_type value for the queue.
> >
> > Any thoughts?
> 
> 
> Not sure here, would it be ok for the moment, and we can work on a patch in
> the future?

OK

> >> +
> >> +    if (tx_core[lcore_id] && (tx_single ||
> >> +        rte_atomic32_cmpset(&tx_lock, 0, 1))) {
> >> +        consumer();
> >
> > Should consumer() need to come in this pattern? I am thinking like
> > if events is from last stage then call consumer() in worker()
> >
> > I think, above scheme works better when the _same_ worker code need to run
> the
> > case where
> > 1) ethdev HW is capable to enqueuing the packets to same txq from
> >   multiple thread
> > 2) ethdev is not capable to do so.
> >
> > So, The above cases can be addressed in configuration time where we link
> > the queues to port
> > case 1) Link all workers to last queue
> > case 2) Link only worker to last queue
> >
> > and keeping the common worker code.
> >
> > HW implementation has functional and performance issue if "two" ports are
> > assigned to one lcore for dequeue. The above scheme fixes that problem
> too.
> 
> 
> Can we have a bit more discussion on this item? Is this needed for this
> sample app, or can we perhaps work a patch for this later? Harry?

As explained above, is there any issue in keeping consumer() for the last
stage?
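
To illustrate what I mean, the worker loop would handle the last stage itself
(a sketch only; tx_qid stands for the final/TX queue id, and error handling
plus releasing the transmitted events is omitted):

    for (i = 0; i < nb_rx; i++) {
            if (events[i].queue_id == tx_qid) {
                    /* last stage reached: transmit from the worker itself */
                    rte_eth_tx_burst(events[i].mbuf->port, 0,
                                    &events[i].mbuf, 1);
                    continue;
            }
            events[i].queue_id = next_qid[events[i].queue_id];
            events[i].op = RTE_EVENT_OP_FORWARD;
    }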

> 
> 
> >
> >> +        rte_atomic32_clear((rte_atomic32_t *)&tx_lock);
> >> +    }
> >> +}
> >> +
> >> +static int
> >> +worker(void *arg)
> >> +{
> >> +    struct rte_event events[BATCH_SIZE];
> >> +
> >> +    struct worker_data *data = (struct worker_data *)arg;
> >> +    uint8_t dev_id = data->dev_id;
> >> +    uint8_t port_id = data->port_id;
> >> +    size_t sent = 0, received = 0;
> >> +    unsigned lcore_id = rte_lcore_id();
> >> +
> >> +    while (!done) {
> >> +        uint16_t i;
> >> +
> >> +        schedule_devices(dev_id, lcore_id);
> >> +
> >> +        if (!worker_core[lcore_id]) {
> >> +            rte_pause();
> >> +            continue;
> >> +        }
> >> +
> >> +        uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
> >> +                events, RTE_DIM(events), 0);
> >> +
> >> +        if (nb_rx == 0) {
> >> +            rte_pause();
> >> +            continue;
> >> +        }
> >> +        received += nb_rx;
> >> +
> >> +        for (i = 0; i < nb_rx; i++) {
> >> +            struct ether_hdr *eth;
> >> +            struct ether_addr addr;
> >> +            struct rte_mbuf *m = events[i].mbuf;
> >> +
> >> +            /* The first worker stage does classification */
> >> +            if (events[i].queue_id == qid[0])
> >> +                events[i].flow_id = m->hash.rss % num_fids;
> >
> > Not sure why we need do(shrinking the flows) this in worker() in queue
> based pipeline.
> > If an PMD has any specific requirement on num_fids,I think, we
> > can move this configuration stage or PMD can choose optimum fid internally
> to
> > avoid modulus operation tax in fastpath in all PMD.
> >
> > Does struct rte_event_queue_conf.nb_atomic_flows help here?
> 
> In my tests the modulus makes very little difference in the throughput. And
> I think it's good to have a way of varying the number of flows for testing
> different scenarios, even if it's not the most performant.

Not sure.

> 
> >
> >> +
> >> +            events[i].queue_id = next_qid[events[i].queue_id];
> >> +            events[i].op = RTE_EVENT_OP_FORWARD;
> >
> > missing events[i].sched_type.HW PMD does not work with this.
> > I think, we can use similar scheme like next_qid for next_sched_type.
> 
> Done. added events[i].sched_type = queue_type.
> 
> >
> >> +
> >> +            /* change mac addresses on packet (to use mbuf data) */
> >> +            eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
> >> +            ether_addr_copy(&eth->d_addr, &addr);
> >> +            ether_addr_copy(&eth->s_addr, &eth->d_addr);
> >> +            ether_addr_copy(&addr, &eth->s_addr);
> >
> > IMO, We can make packet processing code code as "static inline function"
> so
> > different worker types can reuse.
> 
> Done. moved out to a work() function.

I think the mac swap should be done in the last stage only, not on each forward.
i.e. with the existing code, a 2-stage forward swaps twice and ends up in the
original order.
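
i.e. something like this in the worker loop (a sketch; swap_mac() stands for
the ether_addr_copy() sequence quoted above and tx_qid for the final queue id):

    /* only swap when the packet is about to leave the pipeline, so an
     * N-stage run swaps the addresses exactly once */
    if (next_qid[events[i].queue_id] == tx_qid)
            swap_mac(events[i].mbuf);
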

> 
> >
> >> +
> >> +            /* do a number of cycles of work per packet */
> >> +            volatile uint64_t start_tsc = rte_rdtsc();
> >> +            while (rte_rdtsc() < start_tsc + worker_cycles)
> >> +                rte_pause();
> >
> > Ditto.
> 
> Done. moved out to a work() function.
> 
> >
> > I think, All worker specific variables like "worker_cycles" can moved into
> > one structure and use.
> >
> >> +        }
> >> +        uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
> >> +                events, nb_rx);
> >> +        while (nb_tx < nb_rx && !done)
> >> +            nb_tx += rte_event_enqueue_burst(dev_id, port_id,
> >> +                            events + nb_tx,
> >> +                            nb_rx - nb_tx);
> >> +        sent += nb_tx;
> >> +    }
> >> +
> >> +    if (!quiet)
> >> +        printf("  worker %u thread done. RX=%zu TX=%zu\n",
> >> +                rte_lcore_id(), received, sent);
> >> +
> >> +    return 0;
> >> +}
> >> +
> >> +/*
> >> + * Parse the coremask given as argument (hexadecimal string) and fill
> >> + * the global configuration (core role and core count) with the parsed
> >> + * value.
> >> + */
> >> +static int xdigit2val(unsigned char c)
> >
> > multiple instance of "xdigit2val" in DPDK repo. May be we can push this
> > as common code.
> 
> Sure, that's something we can look at in a separate patch, now that it's
> being used more and more.

Makes sense.

> 
> >
> >> +{
> >> +    int val;
> >> +
> >> +    if (isdigit(c))
> >> +        val = c - '0';
> >> +    else if (isupper(c))
> >> +        val = c - 'A' + 10;
> >> +    else
> >> +        val = c - 'a' + 10;
> >> +    return val;
> >> +}
> >> +
> >> +
> >> +static void
> >> +usage(void)
> >> +{
> >> +    const char *usage_str =
> >> +        "  Usage: eventdev_demo [options]\n"
> >> +        "  Options:\n"
> >> +        "  -n, --packets=N              Send N packets (default ~32M), 0
> implies no limit\n"
> >> +        "  -f, --atomic-flows=N         Use N random flows from 1 to N
> (default 16)\n"
> >
> > I think, this parameter now, effects the application fast path code.I
> think,
> > it should eventdev configuration para-mater.
> 
> See above comment on num_fids
> 
> >
> >> +        "  -s, --num_stages=N           Use N atomic stages (default
> 1)\n"
> >> +        "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core
> mask\n"
> >> +        "  -w, --worker-mask=core mask  Run worker on CPUs in core
> mask\n"
> >> +        "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core
> mask\n"
> >> +        "  -e  --sched-mask=core mask   Run scheduler on CPUs in core
> mask\n"
> >> +        "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
> >> +        "  -W  --work-cycles=N          Worker cycles (default 0)\n"
> >> +        "  -P  --queue-priority         Enable scheduler queue
> prioritization\n"
> >> +        "  -o, --ordered                Use ordered scheduling\n"
> >> +        "  -p, --parallel               Use parallel scheduling\n"
> >
> > IMO, all stage being "parallel" or "ordered" or "atomic" is one mode of
> > operation. It is valid have to any combination. We need to express that in
> > command like
> > example:
> > 3 stage with
> > O->A->P
> 
> How about we add an option that specifies the mode of operation for each
> stage in a string? Maybe have a '-m' option (modes) e.g. '-m appo' for 4
> stages with atomic, parallel, paralled, ordered. Or maybe reuse your
> test-eventdev parameter style?

Any scheme is fine.

> 
> >
> >> +        "  -q, --quiet                  Minimize printed output\n"
> >> +        "  -D, --dump                   Print detailed statistics before
> exit"
> >> +        "\n";
> >> +    fprintf(stderr, "%s", usage_str);
> >> +    exit(1);
> >> +}
> >> +
> >
> > [...]
> >
> >> +            rx_single = (popcnt == 1);
> >> +            break;
> >> +        case 't':
> >> +            tx_lcore_mask = parse_coremask(optarg);
> >> +            popcnt = __builtin_popcountll(tx_lcore_mask);
> >> +            tx_single = (popcnt == 1);
> >> +            break;
> >> +        case 'e':
> >> +            sched_lcore_mask = parse_coremask(optarg);
> >> +            popcnt = __builtin_popcountll(sched_lcore_mask);
> >> +            sched_single = (popcnt == 1);
> >> +            break;
> >> +        default:
> >> +            usage();
> >> +        }
> >> +    }
> >> +
> >> +    if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
> >> +        sched_lcore_mask == 0 || tx_lcore_mask == 0) {
> >
> >> +
> >> +    /* Q creation - one load balanced per pipeline stage*/
> >> +
> >> +    /* set up one port per worker, linking to all stage queues */
> >> +    for (i = 0; i < num_workers; i++) {
> >> +        struct worker_data *w = &worker_data[i];
> >> +        w->dev_id = dev_id;
> >> +        if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
> >> +            printf("Error setting up port %d\n", i);
> >> +            return -1;
> >> +        }
> >> +
> >> +        uint32_t s;
> >> +        for (s = 0; s < num_stages; s++) {
> >> +            if (rte_event_port_link(dev_id, i,
> >> +                        &worker_queues[s].queue_id,
> >> +                        &worker_queues[s].priority,
> >> +                        1) != 1) {
> >> +                printf("%d: error creating link for port %d\n",
> >> +                        __LINE__, i);
> >> +                return -1;
> >> +            }
> >> +        }
> >> +        w->port_id = i;
> >> +    }
> >> +    /* port for consumer, linked to TX queue */
> >> +    if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
> >
> > If ethdev supports MT txq queue support then this port can be linked to
> > worker too. something to consider for future.
> >
> 
> Sure. No change for now.

OK
  
Hunt, David June 27, 2017, 1:12 p.m. UTC | #9
Hi Jerin:


On 27/6/2017 10:35 AM, Jerin Jacob wrote:
> -----Original Message-----
>> Date: Mon, 26 Jun 2017 15:46:47 +0100
>> From: "Hunt, David" <david.hunt@intel.com>
>> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>, Harry van Haaren
>>   <harry.van.haaren@intel.com>
>> CC: dev@dpdk.org, Gage Eads <gage.eads@intel.com>, Bruce Richardson
>>   <bruce.richardson@intel.com>
>> Subject: Re: [dpdk-dev] [PATCH 1/3] examples/eventdev_pipeline: added
>>   sample app
>> User-Agent: Mozilla/5.0 (Windows NT 6.3; WOW64; rv:45.0) Gecko/20100101
>>   Thunderbird/45.8.0
>>
>> Hi Jerin,
> Hi David,
>
> Looks like you have sent the old version. The below mentioned comments
> are not addressed in v2.

Oops. Glitch in the Matrix. I've just pushed a V3 with the changes.

>> I'm assisting Harry on the sample app, and have just pushed up a V2 patch
>> based on your feedback. I've addressed most of your suggestions, comments
>> below. There may still a couple of outstanding questions that need further
>> discussion.
> A few general comments:
> 1) Nikhil/Gage's proposal on ethdev rx to eventdev adapter will change the major
> portion(eventdev setup and producer()) of this application
> 2) Producing one lcore worth of packets, really cant show as example
> eventdev application as it will be pretty bad in low-end machine.
> At least application infrastructure should not limit.
>
> Considering above points, Should we wait for rx adapter to complete
> first? I would like to show this as real world application to use eventdev.
>
> Thoughts?
>
> On the same note:
> Can we finalize on rx adapter proposal? I can work on v1 of patch and
> common code if Nikhil or Gage don't have bandwidth. Let me know?
>
> last followup:
> http://dpdk.org/ml/archives/dev/2017-June/068776.html

I had a quick chat with Harry, and wonder if we'd be as well to merge 
the app as it is now, and as the new frameworks become available, the 
app can be updated to make use of them? I feel it would be better to 
have something out there for people to play with than waiting for 17.11.

Also, if you have bandwidth to patch the app for your desired use cases, 
that would be a good contribution. I'd only be guessing for some of it :)


>> Regards,
>> Dave
>>
>> On 10/5/2017 3:12 PM, Jerin Jacob wrote:
>>> -----Original Message-----
>>>> Date: Fri, 21 Apr 2017 10:51:37 +0100
>>>> From: Harry van Haaren <harry.van.haaren@intel.com>
>>>> To: dev@dpdk.org
>>>> CC: jerin.jacob@caviumnetworks.com, Harry van Haaren
>>>> <harry.van.haaren@intel.com>, Gage Eads <gage.eads@intel.com>, Bruce
>>>>   Richardson <bruce.richardson@intel.com>
>>>> Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
>>>> X-Mailer: git-send-email 2.7.4
>>>>
>>>> This commit adds a sample app for the eventdev library.
>>>> The app has been tested with DPDK 17.05-rc2, hence this
>>>> release (or later) is recommended.
>>>>
>>>> The sample app showcases a pipeline processing use-case,
>>>> with event scheduling and processing defined per stage.
>>>> The application recieves traffic as normal, with each
>>>> packet traversing the pipeline. Once the packet has
>>>> been processed by each of the pipeline stages, it is
>>>> transmitted again.
>>>>
>>>> The app provides a framework to utilize cores for a single
>>>> role or multiple roles. Examples of roles are the RX core,
>>>> TX core, Scheduling core (in the case of the event/sw PMD),
>>>> and worker cores.
>>>>
>>>> Various flags are available to configure numbers of stages,
>>>> cycles of work at each stage, type of scheduling, number of
>>>> worker cores, queue depths etc. For a full explaination,
>>>> please refer to the documentation.
>>>>
>>>> Signed-off-by: Gage Eads <gage.eads@intel.com>
>>>> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
>>>> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
>>> Thanks for the example application to share the SW view.
>>> I could make it run on HW after some tweaking(not optimized though)
>>>
>>> [...]
>>>> +#define MAX_NUM_STAGES 8
>>>> +#define BATCH_SIZE 16
>>>> +#define MAX_NUM_CORE 64
>>> How about RTE_MAX_LCORE?
>> Core usage in the sample app is held in a uint64_t. Adding arrays would be
>> possible, but I feel that the extra effort would not give that much benefit.
>> I've left as is for the moment, unless you see any strong requirement to go
>> beyond 64 cores?
> I think, it is OK. Again with service core infrastructure this will change.
>
>>>> +
>>>> +static unsigned int active_cores;
>>>> +static unsigned int num_workers;
>>>> +static unsigned long num_packets = (1L << 25); /* do ~32M packets */
>>>> +static unsigned int num_fids = 512;
>>>> +static unsigned int num_priorities = 1;
>>> looks like its not used.
>> Yes, Removed.
>>
>>>> +static unsigned int num_stages = 1;
>>>> +static unsigned int worker_cq_depth = 16;
>>>> +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
>>>> +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
>>>> +static int16_t qid[MAX_NUM_STAGES] = {-1};
>>> Moving all fastpath related variables under a structure with cache
>>> aligned will help.
>> I tried a few different combinations of this, and saw no gains, some losses.
>> So will leave as is for the moment, if that's OK.
> I think, the one are using in fastpath better to allocate from huge page
> using rte_malloc()
>
>>>> +static int worker_cycles;
>>>> +static int enable_queue_priorities;
>>>> +
>>>> +struct prod_data {
>>>> +    uint8_t dev_id;
>>>> +    uint8_t port_id;
>>>> +    int32_t qid;
>>>> +    unsigned num_nic_ports;
>>>> +};
>> Yes, saw a percent or two gain when this plus following two data structs
>> cache aligned.
> looks like it not fixed in v2. Looks like you have sent the old
> version.
>
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +static int
>>>> +producer(void)
>>>> +{
>>>> +    static uint8_t eth_port;
>>>> +    struct rte_mbuf *mbufs[BATCH_SIZE];
>>>> +    struct rte_event ev[BATCH_SIZE];
>>>> +    uint32_t i, num_ports = prod_data.num_nic_ports;
>>>> +    int32_t qid = prod_data.qid;
>>>> +    uint8_t dev_id = prod_data.dev_id;
>>>> +    uint8_t port_id = prod_data.port_id;
>>>> +    uint32_t prio_idx = 0;
>>>> +
>>>> +    const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs,
>> BATCH_SIZE);
>>>> +    if (++eth_port == num_ports)
>>>> +        eth_port = 0;
>>>> +    if (nb_rx == 0) {
>>>> +        rte_pause();
>>>> +        return 0;
>>>> +    }
>>>> +
>>>> +    for (i = 0; i < nb_rx; i++) {
>>>> +        ev[i].flow_id = mbufs[i]->hash.rss;
>>> prefetching the buff[i+1] may help here?
>> I tried, didn't make much difference.
> OK.
>
>>>> +        ev[i].op = RTE_EVENT_OP_NEW;
>>>> +        ev[i].sched_type = queue_type;
>>> The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY != RTE_SCHED_TYPE_ORDERED.
>> So, we
>>> cannot assign .sched_type as queue_type.
>>>
>>> I think, one option could be to avoid translation in application is to
>>> - Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES, RTE_EVENT_QUEUE_CFG_*_ONLY
>>> - Introduce a new RTE_EVENT_DEV_CAP_ to denote
>> RTE_EVENT_QUEUE_CFG_ALL_TYPES cap
>>> ability
>>> - add sched_type in struct rte_event_queue_conf. If capability flag is
>>>    not set then implementation takes sched_type value for the queue.
>>>
>>> Any thoughts?
>>
>> Not sure here, would it be ok for the moment, and we can work on a patch in
>> the future?
> OK
>
>>>> +
>>>> +    if (tx_core[lcore_id] && (tx_single ||
>>>> +        rte_atomic32_cmpset(&tx_lock, 0, 1))) {
>>>> +        consumer();
>>> Should consumer() need to come in this pattern? I am thinking like
>>> if events is from last stage then call consumer() in worker()
>>>
>>> I think, above scheme works better when the _same_ worker code need to run
>> the
>>> case where
>>> 1) ethdev HW is capable to enqueuing the packets to same txq from
>>>    multiple thread
>>> 2) ethdev is not capable to do so.
>>>
>>> So, The above cases can be addressed in configuration time where we link
>>> the queues to port
>>> case 1) Link all workers to last queue
>>> case 2) Link only worker to last queue
>>>
>>> and keeping the common worker code.
>>>
>>> HW implementation has functional and performance issue if "two" ports are
>>> assigned to one lcore for dequeue. The above scheme fixes that problem
>> too.
>>
>>
>> Can we have a bit more discussion on this item? Is this needed for this
>> sample app, or can we perhaps work a patch for this later? Harry?
> As explained above, Is there any issue in keeping consumer() for last
> stage ?

I would probably see this as a future enhancement as per my initial 
comments above. Any hardware or new framework additions are welcome as 
future patches to the app.

>>
>>>> +        rte_atomic32_clear((rte_atomic32_t *)&tx_lock);
>>>> +    }
>>>> +}
>>>> +
>>>> +static int
>>>> +worker(void *arg)
>>>> +{
>>>> +    struct rte_event events[BATCH_SIZE];
>>>> +
>>>> +    struct worker_data *data = (struct worker_data *)arg;
>>>> +    uint8_t dev_id = data->dev_id;
>>>> +    uint8_t port_id = data->port_id;
>>>> +    size_t sent = 0, received = 0;
>>>> +    unsigned lcore_id = rte_lcore_id();
>>>> +
>>>> +    while (!done) {
>>>> +        uint16_t i;
>>>> +
>>>> +        schedule_devices(dev_id, lcore_id);
>>>> +
>>>> +        if (!worker_core[lcore_id]) {
>>>> +            rte_pause();
>>>> +            continue;
>>>> +        }
>>>> +
>>>> +        uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
>>>> +                events, RTE_DIM(events), 0);
>>>> +
>>>> +        if (nb_rx == 0) {
>>>> +            rte_pause();
>>>> +            continue;
>>>> +        }
>>>> +        received += nb_rx;
>>>> +
>>>> +        for (i = 0; i < nb_rx; i++) {
>>>> +            struct ether_hdr *eth;
>>>> +            struct ether_addr addr;
>>>> +            struct rte_mbuf *m = events[i].mbuf;
>>>> +
>>>> +            /* The first worker stage does classification */
>>>> +            if (events[i].queue_id == qid[0])
>>>> +                events[i].flow_id = m->hash.rss % num_fids;
>>> Not sure why we need do(shrinking the flows) this in worker() in queue
>> based pipeline.
>>> If an PMD has any specific requirement on num_fids,I think, we
>>> can move this configuration stage or PMD can choose optimum fid internally
>> to
>>> avoid modulus operation tax in fastpath in all PMD.
>>>
>>> Does struct rte_event_queue_conf.nb_atomic_flows help here?
>> In my tests the modulus makes very little difference in the throughput. And
>> I think it's good to have a way of varying the number of flows for testing
>> different scenarios, even if it's not the most performant.
> Not sure.
>
>>>> +
>>>> +            events[i].queue_id = next_qid[events[i].queue_id];
>>>> +            events[i].op = RTE_EVENT_OP_FORWARD;
>>> missing events[i].sched_type.HW PMD does not work with this.
>>> I think, we can use similar scheme like next_qid for next_sched_type.
>> Done. added events[i].sched_type = queue_type.
>>
>>>> +
>>>> +            /* change mac addresses on packet (to use mbuf data) */
>>>> +            eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
>>>> +            ether_addr_copy(&eth->d_addr, &addr);
>>>> +            ether_addr_copy(&eth->s_addr, &eth->d_addr);
>>>> +            ether_addr_copy(&addr, &eth->s_addr);
>>> IMO, We can make packet processing code code as "static inline function"
>> so
>>> different worker types can reuse.
>> Done. moved out to a work() function.
> I think, mac swap should do in last stage, not on each forward.
> ie. With existing code, 2 stage forward makes in original order.
>
>>>> +
>>>> +            /* do a number of cycles of work per packet */
>>>> +            volatile uint64_t start_tsc = rte_rdtsc();
>>>> +            while (rte_rdtsc() < start_tsc + worker_cycles)
>>>> +                rte_pause();
>>> Ditto.
>> Done. moved out to a work() function.
>>
>>> I think, All worker specific variables like "worker_cycles" can moved into
>>> one structure and use.
>>>
>>>> +        }
>>>> +        uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
>>>> +                events, nb_rx);
>>>> +        while (nb_tx < nb_rx && !done)
>>>> +            nb_tx += rte_event_enqueue_burst(dev_id, port_id,
>>>> +                            events + nb_tx,
>>>> +                            nb_rx - nb_tx);
>>>> +        sent += nb_tx;
>>>> +    }
>>>> +
>>>> +    if (!quiet)
>>>> +        printf("  worker %u thread done. RX=%zu TX=%zu\n",
>>>> +                rte_lcore_id(), received, sent);
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +/*
>>>> + * Parse the coremask given as argument (hexadecimal string) and fill
>>>> + * the global configuration (core role and core count) with the parsed
>>>> + * value.
>>>> + */
>>>> +static int xdigit2val(unsigned char c)
>>> multiple instance of "xdigit2val" in DPDK repo. May be we can push this
>>> as common code.
>> Sure, that's something we can look at in a separate patch, now that it's
>> being used more and more.
> make sense.
>
>>>> +{
>>>> +    int val;
>>>> +
>>>> +    if (isdigit(c))
>>>> +        val = c - '0';
>>>> +    else if (isupper(c))
>>>> +        val = c - 'A' + 10;
>>>> +    else
>>>> +        val = c - 'a' + 10;
>>>> +    return val;
>>>> +}
>>>> +
>>>> +
>>>> +static void
>>>> +usage(void)
>>>> +{
>>>> +    const char *usage_str =
>>>> +        "  Usage: eventdev_demo [options]\n"
>>>> +        "  Options:\n"
>>>> +        "  -n, --packets=N              Send N packets (default ~32M), 0
>> implies no limit\n"
>>>> +        "  -f, --atomic-flows=N         Use N random flows from 1 to N
>> (default 16)\n"
>>> I think, this parameter now, effects the application fast path code.I
>> think,
>>> it should eventdev configuration para-mater.
>> See above comment on num_fids
>>
>>>> +        "  -s, --num_stages=N           Use N atomic stages (default
>> 1)\n"
>>>> +        "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core
>> mask\n"
>>>> +        "  -w, --worker-mask=core mask  Run worker on CPUs in core
>> mask\n"
>>>> +        "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core
>> mask\n"
>>>> +        "  -e  --sched-mask=core mask   Run scheduler on CPUs in core
>> mask\n"
>>>> +        "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
>>>> +        "  -W  --work-cycles=N          Worker cycles (default 0)\n"
>>>> +        "  -P  --queue-priority         Enable scheduler queue
>> prioritization\n"
>>>> +        "  -o, --ordered                Use ordered scheduling\n"
>>>> +        "  -p, --parallel               Use parallel scheduling\n"
>>> IMO, all stage being "parallel" or "ordered" or "atomic" is one mode of
>>> operation. It is valid have to any combination. We need to express that in
>>> command like
>>> example:
>>> 3 stage with
>>> O->A->P
>> How about we add an option that specifies the mode of operation for each
>> stage in a string? Maybe have a '-m' option (modes) e.g. '-m appo' for 4
>> stages with atomic, parallel, paralled, ordered. Or maybe reuse your
>> test-eventdev parameter style?
> Any scheme is fine.
>
>>>> +        "  -q, --quiet                  Minimize printed output\n"
>>>> +        "  -D, --dump                   Print detailed statistics before
>> exit"
>>>> +        "\n";
>>>> +    fprintf(stderr, "%s", usage_str);
>>>> +    exit(1);
>>>> +}
>>>> +
>>> [...]
>>>
>>>> +            rx_single = (popcnt == 1);
>>>> +            break;
>>>> +        case 't':
>>>> +            tx_lcore_mask = parse_coremask(optarg);
>>>> +            popcnt = __builtin_popcountll(tx_lcore_mask);
>>>> +            tx_single = (popcnt == 1);
>>>> +            break;
>>>> +        case 'e':
>>>> +            sched_lcore_mask = parse_coremask(optarg);
>>>> +            popcnt = __builtin_popcountll(sched_lcore_mask);
>>>> +            sched_single = (popcnt == 1);
>>>> +            break;
>>>> +        default:
>>>> +            usage();
>>>> +        }
>>>> +    }
>>>> +
>>>> +    if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
>>>> +        sched_lcore_mask == 0 || tx_lcore_mask == 0) {
>>>> +
>>>> +    /* Q creation - one load balanced per pipeline stage*/
>>>> +
>>>> +    /* set up one port per worker, linking to all stage queues */
>>>> +    for (i = 0; i < num_workers; i++) {
>>>> +        struct worker_data *w = &worker_data[i];
>>>> +        w->dev_id = dev_id;
>>>> +        if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
>>>> +            printf("Error setting up port %d\n", i);
>>>> +            return -1;
>>>> +        }
>>>> +
>>>> +        uint32_t s;
>>>> +        for (s = 0; s < num_stages; s++) {
>>>> +            if (rte_event_port_link(dev_id, i,
>>>> +                        &worker_queues[s].queue_id,
>>>> +                        &worker_queues[s].priority,
>>>> +                        1) != 1) {
>>>> +                printf("%d: error creating link for port %d\n",
>>>> +                        __LINE__, i);
>>>> +                return -1;
>>>> +            }
>>>> +        }
>>>> +        w->port_id = i;
>>>> +    }
>>>> +    /* port for consumer, linked to TX queue */
>>>> +    if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
>>> If ethdev supports MT txq queue support then this port can be linked to
>>> worker too. something to consider for future.
>>>
>> Sure. No change for now.
> OK

Just to add a note on any remaining comments above: we would hope that none 
of them are blockers for the merge of the current version, as they can be 
addressed in future patches as the infrastructure changes.

Rgds,
Dave.
  
Jerin Jacob June 29, 2017, 7:17 a.m. UTC | #10
-----Original Message-----
> Date: Tue, 27 Jun 2017 14:12:20 +0100
> From: "Hunt, David" <david.hunt@intel.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> CC: Harry van Haaren <harry.van.haaren@intel.com>, dev@dpdk.org, Gage Eads
>  <gage.eads@intel.com>, Bruce Richardson <bruce.richardson@intel.com>
> Subject: Re: [dpdk-dev] [PATCH 1/3] examples/eventdev_pipeline: added
>  sample app
> User-Agent: Mozilla/5.0 (Windows NT 6.3; WOW64; rv:45.0) Gecko/20100101
>  Thunderbird/45.8.0
> 
> Hi Jerin:
> 
> 
> On 27/6/2017 10:35 AM, Jerin Jacob wrote:
> > -----Original Message-----
> > > Date: Mon, 26 Jun 2017 15:46:47 +0100
> > > From: "Hunt, David" <david.hunt@intel.com>
> > > To: Jerin Jacob <jerin.jacob@caviumnetworks.com>, Harry van Haaren
> > >   <harry.van.haaren@intel.com>
> > > CC: dev@dpdk.org, Gage Eads <gage.eads@intel.com>, Bruce Richardson
> > >   <bruce.richardson@intel.com>
> > > Subject: Re: [dpdk-dev] [PATCH 1/3] examples/eventdev_pipeline: added
> > >   sample app
> > > User-Agent: Mozilla/5.0 (Windows NT 6.3; WOW64; rv:45.0) Gecko/20100101
> > >   Thunderbird/45.8.0
> > > 
> > > Hi Jerin,
> > Hi David,
> > 
> > Looks like you have sent the old version. The below mentioned comments
> > are not addressed in v2.
> 
> Oops. Glitch in the Matrix. I've just pushed a V3 with the changes.
> 
> > > I'm assisting Harry on the sample app, and have just pushed up a V2 patch
> > > based on your feedback. I've addressed most of your suggestions, comments
> > > below. There may still a couple of outstanding questions that need further
> > > discussion.
> > A few general comments:
> > 1) Nikhil/Gage's proposal on ethdev rx to eventdev adapter will change the major
> > portion(eventdev setup and producer()) of this application
> > 2) Producing one lcore worth of packets, really cant show as example
> > eventdev application as it will be pretty bad in low-end machine.
> > At least application infrastructure should not limit.
> > 
> > Considering above points, Should we wait for rx adapter to complete
> > first? I would like to show this as real world application to use eventdev.
> > 
> > Thoughts?
> > 
> > On the same note:
> > Can we finalize on rx adapter proposal? I can work on v1 of patch and
> > common code if Nikhil or Gage don't have bandwidth. Let me know?
> > 
> > last followup:
> > http://dpdk.org/ml/archives/dev/2017-June/068776.html
> 
> I had a quick chat with Harry, and wonder if we'd be as well to merge the
> app as it is now, and as the new frameworks become available, the app can be
> updated to make use of them? I feel it would be better to have something out
> there for people to play with than waiting for 17.11.

I agree with your concern.
How about renaming the test and doc to be SW PMD specific for now, and then,
once we fix the known issues with HW eventdev + ethdev (Rx adapter) integration,
renaming the application to a generic eventdev one?



> 
> Also, if you have bandwidth to patch the app for your desired use cases,
> that would be a good contribution. I'd only be guessing for some of it :)
> 
> 
> > > Regards,
> > > Dave
> > > 
> > > On 10/5/2017 3:12 PM, Jerin Jacob wrote:
> > > > -----Original Message-----
> > > > > Date: Fri, 21 Apr 2017 10:51:37 +0100
> > > > > From: Harry van Haaren <harry.van.haaren@intel.com>
> > > > > To: dev@dpdk.org
> > > > > CC: jerin.jacob@caviumnetworks.com, Harry van Haaren
> > > > > <harry.van.haaren@intel.com>, Gage Eads <gage.eads@intel.com>, Bruce
> > > > >   Richardson <bruce.richardson@intel.com>
> > > > > Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
> > > > > X-Mailer: git-send-email 2.7.4
> > > > > 
> > > > > This commit adds a sample app for the eventdev library.
> > > > > The app has been tested with DPDK 17.05-rc2, hence this
> > > > > release (or later) is recommended.
> > > > > 
> > > > > The sample app showcases a pipeline processing use-case,
> > > > > with event scheduling and processing defined per stage.
> > > > > The application recieves traffic as normal, with each
> > > > > packet traversing the pipeline. Once the packet has
> > > > > been processed by each of the pipeline stages, it is
> > > > > transmitted again.
> > > > > 
> > > > > The app provides a framework to utilize cores for a single
> > > > > role or multiple roles. Examples of roles are the RX core,
> > > > > TX core, Scheduling core (in the case of the event/sw PMD),
> > > > > and worker cores.
> > > > > 
> > > > > Various flags are available to configure numbers of stages,
> > > > > cycles of work at each stage, type of scheduling, number of
> > > > > worker cores, queue depths etc. For a full explaination,
> > > > > please refer to the documentation.
> > > > > 
> > > > > Signed-off-by: Gage Eads <gage.eads@intel.com>
> > > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > > > > Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> > > > Thanks for the example application to share the SW view.
> > > > I could make it run on HW after some tweaking(not optimized though)
> > > > 
> > > > [...]
> > > > > +#define MAX_NUM_STAGES 8
> > > > > +#define BATCH_SIZE 16
> > > > > +#define MAX_NUM_CORE 64
> > > > How about RTE_MAX_LCORE?
> > > Core usage in the sample app is held in a uint64_t. Adding arrays would be
> > > possible, but I feel that the extra effort would not give that much benefit.
> > > I've left as is for the moment, unless you see any strong requirement to go
> > > beyond 64 cores?
> > I think, it is OK. Again with service core infrastructure this will change.
> > 
> > > > > +
> > > > > +static unsigned int active_cores;
> > > > > +static unsigned int num_workers;
> > > > > +static unsigned long num_packets = (1L << 25); /* do ~32M packets */
> > > > > +static unsigned int num_fids = 512;
> > > > > +static unsigned int num_priorities = 1;
> > > > looks like its not used.
> > > Yes, Removed.
> > > 
> > > > > +static unsigned int num_stages = 1;
> > > > > +static unsigned int worker_cq_depth = 16;
> > > > > +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
> > > > > +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
> > > > > +static int16_t qid[MAX_NUM_STAGES] = {-1};
> > > > Moving all fastpath-related variables under a cache-aligned structure
> > > > will help.
> > > I tried a few different combinations of this, and saw no gains, some losses.
> > > So will leave as is for the moment, if that's OK.
> > I think the ones used in the fastpath would be better allocated from huge
> > pages using rte_malloc().
> > 
> > > > > +static int worker_cycles;
> > > > > +static int enable_queue_priorities;
> > > > > +
> > > > > +struct prod_data {
> > > > > +    uint8_t dev_id;
> > > > > +    uint8_t port_id;
> > > > > +    int32_t qid;
> > > > > +    unsigned num_nic_ports;
> > > > > +};
> > > Yes, saw a percent or two gain when this plus following two data structs
> > > cache aligned.
> > Looks like it is not fixed in v2. Looks like you have sent the old
> > version.
> > 
> > > > > +
> > > > > +    return 0;
> > > > > +}
> > > > > +
> > > > > +static int
> > > > > +producer(void)
> > > > > +{
> > > > > +    static uint8_t eth_port;
> > > > > +    struct rte_mbuf *mbufs[BATCH_SIZE];
> > > > > +    struct rte_event ev[BATCH_SIZE];
> > > > > +    uint32_t i, num_ports = prod_data.num_nic_ports;
> > > > > +    int32_t qid = prod_data.qid;
> > > > > +    uint8_t dev_id = prod_data.dev_id;
> > > > > +    uint8_t port_id = prod_data.port_id;
> > > > > +    uint32_t prio_idx = 0;
> > > > > +
> > > > > +    const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs,
> > > BATCH_SIZE);
> > > > > +    if (++eth_port == num_ports)
> > > > > +        eth_port = 0;
> > > > > +    if (nb_rx == 0) {
> > > > > +        rte_pause();
> > > > > +        return 0;
> > > > > +    }
> > > > > +
> > > > > +    for (i = 0; i < nb_rx; i++) {
> > > > > +        ev[i].flow_id = mbufs[i]->hash.rss;
> > > > prefetching the buff[i+1] may help here?
> > > I tried, didn't make much difference.
> > OK.
> > 
> > > > > +        ev[i].op = RTE_EVENT_OP_NEW;
> > > > > +        ev[i].sched_type = queue_type;
> > > > The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY != RTE_SCHED_TYPE_ORDERED.
> > > So, we
> > > > cannot assign .sched_type as queue_type.
> > > > 
> > > > I think one option to avoid the translation in the application is to:
> > > > - Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES, RTE_EVENT_QUEUE_CFG_*_ONLY
> > > > - Introduce a new RTE_EVENT_DEV_CAP_ to denote the
> > > >   RTE_EVENT_QUEUE_CFG_ALL_TYPES capability
> > > > - Add sched_type in struct rte_event_queue_conf. If the capability flag is
> > > >   not set, then the implementation takes the sched_type value for the queue.
> > > > 
> > > > Any thoughts?
> > > 
> > > Not sure here, would it be ok for the moment, and we can work on a patch in
> > > the future?
> > OK
> > 
> > > > > +
> > > > > +    if (tx_core[lcore_id] && (tx_single ||
> > > > > +        rte_atomic32_cmpset(&tx_lock, 0, 1))) {
> > > > > +        consumer();
> > > > Does consumer() need to come in this pattern? I am thinking that if
> > > > the event is from the last stage, then worker() could call consumer().
> > > > 
> > > > I think the above scheme works better when the _same_ worker code needs
> > > > to handle both the case where
> > > > 1) the ethdev HW is capable of enqueuing packets to the same txq from
> > > >    multiple threads, and
> > > > 2) the ethdev is not capable of doing so.
> > > > 
> > > > So the above cases can be addressed at configuration time, where we link
> > > > the queues to ports:
> > > > case 1) Link all workers to the last queue
> > > > case 2) Link only one worker to the last queue
> > > > 
> > > > and keep the common worker code.
> > > > 
> > > > The HW implementation has functional and performance issues if "two"
> > > > ports are assigned to one lcore for dequeue. The above scheme fixes
> > > > that problem too.
> > > 
> > > 
> > > Can we have a bit more discussion on this item? Is this needed for this
> > > sample app, or can we perhaps work a patch for this later? Harry?
> > As explained above, is there any issue in keeping consumer() for the last
> > stage?
> 
> I would probably see this as a future enhancement as per my initial comments
> above. Any hardware or new framework additions are welcome as future patches
> to the app.
> 
> > > 
> > > > > +        rte_atomic32_clear((rte_atomic32_t *)&tx_lock);
> > > > > +    }
> > > > > +}
> > > > > +
> > > > > +static int
> > > > > +worker(void *arg)
> > > > > +{
> > > > > +    struct rte_event events[BATCH_SIZE];
> > > > > +
> > > > > +    struct worker_data *data = (struct worker_data *)arg;
> > > > > +    uint8_t dev_id = data->dev_id;
> > > > > +    uint8_t port_id = data->port_id;
> > > > > +    size_t sent = 0, received = 0;
> > > > > +    unsigned lcore_id = rte_lcore_id();
> > > > > +
> > > > > +    while (!done) {
> > > > > +        uint16_t i;
> > > > > +
> > > > > +        schedule_devices(dev_id, lcore_id);
> > > > > +
> > > > > +        if (!worker_core[lcore_id]) {
> > > > > +            rte_pause();
> > > > > +            continue;
> > > > > +        }
> > > > > +
> > > > > +        uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
> > > > > +                events, RTE_DIM(events), 0);
> > > > > +
> > > > > +        if (nb_rx == 0) {
> > > > > +            rte_pause();
> > > > > +            continue;
> > > > > +        }
> > > > > +        received += nb_rx;
> > > > > +
> > > > > +        for (i = 0; i < nb_rx; i++) {
> > > > > +            struct ether_hdr *eth;
> > > > > +            struct ether_addr addr;
> > > > > +            struct rte_mbuf *m = events[i].mbuf;
> > > > > +
> > > > > +            /* The first worker stage does classification */
> > > > > +            if (events[i].queue_id == qid[0])
> > > > > +                events[i].flow_id = m->hash.rss % num_fids;
> > > > > Not sure why we need to do this (shrinking the flows) in worker() in a
> > > > > queue-based pipeline.
> > > > > If a PMD has any specific requirement on num_fids, I think we can move
> > > > > this to the configuration stage, or the PMD can choose an optimal fid
> > > > > internally to avoid the modulus operation tax in the fastpath in all PMDs.
> > > > 
> > > > Does struct rte_event_queue_conf.nb_atomic_flows help here?
> > > In my tests the modulus makes very little difference in the throughput. And
> > > I think it's good to have a way of varying the number of flows for testing
> > > different scenarios, even if it's not the most performant.
> > Not sure.
> > 
> > > > > +
> > > > > +            events[i].queue_id = next_qid[events[i].queue_id];
> > > > > +            events[i].op = RTE_EVENT_OP_FORWARD;
> > > > Missing events[i].sched_type. The HW PMD does not work with this.
> > > > I think, we can use similar scheme like next_qid for next_sched_type.
> > > Done. added events[i].sched_type = queue_type.
> > > 
> > > > > +
> > > > > +            /* change mac addresses on packet (to use mbuf data) */
> > > > > +            eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
> > > > > +            ether_addr_copy(&eth->d_addr, &addr);
> > > > > +            ether_addr_copy(&eth->s_addr, &eth->d_addr);
> > > > > +            ether_addr_copy(&addr, &eth->s_addr);
> > > > IMO, we can make the packet processing code a "static inline function" so
> > > > different worker types can reuse it.
> > > Done. moved out to a work() function.
> > I think the MAC swap should be done in the last stage, not on each forward;
> > i.e. with the existing code, a 2-stage forward ends up back in the original order.
> > 
> > > > > +
> > > > > +            /* do a number of cycles of work per packet */
> > > > > +            volatile uint64_t start_tsc = rte_rdtsc();
> > > > > +            while (rte_rdtsc() < start_tsc + worker_cycles)
> > > > > +                rte_pause();
> > > > Ditto.
> > > Done. moved out to a work() function.
> > > 
> > > > I think all worker-specific variables like "worker_cycles" can be moved
> > > > into one structure and used from there.
> > > > 
> > > > > +        }
> > > > > +        uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
> > > > > +                events, nb_rx);
> > > > > +        while (nb_tx < nb_rx && !done)
> > > > > +            nb_tx += rte_event_enqueue_burst(dev_id, port_id,
> > > > > +                            events + nb_tx,
> > > > > +                            nb_rx - nb_tx);
> > > > > +        sent += nb_tx;
> > > > > +    }
> > > > > +
> > > > > +    if (!quiet)
> > > > > +        printf("  worker %u thread done. RX=%zu TX=%zu\n",
> > > > > +                rte_lcore_id(), received, sent);
> > > > > +
> > > > > +    return 0;
> > > > > +}
> > > > > +
> > > > > +/*
> > > > > + * Parse the coremask given as argument (hexadecimal string) and fill
> > > > > + * the global configuration (core role and core count) with the parsed
> > > > > + * value.
> > > > > + */
> > > > > +static int xdigit2val(unsigned char c)
> > > > There are multiple instances of "xdigit2val" in the DPDK repo. Maybe we
> > > > can push this as common code.
> > > Sure, that's something we can look at in a separate patch, now that it's
> > > being used more and more.
> > make sense.
> > 
> > > > > +{
> > > > > +    int val;
> > > > > +
> > > > > +    if (isdigit(c))
> > > > > +        val = c - '0';
> > > > > +    else if (isupper(c))
> > > > > +        val = c - 'A' + 10;
> > > > > +    else
> > > > > +        val = c - 'a' + 10;
> > > > > +    return val;
> > > > > +}
> > > > > +
> > > > > +
> > > > > +static void
> > > > > +usage(void)
> > > > > +{
> > > > > +    const char *usage_str =
> > > > > +        "  Usage: eventdev_demo [options]\n"
> > > > > +        "  Options:\n"
> > > > > +        "  -n, --packets=N              Send N packets (default ~32M), 0
> > > implies no limit\n"
> > > > > +        "  -f, --atomic-flows=N         Use N random flows from 1 to N
> > > (default 16)\n"
> > > > I think this parameter now affects the application fastpath code. I think
> > > > it should be an eventdev configuration parameter.
> > > See above comment on num_fids
> > > 
> > > > > +        "  -s, --num_stages=N           Use N atomic stages (default
> > > 1)\n"
> > > > > +        "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core
> > > mask\n"
> > > > > +        "  -w, --worker-mask=core mask  Run worker on CPUs in core
> > > mask\n"
> > > > > +        "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core
> > > mask\n"
> > > > > +        "  -e  --sched-mask=core mask   Run scheduler on CPUs in core
> > > mask\n"
> > > > > +        "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
> > > > > +        "  -W  --work-cycles=N          Worker cycles (default 0)\n"
> > > > > +        "  -P  --queue-priority         Enable scheduler queue
> > > prioritization\n"
> > > > > +        "  -o, --ordered                Use ordered scheduling\n"
> > > > > +        "  -p, --parallel               Use parallel scheduling\n"
> > > > IMO, all stages being "parallel" or "ordered" or "atomic" is one mode of
> > > > operation. It is valid to have any combination. We need to express that on
> > > > the command line, for
> > > > example:
> > > > 3 stages with
> > > > O->A->P
> > > How about we add an option that specifies the mode of operation for each
> > > stage in a string? Maybe have a '-m' option (modes) e.g. '-m appo' for 4
> > > stages with atomic, parallel, parallel, ordered. Or maybe reuse your
> > > test-eventdev parameter style?
> > Any scheme is fine.
> > 
> > > > > +        "  -q, --quiet                  Minimize printed output\n"
> > > > > +        "  -D, --dump                   Print detailed statistics before
> > > exit"
> > > > > +        "\n";
> > > > > +    fprintf(stderr, "%s", usage_str);
> > > > > +    exit(1);
> > > > > +}
> > > > > +
> > > > [...]
> > > > 
> > > > > +            rx_single = (popcnt == 1);
> > > > > +            break;
> > > > > +        case 't':
> > > > > +            tx_lcore_mask = parse_coremask(optarg);
> > > > > +            popcnt = __builtin_popcountll(tx_lcore_mask);
> > > > > +            tx_single = (popcnt == 1);
> > > > > +            break;
> > > > > +        case 'e':
> > > > > +            sched_lcore_mask = parse_coremask(optarg);
> > > > > +            popcnt = __builtin_popcountll(sched_lcore_mask);
> > > > > +            sched_single = (popcnt == 1);
> > > > > +            break;
> > > > > +        default:
> > > > > +            usage();
> > > > > +        }
> > > > > +    }
> > > > > +
> > > > > +    if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
> > > > > +        sched_lcore_mask == 0 || tx_lcore_mask == 0) {
> > > > > +
> > > > > +    /* Q creation - one load balanced per pipeline stage*/
> > > > > +
> > > > > +    /* set up one port per worker, linking to all stage queues */
> > > > > +    for (i = 0; i < num_workers; i++) {
> > > > > +        struct worker_data *w = &worker_data[i];
> > > > > +        w->dev_id = dev_id;
> > > > > +        if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
> > > > > +            printf("Error setting up port %d\n", i);
> > > > > +            return -1;
> > > > > +        }
> > > > > +
> > > > > +        uint32_t s;
> > > > > +        for (s = 0; s < num_stages; s++) {
> > > > > +            if (rte_event_port_link(dev_id, i,
> > > > > +                        &worker_queues[s].queue_id,
> > > > > +                        &worker_queues[s].priority,
> > > > > +                        1) != 1) {
> > > > > +                printf("%d: error creating link for port %d\n",
> > > > > +                        __LINE__, i);
> > > > > +                return -1;
> > > > > +            }
> > > > > +        }
> > > > > +        w->port_id = i;
> > > > > +    }
> > > > > +    /* port for consumer, linked to TX queue */
> > > > > +    if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
> > > > If the ethdev supports MT-safe txq support, then this port can be linked
> > > > to workers too. Something to consider for the future.
> > > > 
> > > Sure. No change for now.
> > OK
> 
> Just to add a comment for any remaining comments above, we would hope that
> none of them are blockers for the merge of the current version, as they can
> be patched in the future as the infrastructure changes.
> 
> Rgds,
> Dave.
> 
>
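
As an illustration of two of the points discussed above (carrying a per-stage
scheduling type the same way next_qid[] is carried, and the suggested '-m'
per-stage mode string), a minimal sketch follows. The '-m' mapping, the
next_sched_type[] array and parse_stage_types() are assumed names for
illustration only; they are not part of the patch, and the fragment reuses
the patch's MAX_NUM_STAGES and num_stages globals.

    /* hypothetical: one scheduling type per stage, e.g. "-m aop" for
     * atomic -> ordered -> parallel */
    static uint8_t next_sched_type[MAX_NUM_STAGES + 1];

    static int
    parse_stage_types(const char *modes)
    {
            unsigned int i;

            for (i = 0; i < num_stages && modes[i] != '\0'; i++) {
                    switch (modes[i]) {
                    case 'a':
                            next_sched_type[i] = RTE_SCHED_TYPE_ATOMIC;
                            break;
                    case 'o':
                            next_sched_type[i] = RTE_SCHED_TYPE_ORDERED;
                            break;
                    case 'p':
                            next_sched_type[i] = RTE_SCHED_TYPE_PARALLEL;
                            break;
                    default:
                            return -1;
                    }
            }
            return 0;
    }

    /* in worker(), next to the existing next_qid[] update: */
    events[i].queue_id = next_qid[events[i].queue_id];
    events[i].sched_type = next_sched_type[events[i].queue_id];
    events[i].op = RTE_EVENT_OP_FORWARD;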
  
Hunt, David June 29, 2017, 12:51 p.m. UTC | #11
On 29/6/2017 8:17 AM, Jerin Jacob wrote:
> -----Original Message-----
>> Date: Tue, 27 Jun 2017 14:12:20 +0100
>> From: "Hunt, David" <david.hunt@intel.com>
>> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
>> CC: Harry van Haaren <harry.van.haaren@intel.com>, dev@dpdk.org, Gage Eads
>>   <gage.eads@intel.com>, Bruce Richardson <bruce.richardson@intel.com>
>> Subject: Re: [dpdk-dev] [PATCH 1/3] examples/eventdev_pipeline: added
>>   sample app
>> User-Agent: Mozilla/5.0 (Windows NT 6.3; WOW64; rv:45.0) Gecko/20100101
>>   Thunderbird/45.8.0
>>
>> Hi Jerin:
>>
>>
>> On 27/6/2017 10:35 AM, Jerin Jacob wrote:
>>> -----Original Message-----
>>>> Date: Mon, 26 Jun 2017 15:46:47 +0100
>>>> From: "Hunt, David" <david.hunt@intel.com>
>>>> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>, Harry van Haaren
>>>>    <harry.van.haaren@intel.com>
>>>> CC: dev@dpdk.org, Gage Eads <gage.eads@intel.com>, Bruce Richardson
>>>>    <bruce.richardson@intel.com>
>>>> Subject: Re: [dpdk-dev] [PATCH 1/3] examples/eventdev_pipeline: added
>>>>    sample app
>>>> User-Agent: Mozilla/5.0 (Windows NT 6.3; WOW64; rv:45.0) Gecko/20100101
>>>>    Thunderbird/45.8.0
>>>>
>>>> Hi Jerin,
>>> Hi David,
>>>
>>> Looks like you have sent the old version. The below mentioned comments
>>> are not addressed in v2.
>> Oops. Glitch in the Matrix. I've just pushed a V3 with the changes.
>>
>>>> I'm assisting Harry on the sample app, and have just pushed up a V2 patch
>>>> based on your feedback. I've addressed most of your suggestions, comments
>>>> below. There may still be a couple of outstanding questions that need further
>>>> discussion.
>>> A few general comments:
>>> 1) Nikhil/Gage's proposal on an ethdev rx to eventdev adapter will change the major
>>> portion (eventdev setup and producer()) of this application.
>>> 2) Producing one lcore's worth of packets really can't showcase an example
>>> eventdev application, as it will perform poorly on a low-end machine.
>>> At least the application infrastructure should not be the limiting factor.
>>>
>>> Considering the above points, should we wait for the rx adapter to be
>>> completed first? I would like to show this as a real-world application using eventdev.
>>>
>>> Thoughts?
>>>
>>> On the same note:
>>> Can we finalize on rx adapter proposal? I can work on v1 of patch and
>>> common code if Nikhil or Gage don't have bandwidth. Let me know?
>>>
>>> last followup:
>>> http://dpdk.org/ml/archives/dev/2017-June/068776.html
>> I had a quick chat with Harry, and wonder if we'd be as well to merge the
>> app as it is now, and as the new frameworks become available, the app can be
>> updated to make use of them? I feel it would be better to have something out
>> there for people to play with than waiting for 17.11.
> I agree with your concern.
> How about renaming the test and doc to be specific to the SW PMD, and then,
> once we fix the known issues with HW eventdev + ethdev (Rx adapter)
> integration, rename the application to the generic eventdev name?
>

Sure. I'll rename the app to 'eventdev_pipeline_sw' and push a v4. We can
rename it back once it's generic.

Rgds,
Dave.


--snip--
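
One review point above, linking every worker port to the final TX queue so the
last stage can transmit directly when the ethdev txq can safely be used from
multiple lcores, would roughly be a setup-time change along the following
lines. This is a sketch only: it reuses the patch's setup_eventdev() locals
(dev_id, num_workers, worker_data, tx_queue) and is not part of the patch.

    /* hypothetical: when the NIC txq is MT safe, also link each worker
     * port to the final (TX) queue so the last pipeline stage can
     * transmit directly instead of handing off to a single consumer port.
     */
    uint32_t w;

    for (w = 0; w < num_workers; w++) {
            if (rte_event_port_link(dev_id, worker_data[w].port_id,
                                    &tx_queue.queue_id,
                                    &tx_queue.priority, 1) != 1) {
                    printf("%d: error linking worker %u to TX queue\n",
                                    __LINE__, w);
                    return -1;
            }
    }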
  

Patch

diff --git a/examples/eventdev_pipeline/Makefile b/examples/eventdev_pipeline/Makefile
new file mode 100644
index 0000000..bab8916
--- /dev/null
+++ b/examples/eventdev_pipeline/Makefile
@@ -0,0 +1,49 @@ 
+#   BSD LICENSE
+#
+#   Copyright(c) 2016 Intel Corporation. All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = eventdev_pipeline
+
+# all source are stored in SRCS-y
+SRCS-y := main.c
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
new file mode 100644
index 0000000..618e078
--- /dev/null
+++ b/examples/eventdev_pipeline/main.c
@@ -0,0 +1,975 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <getopt.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <signal.h>
+#include <sched.h>
+#include <stdbool.h>
+
+#include <rte_eal.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_launch.h>
+#include <rte_malloc.h>
+#include <rte_random.h>
+#include <rte_cycles.h>
+#include <rte_ethdev.h>
+#include <rte_eventdev.h>
+
+#define MAX_NUM_STAGES 8
+#define BATCH_SIZE 16
+#define MAX_NUM_CORE 64
+
+static unsigned int active_cores;
+static unsigned int num_workers;
+static unsigned long num_packets = (1L << 25); /* do ~32M packets */
+static unsigned int num_fids = 512;
+static unsigned int num_priorities = 1;
+static unsigned int num_stages = 1;
+static unsigned int worker_cq_depth = 16;
+static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
+static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
+static int16_t qid[MAX_NUM_STAGES] = {-1};
+static int worker_cycles;
+static int enable_queue_priorities;
+
+struct prod_data {
+	uint8_t dev_id;
+	uint8_t port_id;
+	int32_t qid;
+	unsigned num_nic_ports;
+};
+
+struct cons_data {
+	uint8_t dev_id;
+	uint8_t port_id;
+};
+
+static struct prod_data prod_data;
+static struct cons_data cons_data;
+
+struct worker_data {
+	uint8_t dev_id;
+	uint8_t port_id;
+};
+
+static unsigned *enqueue_cnt;
+static unsigned *dequeue_cnt;
+
+static volatile int done;
+static volatile int prod_stop;
+static int quiet;
+static int dump_dev;
+static int dump_dev_signal;
+
+static uint32_t rx_lock;
+static uint32_t tx_lock;
+static uint32_t sched_lock;
+static bool rx_single;
+static bool tx_single;
+static bool sched_single;
+
+static unsigned rx_core[MAX_NUM_CORE];
+static unsigned tx_core[MAX_NUM_CORE];
+static unsigned sched_core[MAX_NUM_CORE];
+static unsigned worker_core[MAX_NUM_CORE];
+
+static bool
+core_in_use(unsigned lcore_id) {
+	return (rx_core[lcore_id] || sched_core[lcore_id] ||
+		tx_core[lcore_id] || worker_core[lcore_id]);
+}
+
+static struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
+
+static void
+rte_eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
+			void *userdata)
+{
+	int port_id = (uintptr_t) userdata;
+	unsigned _sent = 0;
+
+	do {
+		/* Note: hard-coded TX queue */
+		_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
+					  unsent - _sent);
+	} while (_sent != unsent);
+}
+
+static int
+consumer(void)
+{
+	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
+	struct rte_event packets[BATCH_SIZE];
+
+	static uint64_t npackets;
+	static uint64_t received;
+	static uint64_t received_printed;
+	static uint64_t time_printed;
+	static uint64_t start_time;
+	unsigned i, j;
+	uint8_t dev_id = cons_data.dev_id;
+	uint8_t port_id = cons_data.port_id;
+
+	if (!npackets)
+		npackets = num_packets;
+
+	do {
+		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+				packets, RTE_DIM(packets), 0);
+
+		if (n == 0) {
+			for (j = 0; j < rte_eth_dev_count(); j++)
+				rte_eth_tx_buffer_flush(j, 0, tx_buf[j]);
+			return 0;
+		}
+		if (start_time == 0)
+			time_printed = start_time = rte_get_timer_cycles();
+
+		received += n;
+		for (i = 0; i < n; i++) {
+			uint8_t outport = packets[i].mbuf->port;
+			rte_eth_tx_buffer(outport, 0, tx_buf[outport],
+					packets[i].mbuf);
+		}
+
+		if (!quiet && received >= received_printed + (1<<22)) {
+			const uint64_t now = rte_get_timer_cycles();
+			const uint64_t delta_cycles = now - start_time;
+			const uint64_t elapsed_ms = delta_cycles / freq_khz;
+			const uint64_t interval_ms =
+					(now - time_printed) / freq_khz;
+
+			uint64_t rx_noprint = received - received_printed;
+			printf("# consumer RX=%"PRIu64", time %"PRIu64
+				"ms, avg %.3f mpps [current %.3f mpps]\n",
+					received, elapsed_ms,
+					(received) / (elapsed_ms * 1000.0),
+					rx_noprint / (interval_ms * 1000.0));
+			received_printed = received;
+			time_printed = now;
+		}
+
+		dequeue_cnt[0] += n;
+
+		if (num_packets > 0 && npackets > 0) {
+			npackets -= n;
+			if (npackets == 0 || npackets > num_packets)
+				done = 1;
+		}
+	} while (0);
+
+	return 0;
+}
+
+static int
+producer(void)
+{
+	static uint8_t eth_port;
+	struct rte_mbuf *mbufs[BATCH_SIZE];
+	struct rte_event ev[BATCH_SIZE];
+	uint32_t i, num_ports = prod_data.num_nic_ports;
+	int32_t qid = prod_data.qid;
+	uint8_t dev_id = prod_data.dev_id;
+	uint8_t port_id = prod_data.port_id;
+	uint32_t prio_idx = 0;
+
+	const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
+	if (++eth_port == num_ports)
+		eth_port = 0;
+	if (nb_rx == 0) {
+		rte_pause();
+		return 0;
+	}
+
+	for (i = 0; i < nb_rx; i++) {
+		ev[i].flow_id = mbufs[i]->hash.rss;
+		ev[i].op = RTE_EVENT_OP_NEW;
+		ev[i].sched_type = queue_type;
+		ev[i].queue_id = qid;
+		ev[i].event_type = RTE_EVENT_TYPE_CPU;
+		ev[i].sub_event_type = 0;
+		ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
+		ev[i].mbuf = mbufs[i];
+		RTE_SET_USED(prio_idx);
+	}
+
+	const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx);
+	if (nb_tx != nb_rx) {
+		for (i = nb_tx; i < nb_rx; i++)
+			rte_pktmbuf_free(mbufs[i]);
+	}
+	enqueue_cnt[0] += nb_tx;
+
+	if (unlikely(prod_stop))
+		done = 1;
+
+	return 0;
+}
+
+static inline void
+schedule_devices(uint8_t dev_id, unsigned lcore_id)
+{
+	if (rx_core[lcore_id] && (rx_single ||
+	    rte_atomic32_cmpset(&rx_lock, 0, 1))) {
+		producer();
+		rte_atomic32_clear((rte_atomic32_t *)&rx_lock);
+	}
+
+	if (sched_core[lcore_id] && (sched_single ||
+	    rte_atomic32_cmpset(&sched_lock, 0, 1))) {
+		rte_event_schedule(dev_id);
+		if (dump_dev_signal) {
+			rte_event_dev_dump(0, stdout);
+			dump_dev_signal = 0;
+		}
+		rte_atomic32_clear((rte_atomic32_t *)&sched_lock);
+	}
+
+	if (tx_core[lcore_id] && (tx_single ||
+	    rte_atomic32_cmpset(&tx_lock, 0, 1))) {
+		consumer();
+		rte_atomic32_clear((rte_atomic32_t *)&tx_lock);
+	}
+}
+
+static int
+worker(void *arg)
+{
+	struct rte_event events[BATCH_SIZE];
+
+	struct worker_data *data = (struct worker_data *)arg;
+	uint8_t dev_id = data->dev_id;
+	uint8_t port_id = data->port_id;
+	size_t sent = 0, received = 0;
+	unsigned lcore_id = rte_lcore_id();
+
+	while (!done) {
+		uint16_t i;
+
+		schedule_devices(dev_id, lcore_id);
+
+		if (!worker_core[lcore_id]) {
+			rte_pause();
+			continue;
+		}
+
+		uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
+				events, RTE_DIM(events), 0);
+
+		if (nb_rx == 0) {
+			rte_pause();
+			continue;
+		}
+		received += nb_rx;
+
+		for (i = 0; i < nb_rx; i++) {
+			struct ether_hdr *eth;
+			struct ether_addr addr;
+			struct rte_mbuf *m = events[i].mbuf;
+
+			/* The first worker stage does classification */
+			if (events[i].queue_id == qid[0])
+				events[i].flow_id = m->hash.rss % num_fids;
+
+			events[i].queue_id = next_qid[events[i].queue_id];
+			events[i].op = RTE_EVENT_OP_FORWARD;
+
+			/* change mac addresses on packet (to use mbuf data) */
+			eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
+			ether_addr_copy(&eth->d_addr, &addr);
+			ether_addr_copy(&eth->s_addr, &eth->d_addr);
+			ether_addr_copy(&addr, &eth->s_addr);
+
+			/* do a number of cycles of work per packet */
+			volatile uint64_t start_tsc = rte_rdtsc();
+			while (rte_rdtsc() < start_tsc + worker_cycles)
+				rte_pause();
+		}
+		uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
+				events, nb_rx);
+		while (nb_tx < nb_rx && !done)
+			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
+							events + nb_tx,
+							nb_rx - nb_tx);
+		sent += nb_tx;
+	}
+
+	if (!quiet)
+		printf("  worker %u thread done. RX=%zu TX=%zu\n",
+				rte_lcore_id(), received, sent);
+
+	return 0;
+}
+
+/*
+ * Parse the coremask given as argument (hexadecimal string) and fill
+ * the global configuration (core role and core count) with the parsed
+ * value.
+ */
+static int xdigit2val(unsigned char c)
+{
+	int val;
+
+	if (isdigit(c))
+		val = c - '0';
+	else if (isupper(c))
+		val = c - 'A' + 10;
+	else
+		val = c - 'a' + 10;
+	return val;
+}
+
+static uint64_t
+parse_coremask(const char *coremask)
+{
+	int i, j, idx = 0;
+	unsigned count = 0;
+	char c;
+	int val;
+	uint64_t mask = 0;
+	const int32_t BITS_HEX = 4;
+
+	if (coremask == NULL)
+		return -1;
+	/* Skip any leading and trailing blank characters,
+	 * and remove the 0x/0X prefix if it exists.
+	 */
+	while (isblank(*coremask))
+		coremask++;
+	if (coremask[0] == '0' && ((coremask[1] == 'x')
+		|| (coremask[1] == 'X')))
+		coremask += 2;
+	i = strlen(coremask);
+	while ((i > 0) && isblank(coremask[i - 1]))
+		i--;
+	if (i == 0)
+		return -1;
+
+	for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
+		c = coremask[i];
+		if (isxdigit(c) == 0) {
+			/* invalid characters */
+			return -1;
+		}
+		val = xdigit2val(c);
+		for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
+			if ((1 << j) & val) {
+				mask |= (1UL << idx);
+				count++;
+			}
+		}
+	}
+	for (; i >= 0; i--)
+		if (coremask[i] != '0')
+			return -1;
+	if (count == 0)
+		return -1;
+	return mask;
+}
+
+static struct option long_options[] = {
+	{"workers", required_argument, 0, 'w'},
+	{"packets", required_argument, 0, 'n'},
+	{"atomic-flows", required_argument, 0, 'f'},
+	{"num_stages", required_argument, 0, 's'},
+	{"rx-mask", required_argument, 0, 'r'},
+	{"tx-mask", required_argument, 0, 't'},
+	{"sched-mask", required_argument, 0, 'e'},
+	{"cq-depth", required_argument, 0, 'c'},
+	{"work-cycles", required_argument, 0, 'W'},
+	{"queue-priority", no_argument, 0, 'P'},
+	{"parallel", no_argument, 0, 'p'},
+	{"ordered", no_argument, 0, 'o'},
+	{"quiet", no_argument, 0, 'q'},
+	{"dump", no_argument, 0, 'D'},
+	{0, 0, 0, 0}
+};
+
+static void
+usage(void)
+{
+	const char *usage_str =
+		"  Usage: eventdev_demo [options]\n"
+		"  Options:\n"
+		"  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
+		"  -f, --atomic-flows=N         Use N random flows from 1 to N (default 16)\n"
+		"  -s, --num_stages=N           Use N atomic stages (default 1)\n"
+		"  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
+		"  -w, --worker-mask=core mask  Run worker on CPUs in core mask\n"
+		"  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
+		"  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
+		"  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
+		"  -W  --work-cycles=N          Worker cycles (default 0)\n"
+		"  -P  --queue-priority         Enable scheduler queue prioritization\n"
+		"  -o, --ordered                Use ordered scheduling\n"
+		"  -p, --parallel               Use parallel scheduling\n"
+		"  -q, --quiet                  Minimize printed output\n"
+		"  -D, --dump                   Print detailed statistics before exit"
+		"\n";
+	fprintf(stderr, "%s", usage_str);
+	exit(1);
+}
+
+static void
+parse_app_args(int argc, char **argv)
+{
+	/* Parse cli options*/
+	int option_index;
+	int c;
+	opterr = 0;
+	uint64_t rx_lcore_mask = 0;
+	uint64_t tx_lcore_mask = 0;
+	uint64_t sched_lcore_mask = 0;
+	uint64_t worker_lcore_mask = 0;
+	int i;
+
+	for (;;) {
+		c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:poPqDW:",
+				long_options, &option_index);
+		if (c == -1)
+			break;
+
+		int popcnt = 0;
+		switch (c) {
+		case 'n':
+			num_packets = (unsigned long)atol(optarg);
+			break;
+		case 'f':
+			num_fids = (unsigned int)atoi(optarg);
+			break;
+		case 's':
+			num_stages = (unsigned int)atoi(optarg);
+			break;
+		case 'c':
+			worker_cq_depth = (unsigned int)atoi(optarg);
+			break;
+		case 'W':
+			worker_cycles = (unsigned int)atoi(optarg);
+			break;
+		case 'P':
+			enable_queue_priorities = 1;
+			break;
+		case 'o':
+			queue_type = RTE_EVENT_QUEUE_CFG_ORDERED_ONLY;
+			break;
+		case 'p':
+			queue_type = RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY;
+			break;
+		case 'q':
+			quiet = 1;
+			break;
+		case 'D':
+			dump_dev = 1;
+			break;
+		case 'w':
+			worker_lcore_mask = parse_coremask(optarg);
+			break;
+		case 'r':
+			rx_lcore_mask = parse_coremask(optarg);
+			popcnt = __builtin_popcountll(rx_lcore_mask);
+			rx_single = (popcnt == 1);
+			break;
+		case 't':
+			tx_lcore_mask = parse_coremask(optarg);
+			popcnt = __builtin_popcountll(tx_lcore_mask);
+			tx_single = (popcnt == 1);
+			break;
+		case 'e':
+			sched_lcore_mask = parse_coremask(optarg);
+			popcnt = __builtin_popcountll(sched_lcore_mask);
+			sched_single = (popcnt == 1);
+			break;
+		default:
+			usage();
+		}
+	}
+
+	if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
+	    sched_lcore_mask == 0 || tx_lcore_mask == 0) {
+		printf("Core part of pipeline was not assigned any cores. "
+			"This will stall the pipeline, please check core masks "
+			"(use -h for details on setting core masks):\n"
+			"\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
+			"\n\tworkers: %"PRIu64"\n",
+			rx_lcore_mask, tx_lcore_mask, sched_lcore_mask,
+			worker_lcore_mask);
+		rte_exit(-1, "Fix core masks\n");
+	}
+	if (num_stages == 0 || num_stages > MAX_NUM_STAGES)
+		usage();
+
+	for (i = 0; i < MAX_NUM_CORE; i++) {
+		rx_core[i] = !!(rx_lcore_mask & (1UL << i));
+		tx_core[i] = !!(tx_lcore_mask & (1UL << i));
+		sched_core[i] = !!(sched_lcore_mask & (1UL << i));
+		worker_core[i] = !!(worker_lcore_mask & (1UL << i));
+
+		if (worker_core[i])
+			num_workers++;
+		if (core_in_use(i))
+			active_cores++;
+	}
+}
+
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = {
+			.mq_mode = ETH_MQ_RX_RSS,
+			.max_rx_pkt_len = ETHER_MAX_LEN
+		},
+		.rx_adv_conf = {
+			.rss_conf = {
+				.rss_hf = ETH_RSS_IP |
+					  ETH_RSS_TCP |
+					  ETH_RSS_UDP,
+			}
+		}
+	};
+	const uint16_t rx_rings = 1, tx_rings = 1;
+	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+	struct rte_eth_conf port_conf = port_conf_default;
+	int retval;
+	uint16_t q;
+
+	if (port >= rte_eth_dev_count())
+		return -1;
+
+	/* Configure the Ethernet device. */
+	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+	if (retval != 0)
+		return retval;
+
+	/* Allocate and set up 1 RX queue per Ethernet port. */
+	for (q = 0; q < rx_rings; q++) {
+		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Allocate and set up 1 TX queue per Ethernet port. */
+	for (q = 0; q < tx_rings; q++) {
+		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+				rte_eth_dev_socket_id(port), NULL);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Start the Ethernet port. */
+	retval = rte_eth_dev_start(port);
+	if (retval < 0)
+		return retval;
+
+	/* Display the port MAC address. */
+	struct ether_addr addr;
+	rte_eth_macaddr_get(port, &addr);
+	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+			   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+			(unsigned)port,
+			addr.addr_bytes[0], addr.addr_bytes[1],
+			addr.addr_bytes[2], addr.addr_bytes[3],
+			addr.addr_bytes[4], addr.addr_bytes[5]);
+
+	/* Enable RX in promiscuous mode for the Ethernet device. */
+	rte_eth_promiscuous_enable(port);
+
+	return 0;
+}
+
+static int
+init_ports(unsigned num_ports)
+{
+	uint8_t portid;
+	unsigned i;
+
+	struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
+			/* mbufs */ 16384 * num_ports,
+			/* cache_size */ 512,
+			/* priv_size*/ 0,
+			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+			rte_socket_id());
+
+	for (portid = 0; portid < num_ports; portid++)
+		if (port_init(portid, mp) != 0)
+			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
+					portid);
+
+	for (i = 0; i < num_ports; i++) {
+		void *userdata = (void *)(uintptr_t) i;
+		tx_buf[i] = rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
+		if (tx_buf[i] == NULL)
+			rte_panic("Out of memory\n");
+		rte_eth_tx_buffer_init(tx_buf[i], 32);
+		rte_eth_tx_buffer_set_err_callback(tx_buf[i],
+						   rte_eth_tx_buffer_retry,
+						   userdata);
+	}
+
+	return 0;
+}
+
+struct port_link {
+	uint8_t queue_id;
+	uint8_t priority;
+};
+
+static int
+setup_eventdev(struct prod_data *prod_data,
+		struct cons_data *cons_data,
+		struct worker_data *worker_data)
+{
+	const uint8_t dev_id = 0;
+	/* the extra +1 stage is for a SINGLE_LINK TX stage */
+	const uint8_t nb_queues = num_stages + 1;
+	/* + 2 is one port for producer and one for consumer */
+	const uint8_t nb_ports = num_workers + 2;
+	const struct rte_event_dev_config config = {
+			.nb_event_queues = nb_queues,
+			.nb_event_ports = nb_ports,
+			.nb_events_limit  = 4096,
+			.nb_event_queue_flows = 1024,
+			.nb_event_port_dequeue_depth = 128,
+			.nb_event_port_enqueue_depth = 128,
+	};
+	const struct rte_event_port_conf wkr_p_conf = {
+			.dequeue_depth = worker_cq_depth,
+			.enqueue_depth = 64,
+			.new_event_threshold = 4096,
+	};
+	struct rte_event_queue_conf wkr_q_conf = {
+			.event_queue_cfg = queue_type,
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.nb_atomic_flows = 1024,
+			.nb_atomic_order_sequences = 1024,
+	};
+	const struct rte_event_port_conf tx_p_conf = {
+			.dequeue_depth = 128,
+			.enqueue_depth = 128,
+			.new_event_threshold = 4096,
+	};
+	const struct rte_event_queue_conf tx_q_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
+			.event_queue_cfg =
+					RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY |
+					RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
+			.nb_atomic_flows = 1024,
+			.nb_atomic_order_sequences = 1024,
+	};
+
+	struct port_link worker_queues[MAX_NUM_STAGES];
+	struct port_link tx_queue;
+	unsigned i;
+
+	int ret, ndev = rte_event_dev_count();
+	if (ndev < 1) {
+		printf("%d: No Eventdev Devices Found\n", __LINE__);
+		return -1;
+	}
+
+	struct rte_event_dev_info dev_info;
+	ret = rte_event_dev_info_get(dev_id, &dev_info);
+	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
+
+	ret = rte_event_dev_configure(dev_id, &config);
+	if (ret < 0)
+		printf("%d: Error configuring device\n", __LINE__);
+
+	/* Q creation - one load balanced per pipeline stage*/
+	printf("  Stages:\n");
+	for (i = 0; i < num_stages; i++) {
+		if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
+			printf("%d: error creating qid %d\n", __LINE__, i);
+			return -1;
+		}
+		qid[i] = i;
+		next_qid[i] = i+1;
+		worker_queues[i].queue_id = i;
+		if (enable_queue_priorities) {
+			/* calculate priority stepping for each stage, leaving
+			 * headroom of 1 for the SINGLE_LINK TX below
+			 */
+			const uint32_t prio_delta =
+				(RTE_EVENT_DEV_PRIORITY_LOWEST-1) /  nb_queues;
+
+			/* higher priority for queues closer to tx */
+			wkr_q_conf.priority =
+				RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
+		}
+
+		const char *type_str = "Atomic";
+		switch (wkr_q_conf.event_queue_cfg) {
+		case RTE_EVENT_QUEUE_CFG_ORDERED_ONLY:
+			type_str = "Ordered";
+			break;
+		case RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY:
+			type_str = "Parallel";
+			break;
+		}
+		printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
+				wkr_q_conf.priority);
+	}
+	printf("\n");
+
+	/* final queue for sending to TX core */
+	if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
+		printf("%d: error creating qid %d\n", __LINE__, i);
+		return -1;
+	}
+	tx_queue.queue_id = i;
+	tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
+
+	/* set up one port per worker, linking to all stage queues */
+	for (i = 0; i < num_workers; i++) {
+		struct worker_data *w = &worker_data[i];
+		w->dev_id = dev_id;
+		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
+			printf("Error setting up port %d\n", i);
+			return -1;
+		}
+
+		uint32_t s;
+		for (s = 0; s < num_stages; s++) {
+			if (rte_event_port_link(dev_id, i,
+						&worker_queues[s].queue_id,
+						&worker_queues[s].priority,
+						1) != 1) {
+				printf("%d: error creating link for port %d\n",
+						__LINE__, i);
+				return -1;
+			}
+		}
+		w->port_id = i;
+	}
+	/* port for consumer, linked to TX queue */
+	if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
+		printf("Error setting up port %d\n", i);
+		return -1;
+	}
+	if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
+				&tx_queue.priority, 1) != 1) {
+		printf("%d: error creating link for port %d\n",
+				__LINE__, i);
+		return -1;
+	}
+	/* port for producer, no links */
+	const struct rte_event_port_conf rx_p_conf = {
+			.dequeue_depth = 8,
+			.enqueue_depth = 8,
+			.new_event_threshold = 1200,
+	};
+	if (rte_event_port_setup(dev_id, i + 1, &rx_p_conf) < 0) {
+		printf("Error setting up port %d\n", i);
+		return -1;
+	}
+
+	*prod_data = (struct prod_data){.dev_id = dev_id,
+					.port_id = i + 1,
+					.qid = qid[0] };
+	*cons_data = (struct cons_data){.dev_id = dev_id,
+					.port_id = i };
+
+	enqueue_cnt = rte_calloc(0,
+			RTE_CACHE_LINE_SIZE/(sizeof(enqueue_cnt[0])),
+			sizeof(enqueue_cnt[0]), 0);
+	dequeue_cnt = rte_calloc(0,
+			RTE_CACHE_LINE_SIZE/(sizeof(dequeue_cnt[0])),
+			sizeof(dequeue_cnt[0]), 0);
+
+	if (rte_event_dev_start(dev_id) < 0) {
+		printf("Error starting eventdev\n");
+		return -1;
+	}
+
+	return dev_id;
+}
+
+static void
+signal_handler(int signum)
+{
+	if (done || prod_stop)
+		rte_exit(1, "Exiting on signal %d\n", signum);
+	if (signum == SIGINT || signum == SIGTERM) {
+		printf("\n\nSignal %d received, preparing to exit...\n",
+				signum);
+		done = 1;
+	}
+	if (signum == SIGTSTP)
+		rte_event_dev_dump(0, stdout);
+}
+
+int
+main(int argc, char **argv)
+{
+	struct worker_data *worker_data;
+	unsigned num_ports;
+	int lcore_id;
+	int err;
+
+	signal(SIGINT, signal_handler);
+	signal(SIGTERM, signal_handler);
+	signal(SIGTSTP, signal_handler);
+
+	err = rte_eal_init(argc, argv);
+	if (err < 0)
+		rte_panic("Invalid EAL arguments\n");
+
+	argc -= err;
+	argv += err;
+
+	/* Parse cli options*/
+	parse_app_args(argc, argv);
+
+	num_ports = rte_eth_dev_count();
+	if (num_ports == 0)
+		rte_panic("No ethernet ports found\n");
+
+	const unsigned cores_needed = active_cores;
+
+	if (!quiet) {
+		printf("  Config:\n");
+		printf("\tports: %u\n", num_ports);
+		printf("\tworkers: %u\n", num_workers);
+		printf("\tpackets: %lu\n", num_packets);
+		printf("\tpriorities: %u\n", num_priorities);
+		printf("\tQueue-prio: %u\n", enable_queue_priorities);
+		if (queue_type == RTE_EVENT_QUEUE_CFG_ORDERED_ONLY)
+			printf("\tqid0 type: ordered\n");
+		if (queue_type == RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY)
+			printf("\tqid0 type: atomic\n");
+		printf("\tCores available: %u\n", rte_lcore_count());
+		printf("\tCores used: %u\n", cores_needed);
+	}
+
+	if (rte_lcore_count() < cores_needed)
+		rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
+				cores_needed);
+
+	const unsigned ndevs = rte_event_dev_count();
+	if (ndevs == 0)
+		rte_panic("No eventdev devices found. Pass in a --vdev eventdev.\n");
+	if (ndevs > 1)
+		fprintf(stderr, "Warning: More than one eventdev, using idx 0\n");
+
+	worker_data = rte_calloc(0, num_workers, sizeof(worker_data[0]), 0);
+	if (worker_data == NULL)
+		rte_panic("rte_calloc failed\n");
+
+	int dev_id = setup_eventdev(&prod_data, &cons_data, worker_data);
+	if (dev_id < 0)
+		rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
+
+	prod_data.num_nic_ports = num_ports;
+	init_ports(num_ports);
+
+	int worker_idx = 0;
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+		if (lcore_id >= MAX_NUM_CORE)
+			break;
+
+		if (!rx_core[lcore_id] && !worker_core[lcore_id] &&
+		    !tx_core[lcore_id] && !sched_core[lcore_id])
+			continue;
+
+		if (rx_core[lcore_id])
+			printf(
+				"[%s()] lcore %d executing NIC Rx, and using eventdev port %u\n",
+				__func__, lcore_id, prod_data.port_id);
+
+		if (tx_core[lcore_id])
+			printf(
+				"[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
+				__func__, lcore_id, cons_data.port_id);
+
+		if (sched_core[lcore_id])
+			printf("[%s()] lcore %d executing scheduler\n",
+					__func__, lcore_id);
+
+		if (worker_core[lcore_id])
+			printf(
+				"[%s()] lcore %d executing worker, using eventdev port %u\n",
+				__func__, lcore_id,
+				worker_data[worker_idx].port_id);
+
+		err = rte_eal_remote_launch(worker, &worker_data[worker_idx],
+					    lcore_id);
+		if (err) {
+			rte_panic("Failed to launch worker on core %d\n",
+					lcore_id);
+			continue;
+		}
+		if (worker_core[lcore_id])
+			worker_idx++;
+	}
+
+	lcore_id = rte_lcore_id();
+
+	if (core_in_use(lcore_id))
+		worker(&worker_data[worker_idx++]);
+
+	rte_eal_mp_wait_lcore();
+
+	if (dump_dev)
+		rte_event_dev_dump(dev_id, stdout);
+
+	if (!quiet) {
+		printf("\nPort Workload distribution:\n");
+		uint32_t i;
+		uint64_t tot_pkts = 0;
+		uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
+		for (i = 0; i < num_workers; i++) {
+			char statname[64];
+			snprintf(statname, sizeof(statname), "port_%u_rx",
+					worker_data[i].port_id);
+			pkts_per_wkr[i] = rte_event_dev_xstats_by_name_get(
+					dev_id, statname, NULL);
+			tot_pkts += pkts_per_wkr[i];
+		}
+		for (i = 0; i < num_workers; i++) {
+			float pc = pkts_per_wkr[i]  * 100 /
+				((float)tot_pkts);
+			printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
+					i, pc, pkts_per_wkr[i]);
+		}
+
+	}
+
+	return 0;
+}
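
For reference, the options above combine along these lines; the core masks,
EAL core list and stage count are illustrative only, and the vdev name assumes
the software (event/sw) eventdev PMD:

    ./build/eventdev_pipeline --vdev event_sw0 -l 0-7 -- -r1 -t2 -e4 -wF0 -s3 -n0

This runs RX on core 0, TX on core 1, the scheduler on core 2, and four
workers on cores 4-7, with a 3-stage pipeline and no packet count limit.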