[4/5] dts: add `show port info` command to TestPmdShell

Message ID 20240412111136.3470304-5-luca.vizzarro@arm.com (mailing list archive)
State New
Delegated to: Thomas Monjalon
Headers
Series dts: testpmd show port info/stats |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Luca Vizzarro April 12, 2024, 11:11 a.m. UTC
  Add a new TestPmdPort data structure to represent the output
returned by `show port info`, which is implemented as part of
TestPmdShell.

The TestPmdPort data structure and its derived classes are modelled
based on the relevant testpmd source code.

This implementation makes extensive use of regular expressions, which
all parse individually. The rationale behind this is to lower the risk
of the testpmd output changing as part of development. Therefore
minimising breakage.

Bugzilla ID: 1407

Signed-off-by: Luca Vizzarro <luca.vizzarro@arm.com>
Reviewed-by: Paul Szczepanek <paul.szczepanek@arm.com>
---
 dts/framework/remote_session/testpmd_shell.py | 473 +++++++++++++++++-
 1 file changed, 472 insertions(+), 1 deletion(-)
  

Comments

Juraj Linkeš April 16, 2024, 9:03 a.m. UTC | #1
On Fri, Apr 12, 2024 at 1:11 PM Luca Vizzarro <luca.vizzarro@arm.com> wrote:
>
> Add a new TestPmdPort data structure to represent the output
> returned by `show port info`, which is implemented as part of
> TestPmdShell.
>
> The TestPmdPort data structure and its derived classes are modelled
> based on the relevant testpmd source code.
>
> This implementation makes extensive use of regular expressions, which
> all parse individually. The rationale behind this is to lower the risk
> of the testpmd output changing as part of development. Therefore
> minimising breakage.
>
> Bugzilla ID: 1407
>
> Signed-off-by: Luca Vizzarro <luca.vizzarro@arm.com>
> Reviewed-by: Paul Szczepanek <paul.szczepanek@arm.com>

<snip>

> +@dataclass
> +class TestPmdPort(TextParser):

This and the classes above are missing docstrings.

<snip>

> @@ -225,6 +664,38 @@ def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
>                  f"Test pmd failed to set fwd mode to {mode.value}"
>              )
>
> +    def show_port_info_all(self) -> list[TestPmdPort]:
> +        """Returns the information of all the ports."""

Can we add sample output so that the format of what we're trying to
parse is clear?

> +        output = self.send_command("show port info all")
> +
> +        ports = []
> +        iter = re.finditer(r"\*+.+\*+", output)
> +        if next(iter, False):  # we are slicing retrospectively, skip first block
> +            start_pos = 0
> +            for block in iter:
> +                end_pos = block.start()
> +                ports.append(TestPmdPort.parse(output[start_pos:end_pos]))
> +                start_pos = end_pos
> +
> +            ports.append(TestPmdPort.parse(output[start_pos:]))
> +
> +        return ports

Can this be done the same way it's done in the last commit?

iter = re.finditer(r"(^  #*.+#*$[^#]+)^  #*$", output, re.MULTILINE)
return [TestPmdPortStats.parse(block.group(1)) for block in iter]

Looks much better.

> +
> +    def show_port_info(self, port_id: int) -> TestPmdPort:
> +        """Returns the given port information.
> +
> +        Args:
> +            port_id: The port ID to gather information for.
> +
> +        Raises:
> +            InteractiveCommandExecutionError: If `port_id` is invalid.
> +        """
> +        output = self.send_command(f"show port info {port_id}", skip_first_line=True)
> +        if output.startswith("Invalid port"):
> +            raise InteractiveCommandExecutionError("invalid port given")
> +
> +        return TestPmdPort.parse(output)
> +
>      def close(self) -> None:
>          """Overrides :meth:`~.interactive_shell.close`."""
>          self.send_command("quit", "")
> --
> 2.34.1
>
  
