[dpdk-dev,v2,1/3] timer: add per-installer pending lists for each lcore

Message ID 1503692823-16214-1-git-send-email-erik.g.carrillo@intel.com (mailing list archive)
State Superseded, archived

Checks

Context               Check     Description
ci/checkpatch         success   coding style OK
ci/Intel-compilation  success   Compilation OK

Commit Message

Carrillo, Erik G Aug. 25, 2017, 8:27 p.m. UTC
  Instead of each priv_timer struct containing a single skiplist, this
commit adds a skiplist for each enabled lcore to priv_timer. In the case
that multiple lcores repeatedly install timers on the same target lcore,
this change reduces lock contention for the target lcore's skiplists and
increases performance.

Signed-off-by: Gabriel Carrillo <erik.g.carrillo@intel.com>
---
v2:
* Address checkpatch warnings

 lib/librte_timer/rte_timer.c | 309 +++++++++++++++++++++++++++----------------
 lib/librte_timer/rte_timer.h |   9 +-
 2 files changed, 202 insertions(+), 116 deletions(-)
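
The heart of the change is in timer_add(): rather than every installing lcore serializing on the target lcore's single list_lock, each installer now gets its own skiplist (and lock) inside the target's priv_timer. A minimal sketch of that idea, paraphrasing the diff below (illustrative only, not the exact patch code):

/* Sketch only: names follow the patch, the body is abridged. */
static void
timer_add_sketch(struct rte_timer *tim, unsigned int tim_lcore, int local_is_locked)
{
	unsigned int installer = rte_lcore_id();

	/* Before the patch, every installer took the one
	 * priv_timer[tim_lcore].list_lock; now each installer locks only its
	 * own per-installer skiplist on the target lcore.
	 */
	struct skiplist *list = &priv_timer[tim_lcore].pending_lists[installer];

	if (tim_lcore != installer || !local_is_locked)
		rte_spinlock_lock(&list->lock);

	/* ... skiplist insertion of tim at its chosen level, as in the patch ... */

	if (tim_lcore != installer || !local_is_locked)
		rte_spinlock_unlock(&list->lock);
}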
  

Comments

Ananyev, Konstantin Aug. 29, 2017, 10:57 a.m. UTC | #1
Hi Gabriel,

> 
> Instead of each priv_timer struct containing a single skiplist, this
> commit adds a skiplist for each enabled lcore to priv_timer. In the case
> that multiple lcores repeatedly install timers on the same target lcore,
> this change reduces lock contention for the target lcore's skiplists and
> increases performance.

I am not an rte_timer expert, but there is one thing that worries me:
It seems that the complexity of timer_manage() has increased quite a bit with that patch:
now it has to check/process up to RTE_MAX_LCORE skiplists instead of one,
and it also has to somehow properly sort up to RTE_MAX_LCORE lists of
retrieved (ready to run) timers.
Wouldn't all that affect its running time?
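(Roughly: with K non-empty per-installer lists holding T expired timers in total, the merge loop in the patch re-scans all K list heads to pick each next timer to run, so ordering the callbacks costs on the order of K*T comparisons, versus T for today's single pre-sorted list.)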

I understand your intention to reduce lock contention,
but I suppose at least it could be done in a configurable way.
Let's say we allow the user to specify the dimension of pending_lists[] at init phase or so.
Then a timer from lcore_id=N will end up in pending_lists[N % RTE_DIM(pending_lists)].
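Something like the following, just as a sketch - the knob and the helper below are made-up names, not part of the patch:

static unsigned int n_pending_lists = 1;	/* 1 == today's single-list behaviour */

/* hypothetical init-time knob */
void
rte_timer_subsystem_set_n_pending_lists(unsigned int n)
{
	n_pending_lists = RTE_MIN(RTE_MAX(n, 1u), (unsigned int)RTE_MAX_LCORE);
}

/* timer_add()/timer_del() would then map installer lcore N onto one of the
 * configured lists on the target lcore.
 */
static inline struct skiplist *
pending_list_for(unsigned int tim_lcore, unsigned int installer_lcore)
{
	return &priv_timer[tim_lcore].pending_lists[installer_lcore % n_pending_lists];
}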

Another thought - it might be better to divide the pending timers list not by client (lcore) id,
but by expiration time - some analog of a timer wheel or so.
That, I think, might greatly decrease the probability that timer_manage() and timer_add()
will try to access the same list.
On the other hand, timer_manage() would still have to consume the skip-lists one by one.
Though I suppose that's quite a radical change from what we have right now.
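Very roughly, something like this - the bucket count and width below are arbitrary illustration values, and pending_buckets[] is a made-up array, not the patch's pending_lists[]:

#define N_EXPIRE_BUCKETS	64

static inline unsigned int
bucket_for_expire(uint64_t expire)
{
	uint64_t bucket_cycles = rte_get_timer_hz() / 1000;	/* ~1 ms per bucket */

	return (unsigned int)((expire / bucket_cycles) % N_EXPIRE_BUCKETS);
}

/* timer_add() would insert into pending_buckets[bucket_for_expire(tim->expire)],
 * while timer_manage() mostly locks the bucket covering the current time, so
 * the two would rarely contend for the same lock.
 */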
Konstantin

> 
> Signed-off-by: Gabriel Carrillo <erik.g.carrillo@intel.com>
> ---
> v2:
> * Address checkpatch warnings
> 
>  lib/librte_timer/rte_timer.c | 309 +++++++++++++++++++++++++++----------------
>  lib/librte_timer/rte_timer.h |   9 +-
>  2 files changed, 202 insertions(+), 116 deletions(-)
> 
> diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c
> index 5ee0840..da2fc1a 100644
> --- a/lib/librte_timer/rte_timer.c
> +++ b/lib/librte_timer/rte_timer.c
> @@ -37,6 +37,7 @@
>  #include <inttypes.h>
>  #include <assert.h>
>  #include <sys/queue.h>
> +#include <stdbool.h>
> 
>  #include <rte_atomic.h>
>  #include <rte_common.h>
> @@ -56,17 +57,19 @@
> 
>  LIST_HEAD(rte_timer_list, rte_timer);
> 
> +struct skiplist {
> +	struct rte_timer head;  /**< dummy timer instance to head up list */
> +	rte_spinlock_t lock;	/**< lock to protect list access */
> +	unsigned int depth;	/**< track the current depth of the skiplist */
> +} __rte_cache_aligned;
> +
>  struct priv_timer {
> -	struct rte_timer pending_head;  /**< dummy timer instance to head up list */
> -	rte_spinlock_t list_lock;       /**< lock to protect list access */
> +	/** one pending list per enabled lcore */
> +	struct skiplist pending_lists[RTE_MAX_LCORE];
> 
>  	/** per-core variable that true if a timer was updated on this
>  	 *  core since last reset of the variable */
>  	int updated;
> -
> -	/** track the current depth of the skiplist */
> -	unsigned curr_skiplist_depth;
> -
>  	unsigned prev_lcore;              /**< used for lcore round robin */
> 
>  	/** running timer on this lcore now */
> @@ -81,6 +84,10 @@ struct priv_timer {
>  /** per-lcore private info for timers */
>  static struct priv_timer priv_timer[RTE_MAX_LCORE];
> 
> +/** cache of IDs of enabled lcores */
> +static unsigned int enabled_lcores[RTE_MAX_LCORE];
> +static int n_enabled_lcores;
> +
>  /* when debug is enabled, store some statistics */
>  #ifdef RTE_LIBRTE_TIMER_DEBUG
>  #define __TIMER_STAT_ADD(name, n) do {					\
> @@ -96,14 +103,25 @@ static struct priv_timer priv_timer[RTE_MAX_LCORE];
>  void
>  rte_timer_subsystem_init(void)
>  {
> -	unsigned lcore_id;
> +	unsigned int lcore_id1, lcore_id2;
> +	struct skiplist *list;
> +	int i, j;
> +
> +	RTE_LCORE_FOREACH(lcore_id1)
> +		enabled_lcores[n_enabled_lcores++] = lcore_id1;
> 
>  	/* since priv_timer is static, it's zeroed by default, so only init some
>  	 * fields.
>  	 */
> -	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id ++) {
> -		rte_spinlock_init(&priv_timer[lcore_id].list_lock);
> -		priv_timer[lcore_id].prev_lcore = lcore_id;
> +	for (i = 0, lcore_id1 = enabled_lcores[i]; i < n_enabled_lcores;
> +	     lcore_id1 = enabled_lcores[++i]) {
> +		priv_timer[lcore_id1].prev_lcore = lcore_id1;
> +
> +		for (j = 0, lcore_id2 = enabled_lcores[j]; j < n_enabled_lcores;
> +		     lcore_id2 = enabled_lcores[++j]) {
> +			list = &priv_timer[lcore_id1].pending_lists[lcore_id2];
> +			rte_spinlock_init(&list->lock);
> +		}
>  	}
>  }
> 
> @@ -114,7 +132,8 @@ rte_timer_init(struct rte_timer *tim)
>  	union rte_timer_status status;
> 
>  	status.state = RTE_TIMER_STOP;
> -	status.owner = RTE_TIMER_NO_OWNER;
> +	status.installer = RTE_TIMER_NO_LCORE;
> +	status.owner = RTE_TIMER_NO_LCORE;
>  	tim->status.u32 = status.u32;
>  }
> 
> @@ -142,7 +161,7 @@ timer_set_config_state(struct rte_timer *tim,
>  		 * or ready to run on local core, exit
>  		 */
>  		if (prev_status.state == RTE_TIMER_RUNNING &&
> -		    (prev_status.owner != (uint16_t)lcore_id ||
> +		    (prev_status.owner != (int)lcore_id ||
>  		     tim != priv_timer[lcore_id].running_tim))
>  			return -1;
> 
> @@ -153,7 +172,8 @@ timer_set_config_state(struct rte_timer *tim,
>  		/* here, we know that timer is stopped or pending,
>  		 * mark it atomically as being configured */
>  		status.state = RTE_TIMER_CONFIG;
> -		status.owner = (int16_t)lcore_id;
> +		status.installer = RTE_TIMER_NO_LCORE;
> +		status.owner = lcore_id;
>  		success = rte_atomic32_cmpset(&tim->status.u32,
>  					      prev_status.u32,
>  					      status.u32);
> @@ -185,7 +205,8 @@ timer_set_running_state(struct rte_timer *tim)
>  		/* here, we know that timer is stopped or pending,
>  		 * mark it atomically as being configured */
>  		status.state = RTE_TIMER_RUNNING;
> -		status.owner = (int16_t)lcore_id;
> +		status.installer = prev_status.installer;
> +		status.owner = lcore_id;
>  		success = rte_atomic32_cmpset(&tim->status.u32,
>  					      prev_status.u32,
>  					      status.u32);
> @@ -236,11 +257,11 @@ timer_get_skiplist_level(unsigned curr_depth)
>   * are <= that time value.
>   */
>  static void
> -timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
> +timer_get_prev_entries(uint64_t time_val, struct skiplist *list,
>  		struct rte_timer **prev)
>  {
> -	unsigned lvl = priv_timer[tim_lcore].curr_skiplist_depth;
> -	prev[lvl] = &priv_timer[tim_lcore].pending_head;
> +	unsigned int lvl = list->depth;
> +	prev[lvl] = &list->head;
>  	while(lvl != 0) {
>  		lvl--;
>  		prev[lvl] = prev[lvl+1];
> @@ -255,15 +276,15 @@ timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
>   * all skiplist levels.
>   */
>  static void
> -timer_get_prev_entries_for_node(struct rte_timer *tim, unsigned tim_lcore,
> +timer_get_prev_entries_for_node(struct rte_timer *tim, struct skiplist *list,
>  		struct rte_timer **prev)
>  {
>  	int i;
>  	/* to get a specific entry in the list, look for just lower than the time
>  	 * values, and then increment on each level individually if necessary
>  	 */
> -	timer_get_prev_entries(tim->expire - 1, tim_lcore, prev);
> -	for (i = priv_timer[tim_lcore].curr_skiplist_depth - 1; i >= 0; i--) {
> +	timer_get_prev_entries(tim->expire - 1, list, prev);
> +	for (i = list->depth - 1; i >= 0; i--) {
>  		while (prev[i]->sl_next[i] != NULL &&
>  				prev[i]->sl_next[i] != tim &&
>  				prev[i]->sl_next[i]->expire <= tim->expire)
> @@ -282,25 +303,25 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
>  	unsigned lcore_id = rte_lcore_id();
>  	unsigned lvl;
>  	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
> +	struct skiplist *list = &priv_timer[tim_lcore].pending_lists[lcore_id];
> 
>  	/* if timer needs to be scheduled on another core, we need to
>  	 * lock the list; if it is on local core, we need to lock if
>  	 * we are not called from rte_timer_manage() */
>  	if (tim_lcore != lcore_id || !local_is_locked)
> -		rte_spinlock_lock(&priv_timer[tim_lcore].list_lock);
> +		rte_spinlock_lock(&list->lock);
> 
>  	/* find where exactly this element goes in the list of elements
>  	 * for each depth. */
> -	timer_get_prev_entries(tim->expire, tim_lcore, prev);
> +	timer_get_prev_entries(tim->expire, list, prev);
> 
>  	/* now assign it a new level and add at that level */
> -	const unsigned tim_level = timer_get_skiplist_level(
> -			priv_timer[tim_lcore].curr_skiplist_depth);
> -	if (tim_level == priv_timer[tim_lcore].curr_skiplist_depth)
> -		priv_timer[tim_lcore].curr_skiplist_depth++;
> +	const unsigned int tim_level = timer_get_skiplist_level(list->depth);
> +	if (tim_level == list->depth)
> +		list->depth++;
> 
>  	lvl = tim_level;
> -	while (lvl > 0) {
> +	while (lvl > 0 && lvl < MAX_SKIPLIST_DEPTH + 1) {
>  		tim->sl_next[lvl] = prev[lvl]->sl_next[lvl];
>  		prev[lvl]->sl_next[lvl] = tim;
>  		lvl--;
> @@ -310,11 +331,10 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
> 
>  	/* save the lowest list entry into the expire field of the dummy hdr
>  	 * NOTE: this is not atomic on 32-bit*/
> -	priv_timer[tim_lcore].pending_head.expire = priv_timer[tim_lcore].\
> -			pending_head.sl_next[0]->expire;
> +	list->head.expire = list->head.sl_next[0]->expire;
> 
>  	if (tim_lcore != lcore_id || !local_is_locked)
> -		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
> +		rte_spinlock_unlock(&list->lock);
>  }
> 
>  /*
> @@ -330,35 +350,38 @@ timer_del(struct rte_timer *tim, union rte_timer_status prev_status,
>  	unsigned prev_owner = prev_status.owner;
>  	int i;
>  	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
> +	struct skiplist *list;
> +
> +	list = &priv_timer[prev_owner].pending_lists[prev_status.installer];
> 
>  	/* if timer needs is pending another core, we need to lock the
>  	 * list; if it is on local core, we need to lock if we are not
>  	 * called from rte_timer_manage() */
>  	if (prev_owner != lcore_id || !local_is_locked)
> -		rte_spinlock_lock(&priv_timer[prev_owner].list_lock);
> +		rte_spinlock_lock(&list->lock);
> 
>  	/* save the lowest list entry into the expire field of the dummy hdr.
>  	 * NOTE: this is not atomic on 32-bit */
> -	if (tim == priv_timer[prev_owner].pending_head.sl_next[0])
> -		priv_timer[prev_owner].pending_head.expire =
> -				((tim->sl_next[0] == NULL) ? 0 : tim->sl_next[0]->expire);
> +	if (tim == list->head.sl_next[0])
> +		list->head.expire = ((tim->sl_next[0] == NULL) ?
> +				     0 : tim->sl_next[0]->expire);
> 
>  	/* adjust pointers from previous entries to point past this */
> -	timer_get_prev_entries_for_node(tim, prev_owner, prev);
> -	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--) {
> +	timer_get_prev_entries_for_node(tim, list, prev);
> +	for (i = list->depth - 1; i >= 0; i--) {
>  		if (prev[i]->sl_next[i] == tim)
>  			prev[i]->sl_next[i] = tim->sl_next[i];
>  	}
> 
>  	/* in case we deleted last entry at a level, adjust down max level */
> -	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--)
> -		if (priv_timer[prev_owner].pending_head.sl_next[i] == NULL)
> -			priv_timer[prev_owner].curr_skiplist_depth --;
> +	for (i = list->depth - 1; i >= 0; i--)
> +		if (list->head.sl_next[i] == NULL)
> +			list->depth--;
>  		else
>  			break;
> 
>  	if (prev_owner != lcore_id || !local_is_locked)
> -		rte_spinlock_unlock(&priv_timer[prev_owner].list_lock);
> +		rte_spinlock_unlock(&list->lock);
>  }
> 
>  /* Reset and start the timer associated with the timer handle (private func) */
> @@ -416,7 +439,8 @@ __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
>  	 * the state so we don't need to use cmpset() here */
>  	rte_wmb();
>  	status.state = RTE_TIMER_PENDING;
> -	status.owner = (int16_t)tim_lcore;
> +	status.installer = lcore_id;
> +	status.owner = tim_lcore;
>  	tim->status.u32 = status.u32;
> 
>  	return 0;
> @@ -484,7 +508,8 @@ rte_timer_stop(struct rte_timer *tim)
>  	/* mark timer as stopped */
>  	rte_wmb();
>  	status.state = RTE_TIMER_STOP;
> -	status.owner = RTE_TIMER_NO_OWNER;
> +	status.installer = RTE_TIMER_NO_LCORE;
> +	status.owner = RTE_TIMER_NO_LCORE;
>  	tim->status.u32 = status.u32;
> 
>  	return 0;
> @@ -506,119 +531,179 @@ rte_timer_pending(struct rte_timer *tim)
>  }
> 
>  /* must be called periodically, run all timer that expired */
> -void rte_timer_manage(void)
> +void
> +rte_timer_manage(void)
>  {
>  	union rte_timer_status status;
> -	struct rte_timer *tim, *next_tim;
> -	struct rte_timer *run_first_tim, **pprev;
> -	unsigned lcore_id = rte_lcore_id();
> +	struct rte_timer *tim, *next_tim, **pprev;
> +	struct rte_timer *run_first_tims[RTE_MAX_LCORE + 1];
>  	struct rte_timer *prev[MAX_SKIPLIST_DEPTH + 1];
> -	uint64_t cur_time;
> -	int i, ret;
> +	struct priv_timer *priv_tim;
> +	unsigned int installer_lcore, lcore_id = rte_lcore_id();
> +	uint64_t cur_time, min_expire;
> +	int i, j, min_idx, ret;
> +	int nrunlists = 0;
> +	int local_is_locked = 0;
> +	struct skiplist *dest_list, *list = NULL;
> +	bool done;
> 
>  	/* timer manager only runs on EAL thread with valid lcore_id */
>  	assert(lcore_id < RTE_MAX_LCORE);
> 
> +	priv_tim = &priv_timer[lcore_id];
> +
>  	__TIMER_STAT_ADD(manage, 1);
> -	/* optimize for the case where per-cpu list is empty */
> -	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL)
> -		return;
> -	cur_time = rte_get_timer_cycles();
> +	for (i = 0, installer_lcore = enabled_lcores[i]; i < n_enabled_lcores;
> +	     installer_lcore = enabled_lcores[++i]) {
> +		list = &priv_tim->pending_lists[installer_lcore];
> +
> +		/* optimize for the case where list is empty */
> +		if (list->head.sl_next[0] == NULL)
> +			continue;
> +		cur_time = rte_get_timer_cycles();
> 
>  #ifdef RTE_ARCH_X86_64
> -	/* on 64-bit the value cached in the pending_head.expired will be
> -	 * updated atomically, so we can consult that for a quick check here
> -	 * outside the lock */
> -	if (likely(priv_timer[lcore_id].pending_head.expire > cur_time))
> -		return;
> +		/* on 64-bit the value cached in the list->head.expired will be
> +		 * updated atomically, so we can consult that for a quick check
> +		 * here outside the lock
> +		 */
> +		if (likely(list->head.expire > cur_time))
> +			continue;
>  #endif
> 
> -	/* browse ordered list, add expired timers in 'expired' list */
> -	rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
> +		/* browse ordered list, add expired timers in 'expired' list */
> +		rte_spinlock_lock(&list->lock);
> 
> -	/* if nothing to do just unlock and return */
> -	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL ||
> -	    priv_timer[lcore_id].pending_head.sl_next[0]->expire > cur_time) {
> -		rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
> -		return;
> -	}
> +		/* if nothing to do just unlock and continue */
> +		if (list->head.sl_next[0] == NULL ||
> +		    list->head.sl_next[0]->expire > cur_time) {
> +			rte_spinlock_unlock(&list->lock);
> +			continue;
> +		}
> 
> -	/* save start of list of expired timers */
> -	tim = priv_timer[lcore_id].pending_head.sl_next[0];
> +		/* save start of list of expired timers */
> +		tim = list->head.sl_next[0];
> +
> +		/* break the existing list at current time point */
> +		timer_get_prev_entries(cur_time, list, prev);
> +		for (j = list->depth - 1; j >= 0; j--) {
> +			if (prev[j] == &list->head)
> +				continue;
> +			list->head.sl_next[j] =
> +			    prev[j]->sl_next[j];
> +			if (prev[j]->sl_next[j] == NULL)
> +				list->depth--;
> +			prev[j]->sl_next[j] = NULL;
> +		}
> 
> -	/* break the existing list at current time point */
> -	timer_get_prev_entries(cur_time, lcore_id, prev);
> -	for (i = priv_timer[lcore_id].curr_skiplist_depth -1; i >= 0; i--) {
> -		if (prev[i] == &priv_timer[lcore_id].pending_head)
> -			continue;
> -		priv_timer[lcore_id].pending_head.sl_next[i] =
> -		    prev[i]->sl_next[i];
> -		if (prev[i]->sl_next[i] == NULL)
> -			priv_timer[lcore_id].curr_skiplist_depth--;
> -		prev[i] ->sl_next[i] = NULL;
> -	}
> +		/* transition run-list from PENDING to RUNNING */
> +		run_first_tims[nrunlists] = tim;
> +		pprev = &run_first_tims[nrunlists];
> +		nrunlists++;
> +
> +		for (; tim != NULL; tim = next_tim) {
> +			next_tim = tim->sl_next[0];
> +
> +			ret = timer_set_running_state(tim);
> +			if (likely(ret == 0)) {
> +				pprev = &tim->sl_next[0];
> +			} else {
> +				/* another core is trying to re-config this one,
> +				 * remove it from local expired list
> +				 */
> +				*pprev = next_tim;
> +			}
> +		}
> 
> -	/* transition run-list from PENDING to RUNNING */
> -	run_first_tim = tim;
> -	pprev = &run_first_tim;
> +		/* update the next to expire timer value */
> +		list->head.expire = (list->head.sl_next[0] == NULL) ?
> +				    0 : list->head.sl_next[0]->expire;
> 
> -	for ( ; tim != NULL; tim = next_tim) {
> -		next_tim = tim->sl_next[0];
> +		rte_spinlock_unlock(&list->lock);
> +	}
> 
> -		ret = timer_set_running_state(tim);
> -		if (likely(ret == 0)) {
> -			pprev = &tim->sl_next[0];
> -		} else {
> -			/* another core is trying to re-config this one,
> -			 * remove it from local expired list
> -			 */
> -			*pprev = next_tim;
> +	/* Now process the run lists */
> +	while (1) {
> +		done = true;
> +		min_expire = UINT64_MAX;
> +		min_idx = 0;
> +
> +		/* Find the next oldest timer to process */
> +		for (i = 0; i < nrunlists; i++) {
> +			tim = run_first_tims[i];
> +
> +			if (tim != NULL && tim->expire < min_expire) {
> +				min_expire = tim->expire;
> +				min_idx = i;
> +				done = false;
> +			}
>  		}
> -	}
> 
> -	/* update the next to expire timer value */
> -	priv_timer[lcore_id].pending_head.expire =
> -	    (priv_timer[lcore_id].pending_head.sl_next[0] == NULL) ? 0 :
> -		priv_timer[lcore_id].pending_head.sl_next[0]->expire;
> +		if (done)
> +			break;
> +
> +		tim = run_first_tims[min_idx];
> 
> -	rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
> +		/* Move down the runlist from which we picked a timer to
> +		 * execute
> +		 */
> +		run_first_tims[min_idx] = run_first_tims[min_idx]->sl_next[0];
> 
> -	/* now scan expired list and call callbacks */
> -	for (tim = run_first_tim; tim != NULL; tim = next_tim) {
> -		next_tim = tim->sl_next[0];
> -		priv_timer[lcore_id].updated = 0;
> -		priv_timer[lcore_id].running_tim = tim;
> +		priv_tim->updated = 0;
> +		priv_tim->running_tim = tim;
> 
>  		/* execute callback function with list unlocked */
>  		tim->f(tim, tim->arg);
> 
>  		__TIMER_STAT_ADD(pending, -1);
>  		/* the timer was stopped or reloaded by the callback
> -		 * function, we have nothing to do here */
> -		if (priv_timer[lcore_id].updated == 1)
> +		 * function, we have nothing to do here
> +		 */
> +		if (priv_tim->updated == 1)
>  			continue;
> 
>  		if (tim->period == 0) {
>  			/* remove from done list and mark timer as stopped */
>  			status.state = RTE_TIMER_STOP;
> -			status.owner = RTE_TIMER_NO_OWNER;
> +			status.installer = RTE_TIMER_NO_LCORE;
> +			status.owner = RTE_TIMER_NO_LCORE;
>  			rte_wmb();
>  			tim->status.u32 = status.u32;
>  		}
>  		else {
> -			/* keep it in list and mark timer as pending */
> -			rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
> +			dest_list = &priv_tim->pending_lists[lcore_id];
> +
> +			/* If the destination list is the current list
> +			 * we can acquire the lock here, and hold it
> +			 * across removal and insertion of the timer.
> +			 */
> +			if (list == dest_list) {
> +				rte_spinlock_lock(&list->lock);
> +				local_is_locked = 1;
> +			}
> +
> +			/* set timer state back to PENDING and
> +			 * reinsert it in pending list
> +			 */
>  			status.state = RTE_TIMER_PENDING;
>  			__TIMER_STAT_ADD(pending, 1);
> -			status.owner = (int16_t)lcore_id;
> +			status.installer = tim->status.installer;
> +			status.owner = lcore_id;
>  			rte_wmb();
>  			tim->status.u32 = status.u32;
> -			__rte_timer_reset(tim, tim->expire + tim->period,
> -				tim->period, lcore_id, tim->f, tim->arg, 1);
> -			rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
> +
> +			__rte_timer_reset(tim,
> +					  tim->expire + tim->period,
> +					  tim->period, lcore_id,
> +					  tim->f, tim->arg, local_is_locked);
> +
> +			if (local_is_locked) {
> +				rte_spinlock_unlock(&list->lock);
> +				local_is_locked = 0;
> +			}
>  		}
>  	}
> -	priv_timer[lcore_id].running_tim = NULL;
> +	priv_tim->running_tim = NULL;
>  }
> 
>  /* dump statistics about timers */
> diff --git a/lib/librte_timer/rte_timer.h b/lib/librte_timer/rte_timer.h
> index a276a73..4cc6baf 100644
> --- a/lib/librte_timer/rte_timer.h
> +++ b/lib/librte_timer/rte_timer.h
> @@ -77,7 +77,7 @@ extern "C" {
>  #define RTE_TIMER_RUNNING 2 /**< State: timer function is running. */
>  #define RTE_TIMER_CONFIG  3 /**< State: timer is being configured. */
> 
> -#define RTE_TIMER_NO_OWNER -2 /**< Timer has no owner. */
> +#define RTE_TIMER_NO_LCORE -2
> 
>  /**
>   * Timer type: Periodic or single (one-shot).
> @@ -94,10 +94,11 @@ enum rte_timer_type {
>  union rte_timer_status {
>  	RTE_STD_C11
>  	struct {
> -		uint16_t state;  /**< Stop, pending, running, config. */
> -		int16_t owner;   /**< The lcore that owns the timer. */
> +		unsigned state	: 2;
> +		int installer	: 15;
> +		int owner	: 15;
>  	};
> -	uint32_t u32;            /**< To atomic-set status + owner. */
> +	uint32_t u32;            /**< To atomic-set status, installer, owner */
>  };
> 
>  #ifdef RTE_LIBRTE_TIMER_DEBUG
> --
> 2.6.4
  
Carrillo, Erik G Aug. 29, 2017, 4:13 p.m. UTC | #2
Hi Konstantin,

> -----Original Message-----
> From: Ananyev, Konstantin
> Sent: Tuesday, August 29, 2017 5:57 AM
> To: Carrillo, Erik G <erik.g.carrillo@intel.com>; rsanford@akamai.com
> Cc: dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH v2 1/3] timer: add per-installer pending lists
> for each lcore
> 
> Hi Gabriel,
> 
> >
> > Instead of each priv_timer struct containing a single skiplist, this
> > commit adds a skiplist for each enabled lcore to priv_timer. In the
> > case that multiple lcores repeatedly install timers on the same target
> > lcore, this change reduces lock contention for the target lcore's
> > skiplists and increases performance.
> 
> I am not an rte_timer expert, but there is one thing that worries me:
> It seems that the complexity of timer_manage() has increased quite a bit
> with that patch:
> now it has to check/process up to RTE_MAX_LCORE skiplists instead of one,
> and it also has to somehow properly sort up to RTE_MAX_LCORE lists of
> retrieved (ready to run) timers.
> Wouldn't all that affect its running time?
> 

Yes, it would indeed increase it.

> I understand your intention to reduce lock contention, but I suppose at least
> it could be done in a configurable way.
> Let's say we allow the user to specify the dimension of pending_lists[] at init
> phase or so.
> Then a timer from lcore_id=N will end up in
> pending_lists[N % RTE_DIM(pending_lists)].
> 

This is a neat idea, and it would seemingly allow the original performance to be maintained for applications where contention is not an issue. I'll look into this change, as it may address other developers' concerns as well - thanks for the suggestion.

> Another thought - it might be better to divide the pending timers list not by
> client (lcore) id, but by expiration time - some analog of a timer wheel or so.
> That, I think, might greatly decrease the probability that timer_manage() and
> timer_add() will try to access the same list.
> On the other hand, timer_manage() would still have to consume the skip-lists
> one by one.
> Though I suppose that's quite a radical change from what we have right now.
> Konstantin
> 

Thanks,
Gabriel
  

Patch

diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c
index 5ee0840..da2fc1a 100644
--- a/lib/librte_timer/rte_timer.c
+++ b/lib/librte_timer/rte_timer.c
@@ -37,6 +37,7 @@ 
 #include <inttypes.h>
 #include <assert.h>
 #include <sys/queue.h>
+#include <stdbool.h>
 
 #include <rte_atomic.h>
 #include <rte_common.h>
@@ -56,17 +57,19 @@ 
 
 LIST_HEAD(rte_timer_list, rte_timer);
 
+struct skiplist {
+	struct rte_timer head;  /**< dummy timer instance to head up list */
+	rte_spinlock_t lock;	/**< lock to protect list access */
+	unsigned int depth;	/**< track the current depth of the skiplist */
+} __rte_cache_aligned;
+
 struct priv_timer {
-	struct rte_timer pending_head;  /**< dummy timer instance to head up list */
-	rte_spinlock_t list_lock;       /**< lock to protect list access */
+	/** one pending list per enabled lcore */
+	struct skiplist pending_lists[RTE_MAX_LCORE];
 
 	/** per-core variable that true if a timer was updated on this
 	 *  core since last reset of the variable */
 	int updated;
-
-	/** track the current depth of the skiplist */
-	unsigned curr_skiplist_depth;
-
 	unsigned prev_lcore;              /**< used for lcore round robin */
 
 	/** running timer on this lcore now */
@@ -81,6 +84,10 @@  struct priv_timer {
 /** per-lcore private info for timers */
 static struct priv_timer priv_timer[RTE_MAX_LCORE];
 
+/** cache of IDs of enabled lcores */
+static unsigned int enabled_lcores[RTE_MAX_LCORE];
+static int n_enabled_lcores;
+
 /* when debug is enabled, store some statistics */
 #ifdef RTE_LIBRTE_TIMER_DEBUG
 #define __TIMER_STAT_ADD(name, n) do {					\
@@ -96,14 +103,25 @@  static struct priv_timer priv_timer[RTE_MAX_LCORE];
 void
 rte_timer_subsystem_init(void)
 {
-	unsigned lcore_id;
+	unsigned int lcore_id1, lcore_id2;
+	struct skiplist *list;
+	int i, j;
+
+	RTE_LCORE_FOREACH(lcore_id1)
+		enabled_lcores[n_enabled_lcores++] = lcore_id1;
 
 	/* since priv_timer is static, it's zeroed by default, so only init some
 	 * fields.
 	 */
-	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id ++) {
-		rte_spinlock_init(&priv_timer[lcore_id].list_lock);
-		priv_timer[lcore_id].prev_lcore = lcore_id;
+	for (i = 0, lcore_id1 = enabled_lcores[i]; i < n_enabled_lcores;
+	     lcore_id1 = enabled_lcores[++i]) {
+		priv_timer[lcore_id1].prev_lcore = lcore_id1;
+
+		for (j = 0, lcore_id2 = enabled_lcores[j]; j < n_enabled_lcores;
+		     lcore_id2 = enabled_lcores[++j]) {
+			list = &priv_timer[lcore_id1].pending_lists[lcore_id2];
+			rte_spinlock_init(&list->lock);
+		}
 	}
 }
 
@@ -114,7 +132,8 @@  rte_timer_init(struct rte_timer *tim)
 	union rte_timer_status status;
 
 	status.state = RTE_TIMER_STOP;
-	status.owner = RTE_TIMER_NO_OWNER;
+	status.installer = RTE_TIMER_NO_LCORE;
+	status.owner = RTE_TIMER_NO_LCORE;
 	tim->status.u32 = status.u32;
 }
 
@@ -142,7 +161,7 @@  timer_set_config_state(struct rte_timer *tim,
 		 * or ready to run on local core, exit
 		 */
 		if (prev_status.state == RTE_TIMER_RUNNING &&
-		    (prev_status.owner != (uint16_t)lcore_id ||
+		    (prev_status.owner != (int)lcore_id ||
 		     tim != priv_timer[lcore_id].running_tim))
 			return -1;
 
@@ -153,7 +172,8 @@  timer_set_config_state(struct rte_timer *tim,
 		/* here, we know that timer is stopped or pending,
 		 * mark it atomically as being configured */
 		status.state = RTE_TIMER_CONFIG;
-		status.owner = (int16_t)lcore_id;
+		status.installer = RTE_TIMER_NO_LCORE;
+		status.owner = lcore_id;
 		success = rte_atomic32_cmpset(&tim->status.u32,
 					      prev_status.u32,
 					      status.u32);
@@ -185,7 +205,8 @@  timer_set_running_state(struct rte_timer *tim)
 		/* here, we know that timer is stopped or pending,
 		 * mark it atomically as being configured */
 		status.state = RTE_TIMER_RUNNING;
-		status.owner = (int16_t)lcore_id;
+		status.installer = prev_status.installer;
+		status.owner = lcore_id;
 		success = rte_atomic32_cmpset(&tim->status.u32,
 					      prev_status.u32,
 					      status.u32);
@@ -236,11 +257,11 @@  timer_get_skiplist_level(unsigned curr_depth)
  * are <= that time value.
  */
 static void
-timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
+timer_get_prev_entries(uint64_t time_val, struct skiplist *list,
 		struct rte_timer **prev)
 {
-	unsigned lvl = priv_timer[tim_lcore].curr_skiplist_depth;
-	prev[lvl] = &priv_timer[tim_lcore].pending_head;
+	unsigned int lvl = list->depth;
+	prev[lvl] = &list->head;
 	while(lvl != 0) {
 		lvl--;
 		prev[lvl] = prev[lvl+1];
@@ -255,15 +276,15 @@  timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
  * all skiplist levels.
  */
 static void
-timer_get_prev_entries_for_node(struct rte_timer *tim, unsigned tim_lcore,
+timer_get_prev_entries_for_node(struct rte_timer *tim, struct skiplist *list,
 		struct rte_timer **prev)
 {
 	int i;
 	/* to get a specific entry in the list, look for just lower than the time
 	 * values, and then increment on each level individually if necessary
 	 */
-	timer_get_prev_entries(tim->expire - 1, tim_lcore, prev);
-	for (i = priv_timer[tim_lcore].curr_skiplist_depth - 1; i >= 0; i--) {
+	timer_get_prev_entries(tim->expire - 1, list, prev);
+	for (i = list->depth - 1; i >= 0; i--) {
 		while (prev[i]->sl_next[i] != NULL &&
 				prev[i]->sl_next[i] != tim &&
 				prev[i]->sl_next[i]->expire <= tim->expire)
@@ -282,25 +303,25 @@  timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
 	unsigned lcore_id = rte_lcore_id();
 	unsigned lvl;
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
+	struct skiplist *list = &priv_timer[tim_lcore].pending_lists[lcore_id];
 
 	/* if timer needs to be scheduled on another core, we need to
 	 * lock the list; if it is on local core, we need to lock if
 	 * we are not called from rte_timer_manage() */
 	if (tim_lcore != lcore_id || !local_is_locked)
-		rte_spinlock_lock(&priv_timer[tim_lcore].list_lock);
+		rte_spinlock_lock(&list->lock);
 
 	/* find where exactly this element goes in the list of elements
 	 * for each depth. */
-	timer_get_prev_entries(tim->expire, tim_lcore, prev);
+	timer_get_prev_entries(tim->expire, list, prev);
 
 	/* now assign it a new level and add at that level */
-	const unsigned tim_level = timer_get_skiplist_level(
-			priv_timer[tim_lcore].curr_skiplist_depth);
-	if (tim_level == priv_timer[tim_lcore].curr_skiplist_depth)
-		priv_timer[tim_lcore].curr_skiplist_depth++;
+	const unsigned int tim_level = timer_get_skiplist_level(list->depth);
+	if (tim_level == list->depth)
+		list->depth++;
 
 	lvl = tim_level;
-	while (lvl > 0) {
+	while (lvl > 0 && lvl < MAX_SKIPLIST_DEPTH + 1) {
 		tim->sl_next[lvl] = prev[lvl]->sl_next[lvl];
 		prev[lvl]->sl_next[lvl] = tim;
 		lvl--;
@@ -310,11 +331,10 @@  timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
 
 	/* save the lowest list entry into the expire field of the dummy hdr
 	 * NOTE: this is not atomic on 32-bit*/
-	priv_timer[tim_lcore].pending_head.expire = priv_timer[tim_lcore].\
-			pending_head.sl_next[0]->expire;
+	list->head.expire = list->head.sl_next[0]->expire;
 
 	if (tim_lcore != lcore_id || !local_is_locked)
-		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
+		rte_spinlock_unlock(&list->lock);
 }
 
 /*
@@ -330,35 +350,38 @@  timer_del(struct rte_timer *tim, union rte_timer_status prev_status,
 	unsigned prev_owner = prev_status.owner;
 	int i;
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
+	struct skiplist *list;
+
+	list = &priv_timer[prev_owner].pending_lists[prev_status.installer];
 
 	/* if timer needs is pending another core, we need to lock the
 	 * list; if it is on local core, we need to lock if we are not
 	 * called from rte_timer_manage() */
 	if (prev_owner != lcore_id || !local_is_locked)
-		rte_spinlock_lock(&priv_timer[prev_owner].list_lock);
+		rte_spinlock_lock(&list->lock);
 
 	/* save the lowest list entry into the expire field of the dummy hdr.
 	 * NOTE: this is not atomic on 32-bit */
-	if (tim == priv_timer[prev_owner].pending_head.sl_next[0])
-		priv_timer[prev_owner].pending_head.expire =
-				((tim->sl_next[0] == NULL) ? 0 : tim->sl_next[0]->expire);
+	if (tim == list->head.sl_next[0])
+		list->head.expire = ((tim->sl_next[0] == NULL) ?
+				     0 : tim->sl_next[0]->expire);
 
 	/* adjust pointers from previous entries to point past this */
-	timer_get_prev_entries_for_node(tim, prev_owner, prev);
-	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--) {
+	timer_get_prev_entries_for_node(tim, list, prev);
+	for (i = list->depth - 1; i >= 0; i--) {
 		if (prev[i]->sl_next[i] == tim)
 			prev[i]->sl_next[i] = tim->sl_next[i];
 	}
 
 	/* in case we deleted last entry at a level, adjust down max level */
-	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--)
-		if (priv_timer[prev_owner].pending_head.sl_next[i] == NULL)
-			priv_timer[prev_owner].curr_skiplist_depth --;
+	for (i = list->depth - 1; i >= 0; i--)
+		if (list->head.sl_next[i] == NULL)
+			list->depth--;
 		else
 			break;
 
 	if (prev_owner != lcore_id || !local_is_locked)
-		rte_spinlock_unlock(&priv_timer[prev_owner].list_lock);
+		rte_spinlock_unlock(&list->lock);
 }
 
 /* Reset and start the timer associated with the timer handle (private func) */
@@ -416,7 +439,8 @@  __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
 	 * the state so we don't need to use cmpset() here */
 	rte_wmb();
 	status.state = RTE_TIMER_PENDING;
-	status.owner = (int16_t)tim_lcore;
+	status.installer = lcore_id;
+	status.owner = tim_lcore;
 	tim->status.u32 = status.u32;
 
 	return 0;
@@ -484,7 +508,8 @@  rte_timer_stop(struct rte_timer *tim)
 	/* mark timer as stopped */
 	rte_wmb();
 	status.state = RTE_TIMER_STOP;
-	status.owner = RTE_TIMER_NO_OWNER;
+	status.installer = RTE_TIMER_NO_LCORE;
+	status.owner = RTE_TIMER_NO_LCORE;
 	tim->status.u32 = status.u32;
 
 	return 0;
@@ -506,119 +531,179 @@  rte_timer_pending(struct rte_timer *tim)
 }
 
 /* must be called periodically, run all timer that expired */
-void rte_timer_manage(void)
+void
+rte_timer_manage(void)
 {
 	union rte_timer_status status;
-	struct rte_timer *tim, *next_tim;
-	struct rte_timer *run_first_tim, **pprev;
-	unsigned lcore_id = rte_lcore_id();
+	struct rte_timer *tim, *next_tim, **pprev;
+	struct rte_timer *run_first_tims[RTE_MAX_LCORE + 1];
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH + 1];
-	uint64_t cur_time;
-	int i, ret;
+	struct priv_timer *priv_tim;
+	unsigned int installer_lcore, lcore_id = rte_lcore_id();
+	uint64_t cur_time, min_expire;
+	int i, j, min_idx, ret;
+	int nrunlists = 0;
+	int local_is_locked = 0;
+	struct skiplist *dest_list, *list = NULL;
+	bool done;
 
 	/* timer manager only runs on EAL thread with valid lcore_id */
 	assert(lcore_id < RTE_MAX_LCORE);
 
+	priv_tim = &priv_timer[lcore_id];
+
 	__TIMER_STAT_ADD(manage, 1);
-	/* optimize for the case where per-cpu list is empty */
-	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL)
-		return;
-	cur_time = rte_get_timer_cycles();
+	for (i = 0, installer_lcore = enabled_lcores[i]; i < n_enabled_lcores;
+	     installer_lcore = enabled_lcores[++i]) {
+		list = &priv_tim->pending_lists[installer_lcore];
+
+		/* optimize for the case where list is empty */
+		if (list->head.sl_next[0] == NULL)
+			continue;
+		cur_time = rte_get_timer_cycles();
 
 #ifdef RTE_ARCH_X86_64
-	/* on 64-bit the value cached in the pending_head.expired will be
-	 * updated atomically, so we can consult that for a quick check here
-	 * outside the lock */
-	if (likely(priv_timer[lcore_id].pending_head.expire > cur_time))
-		return;
+		/* on 64-bit the value cached in the list->head.expired will be
+		 * updated atomically, so we can consult that for a quick check
+		 * here outside the lock
+		 */
+		if (likely(list->head.expire > cur_time))
+			continue;
 #endif
 
-	/* browse ordered list, add expired timers in 'expired' list */
-	rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
+		/* browse ordered list, add expired timers in 'expired' list */
+		rte_spinlock_lock(&list->lock);
 
-	/* if nothing to do just unlock and return */
-	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL ||
-	    priv_timer[lcore_id].pending_head.sl_next[0]->expire > cur_time) {
-		rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
-		return;
-	}
+		/* if nothing to do just unlock and continue */
+		if (list->head.sl_next[0] == NULL ||
+		    list->head.sl_next[0]->expire > cur_time) {
+			rte_spinlock_unlock(&list->lock);
+			continue;
+		}
 
-	/* save start of list of expired timers */
-	tim = priv_timer[lcore_id].pending_head.sl_next[0];
+		/* save start of list of expired timers */
+		tim = list->head.sl_next[0];
+
+		/* break the existing list at current time point */
+		timer_get_prev_entries(cur_time, list, prev);
+		for (j = list->depth - 1; j >= 0; j--) {
+			if (prev[j] == &list->head)
+				continue;
+			list->head.sl_next[j] =
+			    prev[j]->sl_next[j];
+			if (prev[j]->sl_next[j] == NULL)
+				list->depth--;
+			prev[j]->sl_next[j] = NULL;
+		}
 
-	/* break the existing list at current time point */
-	timer_get_prev_entries(cur_time, lcore_id, prev);
-	for (i = priv_timer[lcore_id].curr_skiplist_depth -1; i >= 0; i--) {
-		if (prev[i] == &priv_timer[lcore_id].pending_head)
-			continue;
-		priv_timer[lcore_id].pending_head.sl_next[i] =
-		    prev[i]->sl_next[i];
-		if (prev[i]->sl_next[i] == NULL)
-			priv_timer[lcore_id].curr_skiplist_depth--;
-		prev[i] ->sl_next[i] = NULL;
-	}
+		/* transition run-list from PENDING to RUNNING */
+		run_first_tims[nrunlists] = tim;
+		pprev = &run_first_tims[nrunlists];
+		nrunlists++;
+
+		for (; tim != NULL; tim = next_tim) {
+			next_tim = tim->sl_next[0];
+
+			ret = timer_set_running_state(tim);
+			if (likely(ret == 0)) {
+				pprev = &tim->sl_next[0];
+			} else {
+				/* another core is trying to re-config this one,
+				 * remove it from local expired list
+				 */
+				*pprev = next_tim;
+			}
+		}
 
-	/* transition run-list from PENDING to RUNNING */
-	run_first_tim = tim;
-	pprev = &run_first_tim;
+		/* update the next to expire timer value */
+		list->head.expire = (list->head.sl_next[0] == NULL) ?
+				    0 : list->head.sl_next[0]->expire;
 
-	for ( ; tim != NULL; tim = next_tim) {
-		next_tim = tim->sl_next[0];
+		rte_spinlock_unlock(&list->lock);
+	}
 
-		ret = timer_set_running_state(tim);
-		if (likely(ret == 0)) {
-			pprev = &tim->sl_next[0];
-		} else {
-			/* another core is trying to re-config this one,
-			 * remove it from local expired list
-			 */
-			*pprev = next_tim;
+	/* Now process the run lists */
+	while (1) {
+		done = true;
+		min_expire = UINT64_MAX;
+		min_idx = 0;
+
+		/* Find the next oldest timer to process */
+		for (i = 0; i < nrunlists; i++) {
+			tim = run_first_tims[i];
+
+			if (tim != NULL && tim->expire < min_expire) {
+				min_expire = tim->expire;
+				min_idx = i;
+				done = false;
+			}
 		}
-	}
 
-	/* update the next to expire timer value */
-	priv_timer[lcore_id].pending_head.expire =
-	    (priv_timer[lcore_id].pending_head.sl_next[0] == NULL) ? 0 :
-		priv_timer[lcore_id].pending_head.sl_next[0]->expire;
+		if (done)
+			break;
+
+		tim = run_first_tims[min_idx];
 
-	rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
+		/* Move down the runlist from which we picked a timer to
+		 * execute
+		 */
+		run_first_tims[min_idx] = run_first_tims[min_idx]->sl_next[0];
 
-	/* now scan expired list and call callbacks */
-	for (tim = run_first_tim; tim != NULL; tim = next_tim) {
-		next_tim = tim->sl_next[0];
-		priv_timer[lcore_id].updated = 0;
-		priv_timer[lcore_id].running_tim = tim;
+		priv_tim->updated = 0;
+		priv_tim->running_tim = tim;
 
 		/* execute callback function with list unlocked */
 		tim->f(tim, tim->arg);
 
 		__TIMER_STAT_ADD(pending, -1);
 		/* the timer was stopped or reloaded by the callback
-		 * function, we have nothing to do here */
-		if (priv_timer[lcore_id].updated == 1)
+		 * function, we have nothing to do here
+		 */
+		if (priv_tim->updated == 1)
 			continue;
 
 		if (tim->period == 0) {
 			/* remove from done list and mark timer as stopped */
 			status.state = RTE_TIMER_STOP;
-			status.owner = RTE_TIMER_NO_OWNER;
+			status.installer = RTE_TIMER_NO_LCORE;
+			status.owner = RTE_TIMER_NO_LCORE;
 			rte_wmb();
 			tim->status.u32 = status.u32;
 		}
 		else {
-			/* keep it in list and mark timer as pending */
-			rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
+			dest_list = &priv_tim->pending_lists[lcore_id];
+
+			/* If the destination list is the current list
+			 * we can acquire the lock here, and hold it
+			 * across removal and insertion of the timer.
+			 */
+			if (list == dest_list) {
+				rte_spinlock_lock(&list->lock);
+				local_is_locked = 1;
+			}
+
+			/* set timer state back to PENDING and
+			 * reinsert it in pending list
+			 */
 			status.state = RTE_TIMER_PENDING;
 			__TIMER_STAT_ADD(pending, 1);
-			status.owner = (int16_t)lcore_id;
+			status.installer = tim->status.installer;
+			status.owner = lcore_id;
 			rte_wmb();
 			tim->status.u32 = status.u32;
-			__rte_timer_reset(tim, tim->expire + tim->period,
-				tim->period, lcore_id, tim->f, tim->arg, 1);
-			rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
+
+			__rte_timer_reset(tim,
+					  tim->expire + tim->period,
+					  tim->period, lcore_id,
+					  tim->f, tim->arg, local_is_locked);
+
+			if (local_is_locked) {
+				rte_spinlock_unlock(&list->lock);
+				local_is_locked = 0;
+			}
 		}
 	}
-	priv_timer[lcore_id].running_tim = NULL;
+	priv_tim->running_tim = NULL;
 }
 
 /* dump statistics about timers */
diff --git a/lib/librte_timer/rte_timer.h b/lib/librte_timer/rte_timer.h
index a276a73..4cc6baf 100644
--- a/lib/librte_timer/rte_timer.h
+++ b/lib/librte_timer/rte_timer.h
@@ -77,7 +77,7 @@  extern "C" {
 #define RTE_TIMER_RUNNING 2 /**< State: timer function is running. */
 #define RTE_TIMER_CONFIG  3 /**< State: timer is being configured. */
 
-#define RTE_TIMER_NO_OWNER -2 /**< Timer has no owner. */
+#define RTE_TIMER_NO_LCORE -2
 
 /**
  * Timer type: Periodic or single (one-shot).
@@ -94,10 +94,11 @@  enum rte_timer_type {
 union rte_timer_status {
 	RTE_STD_C11
 	struct {
-		uint16_t state;  /**< Stop, pending, running, config. */
-		int16_t owner;   /**< The lcore that owns the timer. */
+		unsigned state	: 2;
+		int installer	: 15;
+		int owner	: 15;
 	};
-	uint32_t u32;            /**< To atomic-set status + owner. */
+	uint32_t u32;            /**< To atomic-set status, installer, owner */
 };
 
 #ifdef RTE_LIBRTE_TIMER_DEBUG