From patchwork Tue Sep 26 16:34:43 2023
X-Patchwork-Submitter: Robin Jarry
X-Patchwork-Id: 131972
X-Patchwork-Delegate: thomas@monjalon.net
From: Robin Jarry
To: dev@dpdk.org
Cc: Robin Jarry
Subject: [RFC PATCH] usertools: add telemetry exporter
Date: Tue, 26 Sep 2023 18:34:43 +0200
Message-ID: <20230926163442.844006-2-rjarry@redhat.com>

For now the telemetry socket is local to the machine running a DPDK
application. Also, there is no official "schema" for the exposed metrics.

Add a framework and a script to collect and expose these metrics to
telemetry and observability aggregators such as Prometheus, Carbon or
Influxdb. The exposed data must be defined with end-users in mind: some
DPDK terminology or internals may not make sense to everyone.

The script only serves as an entry point and does not know anything about
any specific metrics nor about the JSON data structures exposed in the
telemetry socket. It uses dynamically loaded endpoint exporters which are
basic python files that must implement two functions:

    def info() -> dict[MetricName, MetricInfo]:
        Mapping of metric names to their description and type.

    def metrics(sock: TelemetrySocket) -> list[MetricValue]:
        Request data from sock and return it as metric values.
        A metric value is a 3-tuple: (name: str, value: any, labels: dict).
        Each name must be present in info().

The sock argument passed to metrics() has a single method:

    def cmd(self, uri: str, arg: any = None) -> dict | list:
        Request JSON data from the telemetry socket and parse it to python
        values.

The main script invokes the endpoints and exports the data in an output
format. For now, only two formats are implemented:

* openmetrics/prometheus: text-based format served via a local HTTP server.
* carbon/graphite: binary (python pickle) format exported to a remote
  carbon TCP server.

As a starting point, 3 built-in endpoints are implemented:

* counters: ethdev hardware counters
* cpu: lcore usage
* memory: overall memory usage

The goal is to keep all built-in endpoints in the DPDK repository so that
they can be updated along with the telemetry JSON data structures.

Example output for the openmetrics:// format:

~# dpdk-telemetry-exporter.py -o openmetrics://:9876 &
INFO using endpoint: counters (from .../telemetry-endpoints/counters.py)
INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py)
INFO using endpoint: memory (from .../telemetry-endpoints/memory.py)
INFO listening on port 9876
[1] 838829
~$ curl http://127.0.0.1:9876/
# HELP dpdk_cpu_total_cycles Total number of CPU cycles.
# TYPE dpdk_cpu_total_cycles counter
# HELP dpdk_cpu_busy_cycles Number of busy CPU cycles.
# TYPE dpdk_cpu_busy_cycles counter
dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980
dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860
dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740
dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860
dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540
dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160
dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320
dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860
# HELP dpdk_memory_total_bytes The total size of reserved memory in bytes.
# TYPE dpdk_memory_total_bytes gauge
# HELP dpdk_memory_used_bytes The currently used memory in bytes.
# TYPE dpdk_memory_used_bytes gauge
dpdk_memory_total_bytes 1073741824
dpdk_memory_used_bytes 794197376

Link: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
Link: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format
Link: https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
Link: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus
Signed-off-by: Robin Jarry
---

Notes:
    v1:

    * Ideally, this script should be tested in CI to avoid breakage when the
      telemetry data structures change. I don't know where such a test could
      be done.

    * There was work done 3/4 years ago in collectd to add DPDK telemetry
      support but I think this has now been abandoned.

      https://github.com/collectd/collectd/blob/main/src/dpdk_telemetry.c

      I think that keeping the exporters in the DPDK repository makes more
      sense from a maintainability perspective.
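    For illustration, a third-party endpoint following the interface
    described above could look like the sketch below. It is hypothetical and
    not part of this patch: the "ports" name and its single metric are made
    up, and it only queries /ethdev/list, which the built-in counters
    endpoint already uses.

        # telemetry-endpoints/ports.py (illustrative example only)
        PORT_COUNT = "count"


        def info() -> "dict[Name, tuple[Description, Type]]":
            return {
                PORT_COUNT: ("Number of probed ethdev ports.", "gauge"),
            }


        def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
            # /ethdev/list returns the list of ethdev port ids
            return [
                (PORT_COUNT, len(sock.cmd("/ethdev/list") or []), {}),
            ]

    Dropping such a file in one of the load paths (or in a directory passed
    via -p/--load-path) should be enough for the exporter to discover and
    expose it.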
 usertools/dpdk-telemetry-exporter.py      | 376 ++++++++++++++++++++++
 usertools/meson.build                     |   6 +
 usertools/telemetry-endpoints/counters.py |  47 +++
 usertools/telemetry-endpoints/cpu.py      |  29 ++
 usertools/telemetry-endpoints/memory.py   |  37 +++
 5 files changed, 495 insertions(+)
 create mode 100755 usertools/dpdk-telemetry-exporter.py
 create mode 100644 usertools/telemetry-endpoints/counters.py
 create mode 100644 usertools/telemetry-endpoints/cpu.py
 create mode 100644 usertools/telemetry-endpoints/memory.py

diff --git a/usertools/dpdk-telemetry-exporter.py b/usertools/dpdk-telemetry-exporter.py
new file mode 100755
index 000000000000..6c1495a9e1e8
--- /dev/null
+++ b/usertools/dpdk-telemetry-exporter.py
@@ -0,0 +1,376 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+'''
+DPDK telemetry exporter.
+
+It uses dynamically loaded endpoint exporters which are basic python files that
+must implement two functions:
+
+    def info() -> dict[MetricName, MetricInfo]:
+        """
+        Mapping of metric names to their description and type.
+        """
+
+    def metrics(sock: TelemetrySocket) -> list[MetricValue]:
+        """
+        Request data from sock and return it as metric values. A metric value
+        is a 3-tuple: (name: str, value: any, labels: dict). Each name must be
+        present in info().
+        """
+
+The sock argument passed to metrics() has a single method:
+
+    def cmd(self, uri, arg=None) -> dict | list:
+        """
+        Request JSON data from the telemetry socket and parse it to python
+        values.
+        """
+
+See existing endpoints for examples.
+
+The exporter supports multiple output formats:
+
+prometheus://ADDRESS:PORT
+openmetrics://ADDRESS:PORT
+    Expose the enabled endpoints via a local HTTP server listening on the
+    specified address and port. GET requests on that server are served with
+    text/plain responses in the prometheus/openmetrics format.
+
+    More details:
+    https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
+
+carbon://ADDRESS:PORT
+graphite://ADDRESS:PORT
+    Export all enabled endpoints to the specified TCP ADDRESS:PORT in the
+    pickle carbon format.
+
+    More details:
+    https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
+'''
+
+import argparse
+from http import HTTPStatus, server
+import importlib.util
+import json
+import logging
+import os
+import pickle
+import re
+import socket
+import struct
+import sys
+import time
+import typing
+from urllib.parse import urlparse
+
+
+LOG = logging.getLogger(__name__)
+# Use local endpoints path only when running from source
+LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints")
+DEFAULT_LOAD_PATHS = []
+if os.path.isdir(LOCAL):
+    DEFAULT_LOAD_PATHS.append(LOCAL)
+DEFAULT_LOAD_PATHS += [
+    "/usr/local/share/dpdk/telemetry-endpoints",
+    "/usr/share/dpdk/telemetry-endpoints",
+]
+DEFAULT_OUTPUT = "openmetrics://:9876"
+
+
+def main():
+    logging.basicConfig(
+        stream=sys.stdout,
+        level=logging.INFO,
+        format="%(asctime)s %(levelname)s %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument(
+        "-o",
+        "--output",
+        metavar="FORMAT://PARAMETERS",
+        default=urlparse(DEFAULT_OUTPUT),
+        type=urlparse,
+        help=f"""
+        Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format,
+        URL elements have different meanings.
+        By default, the exporter starts a local HTTP server on port 9876 that
+        serves requests in the prometheus/openmetrics plain text format.
+        """,
+    )
+    parser.add_argument(
+        "-p",
+        "--load-path",
+        dest="load_paths",
+        type=lambda v: v.split(os.pathsep),
+        default=DEFAULT_LOAD_PATHS,
+        help=f"""
+        The list of paths from which to discover endpoints
+        (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}").
+        """,
+    )
+    parser.add_argument(
+        "-e",
+        "--endpoint",
+        dest="endpoints",
+        action="append",
+        help="""
+        Telemetry endpoint to export (by default, all discovered endpoints are
+        enabled). This option can be specified more than once.
+        """,
+    )
+    parser.add_argument(
+        "-l",
+        "--list",
+        action="store_true",
+        help="""
+        Only list detected endpoints and exit.
+        """,
+    )
+    parser.add_argument(
+        "-s",
+        "--socket-path",
+        default="/run/dpdk/rte/dpdk_telemetry.v2",
+        help="""
+        The DPDK telemetry socket path (default: "%(default)s").
+        """,
+    )
+    args = parser.parse_args()
+    output = OUTPUT_FORMATS.get(args.output.scheme)
+    if output is None:
+        parser.error(f"unsupported output format: {args.output.scheme}://")
+    try:
+        endpoints = load_endpoints(args.load_paths, args.endpoints)
+        if args.list:
+            return
+        output(args, endpoints)
+    except KeyboardInterrupt:
+        pass
+    except Exception:
+        LOG.exception("")
+
+
+class TelemetrySocket:
+    """
+    Abstraction of the DPDK telemetry socket.
+    """
+
+    def __init__(self, path: str):
+        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        self.sock.connect(path)
+        data = json.loads(self.sock.recv(1024).decode())
+        self.max_output_len = data["max_output_len"]
+
+    def cmd(
+        self, uri: str, arg: typing.Any = None
+    ) -> typing.Optional[typing.Union[dict, list]]:
+        """
+        Request JSON data from the telemetry socket and parse it to python
+        values.
+        """
+        if arg is not None:
+            u = f"{uri},{arg}"
+        else:
+            u = uri
+        self.sock.send(u.encode("utf-8"))
+        data = self.sock.recv(self.max_output_len)
+        return json.loads(data.decode("utf-8"))[uri]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.sock.close()
+
+
+MetricDescription = str
+MetricType = str
+MetricName = str
+MetricLabels = typing.Dict[str, typing.Any]
+MetricInfo = typing.Tuple[MetricDescription, MetricType]
+MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels]
+
+
+class TelemetryEndpoint:
+    """
+    Placeholder class only used for typing annotations.
+    """
+
+    @staticmethod
+    def info() -> typing.Dict[MetricName, MetricInfo]:
+        """
+        Mapping of metric names to their description and type.
+        """
+        raise NotImplementedError()
+
+    @staticmethod
+    def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
+        """
+        Request data from sock and return it as metric values. Each metric
+        name must be present in info().
+        """
+        raise NotImplementedError()
+
+
+def load_endpoints(
+    paths: typing.List[str], names: typing.List[str]
+) -> typing.List[TelemetryEndpoint]:
+    """
+    Load selected telemetry endpoints from the specified paths.
+    """
+
+    endpoints = {}
+    dwb = sys.dont_write_bytecode
+    sys.dont_write_bytecode = True  # never generate .pyc files for endpoints
+
+    for p in paths:
+        if not os.path.isdir(p):
+            continue
+        for fname in os.listdir(p):
+            f = os.path.join(p, fname)
+            if os.path.isdir(f):
+                continue
+            try:
+                name, _ = os.path.splitext(fname)
+                if names is not None and name not in names:
+                    # not selected by user
+                    continue
+                if name in endpoints:
+                    # endpoint with same name already loaded
+                    continue
+                spec = importlib.util.spec_from_file_location(name, f)
+                module = importlib.util.module_from_spec(spec)
+                spec.loader.exec_module(module)
+                endpoints[name] = module
+            except Exception:
+                LOG.exception("parsing endpoint: %s", f)
+
+    sys.dont_write_bytecode = dwb
+
+    modules = []
+    info = {}
+    for name, module in sorted(endpoints.items()):
+        LOG.info("using endpoint: %s (from %s)", name, module.__file__)
+        try:
+            for metric, (description, type_) in module.info().items():
+                info[(name, metric)] = (description, type_)
+            modules.append(module)
+        except Exception:
+            LOG.exception("getting endpoint info: %s", name)
+    return modules
+
+
+def serve_openmetrics(
+    args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]
+):
+    """
+    Start an HTTP server and serve requests in the openmetrics/prometheus
+    format.
+    """
+    listen = (args.output.hostname or "", int(args.output.port or 80))
+    with server.HTTPServer(listen, OpenmetricsHandler) as httpd:
+        httpd.dpdk_socket_path = args.socket_path
+        httpd.telemetry_endpoints = endpoints
+        LOG.info("listening on port %s", httpd.server_port)
+        httpd.serve_forever()
+
+
+class OpenmetricsHandler(server.BaseHTTPRequestHandler):
+    """
+    Basic HTTP handler that returns prometheus/openmetrics formatted responses.
+    """
+
+    CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"
+
+    def escape(self, value: typing.Any) -> str:
+        """
+        Escape a metric label value.
+        """
+        value = str(value)
+        value = value.replace("\\", "\\\\")
+        value = value.replace('"', '\\"')
+        return value.replace("\n", "\\n")
+
+    def do_GET(self):
+        """
+        Called upon GET requests.
+        """
+        try:
+            lines = []
+            metrics_names = set()
+            with TelemetrySocket(self.server.dpdk_socket_path) as sock:
+                for e in self.server.telemetry_endpoints:
+                    info = e.info()
+                    metrics_lines = []
+                    for name, value, labels in e.metrics(sock):
+                        fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}")
+                        labels = ", ".join(
+                            f'{k}="{self.escape(v)}"' for k, v in labels.items()
+                        )
+                        if labels:
+                            labels = f"{{{labels}}}"
+                        metrics_lines.append(f"{fullname}{labels} {value}")
+                        if fullname not in metrics_names:
+                            metrics_names.add(fullname)
+                            desc, metric_type = info[name]
+                            lines += [
+                                f"# HELP {fullname} {desc}",
+                                f"# TYPE {fullname} {metric_type}",
+                            ]
+                    lines += metrics_lines
+            body = "\n".join(lines).encode("utf-8") + b"\n"
+            self.send_response(HTTPStatus.OK)
+            self.send_header("Content-Type", self.CONTENT_TYPE)
+            self.send_header("Content-Length", str(len(body)))
+            self.end_headers()
+            self.wfile.write(body)
+            LOG.info("%s %s", self.address_string(), self.requestline)
+
+        except Exception as e:
+            if isinstance(e, (FileNotFoundError, ConnectionRefusedError)):
+                self.send_error(HTTPStatus.SERVICE_UNAVAILABLE)
+            else:
+                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
+            LOG.exception("%s %s", self.address_string(), self.requestline)
+
+    def log_message(self, fmt, *args):
+        pass  # disable built-in logger
+
+
+def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]):
+    """
+    Collect all metrics and export them to a carbon server in the pickle format.
+    """
+    addr = (args.output.hostname or "", int(args.output.port or 80))
+    with TelemetrySocket(args.socket_path) as dpdk:
+        with socket.socket() as carbon:
+            carbon.connect(addr)
+            metrics = []
+            for e in endpoints:
+                for name, value, labels in e.metrics(dpdk):
+                    fullname = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}")
+                    for key, val in labels.items():
+                        val = str(val).replace(";", "")
+                        fullname += f";{key}={val}"
+                    metrics.append((fullname, (time.time(), value)))
+            payload = pickle.dumps(metrics, protocol=2)
+            header = struct.pack("!L", len(payload))
+            buf = header + payload
+            carbon.sendall(buf)
+
+
+OUTPUT_FORMATS = {
+    "openmetrics": serve_openmetrics,
+    "prometheus": serve_openmetrics,
+    "carbon": export_carbon,
+    "graphite": export_carbon,
+}
+
+
+if __name__ == "__main__":
+    main()
diff --git a/usertools/meson.build b/usertools/meson.build
index 740b4832f36d..eb48e2f4403f 100644
--- a/usertools/meson.build
+++ b/usertools/meson.build
@@ -11,5 +11,11 @@ install_data([
         'dpdk-telemetry.py',
         'dpdk-hugepages.py',
         'dpdk-rss-flows.py',
+        'dpdk-telemetry-exporter.py',
 ], install_dir: 'bin')
+
+install_subdir(
+        'telemetry-endpoints',
+        install_dir: 'share/dpdk',
+        strip_directory: false)
diff --git a/usertools/telemetry-endpoints/counters.py b/usertools/telemetry-endpoints/counters.py
new file mode 100644
index 000000000000..e17cffb43b2c
--- /dev/null
+++ b/usertools/telemetry-endpoints/counters.py
@@ -0,0 +1,47 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+RX_PACKETS = "rx_packets"
+RX_BYTES = "rx_bytes"
+RX_MISSED = "rx_missed"
+RX_NOMBUF = "rx_nombuf"
+RX_ERRORS = "rx_errors"
+TX_PACKETS = "tx_packets"
+TX_BYTES = "tx_bytes"
+TX_ERRORS = "tx_errors"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        RX_PACKETS: ("Number of successfully received packets.", "counter"),
+        RX_BYTES: ("Number of successfully received bytes.", "counter"),
+        RX_MISSED: (
+            "Number of packets dropped by the HW because Rx queues are full.",
+            "counter",
+        ),
+        RX_NOMBUF: ("Number of Rx mbuf allocation failures.", "counter"),
+        RX_ERRORS: ("Number of erroneous received packets.", "counter"),
+        TX_PACKETS: ("Number of successfully transmitted packets.", "counter"),
+        TX_BYTES: ("Number of successfully transmitted bytes.", "counter"),
+        TX_ERRORS: ("Number of packet transmission failures.", "counter"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    out = []
+    for port_id in sock.cmd("/ethdev/list"):
+        port = sock.cmd("/ethdev/info", port_id)
+        stats = sock.cmd("/ethdev/stats", port_id)
+        labels = {"port": port["name"]}
+        out += [
+            (RX_PACKETS, stats["ipackets"], labels),
+            (RX_BYTES, stats["ibytes"], labels),
+            (RX_MISSED, stats["imissed"], labels),
+            (RX_NOMBUF, stats["rx_nombuf"], labels),
+            (RX_ERRORS, stats["ierrors"], labels),
+            (TX_PACKETS, stats["opackets"], labels),
+            (TX_BYTES, stats["obytes"], labels),
+            (TX_ERRORS, stats["oerrors"], labels),
+        ]
+    return out
diff --git a/usertools/telemetry-endpoints/cpu.py b/usertools/telemetry-endpoints/cpu.py
new file mode 100644
index 000000000000..d38d8d6e2558
--- /dev/null
+++ b/usertools/telemetry-endpoints/cpu.py
@@ -0,0 +1,29 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+CPU_TOTAL = "total_cycles"
+CPU_BUSY = "busy_cycles"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        CPU_TOTAL: ("Total number of CPU cycles.", "counter"),
+        CPU_BUSY: ("Number of busy CPU cycles.", "counter"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    out = []
+    for lcore_id in sock.cmd("/eal/lcore/list"):
+        lcore = sock.cmd("/eal/lcore/info", lcore_id)
+        cpu = ",".join(str(c) for c in lcore.get("cpuset", []))
+        total = lcore.get("total_cycles")
+        busy = lcore.get("busy_cycles", 0)
+        if not (cpu and total):
+            continue
+        labels = {"cpu": cpu, "numa": lcore.get("socket", 0)}
+        out += [
+            (CPU_TOTAL, total, labels),
+            (CPU_BUSY, busy, labels),
+        ]
+    return out
diff --git a/usertools/telemetry-endpoints/memory.py b/usertools/telemetry-endpoints/memory.py
new file mode 100644
index 000000000000..32cce1e59382
--- /dev/null
+++ b/usertools/telemetry-endpoints/memory.py
@@ -0,0 +1,37 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+MEM_TOTAL = "total_bytes"
+MEM_USED = "used_bytes"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        MEM_TOTAL: ("The total size of reserved memory in bytes.", "gauge"),
+        MEM_USED: ("The currently used memory in bytes.", "gauge"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    zones = {}
+    used = 0
+    for zone in sock.cmd("/eal/memzone_list") or []:
+        z = sock.cmd("/eal/memzone_info", zone)
+        start = int(z["Hugepage_base"], 16)
+        end = start + (z["Hugepage_size"] * z["Hugepage_used"])
+        used += z["Length"]
+        for s, e in list(zones.items()):
+            if s < start < e < end:
+                zones[s] = end
+                break
+            if start < s < end < e:
+                del zones[s]
+                zones[start] = e
+                break
+        else:
+            zones[start] = end
+
+    return [
+        (MEM_TOTAL, sum(end - start for (start, end) in zones.items()), {}),
+        (MEM_USED, max(0, used), {}),
+    ]
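For completeness, the carbon/graphite output is not shown in the example
above. Based on export_carbon() in the main script, the payload sent to the
carbon server should look roughly like the sketch below (the metric paths,
timestamp and values are illustrative only):

    # approximate shape of the data pickled by export_carbon()
    metrics = [
        ("dpdk.cpu.total_cycles;cpu=9;numa=0", (1695746089.0, 4353385274745740)),
        ("dpdk.cpu.busy_cycles;cpu=9;numa=0", (1695746089.0, 6215932860)),
    ]
    payload = pickle.dumps(metrics, protocol=2)
    # each message is prefixed with the payload length as a 4-byte unsigned int
    buf = struct.pack("!L", len(payload)) + payload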