[dpdk-dev,1/3] timer: add per-installer pending lists for each lcore

Message ID 1503499644-29432-2-git-send-email-erik.g.carrillo@intel.com (mailing list archive)
State Superseded, archived
Headers

Checks

Context Check Description
ci/Intel-compilation success Compilation OK
ci/checkpatch warning coding style issues

Commit Message

Carrillo, Erik G Aug. 23, 2017, 2:47 p.m. UTC
  Instead of each priv_timer struct containing a single skiplist, this
commit adds a skiplist for each enabled lcore to priv_timer. In the case
that multiple lcores repeatedly install timers on the same target lcore,
this change reduces lock contention for the target lcore's skiplists and
increases performance.

Signed-off-by: Gabriel Carrillo <erik.g.carrillo@intel.com>
---
 lib/librte_timer/rte_timer.c | 309 +++++++++++++++++++++++++++----------------
 lib/librte_timer/rte_timer.h |   9 +-
 2 files changed, 202 insertions(+), 116 deletions(-)
  

Comments

Stephen Hemminger Aug. 29, 2017, 3:11 p.m. UTC | #1
On Wed, 23 Aug 2017 09:47:22 -0500
Gabriel Carrillo <erik.g.carrillo@intel.com> wrote:

>  	__TIMER_STAT_ADD(manage, 1);
> -	/* optimize for the case where per-cpu list is empty */
> -	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL)
> -		return;
> -	cur_time = rte_get_timer_cycles();
> +	for (i = 0, installer_lcore = enabled_lcores[i]; i < n_enabled_lcores;
> +	     installer_lcore = enabled_lcores[++i]) {
> +		list = &priv_tim->pending_lists[installer_lcore];
> +
> +		/* optimize for the case where list is empty */
> +		if (list->head.sl_next[0] == NULL)
> +			continue;
> +		cur_time = rte_get_timer_cycles();

This code is critical. We expect applications using timers to call timer_manage
very often, and the cases where no timers are present or no timer is due must
consume as few cycles as possible.

This change will add significant performance delays to these applications.
  

Patch

diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c
index 5ee0840..2db74c1 100644
--- a/lib/librte_timer/rte_timer.c
+++ b/lib/librte_timer/rte_timer.c
@@ -37,6 +37,7 @@ 
 #include <inttypes.h>
 #include <assert.h>
 #include <sys/queue.h>
+#include <stdbool.h>
 
 #include <rte_atomic.h>
 #include <rte_common.h>
@@ -56,17 +57,19 @@ 
 
 LIST_HEAD(rte_timer_list, rte_timer);
 
+struct skiplist {
+	struct rte_timer head;  /**< dummy timer instance to head up list */
+	rte_spinlock_t lock;	/**< lock to protect list access */
+	unsigned depth;		/**< track the current depth of the skiplist */
+} __rte_cache_aligned;
+
 struct priv_timer {
-	struct rte_timer pending_head;  /**< dummy timer instance to head up list */
-	rte_spinlock_t list_lock;       /**< lock to protect list access */
+	/** one pending list per enabled lcore */
+	struct skiplist pending_lists[RTE_MAX_LCORE];
 
 	/** per-core variable that true if a timer was updated on this
 	 *  core since last reset of the variable */
 	int updated;
-
-	/** track the current depth of the skiplist */
-	unsigned curr_skiplist_depth;
-
 	unsigned prev_lcore;              /**< used for lcore round robin */
 
 	/** running timer on this lcore now */
@@ -81,6 +84,10 @@  struct priv_timer {
 /** per-lcore private info for timers */
 static struct priv_timer priv_timer[RTE_MAX_LCORE];
 
+/** cache of IDs of enabled lcores */
+static unsigned enabled_lcores[RTE_MAX_LCORE];
+static int n_enabled_lcores;
+
 /* when debug is enabled, store some statistics */
 #ifdef RTE_LIBRTE_TIMER_DEBUG
 #define __TIMER_STAT_ADD(name, n) do {					\
@@ -96,14 +103,25 @@  static struct priv_timer priv_timer[RTE_MAX_LCORE];
 void
 rte_timer_subsystem_init(void)
 {
-	unsigned lcore_id;
+	unsigned lcore_id1, lcore_id2;
+	struct skiplist *list;
+	int i, j;
+
+	RTE_LCORE_FOREACH(lcore_id1)
+		enabled_lcores[n_enabled_lcores++] = lcore_id1;
 
 	/* since priv_timer is static, it's zeroed by default, so only init some
 	 * fields.
 	 */
-	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id ++) {
-		rte_spinlock_init(&priv_timer[lcore_id].list_lock);
-		priv_timer[lcore_id].prev_lcore = lcore_id;
+	for (i = 0, lcore_id1 = enabled_lcores[i]; i < n_enabled_lcores;
+	     lcore_id1 = enabled_lcores[++i]) {
+		priv_timer[lcore_id1].prev_lcore = lcore_id1;
+
+		for (j = 0, lcore_id2 = enabled_lcores[j]; j < n_enabled_lcores;
+		     lcore_id2 = enabled_lcores[++j]) {
+			list = &priv_timer[lcore_id1].pending_lists[lcore_id2];
+			rte_spinlock_init(&list->lock);
+		}
 	}
 }
 
@@ -114,7 +132,8 @@  rte_timer_init(struct rte_timer *tim)
 	union rte_timer_status status;
 
 	status.state = RTE_TIMER_STOP;
-	status.owner = RTE_TIMER_NO_OWNER;
+	status.installer = RTE_TIMER_NO_LCORE;
+	status.owner = RTE_TIMER_NO_LCORE;
 	tim->status.u32 = status.u32;
 }
 
@@ -142,7 +161,7 @@  timer_set_config_state(struct rte_timer *tim,
 		 * or ready to run on local core, exit
 		 */
 		if (prev_status.state == RTE_TIMER_RUNNING &&
-		    (prev_status.owner != (uint16_t)lcore_id ||
+		    (prev_status.owner != (int)lcore_id ||
 		     tim != priv_timer[lcore_id].running_tim))
 			return -1;
 
@@ -153,7 +172,8 @@  timer_set_config_state(struct rte_timer *tim,
 		/* here, we know that timer is stopped or pending,
 		 * mark it atomically as being configured */
 		status.state = RTE_TIMER_CONFIG;
-		status.owner = (int16_t)lcore_id;
+		status.installer = RTE_TIMER_NO_LCORE;
+		status.owner = lcore_id;
 		success = rte_atomic32_cmpset(&tim->status.u32,
 					      prev_status.u32,
 					      status.u32);
@@ -185,7 +205,8 @@  timer_set_running_state(struct rte_timer *tim)
 		/* here, we know that timer is stopped or pending,
 		 * mark it atomically as being configured */
 		status.state = RTE_TIMER_RUNNING;
-		status.owner = (int16_t)lcore_id;
+		status.installer = prev_status.installer;
+		status.owner = lcore_id;
 		success = rte_atomic32_cmpset(&tim->status.u32,
 					      prev_status.u32,
 					      status.u32);
@@ -236,11 +257,11 @@  timer_get_skiplist_level(unsigned curr_depth)
  * are <= that time value.
  */
 static void
-timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
+timer_get_prev_entries(uint64_t time_val, struct skiplist *list,
 		struct rte_timer **prev)
 {
-	unsigned lvl = priv_timer[tim_lcore].curr_skiplist_depth;
-	prev[lvl] = &priv_timer[tim_lcore].pending_head;
+	unsigned lvl = list->depth;
+	prev[lvl] = &list->head;
 	while(lvl != 0) {
 		lvl--;
 		prev[lvl] = prev[lvl+1];
@@ -255,15 +276,15 @@  timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
  * all skiplist levels.
  */
 static void
-timer_get_prev_entries_for_node(struct rte_timer *tim, unsigned tim_lcore,
+timer_get_prev_entries_for_node(struct rte_timer *tim, struct skiplist *list,
 		struct rte_timer **prev)
 {
 	int i;
 	/* to get a specific entry in the list, look for just lower than the time
 	 * values, and then increment on each level individually if necessary
 	 */
-	timer_get_prev_entries(tim->expire - 1, tim_lcore, prev);
-	for (i = priv_timer[tim_lcore].curr_skiplist_depth - 1; i >= 0; i--) {
+	timer_get_prev_entries(tim->expire - 1, list, prev);
+	for (i = list->depth - 1; i >= 0; i--) {
 		while (prev[i]->sl_next[i] != NULL &&
 				prev[i]->sl_next[i] != tim &&
 				prev[i]->sl_next[i]->expire <= tim->expire)
@@ -282,25 +303,25 @@  timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
 	unsigned lcore_id = rte_lcore_id();
 	unsigned lvl;
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
+	struct skiplist *list = &priv_timer[tim_lcore].pending_lists[lcore_id];
 
 	/* if timer needs to be scheduled on another core, we need to
 	 * lock the list; if it is on local core, we need to lock if
 	 * we are not called from rte_timer_manage() */
 	if (tim_lcore != lcore_id || !local_is_locked)
-		rte_spinlock_lock(&priv_timer[tim_lcore].list_lock);
+		rte_spinlock_lock(&list->lock);
 
 	/* find where exactly this element goes in the list of elements
 	 * for each depth. */
-	timer_get_prev_entries(tim->expire, tim_lcore, prev);
+	timer_get_prev_entries(tim->expire, list, prev);
 
 	/* now assign it a new level and add at that level */
-	const unsigned tim_level = timer_get_skiplist_level(
-			priv_timer[tim_lcore].curr_skiplist_depth);
-	if (tim_level == priv_timer[tim_lcore].curr_skiplist_depth)
-		priv_timer[tim_lcore].curr_skiplist_depth++;
+	const unsigned tim_level = timer_get_skiplist_level(list->depth);
+	if (tim_level == list->depth)
+		list->depth++;
 
 	lvl = tim_level;
-	while (lvl > 0) {
+	while (lvl > 0 && lvl < MAX_SKIPLIST_DEPTH + 1) {
 		tim->sl_next[lvl] = prev[lvl]->sl_next[lvl];
 		prev[lvl]->sl_next[lvl] = tim;
 		lvl--;
@@ -310,11 +331,10 @@  timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
 
 	/* save the lowest list entry into the expire field of the dummy hdr
 	 * NOTE: this is not atomic on 32-bit*/
-	priv_timer[tim_lcore].pending_head.expire = priv_timer[tim_lcore].\
-			pending_head.sl_next[0]->expire;
+	list->head.expire = list->head.sl_next[0]->expire;
 
 	if (tim_lcore != lcore_id || !local_is_locked)
-		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
+		rte_spinlock_unlock(&list->lock);
 }
 
 /*
@@ -330,35 +350,38 @@  timer_del(struct rte_timer *tim, union rte_timer_status prev_status,
 	unsigned prev_owner = prev_status.owner;
 	int i;
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
+	struct skiplist *list;
+
+	list = &priv_timer[prev_owner].pending_lists[prev_status.installer];
 
 	/* if timer needs is pending another core, we need to lock the
 	 * list; if it is on local core, we need to lock if we are not
 	 * called from rte_timer_manage() */
 	if (prev_owner != lcore_id || !local_is_locked)
-		rte_spinlock_lock(&priv_timer[prev_owner].list_lock);
+		rte_spinlock_lock(&list->lock);
 
 	/* save the lowest list entry into the expire field of the dummy hdr.
 	 * NOTE: this is not atomic on 32-bit */
-	if (tim == priv_timer[prev_owner].pending_head.sl_next[0])
-		priv_timer[prev_owner].pending_head.expire =
-				((tim->sl_next[0] == NULL) ? 0 : tim->sl_next[0]->expire);
+	if (tim == list->head.sl_next[0])
+		list->head.expire = ((tim->sl_next[0] == NULL) ?
+				     0 : tim->sl_next[0]->expire);
 
 	/* adjust pointers from previous entries to point past this */
-	timer_get_prev_entries_for_node(tim, prev_owner, prev);
-	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--) {
+	timer_get_prev_entries_for_node(tim, list, prev);
+	for (i = list->depth - 1; i >= 0; i--) {
 		if (prev[i]->sl_next[i] == tim)
 			prev[i]->sl_next[i] = tim->sl_next[i];
 	}
 
 	/* in case we deleted last entry at a level, adjust down max level */
-	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--)
-		if (priv_timer[prev_owner].pending_head.sl_next[i] == NULL)
-			priv_timer[prev_owner].curr_skiplist_depth --;
+	for (i = list->depth - 1; i >= 0; i--)
+		if (list->head.sl_next[i] == NULL)
+			list->depth--;
 		else
 			break;
 
 	if (prev_owner != lcore_id || !local_is_locked)
-		rte_spinlock_unlock(&priv_timer[prev_owner].list_lock);
+		rte_spinlock_unlock(&list->lock);
 }
 
 /* Reset and start the timer associated with the timer handle (private func) */
@@ -416,7 +439,8 @@  __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
 	 * the state so we don't need to use cmpset() here */
 	rte_wmb();
 	status.state = RTE_TIMER_PENDING;
-	status.owner = (int16_t)tim_lcore;
+	status.installer = lcore_id;
+	status.owner = tim_lcore;
 	tim->status.u32 = status.u32;
 
 	return 0;
@@ -484,7 +508,8 @@  rte_timer_stop(struct rte_timer *tim)
 	/* mark timer as stopped */
 	rte_wmb();
 	status.state = RTE_TIMER_STOP;
-	status.owner = RTE_TIMER_NO_OWNER;
+	status.installer = RTE_TIMER_NO_LCORE;
+	status.owner = RTE_TIMER_NO_LCORE;
 	tim->status.u32 = status.u32;
 
 	return 0;
@@ -506,119 +531,179 @@  rte_timer_pending(struct rte_timer *tim)
 }
 
 /* must be called periodically, run all timer that expired */
-void rte_timer_manage(void)
+void
+rte_timer_manage(void)
 {
 	union rte_timer_status status;
-	struct rte_timer *tim, *next_tim;
-	struct rte_timer *run_first_tim, **pprev;
-	unsigned lcore_id = rte_lcore_id();
+	struct rte_timer *tim, *next_tim, **pprev;
+	struct rte_timer *run_first_tims[RTE_MAX_LCORE + 1];
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH + 1];
-	uint64_t cur_time;
-	int i, ret;
+	struct priv_timer *priv_tim;
+	unsigned installer_lcore, lcore_id = rte_lcore_id();
+	uint64_t cur_time, min_expire;
+	int i, j, min_idx, ret;
+	int nrunlists = 0;
+	int local_is_locked = 0;
+	struct skiplist *dest_list, *list = NULL;
+	bool done;
 
 	/* timer manager only runs on EAL thread with valid lcore_id */
 	assert(lcore_id < RTE_MAX_LCORE);
 
+	priv_tim = &priv_timer[lcore_id];
+
 	__TIMER_STAT_ADD(manage, 1);
-	/* optimize for the case where per-cpu list is empty */
-	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL)
-		return;
-	cur_time = rte_get_timer_cycles();
+	for (i = 0, installer_lcore = enabled_lcores[i]; i < n_enabled_lcores;
+	     installer_lcore = enabled_lcores[++i]) {
+		list = &priv_tim->pending_lists[installer_lcore];
+
+		/* optimize for the case where list is empty */
+		if (list->head.sl_next[0] == NULL)
+			continue;
+		cur_time = rte_get_timer_cycles();
 
 #ifdef RTE_ARCH_X86_64
-	/* on 64-bit the value cached in the pending_head.expired will be
-	 * updated atomically, so we can consult that for a quick check here
-	 * outside the lock */
-	if (likely(priv_timer[lcore_id].pending_head.expire > cur_time))
-		return;
+		/* on 64-bit the value cached in the list->head.expired will be
+		 * updated atomically, so we can consult that for a quick check
+		 * here outside the lock
+		 */
+		if (likely(list->head.expire > cur_time))
+			continue;
 #endif
 
-	/* browse ordered list, add expired timers in 'expired' list */
-	rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
+		/* browse ordered list, add expired timers in 'expired' list */
+		rte_spinlock_lock(&list->lock);
 
-	/* if nothing to do just unlock and return */
-	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL ||
-	    priv_timer[lcore_id].pending_head.sl_next[0]->expire > cur_time) {
-		rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
-		return;
-	}
+		/* if nothing to do just unlock and continue */
+		if (list->head.sl_next[0] == NULL ||
+		    list->head.sl_next[0]->expire > cur_time) {
+			rte_spinlock_unlock(&list->lock);
+			continue;
+		}
 
-	/* save start of list of expired timers */
-	tim = priv_timer[lcore_id].pending_head.sl_next[0];
+		/* save start of list of expired timers */
+		tim = list->head.sl_next[0];
+
+		/* break the existing list at current time point */
+		timer_get_prev_entries(cur_time, list, prev);
+		for (j = list->depth - 1; j >= 0; j--) {
+			if (prev[j] == &list->head)
+				continue;
+			list->head.sl_next[j] =
+			    prev[j]->sl_next[j];
+			if (prev[j]->sl_next[j] == NULL)
+				list->depth--;
+			prev[j]->sl_next[j] = NULL;
+		}
 
-	/* break the existing list at current time point */
-	timer_get_prev_entries(cur_time, lcore_id, prev);
-	for (i = priv_timer[lcore_id].curr_skiplist_depth -1; i >= 0; i--) {
-		if (prev[i] == &priv_timer[lcore_id].pending_head)
-			continue;
-		priv_timer[lcore_id].pending_head.sl_next[i] =
-		    prev[i]->sl_next[i];
-		if (prev[i]->sl_next[i] == NULL)
-			priv_timer[lcore_id].curr_skiplist_depth--;
-		prev[i] ->sl_next[i] = NULL;
-	}
+		/* transition run-list from PENDING to RUNNING */
+		run_first_tims[nrunlists] = tim;
+		pprev = &run_first_tims[nrunlists];
+		nrunlists++;
+
+		for (; tim != NULL; tim = next_tim) {
+			next_tim = tim->sl_next[0];
+
+			ret = timer_set_running_state(tim);
+			if (likely(ret == 0)) {
+				pprev = &tim->sl_next[0];
+			} else {
+				/* another core is trying to re-config this one,
+				 * remove it from local expired list
+				 */
+				*pprev = next_tim;
+			}
+		}
 
-	/* transition run-list from PENDING to RUNNING */
-	run_first_tim = tim;
-	pprev = &run_first_tim;
+		/* update the next to expire timer value */
+		list->head.expire = (list->head.sl_next[0] == NULL) ?
+				    0 : list->head.sl_next[0]->expire;
 
-	for ( ; tim != NULL; tim = next_tim) {
-		next_tim = tim->sl_next[0];
+		rte_spinlock_unlock(&list->lock);
+	}
 
-		ret = timer_set_running_state(tim);
-		if (likely(ret == 0)) {
-			pprev = &tim->sl_next[0];
-		} else {
-			/* another core is trying to re-config this one,
-			 * remove it from local expired list
-			 */
-			*pprev = next_tim;
+	/* Now process the run lists */
+	while (1) {
+		done = true;
+		min_expire = UINT64_MAX;
+		min_idx = 0;
+
+		/* Find the next oldest timer to process */
+		for (i = 0; i < nrunlists; i++) {
+			tim = run_first_tims[i];
+
+			if (tim != NULL && tim->expire < min_expire) {
+				min_expire = tim->expire;
+				min_idx = i;
+				done = false;
+			}
 		}
-	}
 
-	/* update the next to expire timer value */
-	priv_timer[lcore_id].pending_head.expire =
-	    (priv_timer[lcore_id].pending_head.sl_next[0] == NULL) ? 0 :
-		priv_timer[lcore_id].pending_head.sl_next[0]->expire;
+		if (done)
+			break;
+
+		tim = run_first_tims[min_idx];
 
-	rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
+		/* Move down the runlist from which we picked a timer to
+		 * execute
+		 */
+		run_first_tims[min_idx] = run_first_tims[min_idx]->sl_next[0];
 
-	/* now scan expired list and call callbacks */
-	for (tim = run_first_tim; tim != NULL; tim = next_tim) {
-		next_tim = tim->sl_next[0];
-		priv_timer[lcore_id].updated = 0;
-		priv_timer[lcore_id].running_tim = tim;
+		priv_tim->updated = 0;
+		priv_tim->running_tim = tim;
 
 		/* execute callback function with list unlocked */
 		tim->f(tim, tim->arg);
 
 		__TIMER_STAT_ADD(pending, -1);
 		/* the timer was stopped or reloaded by the callback
-		 * function, we have nothing to do here */
-		if (priv_timer[lcore_id].updated == 1)
+		 * function, we have nothing to do here
+		 */
+		if (priv_tim->updated == 1)
 			continue;
 
 		if (tim->period == 0) {
 			/* remove from done list and mark timer as stopped */
 			status.state = RTE_TIMER_STOP;
-			status.owner = RTE_TIMER_NO_OWNER;
+			status.installer = RTE_TIMER_NO_LCORE;
+			status.owner = RTE_TIMER_NO_LCORE;
 			rte_wmb();
 			tim->status.u32 = status.u32;
 		}
 		else {
-			/* keep it in list and mark timer as pending */
-			rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
+			dest_list = &priv_tim->pending_lists[lcore_id];
+
+			/* If the destination list is the current list
+			 * we can acquire the lock here, and hold it
+			 * across removal and insertion of the timer.
+			 */
+			if (list == dest_list) {
+				rte_spinlock_lock(&list->lock);
+				local_is_locked = 1;
+			}
+
+			/* set timer state back to PENDING and
+			 * reinsert it in pending list
+			 */
 			status.state = RTE_TIMER_PENDING;
 			__TIMER_STAT_ADD(pending, 1);
-			status.owner = (int16_t)lcore_id;
+			status.installer = tim->status.installer;
+			status.owner = lcore_id;
 			rte_wmb();
 			tim->status.u32 = status.u32;
-			__rte_timer_reset(tim, tim->expire + tim->period,
-				tim->period, lcore_id, tim->f, tim->arg, 1);
-			rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
+
+			__rte_timer_reset(tim,
+					  tim->expire + tim->period,
+					  tim->period, lcore_id,
+					  tim->f, tim->arg, local_is_locked);
+
+			if (local_is_locked) {
+				rte_spinlock_unlock(&list->lock);
+				local_is_locked = 0;
+			}
 		}
 	}
-	priv_timer[lcore_id].running_tim = NULL;
+	priv_tim->running_tim = NULL;
 }
 
 /* dump statistics about timers */
diff --git a/lib/librte_timer/rte_timer.h b/lib/librte_timer/rte_timer.h
index a276a73..4cc6baf 100644
--- a/lib/librte_timer/rte_timer.h
+++ b/lib/librte_timer/rte_timer.h
@@ -77,7 +77,7 @@  extern "C" {
 #define RTE_TIMER_RUNNING 2 /**< State: timer function is running. */
 #define RTE_TIMER_CONFIG  3 /**< State: timer is being configured. */
 
-#define RTE_TIMER_NO_OWNER -2 /**< Timer has no owner. */
+#define RTE_TIMER_NO_LCORE -2
 
 /**
  * Timer type: Periodic or single (one-shot).
@@ -94,10 +94,11 @@  enum rte_timer_type {
 union rte_timer_status {
 	RTE_STD_C11
 	struct {
-		uint16_t state;  /**< Stop, pending, running, config. */
-		int16_t owner;   /**< The lcore that owns the timer. */
+		unsigned state	: 2;
+		int installer	: 15;
+		int owner	: 15;
 	};
-	uint32_t u32;            /**< To atomic-set status + owner. */
+	uint32_t u32;            /**< To atomic-set status, installer, owner */
 };
 
 #ifdef RTE_LIBRTE_TIMER_DEBUG