From patchwork Wed Jan 11 15:05:17 2017 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Bruce Richardson X-Patchwork-Id: 19127 X-Patchwork-Delegate: ferruh.yigit@amd.com Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [IPv6:::1]) by dpdk.org (Postfix) with ESMTP id 878DDF92F; Wed, 11 Jan 2017 16:05:56 +0100 (CET) Received: from mga09.intel.com (mga09.intel.com [134.134.136.24]) by dpdk.org (Postfix) with ESMTP id A8CA5F922 for ; Wed, 11 Jan 2017 16:05:49 +0100 (CET) Received: from orsmga002.jf.intel.com ([10.7.209.21]) by orsmga102.jf.intel.com with ESMTP; 11 Jan 2017 07:05:48 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.33,346,1477983600"; d="scan'208";a="29056674" Received: from sivswdev01.ir.intel.com (HELO localhost.localdomain) ([10.237.217.45]) by orsmga002.jf.intel.com with ESMTP; 11 Jan 2017 07:05:47 -0800 From: Bruce Richardson To: olivier.matz@6wind.com Cc: dev@dpdk.org, Bruce Richardson Date: Wed, 11 Jan 2017 15:05:17 +0000 Message-Id: <1484147125-5948-4-git-send-email-bruce.richardson@intel.com> X-Mailer: git-send-email 1.7.0.7 In-Reply-To: <1484147125-5948-1-git-send-email-bruce.richardson@intel.com> References: <1484147125-5948-1-git-send-email-bruce.richardson@intel.com> Subject: [dpdk-dev] [RFC PATCH 03/11] ring: add ring management functions to typed ring header X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" take the ring management functions from rte_ring.c and copy them into the typed ring header file. This gives us a complete ring implementation that we can look to generalize in later commits. Signed-off-by: Bruce Richardson --- lib/librte_ring/rte_typed_ring.h | 579 +++++++++++++++++++++++++++++---------- 1 file changed, 427 insertions(+), 152 deletions(-) diff --git a/lib/librte_ring/rte_typed_ring.h b/lib/librte_ring/rte_typed_ring.h index 18cc6fe..5a14403 100644 --- a/lib/librte_ring/rte_typed_ring.h +++ b/lib/librte_ring/rte_typed_ring.h @@ -95,12 +95,21 @@ extern "C" { #include #include #include +#include +#include + #include #include #include #include #include #include +#include +#include +#include +#include +#include +#include #define RTE_TAILQ_RING_NAME "RTE_RING" @@ -224,138 +233,6 @@ struct rte_ring { #define __RING_STAT_ADD(r, name, n) do {} while (0) #endif -/** - * Calculate the memory size needed for a ring - * - * This function returns the number of bytes needed for a ring, given - * the number of elements in it. This value is the sum of the size of - * the structure rte_ring and the size of the memory needed by the - * objects pointers. The value is aligned to a cache line size. - * - * @param count - * The number of elements in the ring (must be a power of 2). - * @return - * - The memory size needed for the ring on success. - * - -EINVAL if count is not a power of 2. - */ -ssize_t rte_ring_get_memsize(unsigned int count); - -/** - * Initialize a ring structure. - * - * Initialize a ring structure in memory pointed by "r". The size of the - * memory area must be large enough to store the ring structure and the - * object table. It is advised to use rte_ring_get_memsize() to get the - * appropriate size. - * - * The ring size is set to *count*, which must be a power of two. Water - * marking is disabled by default. The real usable ring size is - * *count-1* instead of *count* to differentiate a free ring from an - * empty ring. - * - * The ring is not added in RTE_TAILQ_RING global list. Indeed, the - * memory given by the caller may not be shareable among dpdk - * processes. - * - * @param r - * The pointer to the ring structure followed by the objects table. - * @param name - * The name of the ring. - * @param count - * The number of elements in the ring (must be a power of 2). - * @param flags - * An OR of the following: - * - RING_F_SP_ENQ: If this flag is set, the default behavior when - * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` - * is "single-producer". Otherwise, it is "multi-producers". - * - RING_F_SC_DEQ: If this flag is set, the default behavior when - * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` - * is "single-consumer". Otherwise, it is "multi-consumers". - * @return - * 0 on success, or a negative value on error. - */ -int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count, - unsigned int flags); - -/** - * Create a new ring named *name* in memory. - * - * This function uses ``memzone_reserve()`` to allocate memory. Then it - * calls rte_ring_init() to initialize an empty ring. - * - * The new ring size is set to *count*, which must be a power of - * two. Water marking is disabled by default. The real usable ring size - * is *count-1* instead of *count* to differentiate a free ring from an - * empty ring. - * - * The ring is added in RTE_TAILQ_RING list. - * - * @param name - * The name of the ring. - * @param count - * The size of the ring (must be a power of 2). - * @param socket_id - * The *socket_id* argument is the socket identifier in case of - * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA - * constraint for the reserved zone. - * @param flags - * An OR of the following: - * - RING_F_SP_ENQ: If this flag is set, the default behavior when - * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` - * is "single-producer". Otherwise, it is "multi-producers". - * - RING_F_SC_DEQ: If this flag is set, the default behavior when - * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` - * is "single-consumer". Otherwise, it is "multi-consumers". - * @return - * On success, the pointer to the new allocated ring. NULL on error with - * rte_errno set appropriately. Possible errno values include: - * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure - * - E_RTE_SECONDARY - function was called from a secondary process instance - * - EINVAL - count provided is not a power of 2 - * - ENOSPC - the maximum number of memzones has already been allocated - * - EEXIST - a memzone with the same name already exists - * - ENOMEM - no appropriate memory area found in which to create memzone - */ -struct rte_ring *rte_ring_create(const char *name, unsigned int count, - int socket_id, unsigned int flags); -/** - * De-allocate all memory used by the ring. - * - * @param r - * Ring to free - */ -void rte_ring_free(struct rte_ring *r); - -/** - * Change the high water mark. - * - * If *count* is 0, water marking is disabled. Otherwise, it is set to the - * *count* value. The *count* value must be greater than 0 and less - * than the ring size. - * - * This function can be called at any time (not necessarily at - * initialization). - * - * @param r - * A pointer to the ring structure. - * @param count - * The new water mark value. - * @return - * - 0: Success; water mark changed. - * - -EINVAL: Invalid water mark value. - */ -int rte_ring_set_water_mark(struct rte_ring *r, unsigned int count); - -/** - * Dump the status of the ring to a file. - * - * @param f - * A pointer to a file for output - * @param r - * A pointer to the ring structure. - */ -void rte_ring_dump(FILE *f, const struct rte_ring *r); - /* the actual enqueue of pointers on the ring. * Placed here since identical code needed in both * single and multi producer enqueue functions @@ -1124,26 +1001,6 @@ rte_ring_free_count(const struct rte_ring *r) } /** - * Dump the status of all rings on the console - * - * @param f - * A pointer to a file for output - */ -void rte_ring_list_dump(FILE *f); - -/** - * Search a ring from its name - * - * @param name - * The name of the ring. - * @return - * The pointer to the ring matching the name, or NULL if not found, - * with rte_errno set appropriately. Possible rte_errno values include: - * - ENOENT - required entry not available to return. - */ -struct rte_ring *rte_ring_lookup(const char *name); - -/** * Enqueue several objects on the ring (multi-producers safe). * * This function uses a "compare and set" instruction to move the @@ -1278,6 +1135,424 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n) return rte_ring_mc_dequeue_burst(r, obj_table, n); } +TAILQ_HEAD(rte_ring_list, rte_tailq_entry); + +static struct rte_tailq_elem rte_ring_tailq = { + .name = RTE_TAILQ_RING_NAME, +}; +EAL_REGISTER_TAILQ(rte_ring_tailq) + +/* true if x is a power of 2 */ +#define POWEROF2(x) ((((x)-1) & (x)) == 0) + +/** + * Calculate the memory size needed for a ring + * + * This function returns the number of bytes needed for a ring, given + * the number of elements in it. This value is the sum of the size of + * the structure rte_ring and the size of the memory needed by the + * objects pointers. The value is aligned to a cache line size. + * + * @param count + * The number of elements in the ring (must be a power of 2). + * @return + * - The memory size needed for the ring on success. + * - -EINVAL if count is not a power of 2. + */ +static inline ssize_t +rte_ring_get_memsize(unsigned int count) +{ + ssize_t sz; + + /* count must be a power of 2 */ + if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) { + RTE_LOG(ERR, RING, + "Requested size is invalid, must be power of 2, and " + "do not exceed the size limit %u\n", RTE_RING_SZ_MASK); + return -EINVAL; + } + + sz = sizeof(struct rte_ring) + count * sizeof(void *); + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); + return sz; +} + +/** + * Initialize a ring structure. + * + * Initialize a ring structure in memory pointed by "r". The size of the + * memory area must be large enough to store the ring structure and the + * object table. It is advised to use rte_ring_get_memsize() to get the + * appropriate size. + * + * The ring size is set to *count*, which must be a power of two. Water + * marking is disabled by default. The real usable ring size is + * *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is not added in RTE_TAILQ_RING global list. Indeed, the + * memory given by the caller may not be shareable among dpdk + * processes. + * + * @param r + * The pointer to the ring structure followed by the objects table. + * @param name + * The name of the ring. + * @param count + * The number of elements in the ring (must be a power of 2). + * @param flags + * An OR of the following: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". Otherwise, it is "multi-producers". + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * @return + * 0 on success, or a negative value on error. + */ +static inline int +rte_ring_init(struct rte_ring *r, const char *name, unsigned int count, + unsigned int flags) +{ + int ret; + + /* compilation-time checks */ + RTE_BUILD_BUG_ON((sizeof(struct rte_ring) & + RTE_CACHE_LINE_MASK) != 0); +#ifdef RTE_RING_SPLIT_PROD_CONS + RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) & + RTE_CACHE_LINE_MASK) != 0); +#endif + RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & + RTE_CACHE_LINE_MASK) != 0); +#ifdef RTE_LIBRTE_RING_DEBUG + RTE_BUILD_BUG_ON((sizeof(struct rte_ring_debug_stats) & + RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON((offsetof(struct rte_ring, stats) & + RTE_CACHE_LINE_MASK) != 0); +#endif + + /* init the ring structure */ + memset(r, 0, sizeof(*r)); + ret = snprintf(r->name, sizeof(r->name), "%s", name); + if (ret < 0 || ret >= (int)sizeof(r->name)) + return -ENAMETOOLONG; + r->flags = flags; + r->prod.watermark = count; + r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ); + r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ); + r->prod.size = r->cons.size = count; + r->prod.mask = r->cons.mask = count-1; + r->prod.head = r->cons.head = 0; + r->prod.tail = r->cons.tail = 0; + + return 0; +} + +/** + * Create a new ring named *name* in memory. + * + * This function uses ``memzone_reserve()`` to allocate memory. Then it + * calls rte_ring_init() to initialize an empty ring. + * + * The new ring size is set to *count*, which must be a power of + * two. Water marking is disabled by default. The real usable ring size + * is *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is added in RTE_TAILQ_RING list. + * + * @param name + * The name of the ring. + * @param count + * The size of the ring (must be a power of 2). + * @param socket_id + * The *socket_id* argument is the socket identifier in case of + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA + * constraint for the reserved zone. + * @param flags + * An OR of the following: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". Otherwise, it is "multi-producers". + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * @return + * On success, the pointer to the new allocated ring. NULL on error with + * rte_errno set appropriately. Possible errno values include: + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure + * - E_RTE_SECONDARY - function was called from a secondary process instance + * - EINVAL - count provided is not a power of 2 + * - ENOSPC - the maximum number of memzones has already been allocated + * - EEXIST - a memzone with the same name already exists + * - ENOMEM - no appropriate memory area found in which to create memzone + */ +static inline struct rte_ring * +rte_ring_create(const char *name, unsigned int count, int socket_id, + unsigned int flags) +{ + char mz_name[RTE_MEMZONE_NAMESIZE]; + struct rte_ring *r; + struct rte_tailq_entry *te; + const struct rte_memzone *mz; + ssize_t ring_size; + int mz_flags = 0; + struct rte_ring_list *ring_list = NULL; + int ret; + + ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); + + ring_size = rte_ring_get_memsize(count); + if (ring_size < 0) { + rte_errno = ring_size; + return NULL; + } + + ret = snprintf(mz_name, sizeof(mz_name), "%s%s", + RTE_RING_MZ_PREFIX, name); + if (ret < 0 || ret >= (int)sizeof(mz_name)) { + rte_errno = ENAMETOOLONG; + return NULL; + } + + te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0); + if (te == NULL) { + RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n"); + rte_errno = ENOMEM; + return NULL; + } + + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); + + /* reserve a memory zone for this ring. If we can't get rte_config or + * we are secondary process, the memzone_reserve function will set + * rte_errno for us appropriately - hence no check in this this function + */ + mz = rte_memzone_reserve(mz_name, ring_size, socket_id, mz_flags); + if (mz != NULL) { + r = mz->addr; + /* no need to check return value here, checked the args above */ + rte_ring_init(r, name, count, flags); + + te->data = (void *) r; + r->memzone = mz; + + TAILQ_INSERT_TAIL(ring_list, te, next); + } else { + r = NULL; + RTE_LOG(ERR, RING, "Cannot reserve memory\n"); + rte_free(te); + } + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + + return r; +} + +/** + * De-allocate all memory used by the ring. + * + * @param r + * Ring to free + */ +static inline void +rte_ring_free(struct rte_ring *r) +{ + struct rte_ring_list *ring_list = NULL; + struct rte_tailq_entry *te; + + if (r == NULL) + return; + + /* + * Ring was not created with rte_ring_create, + * therefore, there is no memzone to free. + */ + if (r->memzone == NULL) { + RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_ring_create()"); + return; + } + + if (rte_memzone_free(r->memzone) != 0) { + RTE_LOG(ERR, RING, "Cannot free memory\n"); + return; + } + + ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); + + /* find out tailq entry */ + TAILQ_FOREACH(te, ring_list, next) { + if (te->data == (void *) r) + break; + } + + if (te == NULL) { + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + return; + } + + TAILQ_REMOVE(ring_list, te, next); + + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + + rte_free(te); +} + +/** + * Change the high water mark. + * + * If *count* is 0, water marking is disabled. Otherwise, it is set to the + * *count* value. The *count* value must be greater than 0 and less + * than the ring size. + * + * This function can be called at any time (not necessarily at + * initialization). + * + * @param r + * A pointer to the ring structure. + * @param count + * The new water mark value. + * @return + * - 0: Success; water mark changed. + * - -EINVAL: Invalid water mark value. + */ +static inline int +rte_ring_set_water_mark(struct rte_ring *r, unsigned int count) +{ + if (count >= r->prod.size) + return -EINVAL; + + /* if count is 0, disable the watermarking */ + if (count == 0) + count = r->prod.size; + + r->prod.watermark = count; + return 0; +} + +/** + * Dump the status of the ring to a file. + * + * @param f + * A pointer to a file for output + * @param r + * A pointer to the ring structure. + */ +static inline void +rte_ring_dump(FILE *f, const struct rte_ring *r) +{ +#ifdef RTE_LIBRTE_RING_DEBUG + struct rte_ring_debug_stats sum; + unsigned int lcore_id; +#endif + + fprintf(f, "ring <%s>@%p\n", r->name, r); + fprintf(f, " flags=%x\n", r->flags); + fprintf(f, " size=%"PRIu32"\n", r->prod.size); + fprintf(f, " ct=%"PRIu32"\n", r->cons.tail); + fprintf(f, " ch=%"PRIu32"\n", r->cons.head); + fprintf(f, " pt=%"PRIu32"\n", r->prod.tail); + fprintf(f, " ph=%"PRIu32"\n", r->prod.head); + fprintf(f, " used=%u\n", rte_ring_count(r)); + fprintf(f, " avail=%u\n", rte_ring_free_count(r)); + if (r->prod.watermark == r->prod.size) + fprintf(f, " watermark=0\n"); + else + fprintf(f, " watermark=%"PRIu32"\n", r->prod.watermark); + + /* sum and dump statistics */ +#ifdef RTE_LIBRTE_RING_DEBUG + memset(&sum, 0, sizeof(sum)); + for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) { + sum.enq_success_bulk += r->stats[lcore_id].enq_success_bulk; + sum.enq_success_objs += r->stats[lcore_id].enq_success_objs; + sum.enq_quota_bulk += r->stats[lcore_id].enq_quota_bulk; + sum.enq_quota_objs += r->stats[lcore_id].enq_quota_objs; + sum.enq_fail_bulk += r->stats[lcore_id].enq_fail_bulk; + sum.enq_fail_objs += r->stats[lcore_id].enq_fail_objs; + sum.deq_success_bulk += r->stats[lcore_id].deq_success_bulk; + sum.deq_success_objs += r->stats[lcore_id].deq_success_objs; + sum.deq_fail_bulk += r->stats[lcore_id].deq_fail_bulk; + sum.deq_fail_objs += r->stats[lcore_id].deq_fail_objs; + } + fprintf(f, " size=%"PRIu32"\n", r->prod.size); + fprintf(f, " enq_success_bulk=%"PRIu64"\n", sum.enq_success_bulk); + fprintf(f, " enq_success_objs=%"PRIu64"\n", sum.enq_success_objs); + fprintf(f, " enq_quota_bulk=%"PRIu64"\n", sum.enq_quota_bulk); + fprintf(f, " enq_quota_objs=%"PRIu64"\n", sum.enq_quota_objs); + fprintf(f, " enq_fail_bulk=%"PRIu64"\n", sum.enq_fail_bulk); + fprintf(f, " enq_fail_objs=%"PRIu64"\n", sum.enq_fail_objs); + fprintf(f, " deq_success_bulk=%"PRIu64"\n", sum.deq_success_bulk); + fprintf(f, " deq_success_objs=%"PRIu64"\n", sum.deq_success_objs); + fprintf(f, " deq_fail_bulk=%"PRIu64"\n", sum.deq_fail_bulk); + fprintf(f, " deq_fail_objs=%"PRIu64"\n", sum.deq_fail_objs); +#else + fprintf(f, " no statistics available\n"); +#endif +} + +/** + * Dump the status of all rings on the console + * + * @param f + * A pointer to a file for output + */ +static inline void +rte_ring_list_dump(FILE *f) +{ + const struct rte_tailq_entry *te; + struct rte_ring_list *ring_list; + + ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); + + rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK); + + TAILQ_FOREACH(te, ring_list, next) { + rte_ring_dump(f, (struct rte_ring *) te->data); + } + + rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK); +} + +/** + * Search a ring from its name + * + * @param name + * The name of the ring. + * @return + * The pointer to the ring matching the name, or NULL if not found, + * with rte_errno set appropriately. Possible rte_errno values include: + * - ENOENT - required entry not available to return. + */ +static inline struct rte_ring * +rte_ring_lookup(const char *name) +{ + struct rte_tailq_entry *te; + struct rte_ring *r = NULL; + struct rte_ring_list *ring_list; + + ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); + + rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK); + + TAILQ_FOREACH(te, ring_list, next) { + r = (struct rte_ring *) te->data; + if (strncmp(name, r->name, RTE_RING_NAMESIZE) == 0) + break; + } + + rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK); + + if (te == NULL) { + rte_errno = ENOENT; + return NULL; + } + + return r; +} + #ifdef __cplusplus } #endif