[v2,2/2] qos: rearrange enqueue procedure

Message ID 20210318115613.5503-2-konstantin.ananyev@intel.com (mailing list archive)
State Rejected, archived
Delegated to: Thomas Monjalon
Headers
Series [v2,1/2] examples/qos_sched: fixup colors value overrun |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/travis-robot success travis build: passed
ci/github-robot success github build: passed
ci/iol-abi-testing success Testing PASS
ci/Intel-compilation success Compilation OK
ci/iol-intel-Performance success Performance Testing PASS
ci/intel-Testing success Testing PASS
ci/iol-testing success Testing PASS
ci/iol-mellanox-Performance success Performance Testing PASS

Commit Message

Ananyev, Konstantin March 18, 2021, 11:56 a.m. UTC
In many usage scenarios input mbufs for rte_sched_port_enqueue()
are not yet in the CPU cache(s). That causes quite significant stalls
due to memory latency. Current implementation tries to migitate it
using SW pipeline and SW prefetch techniques, but stalls are still present.
Rework rte_sched_port_enqueue() to do actual fetch of all mbufs
metadata as a first stage of that function.
That helps to minimise load stalls at further stages of enqueue()
and improves overall enqueue performance.
With examples/qos_sched I observed:
on ICX box: up to 30% cycles reduction
on CSX AND BDX: 20-15% cycles reduction
I also run tests with mbufs already in the cache
(one core doing RX, QOS and TX).
With such scenario, on all mentioned above IA boxes
no performance drop was observed.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
v2: fix clang and checkpatch complains
---
 lib/librte_sched/rte_sched.c | 219 +++++------------------------------
 1 file changed, 31 insertions(+), 188 deletions(-)
  

Comments

