[V2,2/6] tests/checksum_offload: Refactor checksum_offload case.

Message ID 20230718012629.2471811-3-ke1.xu@intel.com (mailing list archive)
State Superseded
Headers
Series Rewrite test suite for checksum_offload, tso and vf_offload |

Commit Message

Ke Xu July 18, 2023, 1:26 a.m. UTC
Update checksum_offload scripts.

1. Using common test method from offload_common to update test cases and test scopes. Using HW and SW to describe checksum configuration. Add cases for enabling HW IP UDP TCP SCTP seperately. Add cases for inner HW enabled with enabling HW outer IP UDP seperately.

2. Update cases for NIC comaptibility. Add description for VLAN and AVX512 to fit specific testing scenarios required by several NICs.

3. RX scopes and offload flags are combined with varied packet type TX cases for better covering and more common validation.

Signed-off-by: Ke Xu <ke1.xu@intel.com>
---
 tests/TestSuite_checksum_offload.py | 1788 ++++++++++++++-------------
 1 file changed, 962 insertions(+), 826 deletions(-)
  

Patch

diff --git a/tests/TestSuite_checksum_offload.py b/tests/TestSuite_checksum_offload.py
index 0214231c..206c8b1f 100644
--- a/tests/TestSuite_checksum_offload.py
+++ b/tests/TestSuite_checksum_offload.py
@@ -1,5 +1,5 @@ 
 # SPDX-License-Identifier: BSD-3-Clause
-# Copyright(c) 2010-2014 Intel Corporation
+# Copyright(c) 2010-2023 Intel Corporation
 # Copyright(c) 2018-2019 The University of New Hampshire
 #
 
@@ -26,32 +26,206 @@  from scapy.utils import rdpcap, wrpcap
 
 import framework.packet as packet
 import framework.utils as utils
-from framework.exception import VerifyFailure
+import tests.offload_common as offload
+from framework.dut import Dut
+from framework.exception import VerifyFailure, VerifySkip
 from framework.pktgen import PacketGeneratorHelper
 from framework.pmd_output import PmdOutput
 from framework.rst import RstReport
 from framework.settings import FOLDERS
 from framework.test_capabilities import DRIVER_TEST_LACK_CAPA
-from framework.test_case import TestCase
+from framework.test_case import (
+    TestCase,
+    check_supported_nic,
+    skip_unsupported,
+    skip_unsupported_host_driver,
+    skip_unsupported_nic,
+    skip_unsupported_pkg,
+)
+
+VM_CORES_MASK = "all"
+DEFAULT_MTU = 1500
+
+CSUM_NON_TUNNEL_PACKETS = [
+    func(offload.CSUM_INNER_PACKET_PART[_type])
+    for _type in [
+        "IP/UDP",
+        "IP/TCP",
+        "IP/SCTP",
+        "IPv6/UDP",
+        "IPv6/TCP",
+        "IPv6/SCTP",
+    ]
+    for func in [
+        offload.get_packet_with_bad_no_checksum,
+        offload.get_packet_with_bad_ip_checksum,
+        offload.get_packet_with_bad_l4_checksum,
+        offload.get_packet_with_bad_all_checksum,
+    ]
+    for _ in range(offload.TX_REPEAT)
+]
 
-l3_proto_classes = [IP, IPv6]
+CSUM_TUNNEL_PACKETS = [
+    func(
+        offload.OUTER_PACKET_PART[_o_type] / offload.CSUM_INNER_PACKET_PART[_i_type]
+        if _o_type
+        else offload.CSUM_INNER_PACKET_PART[_i_type]
+    )
+    for _o_type in [
+        "IP/VXLAN",
+        "IPv6/VXLAN",
+        "IP/NVGRE",
+        "IPv6/NVGRE",
+    ]
+    for _i_type in [
+        "IP/UDP",
+        "IP/TCP",
+        "IP/SCTP",
+        "IPv6/UDP",
+        "IPv6/TCP",
+        "IPv6/SCTP",
+    ]
+    for func in [
+        offload.get_packet_with_bad_no_checksum,
+        offload.get_packet_with_bad_ip_checksum,
+        offload.get_packet_with_bad_l4_checksum,
+        offload.get_packet_with_bad_outer_ip_checksum,
+        offload.get_packet_with_bad_outer_l4_checksum,
+        offload.get_packet_with_bad_all_checksum,
+    ]
+    for _ in range(offload.TX_REPEAT)
+]
 
