[v2,2/2] dts: port over unified packet suite

Message ID 20240823202244.9184-3-dmarx@iol.unh.edu (mailing list archive)
State New
Delegated to: Paul Szczepanek
Headers
Series dts: port over unified packet type suite |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/loongarch-compilation success Compilation OK
ci/loongarch-unit-testing success Unit Testing PASS
ci/Intel-compilation success Compilation OK
ci/intel-Testing success Testing PASS
ci/intel-Functional success Functional PASS
ci/github-robot: build success github build: passed
ci/iol-compile-amd64-testing pending Testing pending
ci/iol-unit-amd64-testing success Testing PASS
ci/iol-compile-arm64-testing success Testing PASS
ci/iol-sample-apps-testing success Testing PASS
ci/iol-broadcom-Performance success Performance Testing PASS
ci/iol-broadcom-Functional success Functional Testing PASS
ci/iol-marvell-Functional success Functional Testing PASS
ci/iol-intel-Functional success Functional Testing PASS
ci/iol-intel-Performance success Performance Testing PASS

Commit Message

Dean Marx Aug. 23, 2024, 8:22 p.m. UTC
Port over unified packet testing suite from old DTS. This suite
tests the ability of the PMD to recognize valid or invalid packet flags.

Depends-on: Patch-143033
("dts: add text parser for testpmd verbose output")
Depends-on: Patch-142691
("dts: add send_packets to test suites and rework
packet addressing")
Depends-on: Patch-143005
("dts: add functions to testpmd shell")

Signed-off-by: Dean Marx <dmarx@iol.unh.edu>
---
 dts/tests/TestSuite_uni_pkt.py | 239 +++++++++++++++++++++++++++++++++
 1 file changed, 239 insertions(+)
 create mode 100644 dts/tests/TestSuite_uni_pkt.py
  

Comments

Jeremy Spewock Sept. 4, 2024, 7:23 p.m. UTC | #1
I just left some general formatting comments and a few idea that I
thought might be helpful, but it looks good to me in general:

Reviewed-by: Jeremy Spewock <jspewock@iol.unh.edu>

On Fri, Aug 23, 2024 at 4:22 PM Dean Marx <dmarx@iol.unh.edu> wrote:
>
> Port over unified packet testing suite from old DTS. This suite
> tests the ability of the PMD to recognize valid or invalid packet flags.
>
> Depends-on: Patch-143033
> ("dts: add text parser for testpmd verbose output")
> Depends-on: Patch-142691
> ("dts: add send_packets to test suites and rework
> packet addressing")
> Depends-on: Patch-143005
> ("dts: add functions to testpmd shell")
>
> Signed-off-by: Dean Marx <dmarx@iol.unh.edu>
> ---
<snip>
> +
> +from scapy.packet import Packet  # type: ignore[import-untyped]
> +from scapy.layers.vxlan import VXLAN  # type: ignore[import-untyped]
> +from scapy.contrib.nsh import NSH  # type: ignore[import-untyped]
> +from scapy.layers.inet import IP, ICMP, TCP, UDP, GRE  # type: ignore[import-untyped]
> +from scapy.layers.l2 import Ether, ARP  # type: ignore[import-untyped]
> +from scapy.packet import Raw

I think this import also needs a type: ignore comment, but regardless
it is probably better to combine it with the other file that imports
from the `scapy.packet` library to keep things concise.

> +from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment  # type: ignore[import-untyped]
> +from scapy.layers.sctp import SCTP, SCTPChunkData  # type: ignore[import-untyped]
> +
> +from framework.remote_session.testpmd_shell import (SimpleForwardingModes, TestPmdShell,
> +                                                    RtePTypes, TestPmdVerbosePacket)

There isn't anything wrong with breaking lines like this, but it is a
little different than how the formatting script would separate it and
how it is done in other parts of the framework. It might be worth
putting the newline after the opening parenthesis to match how it is
done elsewhere.

> +from framework.test_suite import TestSuite
> +
> +
> +class TestUniPkt(TestSuite):
> +    """DPDK Unified packet test suite.
> +
> +    This testing suite uses testpmd's verbose output hardware/software
> +    packet type field to verify the ability of the driver to recognize
> +    unified packet types when receiving different packets.
> +
> +    """
> +
> +    def set_up_suite(self) -> None:
> +        """Set up the test suite.
> +
> +        Setup:
> +            Verify that at least two ports are open for session.
> +        """
> +        self.verify(len(self._port_links) > 1, "Not enough ports")
> +
> +    def send_packet_and_verify_flags(self, expected_flags: list[RtePTypes],
> +                                     packet: Packet, testpmd: TestPmdShell) -> None:

Same thing here as the comment above, generally these function
definitions elsewhere in the framework will break the lines at the
opening parenthesis and then again before the closing one.

Additionally, something that might save you some space/complexity in
this method is you can combine all of your flags into one rather than
storing them in a list, and compare the combinations rather than
looping through and comparing them one at a time. Since flag values
are really just bytes and combinations of flags are always unique, if
expected_flags were just the type RtePTypes, instead of looping
through all of the individual values you could have your boolean check
be something like:

any(packet.dst_mac == "00:00:00:00:00:01" and (expected_flags in
packet.hw_ptype or expected_flags in packet.sw_ptype) for packet in
verbose_output)

and that "in" statement will check for you that all of the flags that
are part of expected_flags are present in the sw_ptype and hw_ptype
flags.

> +        """Sends a packet to the DUT and verifies the verbose ptype flags."""
> +        testpmd.start()
> +        self.send_packet_and_capture(packet=packet)
> +        verbose_output = testpmd.extract_verbose_output(testpmd.stop())
> +        valid = self.check_for_matching_packet(output=verbose_output, flags=expected_flags)
> +        self.verify(valid, f"Packet type flag did not match the expected flag: {expected_flags}.")
> +
> +    def check_for_matching_packet(self, output: list[TestPmdVerbosePacket],
> +                                  flags: list[RtePTypes]) -> bool:
> +        """Returns :data:`True` if the packet in verbose output contains all specified flags."""
> +        for packet in output:
> +            if packet.dst_mac == "00:00:00:00:00:01":
> +                for flag in flags:
> +                    if (flag not in packet.hw_ptype and flag not in packet.sw_ptype):
> +                        return False
> +        return True

This might just be personal preference too, but I think it is a little
easier to read if methods are defined above the one where they are
used.

> +
> +    def setup_session(self, testpmd: TestPmdShell) -> None:
> +        """Sets the forwarding and verbose mode of each test case interactive shell session."""
> +        testpmd.set_forward_mode(SimpleForwardingModes.rxonly)
> +        testpmd.set_verbose(level=1)
> +
> +    def test_l2_packet_detect(self) -> None:
> +        """Verify the correct flags are shown in verbose output when sending L2 packets."""
> +        mac_id = "00:00:00:00:00:01"
> +        packet_list = [
> +            Ether(dst=mac_id, type=0x88f7) / Raw(),
> +            Ether(dst=mac_id) / ARP() / Raw()
> +        ]
> +        flag_list = [
> +            [RtePTypes.L2_ETHER_TIMESYNC],
> +            [RtePTypes.L2_ETHER_ARP]
> +        ]

I think this idea of having two lists where the indices line up is
very intuitive and I like how it makes things simple to pass to the
send and verify method. One thing that I thought of that could be
interesting (and feel free to not do it this way, I just thought it
was a cool idea so I figured I'd share) is you could make the packets
and the flags a tuple, and then you could just unpack the tuple when
you call the function. The only downside to this is the lines could
get a little long in some of the later test cases because the list of
flags is long, but you might be able to get around that by having a
variable for common flags that are present in items in the list,  and
then just appending to the end of that. Something like:

params_list = [
    (Ether(dst=mac_id, type=0x88f7) / Raw(), RtePTypes.L2_ETHER_TIMESYNC),
    ...
]
with TestPmdShell(node=self.sut_node) as testpmd:
            self.setup_session(testpmd=testpmd)
            for p in params_list:
                self.send_packet_and_verify_flags(*p, testpmd=testpmd)

> +        with TestPmdShell(node=self.sut_node) as testpmd:
> +            self.setup_session(testpmd=testpmd)
> +            for i in range(0, len(packet_list)):
> +                self.send_packet_and_verify_flags(expected_flags=flag_list[i],
> +                                                  packet=packet_list[i], testpmd=testpmd)
> +

Do you think it would be worth splitting this part of creating testpmd
and running the verify function into a helper method so that you don't
have to repeat yourself in these test cases? You could even put it all
in setup_testpmd and just rename it to something like
`run_flag_testing` and have it accept the two lists of packets and
flags as input.

> +
> +    def test_l3_l4_packet_detect(self) -> None:
> +        """Verify correct flags are shown in the verbose output when sending IP/L4 packets."""
> +        mac_id = "00:00:00:00:00:01"
> +        packet_list = [
> +            Ether(dst=mac_id) / IP() / Raw(),
> +            Ether(dst=mac_id) / IP() / UDP() / Raw(),
> +            Ether(dst=mac_id) / IP() / TCP() / Raw(),
> +            Ether(dst=mac_id) / IP() / SCTP() / Raw(),
> +            Ether(dst=mac_id) / IP() / ICMP() / Raw(),
> +            Ether(dst=mac_id) / IP(frag=5) / TCP() / Raw(),
> +        ]
> +        flag_list = [
> +            [RtePTypes.L3_IPV4, RtePTypes.L2_ETHER],

If you decided to make this a flag instead of a list like I had
mentioned above, this line would become:

RtePTypes.L3_IPV4 | RtePTypes.L2_ETHER,

> +            [RtePTypes.L4_UDP],
> +            [RtePTypes.L4_TCP],
> +            [RtePTypes.L4_SCTP],
> +            [RtePTypes.L4_ICMP],
> +            [RtePTypes.L4_FRAG, RtePTypes.L3_IPV4_EXT_UNKNOWN, RtePTypes.L2_ETHER]

And this one would be:

RtePTypes.L4_FRAG | RtePTypes.L3_IPV4_EXT_UNKNOWN | RtePTypes.L2_ETHER

> +        ]
> +        with TestPmdShell(node=self.sut_node) as testpmd:
> +            self.setup_session(testpmd=testpmd)
> +            for i in range(0, len(packet_list)):
> +                self.send_packet_and_verify_flags(expected_flags=flag_list[i],
> +                                                  packet=packet_list[i], testpmd=testpmd)
> +
<snip>
> 2.44.0
>
  

Patch

diff --git a/dts/tests/TestSuite_uni_pkt.py b/dts/tests/TestSuite_uni_pkt.py
new file mode 100644
index 0000000000..90a8f35fa1
--- /dev/null
+++ b/dts/tests/TestSuite_uni_pkt.py
@@ -0,0 +1,239 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2024 University of New Hampshire
+
+"""Unified packet type flag testing suite.
+
+According to DPDK documentation, each Poll Mode Driver should reserve 32 bits
+of packet headers for unified packet type flags. These flags serve as an
+identifier for user applications, and are divided into subcategories:
+L2, L3, L4, tunnel, inner L2, inner L3, and inner L4 types.
+This suite verifies the ability of the driver to recognize these types.
+
+"""
+
+from scapy.packet import Packet  # type: ignore[import-untyped]
+from scapy.layers.vxlan import VXLAN  # type: ignore[import-untyped]
+from scapy.contrib.nsh import NSH  # type: ignore[import-untyped]
+from scapy.layers.inet import IP, ICMP, TCP, UDP, GRE  # type: ignore[import-untyped]
+from scapy.layers.l2 import Ether, ARP  # type: ignore[import-untyped]
+from scapy.packet import Raw
+from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment  # type: ignore[import-untyped]
+from scapy.layers.sctp import SCTP, SCTPChunkData  # type: ignore[import-untyped]
+
+from framework.remote_session.testpmd_shell import (SimpleForwardingModes, TestPmdShell,
+                                                    RtePTypes, TestPmdVerbosePacket)
+from framework.test_suite import TestSuite
+
+
+class TestUniPkt(TestSuite):
+    """DPDK Unified packet test suite.
+
+    This testing suite uses testpmd's verbose output hardware/software
+    packet type field to verify the ability of the driver to recognize
+    unified packet types when receiving different packets.
+
+    """
+
+    def set_up_suite(self) -> None:
+        """Set up the test suite.
+
+        Setup:
+            Verify that at least two ports are open for session.
+        """
+        self.verify(len(self._port_links) > 1, "Not enough ports")
+
+    def send_packet_and_verify_flags(self, expected_flags: list[RtePTypes],
+                                     packet: Packet, testpmd: TestPmdShell) -> None:
+        """Sends a packet to the DUT and verifies the verbose ptype flags."""
+        testpmd.start()
+        self.send_packet_and_capture(packet=packet)
+        verbose_output = testpmd.extract_verbose_output(testpmd.stop())
+        valid = self.check_for_matching_packet(output=verbose_output, flags=expected_flags)
+        self.verify(valid, f"Packet type flag did not match the expected flag: {expected_flags}.")
+
+    def check_for_matching_packet(self, output: list[TestPmdVerbosePacket],
+                                  flags: list[RtePTypes]) -> bool:
+        """Returns :data:`True` if the packet in verbose output contains all specified flags."""
+        for packet in output:
+            if packet.dst_mac == "00:00:00:00:00:01":
+                for flag in flags:
+                    if (flag not in packet.hw_ptype and flag not in packet.sw_ptype):
+                        return False
+        return True
+
+    def setup_session(self, testpmd: TestPmdShell) -> None:
+        """Sets the forwarding and verbose mode of each test case interactive shell session."""
+        testpmd.set_forward_mode(SimpleForwardingModes.rxonly)
+        testpmd.set_verbose(level=1)
+
+    def test_l2_packet_detect(self) -> None:
+        """Verify the correct flags are shown in verbose output when sending L2 packets."""
+        mac_id = "00:00:00:00:00:01"
+        packet_list = [
+            Ether(dst=mac_id, type=0x88f7) / Raw(),
+            Ether(dst=mac_id) / ARP() / Raw()
+        ]
+        flag_list = [
+            [RtePTypes.L2_ETHER_TIMESYNC],
+            [RtePTypes.L2_ETHER_ARP]
+        ]
+        with TestPmdShell(node=self.sut_node) as testpmd:
+            self.setup_session(testpmd=testpmd)
+            for i in range(0, len(packet_list)):
+                self.send_packet_and_verify_flags(expected_flags=flag_list[i],
+                                                  packet=packet_list[i], testpmd=testpmd)
+
+
+    def test_l3_l4_packet_detect(self) -> None:
+        """Verify correct flags are shown in the verbose output when sending IP/L4 packets."""
+        mac_id = "00:00:00:00:00:01"
+        packet_list = [
+            Ether(dst=mac_id) / IP() / Raw(),
+            Ether(dst=mac_id) / IP() / UDP() / Raw(),
+            Ether(dst=mac_id) / IP() / TCP() / Raw(),
+            Ether(dst=mac_id) / IP() / SCTP() / Raw(),
+            Ether(dst=mac_id) / IP() / ICMP() / Raw(),
+            Ether(dst=mac_id) / IP(frag=5) / TCP() / Raw(),
+        ]
+        flag_list = [
+            [RtePTypes.L3_IPV4, RtePTypes.L2_ETHER],
+            [RtePTypes.L4_UDP],
+            [RtePTypes.L4_TCP],
+            [RtePTypes.L4_SCTP],
+            [RtePTypes.L4_ICMP],
+            [RtePTypes.L4_FRAG, RtePTypes.L3_IPV4_EXT_UNKNOWN, RtePTypes.L2_ETHER]
+        ]
+        with TestPmdShell(node=self.sut_node) as testpmd:
+            self.setup_session(testpmd=testpmd)
+            for i in range(0, len(packet_list)):
+                self.send_packet_and_verify_flags(expected_flags=flag_list[i],
+                                                  packet=packet_list[i], testpmd=testpmd)
+
+    def test_ipv6_l4_packet_detect(self) -> None:
+        """Verify correct flags are shown in the verbose output when sending IPv6/L4 packets."""
+        mac_id = "00:00:00:00:00:01"
+        packet_list = [
+            Ether(dst=mac_id) / IPv6() / Raw(),
+            Ether(dst=mac_id) / IPv6() / UDP() / Raw(),
+            Ether(dst=mac_id) / IPv6() / TCP() / Raw(),
+            Ether(dst=mac_id) / IPv6() / IPv6ExtHdrFragment() / Raw()
+        ]
+        flag_list = [
+            [RtePTypes.L2_ETHER, RtePTypes.L3_IPV6],
+            [RtePTypes.L4_UDP],
+            [RtePTypes.L4_TCP],
+            [RtePTypes.L3_IPV6_EXT_UNKNOWN]
+        ]
+        with TestPmdShell(node=self.sut_node) as testpmd:
+            self.setup_session(testpmd=testpmd)
+            for i in range(0, len(packet_list)):
+                self.send_packet_and_verify_flags(expected_flags=flag_list[i],
+                                                  packet=packet_list[i], testpmd=testpmd)
+
+    def test_l3_tunnel_packet_detect(self) -> None:
+        """Verify correct flags are shown in the verbose output when sending IPv6/L4 packets."""
+        mac_id = "00:00:00:00:00:01"
+        packet_list = [
+            Ether(dst=mac_id) / IP() / IP(frag=5) / UDP() / Raw(),
+            Ether(dst=mac_id) / IP() / IP() / Raw(),
+            Ether(dst=mac_id) / IP() / IP() / UDP() / Raw(),
+            Ether(dst=mac_id) / IP() / IP() / TCP() / Raw(),
+            Ether(dst=mac_id) / IP() / IP() / SCTP() / Raw(),
+            Ether(dst=mac_id) / IP() / IP() / ICMP() / Raw(),
+            Ether(dst=mac_id) / IP() / IPv6() / IPv6ExtHdrFragment() / Raw()
+        ]
+        flag_list = [
+            [RtePTypes.TUNNEL_IP, RtePTypes.L3_IPV4_EXT_UNKNOWN, RtePTypes.INNER_L4_FRAG],
+            [RtePTypes.TUNNEL_IP, RtePTypes.INNER_L4_NONFRAG],
+            [RtePTypes.TUNNEL_IP, RtePTypes.INNER_L4_UDP],
+            [RtePTypes.TUNNEL_IP, RtePTypes.INNER_L4_TCP],
+            [RtePTypes.TUNNEL_IP, RtePTypes.INNER_L4_SCTP],
+            [RtePTypes.TUNNEL_IP, RtePTypes.INNER_L4_ICMP],
+            [RtePTypes.TUNNEL_IP, RtePTypes.INNER_L3_IPV6_EXT_UNKNOWN, RtePTypes.INNER_L4_FRAG]
+        ]
+        with TestPmdShell(node=self.sut_node) as testpmd:
+            self.setup_session(testpmd=testpmd)
+            for i in range(0, len(packet_list)):
+                self.send_packet_and_verify_flags(expected_flags=flag_list[i],
+                                                  packet=packet_list[i], testpmd=testpmd)
+
+    def test_gre_tunnel_packet_detect(self) -> None:
+        """Verify the correct flags are shown in the verbose output when sending GRE packets."""
+        mac_id = "00:00:00:00:00:01"
+        packet_list = [
+            Ether(dst=mac_id) / IP() / GRE() / IP(frag=5) / Raw(),
+            Ether(dst=mac_id) / IP() / GRE() / IP() / Raw(),
+            Ether(dst=mac_id) / IP() / GRE() / IP() / UDP() / Raw(),
+            Ether(dst=mac_id) / IP() / GRE() / IP() / TCP() / Raw(),
+            Ether(dst=mac_id) / IP() / GRE() / IP() / SCTP() / Raw(),
+            Ether(dst=mac_id) / IP() / GRE() / IP() / ICMP() / Raw()
+        ]
+        flag_list = [
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_FRAG, RtePTypes.INNER_L3_IPV4_EXT_UNKNOWN],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_NONFRAG],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_UDP],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_TCP],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_SCTP],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_ICMP]
+        ]
+        with TestPmdShell(node=self.sut_node) as testpmd:
+            self.setup_session(testpmd=testpmd)
+            for i in range(0, len(packet_list)):
+                self.send_packet_and_verify_flags(expected_flags=flag_list[i],
+                                                  packet=packet_list[i], testpmd=testpmd)
+
+    def test_vxlan_tunnel_packet_detect(self) -> None:
+        """Verify the correct flags are shown in the verbose output when sending VXLAN packets."""
+        mac_id = "00:00:00:00:00:01"
+        packet_list = [
+            Ether(dst=mac_id) / IP() / UDP() / VXLAN() / Ether() / IP(frag=5) / Raw(),
+            Ether(dst=mac_id) / IP() / UDP() / VXLAN() / Ether() / IP() / Raw(),
+            Ether(dst=mac_id) / IP() / UDP() / VXLAN() / Ether() / IP() / UDP() / Raw(),
+            Ether(dst=mac_id) / IP() / UDP() / VXLAN() / Ether() / IP() / TCP() / Raw(),
+            Ether(dst=mac_id) / IP() / UDP() / VXLAN() / Ether() / IP() / SCTP() / Raw(),
+            Ether(dst=mac_id) / IP() / UDP() / VXLAN() / Ether() / IP() / ICMP() / Raw(),
+            (Ether(dst=mac_id) / IP() / UDP() / VXLAN() / Ether()
+             / IPv6() / IPv6ExtHdrFragment() / Raw())
+        ]
+        flag_list = [
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_FRAG, RtePTypes.INNER_L3_IPV4_EXT_UNKNOWN],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_NONFRAG],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_UDP],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_TCP],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_SCTP],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L4_ICMP],
+            [RtePTypes.TUNNEL_GRENAT, RtePTypes.INNER_L3_IPV6_EXT_UNKNOWN, RtePTypes.INNER_L4_FRAG]
+        ]
+        with TestPmdShell(node=self.sut_node) as testpmd:
+            self.setup_session(testpmd=testpmd)
+            testpmd.rx_vxlan(vxlan_id=4789, port_id=0, add=True)
+            for i in range(0, len(packet_list)):
+                self.send_packet_and_verify_flags(expected_flags=flag_list[i],
+                                                  packet=packet_list[i], testpmd=testpmd)
+
+    def test_nsh_packet_detect(self) -> None:
+        """Verify the correct flags are shown in the verbose output when sending NSH packets."""
+        mac_id = "00:00:00:00:00:01"
+        packet_list = [
+            Ether(dst=mac_id, type=0x894f) / NSH() / IP(),
+            Ether(dst=mac_id, type=0x894f) / NSH() / IP() / ICMP(),
+            Ether(dst=mac_id, type=0x894f) / NSH() / IP(frag=1, flags="MF"),
+            Ether(dst=mac_id, type=0x894f) / NSH() / IP() / TCP(),
+            Ether(dst=mac_id, type=0x894f) / NSH() / IP() / UDP(),
+            Ether(dst=mac_id, type=0x894f) / NSH() / IP() / SCTP(tag=1) / SCTPChunkData(data='x'),
+            Ether(dst=mac_id, type=0x894f) / NSH() / IPv6()
+        ]
+        flag_list = [
+            [RtePTypes.L2_ETHER_NSH, RtePTypes.L3_IPV4_EXT_UNKNOWN, RtePTypes.L4_NONFRAG],
+            [RtePTypes.L2_ETHER_NSH, RtePTypes.L3_IPV4_EXT_UNKNOWN, RtePTypes.L4_ICMP],
+            [RtePTypes.L2_ETHER_NSH, RtePTypes.L3_IPV4_EXT_UNKNOWN, RtePTypes.L4_FRAG],
+            [RtePTypes.L2_ETHER_NSH, RtePTypes.L3_IPV4_EXT_UNKNOWN, RtePTypes.L4_TCP],
+            [RtePTypes.L2_ETHER_NSH, RtePTypes.L3_IPV4_EXT_UNKNOWN, RtePTypes.L4_UDP],
+            [RtePTypes.L2_ETHER_NSH, RtePTypes.L3_IPV4_EXT_UNKNOWN, RtePTypes.L4_SCTP],
+            [RtePTypes.L2_ETHER_NSH, RtePTypes.L3_IPV6_EXT_UNKNOWN, RtePTypes.L4_NONFRAG],
+        ]
+        with TestPmdShell(node=self.sut_node) as testpmd:
+            self.setup_session(testpmd=testpmd)
+            for i in range(0, len(packet_list)):
+                self.send_packet_and_verify_flags(expected_flags=flag_list[i],
+                                                  packet=packet_list[i], testpmd=testpmd)