[dpdk-dev,v6,15/23] eventtimer: add buffering of timer expiry events

Message ID 1515630074-29020-16-git-send-email-erik.g.carrillo@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Jerin Jacob
Checks

Context               Check    Description
ci/checkpatch         success  coding style OK
ci/Intel-compilation  fail     Compilation issues

Commit Message

Carrillo, Erik G Jan. 11, 2018, 12:21 a.m. UTC
  Buffer timer expiry events generated while walking a "run list"
in rte_timer_manage, and burst enqueue them to an event device
to the extent possible.

Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
---
 lib/librte_eventdev/rte_event_timer_adapter.c | 118 +++++++++++++++++++++++---
 1 file changed, 108 insertions(+), 10 deletions(-)
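
The idea, reduced to a standalone sketch (illustrative only; evbuf_* and
enqueue_burst() are stand-ins for the adapter's internal buffer and
rte_event_enqueue_burst(), not the patch code): expiry events are appended to a
fixed-size ring and handed to the event device in bursts, and a flush drains
only the contiguous region starting at head, so a wrapped buffer empties over
two flush calls.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define BUF_SZ 8	/* small for illustration; the patch uses 1024 */

struct evbuf {
	uint16_t head;
	uint16_t tail;
	uint16_t count;
	uint32_t events[BUF_SZ];	/* stand-in for struct rte_event */
};

/* Pretend "device" enqueue: accepts up to n events, returns how many it took. */
static uint16_t
enqueue_burst(const uint32_t *ev, uint16_t n)
{
	uint16_t i;

	for (i = 0; i < n; i++)
		printf("enqueued event %u\n", (unsigned int)ev[i]);
	return n;
}

static int
evbuf_add(struct evbuf *b, uint32_t ev)
{
	/* One slot is left unused to distinguish full from empty. */
	if ((b->tail + 1) % BUF_SZ == b->head)
		return -1;	/* full: caller retries on a later iteration */
	b->events[b->tail] = ev;
	b->tail = (b->tail + 1) % BUF_SZ;
	b->count++;
	return 0;
}

/* Flush the contiguous region starting at head; if the data wraps around the
 * end of the array, the remainder is picked up by the next flush call.
 */
static uint16_t
evbuf_flush(struct evbuf *b)
{
	uint16_t n;

	if (b->tail > b->head)
		n = b->tail - b->head;
	else if (b->tail < b->head)
		n = BUF_SZ - b->head;
	else
		return 0;	/* empty */

	n = enqueue_burst(&b->events[b->head], n);
	b->head = (b->head + n) % BUF_SZ;
	b->count -= n;
	return n;
}

int
main(void)
{
	struct evbuf b;
	uint32_t i;

	memset(&b, 0, sizeof(b));
	for (i = 0; i < 6; i++)
		evbuf_add(&b, i);
	evbuf_flush(&b);	/* one burst instead of six single enqueues */
	return 0;
}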
  

Comments

Pavan Nikhilesh Jan. 11, 2018, 12:18 p.m. UTC | #1
On Wed, Jan 10, 2018 at 06:21:06PM -0600, Erik Gabriel Carrillo wrote:
> Buffer timer expiry events generated while walking a "run list"
> in rte_timer_manage, and burst enqueue them to an event device
> to the extent possible.
>

IMO in some cases this adds a lot of delay between expiries and the events being
published to the event device. For example, with a long expiry interval (such as
the default 300-second MAC entry expiry), the expired entries would remain in the
buffer until 32 other entries expire.


> Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
> ---
>  lib/librte_eventdev/rte_event_timer_adapter.c | 118 +++++++++++++++++++++++---
>  1 file changed, 108 insertions(+), 10 deletions(-)
>
<snip>
  
Carrillo, Erik G Jan. 18, 2018, 11:07 p.m. UTC | #2
> -----Original Message-----
> From: Pavan Nikhilesh [mailto:pbhagavatula@caviumnetworks.com]
> Sent: Thursday, January 11, 2018 6:19 AM
> To: Carrillo, Erik G <erik.g.carrillo@intel.com>;
> jerin.jacob@caviumnetworks.com; nipun.gupta@nxp.com;
> hemant.agrawal@nxp.com
> Cc: dev@dpdk.org
> Subject: Re: [PATCH v6 15/23] eventtimer: add buffering of timer expiry
> events
> 
> On Wed, Jan 10, 2018 at 06:21:06PM -0600, Erik Gabriel Carrillo wrote:
> > Buffer timer expiry events generated while walking a "run list"
> > in rte_timer_manage, and burst enqueue them to an event device to the
> > extent possible.
> >
> 
> IMO in some cases this adds a lot of delay between expiries and the events being
> published to the event device. For example, with a long expiry interval (such as
> the default 300-second MAC entry expiry), the expired entries would remain in the
> buffer until 32 other entries expire.
> 

The service function invokes rte_timer_manage to handle expired timers, and as it does so, the buffer will be flushed under two conditions:  the buffer is full of expired timer events, or the buffer is not full but there are no more expired timers to handle for this iteration of the service.  The latter condition will flush the buffer even if only one event has been buffered after walking the list of expired rte_timers.  

So, there could be some delay for the events that got buffered earliest, but it seems like the throughput benefit outweighs the small delay there.  Thoughts?

We could also make the buffer size configurable.  
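
In terms of the patch below, the two flush points look like this (condensed;
setup and error handling elided):

/* Flush point 1: in the per-timer callback run by rte_timer_manage(), flush
 * as soon as a full batch has accumulated.
 */
static void
sw_event_timer_cb(struct rte_timer *tim, void *arg)
{
	/* ... */
	ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
	if (ret < 0) {
		/* Buffer full: re-arm with immediate expiry so the event is
		 * retried on the next service iteration.
		 */
		rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(),
				     sw_event_timer_cb, evtim);
	}

	if (event_buffer_batch_ready(&sw_data->buffer))
		event_buffer_flush(&sw_data->buffer,
				   adapter->data->event_dev_id,
				   adapter->data->event_port_id,
				   &nb_evs_flushed);
}

/* Flush point 2: once rte_timer_manage() returns, there are no more expired
 * timers this iteration, so flush whatever is buffered, even a single event.
 */
static int32_t
sw_event_timer_adapter_service_func(void *arg)
{
	/* ... */
	rte_timer_manage();
	event_buffer_flush(&sw_data->buffer, adapter->data->event_dev_id,
			   adapter->data->event_port_id, &nb_events);
	return 0;
}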

> 
> > Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
> > ---
> >  lib/librte_eventdev/rte_event_timer_adapter.c | 118
> > +++++++++++++++++++++++---
> >  1 file changed, 108 insertions(+), 10 deletions(-)
> >
> <snip>
  
Pavan Nikhilesh Jan. 20, 2018, 8:55 a.m. UTC | #3
On Thu, Jan 18, 2018 at 11:07:52PM +0000, Carrillo, Erik G wrote:
> > -----Original Message-----
> > From: Pavan Nikhilesh [mailto:pbhagavatula@caviumnetworks.com]
> > Sent: Thursday, January 11, 2018 6:19 AM
> > To: Carrillo, Erik G <erik.g.carrillo@intel.com>;
> > jerin.jacob@caviumnetworks.com; nipun.gupta@nxp.com;
> > hemant.agrawal@nxp.com
> > Cc: dev@dpdk.org
> > Subject: Re: [PATCH v6 15/23] eventtimer: add buffering of timer expiry
> > events
> >
> > On Wed, Jan 10, 2018 at 06:21:06PM -0600, Erik Gabriel Carrillo wrote:
> > > Buffer timer expiry events generated while walking a "run list"
> > > in rte_timer_manage, and burst enqueue them to an event device to the
> > > extent possible.
> > >
> >
> > IMO in some cases this adds a lot of delay between expiries and the events being
> > published to the event device. For example, with a long expiry interval (such as
> > the default 300-second MAC entry expiry), the expired entries would remain in the
> > buffer until 32 other entries expire.
> >
>
> The service function invokes rte_timer_manage to handle expired timers, and as it does so, the buffer will be flushed under two conditions:  the buffer is full of expired timer events, or the buffer is not full but there are no more expired timers to handle for this iteration of the service.  The latter condition will flush the buffer even if only one event has been buffered after walking the list of expired rte_timers.

Ah, I missed the flush call after timer_manage().

>
> So, there could be some delay for the events that got buffered earliest, but it seems like the throughput benefit outweighs the small delay there.  Thoughts?
>
> We could also make the buffer size configurable.

Maybe make it compile-time configurable, i.e. in config/common_base.
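
One way that could look (sketch only; the option name is hypothetical): a
CONFIG_RTE_EVENT_TIMER_ADAPTER_BUF_SZ entry in config/common_base becomes an
RTE_ define at build time, and the C code keeps the current default as a
fallback:

/* Hypothetical compile-time knob; 1024 stays the default when the config
 * option is not set.
 */
#ifdef RTE_EVENT_TIMER_ADAPTER_BUF_SZ
#define EVENT_BUFFER_SZ RTE_EVENT_TIMER_ADAPTER_BUF_SZ
#else
#define EVENT_BUFFER_SZ 1024
#endif

#if EVENT_BUFFER_SZ < 1 || EVENT_BUFFER_SZ > 65536
#error "EVENT_BUFFER_SZ must be between 1 and 65536"
#endif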

>
> >
> > > Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
> > > ---
> > >  lib/librte_eventdev/rte_event_timer_adapter.c | 118
> > > +++++++++++++++++++++++---
> > >  1 file changed, 108 insertions(+), 10 deletions(-)
> > >
> > <snip>
  

Patch

diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
index 8bd9ebc..176cc5b 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -447,7 +447,93 @@  rte_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
 }
 
 /*
- * Software event timer adapter ops definitions
+ * Software event timer adapter buffer helper functions
+ */
+
+#define EVENT_BUFFER_SZ 1024
+#if EVENT_BUFFER_SZ < 1 || EVENT_BUFFER_SZ > 65536
+#error "EVENT_BUFFER_SZ must be between 1 and 65536"
+#endif
+
+#define EVENT_BUFFER_BATCHSZ 32
+
+struct event_buffer {
+	uint16_t head;
+	uint16_t tail;
+	uint16_t count;
+	struct rte_event events[EVENT_BUFFER_SZ];
+} __rte_cache_aligned;
+
+static inline bool
+event_buffer_full(struct event_buffer *bufp)
+{
+	return (bufp->tail + 1) % EVENT_BUFFER_SZ == bufp->head;
+}
+
+static inline bool
+event_buffer_batch_ready(struct event_buffer *bufp)
+{
+	return bufp->count >= EVENT_BUFFER_BATCHSZ;
+}
+
+static void
+event_buffer_init(struct event_buffer *bufp)
+{
+	bufp->head = bufp->tail = bufp->count = 0;
+	memset(&bufp->events, 0, sizeof(struct rte_event) * EVENT_BUFFER_SZ);
+}
+
+static int
+event_buffer_add(struct event_buffer *bufp, struct rte_event *eventp)
+{
+	uint16_t *tailp = &bufp->tail;
+	struct rte_event *buf_eventp;
+
+	if (event_buffer_full(bufp))
+		return -1;
+
+	buf_eventp = &bufp->events[*tailp];
+	rte_memcpy(buf_eventp, eventp, sizeof(struct rte_event));
+
+	*tailp = (*tailp + 1) % EVENT_BUFFER_SZ;
+	bufp->count++;
+
+	return 0;
+}
+
+static void
+event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
+		   uint16_t *nb_events_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *headp = &bufp->head;
+	uint16_t *tailp = &bufp->tail;
+	struct rte_event *events = bufp->events;
+
+	if (*tailp > *headp)
+		n = *tailp - *headp;
+	else if (*tailp < *headp)
+		n = EVENT_BUFFER_SZ - *headp;
+	else {  /* buffer empty */
+		*nb_events_flushed = 0;
+		return;
+	}
+
+	*nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,
+						     &events[*headp], n);
+	if (*nb_events_flushed != n && rte_errno == -EINVAL) {
+		/* We must have hit a bad event...skip it to ensure we don't
+		 * hang on it.
+		 */
+		(*nb_events_flushed)++;
+	}
+
+	*headp = (*headp + *nb_events_flushed) % EVENT_BUFFER_SZ;
+	bufp->count -= *nb_events_flushed;
+}
+
+/*
+ * Software event timer adapter implementation
  */
 
 struct rte_event_timer_adapter_sw_data {
@@ -461,6 +547,8 @@  struct rte_event_timer_adapter_sw_data {
 	struct rte_mempool *msg_pool;
 	/* Mempool containing timer objects */
 	struct rte_mempool *tim_pool;
+	/* Buffered timer expiry events to be enqueued to an event device. */
+	struct event_buffer buffer;
 };
 
 enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
@@ -473,7 +561,8 @@  struct msg {
 static void
 sw_event_timer_cb(struct rte_timer *tim, void *arg)
 {
-	uint16_t n;
+	uint16_t nb_evs_flushed;
+	int ret;
 	struct rte_event_timer *evtim;
 	struct rte_event_timer_adapter *adapter;
 	struct rte_event_timer_adapter_sw_data *sw_data;
@@ -482,14 +571,10 @@  sw_event_timer_cb(struct rte_timer *tim, void *arg)
 	adapter = (struct rte_event_timer_adapter *)evtim->impl_opaque[1];
 	sw_data = adapter->data->adapter_priv;
 
-	n = rte_event_enqueue_burst(adapter->data->event_dev_id,
-				    adapter->data->event_port_id,
-				    &evtim->ev,
-				    1);
-	if (n != 1 && rte_errno == -ENOSPC) {
-		/* If we couldn't enqueue because the event port was
-		 * backpressured, put the timer back in the skiplist with an
-		 * immediate expiry value so we can process it again on the
+	ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
+	if (ret < 0) {
+		/* If event buffer is full, put timer back in list with
+		 * immediate expiry value, so that we process it again on the
 		 * next iteration.
 		 */
 		rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(),
@@ -500,6 +585,13 @@  sw_event_timer_cb(struct rte_timer *tim, void *arg)
 		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
 		rte_mempool_put(sw_data->tim_pool, (void **)&tim);
 	}
+
+	if (event_buffer_batch_ready(&sw_data->buffer)) {
+		event_buffer_flush(&sw_data->buffer,
+				   adapter->data->event_dev_id,
+				   adapter->data->event_port_id,
+				   &nb_evs_flushed);
+	}
 }
 
 static __rte_always_inline uint64_t
@@ -658,10 +750,14 @@  sw_event_timer_adapter_service_func(void *arg)
 
 	rte_timer_manage();
 
+	event_buffer_flush(&sw_data->buffer, adapter->data->event_dev_id,
+			   adapter->data->event_port_id, &nb_events);
+
 	/* Could use for stats */
 	RTE_SET_USED(nb_events);
 	RTE_SET_USED(ret);
 
+
 	return 0;
 }
 
@@ -726,6 +822,8 @@  sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
 		return -1;
 	}
 
+	event_buffer_init(&sw_data->buffer);
+
 	/* Register a service component */
 	memset(&service, 0, sizeof(service));
 	snprintf(service.name, RTE_SERVICE_NAME_MAX,