[dpdk-dev,2/3] port: added ethdev_writer_nodrop port
Commit Message
When ethdev_writer_nodrop port fails to send data, it tries to resend. Operation
is aborted when maximum number of retries is reached.
---
lib/librte_port/rte_port_ethdev.c | 230 +++++++++++++++++++++++++++++++++++++
lib/librte_port/rte_port_ethdev.h | 19 +++
2 files changed, 249 insertions(+)
Comments
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Maciej Gajdzica
> Sent: Monday, March 30, 2015 11:57 AM
> To: dev@dpdk.org
> Subject: [dpdk-dev] [PATCH 2/3] port: added ethdev_writer_nodrop port
>
> When ethdev_writer_nodrop port fails to send data, it tries to resend.
> Operation
> is aborted when maximum number of retries is reached.
>
> ---
> lib/librte_port/rte_port_ethdev.c | 230
> +++++++++++++++++++++++++++++++++++++
> lib/librte_port/rte_port_ethdev.h | 19 +++
> 2 files changed, 249 insertions(+)
>
> diff --git a/lib/librte_port/rte_port_ethdev.c
> b/lib/librte_port/rte_port_ethdev.c
> index d014913..1f77cb5 100644
> --- a/lib/librte_port/rte_port_ethdev.c
> +++ b/lib/librte_port/rte_port_ethdev.c
> @@ -31,6 +31,7 @@
> * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> */
> #include <string.h>
> +#include <stdint.h>
>
> #include <rte_mbuf.h>
> #include <rte_ethdev.h>
> @@ -288,6 +289,227 @@ rte_port_ethdev_writer_free(void *port)
> }
>
> /*
> + * Port ETHDEV Writer Nodrop
> + */
> +#define RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH 1
> +
> +struct rte_port_ethdev_writer_nodrop {
> + struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
> + uint32_t tx_burst_sz;
> + uint16_t tx_buf_count;
> + uint64_t bsz_mask;
> + uint64_t n_retries;
> + uint16_t queue_id;
> + uint8_t port_id;
> +};
> +
> +static void *
> +rte_port_ethdev_writer_nodrop_create(void *params, int socket_id)
> +{
> + struct rte_port_ethdev_writer_nodrop_params *conf =
> + (struct rte_port_ethdev_writer_nodrop_params *)
> params;
> + struct rte_port_ethdev_writer_nodrop *port;
> +
> + /* Check input parameters */
> + if ((conf == NULL) ||
> + (conf->tx_burst_sz == 0) ||
> + (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
> + (!rte_is_power_of_2(conf->tx_burst_sz))) {
> + RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n",
> __func__);
> + return NULL;
> + }
> +
> + /* Memory allocation */
> + port = rte_zmalloc_socket("PORT", sizeof(*port),
> + RTE_CACHE_LINE_SIZE, socket_id);
> + if (port == NULL) {
> + RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n",
> __func__);
> + return NULL;
> + }
> +
> + /* Initialization */
> + port->port_id = conf->port_id;
> + port->queue_id = conf->queue_id;
> + port->tx_burst_sz = conf->tx_burst_sz;
> + port->tx_buf_count = 0;
> + port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
> +
> + /*
> + * When n_retries is 0 it means that we should wait for every packet
> to
> + * send no matter how many retries should it take. To limit number of
> + * branches in fast path, we use UINT64_MAX instead of branching.
> + */
> + port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf-
> >n_retries;
> +
> + return port;
> +}
> +
> +static inline void
> +send_burst_nodrop(struct rte_port_ethdev_writer_nodrop *p)
> +{
> + uint32_t nb_tx = 0, i;
> +
> + nb_tx = rte_eth_tx_burst(p->port_id, p->queue_id, p->tx_buf,
> + p->tx_buf_count);
> +
> + /* We sent all the packets in a first try */
> + if (nb_tx >= p->tx_buf_count)
> + return;
> +
> + for (i = 0; i < p->n_retries; i++) {
> + nb_tx += rte_eth_tx_burst(p->port_id, p->queue_id,
> + p->tx_buf + nb_tx, p-
> >tx_buf_count - nb_tx);
> +
> + /* We sent all the packets in more than one try */
> + if (nb_tx >= p->tx_buf_count)
> + return;
> +
I think that this method will work only for single output port in pipeline.
For multiport configurations especially when TX HW rings are continuously overloaded
The driver will wait all the time on this loop.
As example Imagine that you have single TX thread in DPDK pipeline and you have multiple output Ethernet ports with different speeds
10G and 40G calling +send_burst_nodrop - both overloaded. The effect will be that 10G port will wait most of time and limit the speed of the 40G so
The effect expected is that 40G works on 10G speed ;-(
Similar effect possible when same speed ports are accessed by send_burst_nodrop and the DCB/or any other L2 flow control is enabled that limits the TX speed
The speed of whole system will be the speed of the slowest link.
}
> +
> + /* We didn't send the packets in maximum allowed attempts */
> + for ( ; nb_tx < p->tx_buf_count; nb_tx++)
> + rte_pktmbuf_free(p->tx_buf[nb_tx]);
> +
> + p->tx_buf_count = 0;
> +}
> +
> +static int
> +rte_port_ethdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
> +{
> + struct rte_port_ethdev_writer_nodrop *p =
> + (struct rte_port_ethdev_writer_nodrop *) port;
> +
> + p->tx_buf[p->tx_buf_count++] = pkt;
> + if (p->tx_buf_count >= p->tx_burst_sz)
> + send_burst_nodrop(p);
> +
> + return 0;
> +}
> +
> +#if RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 0
> +
> +static int
> +rte_port_ethdev_writer_nodrop_tx_bulk(void *port,
> + struct rte_mbuf **pkts,
> + uint64_t pkts_mask)
> +{
> + struct rte_port_ethdev_writer_nodrop *p =
> + (struct rte_port_ethdev_writer_nodrop *) port;
> +
> + if ((pkts_mask & (pkts_mask + 1)) == 0) {
> + uint64_t n_pkts = __builtin_popcountll(pkts_mask);
> + uint32_t i;
> +
> + for (i = 0; i < n_pkts; i++) {
> + struct rte_mbuf *pkt = pkts[i];
> +
> + p->tx_buf[p->tx_buf_count++] = pkt;
> + if (p->tx_buf_count >= p->tx_burst_sz)
> + send_burst_nodrop(p);
> + }
> + } else {
> + for ( ; pkts_mask; ) {
> + uint32_t pkt_index = __builtin_ctzll(pkts_mask);
> + uint64_t pkt_mask = 1LLU << pkt_index;
> + struct rte_mbuf *pkt = pkts[pkt_index];
> +
> + p->tx_buf[p->tx_buf_count++] = pkt;
> + if (p->tx_buf_count >= p->tx_burst_sz)
> + send_burst_nodrop(p);
> + pkts_mask &= ~pkt_mask;
> + }
> + }
> +
> + return 0;
> +}
> +
> +#elif RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 1
> +
> +static int
> +rte_port_ethdev_writer_nodrop_tx_bulk(void *port,
> + struct rte_mbuf **pkts,
> + uint64_t pkts_mask)
> +{
> + struct rte_port_ethdev_writer_nodrop *p =
> + (struct rte_port_ethdev_writer_nodrop *) port;
> +
> + uint32_t bsz_mask = p->bsz_mask;
> + uint32_t tx_buf_count = p->tx_buf_count;
> + uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
> + ((pkts_mask & bsz_mask) ^ bsz_mask);
> +
> + if (expr == 0) {
> + uint64_t n_pkts = __builtin_popcountll(pkts_mask);
> + uint32_t n_pkts_ok;
> +
> + if (tx_buf_count)
> + send_burst_nodrop(p);
> +
> + n_pkts_ok = rte_eth_tx_burst(p->port_id, p->queue_id, pkts,
> + n_pkts);
> +
> + if (n_pkts_ok >= n_pkts)
> + return 0;
> +
> + /*
> + * If we didnt manage to send all packets in single burst, move
> + * remaining packets to the buffer and call send burst.
> + */
> + for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
> + struct rte_mbuf *pkt = pkts[n_pkts_ok];
> + p->tx_buf[p->tx_buf_count++] = pkt;
> + }
> + send_burst_nodrop(p);
> + } else {
> + for ( ; pkts_mask; ) {
> + uint32_t pkt_index = __builtin_ctzll(pkts_mask);
> + uint64_t pkt_mask = 1LLU << pkt_index;
> + struct rte_mbuf *pkt = pkts[pkt_index];
> +
> + p->tx_buf[tx_buf_count++] = pkt;
> + pkts_mask &= ~pkt_mask;
> + }
> +
> + p->tx_buf_count = tx_buf_count;
> + if (tx_buf_count >= p->tx_burst_sz)
> + send_burst_nodrop(p);
> + }
> +
> + return 0;
> +}
> +
> +#else
> +
> +#error Invalid value for RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH
> +
> +#endif
> +
> +static int
> +rte_port_ethdev_writer_nodrop_flush(void *port)
> +{
> + struct rte_port_ethdev_writer_nodrop *p =
> + (struct rte_port_ethdev_writer_nodrop *) port;
> +
> + if (p->tx_buf_count > 0)
> + send_burst_nodrop(p);
> +
> + return 0;
> +}
> +
> +static int
> +rte_port_ethdev_writer_nodrop_free(void *port)
> +{
> + if (port == NULL) {
> + RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
> + return -EINVAL;
> + }
> +
> + rte_port_ethdev_writer_nodrop_flush(port);
> + rte_free(port);
> +
> + return 0;
> +}
> +
> +/*
> * Summary of port operations
> */
> struct rte_port_in_ops rte_port_ethdev_reader_ops = {
> @@ -303,3 +525,11 @@ struct rte_port_out_ops
> rte_port_ethdev_writer_ops = {
> .f_tx_bulk = rte_port_ethdev_writer_tx_bulk,
> .f_flush = rte_port_ethdev_writer_flush,
> };
> +
> +struct rte_port_out_ops rte_port_ethdev_writer_nodrop_ops = {
> + .f_create = rte_port_ethdev_writer_nodrop_create,
> + .f_free = rte_port_ethdev_writer_nodrop_free,
> + .f_tx = rte_port_ethdev_writer_nodrop_tx,
> + .f_tx_bulk = rte_port_ethdev_writer_nodrop_tx_bulk,
> + .f_flush = rte_port_ethdev_writer_nodrop_flush,
> +};
> diff --git a/lib/librte_port/rte_port_ethdev.h
> b/lib/librte_port/rte_port_ethdev.h
> index af67a12..201a79e 100644
> --- a/lib/librte_port/rte_port_ethdev.h
> +++ b/lib/librte_port/rte_port_ethdev.h
> @@ -79,6 +79,25 @@ struct rte_port_ethdev_writer_params {
> /** ethdev_writer port operations */
> extern struct rte_port_out_ops rte_port_ethdev_writer_ops;
>
> +/** ethdev_writer_nodrop port parameters */
> +struct rte_port_ethdev_writer_nodrop_params {
> + /** NIC RX port ID */
> + uint8_t port_id;
> +
> + /** NIC RX queue ID */
> + uint16_t queue_id;
> +
> + /** Recommended burst size to NIC TX queue. The actual burst size
> can be
> + bigger or smaller than this value. */
> + uint32_t tx_burst_sz;
> +
> + /** Maximum number of retries, 0 for no limit */
> + uint32_t n_retries;
> +};
> +
> +/** ethdev_writer_nodrop port operations */
> +extern struct rte_port_out_ops rte_port_ethdev_writer_nodrop_ops;
> +
> #ifdef __cplusplus
> }
> #endif
> --
> 1.7.9.5
@@ -31,6 +31,7 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <string.h>
+#include <stdint.h>
#include <rte_mbuf.h>
#include <rte_ethdev.h>
@@ -288,6 +289,227 @@ rte_port_ethdev_writer_free(void *port)
}
/*
+ * Port ETHDEV Writer Nodrop
+ */
+#define RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH 1
+
+struct rte_port_ethdev_writer_nodrop {
+ struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
+ uint32_t tx_burst_sz;
+ uint16_t tx_buf_count;
+ uint64_t bsz_mask;
+ uint64_t n_retries;
+ uint16_t queue_id;
+ uint8_t port_id;
+};
+
+static void *
+rte_port_ethdev_writer_nodrop_create(void *params, int socket_id)
+{
+ struct rte_port_ethdev_writer_nodrop_params *conf =
+ (struct rte_port_ethdev_writer_nodrop_params *) params;
+ struct rte_port_ethdev_writer_nodrop *port;
+
+ /* Check input parameters */
+ if ((conf == NULL) ||
+ (conf->tx_burst_sz == 0) ||
+ (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
+ (!rte_is_power_of_2(conf->tx_burst_sz))) {
+ RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
+ return NULL;
+ }
+
+ /* Memory allocation */
+ port = rte_zmalloc_socket("PORT", sizeof(*port),
+ RTE_CACHE_LINE_SIZE, socket_id);
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
+ return NULL;
+ }
+
+ /* Initialization */
+ port->port_id = conf->port_id;
+ port->queue_id = conf->queue_id;
+ port->tx_burst_sz = conf->tx_burst_sz;
+ port->tx_buf_count = 0;
+ port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
+
+ /*
+ * When n_retries is 0 it means that we should wait for every packet to
+ * send no matter how many retries should it take. To limit number of
+ * branches in fast path, we use UINT64_MAX instead of branching.
+ */
+ port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
+
+ return port;
+}
+
+static inline void
+send_burst_nodrop(struct rte_port_ethdev_writer_nodrop *p)
+{
+ uint32_t nb_tx = 0, i;
+
+ nb_tx = rte_eth_tx_burst(p->port_id, p->queue_id, p->tx_buf,
+ p->tx_buf_count);
+
+ /* We sent all the packets in a first try */
+ if (nb_tx >= p->tx_buf_count)
+ return;
+
+ for (i = 0; i < p->n_retries; i++) {
+ nb_tx += rte_eth_tx_burst(p->port_id, p->queue_id,
+ p->tx_buf + nb_tx, p->tx_buf_count - nb_tx);
+
+ /* We sent all the packets in more than one try */
+ if (nb_tx >= p->tx_buf_count)
+ return;
+ }
+
+ /* We didn't send the packets in maximum allowed attempts */
+ for ( ; nb_tx < p->tx_buf_count; nb_tx++)
+ rte_pktmbuf_free(p->tx_buf[nb_tx]);
+
+ p->tx_buf_count = 0;
+}
+
+static int
+rte_port_ethdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
+{
+ struct rte_port_ethdev_writer_nodrop *p =
+ (struct rte_port_ethdev_writer_nodrop *) port;
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+
+ return 0;
+}
+
+#if RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 0
+
+static int
+rte_port_ethdev_writer_nodrop_tx_bulk(void *port,
+ struct rte_mbuf **pkts,
+ uint64_t pkts_mask)
+{
+ struct rte_port_ethdev_writer_nodrop *p =
+ (struct rte_port_ethdev_writer_nodrop *) port;
+
+ if ((pkts_mask & (pkts_mask + 1)) == 0) {
+ uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+ uint32_t i;
+
+ for (i = 0; i < n_pkts; i++) {
+ struct rte_mbuf *pkt = pkts[i];
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+ }
+ } else {
+ for ( ; pkts_mask; ) {
+ uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+ uint64_t pkt_mask = 1LLU << pkt_index;
+ struct rte_mbuf *pkt = pkts[pkt_index];
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+ pkts_mask &= ~pkt_mask;
+ }
+ }
+
+ return 0;
+}
+
+#elif RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH == 1
+
+static int
+rte_port_ethdev_writer_nodrop_tx_bulk(void *port,
+ struct rte_mbuf **pkts,
+ uint64_t pkts_mask)
+{
+ struct rte_port_ethdev_writer_nodrop *p =
+ (struct rte_port_ethdev_writer_nodrop *) port;
+
+ uint32_t bsz_mask = p->bsz_mask;
+ uint32_t tx_buf_count = p->tx_buf_count;
+ uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
+ ((pkts_mask & bsz_mask) ^ bsz_mask);
+
+ if (expr == 0) {
+ uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+ uint32_t n_pkts_ok;
+
+ if (tx_buf_count)
+ send_burst_nodrop(p);
+
+ n_pkts_ok = rte_eth_tx_burst(p->port_id, p->queue_id, pkts,
+ n_pkts);
+
+ if (n_pkts_ok >= n_pkts)
+ return 0;
+
+ /*
+ * If we didnt manage to send all packets in single burst, move
+ * remaining packets to the buffer and call send burst.
+ */
+ for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
+ struct rte_mbuf *pkt = pkts[n_pkts_ok];
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ }
+ send_burst_nodrop(p);
+ } else {
+ for ( ; pkts_mask; ) {
+ uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+ uint64_t pkt_mask = 1LLU << pkt_index;
+ struct rte_mbuf *pkt = pkts[pkt_index];
+
+ p->tx_buf[tx_buf_count++] = pkt;
+ pkts_mask &= ~pkt_mask;
+ }
+
+ p->tx_buf_count = tx_buf_count;
+ if (tx_buf_count >= p->tx_burst_sz)
+ send_burst_nodrop(p);
+ }
+
+ return 0;
+}
+
+#else
+
+#error Invalid value for RTE_PORT_ETHDEV_WRITER_NODROP_APPROACH
+
+#endif
+
+static int
+rte_port_ethdev_writer_nodrop_flush(void *port)
+{
+ struct rte_port_ethdev_writer_nodrop *p =
+ (struct rte_port_ethdev_writer_nodrop *) port;
+
+ if (p->tx_buf_count > 0)
+ send_burst_nodrop(p);
+
+ return 0;
+}
+
+static int
+rte_port_ethdev_writer_nodrop_free(void *port)
+{
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
+ return -EINVAL;
+ }
+
+ rte_port_ethdev_writer_nodrop_flush(port);
+ rte_free(port);
+
+ return 0;
+}
+
+/*
* Summary of port operations
*/
struct rte_port_in_ops rte_port_ethdev_reader_ops = {
@@ -303,3 +525,11 @@ struct rte_port_out_ops rte_port_ethdev_writer_ops = {
.f_tx_bulk = rte_port_ethdev_writer_tx_bulk,
.f_flush = rte_port_ethdev_writer_flush,
};
+
+struct rte_port_out_ops rte_port_ethdev_writer_nodrop_ops = {
+ .f_create = rte_port_ethdev_writer_nodrop_create,
+ .f_free = rte_port_ethdev_writer_nodrop_free,
+ .f_tx = rte_port_ethdev_writer_nodrop_tx,
+ .f_tx_bulk = rte_port_ethdev_writer_nodrop_tx_bulk,
+ .f_flush = rte_port_ethdev_writer_nodrop_flush,
+};
@@ -79,6 +79,25 @@ struct rte_port_ethdev_writer_params {
/** ethdev_writer port operations */
extern struct rte_port_out_ops rte_port_ethdev_writer_ops;
+/** ethdev_writer_nodrop port parameters */
+struct rte_port_ethdev_writer_nodrop_params {
+ /** NIC RX port ID */
+ uint8_t port_id;
+
+ /** NIC RX queue ID */
+ uint16_t queue_id;
+
+ /** Recommended burst size to NIC TX queue. The actual burst size can be
+ bigger or smaller than this value. */
+ uint32_t tx_burst_sz;
+
+ /** Maximum number of retries, 0 for no limit */
+ uint32_t n_retries;
+};
+
+/** ethdev_writer_nodrop port operations */
+extern struct rte_port_out_ops rte_port_ethdev_writer_nodrop_ops;
+
#ifdef __cplusplus
}
#endif