Luca Vizzarro April 16, 2024, 12:24 p.m. UTC | #2
On 16/04/2024 10:03, Juraj Linkeš wrote:
>> +@dataclass
>> +class TestPmdPort(TextParser):
> 
> This and the classes above are missing docstrings.

Ack.

>> @@ -225,6 +664,38 @@ def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
>>                   f"Test pmd failed to set fwd mode to {mode.value}"
>>               )
>>
>> +    def show_port_info_all(self) -> list[TestPmdPort]:
>> +        """Returns the information of all the ports."""
> 
> Can we add sample output so that the format of what we're trying to
> parse is clear?

Ack.

>> +        output = self.send_command("show port info all")
>> +
>> +        ports = []
>> +        iter = re.finditer(r"\*+.+\*+", output)
>> +        if next(iter, False):  # we are slicing retrospectively, skip first block
>> +            start_pos = 0
>> +            for block in iter:
>> +                end_pos = block.start()
>> +                ports.append(TestPmdPort.parse(output[start_pos:end_pos]))
>> +                start_pos = end_pos
>> +
>> +            ports.append(TestPmdPort.parse(output[start_pos:]))
>> +
>> +        return ports
> 
> Can this be done the same way it's done in the last commit?
> 
> iter = re.finditer(r"(^  #*.+#*$[^#]+)^  #*$", output, re.MULTILINE)
> return [TestPmdPortStats.parse(block.group(1)) for block in iter]
> 
> Looks much better.

I agree that it looks much better. I gave it a first attempt to come up 
with a regular expression that is not too complicated and is able to 
match blocks individually. I've noticed that blocks start with:

   \n********* Infos for port X ************

but don't have an actual ending delimiter, unlike for the stats. I'll 
experiment with some look ahead constructs. The easiest solution is to 
match everything that is not * ([^*]+) but can we be certain that there 
won't be any asterisk in the actual information?
  
Juraj Linkeš April 17, 2024, 1:22 p.m. UTC | #3
On Tue, Apr 16, 2024 at 2:24 PM Luca Vizzarro <Luca.Vizzarro@arm.com> wrote:
>
> On 16/04/2024 10:03, Juraj Linkeš wrote:
> >> +@dataclass
> >> +class TestPmdPort(TextParser):
> >
> > This and the classes above are missing docstrings.
>
> Ack.
>
> >> @@ -225,6 +664,38 @@ def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
> >>                   f"Test pmd failed to set fwd mode to {mode.value}"
> >>               )
> >>
> >> +    def show_port_info_all(self) -> list[TestPmdPort]:
> >> +        """Returns the information of all the ports."""
> >
> > Can we add sample output so that the format of what we're trying to
> > parse is clear?
>
> Ack.
>
> >> +        output = self.send_command("show port info all")
> >> +
> >> +        ports = []
> >> +        iter = re.finditer(r"\*+.+\*+", output)
> >> +        if next(iter, False):  # we are slicing retrospectively, skip first block
> >> +            start_pos = 0
> >> +            for block in iter:
> >> +                end_pos = block.start()
> >> +                ports.append(TestPmdPort.parse(output[start_pos:end_pos]))
> >> +                start_pos = end_pos
> >> +
> >> +            ports.append(TestPmdPort.parse(output[start_pos:]))
> >> +
> >> +        return ports
> >
> > Can this be done the same way it's done in the last commit?
> >
> > iter = re.finditer(r"(^  #*.+#*$[^#]+)^  #*$", output, re.MULTILINE)
> > return [TestPmdPortStats.parse(block.group(1)) for block in iter]
> >
> > Looks much better.
>
> I agree that it looks much better. I gave it a first attempt to come up
> with a regular expression that is not too complicated and is able to
> match blocks individually. I've noticed that blocks start with:
>
>    \n********* Infos for port X ************
>
> but don't have an actual ending delimiter, unlike for the stats.

Ah, so that's the difference and the reason. I guess the ending
delimiter is either the start of the next section of the prompt (or
the end of the string).

> I'll
> experiment with some look ahead constructs. The easiest solution is to
> match everything that is not * ([^*]+) but can we be certain that there
> won't be any asterisk in the actual information?