Singh, Jasvinder April 2, 2021, 2:14 p.m. UTC | #1
> -----Original Message-----
> From: Ananyev, Konstantin <konstantin.ananyev@intel.com>
> Sent: Thursday, March 18, 2021 11:56 AM
> To: dev@dpdk.org
> Cc: Dumitrescu, Cristian <cristian.dumitrescu@intel.com>; Singh, Jasvinder
> <jasvinder.singh@intel.com>; Ananyev, Konstantin
> <konstantin.ananyev@intel.com>
> Subject: [PATCH v2 2/2] qos: rearrange enqueue procedure
> 
> In many usage scenarios input mbufs for rte_sched_port_enqueue() are not
> yet in the CPU cache(s). That causes quite significant stalls due to memory
> latency. Current implementation tries to migitate it using SW pipeline and SW
> prefetch techniques, but stalls are still present.
> Rework rte_sched_port_enqueue() to do actual fetch of all mbufs metadata
> as a first stage of that function.
> That helps to minimise load stalls at further stages of enqueue() and
> improves overall enqueue performance.
> With examples/qos_sched I observed:
> on ICX box: up to 30% cycles reduction
> on CSX AND BDX: 20-15% cycles reduction
> I also run tests with mbufs already in the cache (one core doing RX, QOS and
> TX).
> With such scenario, on all mentioned above IA boxes no performance drop
> was observed.
> 
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> ---
> v2: fix clang and checkpatch complains
> ---
>  lib/librte_sched/rte_sched.c | 219 +++++------------------------------
>  1 file changed, 31 insertions(+), 188 deletions(-)
> 
> diff --git a/lib/librte_sched/rte_sched.c b/lib/librte_sched/rte_sched.c index
> 7c5688068..41ef147e0 100644
> --- a/lib/librte_sched/rte_sched.c
> +++ b/lib/librte_sched/rte_sched.c
> @@ -1861,24 +1861,23 @@ debug_check_queue_slab(struct
> rte_sched_subport *subport, uint32_t bmp_pos,  #endif /*
> RTE_SCHED_DEBUG */
> 
>  static inline struct rte_sched_subport * -rte_sched_port_subport(struct
> rte_sched_port *port,
> -	struct rte_mbuf *pkt)
> +sched_port_subport(const struct rte_sched_port *port, struct
> +rte_mbuf_sched sch)
>  {
> -	uint32_t queue_id = rte_mbuf_sched_queue_get(pkt);
> +	uint32_t queue_id = sch.queue_id;
>  	uint32_t subport_id = queue_id >> (port-
> >n_pipes_per_subport_log2 + 4);
> 
>  	return port->subports[subport_id];
>  }
> 
>  static inline uint32_t
> -rte_sched_port_enqueue_qptrs_prefetch0(struct rte_sched_subport
> *subport,
> -	struct rte_mbuf *pkt, uint32_t subport_qmask)
> +sched_port_enqueue_qptrs_prefetch0(const struct rte_sched_subport
> *subport,
> +	struct rte_mbuf_sched sch, uint32_t subport_qmask)
>  {
>  	struct rte_sched_queue *q;
>  #ifdef RTE_SCHED_COLLECT_STATS
>  	struct rte_sched_queue_extra *qe;
>  #endif
> -	uint32_t qindex = rte_mbuf_sched_queue_get(pkt);
> +	uint32_t qindex = sch.queue_id;
>  	uint32_t subport_queue_id = subport_qmask & qindex;
> 
>  	q = subport->queue + subport_queue_id; @@ -1971,197 +1970,41
> @@ int  rte_sched_port_enqueue(struct rte_sched_port *port, struct
> rte_mbuf **pkts,
>  		       uint32_t n_pkts)
>  {
> -	struct rte_mbuf *pkt00, *pkt01, *pkt10, *pkt11, *pkt20, *pkt21,
> -		*pkt30, *pkt31, *pkt_last;
> -	struct rte_mbuf **q00_base, **q01_base, **q10_base,
> **q11_base,
> -		**q20_base, **q21_base, **q30_base, **q31_base,
> **q_last_base;
> -	struct rte_sched_subport *subport00, *subport01, *subport10,
> *subport11,
> -		*subport20, *subport21, *subport30, *subport31,
> *subport_last;
> -	uint32_t q00, q01, q10, q11, q20, q21, q30, q31, q_last;
> -	uint32_t r00, r01, r10, r11, r20, r21, r30, r31, r_last;
> -	uint32_t subport_qmask;
>  	uint32_t result, i;
> +	struct rte_mbuf_sched sch[n_pkts];
> +	struct rte_sched_subport *subports[n_pkts];
> +	struct rte_mbuf **q_base[n_pkts];
> +	uint32_t q[n_pkts];
> +
> +	const uint32_t subport_qmask =
> +		(1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
> 
>  	result = 0;
> -	subport_qmask = (1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
> 
> -	/*
> -	 * Less then 6 input packets available, which is not enough to
> -	 * feed the pipeline
> -	 */
> -	if (unlikely(n_pkts < 6)) {
> -		struct rte_sched_subport *subports[5];
> -		struct rte_mbuf **q_base[5];
> -		uint32_t q[5];
> -
> -		/* Prefetch the mbuf structure of each packet */
> -		for (i = 0; i < n_pkts; i++)
> -			rte_prefetch0(pkts[i]);
> -
> -		/* Prefetch the subport structure for each packet */
> -		for (i = 0; i < n_pkts; i++)
> -			subports[i] = rte_sched_port_subport(port, pkts[i]);
> -
> -		/* Prefetch the queue structure for each queue */
> -		for (i = 0; i < n_pkts; i++)
> -			q[i] =
> rte_sched_port_enqueue_qptrs_prefetch0(subports[i],
> -					pkts[i], subport_qmask);
> -
> -		/* Prefetch the write pointer location of each queue */
> -		for (i = 0; i < n_pkts; i++) {
> -			q_base[i] =
> rte_sched_subport_pipe_qbase(subports[i], q[i]);
> -			rte_sched_port_enqueue_qwa_prefetch0(port,
> subports[i],
> -				q[i], q_base[i]);
> -		}
> +	/* Prefetch the mbuf structure of each packet */
> +	for (i = 0; i < n_pkts; i++)
> +		sch[i] = pkts[i]->hash.sched;
> 

Hi Konstantin,  thanks for the patch. In above case, all packets are touched straight with any prefetch. If we consider the input burst size of 64 pkts, it means 512 bytes of packet addresses  (8 cache-lines) which is likely to be available in cache. For larger size burst, e.g. 128 or 256, there might be instances when some addresses are not available the cache, may stall core. How about adding explicit prefetch before starting to iterate through the packets if that helps?
  
Cristian Dumitrescu April 2, 2021, 9:12 p.m. UTC | #2
> -----Original Message-----
> From: Singh, Jasvinder <jasvinder.singh@intel.com>
> Sent: Friday, April 2, 2021 3:15 PM
> To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; dev@dpdk.org
> Cc: Dumitrescu, Cristian <cristian.dumitrescu@intel.com>
> Subject: RE: [PATCH v2 2/2] qos: rearrange enqueue procedure
> 
> 
> 
> > -----Original Message-----
> > From: Ananyev, Konstantin <konstantin.ananyev@intel.com>
> > Sent: Thursday, March 18, 2021 11:56 AM
> > To: dev@dpdk.org
> > Cc: Dumitrescu, Cristian <cristian.dumitrescu@intel.com>; Singh, Jasvinder
> > <jasvinder.singh@intel.com>; Ananyev, Konstantin
> > <konstantin.ananyev@intel.com>
> > Subject: [PATCH v2 2/2] qos: rearrange enqueue procedure
> >
> > In many usage scenarios input mbufs for rte_sched_port_enqueue() are
> not
> > yet in the CPU cache(s). That causes quite significant stalls due to memory
> > latency. Current implementation tries to migitate it using SW pipeline and
> SW
> > prefetch techniques, but stalls are still present.
> > Rework rte_sched_port_enqueue() to do actual fetch of all mbufs
> metadata
> > as a first stage of that function.
> > That helps to minimise load stalls at further stages of enqueue() and
> > improves overall enqueue performance.
> > With examples/qos_sched I observed:
> > on ICX box: up to 30% cycles reduction
> > on CSX AND BDX: 20-15% cycles reduction
> > I also run tests with mbufs already in the cache (one core doing RX, QOS
> and
> > TX).
> > With such scenario, on all mentioned above IA boxes no performance drop
> > was observed.
> >
> > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> > ---
> > v2: fix clang and checkpatch complains
> > ---
> >  lib/librte_sched/rte_sched.c | 219 +++++------------------------------
> >  1 file changed, 31 insertions(+), 188 deletions(-)
> >
> > diff --git a/lib/librte_sched/rte_sched.c b/lib/librte_sched/rte_sched.c
> index
> > 7c5688068..41ef147e0 100644
> > --- a/lib/librte_sched/rte_sched.c
> > +++ b/lib/librte_sched/rte_sched.c
> > @@ -1861,24 +1861,23 @@ debug_check_queue_slab(struct
> > rte_sched_subport *subport, uint32_t bmp_pos,  #endif /*
> > RTE_SCHED_DEBUG */
> >
> >  static inline struct rte_sched_subport * -rte_sched_port_subport(struct
> > rte_sched_port *port,
> > -	struct rte_mbuf *pkt)
> > +sched_port_subport(const struct rte_sched_port *port, struct
> > +rte_mbuf_sched sch)
> >  {
> > -	uint32_t queue_id = rte_mbuf_sched_queue_get(pkt);
> > +	uint32_t queue_id = sch.queue_id;
> >  	uint32_t subport_id = queue_id >> (port-
> > >n_pipes_per_subport_log2 + 4);
> >
> >  	return port->subports[subport_id];
> >  }
> >
> >  static inline uint32_t
> > -rte_sched_port_enqueue_qptrs_prefetch0(struct rte_sched_subport
> > *subport,
> > -	struct rte_mbuf *pkt, uint32_t subport_qmask)
> > +sched_port_enqueue_qptrs_prefetch0(const struct rte_sched_subport
> > *subport,
> > +	struct rte_mbuf_sched sch, uint32_t subport_qmask)
> >  {
> >  	struct rte_sched_queue *q;
> >  #ifdef RTE_SCHED_COLLECT_STATS
> >  	struct rte_sched_queue_extra *qe;
> >  #endif
> > -	uint32_t qindex = rte_mbuf_sched_queue_get(pkt);
> > +	uint32_t qindex = sch.queue_id;
> >  	uint32_t subport_queue_id = subport_qmask & qindex;
> >
> >  	q = subport->queue + subport_queue_id; @@ -1971,197 +1970,41
> > @@ int  rte_sched_port_enqueue(struct rte_sched_port *port, struct
> > rte_mbuf **pkts,
> >  		       uint32_t n_pkts)
> >  {
> > -	struct rte_mbuf *pkt00, *pkt01, *pkt10, *pkt11, *pkt20, *pkt21,
> > -		*pkt30, *pkt31, *pkt_last;
> > -	struct rte_mbuf **q00_base, **q01_base, **q10_base,
> > **q11_base,
> > -		**q20_base, **q21_base, **q30_base, **q31_base,
> > **q_last_base;
> > -	struct rte_sched_subport *subport00, *subport01, *subport10,
> > *subport11,
> > -		*subport20, *subport21, *subport30, *subport31,
> > *subport_last;
> > -	uint32_t q00, q01, q10, q11, q20, q21, q30, q31, q_last;
> > -	uint32_t r00, r01, r10, r11, r20, r21, r30, r31, r_last;
> > -	uint32_t subport_qmask;
> >  	uint32_t result, i;
> > +	struct rte_mbuf_sched sch[n_pkts];
> > +	struct rte_sched_subport *subports[n_pkts];
> > +	struct rte_mbuf **q_base[n_pkts];
> > +	uint32_t q[n_pkts];
> > +
> > +	const uint32_t subport_qmask =
> > +		(1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
> >
> >  	result = 0;
> > -	subport_qmask = (1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
> >
> > -	/*
> > -	 * Less then 6 input packets available, which is not enough to
> > -	 * feed the pipeline
> > -	 */
> > -	if (unlikely(n_pkts < 6)) {
> > -		struct rte_sched_subport *subports[5];
> > -		struct rte_mbuf **q_base[5];
> > -		uint32_t q[5];
> > -
> > -		/* Prefetch the mbuf structure of each packet */
> > -		for (i = 0; i < n_pkts; i++)
> > -			rte_prefetch0(pkts[i]);
> > -
> > -		/* Prefetch the subport structure for each packet */
> > -		for (i = 0; i < n_pkts; i++)
> > -			subports[i] = rte_sched_port_subport(port, pkts[i]);
> > -
> > -		/* Prefetch the queue structure for each queue */
> > -		for (i = 0; i < n_pkts; i++)
> > -			q[i] =
> > rte_sched_port_enqueue_qptrs_prefetch0(subports[i],
> > -					pkts[i], subport_qmask);
> > -
> > -		/* Prefetch the write pointer location of each queue */
> > -		for (i = 0; i < n_pkts; i++) {
> > -			q_base[i] =
> > rte_sched_subport_pipe_qbase(subports[i], q[i]);
> > -			rte_sched_port_enqueue_qwa_prefetch0(port,
> > subports[i],
> > -				q[i], q_base[i]);
> > -		}
> > +	/* Prefetch the mbuf structure of each packet */
> > +	for (i = 0; i < n_pkts; i++)
> > +		sch[i] = pkts[i]->hash.sched;
> >
> 
> Hi Konstantin,  thanks for the patch. In above case, all packets are touched
> straight with any prefetch. If we consider the input burst size of 64 pkts, it
> means 512 bytes of packet addresses  (8 cache-lines) which is likely to be
> available in cache. For larger size burst, e.g. 128 or 256, there might be
> instances when some addresses are not available the cache, may stall core.
> How about adding explicit prefetch before starting to iterate through the
> packets if that helps?

Exactly. Konstantin, you might not be a fan of prefetches, but the current enqueue implementation (as well as the dequeue) uses a prefetch state machine. Please keep the prefetch state machine in the scalar code. Even if the examples/qos_sched might not show an advantage, this is just a sample app and there are some more relevant use-cases as well.

Thanks,
Cristian
  
Ananyev, Konstantin April 3, 2021, 11:53 p.m. UTC | #3
Hi guys,

> > > In many usage scenarios input mbufs for rte_sched_port_enqueue() are
> > not
> > > yet in the CPU cache(s). That causes quite significant stalls due to memory
> > > latency. Current implementation tries to migitate it using SW pipeline and
> > SW
> > > prefetch techniques, but stalls are still present.
> > > Rework rte_sched_port_enqueue() to do actual fetch of all mbufs
> > metadata
> > > as a first stage of that function.
> > > That helps to minimise load stalls at further stages of enqueue() and
> > > improves overall enqueue performance.
> > > With examples/qos_sched I observed:
> > > on ICX box: up to 30% cycles reduction
> > > on CSX AND BDX: 20-15% cycles reduction
> > > I also run tests with mbufs already in the cache (one core doing RX, QOS
> > and
> > > TX).
> > > With such scenario, on all mentioned above IA boxes no performance drop
> > > was observed.
> > >
> > > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> > > ---
> > > v2: fix clang and checkpatch complains
> > > ---
> > >  lib/librte_sched/rte_sched.c | 219 +++++------------------------------
> > >  1 file changed, 31 insertions(+), 188 deletions(-)
> > >
> > > diff --git a/lib/librte_sched/rte_sched.c b/lib/librte_sched/rte_sched.c
> > index
> > > 7c5688068..41ef147e0 100644
> > > --- a/lib/librte_sched/rte_sched.c
> > > +++ b/lib/librte_sched/rte_sched.c
> > > @@ -1861,24 +1861,23 @@ debug_check_queue_slab(struct
> > > rte_sched_subport *subport, uint32_t bmp_pos,  #endif /*
> > > RTE_SCHED_DEBUG */
> > >
> > >  static inline struct rte_sched_subport * -rte_sched_port_subport(struct
> > > rte_sched_port *port,
> > > -	struct rte_mbuf *pkt)
> > > +sched_port_subport(const struct rte_sched_port *port, struct
> > > +rte_mbuf_sched sch)
> > >  {
> > > -	uint32_t queue_id = rte_mbuf_sched_queue_get(pkt);
> > > +	uint32_t queue_id = sch.queue_id;
> > >  	uint32_t subport_id = queue_id >> (port-
> > > >n_pipes_per_subport_log2 + 4);
> > >
> > >  	return port->subports[subport_id];
> > >  }
> > >
> > >  static inline uint32_t
> > > -rte_sched_port_enqueue_qptrs_prefetch0(struct rte_sched_subport
> > > *subport,
> > > -	struct rte_mbuf *pkt, uint32_t subport_qmask)
> > > +sched_port_enqueue_qptrs_prefetch0(const struct rte_sched_subport
> > > *subport,
> > > +	struct rte_mbuf_sched sch, uint32_t subport_qmask)
> > >  {
> > >  	struct rte_sched_queue *q;
> > >  #ifdef RTE_SCHED_COLLECT_STATS
> > >  	struct rte_sched_queue_extra *qe;
> > >  #endif
> > > -	uint32_t qindex = rte_mbuf_sched_queue_get(pkt);
> > > +	uint32_t qindex = sch.queue_id;
> > >  	uint32_t subport_queue_id = subport_qmask & qindex;
> > >
> > >  	q = subport->queue + subport_queue_id; @@ -1971,197 +1970,41
> > > @@ int  rte_sched_port_enqueue(struct rte_sched_port *port, struct
> > > rte_mbuf **pkts,
> > >  		       uint32_t n_pkts)
> > >  {
> > > -	struct rte_mbuf *pkt00, *pkt01, *pkt10, *pkt11, *pkt20, *pkt21,
> > > -		*pkt30, *pkt31, *pkt_last;
> > > -	struct rte_mbuf **q00_base, **q01_base, **q10_base,
> > > **q11_base,
> > > -		**q20_base, **q21_base, **q30_base, **q31_base,
> > > **q_last_base;
> > > -	struct rte_sched_subport *subport00, *subport01, *subport10,
> > > *subport11,
> > > -		*subport20, *subport21, *subport30, *subport31,
> > > *subport_last;
> > > -	uint32_t q00, q01, q10, q11, q20, q21, q30, q31, q_last;
> > > -	uint32_t r00, r01, r10, r11, r20, r21, r30, r31, r_last;
> > > -	uint32_t subport_qmask;
> > >  	uint32_t result, i;
> > > +	struct rte_mbuf_sched sch[n_pkts];
> > > +	struct rte_sched_subport *subports[n_pkts];
> > > +	struct rte_mbuf **q_base[n_pkts];
> > > +	uint32_t q[n_pkts];
> > > +
> > > +	const uint32_t subport_qmask =
> > > +		(1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
> > >
> > >  	result = 0;
> > > -	subport_qmask = (1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
> > >
> > > -	/*
> > > -	 * Less then 6 input packets available, which is not enough to
> > > -	 * feed the pipeline
> > > -	 */
> > > -	if (unlikely(n_pkts < 6)) {
> > > -		struct rte_sched_subport *subports[5];
> > > -		struct rte_mbuf **q_base[5];
> > > -		uint32_t q[5];
> > > -
> > > -		/* Prefetch the mbuf structure of each packet */
> > > -		for (i = 0; i < n_pkts; i++)
> > > -			rte_prefetch0(pkts[i]);
> > > -
> > > -		/* Prefetch the subport structure for each packet */
> > > -		for (i = 0; i < n_pkts; i++)
> > > -			subports[i] = rte_sched_port_subport(port, pkts[i]);
> > > -
> > > -		/* Prefetch the queue structure for each queue */
> > > -		for (i = 0; i < n_pkts; i++)
> > > -			q[i] =
> > > rte_sched_port_enqueue_qptrs_prefetch0(subports[i],
> > > -					pkts[i], subport_qmask);
> > > -
> > > -		/* Prefetch the write pointer location of each queue */
> > > -		for (i = 0; i < n_pkts; i++) {
> > > -			q_base[i] =
> > > rte_sched_subport_pipe_qbase(subports[i], q[i]);
> > > -			rte_sched_port_enqueue_qwa_prefetch0(port,
> > > subports[i],
> > > -				q[i], q_base[i]);
> > > -		}
> > > +	/* Prefetch the mbuf structure of each packet */
> > > +	for (i = 0; i < n_pkts; i++)
> > > +		sch[i] = pkts[i]->hash.sched;
> > >
> >
> > Hi Konstantin,  thanks for the patch. In above case, all packets are touched
> > straight with any prefetch. If we consider the input burst size of 64 pkts, it
> > means 512 bytes of packet addresses  (8 cache-lines) which is likely to be
> > available in cache. For larger size burst, e.g. 128 or 256, there might be
> > instances when some addresses are not available the cache, may stall core.
> > How about adding explicit prefetch before starting to iterate through the
> > packets if that helps?

I don't think we need any prefetch here.
pkts[] is a sequential array, HW prefetcher should be able to do good job here.
Again in majority of use-cases pkts[] contents will already present in the cache.
Though there is a valid concern here: n_pkts can be big, in that case we probably
don't want to store too much on the stack and read too much from pkts[].
It is better to work in some fixed chunks (64 or so).
I can prepare v2 with these changes, if there still is an interest in this patch.   

> Exactly. Konstantin, you might not be a fan of prefetches, but the current enqueue implementation (as well as the dequeue) uses a prefetch
> state machine. Please keep the prefetch state machine in the scalar code.

It is not about our own preferences.
From my measurements new version is faster and it is definitely simpler.

> Even if the examples/qos_sched might not show an advantage,
> this is just a sample app and there are some more relevant use-cases as well.

Well, I hope that examples/qos_sched reflects at least some real-world use-cases for QOS library.
Otherwise why do we have it inside DPDK codebase? 
About 'more relevant use-cases': if you do know such, can you try them with the patch?
I would really appreciate that.
In fact, it is an ask not only to Cristian, but to all other interested parties:
if your app does use librte_sched - please try this patch and provide the feedback.
If some tests would flag a regression - I am absolutely ok to drop the patch.
Konstantin
  
Thomas Monjalon Nov. 17, 2021, 10:33 a.m. UTC | #4
Hi all,
What is the conclusion for this patch and the number 1/2?

04/04/2021 01:53, Ananyev, Konstantin:
> 
> Hi guys,
> 
> > > > In many usage scenarios input mbufs for rte_sched_port_enqueue() are
> > > not
> > > > yet in the CPU cache(s). That causes quite significant stalls due to memory
> > > > latency. Current implementation tries to migitate it using SW pipeline and
> > > SW
> > > > prefetch techniques, but stalls are still present.
> > > > Rework rte_sched_port_enqueue() to do actual fetch of all mbufs
> > > metadata
> > > > as a first stage of that function.
> > > > That helps to minimise load stalls at further stages of enqueue() and
> > > > improves overall enqueue performance.
> > > > With examples/qos_sched I observed:
> > > > on ICX box: up to 30% cycles reduction
> > > > on CSX AND BDX: 20-15% cycles reduction
> > > > I also run tests with mbufs already in the cache (one core doing RX, QOS
> > > and
> > > > TX).
> > > > With such scenario, on all mentioned above IA boxes no performance drop
> > > > was observed.
> > > >
> > > > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> > > > ---
> > > > v2: fix clang and checkpatch complains
> > > > ---
> > > >  lib/librte_sched/rte_sched.c | 219 +++++------------------------------
> > > >  1 file changed, 31 insertions(+), 188 deletions(-)
> > > >
> > > > diff --git a/lib/librte_sched/rte_sched.c b/lib/librte_sched/rte_sched.c
> > > index
> > > > 7c5688068..41ef147e0 100644
> > > > --- a/lib/librte_sched/rte_sched.c
> > > > +++ b/lib/librte_sched/rte_sched.c
> > > > @@ -1861,24 +1861,23 @@ debug_check_queue_slab(struct
> > > > rte_sched_subport *subport, uint32_t bmp_pos,  #endif /*
> > > > RTE_SCHED_DEBUG */
> > > >
> > > >  static inline struct rte_sched_subport * -rte_sched_port_subport(struct
> > > > rte_sched_port *port,
> > > > -	struct rte_mbuf *pkt)
> > > > +sched_port_subport(const struct rte_sched_port *port, struct
> > > > +rte_mbuf_sched sch)
> > > >  {
> > > > -	uint32_t queue_id = rte_mbuf_sched_queue_get(pkt);
> > > > +	uint32_t queue_id = sch.queue_id;
> > > >  	uint32_t subport_id = queue_id >> (port-
> > > > >n_pipes_per_subport_log2 + 4);
> > > >
> > > >  	return port->subports[subport_id];
> > > >  }
> > > >
> > > >  static inline uint32_t
> > > > -rte_sched_port_enqueue_qptrs_prefetch0(struct rte_sched_subport
> > > > *subport,
> > > > -	struct rte_mbuf *pkt, uint32_t subport_qmask)
> > > > +sched_port_enqueue_qptrs_prefetch0(const struct rte_sched_subport
> > > > *subport,
> > > > +	struct rte_mbuf_sched sch, uint32_t subport_qmask)
> > > >  {
> > > >  	struct rte_sched_queue *q;
> > > >  #ifdef RTE_SCHED_COLLECT_STATS
> > > >  	struct rte_sched_queue_extra *qe;
> > > >  #endif
> > > > -	uint32_t qindex = rte_mbuf_sched_queue_get(pkt);
> > > > +	uint32_t qindex = sch.queue_id;
> > > >  	uint32_t subport_queue_id = subport_qmask & qindex;
> > > >
> > > >  	q = subport->queue + subport_queue_id; @@ -1971,197 +1970,41
> > > > @@ int  rte_sched_port_enqueue(struct rte_sched_port *port, struct
> > > > rte_mbuf **pkts,
> > > >  		       uint32_t n_pkts)
> > > >  {
> > > > -	struct rte_mbuf *pkt00, *pkt01, *pkt10, *pkt11, *pkt20, *pkt21,
> > > > -		*pkt30, *pkt31, *pkt_last;
> > > > -	struct rte_mbuf **q00_base, **q01_base, **q10_base,
> > > > **q11_base,
> > > > -		**q20_base, **q21_base, **q30_base, **q31_base,
> > > > **q_last_base;
> > > > -	struct rte_sched_subport *subport00, *subport01, *subport10,
> > > > *subport11,
> > > > -		*subport20, *subport21, *subport30, *subport31,
> > > > *subport_last;
> > > > -	uint32_t q00, q01, q10, q11, q20, q21, q30, q31, q_last;
> > > > -	uint32_t r00, r01, r10, r11, r20, r21, r30, r31, r_last;
> > > > -	uint32_t subport_qmask;
> > > >  	uint32_t result, i;
> > > > +	struct rte_mbuf_sched sch[n_pkts];
> > > > +	struct rte_sched_subport *subports[n_pkts];
> > > > +	struct rte_mbuf **q_base[n_pkts];
> > > > +	uint32_t q[n_pkts];
> > > > +
> > > > +	const uint32_t subport_qmask =
> > > > +		(1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
> > > >
> > > >  	result = 0;
> > > > -	subport_qmask = (1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
> > > >
> > > > -	/*
> > > > -	 * Less then 6 input packets available, which is not enough to
> > > > -	 * feed the pipeline
> > > > -	 */
> > > > -	if (unlikely(n_pkts < 6)) {
> > > > -		struct rte_sched_subport *subports[5];
> > > > -		struct rte_mbuf **q_base[5];
> > > > -		uint32_t q[5];
> > > > -
> > > > -		/* Prefetch the mbuf structure of each packet */
> > > > -		for (i = 0; i < n_pkts; i++)
> > > > -			rte_prefetch0(pkts[i]);
> > > > -
> > > > -		/* Prefetch the subport structure for each packet */
> > > > -		for (i = 0; i < n_pkts; i++)
> > > > -			subports[i] = rte_sched_port_subport(port, pkts[i]);
> > > > -
> > > > -		/* Prefetch the queue structure for each queue */
> > > > -		for (i = 0; i < n_pkts; i++)
> > > > -			q[i] =
> > > > rte_sched_port_enqueue_qptrs_prefetch0(subports[i],
> > > > -					pkts[i], subport_qmask);
> > > > -
> > > > -		/* Prefetch the write pointer location of each queue */
> > > > -		for (i = 0; i < n_pkts; i++) {
> > > > -			q_base[i] =
> > > > rte_sched_subport_pipe_qbase(subports[i], q[i]);
> > > > -			rte_sched_port_enqueue_qwa_prefetch0(port,
> > > > subports[i],
> > > > -				q[i], q_base[i]);
> > > > -		}
> > > > +	/* Prefetch the mbuf structure of each packet */
> > > > +	for (i = 0; i < n_pkts; i++)
> > > > +		sch[i] = pkts[i]->hash.sched;
> > > >
> > >
> > > Hi Konstantin,  thanks for the patch. In above case, all packets are touched
> > > straight with any prefetch. If we consider the input burst size of 64 pkts, it
> > > means 512 bytes of packet addresses  (8 cache-lines) which is likely to be
> > > available in cache. For larger size burst, e.g. 128 or 256, there might be
> > > instances when some addresses are not available the cache, may stall core.
> > > How about adding explicit prefetch before starting to iterate through the
> > > packets if that helps?
> 
> I don't think we need any prefetch here.
> pkts[] is a sequential array, HW prefetcher should be able to do good job here.
> Again in majority of use-cases pkts[] contents will already present in the cache.
> Though there is a valid concern here: n_pkts can be big, in that case we probably
> don't want to store too much on the stack and read too much from pkts[].
> It is better to work in some fixed chunks (64 or so).
> I can prepare v2 with these changes, if there still is an interest in this patch.   
> 
> > Exactly. Konstantin, you might not be a fan of prefetches, but the current enqueue implementation (as well as the dequeue) uses a prefetch
> > state machine. Please keep the prefetch state machine in the scalar code.
> 
> It is not about our own preferences.
> From my measurements new version is faster and it is definitely simpler.
> 
> > Even if the examples/qos_sched might not show an advantage,
> > this is just a sample app and there are some more relevant use-cases as well.
> 
> Well, I hope that examples/qos_sched reflects at least some real-world use-cases for QOS library.
> Otherwise why do we have it inside DPDK codebase? 
> About 'more relevant use-cases': if you do know such, can you try them with the patch?
> I would really appreciate that.
> In fact, it is an ask not only to Cristian, but to all other interested parties:
> if your app does use librte_sched - please try this patch and provide the feedback.
> If some tests would flag a regression - I am absolutely ok to drop the patch.
> Konstantin
  
Cristian Dumitrescu Nov. 17, 2021, 10:53 a.m. UTC | #5
> -----Original Message-----
> From: Thomas Monjalon <thomas@monjalon.net>
> Sent: Wednesday, November 17, 2021 10:33 AM
> To: Dumitrescu, Cristian <cristian.dumitrescu@intel.com>; Singh, Jasvinder
> <jasvinder.singh@intel.com>; Ananyev, Konstantin
> <konstantin.ananyev@intel.com>
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v2 2/2] qos: rearrange enqueue procedure
> 
> Hi all,
> What is the conclusion for this patch and the number 1/2?
> 
> 04/04/2021 01:53, Ananyev, Konstantin:
> >
> > Hi guys,
> >
> > > > > In many usage scenarios input mbufs for rte_sched_port_enqueue()
> are
> > > > not
> > > > > yet in the CPU cache(s). That causes quite significant stalls due to
> memory
> > > > > latency. Current implementation tries to migitate it using SW pipeline
> and
> > > > SW
> > > > > prefetch techniques, but stalls are still present.
> > > > > Rework rte_sched_port_enqueue() to do actual fetch of all mbufs
> > > > metadata
> > > > > as a first stage of that function.
> > > > > That helps to minimise load stalls at further stages of enqueue() and
> > > > > improves overall enqueue performance.
> > > > > With examples/qos_sched I observed:
> > > > > on ICX box: up to 30% cycles reduction
> > > > > on CSX AND BDX: 20-15% cycles reduction
> > > > > I also run tests with mbufs already in the cache (one core doing RX,
> QOS
> > > > and
> > > > > TX).
> > > > > With such scenario, on all mentioned above IA boxes no performance
> drop
> > > > > was observed.
> > > > >
> > > > > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> > > > > ---
> > > > > v2: fix clang and checkpatch complains
> > > > > ---
> > > > >  lib/librte_sched/rte_sched.c | 219 +++++------------------------------
> > > > >  1 file changed, 31 insertions(+), 188 deletions(-)
> > > > >
> > > > > diff --git a/lib/librte_sched/rte_sched.c
> b/lib/librte_sched/rte_sched.c
> > > > index
> > > > > 7c5688068..41ef147e0 100644
> > > > > --- a/lib/librte_sched/rte_sched.c
> > > > > +++ b/lib/librte_sched/rte_sched.c
> > > > > @@ -1861,24 +1861,23 @@ debug_check_queue_slab(struct
> > > > > rte_sched_subport *subport, uint32_t bmp_pos,  #endif /*
> > > > > RTE_SCHED_DEBUG */
> > > > >
> > > > >  static inline struct rte_sched_subport * -
> rte_sched_port_subport(struct
> > > > > rte_sched_port *port,
> > > > > -	struct rte_mbuf *pkt)
> > > > > +sched_port_subport(const struct rte_sched_port *port, struct
> > > > > +rte_mbuf_sched sch)
> > > > >  {
> > > > > -	uint32_t queue_id = rte_mbuf_sched_queue_get(pkt);
> > > > > +	uint32_t queue_id = sch.queue_id;
> > > > >  	uint32_t subport_id = queue_id >> (port-
> > > > > >n_pipes_per_subport_log2 + 4);
> > > > >
> > > > >  	return port->subports[subport_id];
> > > > >  }
> > > > >
> > > > >  static inline uint32_t
> > > > > -rte_sched_port_enqueue_qptrs_prefetch0(struct
> rte_sched_subport
> > > > > *subport,
> > > > > -	struct rte_mbuf *pkt, uint32_t subport_qmask)
> > > > > +sched_port_enqueue_qptrs_prefetch0(const struct
> rte_sched_subport
> > > > > *subport,
> > > > > +	struct rte_mbuf_sched sch, uint32_t subport_qmask)
> > > > >  {
> > > > >  	struct rte_sched_queue *q;
> > > > >  #ifdef RTE_SCHED_COLLECT_STATS
> > > > >  	struct rte_sched_queue_extra *qe;
> > > > >  #endif
> > > > > -	uint32_t qindex = rte_mbuf_sched_queue_get(pkt);
> > > > > +	uint32_t qindex = sch.queue_id;
> > > > >  	uint32_t subport_queue_id = subport_qmask & qindex;
> > > > >
> > > > >  	q = subport->queue + subport_queue_id; @@ -1971,197
> +1970,41
> > > > > @@ int  rte_sched_port_enqueue(struct rte_sched_port *port,
> struct
> > > > > rte_mbuf **pkts,
> > > > >  		       uint32_t n_pkts)
> > > > >  {
> > > > > -	struct rte_mbuf *pkt00, *pkt01, *pkt10, *pkt11, *pkt20,
> *pkt21,
> > > > > -		*pkt30, *pkt31, *pkt_last;
> > > > > -	struct rte_mbuf **q00_base, **q01_base, **q10_base,
> > > > > **q11_base,
> > > > > -		**q20_base, **q21_base, **q30_base, **q31_base,
> > > > > **q_last_base;
> > > > > -	struct rte_sched_subport *subport00, *subport01,
> *subport10,
> > > > > *subport11,
> > > > > -		*subport20, *subport21, *subport30, *subport31,
> > > > > *subport_last;
> > > > > -	uint32_t q00, q01, q10, q11, q20, q21, q30, q31, q_last;
> > > > > -	uint32_t r00, r01, r10, r11, r20, r21, r30, r31, r_last;
> > > > > -	uint32_t subport_qmask;
> > > > >  	uint32_t result, i;
> > > > > +	struct rte_mbuf_sched sch[n_pkts];
> > > > > +	struct rte_sched_subport *subports[n_pkts];
> > > > > +	struct rte_mbuf **q_base[n_pkts];
> > > > > +	uint32_t q[n_pkts];
> > > > > +
> > > > > +	const uint32_t subport_qmask =
> > > > > +		(1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
> > > > >
> > > > >  	result = 0;
> > > > > -	subport_qmask = (1 << (port->n_pipes_per_subport_log2 +
> 4)) - 1;
> > > > >
> > > > > -	/*
> > > > > -	 * Less then 6 input packets available, which is not enough to
> > > > > -	 * feed the pipeline
> > > > > -	 */
> > > > > -	if (unlikely(n_pkts < 6)) {
> > > > > -		struct rte_sched_subport *subports[5];
> > > > > -		struct rte_mbuf **q_base[5];
> > > > > -		uint32_t q[5];
> > > > > -
> > > > > -		/* Prefetch the mbuf structure of each packet */
> > > > > -		for (i = 0; i < n_pkts; i++)
> > > > > -			rte_prefetch0(pkts[i]);
> > > > > -
> > > > > -		/* Prefetch the subport structure for each packet */
> > > > > -		for (i = 0; i < n_pkts; i++)
> > > > > -			subports[i] = rte_sched_port_subport(port,
> pkts[i]);
> > > > > -
> > > > > -		/* Prefetch the queue structure for each queue */
> > > > > -		for (i = 0; i < n_pkts; i++)
> > > > > -			q[i] =
> > > > > rte_sched_port_enqueue_qptrs_prefetch0(subports[i],
> > > > > -					pkts[i], subport_qmask);
> > > > > -
> > > > > -		/* Prefetch the write pointer location of each queue
> */
> > > > > -		for (i = 0; i < n_pkts; i++) {
> > > > > -			q_base[i] =
> > > > > rte_sched_subport_pipe_qbase(subports[i], q[i]);
> > > > > -
> 	rte_sched_port_enqueue_qwa_prefetch0(port,
> > > > > subports[i],
> > > > > -				q[i], q_base[i]);
> > > > > -		}
> > > > > +	/* Prefetch the mbuf structure of each packet */
> > > > > +	for (i = 0; i < n_pkts; i++)
> > > > > +		sch[i] = pkts[i]->hash.sched;
> > > > >
> > > >
> > > > Hi Konstantin,  thanks for the patch. In above case, all packets are
> touched
> > > > straight with any prefetch. If we consider the input burst size of 64 pkts,
> it
> > > > means 512 bytes of packet addresses  (8 cache-lines) which is likely to
> be
> > > > available in cache. For larger size burst, e.g. 128 or 256, there might be
> > > > instances when some addresses are not available the cache, may stall
> core.
> > > > How about adding explicit prefetch before starting to iterate through
> the
> > > > packets if that helps?
> >
> > I don't think we need any prefetch here.
> > pkts[] is a sequential array, HW prefetcher should be able to do good job
> here.
> > Again in majority of use-cases pkts[] contents will already present in the
> cache.
> > Though there is a valid concern here: n_pkts can be big, in that case we
> probably
> > don't want to store too much on the stack and read too much from pkts[].
> > It is better to work in some fixed chunks (64 or so).
> > I can prepare v2 with these changes, if there still is an interest in this patch.
> >
> > > Exactly. Konstantin, you might not be a fan of prefetches, but the current
> enqueue implementation (as well as the dequeue) uses a prefetch
> > > state machine. Please keep the prefetch state machine in the scalar code.
> >
> > It is not about our own preferences.
> > From my measurements new version is faster and it is definitely simpler.
> >
> > > Even if the examples/qos_sched might not show an advantage,
> > > this is just a sample app and there are some more relevant use-cases as
> well.
> >
> > Well, I hope that examples/qos_sched reflects at least some real-world
> use-cases for QOS library.
> > Otherwise why do we have it inside DPDK codebase?
> > About 'more relevant use-cases': if you do know such, can you try them
> with the patch?
> > I would really appreciate that.
> > In fact, it is an ask not only to Cristian, but to all other interested parties:
> > if your app does use librte_sched - please try this patch and provide the
> feedback.
> > If some tests would flag a regression - I am absolutely ok to drop the patch.
> > Konstantin
> 
> 
> 
Hi Thomas,

We are not going ahead with this at this moment. 

Regards,
Cristian
  

Patch

diff --git a/lib/librte_sched/rte_sched.c b/lib/librte_sched/rte_sched.c
index 7c5688068..41ef147e0 100644
--- a/lib/librte_sched/rte_sched.c
+++ b/lib/librte_sched/rte_sched.c
@@ -1861,24 +1861,23 @@  debug_check_queue_slab(struct rte_sched_subport *subport, uint32_t bmp_pos,
 #endif /* RTE_SCHED_DEBUG */
 
 static inline struct rte_sched_subport *
-rte_sched_port_subport(struct rte_sched_port *port,
-	struct rte_mbuf *pkt)
+sched_port_subport(const struct rte_sched_port *port, struct rte_mbuf_sched sch)
 {
-	uint32_t queue_id = rte_mbuf_sched_queue_get(pkt);
+	uint32_t queue_id = sch.queue_id;
 	uint32_t subport_id = queue_id >> (port->n_pipes_per_subport_log2 + 4);
 
 	return port->subports[subport_id];
 }
 
 static inline uint32_t
-rte_sched_port_enqueue_qptrs_prefetch0(struct rte_sched_subport *subport,
-	struct rte_mbuf *pkt, uint32_t subport_qmask)
+sched_port_enqueue_qptrs_prefetch0(const struct rte_sched_subport *subport,
+	struct rte_mbuf_sched sch, uint32_t subport_qmask)
 {
 	struct rte_sched_queue *q;
 #ifdef RTE_SCHED_COLLECT_STATS
 	struct rte_sched_queue_extra *qe;
 #endif
-	uint32_t qindex = rte_mbuf_sched_queue_get(pkt);
+	uint32_t qindex = sch.queue_id;
 	uint32_t subport_queue_id = subport_qmask & qindex;
 
 	q = subport->queue + subport_queue_id;
@@ -1971,197 +1970,41 @@  int
 rte_sched_port_enqueue(struct rte_sched_port *port, struct rte_mbuf **pkts,
 		       uint32_t n_pkts)
 {
-	struct rte_mbuf *pkt00, *pkt01, *pkt10, *pkt11, *pkt20, *pkt21,
-		*pkt30, *pkt31, *pkt_last;
-	struct rte_mbuf **q00_base, **q01_base, **q10_base, **q11_base,
-		**q20_base, **q21_base, **q30_base, **q31_base, **q_last_base;
-	struct rte_sched_subport *subport00, *subport01, *subport10, *subport11,
-		*subport20, *subport21, *subport30, *subport31, *subport_last;
-	uint32_t q00, q01, q10, q11, q20, q21, q30, q31, q_last;
-	uint32_t r00, r01, r10, r11, r20, r21, r30, r31, r_last;
-	uint32_t subport_qmask;
 	uint32_t result, i;
+	struct rte_mbuf_sched sch[n_pkts];
+	struct rte_sched_subport *subports[n_pkts];
+	struct rte_mbuf **q_base[n_pkts];
+	uint32_t q[n_pkts];
+
+	const uint32_t subport_qmask =
+		(1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
 
 	result = 0;
-	subport_qmask = (1 << (port->n_pipes_per_subport_log2 + 4)) - 1;
 
-	/*
-	 * Less then 6 input packets available, which is not enough to
-	 * feed the pipeline
-	 */
-	if (unlikely(n_pkts < 6)) {
-		struct rte_sched_subport *subports[5];
-		struct rte_mbuf **q_base[5];
-		uint32_t q[5];
-
-		/* Prefetch the mbuf structure of each packet */
-		for (i = 0; i < n_pkts; i++)
-			rte_prefetch0(pkts[i]);
-
-		/* Prefetch the subport structure for each packet */
-		for (i = 0; i < n_pkts; i++)
-			subports[i] = rte_sched_port_subport(port, pkts[i]);
-
-		/* Prefetch the queue structure for each queue */
-		for (i = 0; i < n_pkts; i++)
-			q[i] = rte_sched_port_enqueue_qptrs_prefetch0(subports[i],
-					pkts[i], subport_qmask);
-
-		/* Prefetch the write pointer location of each queue */
-		for (i = 0; i < n_pkts; i++) {
-			q_base[i] = rte_sched_subport_pipe_qbase(subports[i], q[i]);
-			rte_sched_port_enqueue_qwa_prefetch0(port, subports[i],
-				q[i], q_base[i]);
-		}
+	/* Prefetch the mbuf structure of each packet */
+	for (i = 0; i < n_pkts; i++)
+		sch[i] = pkts[i]->hash.sched;
 
-		/* Write each packet to its queue */
-		for (i = 0; i < n_pkts; i++)
-			result += rte_sched_port_enqueue_qwa(port, subports[i],
-						q[i], q_base[i], pkts[i]);
+	/* Prefetch the subport structure for each packet */
+	for (i = 0; i < n_pkts; i++)
+		subports[i] = sched_port_subport(port, sch[i]);
 
-		return result;
-	}
+	/* Prefetch the queue structure for each queue */
+	for (i = 0; i < n_pkts; i++)
+		q[i] = sched_port_enqueue_qptrs_prefetch0(subports[i],
+				sch[i], subport_qmask);
 
-	/* Feed the first 3 stages of the pipeline (6 packets needed) */
-	pkt20 = pkts[0];
-	pkt21 = pkts[1];
-	rte_prefetch0(pkt20);
-	rte_prefetch0(pkt21);
-
-	pkt10 = pkts[2];
-	pkt11 = pkts[3];
-	rte_prefetch0(pkt10);
-	rte_prefetch0(pkt11);
-
-	subport20 = rte_sched_port_subport(port, pkt20);
-	subport21 = rte_sched_port_subport(port, pkt21);
-	q20 = rte_sched_port_enqueue_qptrs_prefetch0(subport20,
-			pkt20, subport_qmask);
-	q21 = rte_sched_port_enqueue_qptrs_prefetch0(subport21,
-			pkt21, subport_qmask);
-
-	pkt00 = pkts[4];
-	pkt01 = pkts[5];
-	rte_prefetch0(pkt00);
-	rte_prefetch0(pkt01);
-
-	subport10 = rte_sched_port_subport(port, pkt10);
-	subport11 = rte_sched_port_subport(port, pkt11);
-	q10 = rte_sched_port_enqueue_qptrs_prefetch0(subport10,
-			pkt10, subport_qmask);
-	q11 = rte_sched_port_enqueue_qptrs_prefetch0(subport11,
-			pkt11, subport_qmask);
-
-	q20_base = rte_sched_subport_pipe_qbase(subport20, q20);
-	q21_base = rte_sched_subport_pipe_qbase(subport21, q21);
-	rte_sched_port_enqueue_qwa_prefetch0(port, subport20, q20, q20_base);
-	rte_sched_port_enqueue_qwa_prefetch0(port, subport21, q21, q21_base);
-
-	/* Run the pipeline */
-	for (i = 6; i < (n_pkts & (~1)); i += 2) {
-		/* Propagate stage inputs */
-		pkt30 = pkt20;
-		pkt31 = pkt21;
-		pkt20 = pkt10;
-		pkt21 = pkt11;
-		pkt10 = pkt00;
-		pkt11 = pkt01;
-		q30 = q20;
-		q31 = q21;
-		q20 = q10;
-		q21 = q11;
-		subport30 = subport20;
-		subport31 = subport21;
-		subport20 = subport10;
-		subport21 = subport11;
-		q30_base = q20_base;
-		q31_base = q21_base;
-
-		/* Stage 0: Get packets in */
-		pkt00 = pkts[i];
-		pkt01 = pkts[i + 1];
-		rte_prefetch0(pkt00);
-		rte_prefetch0(pkt01);
-
-		/* Stage 1: Prefetch subport and queue structure storing queue pointers */
-		subport10 = rte_sched_port_subport(port, pkt10);
-		subport11 = rte_sched_port_subport(port, pkt11);
-		q10 = rte_sched_port_enqueue_qptrs_prefetch0(subport10,
-				pkt10, subport_qmask);
-		q11 = rte_sched_port_enqueue_qptrs_prefetch0(subport11,
-				pkt11, subport_qmask);
-
-		/* Stage 2: Prefetch queue write location */
-		q20_base = rte_sched_subport_pipe_qbase(subport20, q20);
-		q21_base = rte_sched_subport_pipe_qbase(subport21, q21);
-		rte_sched_port_enqueue_qwa_prefetch0(port, subport20, q20, q20_base);
-		rte_sched_port_enqueue_qwa_prefetch0(port, subport21, q21, q21_base);
-
-		/* Stage 3: Write packet to queue and activate queue */
-		r30 = rte_sched_port_enqueue_qwa(port, subport30,
-				q30, q30_base, pkt30);
-		r31 = rte_sched_port_enqueue_qwa(port, subport31,
-				q31, q31_base, pkt31);
-		result += r30 + r31;
-	}
-
-	/*
-	 * Drain the pipeline (exactly 6 packets).
-	 * Handle the last packet in the case
-	 * of an odd number of input packets.
-	 */
-	pkt_last = pkts[n_pkts - 1];
-	rte_prefetch0(pkt_last);
-
-	subport00 = rte_sched_port_subport(port, pkt00);
-	subport01 = rte_sched_port_subport(port, pkt01);
-	q00 = rte_sched_port_enqueue_qptrs_prefetch0(subport00,
-			pkt00, subport_qmask);
-	q01 = rte_sched_port_enqueue_qptrs_prefetch0(subport01,
-			pkt01, subport_qmask);
-
-	q10_base = rte_sched_subport_pipe_qbase(subport10, q10);
-	q11_base = rte_sched_subport_pipe_qbase(subport11, q11);
-	rte_sched_port_enqueue_qwa_prefetch0(port, subport10, q10, q10_base);
-	rte_sched_port_enqueue_qwa_prefetch0(port, subport11, q11, q11_base);
-
-	r20 = rte_sched_port_enqueue_qwa(port, subport20,
-			q20, q20_base, pkt20);
-	r21 = rte_sched_port_enqueue_qwa(port, subport21,
-			q21, q21_base, pkt21);
-	result += r20 + r21;
-
-	subport_last = rte_sched_port_subport(port, pkt_last);
-	q_last = rte_sched_port_enqueue_qptrs_prefetch0(subport_last,
-				pkt_last, subport_qmask);
-
-	q00_base = rte_sched_subport_pipe_qbase(subport00, q00);
-	q01_base = rte_sched_subport_pipe_qbase(subport01, q01);
-	rte_sched_port_enqueue_qwa_prefetch0(port, subport00, q00, q00_base);
-	rte_sched_port_enqueue_qwa_prefetch0(port, subport01, q01, q01_base);
-
-	r10 = rte_sched_port_enqueue_qwa(port, subport10, q10,
-			q10_base, pkt10);
-	r11 = rte_sched_port_enqueue_qwa(port, subport11, q11,
-			q11_base, pkt11);
-	result += r10 + r11;
-
-	q_last_base = rte_sched_subport_pipe_qbase(subport_last, q_last);
-	rte_sched_port_enqueue_qwa_prefetch0(port, subport_last,
-		q_last, q_last_base);
-
-	r00 = rte_sched_port_enqueue_qwa(port, subport00, q00,
-			q00_base, pkt00);
-	r01 = rte_sched_port_enqueue_qwa(port, subport01, q01,
-			q01_base, pkt01);
-	result += r00 + r01;
-
-	if (n_pkts & 1) {
-		r_last = rte_sched_port_enqueue_qwa(port, subport_last,
-					q_last,	q_last_base, pkt_last);
-		result += r_last;
+	/* Prefetch the write pointer location of each queue */
+	for (i = 0; i < n_pkts; i++) {
+		q_base[i] = rte_sched_subport_pipe_qbase(subports[i], q[i]);
+		rte_sched_port_enqueue_qwa_prefetch0(port, subports[i],
+			q[i], q_base[i]);
 	}
 
+	/* Write each packet to its queue */
+	for (i = 0; i < n_pkts; i++)
+		result += rte_sched_port_enqueue_qwa(port, subports[i],
+					q[i], q_base[i], pkts[i]);
 	return result;
 }