From patchwork Fri Aug 25 20:27:03 2017
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: "Carrillo, Erik G"
X-Patchwork-Id: 28018
Return-Path:
X-Original-To: patchwork@dpdk.org
Delivered-To: patchwork@dpdk.org
Received: from [92.243.14.124] (localhost [IPv6:::1])
	by dpdk.org (Postfix) with ESMTP id 978587D53;
	Fri, 25 Aug 2017 22:26:49 +0200 (CEST)
Received: from mga11.intel.com (mga11.intel.com [192.55.52.93])
	by dpdk.org (Postfix) with ESMTP id EFCE37D30
	for ; Fri, 25 Aug 2017 22:26:47 +0200 (CEST)
Received: from fmsmga001.fm.intel.com ([10.253.24.23])
	by fmsmga102.fm.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384;
	25 Aug 2017 13:26:46 -0700
X-ExtLoop1: 1
X-IronPort-AV: E=Sophos; i="5.41,426,1498546800"; d="scan'208"; a="1188215718"
Received: from wcpqa1.an.intel.com ([10.123.72.207])
	by fmsmga001.fm.intel.com with ESMTP; 25 Aug 2017 13:26:46 -0700
From: Gabriel Carrillo
To: rsanford@akamai.com
Cc: dev@dpdk.org
Date: Fri, 25 Aug 2017 15:27:03 -0500
Message-Id: <1503692823-16214-1-git-send-email-erik.g.carrillo@intel.com>
X-Mailer: git-send-email 1.7.10
In-Reply-To: <1503499644-29432-2-git-send-email-erik.g.carrillo@intel.com>
References: <1503499644-29432-2-git-send-email-erik.g.carrillo@intel.com>
Subject: [dpdk-dev] [PATCH v2 1/3] timer: add per-installer pending lists for
	each lcore
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions
Errors-To: dev-bounces@dpdk.org
Sender: "dev"

Instead of each priv_timer struct containing a single skiplist, this
commit adds a skiplist for each enabled lcore to priv_timer. In the case
that multiple lcores repeatedly install timers on the same target lcore,
this change reduces lock contention for the target lcore's skiplists and
increases performance.
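For illustration only (the helper below is hypothetical and not part of the
diff; the real insertion path is timer_add()), the new layout keys each
pending list by both the target lcore and the installing lcore, so two
installers arming timers on the same target lcore no longer contend on one
lock:

  /* Hypothetical sketch: which pending list a timer lands in under this
   * change.  It uses the priv_timer array and struct skiplist introduced
   * by the diff below; each (target, installer) pair has its own lock.
   */
  static struct skiplist *
  pending_list_of(unsigned int target_lcore, unsigned int installer_lcore)
  {
  	return &priv_timer[target_lcore].pending_lists[installer_lcore];
  }

rte_timer_manage() on the target lcore then drains each of its per-installer
lists in turn and picks the next callback to run by smallest expiry across
the collected run lists.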
Signed-off-by: Gabriel Carrillo
---
v2:
* Address checkpatch warnings

 lib/librte_timer/rte_timer.c | 309 +++++++++++++++++++++++++++----------------
 lib/librte_timer/rte_timer.h |   9 +-
 2 files changed, 202 insertions(+), 116 deletions(-)

diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c
index 5ee0840..da2fc1a 100644
--- a/lib/librte_timer/rte_timer.c
+++ b/lib/librte_timer/rte_timer.c
@@ -37,6 +37,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -56,17 +57,19 @@
 
 LIST_HEAD(rte_timer_list, rte_timer);
 
+struct skiplist {
+	struct rte_timer head;  /**< dummy timer instance to head up list */
+	rte_spinlock_t lock;    /**< lock to protect list access */
+	unsigned int depth;     /**< track the current depth of the skiplist */
+} __rte_cache_aligned;
+
 struct priv_timer {
-	struct rte_timer pending_head;  /**< dummy timer instance to head up list */
-	rte_spinlock_t list_lock;       /**< lock to protect list access */
+	/** one pending list per enabled lcore */
+	struct skiplist pending_lists[RTE_MAX_LCORE];
 
 	/** per-core variable that true if a timer was updated on this
 	 *  core since last reset of the variable */
 	int updated;
-
-	/** track the current depth of the skiplist */
-	unsigned curr_skiplist_depth;
-
 	unsigned prev_lcore;  /**< used for lcore round robin */
 
 	/** running timer on this lcore now */
@@ -81,6 +84,10 @@ struct priv_timer {
 /** per-lcore private info for timers */
 static struct priv_timer priv_timer[RTE_MAX_LCORE];
 
+/** cache of IDs of enabled lcores */
+static unsigned int enabled_lcores[RTE_MAX_LCORE];
+static int n_enabled_lcores;
+
 /* when debug is enabled, store some statistics */
 #ifdef RTE_LIBRTE_TIMER_DEBUG
 #define __TIMER_STAT_ADD(name, n) do {			\
@@ -96,14 +103,25 @@ static struct priv_timer priv_timer[RTE_MAX_LCORE];
 void
 rte_timer_subsystem_init(void)
 {
-	unsigned lcore_id;
+	unsigned int lcore_id1, lcore_id2;
+	struct skiplist *list;
+	int i, j;
+
+	RTE_LCORE_FOREACH(lcore_id1)
+		enabled_lcores[n_enabled_lcores++] = lcore_id1;
 
 	/* since priv_timer is static, it's zeroed by default, so only init some
 	 * fields.
 	 */
-	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id ++) {
-		rte_spinlock_init(&priv_timer[lcore_id].list_lock);
-		priv_timer[lcore_id].prev_lcore = lcore_id;
+	for (i = 0, lcore_id1 = enabled_lcores[i]; i < n_enabled_lcores;
+	     lcore_id1 = enabled_lcores[++i]) {
+		priv_timer[lcore_id1].prev_lcore = lcore_id1;
+
+		for (j = 0, lcore_id2 = enabled_lcores[j]; j < n_enabled_lcores;
+		     lcore_id2 = enabled_lcores[++j]) {
+			list = &priv_timer[lcore_id1].pending_lists[lcore_id2];
+			rte_spinlock_init(&list->lock);
+		}
 	}
 }
 
@@ -114,7 +132,8 @@ rte_timer_init(struct rte_timer *tim)
 	union rte_timer_status status;
 
 	status.state = RTE_TIMER_STOP;
-	status.owner = RTE_TIMER_NO_OWNER;
+	status.installer = RTE_TIMER_NO_LCORE;
+	status.owner = RTE_TIMER_NO_LCORE;
 	tim->status.u32 = status.u32;
 }
 
@@ -142,7 +161,7 @@ timer_set_config_state(struct rte_timer *tim,
 	 * or ready to run on local core, exit
 	 */
 	if (prev_status.state == RTE_TIMER_RUNNING &&
-	    (prev_status.owner != (uint16_t)lcore_id ||
+	    (prev_status.owner != (int)lcore_id ||
 	     tim != priv_timer[lcore_id].running_tim))
 		return -1;
 
@@ -153,7 +172,8 @@ timer_set_config_state(struct rte_timer *tim,
 	/* here, we know that timer is stopped or pending,
 	 * mark it atomically as being configured */
 	status.state = RTE_TIMER_CONFIG;
-	status.owner = (int16_t)lcore_id;
+	status.installer = RTE_TIMER_NO_LCORE;
+	status.owner = lcore_id;
 	success = rte_atomic32_cmpset(&tim->status.u32, prev_status.u32,
 				      status.u32);
 
@@ -185,7 +205,8 @@ timer_set_running_state(struct rte_timer *tim)
 	/* here, we know that timer is stopped or pending,
 	 * mark it atomically as being configured */
 	status.state = RTE_TIMER_RUNNING;
-	status.owner = (int16_t)lcore_id;
+	status.installer = prev_status.installer;
+	status.owner = lcore_id;
 	success = rte_atomic32_cmpset(&tim->status.u32, prev_status.u32,
 				      status.u32);
 
@@ -236,11 +257,11 @@ timer_get_skiplist_level(unsigned curr_depth)
  * are <= that time value.
  */
 static void
-timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
+timer_get_prev_entries(uint64_t time_val, struct skiplist *list,
 		struct rte_timer **prev)
 {
-	unsigned lvl = priv_timer[tim_lcore].curr_skiplist_depth;
-	prev[lvl] = &priv_timer[tim_lcore].pending_head;
+	unsigned int lvl = list->depth;
+	prev[lvl] = &list->head;
 	while(lvl != 0) {
 		lvl--;
 		prev[lvl] = prev[lvl+1];
@@ -255,15 +276,15 @@ timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
  * all skiplist levels.
  */
 static void
-timer_get_prev_entries_for_node(struct rte_timer *tim, unsigned tim_lcore,
+timer_get_prev_entries_for_node(struct rte_timer *tim, struct skiplist *list,
 		struct rte_timer **prev)
 {
 	int i;
 	/* to get a specific entry in the list, look for just lower than the time
 	 * values, and then increment on each level individually if necessary */
-	timer_get_prev_entries(tim->expire - 1, tim_lcore, prev);
-	for (i = priv_timer[tim_lcore].curr_skiplist_depth - 1; i >= 0; i--) {
+	timer_get_prev_entries(tim->expire - 1, list, prev);
+	for (i = list->depth - 1; i >= 0; i--) {
 		while (prev[i]->sl_next[i] != NULL &&
 				prev[i]->sl_next[i] != tim &&
 				prev[i]->sl_next[i]->expire <= tim->expire)
@@ -282,25 +303,25 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
 	unsigned lcore_id = rte_lcore_id();
 	unsigned lvl;
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
+	struct skiplist *list = &priv_timer[tim_lcore].pending_lists[lcore_id];
 
 	/* if timer needs to be scheduled on another core, we need to
 	 * lock the list; if it is on local core, we need to lock if
 	 * we are not called from rte_timer_manage() */
 	if (tim_lcore != lcore_id || !local_is_locked)
-		rte_spinlock_lock(&priv_timer[tim_lcore].list_lock);
+		rte_spinlock_lock(&list->lock);
 
 	/* find where exactly this element goes in the list of elements
 	 * for each depth. */
-	timer_get_prev_entries(tim->expire, tim_lcore, prev);
+	timer_get_prev_entries(tim->expire, list, prev);
 
 	/* now assign it a new level and add at that level */
-	const unsigned tim_level = timer_get_skiplist_level(
-			priv_timer[tim_lcore].curr_skiplist_depth);
-	if (tim_level == priv_timer[tim_lcore].curr_skiplist_depth)
-		priv_timer[tim_lcore].curr_skiplist_depth++;
+	const unsigned int tim_level = timer_get_skiplist_level(list->depth);
+	if (tim_level == list->depth)
+		list->depth++;
 
 	lvl = tim_level;
-	while (lvl > 0) {
+	while (lvl > 0 && lvl < MAX_SKIPLIST_DEPTH + 1) {
 		tim->sl_next[lvl] = prev[lvl]->sl_next[lvl];
 		prev[lvl]->sl_next[lvl] = tim;
 		lvl--;
@@ -310,11 +331,10 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
 
 	/* save the lowest list entry into the expire field of the dummy hdr
 	 * NOTE: this is not atomic on 32-bit*/
-	priv_timer[tim_lcore].pending_head.expire = priv_timer[tim_lcore].\
-			pending_head.sl_next[0]->expire;
+	list->head.expire = list->head.sl_next[0]->expire;
 
 	if (tim_lcore != lcore_id || !local_is_locked)
-		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
+		rte_spinlock_unlock(&list->lock);
 }
 
 /*
@@ -330,35 +350,38 @@ timer_del(struct rte_timer *tim, union rte_timer_status prev_status,
 	unsigned prev_owner = prev_status.owner;
 	int i;
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
+	struct skiplist *list;
+
+	list = &priv_timer[prev_owner].pending_lists[prev_status.installer];
 
 	/* if timer needs is pending another core, we need to lock the
 	 * list; if it is on local core, we need to lock if we are not
 	 * called from rte_timer_manage() */
 	if (prev_owner != lcore_id || !local_is_locked)
-		rte_spinlock_lock(&priv_timer[prev_owner].list_lock);
+		rte_spinlock_lock(&list->lock);
 
 	/* save the lowest list entry into the expire field of the dummy hdr.
 	 * NOTE: this is not atomic on 32-bit */
-	if (tim == priv_timer[prev_owner].pending_head.sl_next[0])
-		priv_timer[prev_owner].pending_head.expire =
-			((tim->sl_next[0] == NULL) ? 0 : tim->sl_next[0]->expire);
+	if (tim == list->head.sl_next[0])
+		list->head.expire = ((tim->sl_next[0] == NULL) ?
+				0 : tim->sl_next[0]->expire);
 
 	/* adjust pointers from previous entries to point past this */
-	timer_get_prev_entries_for_node(tim, prev_owner, prev);
-	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--) {
+	timer_get_prev_entries_for_node(tim, list, prev);
+	for (i = list->depth - 1; i >= 0; i--) {
 		if (prev[i]->sl_next[i] == tim)
 			prev[i]->sl_next[i] = tim->sl_next[i];
 	}
 
 	/* in case we deleted last entry at a level, adjust down max level */
-	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--)
-		if (priv_timer[prev_owner].pending_head.sl_next[i] == NULL)
-			priv_timer[prev_owner].curr_skiplist_depth --;
+	for (i = list->depth - 1; i >= 0; i--)
+		if (list->head.sl_next[i] == NULL)
+			list->depth--;
 		else
 			break;
 
 	if (prev_owner != lcore_id || !local_is_locked)
-		rte_spinlock_unlock(&priv_timer[prev_owner].list_lock);
+		rte_spinlock_unlock(&list->lock);
 }
 
 /* Reset and start the timer associated with the timer handle (private func) */
@@ -416,7 +439,8 @@ __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
 	 * the state so we don't need to use cmpset() here */
 	rte_wmb();
 	status.state = RTE_TIMER_PENDING;
-	status.owner = (int16_t)tim_lcore;
+	status.installer = lcore_id;
+	status.owner = tim_lcore;
 	tim->status.u32 = status.u32;
 
 	return 0;
@@ -484,7 +508,8 @@ rte_timer_stop(struct rte_timer *tim)
 	/* mark timer as stopped */
 	rte_wmb();
 	status.state = RTE_TIMER_STOP;
-	status.owner = RTE_TIMER_NO_OWNER;
+	status.installer = RTE_TIMER_NO_LCORE;
+	status.owner = RTE_TIMER_NO_LCORE;
 	tim->status.u32 = status.u32;
 
 	return 0;
@@ -506,119 +531,179 @@ rte_timer_pending(struct rte_timer *tim)
 }
 
 /* must be called periodically, run all timer that expired */
-void rte_timer_manage(void)
+void
+rte_timer_manage(void)
 {
 	union rte_timer_status status;
-	struct rte_timer *tim, *next_tim;
-	struct rte_timer *run_first_tim, **pprev;
-	unsigned lcore_id = rte_lcore_id();
+	struct rte_timer *tim, *next_tim, **pprev;
+	struct rte_timer *run_first_tims[RTE_MAX_LCORE + 1];
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH + 1];
-	uint64_t cur_time;
-	int i, ret;
+	struct priv_timer *priv_tim;
+	unsigned int installer_lcore, lcore_id = rte_lcore_id();
+	uint64_t cur_time, min_expire;
+	int i, j, min_idx, ret;
+	int nrunlists = 0;
+	int local_is_locked = 0;
+	struct skiplist *dest_list, *list = NULL;
+	bool done;
 
 	/* timer manager only runs on EAL thread with valid lcore_id */
 	assert(lcore_id < RTE_MAX_LCORE);
 
+	priv_tim = &priv_timer[lcore_id];
+
 	__TIMER_STAT_ADD(manage, 1);
-	/* optimize for the case where per-cpu list is empty */
-	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL)
-		return;
-	cur_time = rte_get_timer_cycles();
+	for (i = 0, installer_lcore = enabled_lcores[i]; i < n_enabled_lcores;
+	     installer_lcore = enabled_lcores[++i]) {
+		list = &priv_tim->pending_lists[installer_lcore];
+
+		/* optimize for the case where list is empty */
+		if (list->head.sl_next[0] == NULL)
+			continue;
+		cur_time = rte_get_timer_cycles();
 
 #ifdef RTE_ARCH_X86_64
-	/* on 64-bit the value cached in the pending_head.expired will be
-	 * updated atomically, so we can consult that for a quick check here
-	 * outside the lock */
-	if (likely(priv_timer[lcore_id].pending_head.expire > cur_time))
-		return;
+		/* on 64-bit the value cached in the list->head.expired will be
+		 * updated atomically, so we can consult that for a quick check
+		 * here outside the lock
+		 */
+		if (likely(list->head.expire > cur_time))
+			continue;
 #endif
 
-	/* browse ordered list, add expired timers in 'expired' list */
-	rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
+		/* browse ordered list, add expired timers in 'expired' list */
+		rte_spinlock_lock(&list->lock);
 
-	/* if nothing to do just unlock and return */
-	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL ||
-	    priv_timer[lcore_id].pending_head.sl_next[0]->expire > cur_time) {
-		rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
-		return;
-	}
+		/* if nothing to do just unlock and continue */
+		if (list->head.sl_next[0] == NULL ||
+		    list->head.sl_next[0]->expire > cur_time) {
+			rte_spinlock_unlock(&list->lock);
+			continue;
+		}
 
-	/* save start of list of expired timers */
-	tim = priv_timer[lcore_id].pending_head.sl_next[0];
+		/* save start of list of expired timers */
+		tim = list->head.sl_next[0];
+
+		/* break the existing list at current time point */
+		timer_get_prev_entries(cur_time, list, prev);
+		for (j = list->depth - 1; j >= 0; j--) {
+			if (prev[j] == &list->head)
+				continue;
+			list->head.sl_next[j] =
+				prev[j]->sl_next[j];
+			if (prev[j]->sl_next[j] == NULL)
+				list->depth--;
+			prev[j]->sl_next[j] = NULL;
+		}
 
-	/* break the existing list at current time point */
-	timer_get_prev_entries(cur_time, lcore_id, prev);
-	for (i = priv_timer[lcore_id].curr_skiplist_depth -1; i >= 0; i--) {
-		if (prev[i] == &priv_timer[lcore_id].pending_head)
-			continue;
-		priv_timer[lcore_id].pending_head.sl_next[i] =
-			prev[i]->sl_next[i];
-		if (prev[i]->sl_next[i] == NULL)
-			priv_timer[lcore_id].curr_skiplist_depth--;
-		prev[i] ->sl_next[i] = NULL;
-	}
+		/* transition run-list from PENDING to RUNNING */
+		run_first_tims[nrunlists] = tim;
+		pprev = &run_first_tims[nrunlists];
+		nrunlists++;
+
+		for (; tim != NULL; tim = next_tim) {
+			next_tim = tim->sl_next[0];
+
+			ret = timer_set_running_state(tim);
+			if (likely(ret == 0)) {
+				pprev = &tim->sl_next[0];
+			} else {
+				/* another core is trying to re-config this one,
+				 * remove it from local expired list
+				 */
+				*pprev = next_tim;
+			}
+		}
 
-	/* transition run-list from PENDING to RUNNING */
-	run_first_tim = tim;
-	pprev = &run_first_tim;
+		/* update the next to expire timer value */
+		list->head.expire = (list->head.sl_next[0] == NULL) ?
+				0 : list->head.sl_next[0]->expire;
 
-	for ( ; tim != NULL; tim = next_tim) {
-		next_tim = tim->sl_next[0];
+		rte_spinlock_unlock(&list->lock);
+	}
 
-		ret = timer_set_running_state(tim);
-		if (likely(ret == 0)) {
-			pprev = &tim->sl_next[0];
-		} else {
-			/* another core is trying to re-config this one,
-			 * remove it from local expired list
-			 */
-			*pprev = next_tim;
+	/* Now process the run lists */
+	while (1) {
+		done = true;
+		min_expire = UINT64_MAX;
+		min_idx = 0;
+
+		/* Find the next oldest timer to process */
+		for (i = 0; i < nrunlists; i++) {
+			tim = run_first_tims[i];
+
+			if (tim != NULL && tim->expire < min_expire) {
+				min_expire = tim->expire;
+				min_idx = i;
+				done = false;
+			}
 		}
-	}
 
-	/* update the next to expire timer value */
-	priv_timer[lcore_id].pending_head.expire =
-		(priv_timer[lcore_id].pending_head.sl_next[0] == NULL) ?
-			0 : priv_timer[lcore_id].pending_head.sl_next[0]->expire;
+		if (done)
+			break;
+
+		tim = run_first_tims[min_idx];
 
-	rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
+		/* Move down the runlist from which we picked a timer to
+		 * execute
+		 */
+		run_first_tims[min_idx] = run_first_tims[min_idx]->sl_next[0];
 
-	/* now scan expired list and call callbacks */
-	for (tim = run_first_tim; tim != NULL; tim = next_tim) {
-		next_tim = tim->sl_next[0];
-		priv_timer[lcore_id].updated = 0;
-		priv_timer[lcore_id].running_tim = tim;
+		priv_tim->updated = 0;
+		priv_tim->running_tim = tim;
 
 		/* execute callback function with list unlocked */
 		tim->f(tim, tim->arg);
 
 		__TIMER_STAT_ADD(pending, -1);
 		/* the timer was stopped or reloaded by the callback
-		 * function, we have nothing to do here */
-		if (priv_timer[lcore_id].updated == 1)
+		 * function, we have nothing to do here
+		 */
+		if (priv_tim->updated == 1)
 			continue;
 
 		if (tim->period == 0) {
 			/* remove from done list and mark timer as stopped */
 			status.state = RTE_TIMER_STOP;
-			status.owner = RTE_TIMER_NO_OWNER;
+			status.installer = RTE_TIMER_NO_LCORE;
+			status.owner = RTE_TIMER_NO_LCORE;
 			rte_wmb();
 			tim->status.u32 = status.u32;
 		}
 		else {
-			/* keep it in list and mark timer as pending */
-			rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
+			dest_list = &priv_tim->pending_lists[lcore_id];
+
+			/* If the destination list is the current list
+			 * we can acquire the lock here, and hold it
+			 * across removal and insertion of the timer.
+			 */
+			if (list == dest_list) {
+				rte_spinlock_lock(&list->lock);
+				local_is_locked = 1;
+			}
+
+			/* set timer state back to PENDING and
+			 * reinsert it in pending list
+			 */
 			status.state = RTE_TIMER_PENDING;
 			__TIMER_STAT_ADD(pending, 1);
-			status.owner = (int16_t)lcore_id;
+			status.installer = tim->status.installer;
+			status.owner = lcore_id;
 			rte_wmb();
 			tim->status.u32 = status.u32;
-			__rte_timer_reset(tim, tim->expire + tim->period,
-				tim->period, lcore_id, tim->f, tim->arg, 1);
-			rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
+
+			__rte_timer_reset(tim,
+					  tim->expire + tim->period,
+					  tim->period, lcore_id,
+					  tim->f, tim->arg, local_is_locked);
+
+			if (local_is_locked) {
+				rte_spinlock_unlock(&list->lock);
+				local_is_locked = 0;
+			}
 		}
 	}
-	priv_timer[lcore_id].running_tim = NULL;
+	priv_tim->running_tim = NULL;
 }
 
 /* dump statistics about timers */
diff --git a/lib/librte_timer/rte_timer.h b/lib/librte_timer/rte_timer.h
index a276a73..4cc6baf 100644
--- a/lib/librte_timer/rte_timer.h
+++ b/lib/librte_timer/rte_timer.h
@@ -77,7 +77,7 @@ extern "C" {
 #define RTE_TIMER_RUNNING 2 /**< State: timer function is running. */
 #define RTE_TIMER_CONFIG  3 /**< State: timer is being configured. */
 
-#define RTE_TIMER_NO_OWNER -2 /**< Timer has no owner. */
+#define RTE_TIMER_NO_LCORE -2
 
 /**
  * Timer type: Periodic or single (one-shot).
@@ -94,10 +94,11 @@ enum rte_timer_type {
 union rte_timer_status {
 	RTE_STD_C11
 	struct {
-		uint16_t state;  /**< Stop, pending, running, config. */
-		int16_t owner;   /**< The lcore that owns the timer. */
+		unsigned state : 2;
+		int installer : 15;
+		int owner : 15;
 	};
-	uint32_t u32;            /**< To atomic-set status + owner. */
+	uint32_t u32;            /**< To atomic-set status, installer, owner */
 };
 
 #ifdef RTE_LIBRTE_TIMER_DEBUG