-l4_proto_classes = [
-    UDP,
-    TCP,
+CSUM_TUNNEL_COMMONPKG_PACKETS = [
+    func(
+        offload.OUTER_PACKET_PART[_o_type] / offload.CSUM_INNER_PACKET_PART[_i_type]
+        if _o_type
+        else offload.CSUM_INNER_PACKET_PART[_i_type]
+    )
+    for _o_type in [
+        "IP/VXLAN",
+        "IPv6/VXLAN",
+        "IP/VXLAN-GPE",
+        "IPv6/VXLAN-GPE",
+        "IP/VXLAN-GPE/Ether",
+        "IPv6/VXLAN-GPE/Ether",
+        "IP/GRE",
+        "IPv6/GRE",
+        "IP/GRE/Ether",
+        "IPv6/GRE/Ether",
+        "IP/NVGRE",
+        "IPv6/NVGRE",
+        "IP/GTPU",
+        "IPv6/GTPU",
+    ]
+    for _i_type in [
+        "IP/UDP",
+        "IP/TCP",
+        "IP/SCTP",
+        "IPv6/UDP",
+        "IPv6/TCP",
+        "IPv6/SCTP",
+    ]
+    for func in [
+        offload.get_packet_with_bad_no_checksum,
+        offload.get_packet_with_bad_ip_checksum,
+        offload.get_packet_with_bad_l4_checksum,
+        offload.get_packet_with_bad_outer_ip_checksum,
+        offload.get_packet_with_bad_outer_l4_checksum,
+        offload.get_packet_with_bad_all_checksum,
+    ]
+    for _ in range(offload.TX_REPEAT)
 ]
 
-tunnelling_proto_classes = [
-    VXLAN,
-    GRE,
+CSUM_VLAN_NON_TUNNEL_PACKETS = [
+    func(offload.CSUM_INNER_PACKET_PART[_type])
+    for _type in [
+        "VLAN/IP/UDP",
+        "VLAN/IP/TCP",
+        "VLAN/IP/SCTP",
+        "VLAN/IPv6/UDP",
+        "VLAN/IPv6/TCP",
+        "VLAN/IPv6/SCTP",
+    ]
+    for func in [
+        offload.get_packet_with_bad_no_checksum,
+        offload.get_packet_with_bad_ip_checksum,
+        offload.get_packet_with_bad_l4_checksum,
+        offload.get_packet_with_bad_all_checksum,
+    ]
+    for _ in range(offload.TX_REPEAT)
 ]
 
-l3_protos = ["IP", "IPv6"]
+CSUM_VLAN_TUNNEL_PACKETS = [
+    func(
+        offload.OUTER_PACKET_PART[_o_type] / offload.CSUM_INNER_PACKET_PART[_i_type]
+        if _o_type
+        else offload.CSUM_INNER_PACKET_PART[_i_type]
+    )
+    for _o_type in [
+        "VLAN/IP/VXLAN",
+        "VLAN/IPv6/VXLAN",
+        "VLAN/IP/NVGRE",
+        "VLAN/IPv6/NVGRE",
+    ]
+    for _i_type in [
+        "IP/UDP",
+        "IP/TCP",
+        "IP/SCTP",
+        "IPv6/UDP",
+        "IPv6/TCP",
+        "IPv6/SCTP",
+    ]
+    for func in [
+        offload.get_packet_with_bad_no_checksum,
+        offload.get_packet_with_bad_ip_checksum,
+        offload.get_packet_with_bad_l4_checksum,
+        offload.get_packet_with_bad_outer_ip_checksum,
+        offload.get_packet_with_bad_outer_l4_checksum,
+        offload.get_packet_with_bad_all_checksum,
+    ]
+    for _ in range(offload.TX_REPEAT)
+]
 
-l4_protos = [
-    "UDP",
-    "TCP",
-    "SCTP",
+CSUM_VLAN_TUNNEL_COMMONPKG_PACKETS = [
+    func(
+        offload.OUTER_PACKET_PART[_o_type] / offload.CSUM_INNER_PACKET_PART[_i_type]
+        if _o_type
+        else offload.CSUM_INNER_PACKET_PART[_i_type]
+    )
+    for _o_type in [
+        "VLAN/IP/VXLAN",
+        "VLAN/IPv6/VXLAN",
+        "VLAN/IP/VXLAN-GPE",
+        "VLAN/IPv6/VXLAN-GPE",
+        "VLAN/IP/VXLAN-GPE/Ether",
+        "VLAN/IPv6/VXLAN-GPE/Ether",
+        "VLAN/IP/GRE",
+        "VLAN/IPv6/GRE",
+        "VLAN/IP/GRE/Ether",
+        "VLAN/IPv6/GRE/Ether",
+        "VLAN/IP/NVGRE",
+        "VLAN/IPv6/NVGRE",
+        "VLAN/IP/GTPU",
+        "VLAN/IPv6/GTPU",
+    ]
+    for _i_type in [
+        "IP/UDP",
+        "IP/TCP",
+        "IP/SCTP",
+        "IPv6/UDP",
+        "IPv6/TCP",
+        "IPv6/SCTP",
+    ]
+    for func in [
+        offload.get_packet_with_bad_no_checksum,
+        offload.get_packet_with_bad_ip_checksum,
+        offload.get_packet_with_bad_l4_checksum,
+        offload.get_packet_with_bad_outer_ip_checksum,
+        offload.get_packet_with_bad_outer_l4_checksum,
+        offload.get_packet_with_bad_all_checksum,
+    ]
+    for _ in range(offload.TX_REPEAT)
 ]
 
 
@@ -61,10 +235,24 @@  class TestChecksumOffload(TestCase):
         Run at the start of each test suite.
         Checksum offload prerequisites.
         """
+
         # Based on h/w type, choose how many ports to use
         self.dut_ports = self.dut.get_ports(self.nic)
         # Verify that enough ports are available
         self.verify(len(self.dut_ports) >= 1, "Insufficient ports for testing")
+        self.dut_pci = [
+            self.dut.ports_info[self.dut_ports[0]]["pci"],
+            self.dut.ports_info[self.dut_ports[1]]["pci"],
+        ]
+        self.tester_ports = [
+            self.tester.get_local_port(self.dut_ports[0]),
+            self.tester.get_local_port(self.dut_ports[1]),
+        ]
+        self.tester_intf = [
+            self.tester.get_interface(self.tester_ports[0]),
+            self.tester.get_interface(self.tester_ports[1]),
+        ]
+        self.bind_nic_driver(self.dut_ports, "vfio-pci")
         self.pmdout: PmdOutput = PmdOutput(self.dut)
         self.portMask = utils.create_mask([self.dut_ports[0]])
         self.ports_socket = self.dut.get_numa_id(self.dut_ports[0])
@@ -77,624 +265,795 @@  class TestChecksumOffload(TestCase):
         # log debug used
         self.count = 0
 
+        # supported packets
+        self.packets_csum_non_tunnel = []
+        self.packets_csum_tunnel = []
+        self.support_rx_tunnel = False
+        if self.kdriver in {"ice", "i40e"}:
+            self.packets_csum_non_tunnel.extend(CSUM_NON_TUNNEL_PACKETS)
+            self.packets_csum_non_tunnel.extend(CSUM_VLAN_NON_TUNNEL_PACKETS)
+        else:
+            self.packets_csum_non_tunnel.extend(CSUM_NON_TUNNEL_PACKETS)
+        if (
+            self.kdriver in {"ice", "i40e"}
+            and self.pkg
+            and self.pkg.get("type")
+            and "comm" in self.pkg.get("type")
+        ):
+            self.packets_csum_tunnel.extend(CSUM_TUNNEL_COMMONPKG_PACKETS)
+            self.packets_csum_tunnel.extend(CSUM_VLAN_TUNNEL_COMMONPKG_PACKETS)
+            self.support_rx_tunnel = True
+        elif self.kdriver in {"ice", "i40e"}:
+            self.packets_csum_tunnel.extend(CSUM_TUNNEL_PACKETS)
+            self.packets_csum_tunnel.extend(CSUM_VLAN_TUNNEL_PACKETS)
+            self.support_rx_tunnel = True
+        else:
+            self.packets_csum_tunnel.extend(CSUM_TUNNEL_PACKETS)
+            self.support_rx_tunnel = True
+        _pkgtype = self.pkg.get("type")
+        self.is_ice = self.kdriver == "ice"
+        self.is_ice_ddp_os = self.is_ice and _pkgtype and "os default" in _pkgtype.lower()
+        self.is_ice_ddp_comms = self.is_ice and _pkgtype and "comms" in _pkgtype.lower()
+        self.is_i40e = self.kdriver == "i40e"
+
     def set_up(self):
         """
         Run before each test case.
         """
-        self.pmdout.start_testpmd(
-            "Default",
-            "--portmask=%s " % (self.portMask)
-            + " --enable-rx-cksum "
-            + "--port-topology=loop",
-            socket=self.ports_socket,
-        )
-        self.dut.send_expect("set verbose 1", "testpmd>")
-        self.dut.send_expect("set fwd csum", "testpmd>")
-        self.dut.send_expect("csum mac-swap off 0", "testpmd>")
-
-    def checksum_enablehw(self, port):
-        self.dut.send_expect("port stop all", "testpmd>")
-        self.dut.send_expect("rx_vxlan_port add 4789 0 ", "testpmd>")
-        self.dut.send_expect("csum set ip hw %d" % port, "testpmd>")
-        self.dut.send_expect("csum set udp hw %d" % port, "testpmd>")
-        self.dut.send_expect("csum set tcp hw %d" % port, "testpmd>")
-        self.dut.send_expect("csum set sctp hw %d" % port, "testpmd>")
-        self.dut.send_expect("csum set outer-ip hw %d" % port, "testpmd>")
-        self.dut.send_expect("csum set outer-udp hw %d" % port, "testpmd>")
-        self.dut.send_expect("csum parse-tunnel on %d" % port, "testpmd>")
-        self.dut.send_expect("port start all", "testpmd>")
-
-    def checksum_enablesw(self, port):
-        self.dut.send_expect("port stop all", "testpmd>")
-        self.dut.send_expect("csum set ip sw %d" % port, "testpmd>")
-        self.dut.send_expect("csum set udp sw %d" % port, "testpmd>")
-        self.dut.send_expect("csum set tcp sw %d" % port, "testpmd>")
-        self.dut.send_expect("csum set sctp sw %d" % port, "testpmd>")
-        self.dut.send_expect("port start all", "testpmd>")
-
-    def get_chksum_values(self, packets_expected):
-        """
-        Validate the checksum flags.
-        """
-        checksum_pattern = re.compile("chksum.*=.*(0x[0-9a-z]+)")
-
-        chksum = dict()
-
-        self.tester.send_expect("scapy", ">>> ")
-
-        for packet_type in list(packets_expected.keys()):
-            self.tester.send_expect("p = %s" % packets_expected[packet_type], ">>>")
-            out = self.tester.send_command("p.show2()", timeout=1)
-            chksums = checksum_pattern.findall(out)
-            chksum[packet_type] = chksums
-
-        self.tester.send_expect("exit()", "#")
+        pass
 
-        return chksum
+    def test_checksum_hw_ip(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set ip hw 1",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_inner_sctp=True,
+        )
 
-    def checksum_valid_flags(self, packets_sent, flag):
-        """
-        Sends packets and check the checksum valid-flags.
-        """
-        self.dut.send_expect("start", "testpmd>")
-        tx_interface = self.tester.get_interface(
-            self.tester.get_local_port(self.dut_ports[0])
+    def test_checksum_hw_udp(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set udp hw 1",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_inner_sctp=True,
         )
-        for packet_type in list(packets_sent.keys()):
-            self.pkt = packet.Packet(pkt_str=packets_sent[packet_type])
-            self.pkt.send_pkt(self.tester, tx_interface, count=4)
-            out = self.dut.get_session_output(timeout=1)
-            lines = out.split("\r\n")
 
-            # collect the checksum result
-            for line in lines:
-                line = line.strip()
-                if len(line) != 0 and line.startswith("rx"):
-                    # IPv6 don't be checksum, so always show "GOOD"
-                    if packet_type.startswith("IPv6"):
-                        if "RTE_MBUF_F_RX_L4_CKSUM" not in line:
-                            self.verify(0, "There is no checksum flags appeared!")
-                        else:
-                            if flag == 1:
-                                self.verify(
-                                    "RTE_MBUF_F_RX_L4_CKSUM_GOOD" in line,
-                                    "Packet Rx L4 checksum valid-flags error!",
-                                )
-                            elif flag == 0:
-                                self.verify(
-                                    "RTE_MBUF_F_RX_L4_CKSUM_BAD" in line
-                                    or "RTE_MBUF_F_RX_L4_CKSUM_UNKNOWN" in line,
-                                    "Packet Rx L4 checksum valid-flags error!",
-                                )
-                    else:
-                        if "RTE_MBUF_F_RX_L4_CKSUM" not in line:
-                            self.verify(0, "There is no L4 checksum flags appeared!")
-                        elif "RTE_MBUF_F_RX_IP_CKSUM" not in line:
-                            self.verify(0, "There is no IP checksum flags appeared!")
-                        else:
-                            if flag == 1:
-                                self.verify(
-                                    "RTE_MBUF_F_RX_L4_CKSUM_GOOD" in line,
-                                    "Packet Rx L4 checksum valid-flags error!",
-                                )
-                                self.verify(
-                                    "RTE_MBUF_F_RX_IP_CKSUM_GOOD" in line,
-                                    "Packet Rx IP checksum valid-flags error!",
-                                )
-                            elif flag == 0:
-                                self.verify(
-                                    "RTE_MBUF_F_RX_L4_CKSUM_BAD" in line
-                                    or "RTE_MBUF_F_RX_L4_CKSUM_UNKNOWN" in line,
-                                    "Packet Rx L4 checksum valid-flags error!",
-                                )
-                                self.verify(
-                                    "RTE_MBUF_F_RX_IP_CKSUM_BAD" in line,
-                                    "Packet Rx IP checksum valid-flags error!",
-                                )
+    def test_checksum_hw_tcp(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set tcp hw 1",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_inner_sctp=True,
+        )
 
-        self.dut.send_expect("stop", "testpmd>")
+    def test_checksum_hw_sctp(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set sctp hw 1",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+        )
 
-    def checksum_validate(self, packets_sent, packets_expected):
-        """
-        Validate the checksum.
-        """
-        tx_interface = self.tester.get_interface(
-            self.tester.get_local_port(self.dut_ports[0])
+    def test_checksum_hw_all(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set ip hw 1",
+                "csum set udp hw 1",
+                "csum set tcp hw 1",
+                "csum set sctp hw 1",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
         )
-        rx_interface = self.tester.get_interface(
-            self.tester.get_local_port(self.dut_ports[0])
+
+    def test_checksum_tunnel_hw_outer_ip(self):
+        _ddp_type = self.pkg.get("type")
+        if self.kdriver == "ice" and (not _ddp_type or "os" in _ddp_type.lower()):
+            raise VerifySkip("OS Default DDP does not support this case")
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set ip hw 1",
+                "csum set udp hw 1",
+                "csum set tcp hw 1",
+                "csum set sctp hw 1",
+                "csum set outer-ip hw 1",
+                "csum parse-tunnel on 1",
+                "rx_vxlan_port add 4789 0",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel + self.packets_csum_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_outer_udp=True,
+            allow_tx_bad_outer_udp=(not self.is_ice_ddp_comms),
+            allow_rx_bad_outer_l4=(not self.is_ice_ddp_comms),
         )
 
-        sniff_src = "52:00:00:00:00:00"
-        result = dict()
+    def test_checksum_tunnel_hw_outer_udp(self):
+        _ddp_type = self.pkg.get("type")
+        if self.kdriver == "ice" and (not _ddp_type or "os" in _ddp_type.lower()):
+            raise VerifySkip("OS Default DDP does not support this case")
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set ip hw 1",
+                "csum set udp hw 1",
+                "csum set tcp hw 1",
+                "csum set sctp hw 1",
+                "csum set outer-udp hw 1",
+                "csum parse-tunnel on 1",
+                "rx_vxlan_port add 4789 0",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel + self.packets_csum_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_bad_outer_udp=(not self.is_ice_ddp_comms),
+            allow_rx_bad_outer_l4=(not self.is_ice_ddp_comms),
+        )
 
-        chksum = self.get_chksum_values(packets_expected)
-        inst = self.tester.tcpdump_sniff_packets(
-            intf=rx_interface,
-            count=len(packets_sent) * 4,
-            filters=[{"layer": "ether", "config": {"src": sniff_src}}],
+    def test_checksum_tunnel_hw_outer_sw_inner(self):
+        _ddp_type = self.pkg.get("type")
+        if self.kdriver == "ice" and (not _ddp_type or "os" in _ddp_type.lower()):
+            raise VerifySkip("OS Default DDP does not support this case")
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set outer-ip hw 1",
+                "csum set outer-udp hw 1",
+                "csum parse-tunnel on 1",
+                "rx_vxlan_port add 4789 0",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel + self.packets_csum_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_inner_sctp=True,
+            allow_tx_bad_outer_udp=(not self.is_ice_ddp_comms),
+            allow_rx_bad_outer_l4=(not self.is_ice_ddp_comms),
         )
 
-        self.pkt = packet.Packet()
-        for packet_type in list(packets_sent.keys()):
-            self.pkt.append_pkt(packets_sent[packet_type])
-        self.pkt.send_pkt(crb=self.tester, tx_port=tx_interface, count=4)
+    def test_checksum_tunnel_hw_all(self):
+        _ddp_type = self.pkg.get("type")
+        if self.kdriver == "ice" and (not _ddp_type or "os" in _ddp_type.lower()):
+            raise VerifySkip("OS Default DDP does not support this case")
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set ip hw 1",
+                "csum set udp hw 1",
+                "csum set tcp hw 1",
+                "csum set sctp hw 1",
+                "csum set outer-ip hw 1",
+                "csum set outer-udp hw 1",
+                "csum parse-tunnel on 1",
+                "rx_vxlan_port add 4789 0",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel + self.packets_csum_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_bad_outer_udp=(not self.is_ice_ddp_comms),
+            allow_rx_bad_outer_l4=(not self.is_ice_ddp_comms),
+        )
 
-        p = self.tester.load_tcpdump_sniff_packets(inst)
-        nr_packets = len(p)
-        print(p)
-        packets_received = [
-            p[i].sprintf("%IP.chksum%;%TCP.chksum%;%UDP.chksum%;%SCTP.chksum%")
-            for i in range(nr_packets)
+    @check_supported_nic(
+        [
+            "ICE_100G-E810C_QSFP",
+            "ICE_25G-E810C_SFP",
+            "ICE_25G-E823C_QSFP",
+            "ICE_25G-E810_XXV_SFP",
         ]
-        print(len(packets_sent), len(packets_received))
-        self.verify(
-            len(packets_sent) * 4 == len(packets_received), "Unexpected Packets Drop"
+    )
+    def test_checksum_tunnel_hw_all_avx512(self):
+        _ddp_type = self.pkg.get("type")
+        if self.kdriver == "ice" and (not _ddp_type or "os" in _ddp_type.lower()):
+            raise VerifySkip("OS Default DDP does not support this case")
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=512,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set ip hw 1",
+                "csum set udp hw 1",
+                "csum set tcp hw 1",
+                "csum set sctp hw 1",
+                "csum set outer-ip hw 1",
+                "csum set outer-udp hw 1",
+                "csum parse-tunnel on 1",
+                "rx_vxlan_port add 4789 0",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel + self.packets_csum_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_bad_outer_udp=(not self.is_ice_ddp_comms),
+            allow_rx_bad_outer_l4=(not self.is_ice_ddp_comms),
         )
 
-        for packet_received in packets_received:
-            (
-                ip_checksum,
-                tcp_checksum,
-                udp_checksum,
-                sctp_checksum,
-            ) = packet_received.split(";")
-
-            packet_type = ""
-            l4_checksum = ""
-            if tcp_checksum != "??":
-                packet_type = "TCP"
-                l4_checksum = tcp_checksum
-            elif udp_checksum != "??":
-                packet_type = "UDP"
-                l4_checksum = udp_checksum
-            elif sctp_checksum != "??":
-                packet_type = "SCTP"
-                l4_checksum = sctp_checksum
-
-            if ip_checksum != "??":
-                packet_type = "IP/" + packet_type
-                if chksum[packet_type] != [ip_checksum, l4_checksum]:
-                    result[packet_type] = packet_type + " checksum error"
-            else:
-                packet_type = "IPv6/" + packet_type
-                if chksum[packet_type] != [l4_checksum]:
-                    result[packet_type] = packet_type + " checksum error"
-
-        return result
-
-    def send_scapy_packet(self, packet: str):
-        itf = self.tester.get_interface(self.tester.get_local_port(self.dut_ports[0]))
-
-        self.tester.scapy_foreground()
-        self.tester.scapy_append(f'sendp({packet}, iface="{itf}")')
-        return self.tester.scapy_execute()
-
-    def get_pkt_rx_l4_cksum(self, testpmd_output: str) -> bool:
-        return self.checksum_flags_are_good("RTE_MBUF_F_RX_L4_CKSUM_", testpmd_output)
-
-    def get_pkt_rx_ip_cksum(self, testpmd_output: str) -> bool:
-        return self.checksum_flags_are_good("RTE_MBUF_F_RX_IP_CKSUM_", testpmd_output)
-
-    def send_pkt_expect_good_bad_from_flag(
-        self, pkt_str: str, flag: str, test_name: str, should_pass: bool = True
-    ):
-        self.pmdout.get_output(timeout=1)  # Remove any old output
-        self.scapy_exec(f"sendp({pkt_str}, iface=iface)")
-        testpmd_output: str = self.pmdout.get_output(timeout=1)
-        self.verify(
-            flag in testpmd_output,
-            f"Flag {flag[:-1]} not found for test {test_name}, please run test_rx_checksum_valid_flags.",
-        )
-        self.verify(
-            (flag + "UNKNOWN") not in testpmd_output,
-            f"Flag {flag[:-1]} was found to be unknown for test {test_name}, indicating a possible lack of support",
+    def test_checksum_sw_all(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_inner_sctp=True,
         )
-        if should_pass:
-            if flag + "GOOD" in testpmd_output:
-                return
-            else:  # flag + "BAD" in testpmd_output
-                self.verify(
-                    False, f"{flag}BAD was found in output, expecting {flag}GOOD."
-                )
-        else:
-            if flag + "BAD" in testpmd_output:
-                return
-            else:  # flag + "GOOD" in testpmd_output
-                self.verify(
-                    False, f"{flag}GOOD was found in output, expecting {flag}BAD."
-                )
 
-    def send_pkt_expect_good_bad_from_flag_catch_failure(
-        self, pkt_str: str, flag: str, test_name: str, should_pass: bool = True
-    ) -> Union[VerifyFailure, None]:
-        try:
-            self.send_pkt_expect_good_bad_from_flag(
-                pkt_str, flag, test_name, should_pass=should_pass
-            )
-        except VerifyFailure as vf:
-            return vf
-
-        return None
-
-    def validate_checksum(self, pkt, pkt_type, inner_flag=False) -> bool:
-        """
-        @param pkt: The packet to validate the checksum of.
-        @return: Whether the checksum was valid.
-        """
-        if pkt is None:
-            return False
-        for i in range(0, len(l3_protos)):
-            if l3_protos[i] in pkt:
-                l3 = l3_protos[i]
-        for j in range(0, len(l4_protos)):
-            if l4_protos[j] in pkt:
-                layer = l4_proto_classes[j]
-                csum = pkt[layer].chksum
-                if csum is None:
-                    csum = 0
-                del pkt[layer].chksum
-                # Converting it to raw will calculate the checksum
-                correct_csum = layer(bytes(Raw(pkt[layer]))).chksum
-                if correct_csum == csum:
-                    # checksum value is correct
-                    return False
-                else:
-                    if inner_flag:
-                        print(
-                            "{} pkg[{}] VXLAN/{}/{} inner checksum {} is not correct {}".format(
-                                pkt_type,
-                                self.count,
-                                l3,
-                                l4_protos[j],
-                                hex(correct_csum),
-                                hex(csum),
-                            )
-                        )
-                    else:
-                        print(
-                            "{} pkg[{}] {}/{} outer checksum {} is not correct {}".format(
-                                pkt_type,
-                                self.count,
-                                l3,
-                                l4_protos[j],
-                                hex(correct_csum),
-                                hex(csum),
-                            )
-                        )
-                    return True
-        return False
-
-    def scapy_exec(self, cmd: str, timeout=1) -> str:
-        return self.tester.send_expect(cmd, ">>>", timeout=timeout)
-
-    def get_packets(self, dut_mac, tester_mac):
-        eth = Ether(dst=dut_mac, src=tester_mac)
-        packets = []
-        checksum_options = (
-            {},
-            {"chksum": 0xF},
+    @skip_unsupported(reason="Tunnel SW checksum TX is not properly implemented.")
+    def test_checksum_tunnel_sw_all(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "csum parse-tunnel on 1",
+                "rx_vxlan_port add 4789 0",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel + self.packets_csum_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_inner_sctp=True,
         )
-        # Untunneled
-        for l3 in l3_proto_classes:
-            for l4 in l4_proto_classes:
-                for chksum in checksum_options:
-                    # The packet's data can be used to identify how the packet was constructed so avoid any issues with
-                    # ordering
-                    pkt = eth / l3() / l4(**chksum) / (b"X" * 48)
-                    # Prevents the default behavior which adds DNS headers
-                    if l4 == UDP:
-                        pkt[UDP].dport, pkt[UDP].sport = 1001, 1001
-                    packets.append(pkt)
 
-        # Tunneled
-        # VXLAN
-        for l3 in l3_proto_classes:
-            for l4 in l4_proto_classes:
-                for outer_arg in checksum_options:
-                    for inner_arg in checksum_options:
-                        pkt = (
-                            eth
-                            / l3()
-                            / UDP(**outer_arg)
-                            / VXLAN()
-                            / Ether()
-                            / l3()
-                            / l4(**inner_arg)
-                            / (b"Y" * 48)
-                        )
-                        # Prevents the default behavior which adds DNS headers
-                        if l4 == UDP:
-                            pkt[VXLAN][UDP].dport, pkt[VXLAN][UDP].sport = 1001, 1001
-                        packets.append(pkt)
-        # GRE
-        for l3 in l3_proto_classes:
-            for l4 in l4_proto_classes:
-                for chksum in checksum_options:
-                    pkt = eth / l3() / GRE() / l3() / l4(**chksum) / (b"Z" * 48)
-                    # Prevents the default behavior which adds DNS headers
-                    if l4 == UDP:
-                        pkt[GRE][UDP].dport, pkt[GRE][UDP].sport = 1001, 1001
-                    packets.append(pkt)
-
-        return packets
-
-    def send_tx_package(
-        self, packet_file_path, capture_file_path, packets, iface, dut_mac
-    ):
-        if os.path.isfile(capture_file_path):
-            os.remove(capture_file_path)
-        src_mac = "52:00:00:00:00:00"
-        self.tester.send_expect(
-            f"tcpdump -i '{iface}' ether src {src_mac} -s 0 -w {capture_file_path} -Q in &",
-            "# ",
+    @skip_unsupported(reason="Tunnel SW checksum TX is not properly implemented.")
+    @check_supported_nic(
+        [
+            "ICE_100G-E810C_QSFP",
+            "ICE_25G-E810C_SFP",
+            "ICE_25G-E823C_QSFP",
+            "ICE_25G-E810_XXV_SFP",
+        ]
+    )
+    def test_checksum_tunnel_sw_all_avx512(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.pmdout,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=512,
+            testpmd_log_level={"ice": 7, "iavf": 8},
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "csum parse-tunnel on 1",
+                "rx_vxlan_port add 4789 0",
+                "port start all",
+            ],
+            packets=self.packets_csum_non_tunnel + self.packets_csum_tunnel,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_inner_sctp=True,
         )
 
-        if os.path.isfile(packet_file_path):
-            os.remove(packet_file_path)
-        wrpcap(packet_file_path, packets)
-        self.tester.session.copy_file_to(packet_file_path, packet_file_path)
-
-        # send packet
-        self.tester.send_expect("scapy", ">>>")
-        self.scapy_exec(f"packets = rdpcap('{packet_file_path}')")
-        for i in range(0, len(packets)):
-            self.scapy_exec(f"packets[{i}].show")
-            self.scapy_exec(f"sendp(packets[{i}], iface='{iface}')")
-            self.pmdout.get_output(timeout=0.5)
-            self.dut.send_expect(
-                "show port stats {}".format(self.dut_ports[0]), "testpmd>"
-            )
-        self.tester.send_expect("quit()", "# ")
-
-        time.sleep(1)
-        self.tester.send_expect("killall tcpdump", "#")
-        time.sleep(1)
-        self.tester.send_expect('echo "Cleaning buffer"', "#")
-        time.sleep(1)
-        return
-
-    def validate_packet_list_checksums(self, packets):
-        error_messages = []
-        untunnelled_error_message = (
-            f"un-tunneled checksum state for pkg[%s] with a invalid checksum."
-        )
-        vxlan_error_message = (
-            f"VXLAN tunnelled checksum state for pkg[%s]  with a invalid checksum."
-        )
-        gre_error_message = (
-            f"GRE tunnelled checksum state for pkg[%s] with a invalid checksum."
+    @skip_unsupported_nic(
+        [
+            "I40E_10G-SFP_X710",
+            "I40E_40G-QSFP_A",
+            "I40E_40G-QSFP_B",
+            "I40E_25G-25G_SFP28",
+            "I40E_10G-SFP_X722",
+            "I40E_10G-10G_BASE_T_X722",
+            "I40E_10G-10G_BASE_T_BC",
+            "ICE_100G-E810C_QSFP",
+            "ICE_25G-E810C_SFP",
+            "ICE_25G-E823C_QSFP",
+            "ICE_25G-E810_XXV_SFP",
+        ]
+    )
+    def test_checksum_vlan_hw_all(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.vm0_testpmd,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set ip hw 1",
+                "csum set udp hw 1",
+                "csum set tcp hw 1",
+                "csum set sctp hw 1",
+                "vlan set filter on 0",
+                "port start all",
+                "rx_vlan add 100 0",
+            ],
+            packets=CSUM_VLAN_NON_TUNNEL_PACKETS,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
         )
 
-        for packet in packets:
-            self.count = self.count + 1
-            payload: str
-            # try:
-            payload = packet[Raw].load.decode("utf-8")
-            # # This error usually happens with tunneling protocols, and means that an additional cast is needed
-            # except UnicodeDecodeError:
-            #     for proto in tunnelling_proto_classes:
-            if "X" in payload:
-                if self.validate_checksum(packet, "un-tunneled"):
-                    error_messages.append(untunnelled_error_message % self.count)
-            elif "Y" in payload:
-                if self.validate_checksum(
-                    packet[VXLAN][Ether], "VXLAN", inner_flag=True
-                ):
-                    error_messages.append(vxlan_error_message % self.count)
-                # IntelĀ® Ethernet 700 Series not support outer udp checksum
-                if self.is_eth_series_nic(700):
-                    continue
-                if self.validate_checksum(packet, "VXLAN"):
-                    error_messages.append(vxlan_error_message % self.count)
-            elif "Z" in payload:
-                if self.validate_checksum(packet, "GRE"):
-                    error_messages.append(gre_error_message % self.count)
-        return error_messages
-
-    #
-    #
-    #
-    # Test Cases
-    #
-    def test_checksum_offload_with_vlan(self):
-        """
-        Do not insert IPv4/IPv6 UDP/TCP checksum on the transmit packet.
-        Verify that the same number of packet are correctly received on the
-        traffic generator side.
-        """
-        # mac = self.dut.get_mac_address(self.dut_ports[0])
-        mac = "52:00:00:00:00:01"
-
-        pktsChkErr = {
-            "IP/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IP(chksum=0x0)/UDP(chksum=0xf)/("X"*46)'
-            % mac,
-            "IP/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IP(chksum=0x0)/TCP(chksum=0xf)/("X"*46)'
-            % mac,
-            "IP/SCTP": 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IP(chksum=0x0)/SCTP(chksum=0x0)/("X"*48)'
-            % mac,
-            "IPv6/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IPv6(src="::1")/UDP(chksum=0xf)/("X"*46)'
-            % mac,
-            "IPv6/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IPv6(src="::1")/TCP(chksum=0xf)/("X"*46)'
-            % mac,
-        }
-
-        pkts = {
-            "IP/UDP": 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IP(src="127.0.0.1")/UDP()/("X"*46)'
-            % mac,
-            "IP/TCP": 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IP(src="127.0.0.1")/TCP()/("X"*46)'
-            % mac,
-            "IP/SCTP": 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IP(src="127.0.0.1")/SCTP()/("X"*48)'
-            % mac,
-            "IPv6/UDP": 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IPv6(src="::1")/UDP()/("X"*46)'
-            % mac,
-            "IPv6/TCP": 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IPv6(src="::1")/TCP()/("X"*46)'
-            % mac,
-        }
-
-        if self.kdriver in DRIVER_TEST_LACK_CAPA["sctp_tx_offload"]:
-            del pktsChkErr["IP/SCTP"]
-            del pkts["IP/SCTP"]
-
-        self.checksum_enablehw(self.dut_ports[0])
-        self.dut.send_expect("start", "testpmd>")
-        result = self.checksum_validate(pktsChkErr, pkts)
-        self.dut.send_expect("stop", "testpmd>")
-        self.verify(len(result) == 0, ",".join(list(result.values())))
-
-    def test_rx_checksum_valid_flags(self):
-        """
-        Insert right and wrong IPv4/IPv6 UDP/TCP/SCTP checksum on the
-        transmit packet.Enable Checksum offload.
-        Verify the checksum valid-flags.
-        """
-        mac = "52:00:00:00:00:01"
-
-        pkts_ref = {
-            "IP/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP()/UDP()/("X"*46)'
-            % mac,
-            "IP/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1")/TCP()/("X"*46)'
-            % mac,
-            "IP/SCTP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1")/SCTP()/("X"*48)'
-            % mac,
-            "IPv6/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/UDP()/("X"*46)'
-            % mac,
-            "IPv6/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/TCP()/("X"*46)'
-            % mac,
-        }
-
-        self.checksum_enablehw(self.dut_ports[0])
-
-        # get the packet checksum value
-        result = self.get_chksum_values(pkts_ref)
-
-        # set the expected checksum values same with the actual values
-        pkts_good = {
-            "IP/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(chksum=int(%s))/UDP(chksum=int(%s))/("X"*46)'
-            % (mac, result["IP/UDP"][0], result["IP/UDP"][1]),
-            "IP/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1",chksum=int(%s))/TCP(chksum=int(%s))/("X"*46)'
-            % (mac, result["IP/TCP"][0], result["IP/TCP"][1]),
-            "IP/SCTP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1",chksum=int(%s))/SCTP(chksum=int(%s))/("X"*48)'
-            % (mac, result["IP/SCTP"][0], result["IP/SCTP"][1]),
-            "IPv6/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/UDP(chksum=int(%s))/("X"*46)'
-            % (mac, result["IPv6/UDP"][0]),
-            "IPv6/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/TCP(chksum=int(%s))/("X"*46)'
-            % (mac, result["IPv6/TCP"][0]),
-        }
-
-        # set the expected checksum values different from the actual values
-        pkts_bad = {
-            "IP/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(chksum=0x0)/UDP(chksum=0xf)/("X"*46)'
-            % mac,
-            "IP/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1",chksum=0x0)/TCP(chksum=0xf)/("X"*46)'
-            % mac,
-            "IP/SCTP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1",chksum=0x0)/SCTP(chksum=0xf)/("X"*48)'
-            % mac,
-            "IPv6/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/UDP(chksum=0xf)/("X"*46)'
-            % mac,
-            "IPv6/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/TCP(chksum=0xf)/("X"*46)'
-            % mac,
-        }
-
-        if self.kdriver in DRIVER_TEST_LACK_CAPA["sctp_tx_offload"]:
-            del pkts_good["IP/SCTP"]
-            del pkts_bad["IP/SCTP"]
-            del pkts_ref["IP/SCTP"]
-
-        # send the packet checksum value same with the expected value
-        self.checksum_valid_flags(pkts_good, 1)
-        # send the packet checksum value different from the expected value
-        self.checksum_valid_flags(pkts_bad, 0)
-
-    def test_insert_checksum_on_the_transmit_packet(self):
-        """
-        Insert IPv4/IPv6 UDP/TCP/SCTP checksum on the transmit packet.
-        Enable Checksum offload.
-        Verify that the same number of packet are correctly received on the
-        traffic generator side.
-        """
-        mac = "52:00:00:00:00:01"
-
-        pkts = {
-            "IP/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(chksum=0x0)/UDP(chksum=0xf)/("X"*46)'
-            % mac,
-            "IP/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(chksum=0x0)/TCP(chksum=0xf)/("X"*46)'
-            % mac,
-            "IP/SCTP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(chksum=0x0)/SCTP(chksum=0x0)/("X"*48)'
-            % mac,
-            "IPv6/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/UDP(chksum=0xf)/("X"*46)'
-            % mac,
-            "IPv6/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/TCP(chksum=0xf)/("X"*46)'
-            % mac,
-        }
-
-        pkts_ref = {
-            "IP/UDP": 'Ether(dst="02:00:00:00:00:00", src="%s")/IP(src="127.0.0.1")/UDP()/("X"*46)'
-            % mac,
-            "IP/TCP": 'Ether(dst="02:00:00:00:00:00", src="%s")/IP(src="127.0.0.1")/TCP()/("X"*46)'
-            % mac,
-            "IP/SCTP": 'Ether(dst="02:00:00:00:00:00", src="%s")/IP(src="127.0.0.1")/SCTP()/("X"*48)'
-            % mac,
-            "IPv6/UDP": 'Ether(dst="02:00:00:00:00:00", src="%s")/IPv6(src="::1")/UDP()/("X"*46)'
-            % mac,
-            "IPv6/TCP": 'Ether(dst="02:00:00:00:00:00", src="%s")/IPv6(src="::1")/TCP()/("X"*46)'
-            % mac,
-        }
-
-        if self.kdriver in DRIVER_TEST_LACK_CAPA["sctp_tx_offload"]:
-            del pkts["IP/SCTP"]
-            del pkts_ref["IP/SCTP"]
-
-        self.checksum_enablehw(self.dut_ports[0])
-
-        self.dut.send_expect("start", "testpmd>")
-
-        result = self.checksum_validate(pkts, pkts_ref)
-
-        self.dut.send_expect("stop", "testpmd>")
-
-        self.verify(len(result) == 0, ",".join(list(result.values())))
+    @skip_unsupported_nic(
+        [
+            "I40E_10G-SFP_X710",
+            "I40E_40G-QSFP_A",
+            "I40E_40G-QSFP_B",
+            "I40E_25G-25G_SFP28",
+            "I40E_10G-SFP_X722",
+            "I40E_10G-10G_BASE_T_X722",
+            "I40E_10G-10G_BASE_T_BC",
+            "ICE_100G-E810C_QSFP",
+            "ICE_25G-E810C_SFP",
+            "ICE_25G-E823C_QSFP",
+            "ICE_25G-E810_XXV_SFP",
+        ]
+    )
+    def test_checksum_vlan_tunnel_hw_outer_sw_inner(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.vm0_testpmd,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set outer-ip hw 1",
+                "csum set outer-udp hw 1",
+                "csum parse-tunnel on 1",
+                "vlan set filter on 0",
+                "port start all",
+                "rx_vlan add 100 0",
+            ],
+            packets=CSUM_VLAN_NON_TUNNEL_PACKETS + CSUM_VLAN_TUNNEL_PACKETS,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_inner_sctp=True,
+        )
 
-    def test_do_not_insert_checksum_on_the_transmit_packet(self):
-        """
-        Do not insert IPv4/IPv6 UDP/TCP checksum on the transmit packet.
-        Disable Checksum offload.
-        Verify that the same number of packet are correctly received on
-        the traffic generator side.
-        """
-        mac = "52:00:00:00:00:01"
-        sndIP = "10.0.0.1"
-        sndIPv6 = "::1"
-        sndPkts = {
-            "IP/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="%s",chksum=0x0)/UDP(chksum=0xf)/("X"*46)'
-            % (mac, sndIP),
-            "IP/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="%s",chksum=0x0)/TCP(chksum=0xf)/("X"*46)'
-            % (mac, sndIP),
-            "IPv6/UDP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="%s")/UDP(chksum=0xf)/("X"*46)'
-            % (mac, sndIPv6),
-            "IPv6/TCP": 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="%s")/TCP(chksum=0xf)/("X"*46)'
-            % (mac, sndIPv6),
-        }
+    @skip_unsupported_nic(
+        [
+            "I40E_10G-SFP_X710",
+            "I40E_40G-QSFP_A",
+            "I40E_40G-QSFP_B",
+            "I40E_25G-25G_SFP28",
+            "I40E_10G-SFP_X722",
+            "I40E_10G-10G_BASE_T_X722",
+            "I40E_10G-10G_BASE_T_BC",
+            "ICE_100G-E810C_QSFP",
+            "ICE_25G-E810C_SFP",
+            "ICE_25G-E823C_QSFP",
+            "ICE_25G-E810_XXV_SFP",
+        ]
+    )
+    def test_checksum_vlan_tunnel_hw_all(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.vm0_testpmd,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum set ip hw 1",
+                "csum set udp hw 1",
+                "csum set tcp hw 1",
+                "csum set sctp hw 1",
+                "csum set outer-ip hw 1",
+                "csum set outer-udp hw 1",
+                "csum parse-tunnel on 1",
+                "vlan set filter on 0",
+                "port start all",
+                "rx_vlan add 100 0",
+            ],
+            packets=CSUM_VLAN_NON_TUNNEL_PACKETS + CSUM_VLAN_TUNNEL_PACKETS,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+        )
 
-        expIP = sndIP
-        expIPv6 = sndIPv6
-        expPkts = {
-            "IP/UDP": 'Ether(dst="02:00:00:00:00:00", src="%s")/IP(src="%s")/UDP()/("X"*46)'
-            % (mac, expIP),
-            "IP/TCP": 'Ether(dst="02:00:00:00:00:00", src="%s")/IP(src="%s")/TCP()/("X"*46)'
-            % (mac, expIP),
-            "IPv6/UDP": 'Ether(dst="02:00:00:00:00:00", src="%s")/IPv6(src="%s")/UDP()/("X"*46)'
-            % (mac, expIPv6),
-            "IPv6/TCP": 'Ether(dst="02:00:00:00:00:00", src="%s")/IPv6(src="%s")/TCP()/("X"*46)'
-            % (mac, expIPv6),
-        }
+    @skip_unsupported_nic(
+        [
+            "I40E_10G-SFP_X710",
+            "I40E_40G-QSFP_A",
+            "I40E_40G-QSFP_B",
+            "I40E_25G-25G_SFP28",
+            "I40E_10G-SFP_X722",
+            "I40E_10G-10G_BASE_T_X722",
+            "I40E_10G-10G_BASE_T_BC",
+            "ICE_100G-E810C_QSFP",
+            "ICE_25G-E810C_SFP",
+            "ICE_25G-E823C_QSFP",
+            "ICE_25G-E810_XXV_SFP",
+        ]
+    )
+    def test_checksum_vlan_sw_all(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.vm0_testpmd,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "vlan set filter on 0",
+                "port start all",
+                "rx_vlan add 100 0",
+            ],
+            packets=CSUM_VLAN_NON_TUNNEL_PACKETS + CSUM_VLAN_NON_TUNNEL_PACKETS,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_inner_sctp=True,
+        )
 
-        self.dut.send_expect("start", "testpmd>")
-        result = self.checksum_validate(sndPkts, expPkts)
+    @skip_unsupported(reason="Tunnel SW checksum TX is not properly implemented.")
+    @skip_unsupported_nic(
+        [
+            "I40E_10G-SFP_X710",
+            "I40E_40G-QSFP_A",
+            "I40E_40G-QSFP_B",
+            "I40E_25G-25G_SFP28",
+            "I40E_10G-SFP_X722",
+            "I40E_10G-10G_BASE_T_X722",
+            "I40E_10G-10G_BASE_T_BC",
+            "ICE_100G-E810C_QSFP",
+            "ICE_25G-E810C_SFP",
+            "ICE_25G-E823C_QSFP",
+            "ICE_25G-E810_XXV_SFP",
+        ]
+    )
+    def test_checksum_vlan_tunnel_sw_all(self):
+        offload.execute_test_checksum(
+            case=self,
+            testpmd=self.vm0_testpmd,
+            tester=self.tester,
+            tester_tx_interface=self.tester_intf[0],
+            tester_rx_interface=self.tester_intf[1],
+            testpmd_port_0=self.dut_pci[0],
+            testpmd_port_1=self.dut_pci[1],
+            testpmd_enable_dcf_0=False,
+            testpmd_enable_dcf_1=False,
+            testpmd_bitwidth=None,
+            testpmd_other_eal_param="",
+            testpmd_param=" --enable-rx-cksum ",
+            testpmd_commands=[
+                "set verbose 1",
+                "set fwd csum",
+                "csum mac-swap off 0",
+                "csum mac-swap off 1",
+                "set promisc 0 on",
+                "set promisc 1 on",
+                "port stop all",
+                "csum parse-tunnel on 1",
+                "vlan set filter on 0",
+                "port start all",
+                "rx_vlan add 100 0",
+            ],
+            packets=CSUM_VLAN_NON_TUNNEL_PACKETS + CSUM_VLAN_TUNNEL_PACKETS,
+            packet_interval=offload.TX_INTERVAL,
+            support_rx_tunnel=self.support_rx_tunnel,
+            allow_tx_zero_inner_sctp=True,
+        )
 
-        self.verify(len(result) == 0, ",".join(list(result.values())))
+    def checksum_enablehw(self, port):
+        self.dut.send_expect("port stop all", "testpmd>")
+        self.dut.send_expect("rx_vxlan_port add 4789 0 ", "testpmd>")
+        self.dut.send_expect("csum set ip hw %d" % port, "testpmd>")
+        self.dut.send_expect("csum set udp hw %d" % port, "testpmd>")
+        self.dut.send_expect("csum set tcp hw %d" % port, "testpmd>")
+        self.dut.send_expect("csum set sctp hw %d" % port, "testpmd>")
+        self.dut.send_expect("csum set outer-ip hw %d" % port, "testpmd>")
+        self.dut.send_expect("csum set outer-udp hw %d" % port, "testpmd>")
+        self.dut.send_expect("csum parse-tunnel on %d" % port, "testpmd>")
+        self.dut.send_expect("port start all", "testpmd>")
 
-        self.dut.send_expect("stop", "testpmd>")
+    def checksum_enablesw(self, port):
+        self.dut.send_expect("port stop all", "testpmd>")
+        self.dut.send_expect("csum set ip sw %d" % port, "testpmd>")
+        self.dut.send_expect("csum set udp sw %d" % port, "testpmd>")
+        self.dut.send_expect("csum set tcp sw %d" % port, "testpmd>")
+        self.dut.send_expect("csum set sctp sw %d" % port, "testpmd>")
+        self.dut.send_expect("port start all", "testpmd>")
 
     def benchmark(self, lcore, ptype, mode, flow_format, size_list, nic):
         """
@@ -810,236 +1169,13 @@  class TestChecksumOffload(TestCase):
             self.dut.send_expect("quit", "#", 10)
             self.result_table_print()
 
-    def test_hardware_checksum_check_ip_rx(self):
-        self.tester.send_expect("scapy", ">>>")
-        self.checksum_enablehw(self.dut_ports[0])
-        self.dut.send_expect("start", "testpmd>")
-        self.pmdout.wait_link_status_up(self.dut_ports[0])
-        verification_errors: List[VerifyFailure] = []
-
-        iface = self.tester.get_interface(self.tester.get_local_port(self.dut_ports[0]))
-        dut_mac = "52:00:00:00:00:01"
-        tester_mac = "52:00:00:00:00:00"
-        self.scapy_exec(f"eth = Ether(dst='{dut_mac}', src='{tester_mac}')")
-        self.scapy_exec(f"iface = '{iface}'")
-
-        # Untunnelled
-        for l4 in l4_protos:
-            for chksum in "", "chksum=0xf":
-                vf = self.send_pkt_expect_good_bad_from_flag_catch_failure(
-                    f"eth/IP({chksum})/{l4}()/('X'*50)",
-                    "RTE_MBUF_F_RX_IP_CKSUM_",
-                    f"{l4}",
-                    should_pass=(chksum == ""),
-                )
-                if vf is not None:
-                    verification_errors.append(vf)
-
-        for err in verification_errors:
-            self.logger.error(str(err))
-        self.verify(len(verification_errors) == 0, "See previous output")
-
-        self.tester.send_expect("quit()", "# ")
-        self.dut.send_expect("stop", "testpmd>")
-
-    def test_hardware_checksum_check_ip_tx(self):
-        self.checksum_enablehw(self.dut_ports[0])
-        self.dut.send_expect("start", "testpmd>")
-        self.pmdout.wait_link_status_up(self.dut_ports[0])
-        verification_errors: List[VerifyFailure] = []
-
-        iface = self.tester.get_interface(self.tester.get_local_port(self.dut_ports[0]))
-        dut_mac = "52:00:00:00:00:01"
-        tester_mac = "52:00:00:00:00:00"
-        eth = Ether(dst=dut_mac, src=tester_mac)
-
-        checksum_options = (
-            {},
-            {"chksum": 0xF},
-        )
-
-        packets = [
-            eth / IP(**chksum) / TCP() / Raw(load=str(int(len(chksum) != 1)))
-            for chksum in checksum_options
-        ]
-
-        capture_file_name = "test_hardware_checksum_check_l3_tx_capture.pcap"
-
-        packet_file_path = "/tmp/test_hardware_checksum_check_l3_tx_packets.pcap"
-        capture_file_path = "/tmp/tester/" + capture_file_name
-
-        self.send_tx_package(
-            packet_file_path, capture_file_path, packets, iface, dut_mac
-        )
-
-        self.tester.session.copy_file_from(
-            capture_file_path, "output/tmp/pcap/" + capture_file_name
-        )
-        captured_packets = rdpcap("output/tmp/pcap/" + capture_file_name)
-
-        self.verify(
-            len(packets) == len(captured_packets), "Not all packets were received"
-        )
-
-        error_messages = []
-        for pkt in captured_packets:
-            should_pass = pkt[TCP].payload.build() == b"1"
-            if not (self.validate_checksum(pkt, IP) == should_pass):
-                error_messages.append(
-                    f"A packet was marked as having a"
-                    f"{' valid' if should_pass == '' else 'n invalid'}"
-                    f" checksum when it should have had the opposite."
-                )
-
-        self.dut.send_expect("stop", "testpmd>")
-        if len(error_messages) != 0:
-            for error_msg in error_messages:
-                self.logger.error(error_msg)
-            self.verify(False, "See prior output")
-
-    def test_hardware_checksum_check_l4_rx(self):
-        self.checksum_enablehw(self.dut_ports[0])
-        self.dut.send_expect("start", "testpmd>")
-        self.pmdout.wait_link_status_up("all")
-        verification_errors: List[VerifyFailure] = []
-
-        iface = self.tester.get_interface(self.tester.get_local_port(self.dut_ports[0]))
-        dut_mac = "52:00:00:00:00:01"
-        tester_mac = "52:00:00:00:00:00"
-        self.tester.send_expect("scapy", ">>> ")
-        self.scapy_exec(f"eth = Ether(dst='{dut_mac}', src='{tester_mac}')")
-        self.scapy_exec(f"iface = '{iface}'")
-        # Untunneled
-        for l3 in l3_protos:
-            for l4 in l4_protos:
-                for chksum in "", "chksum=0xf":
-                    vf = self.send_pkt_expect_good_bad_from_flag_catch_failure(
-                        f"eth/{l3}()/{l4}({chksum})/('X'*48)",
-                        "RTE_MBUF_F_RX_L4_CKSUM_",
-                        f"{l3}/{l4}",
-                        should_pass=(chksum == ""),
-                    )
-                    if vf is not None:
-                        verification_errors.append(vf)
-
-        # Tunneled
-        # VXLAN
-        for l3 in l3_protos:
-            for l4 in l4_protos:
-                for outer_arg in "", "chksum=0xf":
-                    for inner_arg in "", "chksum=0xf":
-                        for flag in (
-                            "RTE_MBUF_F_RX_L4_CKSUM_",
-                            "RTE_MBUF_F_RX_OUTER_L4_CKSUM_",
-                        ):
-                            if flag == "RTE_MBUF_F_RX_L4_CKSUM_":
-                                should_pass = inner_arg == ""
-                            else:  # flag == RTE_MBUF_F_RX_OUTER_L4_CKSUM_
-                                # IntelĀ® Ethernet 700 Series not support outer checksum
-                                if self.is_eth_series_nic(700):
-                                    continue
-                                should_pass = outer_arg == ""
-                            vf = self.send_pkt_expect_good_bad_from_flag_catch_failure(
-                                f"eth/{l3}()/UDP(dport=4789,{outer_arg})/VXLAN()/eth/{l3}()/"
-                                f"{l4}({inner_arg})/('X'*48)",
-                                flag,
-                                f"{l3}/UDP/VXLAN/{l3}/{l4}",
-                                should_pass=should_pass,
-                            )
-
-                            if vf is not None:
-                                verification_errors.append(vf)
-
-        # GRE
-        for l3 in l3_protos:
-            for l4 in l4_protos:
-                for inner_arg in "", "chksum=0xf":
-                    should_pass: bool = inner_arg == ""
-                    vf = self.send_pkt_expect_good_bad_from_flag_catch_failure(
-                        f"eth/{l3}()/GRE()/{l3}()/{l4}({inner_arg})/('X'*48)",
-                        "RTE_MBUF_F_RX_L4_CKSUM_",
-                        f"{l3}/GRE/{l3}/{l4}",
-                        should_pass=should_pass,
-                    )
-
-                    if vf is not None:
-                        verification_errors.append(vf)
-
-        # This section is commented out because GENEVE is not supported in the current version (2.3.3) that is used
-        # in dts. This will be available in scapy 2.4.3, so this was added and commented out so that when scapy is
-        # updated this test case can easily take advantage of the new functionality.
-
-        # # GENEVE
-        # # This import is over here so that it is not forgotten when the update happens
-        # from scapy.contrib.geneve import GENEVE
-        # for l3_outer in l3_protos:
-        #     for l4_outer in l4_protos:
-        #         for l3_inner in l3_protos:
-        #             for l4 in l4_protos:
-        #                 for outer_arg in "", "chksum=0xf":
-        #                     for inner_arg in "", "chksum=0xf":
-        #                         for flag in "RTE_MBUF_F_RX_L4_CKSUM_", "RTE_MBUF_F_RX_OUTER_L4_CKSUM_":
-        #                             should_pass: bool = inner_arg == "" if flag == "RTE_MBUF_F_RX_L4_CKSUM_" else outer_arg == ""
-        #                             vf = self.send_pkt_expect_good_bad_from_flag_catch_failure(
-        #                                 f"eth/{l3_outer}()/{l4_outer}({outer_arg})/GENEVE()/eth/{l3_inner}()/"
-        #                                 f"{l4}({inner_arg})/('X'*50)",
-        #                                 flag, f"{l3_outer}/{l4_outer}/VXLAN/{l3_inner}/{l4}",
-        #                                 should_pass=should_pass)
-        #
-        #                             if vf is not None:
-        #                                 verification_errors.append(vf)
-
-        self.tester.send_expect("quit()", "#")
-        self.dut.send_expect("stop", "testpmd>")
-
-        for err in verification_errors:
-            self.logger.error(str(err))
-        self.verify(len(verification_errors) == 0, "See previous output")
-
-    def test_hardware_checksum_check_l4_tx(self):
-        self.checksum_enablehw(self.dut_ports[0])
-        self.dut.send_expect("start", "testpmd>")
-        self.pmdout.wait_link_status_up("all")
-        verification_errors: List[VerifyFailure] = []
-
-        iface = self.tester.get_interface(self.tester.get_local_port(self.dut_ports[0]))
-        dut_mac = "52:00:00:00:00:01"
-        tester_mac = "52:00:00:00:00:00"
-        packets = self.get_packets(dut_mac, tester_mac)
-
-        capture_file_name = "test_hardware_checksum_check_l4_tx_capture.pcap"
-
-        packet_file_path = "/tmp/test_hardware_checksum_check_l4_tx_packets.pcap"
-        capture_file_path = "/tmp/tester/" + capture_file_name
-
-        self.send_tx_package(
-            packet_file_path, capture_file_path, packets, iface, dut_mac
-        )
-
-        self.tester.session.copy_file_from(
-            capture_file_path, "output/tmp/pcap/" + capture_file_name
-        )
-
-        captured_packets = rdpcap("output/tmp/pcap/" + capture_file_name)
-        self.verify(
-            len(packets) == len(captured_packets), "Not all packets were received"
-        )
-
-        self.dut.send_expect("stop", "testpmd>")
-
-        self.count = 0
-        error_messages = self.validate_packet_list_checksums(captured_packets)
-
-        if len(error_messages) != 0:
-            for error_msg in error_messages:
-                self.logger.error(error_msg)
-            self.verify(False, "See prior output")
-
     def tear_down(self):
         """
         Run after each test case.
         """
-        self.dut.send_expect("quit", "#")
+        self.dut.send_expect("quit", "# ")
+        self.dut.kill_all()
+        time.sleep(2)
 
     def tear_down_all(self):
         """