We can't. But we can be reasonably certain there won't be five
consecutive asterisks, so maybe we can work with that.
  
Luca Vizzarro April 17, 2024, 2:25 p.m. UTC | #4
On 17/04/2024 14:22, Juraj Linkeš wrote:
>> I agree that it looks much better. I gave it a first attempt to come up
>> with a regular expression that is not too complicated and is able to
>> match blocks individually. I've noticed that blocks start with:
>>
>>     \n********* Infos for port X ************
>>
>> but don't have an actual ending delimiter, unlike for the stats.
> 
> Ah, so that's the difference and the reason. I guess the ending
> delimiter is either the start of the next section of the prompt (or
> the end of the string).

Yes, but it's not as trivial unfortunately. In the current code I am 
effectively just finding all the start positions and slice.

>> I'll
>> experiment with some look ahead constructs. The easiest solution is to
>> match everything that is not * ([^*]+) but can we be certain that there
>> won't be any asterisk in the actual information?
> 
> We can't. But we can be reasonably certain there won't be five
> consecutive asterisks, so maybe we can work with that.

We can work with that by using look ahead constructs as mentioned, which 
can be quite intensive. For example:

   /(?<=\n\*).*?(?=\n\*|$)/gs

looks for the start delimiter and for the start of the next block or the 
end. This works perfectly! But it's performing 9576 steps (!) for just 
two ports. The current solution only takes 10 steps in total.
  
Luca Vizzarro April 17, 2024, 3:29 p.m. UTC | #5
On 17/04/2024 15:25, Luca Vizzarro wrote:
> On 17/04/2024 14:22, Juraj Linkeš wrote:
>>> I'll
>>> experiment with some look ahead constructs. The easiest solution is to
>>> match everything that is not * ([^*]+) but can we be certain that there
>>> won't be any asterisk in the actual information?
>>
>> We can't. But we can be reasonably certain there won't be five
>> consecutive asterisks, so maybe we can work with that.
> 
> We can work with that by using look ahead constructs as mentioned, which 
> can be quite intensive. For example:
> 
>    /(?<=\n\*).*?(?=\n\*|$)/gs
> 
> looks for the start delimiter and for the start of the next block or the 
> end. This works perfectly! But it's performing 9576 steps (!) for just 
> two ports. The current solution only takes 10 steps in total.

Thinking of it... we are not really aiming for performance, so I guess 
if it simplifies and it's justifiable, then it's not a problem. 
Especially since this command shouldn't be called continuosly.

The equivalent /\n\*.+?(?=\n\*|$)/gs (but slightly more optimised) takes 
approximately 3*input_length steps to run (according to regex101 at 
least). If that's reasonable enough, I can do this:

   iter = re.finditer(input, "\n\*.+?(?=\n\*|$)", re.S)
   return [TestPmdPortInfo.parse(match.group(0)) for match in iter]

Another optimization is artificially adding a `\n*` delimiter at the end 
before feeding it to the regex, thus removing the alternative case (|$), 
and making it 2*len steps:

   input += "\n*"
   iter = re.finditer(input, "\n\*.+?(?=\n\*)", re.S)
   return [TestPmdPortInfo.parse(match.group(0)) for match in iter]

Let me know what you think!

Best,
Luca
  
Juraj Linkeš April 18, 2024, 6:41 a.m. UTC | #6
On Wed, Apr 17, 2024 at 5:29 PM Luca Vizzarro <Luca.Vizzarro@arm.com> wrote:
>
> On 17/04/2024 15:25, Luca Vizzarro wrote:
> > On 17/04/2024 14:22, Juraj Linkeš wrote:
> >>> I'll
> >>> experiment with some look ahead constructs. The easiest solution is to
> >>> match everything that is not * ([^*]+) but can we be certain that there
> >>> won't be any asterisk in the actual information?
> >>
> >> We can't. But we can be reasonably certain there won't be five
> >> consecutive asterisks, so maybe we can work with that.
> >
> > We can work with that by using look ahead constructs as mentioned, which
> > can be quite intensive. For example:
> >
> >    /(?<=\n\*).*?(?=\n\*|$)/gs
> >
> > looks for the start delimiter and for the start of the next block or the
> > end. This works perfectly! But it's performing 9576 steps (!) for just
> > two ports. The current solution only takes 10 steps in total.
>
> Thinking of it... we are not really aiming for performance, so I guess
> if it simplifies and it's justifiable, then it's not a problem.
> Especially since this command shouldn't be called continuosly.
>

