[v5,4/5] dts: add `show port info` command to TestPmdShell

Message ID 20240606213420.254260-5-luca.vizzarro@arm.com (mailing list archive)
State Accepted
Delegated to: Thomas Monjalon
Headers
Series dts: testpmd show port info/stats |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Luca Vizzarro June 6, 2024, 9:34 p.m. UTC
  Add a new TestPmdPort data structure to represent the output
returned by `show port info`, which is implemented as part of
TestPmdShell.

The TestPmdPort data structure and its derived classes are modelled
based on the relevant testpmd source code.

This implementation makes extensive use of regular expressions, which
all parse individually. The rationale behind this is to lower the risk
of the testpmd output changing as part of development. Therefore
minimising breakage.

Bugzilla ID: 1407

Signed-off-by: Luca Vizzarro <luca.vizzarro@arm.com>
Reviewed-by: Paul Szczepanek <paul.szczepanek@arm.com>
---
 dts/framework/remote_session/testpmd_shell.py | 537 +++++++++++++++++-
 1 file changed, 536 insertions(+), 1 deletion(-)
  

Comments

Juraj Linkeš June 7, 2024, 11:16 a.m. UTC | #1
On 6. 6. 2024 23:34, Luca Vizzarro wrote:
> Add a new TestPmdPort data structure to represent the output
> returned by `show port info`, which is implemented as part of
> TestPmdShell.
> 
> The TestPmdPort data structure and its derived classes are modelled
> based on the relevant testpmd source code.
> 
> This implementation makes extensive use of regular expressions, which
> all parse individually. The rationale behind this is to lower the risk
> of the testpmd output changing as part of development. Therefore
> minimising breakage.
> 
> Bugzilla ID: 1407
> 
> Signed-off-by: Luca Vizzarro <luca.vizzarro@arm.com>
> Reviewed-by: Paul Szczepanek <paul.szczepanek@arm.com>

Reviewed-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
  
Jeremy Spewock June 7, 2024, 1:11 p.m. UTC | #2
On Thu, Jun 6, 2024 at 5:34 PM Luca Vizzarro <luca.vizzarro@arm.com> wrote:
>
> Add a new TestPmdPort data structure to represent the output
> returned by `show port info`, which is implemented as part of
> TestPmdShell.
>
> The TestPmdPort data structure and its derived classes are modelled
> based on the relevant testpmd source code.
>
> This implementation makes extensive use of regular expressions, which
> all parse individually. The rationale behind this is to lower the risk
> of the testpmd output changing as part of development. Therefore
> minimising breakage.
>
> Bugzilla ID: 1407
>
> Signed-off-by: Luca Vizzarro <luca.vizzarro@arm.com>
> Reviewed-by: Paul Szczepanek <paul.szczepanek@arm.com>

Reviewed-by: Jeremy Spewock <jspewock@iol.unh.edu>
  
Nicholas Pratte June 14, 2024, 5:36 p.m. UTC | #3
Tested-by: Nicholas Pratte <npratte@iol.unh.edu>
Reviewed-by: Nicholas Pratte <npratte@iol.unh.edu>

