[V1,2/6] tests/kernelpf_vf:add new suite to cover most of the basic vf cases

Message ID 20230608182742.360594-3-zhiminx.huang@intel.com (mailing list archive)
State Superseded
Headers
Series add new common module and add new suites |

Commit Message

Huang, ZhiminX June 8, 2023, 6:27 p.m. UTC
  add kernelpf_vf new suite and refactor with func_test_base common module:
cover kernelpf_iavf/vf_vlan/vf_macfilter/vf_rss/vf_xstats_check.

Signed-off-by: Zhimin Huang <zhiminx.huang@intel.com>
---
 tests/TestSuite_kernelpf_vf.py | 1007 ++++++++++++++++++++++++++++++++
 1 file changed, 1007 insertions(+)
 create mode 100644 tests/TestSuite_kernelpf_vf.py
  

Comments

Tu, Lijuan June 14, 2023, 2:03 a.m. UTC | #1
> -----Original Message-----
> From: Zhimin Huang <zhiminx.huang@intel.com>
> Sent: Friday, June 9, 2023 2:28 AM
> To: dts@dpdk.org
> Cc: Huang, ZhiminX <zhiminx.huang@intel.com>
> Subject: [dts][PATCH V1 2/6] tests/kernelpf_vf:add new suite to cover most of
> the basic vf cases
> 
> add kernelpf_vf new suite and refactor with func_test_base common module:
> cover kernelpf_iavf/vf_vlan/vf_macfilter/vf_rss/vf_xstats_check.
> 
> Signed-off-by: Zhimin Huang <zhiminx.huang@intel.com>
> ---
>  tests/TestSuite_kernelpf_vf.py | 1007 ++++++++++++++++++++++++++++++++
>  1 file changed, 1007 insertions(+)
>  create mode 100644 tests/TestSuite_kernelpf_vf.py
> 
......
> +
> +    def test_vf_multicast(self):
> +        self.rxtx_base.launch_testpmd()
> +        self.rxtx_base.basic_multicast_check(
> +            normal_mac=VF_MAC_ADDR, multicast_mac=MULTICAST_MAC_ADDR
> +        )
> +
> +    def test_vf_broadcast(self):
> +        self.rxtx_base.launch_testpmd()
> +        self.rxtx_base.basic_rx_check(packets_num=1,
> + packet_dst_mac=BROADCAST_MAC_ADDR)

I don't think it is a good idea to launch testpmd in every single case. It costs a lot of time. 
A good suite and cases organization could avoid this and save a lot of execution time.
1. adjust test cases that could use same args for testpmd.
2. separate those testpmd with different args to other suites.

> +
> +    def test_vf_queue_start_stop(self):
> +        self.rxtx_base.launch_testpmd()
> +        self.rxtx_base.basic_macfwd_check(
> +            packet_num=4, dst_mac=VF_MAC_ADDR,
> rx_port=self.used_dut_rx_port
> +        )
> +        packets_captured, _, stats = self.rxtx_base.execute_fwd_check_process(
> +            packets=self.rxtx_base.generate_random_packets(
> +                dstmac=VF_MAC_ADDR, pktnum=4
> +            ),
> +            rx_port=self.used_dut_rx_port,
> +            pmd_commands=["stop", "port 0 rxq 0 stop", "start"],
> +        )
> +        self.verify(
> +            len(packets_captured) == 0
> +            and stats[self.used_dut_rx_port]["TX-packets"] == 0,
> +            "receive packet num is not match",
> +        )
> +        packets_captured, _, stats = self.rxtx_base.execute_fwd_check_process(
> +            packets=self.rxtx_base.generate_random_packets(
> +                dstmac=VF_MAC_ADDR, pktnum=4
> +            ),
> +            rx_port=self.used_dut_rx_port,
> +            pmd_commands=["stop", "port 0 rxq 0 start", "port 1 txq 0 stop",
> "start"],
> +        )
> +        self.verify(
> +            len(packets_captured) == 0
> +            and stats[self.used_dut_rx_port]["TX-packets"] == 0,
> +            "receive packet num is not match",
> +        )
> +        self.rxtx_base.basic_macfwd_check(
> +            packet_num=4,
> +            dst_mac=VF_MAC_ADDR,
> +            rx_port=self.used_dut_rx_port,
> +            pmd_commands=["stop", "port 1 txq 0 start", "start"],
> +        )
> +
  

Patch