We have to weigh the pros and cons on an individual basis. In this
case, the output is going to be short so basically any solution is
going to be indistinguishable from any other, performance wise.

> The equivalent /\n\*.+?(?=\n\*|$)/gs (but slightly more optimised) takes
> approximately 3*input_length steps to run (according to regex101 at
> least). If that's reasonable enough, I can do this:
>
>    iter = re.finditer(input, "\n\*.+?(?=\n\*|$)", re.S)
>    return [TestPmdPortInfo.parse(match.group(0)) for match in iter]
>
> Another optimization is artificially adding a `\n*` delimiter at the end
> before feeding it to the regex, thus removing the alternative case (|$),
> and making it 2*len steps:
>
>    input += "\n*"
>    iter = re.finditer(input, "\n\*.+?(?=\n\*)", re.S)
>    return [TestPmdPortInfo.parse(match.group(0)) for match in iter]
>

I like this second one a bit more. How does the performance change if
we try to match four asterisks "\n\****.+?(?=\n\****)"? Four asterisks
shouldn't randomly be in the output as that's basically another
delimited.

And we should document this in the docstring - sample output, then
explain the extra characters and the regex itself. We shouldn't forget
this in the other commit as well (show port stats).

> Let me know what you think!
>
> Best,
> Luca
  
Luca Vizzarro April 18, 2024, 10:52 a.m. UTC | #7
On 18/04/2024 07:41, Juraj Linkeš wrote:
>> The equivalent /\n\*.+?(?=\n\*|$)/gs (but slightly more optimised) takes
>> approximately 3*input_length steps to run (according to regex101 at
>> least). If that's reasonable enough, I can do this:
>>
>>     iter = re.finditer(input, "\n\*.+?(?=\n\*|$)", re.S)
>>     return [TestPmdPortInfo.parse(match.group(0)) for match in iter]
>>
>> Another optimization is artificially adding a `\n*` delimiter at the end
>> before feeding it to the regex, thus removing the alternative case (|$),
>> and making it 2*len steps:
>>
>>     input += "\n*"
>>     iter = re.finditer(input, "\n\*.+?(?=\n\*)", re.S)
>>     return [TestPmdPortInfo.parse(match.group(0)) for match in iter]
>>
> 
> I like this second one a bit more. How does the performance change if
> we try to match four asterisks "\n\****.+?(?=\n\****)"? Four asterisks
> shouldn't randomly be in the output as that's basically another
> delimited.

The difference is negligible as the regex walks every character anyways 
– while there is a match. Either \* or \*{4} will result in a match. The 
problem is that if an attempt of match fails, the regex backtracks, this 
is where the problem is. Of course if we already matched 4 asterisks, 
then the backtracking can skip the whole sequence at once (compared to 
1), but it's only going to be 3 steps less per **** found. It's a bigger 
difference if we attempt to match all the asterisks. The lookahead 
construct also increases backtracking.

In the meantime, still by amending the output, I've got a solution that 
doesn't perform any look aheads:

   \*{21}.+?\n{2}

instead of treating blocks as they are (\n******<snip>\n), we can add an 
extra \n at the end and treat the blocks as: ******<snip>\n\n. Basically 
this assumes that an empty line is the end delimiter. This takes 
input_length/2 steps!

Of course in reality every \n is \r\n, as I've discovered that when 
shells are invoked using paramiko, the stream becomes CRLF for some 
reason I haven't explored. I think this was worth mentioning for 
everybody, in case surprise carriage returns may reveal disruptive.

