[dpdk-dev] Enhance KNI DPDK-app-side to be Multi-Producer/Consumer

Message ID CA+cr1couxWQp+UhcKrRenuEdy34SH11Z=Lr6mrAWL_OPkC5v-A@mail.gmail.com (mailing list archive)
State Not Applicable, archived

Commit Message

Robert Sanford Nov. 19, 2014, 8:48 p.m. UTC
Hi Danny,

On Fri, Nov 14, 2014 at 7:04 PM, Zhou, Danny <danny.zhou@intel.com> wrote:

> It will always be good if you can submit the RFC patch for the KNI
> optimization.
>
> On the other hand, do you have any perf data to prove that your patchset
> improves KNI performance, which is the concern that most customers care
> about? We introduced multi-threaded KNI kernel support last year; if I
> remember correctly, the key performance bottleneck we found was the skb
> alloc/free and the memcpy between skb and mbuf. Would be very happy if
> your patchset proves me wrong.



This is not an attempt to improve raw performance. Our modest goal is to
make librte_kni's RX/TX burst APIs safe to call from multiple threads,
without changing rte_kni.ko. With this RFC patch, multiple cores can
concurrently invoke rte_kni_tx_burst (or rte_kni_rx_burst) for the same KNI
device.
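
To illustrate the usage this enables, here is a minimal sketch (the
tx_loop function, its packet source, and the lcore IDs are hypothetical;
we assume 'kni' was created beforehand with rte_kni_alloc):

#include <rte_kni.h>
#include <rte_launch.h>
#include <rte_mbuf.h>

/* Hypothetical per-lcore TX loop: several lcores run this concurrently
 * against the SAME struct rte_kni *, which is what this patch makes safe. */
static int
tx_loop(void *arg)
{
	struct rte_kni *kni = arg;
	struct rte_mbuf *mbufs[32];
	unsigned n, sent;

	for (;;) {
		n = gather_packets(mbufs, 32);	/* hypothetical source */
		sent = rte_kni_tx_burst(kni, mbufs, n);
		while (sent < n)		/* free what didn't fit */
			rte_pktmbuf_free(mbufs[sent++]);
	}
	return 0;
}

/* From the master lcore: */
rte_eal_remote_launch(tx_loop, kni, 1);
rte_eal_remote_launch(tx_loop, kni, 2);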

At the moment, multiple cores invoking rte_kni_tx_burst for the same device
cannot function correctly, because the rte_kni_fifo structures (memory
shared between the app and the kernel driver) are single-producer,
single-consumer. The following patch supplements each rte_kni_fifo with an
additional structure that is private to the application, and borrows
librte_ring's MP/MC enqueue/dequeue logic to serialize concurrent access.
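
For reference, here is (roughly) the existing single-producer
kni_fifo_put from rte_kni_fifo.h. Nothing in it is atomic, so two cores
that read the same fifo->write will fill the same buffer slots and then
race on the final index store:

static inline unsigned
kni_fifo_put(struct rte_kni_fifo *fifo, void **data, unsigned num)
{
	unsigned i;
	unsigned fifo_write = fifo->write;	/* two producers may read
						 * the same value here */
	unsigned fifo_read = fifo->read;
	unsigned new_write = fifo_write;

	for (i = 0; i < num; i++) {
		new_write = (new_write + 1) & (fifo->len - 1);
		if (new_write == fifo_read)
			break;			/* fifo full */
		fifo->buffer[fifo_write] = data[i];	/* same slots! */
		fifo_write = new_write;
	}
	fifo->write = fifo_write;	/* last writer wins; the other
					 * producer's entries are lost */
	return i;
}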

Below is a patch against 1.8; we have only tested the equivalent changes on
1.7.1. Please have a look and let us know whether you think something like
this would be useful.

--
Thanks,
Robert


Signed-off-by: Robert Sanford <rsanford@akamai.com>

---
 lib/librte_kni/rte_kni.c      |   21 +++++-
 lib/librte_kni/rte_kni_fifo.h |  131 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 148 insertions(+), 4 deletions(-)


Patch

diff --git a/lib/librte_kni/rte_kni.c b/lib/librte_kni/rte_kni.c
index fdb7509..8009173 100644
--- a/lib/librte_kni/rte_kni.c
+++ b/lib/librte_kni/rte_kni.c
@@ -76,6 +76,11 @@  struct rte_kni {
  struct rte_kni_fifo *alloc_q;       /**< Allocated mbufs queue */
  struct rte_kni_fifo *free_q;        /**< To be freed mbufs queue */

+ struct rte_kni_fifo_multi tx_q_mc;  /**< Make tx_q multi-consumer */
+ struct rte_kni_fifo_multi alloc_q_mp;/**< Make alloc_q multi-producer */
+ struct rte_kni_fifo_multi rx_q_mp;  /**< Make rx_q multi-producer */
+ struct rte_kni_fifo_multi free_q_mc;/**< Make free_q multi-consumer */
+
  /* For request & response */
  struct rte_kni_fifo *req_q;         /**< Request queue */
  struct rte_kni_fifo *resp_q;        /**< Response queue */
@@ -414,6 +419,11 @@  rte_kni_alloc(struct rte_mempool *pktmbuf_pool,
  kni_fifo_init(ctx->free_q, KNI_FIFO_COUNT_MAX);
  dev_info.free_phys = mz->phys_addr;

+ kni_fifo_multi_init(&ctx->tx_q_mc, KNI_FIFO_COUNT_MAX);
+ kni_fifo_multi_init(&ctx->alloc_q_mp, KNI_FIFO_COUNT_MAX);
+ kni_fifo_multi_init(&ctx->rx_q_mp, KNI_FIFO_COUNT_MAX);
+ kni_fifo_multi_init(&ctx->free_q_mc, KNI_FIFO_COUNT_MAX);
+
  /* Request RING */
  mz = slot->m_req_q;
  ctx->req_q = mz->addr;
@@ -557,7 +567,8 @@  rte_kni_handle_request(struct rte_kni *kni)
 unsigned
 rte_kni_tx_burst(struct rte_kni *kni, struct rte_mbuf **mbufs, unsigned num)
 {
- unsigned ret = kni_fifo_put(kni->rx_q, (void **)mbufs, num);
+ unsigned ret = kni_fifo_put_mp(kni->rx_q, &kni->rx_q_mp, (void **)mbufs,
+ num);

  /* Get mbufs from free_q and then free them */
  kni_free_mbufs(kni);
@@ -568,7 +579,8 @@  rte_kni_tx_burst(struct rte_kni *kni, struct rte_mbuf **mbufs, unsigned num)
 unsigned
 rte_kni_rx_burst(struct rte_kni *kni, struct rte_mbuf **mbufs, unsigned num)
 {
- unsigned ret = kni_fifo_get(kni->tx_q, (void **)mbufs, num);
+ unsigned ret = kni_fifo_get_mc(kni->tx_q, &kni->tx_q_mc,
+ (void **)mbufs, num);

  /* Allocate mbufs and then put them into alloc_q */
  kni_allocate_mbufs(kni);
@@ -582,7 +594,8 @@  kni_free_mbufs(struct rte_kni *kni)
  int i, ret;
  struct rte_mbuf *pkts[MAX_MBUF_BURST_NUM];

- ret = kni_fifo_get(kni->free_q, (void **)pkts, MAX_MBUF_BURST_NUM);
+ ret = kni_fifo_get_mc(kni->free_q, &kni->free_q_mc, (void **)pkts,
+ MAX_MBUF_BURST_NUM);
  if (likely(ret > 0)) {
  for (i = 0; i < ret; i++)
  rte_pktmbuf_free(pkts[i]);
@@ -629,7 +642,7 @@  kni_allocate_mbufs(struct rte_kni *kni)
  if (i <= 0)
  return;

- ret = kni_fifo_put(kni->alloc_q, (void **)pkts, i);
+ ret = kni_fifo_put_mp(kni->alloc_q, &kni->alloc_q_mp, (void **)pkts, i);

  /* Check if any mbufs not put into alloc_q, and then free them */
  if (ret >= 0 && ret < i && ret < MAX_MBUF_BURST_NUM) {
diff --git a/lib/librte_kni/rte_kni_fifo.h b/lib/librte_kni/rte_kni_fifo.h
index 8cb8587..7dccba2 100644
--- a/lib/librte_kni/rte_kni_fifo.h
+++ b/lib/librte_kni/rte_kni_fifo.h
@@ -91,3 +91,134 @@  kni_fifo_get(struct rte_kni_fifo *fifo, void **data, unsigned num)
  fifo->read = new_read;
  return i;
 }
+
+
+/**
+ * Supplemental members to facilitate MP/MC access to KNI FIFOs.
+ */
+struct rte_kni_fifo_multi {
+ volatile uint32_t head;
+ volatile uint32_t tail;
+ uint32_t mask;
+ uint32_t size;
+};
+
+/**
+ * Initialize a kni fifo MP/MC struct.
+ */
+static void
+kni_fifo_multi_init(struct rte_kni_fifo_multi *multi, unsigned size)
+{
+ multi->head = 0;
+ multi->tail = 0;
+ multi->mask = (typeof(multi->mask))(size - 1);
+ multi->size = (typeof(multi->size))size;
+}
+
+/**
+ * Adds num elements into the fifo. Return the number actually written.
+ *
+ * Multiple-producer version, modeled after __rte_ring_mp_do_enqueue().
+ */
+static inline unsigned
+kni_fifo_put_mp(struct rte_kni_fifo *fifo, struct rte_kni_fifo_multi *prod,
+ void **data, unsigned n)
+{
+ uint32_t prod_head, prod_next;
+ uint32_t cons_tail, free_entries;
+ const unsigned max = n;
+ int success;
+ unsigned i;
+ const uint32_t mask = prod->mask;
+
+ /* Move prod->head atomically. */
+ do {
+ /* Reset n to the initial burst count. */
+ n = max;
+
+ prod_head = prod->head;
+ cons_tail = fifo->read;
+
+ free_entries = (mask + cons_tail - prod_head) & mask;
+
+ /* Check that we have enough room in ring. */
+ if (unlikely(n > free_entries)) {
+ if (unlikely(free_entries == 0))
+ return 0;
+ n = free_entries;
+ }
+
+ prod_next = (prod_head + n) & mask;
+ success = rte_atomic32_cmpset(&prod->head, prod_head, prod_next);
+ } while (unlikely(success == 0));
+
+ /* Write entries in ring. */
+ for (i = 0; i < n; i++) {
+ fifo->buffer[(prod_head + i) & mask] = data[i];
+ }
+ rte_compiler_barrier();
+
+ /* If there are other enqueues in progress that preceded us,
+ * we need to wait for them to complete.
+ */
+ while (unlikely(prod->tail != prod_head))
+ rte_pause();
+
+ prod->tail = prod_next;
+ fifo->write = prod_next;
+ return n;
+}
+
+/**
+ * Get up to num elements from the fifo. Return the number actually read.
+ *
+ * Multiple-consumer version, modeled after __rte_ring_mc_do_dequeue().
+ */
+static inline unsigned
+kni_fifo_get_mc(struct rte_kni_fifo *fifo, struct rte_kni_fifo_multi *cons,
+ void **data, unsigned n)
+{
+ uint32_t cons_head, prod_tail;
+ uint32_t cons_next, entries;
+ const unsigned max = n;
+ int success;
+ unsigned i;
+ const uint32_t mask = cons->mask;
+
+ /* Move cons->head atomically. */
+ do {
+ /* Restore n as it may change every loop. */
+ n = max;
+
+ cons_head = cons->head;
+ prod_tail = fifo->write;
+
+ entries = (prod_tail - cons_head + mask + 1) & mask;
+
+ /* Set the actual entries for dequeue. */
+ if (n > entries) {
+ if (unlikely(entries == 0))
+ return 0;
+ n = entries;
+ }
+
+ cons_next = (cons_head + n) & mask;
+ success = rte_atomic32_cmpset(&cons->head, cons_head, cons_next);
+ } while (unlikely(success == 0));
+
+ /* Copy entries from ring. */
+ for (i = 0; i < n; i++) {
+ data[i] = fifo->buffer[(cons_head + i) & mask];
+ }
+ rte_compiler_barrier();
+
+ /* If there are other dequeues in progress that preceded us,
+ * we need to wait for them to complete.
+ */
+ while (unlikely(cons->tail != cons_head))
+ rte_pause();
+
+ cons->tail = cons_next;
+ fifo->read = cons_next;
+ return n;
+}
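
Note the ordering at the end of each function: the app-private tail
(prod->tail / cons->tail) is advanced first, and only then is the shared
index (fifo->write / fifo->read) published, so the single-threaded peer on
the kernel side never observes entries that are still being written.

As a quick sanity check, something like the following hypothetical harness
(not part of the patch; it assumes the internal struct rte_kni_fifo layout
from rte_kni_common.h and a fifo carved out of private memory) should
dequeue exactly as many entries as were enqueued:

#define TEST_FIFO_SIZE 1024
#define PER_PRODUCER   100000

static struct rte_kni_fifo *test_fifo;	    /* kni_fifo_init()'d with
					     * TEST_FIFO_SIZE entries */
static struct rte_kni_fifo_multi test_prod; /* kni_fifo_multi_init()'d,
					     * shared by all producers */

static int
producer(void *arg)
{
	void *objs[8];
	unsigned i, done = 0;

	for (i = 0; i < 8; i++)
		objs[i] = arg;		/* dummy payload */

	while (done < PER_PRODUCER)
		done += kni_fifo_put_mp(test_fifo, &test_prod, objs,
				RTE_MIN(8u, PER_PRODUCER - done));
	return 0;
}

With several lcores running producer() and the master lcore counting what
the single-consumer kni_fifo_get() returns, the totals should agree; swap
in the original kni_fifo_put() and they typically will not.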