@@ -13,6 +13,7 @@
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_gen.h>
+#include <rte_telemetry.h>
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
@@ -32,6 +33,29 @@ static volatile int done;
static struct rte_mempool *mbuf_pool;
struct rte_gen *gen;
+struct gen_args {
+ /* Inputs */
+ struct rte_gen *gen;
+
+ /* Outputs */
+ uint64_t tx_total_packets;
+ uint64_t rx_total_packets;
+ uint64_t rx_missed_total;
+ uint64_t tx_failed;
+ uint64_t last_tx_total;
+ uint64_t measured_tx_pps;
+
+
+} __rte_cache_aligned;
+/* Expose a struct as a global so the telemetry callbacks can access the
+ * data required to provide stats etc.
+ */
+struct telemetry_userdata {
+ struct gen_args *prod;
+ struct gen_args *cons;
+};
+static struct telemetry_userdata telemetry_userdata;
+
static void handle_sigint(int sig);
/* Initializes a given port using global settings and with the RX buffers
@@ -123,10 +147,11 @@ port_init(uint16_t port, struct rte_mempool *mbuf_pool)
* an input port and writing to an output port.
*/
static int
-lcore_producer(__rte_unused void *arg)
+lcore_producer(void *arg)
{
+ struct gen_args *args = arg;
+ struct rte_gen *gen = args->gen;
uint16_t port;
-
/* Check that the port is on the same NUMA node as the polling thread
* for best performance.
*/
@@ -138,25 +163,34 @@ lcore_producer(__rte_unused void *arg)
"polling thread.\n\tPerformance will "
"not be optimal.\n", port);
+ uint64_t tsc_hz = rte_get_tsc_hz();
+ uint64_t last_tsc_reading = 0;
+ uint64_t last_tx_total = 0;
+
/* Run until the application is quit or killed. */
while (!done) {
struct rte_mbuf *bufs[BURST_SIZE];
- int i;
+ uint16_t nb_tx = 0;
/* Receive packets from gen and then tx them over port */
RTE_ETH_FOREACH_DEV(port) {
- int nb_recieved = rte_gen_rx_burst(gen, bufs,
+ int nb_generated = rte_gen_rx_burst(gen, bufs,
BURST_SIZE);
- for (i = 0; i < nb_recieved; i++) {
- bufs[i]->pkt_len = 64;
- bufs[i]->data_len = 64;
- }
- uint16_t nb_tx = rte_eth_tx_burst(port, 0, bufs,
- nb_recieved);
- if (nb_tx != nb_recieved)
- rte_pktmbuf_free_bulk(&bufs[nb_tx],
- (nb_recieved - nb_tx));
+ uint64_t start_tsc = rte_rdtsc();
+ if (start_tsc > last_tsc_reading + tsc_hz) {
+ args->measured_tx_pps = args->tx_total_packets -
+ last_tx_total;
+ last_tx_total = args->tx_total_packets;
+ last_tsc_reading = start_tsc;
+ }
+ nb_tx = rte_eth_tx_burst(port, 0, bufs, nb_generated);
+ args->tx_total_packets += nb_tx;
+ uint64_t tx_failed = nb_generated - nb_tx;
+ if (nb_tx != nb_generated) {
+ rte_pktmbuf_free_bulk(&bufs[nb_tx], tx_failed);
+ args->tx_failed += tx_failed;
+ }
if (unlikely(nb_tx == 0))
continue;
@@ -169,10 +203,11 @@ lcore_producer(__rte_unused void *arg)
* an input port and writing to an output port.
*/
static int
-lcore_consumer(__rte_unused void *arg)
+lcore_consumer(void *arg)
{
+ struct gen_args *args = arg;
+ struct rte_gen *gen = args->gen;
uint16_t port;
-
/* Check that the port is on the same NUMA node as the polling thread
* for best performance.
*/
@@ -195,16 +230,16 @@ lcore_consumer(__rte_unused void *arg)
uint64_t latency[BURST_SIZE];
uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
BURST_SIZE);
- rte_gen_tx_burst(gen, bufs, latency, nb_rx);
+ if (unlikely(nb_rx == 0))
+ continue;
+
+ args->rx_total_packets += nb_rx;
int nb_sent = rte_gen_tx_burst(gen, bufs,
latency, nb_rx);
if (nb_sent != nb_rx)
rte_panic("invalid tx quantity\n");
- if (unlikely(nb_rx == 0))
- continue;
-
}
}
return 0;
@@ -217,6 +252,58 @@ void handle_sigint(int sig)
done = 1;
}
+static int
+tele_gen_packet(const char *cmd, const char *params, struct rte_tel_data *d)
+{
+ RTE_SET_USED(cmd);
+ RTE_SET_USED(params);
+ RTE_SET_USED(d);
+
+ rte_tel_data_string(d, "Ether()/IP()");
+ return 0;
+}
+
+static int
+tele_gen_mpps(const char *cmd, const char *params, struct rte_tel_data *d)
+{
+ struct gen_args *args = telemetry_userdata.prod;
+ RTE_SET_USED(cmd);
+ if (params) {
+ rte_tel_data_add_dict_int(d, "TEST",
+ (args->measured_tx_pps/1000000));
+ }
+ rte_tel_data_start_dict(d);
+ rte_tel_data_add_dict_int(d, "mpps",
+ (args->measured_tx_pps/1000000));
+ return 0;
+}
+
+static int
+tele_gen_stats(const char *cmd, const char *params, struct rte_tel_data *d)
+{
+ RTE_SET_USED(cmd);
+ RTE_SET_USED(params);
+
+ struct gen_args *args_prod = telemetry_userdata.prod;
+ struct gen_args *args_cons = telemetry_userdata.cons;
+ rte_tel_data_start_dict(d);
+ static const char * const stats[] = {
+ "tx_total_packets",
+ "rx_total_packets",
+ "measured_tx_pps"
+ };
+
+ uint64_t values[RTE_DIM(stats)] = {0};
+ values[0] = args_prod->tx_total_packets;
+ values[1] = args_cons->rx_total_packets;
+ values[2] = args_prod->measured_tx_pps;
+
+ uint32_t i;
+ for (i = 0; i < RTE_DIM(stats); i++)
+ rte_tel_data_add_dict_int(d, stats[i], values[i]);
+
+ return 0;
+}
/* The main function, which does initialization and calls the per-lcore
* functions.
*/
@@ -224,6 +311,10 @@ int
main(int argc, char *argv[])
{
signal(SIGINT, handle_sigint);
+
+ #define CORE_COUNT 2
+ struct gen_args core_launch_args[CORE_COUNT];
+
unsigned int nb_ports;
uint16_t portid;
@@ -253,7 +344,7 @@ main(int argc, char *argv[])
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
portid);
- gen = rte_gen_create(mbuf_pool);
+ struct rte_gen *gen = rte_gen_create(mbuf_pool);
if (!gen)
rte_panic("Gen failed to initialize\n");
@@ -261,19 +352,39 @@ main(int argc, char *argv[])
if (err)
rte_panic("Failed to parse input args");
+ memset(core_launch_args, 0, sizeof(struct gen_args) * CORE_COUNT);
/* launch lcore functions */
uint32_t lcore_count = 0;
uint32_t lcore_id = 0;
RTE_LCORE_FOREACH_WORKER(lcore_id) {
- if (lcore_count == 0)
- rte_eal_remote_launch(lcore_producer, NULL, lcore_id);
- else if (lcore_count == 1)
- rte_eal_remote_launch(lcore_consumer, NULL, lcore_id);
+ core_launch_args[lcore_count].gen = gen;
+ if (lcore_count == 0) {
+ telemetry_userdata.prod =
+ &core_launch_args[lcore_count];
+ rte_eal_remote_launch(lcore_producer,
+ telemetry_userdata.prod,
+ lcore_id);
+ } else if (lcore_count == 1) {
+ telemetry_userdata.cons =
+ &core_launch_args[lcore_count];
+ rte_eal_remote_launch(lcore_consumer,
+ telemetry_userdata.cons,
+ lcore_id);
+ }
else
break;
lcore_count++;
}
+
+ /* Export stats via Telemetry */
+ rte_telemetry_register_cmd("/gen/stats", tele_gen_stats,
+ "Return statistics of the Gen instance.");
+ rte_telemetry_register_cmd("/gen/packet", tele_gen_packet,
+ "Return the Gen string packet being sent.");
+ rte_telemetry_register_cmd("/gen/mpps", tele_gen_mpps,
+ "Get/Set the mpps rate");
+
/* Stall the main thread until all other threads have returned. */
rte_eal_mp_wait_lcore();