> And we should document this in the docstring - sample output, then
> explain the extra characters and the regex itself. We shouldn't forget
> this in the other commit as well (show port stats).

Ack.
  

Patch

diff --git a/dts/framework/remote_session/testpmd_shell.py b/dts/framework/remote_session/testpmd_shell.py
index cb2ab6bd00..3cf123ff57 100644
--- a/dts/framework/remote_session/testpmd_shell.py
+++ b/dts/framework/remote_session/testpmd_shell.py
@@ -1,6 +1,7 @@ 
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright(c) 2023 University of New Hampshire
 # Copyright(c) 2023 PANTHEON.tech s.r.o.
+# Copyright(c) 2024 Arm Limited
 
 """Testpmd interactive shell.
 
@@ -15,14 +16,19 @@ 
     testpmd_shell.close()
 """
 
+from dataclasses import dataclass, field
+import re
 import time
-from enum import auto
+from enum import Flag, auto
 from pathlib import PurePath
 from typing import Callable, ClassVar
+from typing_extensions import Self
 
 from framework.exception import InteractiveCommandExecutionError
 from framework.settings import SETTINGS
 from framework.utils import StrEnum
+from framework import parser
+from framework.parser import TextParser
 
 from .interactive_shell import InteractiveShell
 
@@ -80,6 +86,439 @@  class TestPmdForwardingModes(StrEnum):
     recycle_mbufs = auto()
 
 
