@@ -1,6 +1,7 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2023 University of New Hampshire
# Copyright(c) 2023 PANTHEON.tech s.r.o.
+# Copyright(c) 2024 Arm Limited
"""Testpmd interactive shell.
@@ -15,12 +16,17 @@
testpmd_shell.close()
"""
+import re
import time
-from enum import auto
+from dataclasses import dataclass, field
+from enum import Flag, auto
from pathlib import PurePath
from typing import Callable, ClassVar
+from typing_extensions import Self
+
from framework.exception import InteractiveCommandExecutionError
+from framework.parser import ParserFn, TextParser
from framework.settings import SETTINGS
from framework.utils import StrEnum
@@ -80,6 +86,451 @@ class TestPmdForwardingModes(StrEnum):
recycle_mbufs = auto()
+class VLANOffloadFlag(Flag):
+ """Flag representing the VLAN offload settings of a NIC port."""
+
+ #:
+ STRIP = auto()
+ #:
+ FILTER = auto()
+ #:
+ EXTEND = auto()
+ #:
+ QINQ_STRIP = auto()
+
+ @classmethod
+ def from_str_dict(cls, d):
+ """Makes an instance from a dict containing the flag member names with an "on" value."""
+ flag = cls(0)
+ for name in cls.__members__:
+ if d.get(name) == "on":
+ flag |= cls[name]
+ return flag
+
+ @classmethod
+ def make_parser(cls) -> ParserFn:
+ """Makes a parser function."""
+ return TextParser.compose(
+ cls.from_str_dict,
+ TextParser.find(
+ r"VLAN offload:\s+"
+ r"strip (?P<STRIP>on|off), "
+ r"filter (?P<FILTER>on|off), "
+ r"extend (?P<EXTEND>on|off), "
+ r"qinq strip (?P<QINQ_STRIP>on|off)$",
+ re.MULTILINE,
+ named=True,
+ ),
+ )
+
+
+class RSSOffloadTypesFlag(Flag):
+ """Flag representing the RSS offload flow types supported by the NIC port."""
+
+ #:
+ ipv4 = auto()
+ #:
+ ipv4_frag = auto()
+ #:
+ ipv4_tcp = auto()
+ #:
+ ipv4_udp = auto()
+ #:
+ ipv4_sctp = auto()
+ #:
+ ipv4_other = auto()
+ #:
+ ipv6 = auto()
+ #:
+ ipv6_frag = auto()
+ #:
+ ipv6_tcp = auto()
+ #:
+ ipv6_udp = auto()
+ #:
+ ipv6_sctp = auto()
+ #:
+ ipv6_other = auto()
+ #:
+ l2_payload = auto()
+ #:
+ ipv6_ex = auto()
+ #:
+ ipv6_tcp_ex = auto()
+ #:
+ ipv6_udp_ex = auto()
+ #:
+ port = auto()
+ #:
+ vxlan = auto()
+ #:
+ geneve = auto()
+ #:
+ nvgre = auto()
+ #:
+ user_defined_22 = auto()
+ #:
+ gtpu = auto()
+ #:
+ eth = auto()
+ #:
+ s_vlan = auto()
+ #:
+ c_vlan = auto()
+ #:
+ esp = auto()
+ #:
+ ah = auto()
+ #:
+ l2tpv3 = auto()
+ #:
+ pfcp = auto()
+ #:
+ pppoe = auto()
+ #:
+ ecpri = auto()
+ #:
+ mpls = auto()
+ #:
+ ipv4_chksum = auto()
+ #:
+ l4_chksum = auto()
+ #:
+ l2tpv2 = auto()
+ #:
+ ipv6_flow_label = auto()
+ #:
+ user_defined_38 = auto()
+ #:
+ user_defined_39 = auto()
+ #:
+ user_defined_40 = auto()
+ #:
+ user_defined_41 = auto()
+ #:
+ user_defined_42 = auto()
+ #:
+ user_defined_43 = auto()
+ #:
+ user_defined_44 = auto()
+ #:
+ user_defined_45 = auto()
+ #:
+ user_defined_46 = auto()
+ #:
+ user_defined_47 = auto()
+ #:
+ user_defined_48 = auto()
+ #:
+ user_defined_49 = auto()
+ #:
+ user_defined_50 = auto()
+ #:
+ user_defined_51 = auto()
+ #:
+ l3_pre96 = auto()
+ #:
+ l3_pre64 = auto()
+ #:
+ l3_pre56 = auto()
+ #:
+ l3_pre48 = auto()
+ #:
+ l3_pre40 = auto()
+ #:
+ l3_pre32 = auto()
+ #:
+ l2_dst_only = auto()
+ #:
+ l2_src_only = auto()
+ #:
+ l4_dst_only = auto()
+ #:
+ l4_src_only = auto()
+ #:
+ l3_dst_only = auto()
+ #:
+ l3_src_only = auto()
+
+ #:
+ ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
+ #:
+ udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
+ #:
+ tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
+ #:
+ sctp = ipv4_sctp | ipv6_sctp
+ #:
+ tunnel = vxlan | geneve | nvgre
+ #:
+ vlan = s_vlan | c_vlan
+ #:
+ all = (
+ eth
+ | vlan
+ | ip
+ | tcp
+ | udp
+ | sctp
+ | l2_payload
+ | l2tpv3
+ | esp
+ | ah
+ | pfcp
+ | gtpu
+ | ecpri
+ | mpls
+ | l2tpv2
+ )
+
+ @classmethod
+ def from_list_string(cls, names: str) -> Self:
+ """Makes a flag from a whitespace-separated list of names."""
+ flag = cls(0)
+ for name in names.split():
+ flag |= cls.from_str(name)
+ return flag
+
+ @classmethod
+ def from_str(cls, name: str) -> Self:
+ """Returns the flag corresponding to the supplied name."""
+ member_name = name.strip().replace("-", "_")
+ return cls[member_name]
+
+ def __str__(self):
+ """String representation."""
+ return self.name.replace("_", "-")
+
+ @classmethod
+ def make_parser(cls) -> ParserFn:
+ """Makes a parser function."""
+ return TextParser.compose(
+ RSSOffloadTypesFlag.from_list_string,
+ TextParser.find(r"Supported RSS offload flow types:((?:\r?\n? \S+)+)", re.MULTILINE),
+ )
+
+
+class DeviceCapabilitiesFlag(Flag):
+ """Flag representing the device capabilities."""
+
+ RUNTIME_RX_QUEUE_SETUP = auto()
+ """Device supports Rx queue setup after device started."""
+ RUNTIME_TX_QUEUE_SETUP = auto()
+ """Device supports Tx queue setup after device started."""
+ RXQ_SHARE = auto()
+ """Device supports shared Rx queue among ports within Rx domain and switch domain."""
+ FLOW_RULE_KEEP = auto()
+ """Device supports keeping flow rules across restart."""
+ FLOW_SHARED_OBJECT_KEEP = auto()
+ """Device supports keeping shared flow objects across restart."""
+
+ @classmethod
+ def make_parser(cls) -> ParserFn:
+ """Makes a parser function."""
+ return TextParser.compose(
+ cls,
+ TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"),
+ )
+
+
+class DeviceErrorHandlingMode(StrEnum):
+ """Enum representing the device error handling mode."""
+
+ #:
+ none = auto()
+ #:
+ passive = auto()
+ #:
+ proactive = auto()
+ #:
+ unknown = auto()
+
+ @classmethod
+ def make_parser(cls) -> ParserFn:
+ """Makes a parser function."""
+ return TextParser.compose(cls, TextParser.find(r"Device error handling mode: (\w+)"))
+
+
+def make_device_private_info_parser() -> ParserFn:
+ """Device private information parser.
+
+ Ensure that we are not parsing invalid device private info output.
+ """
+
+ def _validate(info: str):
+ info = info.strip()
+ if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"):
+ return None
+ return info
+
+ return TextParser.compose(_validate, TextParser.find(r"Device private info:\s+([\s\S]+)"))
+
+
+@dataclass
+class TestPmdPort(TextParser):
+ """Dataclass representing the result of testpmd's ``show port info`` command."""
+
+ #:
+ id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))
+ #:
+ device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))
+ #:
+ driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))
+ #:
+ socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))
+ #:
+ is_link_up: bool = field(metadata=TextParser.find("Link status: up"))
+ #:
+ link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))
+ #:
+ is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))
+ #:
+ is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))
+ #:
+ is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))
+ #:
+ is_allmulticast_mode_enabled: bool = field(
+ metadata=TextParser.find("Allmulticast mode: enabled")
+ )
+ #: Maximum number of MAC addresses
+ max_mac_addresses_num: int = field(
+ metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")
+ )
+ #: Maximum configurable length of RX packet
+ max_hash_mac_addresses_num: int = field(
+ metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")
+ )
+ #: Minimum size of RX buffer
+ min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))
+ #: Maximum configurable length of RX packet
+ max_rx_packet_length: int = field(
+ metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")
+ )
+ #: Maximum configurable size of LRO aggregated packet
+ max_lro_packet_size: int = field(
+ metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")
+ )
+
+ #: Current number of RX queues
+ rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))
+ #: Max possible RX queues
+ max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))
+ #: Max possible number of RXDs per queue
+ max_queue_rxd_num: int = field(
+ metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")
+ )
+ #: Min possible number of RXDs per queue
+ min_queue_rxd_num: int = field(
+ metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")
+ )
+ #: RXDs number alignment
+ rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))
+
+ #: Current number of TX queues
+ tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))
+ #: Max possible TX queues
+ max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))
+ #: Max possible number of TXDs per queue
+ max_queue_txd_num: int = field(
+ metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")
+ )
+ #: Min possible number of TXDs per queue
+ min_queue_txd_num: int = field(
+ metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")
+ )
+ #: TXDs number alignment
+ txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))
+ #: Max segment number per packet
+ max_packet_segment_num: int = field(
+ metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")
+ )
+ #: Max segment number per MTU/TSO
+ max_mtu_segment_num: int = field(
+ metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")
+ )
+
+ #:
+ device_capabilities: DeviceCapabilitiesFlag = field(
+ metadata=DeviceCapabilitiesFlag.make_parser(),
+ )
+ #:
+ device_error_handling_mode: DeviceErrorHandlingMode = field(
+ metadata=DeviceErrorHandlingMode.make_parser()
+ )
+ #:
+ device_private_info: str | None = field(
+ default=None,
+ metadata=make_device_private_info_parser(),
+ )
+
+ #:
+ hash_key_size: int | None = field(
+ default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")
+ )
+ #:
+ redirection_table_size: int | None = field(
+ default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")
+ )
+ #:
+ supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
+ default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()
+ )
+
+ #:
+ mac_address: str | None = field(
+ default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")
+ )
+ #:
+ fw_version: str | None = field(
+ default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")
+ )
+ #:
+ dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))
+ #: Socket id of the memory allocation
+ mem_alloc_socket_id: int | None = field(
+ default=None,
+ metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),
+ )
+ #:
+ mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))
+
+ #:
+ vlan_offload: VLANOffloadFlag | None = field(
+ default=None,
+ metadata=VLANOffloadFlag.make_parser(),
+ )
+
+ #: Maximum size of RX buffer
+ max_rx_bufsize: int | None = field(
+ default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")
+ )
+ #: Maximum number of VFs
+ max_vfs_num: int | None = field(
+ default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")
+ )
+ #: Maximum number of VMDq pools
+ max_vmdq_pools_num: int | None = field(
+ default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")
+ )
+
+ #:
+ switch_name: str | None = field(
+ default=None, metadata=TextParser.find(r"Switch name: ([\r\n]+)")
+ )
+ #:
+ switch_domain_id: int | None = field(
+ default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")
+ )
+ #:
+ switch_port_id: int | None = field(
+ default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")
+ )
+ #:
+ switch_rx_domain: int | None = field(
+ default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")
+ )
+
+
class TestPmdShell(InteractiveShell):
"""Testpmd interactive shell.
@@ -225,6 +676,43 @@ def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
f"Test pmd failed to set fwd mode to {mode.value}"
)
+ def show_port_info_all(self) -> list[TestPmdPort]:
+ """Returns the information of all the ports."""
+ output = self.send_command("show port info all")
+
+ # Sample output of the "all" command looks like:
+ #
+ # <start>
+ #
+ # ********************* Infos for port 0 *********************
+ # Key: value
+ #
+ # ********************* Infos for port 1 *********************
+ # Key: value
+ # <end>
+ #
+ # Take advantage of the double new line in between ports as end delimiter.
+ # But we need to artificially add a new line at the end to pick up the last port.
+ # Because commands are executed on a pseudo-terminal created by paramiko on the remote
+ # target lines end with CRLF. Therefore we also need to take the carriage return in account.
+ iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)
+ return [TestPmdPort.parse(block.group(0)) for block in iter]
+
+ def show_port_info(self, port_id: int) -> TestPmdPort:
+ """Returns the given port information.
+
+ Args:
+ port_id: The port ID to gather information for.
+
+ Raises:
+ InteractiveCommandExecutionError: If `port_id` is invalid.
+ """
+ output = self.send_command(f"show port info {port_id}", skip_first_line=True)
+ if output.startswith("Invalid port"):
+ raise InteractiveCommandExecutionError("invalid port given")
+
+ return TestPmdPort.parse(output)
+
def close(self) -> None:
"""Overrides :meth:`~.interactive_shell.close`."""
self.send_command("quit", "")