diff --git a/tests/TestSuite_kernelpf_vf.py b/tests/TestSuite_kernelpf_vf.py
new file mode 100644
index 00000000..cbecea14
--- /dev/null
+++ b/tests/TestSuite_kernelpf_vf.py
@@ -0,0 +1,1007 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+#
+
+import random
+
+from framework.exception import VerifyFailure
+from framework.settings import DPDK_RXMODE_SETTING, ETH_800_SERIES, load_global_setting
+from framework.test_case import TestCase, check_supported_nic
+
+from .func_test_base import *
+
+VF_MAC_ADDR = "00:01:23:45:67:89"
+VF_WRONG_MAC_ADDR = "00:01:23:45:67:99"
+MULTICAST_MAC_ADDR = "01:80:C2:00:00:08"
+BROADCAST_MAC_ADDR = "FF:FF:FF:FF:FF:FF"
+
+MAX_VLAN_ID = 4095
+RANDOM_VLAN_ID = random.randint(2, MAX_VLAN_ID)
+OUTER_VLAN_ID = 1
+INNTER_VLAN_ID = 2
+
+MATCHED_VLAN_PKT = FuncTestBase.generate_using_packets(
+    pkt_type="VLAN_UDP", vlan_id=RANDOM_VLAN_ID, dst_mac=VF_MAC_ADDR
+)
+UNMATCHED_VLAN_PKT = FuncTestBase.generate_using_packets(
+    pkt_type="VLAN_UDP", vlan_id=RANDOM_VLAN_ID - 1, dst_mac=VF_MAC_ADDR
+)
+
+MATCHED_DOUBLE_VLAN_PKT = [
+    FuncTestBase.generate_using_packets(pkt_str=_pkt)
+    for _pkt in [
+        'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=%d,type=0x8100,prio=1)/Dot1Q(vlan=%d,type=0x0800,prio=2)/IP('
+        'src="196.222.232.221")/("X"*480)'
+        % (VF_MAC_ADDR, OUTER_VLAN_ID, INNTER_VLAN_ID),
+        'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=%d,type=0x0800)/IP('
+        'src="196.222.232.221")/("X"*480)' % (VF_MAC_ADDR, OUTER_VLAN_ID),
+    ]
+]
+
+UNMATCHED_DOUBLE_VLAN_PKT = [
+    FuncTestBase.generate_using_packets(pkt_str=_pkt)
+    for _pkt in [
+        'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=%d,type=0x8100,prio=1)/Dot1Q(vlan=%d,type=0x0800,prio=2)/IP('
+        'src="196.222.232.221")/("X"*480)'
+        % (VF_MAC_ADDR, OUTER_VLAN_ID + 10, INNTER_VLAN_ID),
+        'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=%d,type=0x0800)/IP('
+        'src="196.222.232.221")/("X"*480)' % (VF_MAC_ADDR, OUTER_VLAN_ID + 10),
+    ]
+]
+
+
+class TestKernelpfVf(TestCase):
+    def set_up_all(self):
+        self.dut_port = self.dut.get_ports(self.nic)
+        self.used_dut_tx_port = self.dut_port[0]
+        self.used_dut_rx_port = self.dut_port[1]
+        self.port_obj = [self.dut.ports_info[port]["port"] for port in self.dut_port]
+        self.tester_tx_interface = self.tester.get_interface(
+            self.tester.get_local_port(self.used_dut_tx_port)
+        )
+        self.tester_rx_interface = self.tester.get_interface(
+            self.tester.get_local_port(self.used_dut_rx_port)
+        )
+        self.rxtx_base = RxTxBaseTest(
+            self, self.tester_tx_interface, self.tester_rx_interface
+        )
+        self.vlan_func = VlanFuncBaseTest(
+            self, self.tester_tx_interface, self.tester_rx_interface
+        )
+        self.rxtx_base.check_port_num_for_test(2)
+        self.setup_env_configuration()
+        self.rx_mode = load_global_setting(DPDK_RXMODE_SETTING)
+
+    def set_up(self):
+        pass
+
+    def pf_config(self):
+        self.flag = "vf-vlan-pruning"
+        self.dut.bind_interfaces_linux(self.kdriver)
+        self.default_stats = self.dut.get_priv_flags_state(
+            self.port_obj[self.used_dut_tx_port].get_interface_name(), self.flag
+        )
+        if not self.default_stats:
+            self.logger.warning(
+                f"{self.kdriver + '_' + self.nic_obj.driver_version} driver does not have vf-vlan-pruning flag."
+            )
+        if (
+            any([self.is_eth_series_nic(800), self.kdriver == "i40e"])
+            and self.default_stats
+        ):
+            self.dut.send_expect(
+                "ethtool --set-priv-flags %s %s on"
+                % (
+                    self.port_obj[self.used_dut_tx_port].get_interface_name(),
+                    self.flag,
+                ),
+                "# ",
+            )
+            self.dut.send_expect(
+                "ethtool --set-priv-flags %s %s on"
+                % (
+                    self.port_obj[self.used_dut_rx_port].get_interface_name(),
+                    self.flag,
+                ),
+                "# ",
+            )
+
+    def vf_config(self):
+        self.orig_vf_mac = self.rxtx_base.get_vf_mac_through_pf(
+            self.port_obj[self.used_dut_tx_port].get_interface_name()
+        )
+        self.port_obj[self.used_dut_tx_port].set_vf_mac_addr(mac=VF_MAC_ADDR)
+        self.dut.send_expect(
+            "ip link set %s vf 0 spoofchk off"
+            % (self.port_obj[self.used_dut_tx_port].get_interface_name()),
+            "# ",
+        )
+        self.dut.send_expect(
+            "ip link set %s vf 0 spoofchk off"
+            % (self.port_obj[self.used_dut_rx_port].get_interface_name()),
+            "# ",
+        )
+
+    def setup_env_configuration(self):
+        self.vm_obj, self.vm_dut = self.rxtx_base.vf_test_preset_env_vm(
+            pf_port=[self.used_dut_tx_port, self.used_dut_rx_port],
+            vfs_num=1,
+            vm_name="vm0",
+        )
+        self.rxtx_base.init_pmd_session(self.vm_dut)
+        self.vlan_func.init_pmd_session(self.vm_dut)
+
+    def test_vf_basic_rxtx(self):
+        self.rxtx_base.launch_testpmd()
+        self.rxtx_base.basic_rx_check(packets_num=100, packet_dst_mac=VF_MAC_ADDR)
+        self.rxtx_base.basic_tx_check()
+
+    def test_vf_promisc_mode(self):
+        try:
+            self.rxtx_base.execute_host_cmd(
+                "ip link set dev %s vf 0 trust on"
+                % self.port_obj[self.used_dut_tx_port].get_interface_name()
+            )
+            self.rxtx_base.launch_testpmd()
+            self.rxtx_base.basic_promisc_check(
+                match_mac=VF_MAC_ADDR, unmatch_mac=VF_WRONG_MAC_ADDR
+            )
+        except Exception as e:
+            raise VerifyFailure(e)
+        finally:
+            self.rxtx_base.pmd_session.quit()
+            self.rxtx_base.execute_host_cmd(
+                "ip link set dev %s vf 0 trust off"
+                % self.port_obj[self.used_dut_tx_port].get_interface_name()
+            )
+            self.vf_config()
+
+    def test_vf_multicast(self):
+        self.rxtx_base.launch_testpmd()
+        self.rxtx_base.basic_multicast_check(
+            normal_mac=VF_MAC_ADDR, multicast_mac=MULTICAST_MAC_ADDR
+        )
+
+    def test_vf_broadcast(self):
+        self.rxtx_base.launch_testpmd()
+        self.rxtx_base.basic_rx_check(packets_num=1, packet_dst_mac=BROADCAST_MAC_ADDR)
+
+    def test_vf_queue_start_stop(self):
+        self.rxtx_base.launch_testpmd()
+        self.rxtx_base.basic_macfwd_check(
+            packet_num=4, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+        )
+        packets_captured, _, stats = self.rxtx_base.execute_fwd_check_process(
+            packets=self.rxtx_base.generate_random_packets(
+                dstmac=VF_MAC_ADDR, pktnum=4
+            ),
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=["stop", "port 0 rxq 0 stop", "start"],
+        )
+        self.verify(
+            len(packets_captured) == 0
+            and stats[self.used_dut_rx_port]["TX-packets"] == 0,
+            "receive packet num is not match",
+        )
+        packets_captured, _, stats = self.rxtx_base.execute_fwd_check_process(
+            packets=self.rxtx_base.generate_random_packets(
+                dstmac=VF_MAC_ADDR, pktnum=4
+            ),
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=["stop", "port 0 rxq 0 start", "port 1 txq 0 stop", "start"],
+        )
+        self.verify(
+            len(packets_captured) == 0
+            and stats[self.used_dut_rx_port]["TX-packets"] == 0,
+            "receive packet num is not match",
+        )
+        self.rxtx_base.basic_macfwd_check(
+            packet_num=4,
+            dst_mac=VF_MAC_ADDR,
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=["stop", "port 1 txq 0 start", "start"],
+        )
+
+    def test_vf_rss(self):
+        rss_type = ["ip", "tcp", "udp"]
+        rss_reta = [3, 2, 1, 0] * 16
+        self.rxtx_base.launch_testpmd(param="--rxq=4 --txq=4")
+        self.rxtx_base.rss_reta_config_check(rss_reta)
+        for _rss in rss_type:
+            hash_table = self.rxtx_base.basic_rss_check(
+                dst_mac=VF_MAC_ADDR, rss_type=_rss, queue_num=4
+            )
+            self.rxtx_base.rss_reta_hit_check(hash_table, rss_reta)
+            self.rxtx_base.execute_pmd_cmd("stop")
+
+    def test_vf_rss_hash_key(self):
+        update_hash_key = "1b9d58a4b961d9cd1c56ad1621c3ad51632c16a5d16c21c3513d132c135d132c13ad1531c23a51d6ac49879c499d798a7d949c8a"
+        self.rxtx_base.launch_testpmd(param="--rxq=4 --txq=4")
+        self.rxtx_base.basic_rss_hash_key_check(
+            dst_mac=VF_MAC_ADDR, hash_key=update_hash_key
+        )
+
+    def test_vf_rss_rxq_txq_inconsistent(self):
+        params = [
+            "--rxq=4 --txq=8",
+            "--rxq=6 --txq=8",
+            "--rxq=3 --txq=9",
+            "--rxq=4 --txq=16",
+        ]
+        if self.kdriver == "ixgbe":
+            params = [
+                "--rxq=2 --txq=4",
+                "--rxq=1 --txq=2",
+            ]
+        for param in params:
+            try:
+                queue_num = re.search(r"--rxq=(\d+)", param).group(1)
+                self.rxtx_base.launch_testpmd(param=param)
+                self.rxtx_base.basic_rss_check(
+                    dst_mac=VF_MAC_ADDR, rss_type="ip", queue_num=queue_num
+                )
+            except Exception as e:
+                raise VerifyFailure(e)
+            finally:
+                self.rxtx_base.pmd_session.quit()
+
+    def test_vf_port_start_stop(self):
+        self.rxtx_base.launch_testpmd()
+        for i in range(10):
+            self.rxtx_base.execute_pmd_cmd("port stop all")
+            self.rxtx_base.execute_pmd_cmd("port start all")
+        self.rxtx_base.basic_macfwd_check(
+            packet_num=100, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+        )
+
+    def test_vf_statistic_reset(self):
+        self.rxtx_base.launch_testpmd()
+        out = self.rxtx_base.execute_pmd_cmd("show port stats all")
+        self.verify(
+            "RX-packets: 0" in out and "TX-packets: 0" in out,
+            "receive some misc packet",
+        )
+        self.rxtx_base.basic_macfwd_check(
+            packet_num=100, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+        )
+        self.rxtx_base.execute_pmd_cmd("clear port stats all")
+        out = self.rxtx_base.execute_pmd_cmd("show port stats all")
+        self.verify(
+            "RX-packets: 0" in out and "TX-packets: 0" in out,
+            "clear port stats fail",
+        )
+
+    def test_vf_information(self):
+        self.rxtx_base.launch_testpmd()
+        self.rxtx_base.basic_pmd_info_check(self.port_obj[0])
+        self.rxtx_base.basic_macfwd_check(
+            packet_num=100, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+        )
+
+    def test_vf_xstats_check(self):
+        try:
+            self.tester.send_expect(
+                "ifconfig {} mtu {}".format(self.tester_tx_interface, 3000), "# "
+            )
+            if self.kdriver == "ixgbe":
+                self.rxtx_base.execute_host_cmd(
+                    "ifconfig {} mtu 3000".format(
+                        self.port_obj[self.used_dut_tx_port].get_interface_name()
+                    )
+                )
+            self.rxtx_base.launch_testpmd(param="--rxq=4 --txq=4 --max-pkt-len=9000")
+            for _payload_size in [64, 128, 256, 512, 1024, 1523]:
+
+                stats_table, xstats_table = self.rxtx_base.basic_xstats_check(
+                    packet_num=100,
+                    dst_mac=VF_MAC_ADDR,
+                    rx_port=self.used_dut_rx_port,
+                    payload_size=_payload_size,
+                )
+                self.verify(
+                    xstats_table[self.used_dut_tx_port]["rx_good_packets"]
+                    == stats_table[self.used_dut_tx_port]["RX-packets"]
+                    == xstats_table[self.used_dut_rx_port]["tx_good_packets"]
+                    == stats_table[self.used_dut_rx_port]["TX-packets"]
+                    == 100,
+                    "pkt recieve or transport count error!",
+                )
+                self.verify(
+                    xstats_table[self.used_dut_tx_port]["rx_good_bytes"]
+                    == stats_table[self.used_dut_tx_port]["RX-bytes"]
+                    == xstats_table[self.used_dut_rx_port]["tx_good_bytes"]
+                    == stats_table[self.used_dut_rx_port]["TX-bytes"],
+                    "pkt recieve or transport bytes error!",
+                )
+        except Exception as e:
+            raise VerifyFailure(e)
+        finally:
+            self.tester.send_expect(
+                "ifconfig {} mtu {}".format(self.tester_tx_interface, 1500), "# "
+            )
+            if self.kdriver == "ixgbe":
+                self.rxtx_base.execute_host_cmd(
+                    "ifconfig {} mtu 1500".format(
+                        self.port_obj[self.used_dut_tx_port].get_interface_name()
+                    )
+                )
+
+    def test_vf_unicast(self):
+        self.rxtx_base.launch_testpmd()
+        self.rxtx_base.basic_macfwd_check(
+            packet_num=10,
+            dst_mac=VF_WRONG_MAC_ADDR,
+            check_miss=True,
+            pmd_commands=[
+                "set promisc all off",
+                "set allmulti all off",
+                "set verbose 1",
+                "set fwd mac",
+                "start",
+            ],
+            rx_port=self.used_dut_rx_port,
+        )
+        self.rxtx_base.basic_macfwd_check(
+            packet_num=10, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+        )
+
+    def test_vf_mac_add_filter(self):
+        try:
+            self.port_obj[self.used_dut_tx_port].set_vf_mac_addr(
+                mac=self.orig_vf_mac[0]
+            )
+            self.rxtx_base.launch_testpmd()
+            default_vf_mac = self.rxtx_base.get_vf_mac_through_pf(
+                self.port_obj[self.used_dut_tx_port].get_interface_name()
+            )
+            self.rxtx_base.basic_macfwd_check(
+                packet_num=100,
+                dst_mac=VF_MAC_ADDR,
+                rx_port=self.used_dut_rx_port,
+                pmd_commands=[
+                    "port stop all",
+                    "port config all crc-strip on",
+                    "port start all",
+                    "set promisc all off",
+                    "mac_addr add 0 {}".format(VF_MAC_ADDR),
+                    "set verbose 1",
+                    "set fwd mac",
+                    "start",
+                ],
+            )
+            self.rxtx_base.basic_macfwd_check(
+                packet_num=100,
+                dst_mac=default_vf_mac[0],
+                rx_port=self.used_dut_rx_port,
+                pmd_commands=["clear port stats all"],
+            )
+            self.rxtx_base.basic_macfwd_check(
+                packet_num=100,
+                dst_mac=VF_MAC_ADDR,
+                rx_port=self.used_dut_rx_port,
+                check_miss=True,
+                pmd_commands=[
+                    "clear port stats all",
+                    "mac_addr remove 0 {}".format(VF_MAC_ADDR),
+                ],
+            )
+            self.rxtx_base.basic_macfwd_check(
+                packet_num=100,
+                dst_mac=VF_WRONG_MAC_ADDR,
+                rx_port=self.used_dut_rx_port,
+                check_miss=True,
+                pmd_commands=[
+                    "clear port stats all",
+                ],
+            )
+        except Exception as e:
+            raise VerifyFailure(e)
+        finally:
+            self.port_obj[self.used_dut_tx_port].set_vf_mac_addr(mac=VF_MAC_ADDR)
+
+    def test_vf_vlan_filter(self):
+        self.vlan_func.launch_testpmd()
+        self.vlan_func.basic_vlan_filter_check(
+            vlan_id=RANDOM_VLAN_ID,
+            match_pkt=MATCHED_VLAN_PKT,
+            unmatch_pkt=UNMATCHED_VLAN_PKT,
+            pmd_commands=[
+                "set promisc all off",
+                "vlan set filter on 0",
+                "vlan set strip off 0",
+                "rx_vlan add %d 0" % RANDOM_VLAN_ID,
+                "set verbose 1",
+                "set fwd mac",
+                "start",
+            ],
+            rx_port=self.used_dut_rx_port,
+        )
+        _, _, _, vlan_id_list = self.vlan_func.vlan_pkts_fwd_check(
+            pkts=MATCHED_VLAN_PKT,
+            pmd_commands=[
+                "rx_vlan rm %d 0" % RANDOM_VLAN_ID,
+                "vlan set filter off 0",
+            ],
+        )
+        if (
+            (self.kdriver == "i40e" and self.nic_obj.driver_version < "2.13.10")
+            or (self.kdriver == "i40e" and not self.default_stats)
+            or (self.kdriver == "ice" and not self.default_stats)
+        ):
+            self.verify(len(vlan_id_list) == 1, "Failed to received vlan packet!!!")
+        else:
+            self.verify(len(vlan_id_list) == 0, "Failed to received vlan packet!!!")
+
+    def test_vf_vlan_strip(self):
+        self.vlan_func.launch_testpmd()
+        self.vlan_func.basic_vlan_strip_check(
+            vlan_id=RANDOM_VLAN_ID,
+            match_pkt=MATCHED_VLAN_PKT,
+            pmd_commands=[
+                "set promisc all off",
+                "vlan set filter on 0",
+                "rx_vlan add %d 0" % RANDOM_VLAN_ID,
+                "vlan set strip on 0",
+                "set verbose 1",
+                "set fwd mac",
+                "start",
+            ],
+            rx_port=self.used_dut_rx_port,
+        )
+
+    def test_vf_vlan_insertion(self):
+        self.vlan_func.launch_testpmd()
+        self.vlan_func.basic_vlan_insert_check(
+            vlan_id=RANDOM_VLAN_ID,
+            insert_vlan=RANDOM_VLAN_ID,
+            match_pkt=self.vlan_func.generate_using_packets(
+                pkt_type="UDP", dst_mac=VF_MAC_ADDR
+            ),
+            pmd_commands=[
+                "port stop all",
+                "set promisc all off",
+                "vlan set filter on 0",
+                "tx_vlan set 1 %s" % RANDOM_VLAN_ID,
+                "rx_vlan add %s 0" % RANDOM_VLAN_ID,
+                "port start all",
+                "set verbose 1",
+                "set fwd mac",
+                "start",
+            ],
+        )
+
+    def test_vf_vlan_pvid_base_tx(self):
+        try:
+            self.vlan_func.set_pvid_from_pf(
+                pf_intf=self.port_obj[self.used_dut_tx_port].get_interface_name(),
+                vlan_id=RANDOM_VLAN_ID,
+            )
+            self.vlan_func.launch_testpmd()
+            packets_captured, _, _ = self.vlan_func.execute_fwd_check_process(
+                packets=self.vlan_func.generate_using_packets(
+                    pkt_type="UDP",
+                    dst_mac=self.vlan_func.get_vf_mac_through_pf(
+                        pf_intf=self.port_obj[
+                            self.used_dut_rx_port
+                        ].get_interface_name()
+                    ),
+                ),
+                tx_port=1,
+                tester_tx_interface=self.tester_rx_interface,
+                tester_rx_interface=self.tester_tx_interface,
+                pmd_commands=["set fwd mac", "set verbose 1", "start"],
+            )
+            self.verify(len(packets_captured) == 1, "Not receive expected packet")
+        except Exception as e:
+            raise VerifyFailure(e)
+        finally:
+            self.vlan_func.execute_host_cmd(
+                "ip link set {} vf 0 vlan 0".format(
+                    self.port_obj[self.used_dut_tx_port].get_interface_name()
+                )
+            )
+
+    def test_vf_vlan_pvid_base_rx(self):
+        try:
+            self.vlan_func.set_pvid_from_pf(
+                pf_intf=self.port_obj[self.used_dut_tx_port].get_interface_name(),
+                vlan_id=RANDOM_VLAN_ID,
+            )
+            self.vlan_func.launch_testpmd()
+            rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+                pkts=[MATCHED_VLAN_PKT, UNMATCHED_VLAN_PKT],
+                pmd_commands=[
+                    "set fwd rxonly",
+                    "set verbose 1",
+                    "start",
+                ],
+            )
+            self.verify(rece_num == 1, "Failed to received matched vlan pkt")
+            rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+                pkts=self.vlan_func.generate_using_packets(
+                    pkt_type="UDP", dst_mac=VF_MAC_ADDR
+                ),
+                rx_port=self.used_dut_rx_port,
+            )
+            self.verify(rece_num == 0, "Failed to received udp pkt without vlan")
+            self.vlan_func.pmd_session.quit()
+            self.vlan_func.set_pvid_from_pf(
+                pf_intf=self.port_obj[self.used_dut_tx_port].get_interface_name(),
+                vlan_id=0,
+            )
+            self.vlan_func.launch_testpmd()
+            rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+                pkts=MATCHED_VLAN_PKT,
+                rx_port=self.used_dut_rx_port,
+                pmd_commands=[
+                    "set fwd rxonly",
+                    "set verbose 1",
+                    "start",
+                ],
+            )
+            if (
+                (self.kdriver == "i40e" and self.nic_obj.driver_version < "2.13.10")
+                or (self.kdriver == "i40e" and not self.default_stats)
+                or (self.kdriver == "ice" and not self.default_stats)
+            ):
+                self.verify(rece_num == 1, "Failed to received vlan packet!!!")
+            else:
+                self.verify(rece_num == 0, "Failed to received vlan packet!!!")
+
+            rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+                pkts=[
+                    self.vlan_func.generate_using_packets(
+                        pkt_type="VLAN_UDP", vlan_id=0, dst_mac=VF_MAC_ADDR
+                    ),
+                    self.vlan_func.generate_using_packets(
+                        pkt_type="UDP", dst_mac=VF_MAC_ADDR
+                    ),
+                ],
+                rx_port=self.used_dut_rx_port,
+            )
+            self.verify(rece_num == 2, "Not received expect packet")
+        except Exception as e:
+            raise VerifyFailure(e)
+        finally:
+            self.vlan_func.execute_host_cmd(
+                "ip link set {} vf 0 vlan 0".format(
+                    self.port_obj[self.used_dut_tx_port].get_interface_name()
+                )
+            )
+
+    def test_vf_vlan_rx_combination(self):
+        rx_vlans = [1, RANDOM_VLAN_ID, MAX_VLAN_ID]
+        param = "--enable-hw-vlan" if self.kdriver != "ixgbe" else ""
+        self.rxtx_base.launch_testpmd(param=param)
+        rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+            pkts=self.vlan_func.generate_using_packets(
+                pkt_type="UDP", dst_mac=VF_MAC_ADDR
+            ),
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=[
+                "set fwd rxonly",
+                "set verbose 1",
+                "vlan set strip on 0",
+                "vlan set filter on 0",
+                "set promisc all off",
+                "start",
+            ],
+        )
+        self.verify(rece_num == 1, "Not received normal packet as default")
+        _, pmdout, _ = self.vlan_func.execute_fwd_check_process(
+            packets=self.vlan_func.generate_using_packets(
+                pkt_type="VLAN_UDP", vlan_id=0, dst_mac=VF_MAC_ADDR
+            ),
+            rx_port=self.used_dut_rx_port,
+        )
+        self.verify("VLAN tci=0x0" in pmdout, "Not received vlan 0 packet as default")
+        for rx_vlan in rx_vlans:
+            _, pmdout, _ = self.vlan_func.execute_fwd_check_process(
+                packets=self.vlan_func.generate_using_packets(
+                    pkt_type="VLAN_UDP", vlan_id=rx_vlan, dst_mac=VF_MAC_ADDR
+                ),
+                rx_port=self.used_dut_rx_port,
+                pmd_commands="rx_vlan add {} 0".format(rx_vlan),
+            )
+            self.verify(
+                "VLAN tci={}".format(hex(rx_vlan)) in pmdout,
+                "Not received expected vlan packet",
+            )
+            if rx_vlan == MAX_VLAN_ID:
+                continue
+            rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+                pkts=self.vlan_func.generate_using_packets(
+                    pkt_type="VLAN_UDP", vlan_id=rx_vlan + 1, dst_mac=VF_MAC_ADDR
+                ),
+                rx_port=self.used_dut_rx_port,
+            )
+            self.verify(rece_num == 0, "failed to receive unmatch vlan pkt")
+        for rx_vlan in rx_vlans:
+            self.vlan_func.execute_pmd_cmd("rx_vlan rm {} 0".format(rx_vlan))
+        _, pmdout, _ = self.vlan_func.execute_fwd_check_process(
+            packets=self.vlan_func.generate_using_packets(
+                pkt_type="VLAN_UDP", vlan_id=0, dst_mac=VF_MAC_ADDR
+            ),
+            rx_port=self.used_dut_rx_port,
+        )
+        self.verify("VLAN tci=0x0" in pmdout, "Not received vlan 0 packet as default")
+        rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+            pkts=self.vlan_func.generate_using_packets(
+                pkt_type="UDP", dst_mac=VF_MAC_ADDR
+            ),
+            rx_port=self.used_dut_rx_port,
+        )
+        self.verify(
+            rece_num == 1, "Not received normal packet after remove vlan filter"
+        )
+        rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+            pkts=MATCHED_VLAN_PKT,
+            rx_port=self.used_dut_rx_port,
+        )
+        if (
+            (self.kdriver == "i40e" and self.nic_obj.driver_version < "2.13.10")
+            or (self.kdriver == "i40e" and not self.default_stats)
+            or (self.kdriver == "ice" and not self.default_stats)
+        ):
+            self.verify(rece_num == 1, "Failed to received vlan packet!!!")
+        else:
+            self.verify(rece_num == 0, "Failed to received vlan packet!!!")
+
+    def test_vf_vlan_promisc(self):
+        self.vlan_func.launch_testpmd()
+        rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+            pkts=[
+                self.vlan_func.generate_using_packets(
+                    pkt_str=[
+                        'Ether(dst="{}",type=0x8100)/Dot1Q(vlan=100,type=0x0800)/IP(src="196.222.232.{}")/("X"*480)'.format(
+                            VF_MAC_ADDR, i
+                        )
+                        for i in range(10)
+                    ]
+                )
+            ],
+            pmd_commands=[
+                "port stop all",
+                "set promisc all on",
+                "set fwd mac",
+                "set verbose 1",
+                "vlan set filter off 0",
+                "vlan set strip off 0",
+                "port start all",
+                "start",
+            ],
+        )
+        if (
+            (self.kdriver == "i40e" and self.nic_obj.driver_version < "2.13.10")
+            or (self.kdriver == "i40e" and not self.default_stats)
+            or (self.kdriver == "ice" and not self.default_stats)
+        ):
+            self.verify(rece_num == 10, "Not receive expected packet")
+        else:
+            self.verify(rece_num == 0, "Receive expected packet")
+        rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+            pkts=[
+                self.vlan_func.generate_using_packets(
+                    pkt_str=[
+                        'Ether(dst="{}")/IP(src="196.222.232.{}")/("X"*480)'.format(
+                            VF_MAC_ADDR, i
+                        )
+                        for i in range(10)
+                    ]
+                )
+            ],
+        )
+        self.verify(rece_num == 10, "Not receive expected packet")
+
+    def test_iavf_dual_vlan_filter(self):
+        self.vlan_func.launch_testpmd()
+        rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+            pkts=MATCHED_DOUBLE_VLAN_PKT,
+            pmd_commands=[
+                "vlan set filter on 0",
+                "set verbose 1",
+                "set fwd mac",
+                "start",
+            ],
+        )
+        self.vlan_func.vlan_offload_flag_check(filter="on")
+        if self.default_stats:
+            self.verify(rece_num == 0, "Failed received vlan packet!")
+        else:
+            self.verify(
+                rece_num == len(MATCHED_DOUBLE_VLAN_PKT),
+                "Failed received vlan packet!",
+            )
+        self.vlan_func.basic_vlan_filter_check(
+            vlan_id=OUTER_VLAN_ID,
+            match_pkt=MATCHED_DOUBLE_VLAN_PKT[0],
+            unmatch_pkt=UNMATCHED_DOUBLE_VLAN_PKT[0],
+            pmd_commands=[
+                "rx_vlan add %d 0" % OUTER_VLAN_ID,
+            ],
+            double_vlan=True,
+            rx_port=self.used_dut_rx_port,
+        )
+        self.vlan_func.vlan_prio_check(
+            pkts=MATCHED_DOUBLE_VLAN_PKT[0], outer=1, inner=2
+        )
+        self.vlan_func.basic_vlan_filter_check(
+            vlan_id=OUTER_VLAN_ID,
+            match_pkt=MATCHED_DOUBLE_VLAN_PKT[1],
+            unmatch_pkt=UNMATCHED_DOUBLE_VLAN_PKT[1],
+            double_vlan=False,
+            rx_port=self.used_dut_rx_port,
+        )
+        rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+            pkts=MATCHED_DOUBLE_VLAN_PKT,
+            pmd_commands=[
+                "rx_vlan rm %d 0" % OUTER_VLAN_ID,
+            ],
+        )
+        if self.default_stats:
+            self.verify(rece_num == 0, "Failed error received vlan packet!")
+        else:
+            self.verify(rece_num == 2, "Failed error received vlan packet!")
+
+    def test_iavf_dual_vlan_strip(self):
+        self.vlan_func.launch_testpmd()
+        rece_num, _, _, vlan_id_list = self.vlan_func.vlan_pkts_fwd_check(
+            pkts=MATCHED_DOUBLE_VLAN_PKT,
+            pmd_commands=[
+                "vlan set filter on 0",
+                "rx_vlan add %d 0" % OUTER_VLAN_ID,
+                "vlan set strip on 0",
+                "set verbose 1",
+                "set fwd mac",
+                "start",
+            ],
+        )
+        self.vlan_func.vlan_offload_flag_check(filter="on", strip="on")
+        self.verify(
+            rece_num == len(MATCHED_DOUBLE_VLAN_PKT)
+            and len(vlan_id_list) == 1
+            and INNTER_VLAN_ID in vlan_id_list,
+            "Failed to receive vlan pkts with vlan tag",
+        )
+        rece_num, _, _, vlan_id_list = self.vlan_func.vlan_pkts_fwd_check(
+            pkts=MATCHED_DOUBLE_VLAN_PKT,
+            pmd_commands=[
+                "vlan set strip off 0",
+            ],
+        )
+        self.vlan_func.vlan_offload_flag_check(strip="off")
+        self.verify(
+            rece_num == len(MATCHED_DOUBLE_VLAN_PKT) and len(vlan_id_list) == 3,
+            "Failed to receive vlan pkts with vlan tag",
+        )
+
+    def test_iavf_dual_vlan_insert(self):
+        """
+        Test case: IAVF DUAL VLAN header insertion
+        """
+
+        """
+        according to dpdk commit d048a0aaae27809523969904c2f7b71fe3cc1bb6,
+        when the ice driver version newer than 1.8.9, avx512 tx path not support
+        insert correct vlag tag(outer of QinQ)
+        """
+        if self.rx_mode == "avx512" and self.vlan_func.convert_driver_version_value(
+            self.nic_obj.driver_version
+        ) > self.vlan_func.convert_driver_version_value("1.8.9"):
+            self.skip_case(False, "avx512 tx path not support insert correct vlan tag")
+        self.vlan_func.launch_testpmd()
+        self.vlan_func.basic_vlan_insert_check(
+            vlan_id=RANDOM_VLAN_ID,
+            insert_vlan=INNTER_VLAN_ID,
+            match_pkt=MATCHED_VLAN_PKT,
+            pmd_commands=[
+                "vlan set filter on 0",
+                "rx_vlan add {} 0".format(RANDOM_VLAN_ID),
+                "port stop 1",
+                "tx_vlan set 1 {}".format(INNTER_VLAN_ID),
+                "port start 1",
+                "set verbose 1",
+                "set fwd mac",
+                "start",
+            ],
+            rx_port=self.used_dut_rx_port,
+            double_vlan=INNTER_VLAN_ID,
+        )
+        self.vlan_func.basic_vlan_insert_check(
+            vlan_id=RANDOM_VLAN_ID,
+            insert_vlan=INNTER_VLAN_ID,
+            match_pkt=self.vlan_func.generate_using_packets(
+                pkt_type="UDP", dst_mac=VF_MAC_ADDR
+            ),
+            rx_port=self.used_dut_rx_port,
+        )
+
+    @check_supported_nic(ETH_800_SERIES)
+    def test_enable_disable_iavf_crc_strip(self):
+        """
+        Test case: Enable/disable AVF CRC stripping
+        """
+        self.rxtx_base.launch_testpmd(param="--disable-crc-strip")
+        packets_captured, pmdout, stats = self.rxtx_base.execute_fwd_check_process(
+            packets=self.rxtx_base.generate_random_packets(
+                dstmac=VF_MAC_ADDR, pktnum=1
+            ),
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=["set verbose 1", "set fwd mac", "start"],
+        )
+        pkt_len_list = self.rxtx_base.get_pmd_rece_pkt_len(pmdout)
+        self.verify(
+            int(pkt_len_list[0]) + 4 == stats[self.used_dut_tx_port]["RX-bytes"],
+            "CRC strip off failed",
+        )
+
+        packets_captured, pmdout, stats = self.rxtx_base.execute_fwd_check_process(
+            packets=self.rxtx_base.generate_random_packets(
+                dstmac=VF_MAC_ADDR, pktnum=1
+            ),
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=[
+                "stop",
+                "port stop 0",
+                "port config 0 rx_offload keep_crc off",
+                "port start 0",
+                "start",
+            ],
+        )
+        pkt_len_list = self.rxtx_base.get_pmd_rece_pkt_len(pmdout)
+        self.verify(
+            int(pkt_len_list[0]) == stats[self.used_dut_tx_port]["RX-bytes"],
+            "CRC strip off failed",
+        )
+
+        packets_captured, pmdout, stats = self.rxtx_base.execute_fwd_check_process(
+            packets=self.rxtx_base.generate_random_packets(
+                dstmac=VF_MAC_ADDR, pktnum=1
+            ),
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=[
+                "stop",
+                "port stop 0",
+                "port config 0 rx_offload keep_crc on",
+                "port start 0",
+                "start",
+            ],
+        )
+        pkt_len_list = self.rxtx_base.get_pmd_rece_pkt_len(pmdout)
+        self.verify(
+            int(pkt_len_list[0]) + 4 == stats[self.used_dut_tx_port]["RX-bytes"],
+            "CRC strip off failed",
+        )
+
+        self.rxtx_base.pmd_session.quit()
+        self.rxtx_base.launch_testpmd()
+        packets_captured, pmdout, stats = self.rxtx_base.execute_fwd_check_process(
+            packets=self.rxtx_base.generate_random_packets(
+                dstmac=VF_MAC_ADDR, pktnum=1
+            ),
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=[
+                "set verbose 1",
+                "set fwd mac",
+                "start",
+            ],
+        )
+        pkt_len_list = self.rxtx_base.get_pmd_rece_pkt_len(pmdout)
+        self.verify(
+            int(pkt_len_list[0]) == stats[self.used_dut_tx_port]["RX-bytes"],
+            "CRC strip off failed",
+        )
+
+    @check_supported_nic(ETH_800_SERIES)
+    def test_crc_strip_iavf_vlan_strip_coexists(self):
+        """
+        Test case: IAVF CRC strip and Vlan strip co-exists
+        """
+        self.vlan_func.launch_testpmd()
+        self.vlan_func.set_pmd_fwd_mode("mac")
+        self.vlan_func.vlan_offload_flag_check(strip="off")
+        # vlan strip off, CRC strip on
+        self.vlan_func.execute_pmd_cmd(["stop", "vlan set strip off 0"])
+        self.vlan_func.vlan_offload_flag_check(strip="off")
+        packets_captured, pmdout, stats = self.vlan_func.execute_fwd_check_process(
+            packets=MATCHED_DOUBLE_VLAN_PKT[0],
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=[
+                "vlan set filter on 0",
+                "rx_vlan add %d 0" % OUTER_VLAN_ID,
+                "start",
+            ],
+        )
+        _, vlan_id_list = self.vlan_func.get_pkts_vlan_layer(packets_captured, "vlan")
+        pkt_len_list = self.vlan_func.get_pmd_rece_pkt_len(pmdout)
+        self.verify(
+            len(vlan_id_list) == 2
+            and stats[self.used_dut_tx_port]["RX-bytes"]
+            == stats[self.used_dut_rx_port]["TX-bytes"]
+            == int(pkt_len_list[0]),
+            "CRC strip on and vlan strip off should have same values for rx/tx bytes and pkt_len",
+        )
+        compare_pkt_len = int(pkt_len_list[0])
+        # vlan strip on, CRC strip on
+        self.vlan_func.execute_pmd_cmd("vlan set strip on 0")
+        self.vlan_func.vlan_offload_flag_check(strip="on")
+        packets_captured, pmdout, stats = self.vlan_func.execute_fwd_check_process(
+            packets=MATCHED_DOUBLE_VLAN_PKT[0],
+            rx_port=self.used_dut_rx_port,
+        )
+        _, vlan_id_list = self.vlan_func.get_pkts_vlan_layer(packets_captured, "vlan")
+        pkt_len_list = self.vlan_func.get_pmd_rece_pkt_len(pmdout)
+        self.verify(
+            stats[self.used_dut_tx_port]["RX-bytes"] == int(pkt_len_list[0]) + 4
+            and int(pkt_len_list[0]) + 4 == compare_pkt_len
+            and len(vlan_id_list) == 1,
+            "CRC strip on, vlan strip on, coexists test failed",
+        )
+        # vlan strip off, CRC strip off
+        self.vlan_func.execute_pmd_cmd("vlan set strip off 0")
+        self.vlan_func.vlan_offload_flag_check(strip="off")
+        packets_captured, pmdout, stats = self.vlan_func.execute_fwd_check_process(
+            packets=MATCHED_DOUBLE_VLAN_PKT[0],
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=[
+                "stop",
+                "port stop 0",
+                "port config 0 rx_offload keep_crc on",
+                "port start 0",
+                "start",
+            ],
+        )
+        _, vlan_id_list = self.vlan_func.get_pkts_vlan_layer(packets_captured, "vlan")
+        pkt_len_list = self.vlan_func.get_pmd_rece_pkt_len(pmdout)
+        self.verify(
+            stats[self.used_dut_tx_port]["RX-bytes"] == int(pkt_len_list[0]) + 4
+            and int(pkt_len_list[0]) == compare_pkt_len
+            and len(vlan_id_list) == 2,
+            "CRC strip off, vlan strip off, coexists test failed",
+        )
+        # vlan strip on, CRC strip off
+        out = self.vlan_func.execute_pmd_cmd("vlan set strip on 0")
+        self.verify(
+            "iavf_config_vlan_strip_v2(): fail to execute command VIRTCHNL_OP_ENABLE_VLAN_STRIPPING_V2"
+            in out,
+            "set vlan strip on successfully",
+        )
+        packets_captured, pmdout, stats = self.vlan_func.execute_fwd_check_process(
+            packets=MATCHED_DOUBLE_VLAN_PKT[0],
+            rx_port=self.used_dut_rx_port,
+        )
+        _, vlan_id_list = self.vlan_func.get_pkts_vlan_layer(packets_captured, "vlan")
+        pkt_len_list = self.vlan_func.get_pmd_rece_pkt_len(pmdout)
+        self.verify(
+            stats[self.used_dut_tx_port]["RX-bytes"] == int(pkt_len_list[0]) + 4
+            and int(pkt_len_list[0]) == compare_pkt_len
+            and len(vlan_id_list) == 2,
+            "CRC strip off, vlan strip off, coexists test failed",
+        )
+        # vlan strip off, CRC strip on
+        self.vlan_func.execute_pmd_cmd("vlan set strip off 0")
+        self.vlan_func.vlan_offload_flag_check(strip="off")
+        packets_captured, pmdout, stats = self.vlan_func.execute_fwd_check_process(
+            packets=MATCHED_DOUBLE_VLAN_PKT[0],
+            rx_port=self.used_dut_rx_port,
+            pmd_commands=[
+                "stop",
+                "port stop 0",
+                "port config 0 rx_offload keep_crc off",
+                "port start 0",
+                "start",
+            ],
+        )
+        self.verify(
+            stats[self.used_dut_tx_port]["RX-bytes"] == int(pkt_len_list[0])
+            and int(pkt_len_list[0]) == compare_pkt_len
+            and len(vlan_id_list) == 2,
+            "CRC strip on, vlan strip off, coexists test failed",
+        )
+
+    def tear_down(self):
+        self.rxtx_base.pmd_session.quit()
+
+    def tear_down_all(self):
+        self.rxtx_base.destroy_vm_env()
+        self.rxtx_base.destroy_vf()