+class VLANOffloadFlag(Flag):
+    #:
+    STRIP = auto()
+    #:
+    FILTER = auto()
+    #:
+    EXTEND = auto()
+    #:
+    QINQ_STRIP = auto()
+
+    @classmethod
+    def from_str_dict(cls, d):
+        """Makes an instance from a dictionary containing the flag member names with an "on" value."""
+        flag = cls(0)
+        for name in cls.__members__:
+            if d.get(name) == "on":
+                flag |= cls[name]
+        return flag
+
+
+class RSSOffloadTypesFlag(Flag):
+    #:
+    ipv4 = auto()
+    #:
+    ipv4_frag = auto()
+    #:
+    ipv4_tcp = auto()
+    #:
+    ipv4_udp = auto()
+    #:
+    ipv4_sctp = auto()
+    #:
+    ipv4_other = auto()
+    #:
+    ipv6 = auto()
+    #:
+    ipv6_frag = auto()
+    #:
+    ipv6_tcp = auto()
+    #:
+    ipv6_udp = auto()
+    #:
+    ipv6_sctp = auto()
+    #:
+    ipv6_other = auto()
+    #:
+    l2_payload = auto()
+    #:
+    ipv6_ex = auto()
+    #:
+    ipv6_tcp_ex = auto()
+    #:
+    ipv6_udp_ex = auto()
+    #:
+    port = auto()
+    #:
+    vxlan = auto()
+    #:
+    geneve = auto()
+    #:
+    nvgre = auto()
+    #:
+    user_defined_22 = auto()
+    #:
+    gtpu = auto()
+    #:
+    eth = auto()
+    #:
+    s_vlan = auto()
+    #:
+    c_vlan = auto()
+    #:
+    esp = auto()
+    #:
+    ah = auto()
+    #:
+    l2tpv3 = auto()
+    #:
+    pfcp = auto()
+    #:
+    pppoe = auto()
+    #:
+    ecpri = auto()
+    #:
+    mpls = auto()
+    #:
+    ipv4_chksum = auto()
+    #:
+    l4_chksum = auto()
+    #:
+    l2tpv2 = auto()
+    #:
+    ipv6_flow_label = auto()
+    #:
+    user_defined_38 = auto()
+    #:
+    user_defined_39 = auto()
+    #:
+    user_defined_40 = auto()
+    #:
+    user_defined_41 = auto()
+    #:
+    user_defined_42 = auto()
+    #:
+    user_defined_43 = auto()
+    #:
+    user_defined_44 = auto()
+    #:
+    user_defined_45 = auto()
+    #:
+    user_defined_46 = auto()
+    #:
+    user_defined_47 = auto()
+    #:
+    user_defined_48 = auto()
+    #:
+    user_defined_49 = auto()
+    #:
+    user_defined_50 = auto()
+    #:
+    user_defined_51 = auto()
+    #:
+    l3_pre96 = auto()
+    #:
+    l3_pre64 = auto()
+    #:
+    l3_pre56 = auto()
+    #:
+    l3_pre48 = auto()
+    #:
+    l3_pre40 = auto()
+    #:
+    l3_pre32 = auto()
+    #:
+    l2_dst_only = auto()
+    #:
+    l2_src_only = auto()
+    #:
+    l4_dst_only = auto()
+    #:
+    l4_src_only = auto()
+    #:
+    l3_dst_only = auto()
+    #:
+    l3_src_only = auto()
+
+    #:
+    ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
+    #:
+    udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
+    #:
+    tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
+    #:
+    sctp = ipv4_sctp | ipv6_sctp
+    #:
+    tunnel = vxlan | geneve | nvgre
+    #:
+    vlan = s_vlan | c_vlan
+    #:
+    all = (
+        eth
+        | vlan
+        | ip
+        | tcp
+        | udp
+        | sctp
+        | l2_payload
+        | l2tpv3
+        | esp
+        | ah
+        | pfcp
+        | gtpu
+        | ecpri
+        | mpls
+        | l2tpv2
+    )
+
+    @classmethod
+    def from_list_string(cls, names: str) -> Self:
+        flag = cls(0)
+        for name in names.split():
+            flag |= cls.from_str(name)
+        return flag
+
+    @classmethod
+    def from_str(cls, name: str) -> Self:
+        member_name = name.strip().replace("-", "_")
+        return cls[member_name]
+
+    def __str__(self):
+        return self.name.replace("_", "-")
+
+
+class DeviceCapabilitiesFlag(Flag):
+    RUNTIME_RX_QUEUE_SETUP = auto()
+    """Device supports Rx queue setup after device started."""
+    RUNTIME_TX_QUEUE_SETUP = auto()
+    """Device supports Tx queue setup after device started."""
+    RXQ_SHARE = auto()
+    """Device supports shared Rx queue among ports within Rx domain and switch domain."""
+    FLOW_RULE_KEEP = auto()
+    """Device supports keeping flow rules across restart."""
+    FLOW_SHARED_OBJECT_KEEP = auto()
+    """Device supports keeping shared flow objects across restart."""
+
+
+class DeviceErrorHandlingMode(StrEnum):
+    #:
+    none = auto()
+    #:
+    passive = auto()
+    #:
+    proactive = auto()
+    #:
+    unknown = auto()
+
+
+def _validate_device_private_info(info: str) -> str | None:
+    """Ensure that we are not parsing invalid device private info output."""
+    info = info.strip()
+    if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"):
+        return None
+    return info
+
+
+@dataclass
+class TestPmdPort(TextParser):
+    #:
+    id: int = field(metadata=parser.to_int(parser.regex(r"Infos for port (\d+)\b")))
+    #:
+    device_name: str = field(metadata=parser.regex(r"Device name: ([^\r\n]+)"))
+    #:
+    driver_name: str = field(metadata=parser.regex(r"Driver name: ([^\r\n]+)"))
+    #:
+    socket_id: int = field(metadata=parser.to_int(parser.regex(r"Connect to socket: (\d+)")))
+    #:
+    is_link_up: bool = field(metadata=parser.eq("up", parser.regex(r"Link status: (up|down)")))
+    #:
+    link_speed: str = field(metadata=parser.regex(r"Link speed: ([^\r\n]+)"))
+    #:
+    is_link_full_duplex: bool = field(
+        metadata=parser.eq("full", parser.regex(r"Link duplex: (full|half)-duplex"))
+    )
+    #:
+    is_link_autonegotiated: bool = field(
+        metadata=parser.to_bool(parser.regex(r"Autoneg status: (On|Off)"))
+    )
+    #:
+    is_promiscuous_mode_enabled: bool = field(
+        metadata=parser.to_bool(parser.regex(r"Promiscuous mode: (enabled|disabled)"))
+    )
+    #:
+    is_allmulticast_mode_enabled: bool = field(
+        metadata=parser.to_bool(parser.regex(r"Allmulticast mode: (enabled|disabled)"))
+    )
+    #: Maximum number of MAC addresses
+    max_mac_addresses_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Maximum number of MAC addresses: (\d+)"))
+    )
+    #: Maximum configurable length of RX packet
+    max_hash_mac_addresses_num: int = field(
+        metadata=parser.to_int(
+            parser.regex(r"Maximum number of MAC addresses of hash filtering: (\d+)")
+        )
+    )
+    #: Minimum size of RX buffer
+    min_rx_bufsize: int = field(
+        metadata=parser.to_int(parser.regex(r"Minimum size of RX buffer: (\d+)"))
+    )
+    #: Maximum configurable length of RX packet
+    max_rx_packet_length: int = field(
+        metadata=parser.to_int(parser.regex(r"Maximum configurable length of RX packet: (\d+)"))
+    )
+    #: Maximum configurable size of LRO aggregated packet
+    max_lro_packet_size: int = field(
+        metadata=parser.to_int(
+            parser.regex(r"Maximum configurable size of LRO aggregated packet: (\d+)")
+        )
+    )
+
+    #: Current number of RX queues
+    rx_queues_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Current number of RX queues: (\d+)"))
+    )
+    #: Max possible RX queues
+    max_rx_queues_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max possible RX queues: (\d+)"))
+    )
+    #: Max possible number of RXDs per queue
+    max_queue_rxd_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max possible number of RXDs per queue: (\d+)"))
+    )
+    #: Min possible number of RXDs per queue
+    min_queue_rxd_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Min possible number of RXDs per queue: (\d+)"))
+    )
+    #: RXDs number alignment
+    rxd_alignment_num: int = field(
+        metadata=parser.to_int(parser.regex(r"RXDs number alignment: (\d+)"))
+    )
+
+    #: Current number of TX queues
+    tx_queues_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Current number of TX queues: (\d+)"))
+    )
+    #: Max possible TX queues
+    max_tx_queues_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max possible TX queues: (\d+)"))
+    )
+    #: Max possible number of TXDs per queue
+    max_queue_txd_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max possible number of TXDs per queue: (\d+)"))
+    )
+    #: Min possible number of TXDs per queue
+    min_queue_txd_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Min possible number of TXDs per queue: (\d+)"))
+    )
+    #: TXDs number alignment
+    txd_alignment_num: int = field(
+        metadata=parser.to_int(parser.regex(r"TXDs number alignment: (\d+)"))
+    )
+    #: Max segment number per packet
+    max_packet_segment_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max segment number per packet: (\d+)"))
+    )
+    #: Max segment number per MTU/TSO
+    max_mtu_segment_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max segment number per MTU\/TSO: (\d+)"))
+    )
+
+    #:
+    device_capabilities: DeviceCapabilitiesFlag = field(
+        metadata=parser.chain(
+            DeviceCapabilitiesFlag,
+            parser.to_int(parser.regex(r"Device capabilities: (0x[A-Fa-f\d]+)")),
+        )
+    )
+    #:
+    device_error_handling_mode: DeviceErrorHandlingMode = field(
+        metadata=parser.chain(
+            DeviceErrorHandlingMode, parser.regex(r"Device error handling mode: (\w+)")
+        )
+    )
+    #:
+    device_private_info: str | None = field(
+        default=None,
+        metadata=parser.chain(
+            _validate_device_private_info,
+            parser.regex(r"Device private info:\s+([\s\S]+)", re.MULTILINE),
+        ),
+    )
+
+    #:
+    hash_key_size: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Hash key size in bytes: (\d+)"))
+    )
+    #:
+    redirection_table_size: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Redirection table size: (\d+)"))
+    )
+    #:
+    supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
+        default=RSSOffloadTypesFlag(0),
+        metadata=parser.chain(
+            RSSOffloadTypesFlag.from_list_string,
+            parser.regex(r"Supported RSS offload flow types:((?:\r?\n?  \S+)+)", re.MULTILINE),
+        ),
+    )
+
+    #:
+    mac_address: str | None = field(
+        default=None, metadata=parser.regex(r"MAC address: ([A-Fa-f0-9:]+)")
+    )
+    #:
+    fw_version: str | None = field(
+        default=None, metadata=parser.regex(r"Firmware-version: ([^\r\n]+)")
+    )
+    #:
+    dev_args: str | None = field(default=None, metadata=parser.regex(r"Devargs: ([^\r\n]+)"))
+    #: Socket id of the memory allocation
+    mem_alloc_socket_id: int | None = field(
+        default=None,
+        metadata=parser.to_int(parser.regex(r"memory allocation on the socket: (\d+)")),
+    )
+    #:
+    mtu: int | None = field(default=None, metadata=parser.to_int(parser.regex(r"MTU: (\d+)")))
+
+    #:
+    vlan_offload: VLANOffloadFlag | None = field(
+        default=None,
+        metadata=parser.chain(
+            VLANOffloadFlag.from_str_dict,
+            parser.regex(
+                r"VLAN offload:\s+"
+                r"strip (?P<STRIP>on|off), "
+                r"filter (?P<FILTER>on|off), "
+                r"extend (?P<EXTEND>on|off), "
+                r"qinq strip (?P<QINQ_STRIP>on|off)$",
+                re.MULTILINE,
+                named=True,
+            ),
+        ),
+    )
+
+    #: Maximum size of RX buffer
+    max_rx_bufsize: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Maximum size of RX buffer: (\d+)"))
+    )
+    #: Maximum number of VFs
+    max_vfs_num: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Maximum number of VFs: (\d+)"))
+    )
+    #: Maximum number of VMDq pools
+    max_vmdq_pools_num: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Maximum number of VMDq pools: (\d+)"))
+    )
+
+    #:
+    switch_name: str | None = field(default=None, metadata=parser.regex(r"Switch name: ([\r\n]+)"))
+    #:
+    switch_domain_id: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Switch domain Id: (\d+)"))
+    )
+    #:
+    switch_port_id: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Switch Port Id: (\d+)"))
+    )
+    #:
+    switch_rx_domain: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Switch Rx domain: (\d+)"))
+    )
+
+
 class TestPmdShell(InteractiveShell):
     """Testpmd interactive shell.
 
@@ -225,6 +664,38 @@  def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
                 f"Test pmd failed to set fwd mode to {mode.value}"
             )
 
+    def show_port_info_all(self) -> list[TestPmdPort]:
+        """Returns the information of all the ports."""
+        output = self.send_command("show port info all")
+
+        ports = []
+        iter = re.finditer(r"\*+.+\*+", output)
+        if next(iter, False):  # we are slicing retrospectively, skip first block
+            start_pos = 0
+            for block in iter:
+                end_pos = block.start()
+                ports.append(TestPmdPort.parse(output[start_pos:end_pos]))
+                start_pos = end_pos
+
+            ports.append(TestPmdPort.parse(output[start_pos:]))
+
+        return ports
+
+    def show_port_info(self, port_id: int) -> TestPmdPort:
+        """Returns the given port information.
+
+        Args:
+            port_id: The port ID to gather information for.
+
+        Raises:
+            InteractiveCommandExecutionError: If `port_id` is invalid.
+        """
+        output = self.send_command(f"show port info {port_id}", skip_first_line=True)
+        if output.startswith("Invalid port"):
+            raise InteractiveCommandExecutionError("invalid port given")
+
+        return TestPmdPort.parse(output)
+
     def close(self) -> None:
         """Overrides :meth:`~.interactive_shell.close`."""
         self.send_command("quit", "")