From patchwork Thu Apr 20 09:31:08 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Patchwork-Submitter: =?utf-8?q?Juraj_Linke=C5=A1?= X-Patchwork-Id: 126320 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id B3FA542995; Thu, 20 Apr 2023 11:51:32 +0200 (CEST) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id C101642D16; Thu, 20 Apr 2023 11:51:12 +0200 (CEST) Received: from mail-ed1-f47.google.com (mail-ed1-f47.google.com [209.85.208.47]) by mails.dpdk.org (Postfix) with ESMTP id 2B37F42C24 for ; Thu, 20 Apr 2023 11:51:10 +0200 (CEST) Received: by mail-ed1-f47.google.com with SMTP id 4fb4d7f45d1cf-5050491cb04so676994a12.0 for ; Thu, 20 Apr 2023 02:51:10 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=pantheon-tech.20221208.gappssmtp.com; s=20221208; t=1681984270; x=1684576270; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:from:to:cc:subject:date :message-id:reply-to; bh=YUJOXbbpdckjOQaxjF8cg17k9E6EwTm+W94hVYVmA58=; b=zcFwpGDQAcfLjB+EmPvu85d6dIWsVCcH2r3C1SfIc6Ht2wXWSE1XzcPxmrTka4oNCp AN0rv4phKVy5eQ40e/odRlj/G+X+VBenXqWq25xkd+hkR4Su5Yg1jTKS2nhqIpqrSP7X ZZH8LvUfW7goh5dXmDSSMtaL1JcLWkHGBNe8+yfZGkuRPiTyJp8D6hFd9zDvc11EILV7 MySYP4YOO1BdQnKEqqdJ3RFwcbAqpFeyzwDdeIq8SXZrZbWvnlq4zAG6ALGvdFo2H4WD rvFPF/6ka32iDwgnmaLD7VIGln/IGzz9/YxB8tbwoakP/BM9odQFTSAnHrfnHAABUPX4 CuYg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20221208; t=1681984270; x=1684576270; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:x-gm-message-state:from:to:cc :subject:date:message-id:reply-to; bh=YUJOXbbpdckjOQaxjF8cg17k9E6EwTm+W94hVYVmA58=; b=MSYNtLydjISgKeB0oH/0cjvto1z8xaBlWWXTuHk2dfIt0LwtUnU1fm0V39gyllLnQX rNji32CgBAUy/TsFC62vgkdSMABFG2mxrQBQYHJaGi9QFrM7ai1Oxp+cTStAJBrFULir BHhvdAhzyhdlglQgsxqBLQ+rcmUzAvK64+IpT4d5aqmh9RSxS2S0Is5SWcZqlygZZNz8 fwEawlQbZaLUM8IeG2faljH9ZACwrwSdBaQHSE2cwLzvZNhIvawLJPe7Qcsb3IhDTOpU s9QIpEQBl6v2Im7gz/PSmfhqdEYRFiVl8NzA6Z86bPoTYPSLHr976okZhsOYoPMsqaXx LpZw== X-Gm-Message-State: AAQBX9eJIMcKNhK4A1lLrNqXnJ1vOwzJuPx4gs7fwHIXfB8PuFG1e66G YbtV2xw3ibt2DrssaDtI4mDV7JT7Z72hMNehRJb8dxbvDLabanr+LhsAhbiR6CC4ldo4El7DzGQ zq0GzC3y4O6I= X-Google-Smtp-Source: AKy350YZw26MdHFyGpfKf+/ms5cREHXYTyO9fWAOAawM6skeWO9GK+iZNl6Mtg68/QYEwUysl3Aimw== X-Received: by 2002:aa7:c3c7:0:b0:506:7671:4e72 with SMTP id l7-20020aa7c3c7000000b0050676714e72mr1079174edr.38.1681984269716; Thu, 20 Apr 2023 02:51:09 -0700 (PDT) Received: from jlinkes.pantheon.local (81.89.53.154.host.vnet.sk. [81.89.53.154]) by smtp.gmail.com with ESMTPSA id v2-20020aa7d802000000b004ad601533a3sm580801edq.55.2023.04.20.02.51.08 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Thu, 20 Apr 2023 02:51:09 -0700 (PDT) From: =?utf-8?q?Juraj_Linke=C5=A1?= To: thomas@monjalon.net, Honnappa.Nagarahalli@arm.com, lijuan.tu@intel.com, wathsala.vithanage@arm.com, jspewock@iol.unh.edu, probb@iol.unh.edu Cc: dev@dpdk.org, =?utf-8?q?Juraj_Linke=C5=A1?= Subject: [RFC PATCH v1 4/5] dts: scapy traffic generator implementation Date: Thu, 20 Apr 2023 11:31:08 +0200 Message-Id: <20230420093109.594704-5-juraj.linkes@pantheon.tech> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20230420093109.594704-1-juraj.linkes@pantheon.tech> References: <20230420093109.594704-1-juraj.linkes@pantheon.tech> MIME-Version: 1.0 X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Scapy is a traffic generator capable of sending and receiving traffic. Since it's a software traffic generator, it's not suitable for performance testing, but it is suitable for functional testing. Signed-off-by: Juraj Linkeš --- dts/framework/remote_session/linux_session.py | 55 +++ .../remote_session/remote/remote_session.py | 7 + dts/framework/testbed_model/scapy.py | 348 ++++++++++++++++++ 3 files changed, 410 insertions(+) create mode 100644 dts/framework/testbed_model/scapy.py diff --git a/dts/framework/remote_session/linux_session.py b/dts/framework/remote_session/linux_session.py index a1e3bc3a92..b99a27bba4 100644 --- a/dts/framework/remote_session/linux_session.py +++ b/dts/framework/remote_session/linux_session.py @@ -2,13 +2,29 @@ # Copyright(c) 2023 PANTHEON.tech s.r.o. # Copyright(c) 2023 University of New Hampshire +import json +from typing import TypedDict +from typing_extensions import NotRequired + + from framework.exception import RemoteCommandExecutionError from framework.testbed_model import LogicalCore +from framework.testbed_model.hw.port import PortIdentifier from framework.utils import expand_range from .posix_session import PosixSession +class LshwOutputConfigurationDict(TypedDict): + link: str + + +class LshwOutputDict(TypedDict): + businfo: str + logicalname: NotRequired[str] + configuration: LshwOutputConfigurationDict + + class LinuxSession(PosixSession): """ The implementation of non-Posix compliant parts of Linux remote sessions. @@ -105,3 +121,42 @@ def _configure_huge_pages( self.remote_session.send_command( f"echo {amount} | sudo tee {hugepage_config_path}" ) + + def get_lshw_info(self) -> list[LshwOutputDict]: + output = self.remote_session.send_expect("lshw -quiet -json -C network", "#") + assert not isinstance( + output, int + ), "send_expect returned an int when it should have been a string" + return json.loads(output) + + def get_logical_name_of_port(self, id: PortIdentifier) -> str | None: + self._logger.debug(f"Searching for logical name of {id.pci}") + assert ( + id.node == self.name + ), "Attempted to get the logical port name on the wrong node" + port_info_list: list[LshwOutputDict] = self.get_lshw_info() + for port_info in port_info_list: + if f"pci@{id.pci}" == port_info.get("businfo"): + if "logicalname" in port_info: + self._logger.debug( + f"Found logical name for port {id.pci}, {port_info.get('logicalname')}" + ) + return port_info.get("logicalname") + else: + self._logger.warning( + f"Attempted to get the logical name of {id.pci}, but none existed" + ) + return None + self._logger.warning(f"No port at pci address {id.pci} found.") + return None + + def check_link_is_up(self, id: PortIdentifier) -> bool | None: + self._logger.debug(f"Checking link status for {id.pci}") + port_info_list: list[LshwOutputDict] = self.get_lshw_info() + for port_info in port_info_list: + if f"pci@{id.pci}" == port_info.get("businfo"): + status = port_info["configuration"]["link"] + self._logger.debug(f"Found link status for port {id.pci}, {status}") + return status == "up" + self._logger.warning(f"No port at pci address {id.pci} found.") + return None diff --git a/dts/framework/remote_session/remote/remote_session.py b/dts/framework/remote_session/remote/remote_session.py index 91dee3cb4f..5b36e2d7d2 100644 --- a/dts/framework/remote_session/remote/remote_session.py +++ b/dts/framework/remote_session/remote/remote_session.py @@ -84,6 +84,13 @@ def _connect(self) -> None: Create connection to assigned node. """ + @abstractmethod + def send_expect( + self, command: str, prompt: str, timeout: float = 15, + verify: bool = False + ) -> str | int: + """""" + def send_command( self, command: str, diff --git a/dts/framework/testbed_model/scapy.py b/dts/framework/testbed_model/scapy.py new file mode 100644 index 0000000000..1e5caab897 --- /dev/null +++ b/dts/framework/testbed_model/scapy.py @@ -0,0 +1,348 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2022 University of New Hampshire +# + +import inspect +import json +import marshal +import types +import xmlrpc.client +from typing import TypedDict +from xmlrpc.server import SimpleXMLRPCServer + +import scapy.all +from scapy.packet import Packet +from typing_extensions import NotRequired + +from framework.config import OS +from framework.logger import getLogger +from .tg_node import TGNode +from .hw.port import Port, PortIdentifier +from .capturing_traffic_generator import ( + CapturingTrafficGenerator, + _get_default_capture_name, +) +from framework.settings import SETTINGS +from framework.remote_session import OSSession + +""" +========= BEGIN RPC FUNCTIONS ========= + +All of the functions in this section are intended to be exported to a python +shell which runs a scapy RPC server. These functions are made available via that +RPC server to the packet generator. To add a new function to the RPC server, +first write the function in this section. Then, if you need any imports, make sure to +add them to SCAPY_RPC_SERVER_IMPORTS as well. After that, add the function to the list +in EXPORTED_FUNCTIONS. Note that kwargs (keyword arguments) do not work via xmlrpc, +so you may need to construct wrapper functions around many scapy types. +""" + +""" +Add the line needed to import something in a normal python environment +as an entry to this array. It will be imported before any functions are +sent to the server. +""" +SCAPY_RPC_SERVER_IMPORTS = [ + "from scapy.all import *", + "import xmlrpc", + "import sys", + "from xmlrpc.server import SimpleXMLRPCServer", + "import marshal", + "import pickle", + "import types", +] + + +def scapy_sr1_different_interfaces( + packets: list[Packet], send_iface: str, recv_iface: str, timeout_s: int +) -> bytes: + packets = [scapy.all.Packet(packet.data) for packet in packets] + sniffer = scapy.all.AsyncSniffer( + iface=recv_iface, + store=True, + timeout=timeout_s, + started_callback=lambda _: scapy.all.sendp(packets, iface=send_iface), + stop_filter=lambda _: True, + ) + sniffer.start() + packets = sniffer.stop(join=True) + assert len(packets) != 0, "Not enough packets were sniffed" + assert len(packets) == 1, "More packets than expected were sniffed" + return packets[0].build() + + +def scapy_send_packets_and_capture( + packets: list[Packet], send_iface: str, recv_iface: str, duration_s: int +) -> list[bytes]: + packets = [scapy.all.Packet(packet.data) for packet in packets] + sniffer = scapy.all.AsyncSniffer( + iface=recv_iface, + store=True, + timeout=duration_s, + started_callback=lambda _: scapy.all.sendp(packets, iface=send_iface), + ) + sniffer.start() + return [packet.build() for packet in sniffer.stop(join=True)] + + +def scapy_send_packets(packets: list[xmlrpc.client.Binary], send_iface: str) -> None: + packets = [scapy.all.Packet(packet.data) for packet in packets] + scapy.all.sendp(packets, iface=send_iface, realtime=True, verbose=True) + + +""" +Functions to be exposed by the scapy RPC server. +""" +RPC_FUNCTIONS = [ + scapy_send_packets, + scapy_send_packets_and_capture, + scapy_sr1_different_interfaces, +] + + +class QuittableXMLRPCServer(SimpleXMLRPCServer): + def __init__(self, *args, **kwargs): + kwargs["allow_none"] = True + super().__init__(*args, **kwargs) + self.register_introspection_functions() + self.register_function(self.quit) + self.register_function(self.add_rpc_function) + + def quit(self) -> None: + self._BaseServer__shutdown_request = True + return None + + def add_rpc_function(self, name: str, function_bytes: xmlrpc.client.Binary): + function_code = marshal.loads(function_bytes.data) + function = types.FunctionType(function_code, globals(), name) + self.register_function(function) + + def serve_forever(self, poll_interval: float = 0.5) -> None: + print("XMLRPC OK") + super().serve_forever(poll_interval) + + +""" +========= END RPC FUNCTIONS ========= +""" + + +class NetworkInfoDict(TypedDict): + businfo: str + logicalname: NotRequired[str] + + +class ScapyTrafficGenerator(CapturingTrafficGenerator): + """ + Provides access to scapy functions via an RPC interface + """ + + tg_node: TGNode + ports: list[Port] + session: OSSession + scapy: xmlrpc.client.ServerProxy + iface_names: dict[PortIdentifier, str] + + def __init__(self, tg_node: TGNode, ports: list[Port]): + self.tg_node = tg_node + + assert tg_node.config.os == OS.linux, ( + "Linux is the only supported OS for scapy traffic generation" + ) + + self.session = tg_node.create_session("scapy") + self.logger = getLogger("scapy-pktgen-messages", node=tg_node.name) + self.ports = ports + + # No fancy colors + + prompt_str = "" + self.session.remote_session.send_expect(f'export PS1="{prompt_str}"', prompt_str) + + network_info_str: str = self.session.remote_session.send_expect( + "lshw -quiet -json -C network", prompt_str, timeout=10 + ) + + network_info_list: list[NetworkInfoDict] = json.loads(network_info_str) + network_info_lookup: dict[str, str] = { + network_info["businfo"]: network_info.get("logicalname") + for network_info in network_info_list + } + + self.iface_names = dict() + for port in self.ports: + businfo_str = f"pci@{port.pci}" + assert businfo_str in network_info_lookup, ( + f"Expected '{businfo_str}' in lshw output for {self.tg_node.name}, but " + f"it was not present." + ) + + self.iface_names[port.identifier] = network_info_lookup[businfo_str] + + assert ( + self.iface_names[port.identifier] is not None + ), f"No interface was present for {port.pci} on {self.tg_node.name}" + + self._run_command("python3") + + self._add_helper_functions_to_scapy() + self.session.remote_session.send_expect( + 'server = QuittableXMLRPCServer(("0.0.0.0", 8000)); server.serve_forever()', + "XMLRPC OK", + timeout=5, + ) + + server_url: str = f"http://{self.tg_node.config.hostname}:8000" + + self.scapy = xmlrpc.client.ServerProxy( + server_url, allow_none=True, verbose=SETTINGS.verbose + ) + + for function in RPC_FUNCTIONS: + # A slightly hacky way to move a function to the remote server. + # It is constructed from the name and code on the other side. + # Pickle cannot handle functions, nor can any of the other serialization + # frameworks aside from the libraries used to generate pyc files, which + # are even more messy to work with. + function_bytes = marshal.dumps(function.__code__) + self.scapy.add_rpc_function(function.__name__, function_bytes) + + def _add_helper_functions_to_scapy(self): + for import_statement in SCAPY_RPC_SERVER_IMPORTS: + self._run_command(import_statement + "\r\n") + + for helper_function in {QuittableXMLRPCServer}: + # load the source of the function + src = inspect.getsource(helper_function) + # Lines with only whitespace break the repl if in the middle of a function + # or class, so strip all lines containing only whitespace + src = "\n".join( + [line for line in src.splitlines() if not line.isspace() and line != ""] + ) + + spacing = "\n" * 4 + + # execute it in the python terminal + self._run_command(spacing + src + spacing) + + def _run_command(self, command: str) -> str: + return self.session.remote_session.send_expect(command, ">>>") + + def _get_port_interface_or_error(self, port: PortIdentifier) -> str: + match self.iface_names.get(port): + case None: + assert ( + False + ), f"{port} is not a valid port on this packet generator on {self.tg_node.name}." + case iface: + return iface + + def send_packet(self, port: PortIdentifier, packet: Packet) -> None: + iface = self._get_port_interface_or_error(port) + self.logger.info("Sending packet") + self.logger.debug("Packet contents: \n" + packet._do_summary()[1]) + self.scapy.scapy_send_packets([packet.build()], iface) + + def send_packets(self, port: PortIdentifier, packets: list[Packet]) -> None: + iface = self._get_port_interface_or_error(port) + self.logger.info("Sending packets") + packet_summaries = json.dumps( + list(map(lambda pkt: pkt._do_summary()[1], packets)), indent=4 + ) + packets = [packet.build() for packet in packets] + self.logger.debug("Packet contents: \n" + packet_summaries) + self.scapy.scapy_send_packets(packets, iface) + + def send_packet_and_capture( + self, + send_port_id: PortIdentifier, + packet: Packet, + receive_port_id: PortIdentifier, + duration_s: int, + capture_name: str = _get_default_capture_name(), + ) -> list[Packet]: + packets = self.scapy.scapy_send_packets_and_capture( + [packet.build()], send_port_id, receive_port_id, duration_s + ) + self._write_capture_from_packets(capture_name, packets) + return packets + + def send_packets_and_capture( + self, + send_port_id: PortIdentifier, + packets: Packet, + receive_port_id: PortIdentifier, + duration_s: int, + capture_name: str = _get_default_capture_name(), + ) -> list[Packet]: + packets: list[bytes] = [packet.build() for packet in packets] + packets: list[bytes] = self.scapy.scapy_send_packets_and_capture( + packets, send_port_id, receive_port_id, duration_s + ) + packets: list[Packet] = [scapy.all.Packet(packet) for packet in packets] + self._write_capture_from_packets(capture_name, packets) + return packets + + def send_packet_and_expect_packet( + self, + send_port_id: PortIdentifier, + packet: Packet, + receive_port_id: PortIdentifier, + expected_packet: Packet, + timeout: int = SETTINGS.timeout, + capture_name: str = _get_default_capture_name(), + ) -> None: + self.send_packets_and_expect_packets( + send_port_id, + [packet], + receive_port_id, + [expected_packet], + timeout, + capture_name, + ) + + def send_packets_and_expect_packets( + self, + send_port_id: PortIdentifier, + packets: list[Packet], + receive_port_id: PortIdentifier, + expected_packets: list[Packet], + timeout: int = SETTINGS.timeout, + capture_name: str = _get_default_capture_name(), + ) -> None: + send_iface = self._get_port_interface_or_error(send_port_id) + recv_iface = self._get_port_interface_or_error(receive_port_id) + + packets = [packet.build() for packet in packets] + + received_packets = self.scapy.scapy_sr1_different_interfaces( + packets, send_iface, recv_iface, timeout + ) + + received_packets = [scapy.all.Packet(packet) for packet in received_packets] + + self._write_capture_from_packets(capture_name, received_packets) + + assert len(received_packets) == len( + expected_packets + ), "Incorrect number of packets received" + for i, expected_packet in enumerate(expected_packets): + assert ( + received_packets[i] == expected_packet + ), f"Received packet {i} differed from expected packet" + + def close(self): + try: + self.scapy.quit() + except ConnectionRefusedError: + # Because the python instance closes, we get no RPC response. + # Thus, this error is expected + pass + try: + self.session.close(force=True) + except TimeoutError: + # Pexpect does not like being in a python prompt when it closes + pass + + def assert_port_is_connected(self, id: PortIdentifier) -> None: + self.tg_node.main_session.check_link_is_up(id)