On Thu, Jun 6, 2024 at 5:34 PM Luca Vizzarro <luca.vizzarro@arm.com> wrote:
>
> Add a new TestPmdPort data structure to represent the output
> returned by `show port info`, which is implemented as part of
> TestPmdShell.
>
> The TestPmdPort data structure and its derived classes are modelled
> based on the relevant testpmd source code.
>
> This implementation makes extensive use of regular expressions, which
> all parse individually. The rationale behind this is to lower the risk
> of the testpmd output changing as part of development. Therefore
> minimising breakage.
>
> Bugzilla ID: 1407
>
> Signed-off-by: Luca Vizzarro <luca.vizzarro@arm.com>
> Reviewed-by: Paul Szczepanek <paul.szczepanek@arm.com>
> ---
>  dts/framework/remote_session/testpmd_shell.py | 537 +++++++++++++++++-
>  1 file changed, 536 insertions(+), 1 deletion(-)
>
> diff --git a/dts/framework/remote_session/testpmd_shell.py b/dts/framework/remote_session/testpmd_shell.py
> index cb2ab6bd00..ab9a1f86a9 100644
> --- a/dts/framework/remote_session/testpmd_shell.py
> +++ b/dts/framework/remote_session/testpmd_shell.py
> @@ -1,6 +1,7 @@
>  # SPDX-License-Identifier: BSD-3-Clause
>  # Copyright(c) 2023 University of New Hampshire
>  # Copyright(c) 2023 PANTHEON.tech s.r.o.
> +# Copyright(c) 2024 Arm Limited
>
>  """Testpmd interactive shell.
>
> @@ -15,12 +16,17 @@
>      testpmd_shell.close()
>  """
>
> +import re
>  import time
> -from enum import auto
> +from dataclasses import dataclass, field
> +from enum import Flag, auto
>  from pathlib import PurePath
>  from typing import Callable, ClassVar
>
> +from typing_extensions import Self
> +
>  from framework.exception import InteractiveCommandExecutionError
> +from framework.parser import ParserFn, TextParser
>  from framework.settings import SETTINGS
>  from framework.utils import StrEnum
>
> @@ -80,6 +86,491 @@ class TestPmdForwardingModes(StrEnum):
>      recycle_mbufs = auto()
>
>
> +class VLANOffloadFlag(Flag):
> +    """Flag representing the VLAN offload settings of a NIC port."""
> +
> +    #:
> +    STRIP = auto()
> +    #:
> +    FILTER = auto()
> +    #:
> +    EXTEND = auto()
> +    #:
> +    QINQ_STRIP = auto()
> +
> +    @classmethod
> +    def from_str_dict(cls, d):
> +        """Makes an instance from a dict containing the flag member names with an "on" value.
> +
> +        Args:
> +            d: A dictionary containing the flag members as keys and any string value.
> +
> +        Returns:
> +            A new instance of the flag.
> +        """
> +        flag = cls(0)
> +        for name in cls.__members__:
> +            if d.get(name) == "on":
> +                flag |= cls[name]
> +        return flag
> +
> +    @classmethod
> +    def make_parser(cls) -> ParserFn:
> +        """Makes a parser function.
> +
> +        Returns:
> +            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
> +                parser function that makes an instance of this flag from text.
> +        """
> +        return TextParser.wrap(
> +            TextParser.find(
> +                r"VLAN offload:\s+"
> +                r"strip (?P<STRIP>on|off), "
> +                r"filter (?P<FILTER>on|off), "
> +                r"extend (?P<EXTEND>on|off), "
> +                r"qinq strip (?P<QINQ_STRIP>on|off)$",
> +                re.MULTILINE,
> +                named=True,
> +            ),
> +            cls.from_str_dict,
> +        )
> +
> +
> +class RSSOffloadTypesFlag(Flag):
> +    """Flag representing the RSS offload flow types supported by the NIC port."""
> +
> +    #:
> +    ipv4 = auto()
> +    #:
> +    ipv4_frag = auto()
> +    #:
> +    ipv4_tcp = auto()
> +    #:
> +    ipv4_udp = auto()
> +    #:
> +    ipv4_sctp = auto()
> +    #:
> +    ipv4_other = auto()
> +    #:
> +    ipv6 = auto()
> +    #:
> +    ipv6_frag = auto()
> +    #:
> +    ipv6_tcp = auto()
> +    #:
> +    ipv6_udp = auto()
> +    #:
> +    ipv6_sctp = auto()
> +    #:
> +    ipv6_other = auto()
> +    #:
> +    l2_payload = auto()
> +    #:
> +    ipv6_ex = auto()
> +    #:
> +    ipv6_tcp_ex = auto()
> +    #:
> +    ipv6_udp_ex = auto()
> +    #:
> +    port = auto()
> +    #:
> +    vxlan = auto()
> +    #:
> +    geneve = auto()
> +    #:
> +    nvgre = auto()
> +    #:
> +    user_defined_22 = auto()
> +    #:
> +    gtpu = auto()
> +    #:
> +    eth = auto()
> +    #:
> +    s_vlan = auto()
> +    #:
> +    c_vlan = auto()
> +    #:
> +    esp = auto()
> +    #:
> +    ah = auto()
> +    #:
> +    l2tpv3 = auto()
> +    #:
> +    pfcp = auto()
> +    #:
> +    pppoe = auto()
> +    #:
> +    ecpri = auto()
> +    #:
> +    mpls = auto()
> +    #:
> +    ipv4_chksum = auto()
> +    #:
> +    l4_chksum = auto()
> +    #:
> +    l2tpv2 = auto()
> +    #:
> +    ipv6_flow_label = auto()
> +    #:
> +    user_defined_38 = auto()
> +    #:
> +    user_defined_39 = auto()
> +    #:
> +    user_defined_40 = auto()
> +    #:
> +    user_defined_41 = auto()
> +    #:
> +    user_defined_42 = auto()
> +    #:
> +    user_defined_43 = auto()
> +    #:
> +    user_defined_44 = auto()
> +    #:
> +    user_defined_45 = auto()
> +    #:
> +    user_defined_46 = auto()
> +    #:
> +    user_defined_47 = auto()
> +    #:
> +    user_defined_48 = auto()
> +    #:
> +    user_defined_49 = auto()
> +    #:
> +    user_defined_50 = auto()
> +    #:
> +    user_defined_51 = auto()
> +    #:
> +    l3_pre96 = auto()
> +    #:
> +    l3_pre64 = auto()
> +    #:
> +    l3_pre56 = auto()
> +    #:
> +    l3_pre48 = auto()
> +    #:
> +    l3_pre40 = auto()
> +    #:
> +    l3_pre32 = auto()
> +    #:
> +    l2_dst_only = auto()
> +    #:
> +    l2_src_only = auto()
> +    #:
> +    l4_dst_only = auto()
> +    #:
> +    l4_src_only = auto()
> +    #:
> +    l3_dst_only = auto()
> +    #:
> +    l3_src_only = auto()
> +
> +    #:
> +    ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
> +    #:
> +    udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
> +    #:
> +    tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
> +    #:
> +    sctp = ipv4_sctp | ipv6_sctp
> +    #:
> +    tunnel = vxlan | geneve | nvgre
> +    #:
> +    vlan = s_vlan | c_vlan
> +    #:
> +    all = (
> +        eth
> +        | vlan
> +        | ip
> +        | tcp
> +        | udp
> +        | sctp
> +        | l2_payload
> +        | l2tpv3
> +        | esp
> +        | ah
> +        | pfcp
> +        | gtpu
> +        | ecpri
> +        | mpls
> +        | l2tpv2
> +    )
> +
> +    @classmethod
> +    def from_list_string(cls, names: str) -> Self:
> +        """Makes a flag from a whitespace-separated list of names.
> +
> +        Args:
> +            names: a whitespace-separated list containing the members of this flag.
> +
> +        Returns:
> +            An instance of this flag.
> +        """
> +        flag = cls(0)
> +        for name in names.split():
> +            flag |= cls.from_str(name)
> +        return flag
> +
> +    @classmethod
> +    def from_str(cls, name: str) -> Self:
> +        """Makes a flag matching the supplied name.
> +
> +        Args:
> +            name: a valid member of this flag in text
> +        Returns:
> +            An instance of this flag.
> +        """
> +        member_name = name.strip().replace("-", "_")
> +        return cls[member_name]
> +
> +    @classmethod
> +    def make_parser(cls) -> ParserFn:
> +        """Makes a parser function.
> +
> +        Returns:
> +            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
> +                parser function that makes an instance of this flag from text.
> +        """
> +        return TextParser.wrap(
> +            TextParser.find(r"Supported RSS offload flow types:((?:\r?\n?  \S+)+)", re.MULTILINE),
> +            RSSOffloadTypesFlag.from_list_string,
> +        )
> +
> +
> +class DeviceCapabilitiesFlag(Flag):
> +    """Flag representing the device capabilities."""
> +
> +    #: Device supports Rx queue setup after device started.
> +    RUNTIME_RX_QUEUE_SETUP = auto()
> +    #: Device supports Tx queue setup after device started.
> +    RUNTIME_TX_QUEUE_SETUP = auto()
> +    #: Device supports shared Rx queue among ports within Rx domain and switch domain.
> +    RXQ_SHARE = auto()
> +    #: Device supports keeping flow rules across restart.
> +    FLOW_RULE_KEEP = auto()
> +    #: Device supports keeping shared flow objects across restart.
> +    FLOW_SHARED_OBJECT_KEEP = auto()
> +
> +    @classmethod
> +    def make_parser(cls) -> ParserFn:
> +        """Makes a parser function.
> +
> +        Returns:
> +            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
> +                parser function that makes an instance of this flag from text.
> +        """
> +        return TextParser.wrap(
> +            TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"),
> +            cls,
> +        )
> +
> +
> +class DeviceErrorHandlingMode(StrEnum):
> +    """Enum representing the device error handling mode."""
> +
> +    #:
> +    none = auto()
> +    #:
> +    passive = auto()
> +    #:
> +    proactive = auto()
> +    #:
> +    unknown = auto()
> +
> +    @classmethod
> +    def make_parser(cls) -> ParserFn:
> +        """Makes a parser function.
> +
> +        Returns:
> +            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
> +                parser function that makes an instance of this enum from text.
> +        """
> +        return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls)
> +
> +
> +def make_device_private_info_parser() -> ParserFn:
> +    """Device private information parser.
> +
> +    Ensures that we are not parsing invalid device private info output.
> +
> +    Returns:
> +        ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser
> +            function that parses the device private info from the TestPmd port info output.
> +    """
> +
> +    def _validate(info: str):
> +        info = info.strip()
> +        if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"):
> +            return None
> +        return info
> +
> +    return TextParser.wrap(TextParser.find(r"Device private info:\s+([\s\S]+)"), _validate)
> +
> +
> +@dataclass
> +class TestPmdPort(TextParser):
> +    """Dataclass representing the result of testpmd's ``show port info`` command."""
> +
> +    #:
> +    id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))
> +    #:
> +    device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))
> +    #:
> +    driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))
> +    #:
> +    socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))
> +    #:
> +    is_link_up: bool = field(metadata=TextParser.find("Link status: up"))
> +    #:
> +    link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))
> +    #:
> +    is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))
> +    #:
> +    is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))
> +    #:
> +    is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))
> +    #:
> +    is_allmulticast_mode_enabled: bool = field(
> +        metadata=TextParser.find("Allmulticast mode: enabled")
> +    )
> +    #: Maximum number of MAC addresses
> +    max_mac_addresses_num: int = field(
> +        metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")
> +    )
> +    #: Maximum configurable length of RX packet
> +    max_hash_mac_addresses_num: int = field(
> +        metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")
> +    )
> +    #: Minimum size of RX buffer
> +    min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))
> +    #: Maximum configurable length of RX packet
> +    max_rx_packet_length: int = field(
> +        metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")
> +    )
> +    #: Maximum configurable size of LRO aggregated packet
> +    max_lro_packet_size: int = field(
> +        metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")
> +    )
> +
> +    #: Current number of RX queues
> +    rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))
> +    #: Max possible RX queues
> +    max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))
> +    #: Max possible number of RXDs per queue
> +    max_queue_rxd_num: int = field(
> +        metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")
> +    )
> +    #: Min possible number of RXDs per queue
> +    min_queue_rxd_num: int = field(
> +        metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")
> +    )
> +    #: RXDs number alignment
> +    rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))
> +
> +    #: Current number of TX queues
> +    tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))
> +    #: Max possible TX queues
> +    max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))
> +    #: Max possible number of TXDs per queue
> +    max_queue_txd_num: int = field(
> +        metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")
> +    )
> +    #: Min possible number of TXDs per queue
> +    min_queue_txd_num: int = field(
> +        metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")
> +    )
> +    #: TXDs number alignment
> +    txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))
> +    #: Max segment number per packet
> +    max_packet_segment_num: int = field(
> +        metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")
> +    )
> +    #: Max segment number per MTU/TSO
> +    max_mtu_segment_num: int = field(
> +        metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")
> +    )
> +
> +    #:
> +    device_capabilities: DeviceCapabilitiesFlag = field(
> +        metadata=DeviceCapabilitiesFlag.make_parser(),
> +    )
> +    #:
> +    device_error_handling_mode: DeviceErrorHandlingMode = field(
> +        metadata=DeviceErrorHandlingMode.make_parser()
> +    )
> +    #:
> +    device_private_info: str | None = field(
> +        default=None,
> +        metadata=make_device_private_info_parser(),
> +    )
> +
> +    #:
> +    hash_key_size: int | None = field(
> +        default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")
> +    )
> +    #:
> +    redirection_table_size: int | None = field(
> +        default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")
> +    )
> +    #:
> +    supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
> +        default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()
> +    )
> +
> +    #:
> +    mac_address: str | None = field(
> +        default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")
> +    )
> +    #:
> +    fw_version: str | None = field(
> +        default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")
> +    )
> +    #:
> +    dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))
> +    #: Socket id of the memory allocation
> +    mem_alloc_socket_id: int | None = field(
> +        default=None,
> +        metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),
> +    )
> +    #:
> +    mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))
> +
> +    #:
> +    vlan_offload: VLANOffloadFlag | None = field(
> +        default=None,
> +        metadata=VLANOffloadFlag.make_parser(),
> +    )
> +
> +    #: Maximum size of RX buffer
> +    max_rx_bufsize: int | None = field(
> +        default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")
> +    )
> +    #: Maximum number of VFs
> +    max_vfs_num: int | None = field(
> +        default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")
> +    )
> +    #: Maximum number of VMDq pools
> +    max_vmdq_pools_num: int | None = field(
> +        default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")
> +    )
> +
> +    #:
> +    switch_name: str | None = field(
> +        default=None, metadata=TextParser.find(r"Switch name: ([\r\n]+)")
> +    )
> +    #:
> +    switch_domain_id: int | None = field(
> +        default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")
> +    )
> +    #:
> +    switch_port_id: int | None = field(
> +        default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")
> +    )
> +    #:
> +    switch_rx_domain: int | None = field(
> +        default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")
> +    )
> +
> +
>  class TestPmdShell(InteractiveShell):
>      """Testpmd interactive shell.
>
> @@ -225,6 +716,50 @@ def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
>                  f"Test pmd failed to set fwd mode to {mode.value}"
>              )
>
> +    def show_port_info_all(self) -> list[TestPmdPort]:
> +        """Returns the information of all the ports.
> +
> +        Returns:
> +            list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`.
> +        """
> +        output = self.send_command("show port info all")
> +
> +        # Sample output of the "all" command looks like:
> +        #
> +        # <start>
> +        #
> +        #   ********************* Infos for port 0 *********************
> +        #   Key: value
> +        #
> +        #   ********************* Infos for port 1 *********************
> +        #   Key: value
> +        # <end>
> +        #
> +        # Takes advantage of the double new line in between ports as end delimiter. But we need to
> +        # artificially add a new line at the end to pick up the last port. Because commands are
> +        # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF.
> +        # Therefore we also need to take the carriage return into account.
> +        iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)
> +        return [TestPmdPort.parse(block.group(0)) for block in iter]
> +
> +    def show_port_info(self, port_id: int) -> TestPmdPort:
> +        """Returns the given port information.
> +
> +        Args:
> +            port_id: The port ID to gather information for.
> +
> +        Raises:
> +            InteractiveCommandExecutionError: If `port_id` is invalid.
> +
> +        Returns:
> +            TestPmdPort: An instance of `TestPmdPort` containing the given port's information.
> +        """
> +        output = self.send_command(f"show port info {port_id}", skip_first_line=True)
> +        if output.startswith("Invalid port"):
> +            raise InteractiveCommandExecutionError("invalid port given")
> +
> +        return TestPmdPort.parse(output)
> +
>      def close(self) -> None:
>          """Overrides :meth:`~.interactive_shell.close`."""
>          self.send_command("quit", "")
> --
> 2.34.1
>
  

Patch

diff --git a/dts/framework/remote_session/testpmd_shell.py b/dts/framework/remote_session/testpmd_shell.py
index cb2ab6bd00..ab9a1f86a9 100644
--- a/dts/framework/remote_session/testpmd_shell.py
+++ b/dts/framework/remote_session/testpmd_shell.py
@@ -1,6 +1,7 @@ 
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright(c) 2023 University of New Hampshire
 # Copyright(c) 2023 PANTHEON.tech s.r.o.
+# Copyright(c) 2024 Arm Limited
 
 """Testpmd interactive shell.
 
@@ -15,12 +16,17 @@ 
     testpmd_shell.close()
 """
 
+import re
 import time
-from enum import auto
+from dataclasses import dataclass, field
+from enum import Flag, auto
 from pathlib import PurePath
 from typing import Callable, ClassVar
 
+from typing_extensions import Self
+
 from framework.exception import InteractiveCommandExecutionError
+from framework.parser import ParserFn, TextParser
 from framework.settings import SETTINGS
 from framework.utils import StrEnum
 
@@ -80,6 +86,491 @@  class TestPmdForwardingModes(StrEnum):
     recycle_mbufs = auto()
 
 
+class VLANOffloadFlag(Flag):
+    """Flag representing the VLAN offload settings of a NIC port."""
+
+    #:
+    STRIP = auto()
+    #:
+    FILTER = auto()
+    #:
+    EXTEND = auto()
+    #:
+    QINQ_STRIP = auto()
+
+    @classmethod
+    def from_str_dict(cls, d):
+        """Makes an instance from a dict containing the flag member names with an "on" value.
+
+        Args:
+            d: A dictionary containing the flag members as keys and any string value.
+
+        Returns:
+            A new instance of the flag.
+        """
+        flag = cls(0)
+        for name in cls.__members__:
+            if d.get(name) == "on":
+                flag |= cls[name]
+        return flag
+
+    @classmethod
+    def make_parser(cls) -> ParserFn:
+        """Makes a parser function.
+
+        Returns:
+            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
+                parser function that makes an instance of this flag from text.
+        """
+        return TextParser.wrap(
+            TextParser.find(
+                r"VLAN offload:\s+"
+                r"strip (?P<STRIP>on|off), "
+                r"filter (?P<FILTER>on|off), "
+                r"extend (?P<EXTEND>on|off), "
+                r"qinq strip (?P<QINQ_STRIP>on|off)$",
+                re.MULTILINE,
+                named=True,
+            ),
+            cls.from_str_dict,
+        )
+
+
+class RSSOffloadTypesFlag(Flag):
+    """Flag representing the RSS offload flow types supported by the NIC port."""
+
+    #:
+    ipv4 = auto()
+    #:
+    ipv4_frag = auto()
+    #:
+    ipv4_tcp = auto()
+    #:
+    ipv4_udp = auto()
+    #:
+    ipv4_sctp = auto()
+    #:
+    ipv4_other = auto()
+    #:
+    ipv6 = auto()
+    #:
+    ipv6_frag = auto()
+    #:
+    ipv6_tcp = auto()
+    #:
+    ipv6_udp = auto()
+    #:
+    ipv6_sctp = auto()
+    #:
+    ipv6_other = auto()
+    #:
+    l2_payload = auto()
+    #:
+    ipv6_ex = auto()
+    #:
+    ipv6_tcp_ex = auto()
+    #:
+    ipv6_udp_ex = auto()
+    #:
+    port = auto()
+    #:
+    vxlan = auto()
+    #:
+    geneve = auto()
+    #:
+    nvgre = auto()
+    #:
+    user_defined_22 = auto()
+    #:
+    gtpu = auto()
+    #:
+    eth = auto()
+    #:
+    s_vlan = auto()
+    #:
+    c_vlan = auto()
+    #:
+    esp = auto()
+    #:
+    ah = auto()
+    #:
+    l2tpv3 = auto()
+    #:
+    pfcp = auto()
+    #:
+    pppoe = auto()
+    #:
+    ecpri = auto()
+    #:
+    mpls = auto()
+    #:
+    ipv4_chksum = auto()
+    #:
+    l4_chksum = auto()
+    #:
+    l2tpv2 = auto()
+    #:
+    ipv6_flow_label = auto()
+    #:
+    user_defined_38 = auto()
+    #:
+    user_defined_39 = auto()
+    #:
+    user_defined_40 = auto()
+    #:
+    user_defined_41 = auto()
+    #:
+    user_defined_42 = auto()
+    #:
+    user_defined_43 = auto()
+    #:
+    user_defined_44 = auto()
+    #:
+    user_defined_45 = auto()
+    #:
+    user_defined_46 = auto()
+    #:
+    user_defined_47 = auto()
+    #:
+    user_defined_48 = auto()
+    #:
+    user_defined_49 = auto()
+    #:
+    user_defined_50 = auto()
+    #:
+    user_defined_51 = auto()
+    #:
+    l3_pre96 = auto()
+    #:
+    l3_pre64 = auto()
+    #:
+    l3_pre56 = auto()
+    #:
+    l3_pre48 = auto()
+    #:
+    l3_pre40 = auto()
+    #:
+    l3_pre32 = auto()
+    #:
+    l2_dst_only = auto()
+    #:
+    l2_src_only = auto()
+    #:
+    l4_dst_only = auto()
+    #:
+    l4_src_only = auto()
+    #:
+    l3_dst_only = auto()
+    #:
+    l3_src_only = auto()
+
+    #:
+    ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
+    #:
+    udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
+    #:
+    tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
+    #:
+    sctp = ipv4_sctp | ipv6_sctp
+    #:
+    tunnel = vxlan | geneve | nvgre
+    #:
+    vlan = s_vlan | c_vlan
+    #:
+    all = (
+        eth
+        | vlan
+        | ip
+        | tcp
+        | udp
+        | sctp
+        | l2_payload
+        | l2tpv3
+        | esp
+        | ah
+        | pfcp
+        | gtpu
+        | ecpri
+        | mpls
+        | l2tpv2
+    )
+
+    @classmethod
+    def from_list_string(cls, names: str) -> Self:
+        """Makes a flag from a whitespace-separated list of names.
+
+        Args:
+            names: a whitespace-separated list containing the members of this flag.
+
+        Returns:
+            An instance of this flag.
+        """
+        flag = cls(0)
+        for name in names.split():
+            flag |= cls.from_str(name)
+        return flag
+
+    @classmethod
+    def from_str(cls, name: str) -> Self:
+        """Makes a flag matching the supplied name.
+
+        Args:
+            name: a valid member of this flag in text
+        Returns:
+            An instance of this flag.
+        """
+        member_name = name.strip().replace("-", "_")
+        return cls[member_name]
+
+    @classmethod
+    def make_parser(cls) -> ParserFn:
+        """Makes a parser function.
+
+        Returns:
+            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
+                parser function that makes an instance of this flag from text.
+        """
+        return TextParser.wrap(
+            TextParser.find(r"Supported RSS offload flow types:((?:\r?\n?  \S+)+)", re.MULTILINE),
+            RSSOffloadTypesFlag.from_list_string,
+        )
+
+
+class DeviceCapabilitiesFlag(Flag):
+    """Flag representing the device capabilities."""
+
+    #: Device supports Rx queue setup after device started.
+    RUNTIME_RX_QUEUE_SETUP = auto()
+    #: Device supports Tx queue setup after device started.
+    RUNTIME_TX_QUEUE_SETUP = auto()
+    #: Device supports shared Rx queue among ports within Rx domain and switch domain.
+    RXQ_SHARE = auto()
+    #: Device supports keeping flow rules across restart.
+    FLOW_RULE_KEEP = auto()
+    #: Device supports keeping shared flow objects across restart.
+    FLOW_SHARED_OBJECT_KEEP = auto()
+
+    @classmethod
+    def make_parser(cls) -> ParserFn:
+        """Makes a parser function.
+
+        Returns:
+            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
+                parser function that makes an instance of this flag from text.
+        """
+        return TextParser.wrap(
+            TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"),
+            cls,
+        )
+
+
+class DeviceErrorHandlingMode(StrEnum):
+    """Enum representing the device error handling mode."""
+
+    #:
+    none = auto()
+    #:
+    passive = auto()
+    #:
+    proactive = auto()
+    #:
+    unknown = auto()
+
+    @classmethod
+    def make_parser(cls) -> ParserFn:
+        """Makes a parser function.
+
+        Returns:
+            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
+                parser function that makes an instance of this enum from text.
+        """
+        return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls)
+
+
+def make_device_private_info_parser() -> ParserFn:
+    """Device private information parser.
+
+    Ensures that we are not parsing invalid device private info output.
+
+    Returns:
+        ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser
+            function that parses the device private info from the TestPmd port info output.
+    """
+
+    def _validate(info: str):
+        info = info.strip()
+        if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"):
+            return None
+        return info
+
+    return TextParser.wrap(TextParser.find(r"Device private info:\s+([\s\S]+)"), _validate)
+
+
+@dataclass
+class TestPmdPort(TextParser):
+    """Dataclass representing the result of testpmd's ``show port info`` command."""
+
+    #:
+    id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))
+    #:
+    device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))
+    #:
+    driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))
+    #:
+    socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))
+    #:
+    is_link_up: bool = field(metadata=TextParser.find("Link status: up"))
+    #:
+    link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))
+    #:
+    is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))
+    #:
+    is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))
+    #:
+    is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))
+    #:
+    is_allmulticast_mode_enabled: bool = field(
+        metadata=TextParser.find("Allmulticast mode: enabled")
+    )
+    #: Maximum number of MAC addresses
+    max_mac_addresses_num: int = field(
+        metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")
+    )
+    #: Maximum configurable length of RX packet
+    max_hash_mac_addresses_num: int = field(
+        metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")
+    )
+    #: Minimum size of RX buffer
+    min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))
+    #: Maximum configurable length of RX packet
+    max_rx_packet_length: int = field(
+        metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")
+    )
+    #: Maximum configurable size of LRO aggregated packet
+    max_lro_packet_size: int = field(
+        metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")
+    )
+
+    #: Current number of RX queues
+    rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))
+    #: Max possible RX queues
+    max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))
+    #: Max possible number of RXDs per queue
+    max_queue_rxd_num: int = field(
+        metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")
+    )
+    #: Min possible number of RXDs per queue
+    min_queue_rxd_num: int = field(
+        metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")
+    )
+    #: RXDs number alignment
+    rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))
+
+    #: Current number of TX queues
+    tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))
+    #: Max possible TX queues
+    max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))
+    #: Max possible number of TXDs per queue
+    max_queue_txd_num: int = field(
+        metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")
+    )
+    #: Min possible number of TXDs per queue
+    min_queue_txd_num: int = field(
+        metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")
+    )
+    #: TXDs number alignment
+    txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))
+    #: Max segment number per packet
+    max_packet_segment_num: int = field(
+        metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")
+    )
+    #: Max segment number per MTU/TSO
+    max_mtu_segment_num: int = field(
+        metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")
+    )
+
+    #:
+    device_capabilities: DeviceCapabilitiesFlag = field(
+        metadata=DeviceCapabilitiesFlag.make_parser(),
+    )
+    #:
+    device_error_handling_mode: DeviceErrorHandlingMode = field(
+        metadata=DeviceErrorHandlingMode.make_parser()
+    )
+    #:
+    device_private_info: str | None = field(
+        default=None,
+        metadata=make_device_private_info_parser(),
+    )
+
+    #:
+    hash_key_size: int | None = field(
+        default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")
+    )
+    #:
+    redirection_table_size: int | None = field(
+        default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")
+    )
+    #:
+    supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
+        default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()
+    )
+
+    #:
+    mac_address: str | None = field(
+        default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")
+    )
+    #:
+    fw_version: str | None = field(
+        default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")
+    )
+    #:
+    dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))
+    #: Socket id of the memory allocation
+    mem_alloc_socket_id: int | None = field(
+        default=None,
+        metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),
+    )
+    #:
+    mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))
+
+    #:
+    vlan_offload: VLANOffloadFlag | None = field(
+        default=None,
+        metadata=VLANOffloadFlag.make_parser(),
+    )
+
+    #: Maximum size of RX buffer
+    max_rx_bufsize: int | None = field(
+        default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")
+    )
+    #: Maximum number of VFs
+    max_vfs_num: int | None = field(
+        default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")
+    )
+    #: Maximum number of VMDq pools
+    max_vmdq_pools_num: int | None = field(
+        default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")
+    )
+
+    #:
+    switch_name: str | None = field(
+        default=None, metadata=TextParser.find(r"Switch name: ([\r\n]+)")
+    )
+    #:
+    switch_domain_id: int | None = field(
+        default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")
+    )
+    #:
+    switch_port_id: int | None = field(
+        default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")
+    )
+    #:
+    switch_rx_domain: int | None = field(
+        default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")
+    )
+
+
 class TestPmdShell(InteractiveShell):
     """Testpmd interactive shell.
 
@@ -225,6 +716,50 @@  def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
                 f"Test pmd failed to set fwd mode to {mode.value}"
             )
 
+    def show_port_info_all(self) -> list[TestPmdPort]:
+        """Returns the information of all the ports.
+
+        Returns:
+            list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`.
+        """
+        output = self.send_command("show port info all")
+
+        # Sample output of the "all" command looks like:
+        #
+        # <start>
+        #
+        #   ********************* Infos for port 0 *********************
+        #   Key: value
+        #
+        #   ********************* Infos for port 1 *********************
+        #   Key: value
+        # <end>
+        #
+        # Takes advantage of the double new line in between ports as end delimiter. But we need to
+        # artificially add a new line at the end to pick up the last port. Because commands are
+        # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF.
+        # Therefore we also need to take the carriage return into account.
+        iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)
+        return [TestPmdPort.parse(block.group(0)) for block in iter]
+
+    def show_port_info(self, port_id: int) -> TestPmdPort:
+        """Returns the given port information.
+
+        Args:
+            port_id: The port ID to gather information for.
+
+        Raises:
+            InteractiveCommandExecutionError: If `port_id` is invalid.
+
+        Returns:
+            TestPmdPort: An instance of `TestPmdPort` containing the given port's information.
+        """
+        output = self.send_command(f"show port info {port_id}", skip_first_line=True)
+        if output.startswith("Invalid port"):
+            raise InteractiveCommandExecutionError("invalid port given")
+
+        return TestPmdPort.parse(output)
+
     def close(self) -> None:
         """Overrides :meth:`~.interactive_shell.close`."""
         self.send_command("quit", "")