[V1,1/6] tests/func_test_base:add new commom module to refactor func test cases

Message ID 20230608182742.360594-2-zhiminx.huang@intel.com (mailing list archive)
State Superseded
Headers
Series add new common module and add new suites |

Commit Message

Huang, ZhiminX June 8, 2023, 6:27 p.m. UTC
  for some vf test suites, there are duplicate methods implemented in
suites.
so add the public method into func_test_base,and encapsulate
classes for the basic testing process.

Signed-off-by: Zhimin Huang <zhiminx.huang@intel.com>
---
 tests/func_test_base.py | 977 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 977 insertions(+)
 create mode 100644 tests/func_test_base.py
  

Patch

diff --git a/tests/func_test_base.py b/tests/func_test_base.py
new file mode 100644
index 00000000..8134d5a1
--- /dev/null
+++ b/tests/func_test_base.py
@@ -0,0 +1,977 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+#
+
+import re
+import time
+import traceback
+
+from framework.packet import Packet
+from framework.pmd_output import PmdOutput
+from framework.virt_common import VM
+
+supported_vf_driver = ["pci-stub", "vfio-pci"]
+
+
+class FuncTestBase(object):
+    def __init__(self, test_case, tester_tx_interface, tester_rx_interface):
+        self.test_case = test_case
+        self.verify = self.test_case.verify
+        self.logger = test_case.logger
+        self.dut = self.test_case.dut
+        self.tester = self.test_case.tester
+
+        self.vm_info = []
+        self.pmd_session = None
+        self.tester_tx_interface = tester_tx_interface
+        self.tester_rx_interface = tester_rx_interface
+
+        self.pkt = Packet()
+        self.vf_driver = self.test_case.get_suite_cfg()["vf_driver"] or "pci-stub"
+        self.vm_specified_driver = self.get_vm_specified_driver()
+
+    def check_port_num_for_test(self, port_num: int):
+        """
+        check the port num for test requirement
+        """
+        dut_ports = self.dut.get_ports(self.test_case.nic)
+        self.verify(len(dut_ports) >= port_num, "Unsupported port num for test require")
+
+    def get_vm_specified_driver(self):
+        """
+        check the vf support driver and return vm driver
+        :return: vm_specified_driver
+        """
+        self.verify(self.vf_driver in supported_vf_driver, "Unsupported vf driver")
+        if self.vf_driver == "pci-stub":
+            vm_specified_driver = "pci-assign"
+        else:
+            vm_specified_driver = "vfio-pci"
+        return vm_specified_driver
+
+    def create_vf(self, pf_port, vfs_num: int, driver="default"):
+        """
+        create vfs, support multi pfs to create multi vfs
+        :param pf_port: pf port or port list
+        :param vfs_num: create vf num
+        :param driver: set vf driver on dpdk
+        :return: vf net device object
+        """
+        sriov_vfs_obj = []
+        try:
+            self.test_case.bind_nic_driver(
+                self.dut.get_ports(self.test_case.nic), driver=self.test_case.kdriver
+            )
+        except Exception as e:
+            self.logger.info(traceback.format_exc(e))
+        if hasattr(self.test_case, "pf_config"):
+            self.test_case.pf_config()
+        if isinstance(pf_port, int):
+            pf_port = [pf_port]
+        for _port in pf_port:
+            self.dut.generate_sriov_vfs_by_port(_port, vfs_num, driver=driver)
+            sriov_vfs_obj.append(self.dut.ports_info[_port]["vfs_port"])
+            self.dut.send_expect(
+                "ifconfig %s up" % self.dut.ports_info[_port]["intf"], "#"
+            )
+            res = self.dut.is_interface_up(self.dut.ports_info[_port]["intf"])
+            self.verify(
+                res, "%s link status is down" % self.dut.ports_info[_port]["intf"]
+            )
+        if hasattr(self.test_case, "vf_config"):
+            self.test_case.vf_config()
+        for vf_port in sriov_vfs_obj:
+            for _vf in vf_port:
+                _vf.bind_driver(self.vf_driver)
+        return sriov_vfs_obj
+
+    def destroy_vf(self, pf_port="all"):
+        """
+        destroy vfs
+        :param pf_port: select the pf port to destroy vfs.
+        the default will destroy all vfs.
+        """
+        pf_port = pf_port if isinstance(pf_port, list) else [pf_port]
+        if "all" in pf_port:
+            # destory all vfs
+            self.dut.destroy_all_sriov_vfs()
+        else:
+            for port in pf_port:
+                # destroy the vf on the specified port
+                self.dut.destroy_sriov_vfs_by_port(port)
+
+    def setup_vm_env(self, vm_name: str, sriov_pci: list):
+        """
+        passtrough the vfs into vm, start vm.
+        :param vm_name: select the vm to use
+        :param sriov_pci: pci list
+        :returns: vm object and vm dut session
+        """
+        try:
+            vm_obj = VM(self.dut, vm_name, self.test_case.suite_name)
+            for _pci in sriov_pci:
+                vm_obj.set_vm_device(
+                    driver=self.vm_specified_driver, **{"opt_host": _pci}
+                )
+            vm_dut = vm_obj.start()
+            if vm_dut is None:
+                raise Exception("Set up VM ENV failed!")
+            self.vm_info.append((vm_obj, vm_dut))
+            return vm_obj, vm_dut
+        except Exception as e:
+            self.destroy_vm_env(vm_obj_name=vm_name)
+            raise Exception(e)
+
+    def destroy_vm_env(self, vm_obj_name=""):
+        """
+        destroy the specified vm name or all vms
+        """
+        try:
+            for vm_obj, vm_session in self.vm_info:
+                if vm_obj_name == "":
+                    # destoty all vm
+                    vm_session.kill_all()
+                    vm_obj.stop()
+                    self.vm_info = []
+                elif vm_obj.vm_name == vm_obj_name:
+                    # destroy the vm with the specified name
+                    vm_session.kill_all()
+                    vm_obj.stop()
+                    self.vm_info.remove((vm_obj, vm_session))
+                else:
+                    self.logger.warning("VM %s not found!!!" % vm_obj_name)
+        except Exception as e:
+            self.dut.virt_exit()
+            time.sleep(3)
+            raise Exception(e)
+
+    def init_pmd_session(self, dut_obj):
+        """
+        init PMD session
+        """
+        self.pmd_session = PmdOutput(dut_obj)
+
+    def launch_testpmd(self, **kwargs):
+        """
+        launch testpmd with testpmd session
+        the default session is self.pmd_session
+        :return: testpmd output
+        """
+        pmd_session = kwargs.get("testpmd_obj") or self.pmd_session
+        out = pmd_session.start_testpmd(**kwargs)
+        return out
+
+    def execute_pmd_cmd(self, cmd, **kwargs):
+        """
+        execute multiple testpmd commands, return string output
+        :param cmd: testpmd cmd, support str and list
+        :return: testpmd output
+        """
+        pmd_session = kwargs.get("pmd_session") or self.pmd_session
+        _cmds = [cmd] if isinstance(cmd, str) else cmd
+        output = ""
+        for _cmd in _cmds:
+            output += pmd_session.execute_cmd(_cmd)
+        return output
+
+    def execute_host_cmd(self, cmd, **kwargs):
+        """
+        execute multiple host commands
+        :param cmd: host commands, support str and list
+        :return: host output
+        """
+        dut_obj = kwargs.get("dut_obj") or self.dut
+        _cmd = [cmd, "# ", 20] if isinstance(cmd, (str)) else cmd
+        return dut_obj.send_expect(*_cmd)
+
+    def build_dpdk_apps(self, app_path):
+        """
+        build dpdk apps and check the build status
+        """
+        out = self.dut.build_dpdk_apps(app_path)
+        self.verify("Error" not in out, "Compilation error")
+        self.verify("No such" not in out, "Compilation error")
+
+    def vf_test_preset_env_vm(self, pf_port, vfs_num, vm_name, driver="default"):
+        """
+        create vfs and setup vm env
+        """
+        if not isinstance(pf_port, list):
+            pf_port = [pf_port]
+        sriov_vfs_obj = self.create_vf(pf_port, vfs_num, driver=driver)
+        vf_list = []
+        for pf_id in pf_port:
+            vf_list += [
+                sriov_vfs_obj[pf_id][i].pci for i in range(len(sriov_vfs_obj[pf_id]))
+            ]
+        vm_obj, vm_dut = self.setup_vm_env(vm_name=vm_name, sriov_pci=vf_list)
+        return vm_obj, vm_dut
+
+    def get_vf_mac_through_pf(self, pf_intf):
+        """
+        use ip link show pf to get vf mac list
+        """
+        out = self.dut.send_expect(
+            "ip link show {}".format(pf_intf), "# ", alt_session=True
+        )
+        vf_mac_pattern = r"vf\s+\d+\s+.*\s+link\/ether\s+(\S+)\s+.*"
+        match = re.findall(vf_mac_pattern, out)
+        return match
+
+    @staticmethod
+    def generate_using_packets(pkt_type=None, pkt_str=None, **kwargs):
+        """
+        generate using pkts:
+            1.select the protocol type and generate the pkts through packet module
+            2.select customized pkt string
+        :return: pkt object
+        """
+        pkt = Packet()
+        dst_mac = kwargs.get("dst_mac")
+        vlan_id = kwargs.get("vlan_id")
+        if pkt_type:
+            pkt = Packet(pkt_type=pkt_type)
+            pkt.config_layer("ether", {"dst": dst_mac})
+            if vlan_id is not None:
+                pkt.config_layer("vlan", {"vlan": vlan_id})
+        elif pkt_str:
+            pkt.update_pkt(pkt_str)
+        else:
+            raise Exception("wrong pkt value")
+        return pkt
+
+    @staticmethod
+    def get_received_pkt_num(output, port_id=0):
+        """
+        use the testpmd output to get the receive pkts num for port
+        """
+        pkt_pattern = (
+            "port\s%d/queue\s\d+:\sreceived\s(\d+)\spackets.+?\n.*length=\d{2,}\s"
+            % port_id
+        )
+        received_data = re.findall(pkt_pattern, output)
+        received_pkts = sum(map(int, [i[0] for i in received_data]))
+        return received_pkts
+
+    @staticmethod
+    def get_hash_and_queues(out, port_id=0):
+        """
+        use testpmd output to get hash values and queues
+        """
+        hash_pattern = re.compile(
+            "port\s%s/queue\s\d+:\sreceived\s\d+\spackets.+?\n.*RSS\shash=(\w+)\s-\sRSS\squeue=(\w+)"
+            % port_id
+        )
+        hash_infos = hash_pattern.findall(out)
+        if len(hash_infos) == 0:
+            queue_pattern = re.compile("Receive\squeue=(\w+)")
+            queues = queue_pattern.findall(out)
+            return [], queues
+        hashes = [hash_info[0].strip() for hash_info in hash_infos]
+        queues = [hash_info[1].strip() for hash_info in hash_infos]
+        return hashes, queues
+
+    @staticmethod
+    def get_pkts_vlan_layer(pkt_obj: Packet, vlan_layer):
+        """
+        get pkts vlan layers
+        :param pkt_obj: pkt object
+        :param vlan_layer: vlan, prio etc...
+        :return: vlan id list, if pkt object consists of multiple vlan pkts
+        """
+        vlans = []
+        vlan_layers_list = []
+        for i in range(len(pkt_obj)):
+            vlan_dict = {}
+            try:
+                outer_vlan = pkt_obj.strip_element_layer3(vlan_layer, p_index=i)
+                vlan_dict["outer"] = outer_vlan
+            except Exception:
+                pass
+            try:
+                inner_vlan = pkt_obj.strip_element_layer4(vlan_layer, p_index=i)
+                vlan_dict["inner"] = inner_vlan
+            except Exception:
+                pass
+            vlans.append(vlan_dict)
+        for _vlan in vlans:
+            vlan_layers_list += list(_vlan.values())
+        return vlans, vlan_layers_list
+
+    def get_pmd_port_infomation(self, port_id=0, **kwargs):
+        """
+        use 'show ports info 0' to get port link status and port speed
+        """
+        pmd_session = kwargs.get("pmd_session") or self.pmd_session
+        link_status = pmd_session.get_port_link_status(port_id)
+        link_speed = pmd_session.get_port_link_speed(port_id)
+        return link_status, link_speed
+
+    def convert_driver_version_value(self, check_version):
+        """
+        convert the driver version to int list
+        take the first three values in the list for comparison and limit intree driver
+        for example:
+            6.0.7-060007-generic: [6, 0, 7-060007-generic]
+            1.11.0_rc59: [1, 11, 0]
+            1.11.11: [1, 11, 11]
+        """
+        try:
+            value_list = list(map(int, re.split(r"[.|_]", check_version)[:3]))
+        except ValueError as e:
+            self.logger.warning(e)
+            # the intree-driver has character, so set the return value is null list as the lowest driver version
+            return []
+        return value_list
+
+    @staticmethod
+    def get_pmd_rece_pkt_len(output):
+        """
+        get the pkt length in testpmd output
+        """
+        pkt_length = re.findall("length=(\d+)", output)
+        return pkt_length
+
+    def get_xstats_table(self, port_id_list):
+        """
+        use 'show port xstats' to get xstats info dict
+        """
+        xstats_data = dict()
+        if not isinstance(port_id_list, list):
+            port_id_list = [port_id_list]
+        for port_id in port_id_list:
+            out = self.execute_pmd_cmd("show port xstats %s" % port_id)
+            tmp_data = dict()
+            matches = re.findall(r"(\w+):\s+(\d+)", out)
+            for match in matches:
+                key = match[0]
+                value = int(match[1])
+                tmp_data[key] = value
+            xstats_data[port_id] = tmp_data
+        return xstats_data
+
+    @staticmethod
+    def generate_random_packets(
+        dstmac=None,
+        pktnum=100,
+        random_type=None,
+        ip_increase=True,
+        random_payload=False,
+        options=None,
+    ):
+        """
+        generate the random packets,
+        """
+        pkt = Packet()
+        pkt.generate_random_pkts(
+            dstmac=dstmac,
+            pktnum=pktnum,
+            random_type=random_type,
+            ip_increase=ip_increase,
+            random_payload=random_payload,
+            options=options,
+        )
+        return pkt
+
+    def start_tcpdump_output_pcap_file(self, port_inface, count=0, filters=None):
+        """
+        start tcpdump to capture the pkts
+        :param port_inface: port interface name
+        :param count: limit capture the pkts num
+        :param filters:
+            add filter: [{"layer": "ether", "config": {"src": "xxxx"}}]
+        :return:
+        """
+        index = self.tester.tcpdump_sniff_packets(
+            port_inface, count=count, filters=filters
+        )
+        return index
+
+    def stop_tcpdump_and_get_pkts(self, pcap_file):
+        """
+        stop tcpdump and parse the pcap file
+        """
+        pkts = self.tester.load_tcpdump_sniff_packets(pcap_file)
+        return pkts
+
+    def set_pmd_fwd_mode(self, fwd_mode="mac", pmd_session=None):
+        """
+        set testpmd fwd
+        """
+        pmd_session = pmd_session or self.pmd_session
+        self.execute_pmd_cmd(
+            ["set fwd %s" % fwd_mode, "set verbose 1", "start"], pmd_session=pmd_session
+        )
+
+    def send_pkts(
+        self,
+        pkt_list: list,
+        tester_tx_interface=None,
+        packet_count=1,
+        packet_interval=0.01,
+    ):
+        """
+        send pkts with packet obj
+        """
+        tester_tx_interface = (
+            self.tester_tx_interface
+            if tester_tx_interface is None
+            else tester_tx_interface
+        )
+        for _pkt in pkt_list:
+            _pkt.send_pkt(
+                crb=self.tester,
+                tx_port=tester_tx_interface,
+                count=packet_count,
+                interval=packet_interval,
+            )
+
+    def execute_fwd_check_process(
+        self,
+        packets,
+        pmd_commands=None,
+        rx_port=0,
+        tx_port=0,
+        packet_interval=0.01,
+        packet_count=1,
+        tcpdump_filter=None,
+        tester_tx_interface=None,
+        tester_rx_interface=None,
+    ):
+        """
+        pkt fwd flow: tcpdump ---> send pkt ---> testpmd output/tcpdump capture pkts
+        :return:
+            1.capture the tcpdump pkts
+            2.pmd output
+            3.use pmd output to get the pkts stats
+        """
+        if isinstance(packets, list):
+            pkt_list = packets
+        else:
+            pkt_list = [packets]
+        tester_rx_interface = (
+            self.tester_rx_interface
+            if tester_rx_interface is None
+            else tester_rx_interface
+        )
+        inst = self.start_tcpdump_output_pcap_file(
+            port_inface=tester_rx_interface,
+            count=len(packets) * packet_count,
+            filters=tcpdump_filter,
+        )
+        time.sleep(3)
+        if pmd_commands:
+            self.execute_pmd_cmd(pmd_commands)
+        self.execute_pmd_cmd("clear port stats all")
+        self.send_pkts(
+            pkt_list=pkt_list,
+            packet_count=packet_count,
+            packet_interval=packet_interval,
+            tester_tx_interface=tester_tx_interface,
+        )
+        time.sleep(packet_interval * len(pkt_list) * packet_count)
+        packets_captured = self.stop_tcpdump_and_get_pkts(inst)
+        self.logger.info("capture the pkt: {}".format(str(list(packets_captured))))
+        pmdout = self.pmd_session.get_output()
+        tx_stats = self.pmd_session.get_pmd_stats(tx_port)
+        rx_stats = self.pmd_session.get_pmd_stats(rx_port)
+        stats = {tx_port: tx_stats, rx_port: rx_stats}
+
+        return packets_captured, pmdout, stats
+
+
+class RxTxBaseTest(FuncTestBase):
+    def basic_rx_check(
+        self, packets_num, packet_dst_mac=None, pmd_commands=None, rx_port=0, tx_port=0
+    ):
+        """
+        set fwd rxonly and check the rece pkts num
+        """
+        self.set_pmd_fwd_mode(fwd_mode="rxonly")
+        random_pkt = self.generate_random_packets(
+            dstmac=packet_dst_mac, pktnum=packets_num
+        )
+        _, pmdout, stats = self.execute_fwd_check_process(
+            packets=random_pkt,
+            pmd_commands=pmd_commands,
+            rx_port=rx_port,
+            tx_port=tx_port,
+        )
+        self.verify(packet_dst_mac in pmdout, "receive packet fail")
+        rece_pkts_num = self.get_received_pkt_num(pmdout)
+        self.verify(rece_pkts_num == packets_num, "receive packet num is not match")
+
+    def basic_tx_check(self):
+        """
+        set fwd txonly, check:
+            1.testpmd output tx-pkts != 0
+            2.the tcpdump can capture pkts
+        """
+        self.execute_pmd_cmd("stop")
+        self.execute_pmd_cmd("set fwd txonly")
+        index = self.start_tcpdump_output_pcap_file(self.tester_rx_interface, count=100)
+        self.execute_pmd_cmd("start")
+        time.sleep(1)
+        self.execute_pmd_cmd("stop")
+        pkts_num = self.stop_tcpdump_and_get_pkts(index)
+        stats = self.pmd_session.get_pmd_stats(0)
+        self.verify(
+            stats["TX-packets"] != 0
+            and len(pkts_num) == 100
+            and stats["TX-packets"] > len(pkts_num),
+            "send packet num is not match",
+        )
+
+    def basic_macfwd_check(
+        self,
+        packet_num,
+        dst_mac=None,
+        check_miss=False,
+        pmd_commands=None,
+        rx_port=0,
+        tx_port=0,
+    ):
+        """
+        mac fwd, check rx-pkts and tx-pkt num is correct
+        if check_miss is true, it will check rx/tx is 0
+        """
+        random_pkt = self.generate_random_packets(dstmac=dst_mac, pktnum=packet_num)
+        if not pmd_commands:
+            self.set_pmd_fwd_mode()
+        packets_captured, pmdout, stats = self.execute_fwd_check_process(
+            packets=random_pkt,
+            pmd_commands=pmd_commands,
+            rx_port=rx_port,
+            tx_port=tx_port,
+        )
+        rece_pkts_num = self.get_received_pkt_num(pmdout)
+        if check_miss:
+            packet_num = 0
+        self.verify(
+            stats[tx_port]["RX-packets"] == packet_num,
+            "receive packet num is not match",
+        )
+        self.verify(
+            stats[tx_port]["RX-errors"] == 0, "some pkts have rx-errors in testpmd"
+        )
+        self.verify(
+            stats[rx_port]["TX-packets"] == packet_num,
+            "receive packet num is not match",
+        )
+        self.verify(
+            rece_pkts_num == packet_num == len(packets_captured),
+            "receive packet num is not match",
+        )
+
+    def basic_xstats_check(
+        self, packet_num, dst_mac=None, rx_port=0, tx_port=0, payload_size=64
+    ):
+        """
+        1. default stats check
+        2. send pkt and check testpmd stats and xstats
+        3. send pkt and clear port stats and check xstats
+        4. send pkt and clear xstats and check xstats
+        """
+        random_pkt = self.generate_random_packets(
+            dstmac=dst_mac,
+            pktnum=packet_num,
+            options={
+                "ip": {"src": "192.168.0.1", "dst": "192.168.1.1"},
+                "layers_config": [("raw", {"payload": ["58"] * payload_size})],
+            },
+        )
+        self.execute_pmd_cmd("clear port xstats all")
+        xstats_table = self.get_xstats_table([rx_port, tx_port])
+        for port in xstats_table.keys():
+            self.verify(
+                not any(xstats_table[port].values()),
+                "xstats Initial value error! port {} xstats "
+                "data is {}".format(port, xstats_table[port]),
+            )
+        _, _, stats_table = self.execute_fwd_check_process(
+            packets=random_pkt,
+            pmd_commands=[
+                "port config all rss all",
+                "set fwd mac",
+                "clear port xstats all",
+                "start",
+            ],
+            rx_port=rx_port,
+            tx_port=tx_port,
+        )
+        xstats_table = self.get_xstats_table([rx_port, tx_port])
+        return stats_table, xstats_table
+
+    def basic_promisc_check(
+        self, match_mac, unmatch_mac, pmd_commands=None, rx_port=0, tx_port=0
+    ):
+        """
+        use match and unmatch pkts to test promisc
+        test flow: default mode --> set promisc off --> set promisc on
+        note: if test vf promisc, confirm the vf primisc enable(kernel need to set trust on)
+        """
+        unmatch_pkt = self.generate_random_packets(dstmac=unmatch_mac, pktnum=1)
+        match_pkt = self.generate_random_packets(dstmac=match_mac, pktnum=1)
+        self.set_pmd_fwd_mode(fwd_mode="mac")
+        self.logger.info("check the default promisc mode")
+        _, pmdout, _ = self.execute_fwd_check_process(
+            packets=[unmatch_pkt, match_pkt],
+            pmd_commands=pmd_commands,
+            rx_port=rx_port,
+            tx_port=tx_port,
+        )
+        self.verify(
+            match_mac in pmdout and unmatch_mac in pmdout,
+            "enable promisc not receive all pkts",
+        )
+        self.logger.info("check disable promisc mode")
+        self.execute_pmd_cmd("set promisc all off")
+        _, pmdout, _ = self.execute_fwd_check_process(
+            packets=[unmatch_pkt, match_pkt], rx_port=rx_port, tx_port=tx_port
+        )
+        self.verify(
+            match_mac in pmdout and unmatch_mac not in pmdout,
+            "disable promisc should receive match pkt",
+        )
+        self.logger.info("check re-enable promisc mode")
+        self.execute_pmd_cmd("set promisc all on")
+        _, pmdout, _ = self.execute_fwd_check_process(
+            packets=[unmatch_pkt, match_pkt], rx_port=rx_port, tx_port=tx_port
+        )
+        self.verify(
+            match_mac in pmdout and unmatch_mac in pmdout,
+            "enable promisc should receive all pkt",
+        )
+
+    def basic_multicast_check(
+        self, normal_mac, multicast_mac, pmd_commands=None, rx_port=0, tx_port=0
+    ):
+        """
+        use normal mac and multicast mac to test
+        """
+        normal_pkt = self.generate_random_packets(dstmac=normal_mac, pktnum=1)
+        multicast_pkt = self.generate_random_packets(dstmac=multicast_mac, pktnum=1)
+        self.execute_pmd_cmd(
+            [
+                "set allmulti all off",
+                "set promisc all off",
+            ],
+        )
+        self.set_pmd_fwd_mode(fwd_mode="mac")
+        self.logger.info("check the default pmd multicast")
+        _, pmdout, _ = self.execute_fwd_check_process(
+            packets=[normal_pkt, multicast_pkt],
+            pmd_commands=pmd_commands,
+            rx_port=rx_port,
+            tx_port=tx_port,
+        )
+        self.verify(
+            normal_mac in pmdout and multicast_mac not in pmdout,
+            "the default can not receive multicast pkt",
+        )
+        self.execute_pmd_cmd(
+            [
+                "set allmulti all on",
+                "mcast_addr add 0 {}".format(multicast_mac),
+            ],
+        )
+        self.logger.info("check enable pmd multicast")
+        _, pmdout, _ = self.execute_fwd_check_process(
+            packets=[normal_pkt, multicast_pkt], rx_port=rx_port, tx_port=tx_port
+        )
+        self.verify(
+            normal_mac in pmdout and multicast_mac in pmdout,
+            "enable multicast not receive multicast pkt",
+        )
+
+    def basic_rss_check(
+        self, dst_mac, rss_type, queue_num, pmd_commands=None, rx_port=0, tx_port=0
+    ):
+        """
+        use pkt type mapping to rss type, and check rss func
+        """
+        rss2pkt_dict = {
+            "ip": "IP_RAW",
+            "tcp": "TCP",
+            "udp": "UDP",
+        }
+        rss_pkts = self.generate_random_packets(
+            dstmac=dst_mac, pktnum=30, random_type=[rss2pkt_dict[rss_type]]
+        )
+        self.set_pmd_fwd_mode(fwd_mode="mac")
+        self.execute_pmd_cmd("port config all rss %s" % rss_type)
+        _, pmdout, stats = self.execute_fwd_check_process(
+            packets=rss_pkts,
+            pmd_commands=pmd_commands,
+            rx_port=rx_port,
+            tx_port=tx_port,
+        )
+        hashes, queues = self.get_hash_and_queues(pmdout)
+        self.verify(
+            len(set(queues)) == int(queue_num)
+            and len(queues) == len(hashes) == stats[tx_port]["RX-packets"],
+            "some pkt and queue can not get get the hash",
+        )
+        return zip(hashes, queues)
+
+    def rss_reta_config_check(self, rss_reta: list, port_id=0, reta_size=64):
+        """
+        check the rss reta after setting new rss rate in testpmd
+        """
+        reta_mask = "0x{}".format(int(reta_size / 4) * "f")
+        default_rss_reta = self.execute_pmd_cmd(
+            "show port {} rss reta {} ({})".format(port_id, reta_size, reta_mask)
+        )
+        for i, j in zip(list(range(reta_size)), rss_reta):
+            self.execute_pmd_cmd("port config %d rss reta (%d,%d)" % (port_id, i, j))
+        change_rss_reta = self.execute_pmd_cmd(
+            "show port {} rss reta {} ({})".format(port_id, reta_size, reta_mask)
+        )
+        self.verify(default_rss_reta != change_rss_reta, "port config rss reta failed")
+
+    def rss_reta_hit_check(self, hash_table, rss_reta: list, reta_size=64):
+        """
+        check the rece pkts hash value can map the queue according to rss reta
+        """
+        hit_hash = False
+        for rss_hash, rss_queue in hash_table:
+            for i, j in zip(list(range(reta_size)), rss_reta):
+                if int(rss_hash, 16) % reta_size == i and int(rss_queue, 16) == j:
+                    hit_hash = True
+                    break
+                else:
+                    hit_hash = False
+            self.verify(hit_hash, "some pkt not directed by rss.")
+
+    def basic_rss_hash_key_check(
+        self, dst_mac, hash_key, port_id=0, pmd_commands=None, rx_port=0, tx_port=0
+    ):
+        """
+        check the hash values is different after setting rss hash key
+        """
+        pkt = self.generate_using_packets(pkt_type="UDP", dst_mac=dst_mac)
+        self.set_pmd_fwd_mode("mac")
+        _, pmdout, _ = self.execute_fwd_check_process(
+            packets=pkt, pmd_commands=pmd_commands, rx_port=rx_port, tx_port=tx_port
+        )
+        hash_1, queue_1 = self.get_hash_and_queues(pmdout)
+        self.execute_pmd_cmd(
+            "port config {} rss-hash-key ipv4 {}".format(port_id, hash_key)
+        )
+        out = self.execute_pmd_cmd("show port 0 rss-hash key")
+        self.verify(hash_key.upper() in out, "rss hash key update failed")
+        _, pmdout, _ = self.execute_fwd_check_process(
+            packets=pkt, rx_port=rx_port, tx_port=tx_port
+        )
+        hash_2, queue_2 = self.get_hash_and_queues(pmdout)
+        self.verify(hash_1 != hash_2, "hash value should be different")
+
+    def basic_pmd_info_check(self, port_obj, port_id=0):
+        """
+        check the link speed and link status
+        """
+        link_status, link_speed = self.get_pmd_port_infomation(port_id)
+        link_speed_host = int(port_obj.get_nic_speed()) // 1000
+        self.verify(link_status == "up", "link stats has error")
+        self.verify(
+            int(link_speed) == link_speed_host,
+            "link speed has error",
+        )
+
+
+class VlanFuncBaseTest(FuncTestBase):
+    def vlan_pkts_fwd_check(
+        self, pkts, pmd_commands=None, port_id=0, rx_port=0, tx_port=0
+    ):
+        """
+        send pkts and return rece num, vlan dict and vlan id list
+        """
+        packets_captured, pmdout, _ = self.execute_fwd_check_process(
+            packets=pkts, pmd_commands=pmd_commands, rx_port=rx_port, tx_port=tx_port
+        )
+        rece_num = self.get_received_pkt_num(pmdout, port_id=port_id)
+        vlans, vlan_id_list = self.get_pkts_vlan_layer(packets_captured, "vlan")
+        self.logger.info("capture the TX pkts vlans: {}".format(vlans))
+        return rece_num, packets_captured, vlans, vlan_id_list
+
+    def vlan_offload_flag_check(self, port_id=0, **kwargs):
+        """
+        check the vlan offload flag status:
+            filter="on"
+            strip="on"
+              ...
+        """
+        out = self.execute_pmd_cmd("show port info %d" % port_id)
+        for flag in kwargs.keys():
+            p = "VLAN offload.*\n.*?%s (\w+)" % flag
+            vlan_stats = re.search(p, out).group(1)
+            self.logger.info("{} flag is {}".format(flag, vlan_stats))
+            self.verify(
+                vlan_stats == kwargs[flag], "the vlan offload flag is incorrect"
+            )
+
+    def vlan_prio_check(self, pkts, **kwargs):
+        """
+
+        :param pkts:
+        :param kwargs:
+        :return:
+        """
+        packets_captured, _, _ = self.execute_fwd_check_process(packets=pkts)
+        vlans, _ = self.get_pkts_vlan_layer(packets_captured, "prio")
+        self.logger.info("vlan prio: {}".format(vlans))
+        for _prio in kwargs.keys():
+            for _vlan in vlans:
+                self.verify(
+                    _vlan[_prio] == kwargs[_prio], "the vlan prio values not matched"
+                )
+
+    def set_pvid_from_pf(self, pf_intf, vf_id=0, vlan_id=0):
+        """
+
+        :return:
+        """
+        self.execute_host_cmd(
+            "ip link set {} vf {} vlan {}".format(pf_intf, vf_id, vlan_id)
+        )
+        output = self.execute_host_cmd("ip link show {}".format(pf_intf))
+        if vlan_id != 0:
+            self.verify("vlan %d" % vlan_id in output, "Failed to add pvid on VF")
+        else:
+            self.verify("vlan" not in output, "Failed to add pvid on VF")
+
+    def basic_vlan_filter_check(
+        self,
+        vlan_id,
+        match_pkt,
+        unmatch_pkt,
+        pmd_commands=None,
+        port_id=0,
+        double_vlan=False,
+        rx_port=0,
+        tx_port=0,
+    ):
+        """
+        send match and unmatch pkt to check vlan filter
+        double vlan have 2 vlan id
+        """
+        if not isinstance(match_pkt, list):
+            match_pkt = [match_pkt]
+        if not isinstance(unmatch_pkt, list):
+            unmatch_pkt = [unmatch_pkt]
+        rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check(
+            pkts=match_pkt + unmatch_pkt,
+            port_id=port_id,
+            pmd_commands=pmd_commands,
+            rx_port=rx_port,
+            tx_port=tx_port,
+        )
+        self.verify(
+            rece_num == len(match_pkt) and vlan_id in vlan_id_list,
+            "failed receive vlan pkts",
+        )
+        if double_vlan:
+            self.verify(
+                len(vlan_id_list) == len(match_pkt) * 2, "failed receive vlan pkts"
+            )
+        else:
+            self.verify(len(vlan_id_list) == len(match_pkt), "failed receive vlan pkts")
+
+    def basic_vlan_strip_check(
+        self,
+        vlan_id,
+        match_pkt,
+        pmd_commands=None,
+        port_id=0,
+        double_vlan=False,
+        rx_port=0,
+        tx_port=0,
+    ):
+        """
+        send vlan pkts to check vlan strip
+        single vlan: after strip, the rece pkt not have vlan id
+        double vlan: after strip, the rece pkt have single vlan
+        """
+        rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check(
+            match_pkt,
+            port_id=port_id,
+            pmd_commands=pmd_commands,
+            tx_port=tx_port,
+            rx_port=rx_port,
+        )
+        if double_vlan:
+            self.verify(
+                rece_num == len(match_pkt) and len(vlan_id_list) == len(match_pkt),
+                "Failed to strip double vlan tag",
+            )
+        else:
+            self.verify(
+                rece_num == len(match_pkt) and len(vlan_id_list) == 0,
+                "Failed to strip vlan tag",
+            )
+        self.execute_pmd_cmd("vlan set strip off %s" % port_id)
+        rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check(
+            match_pkt, port_id=port_id, tx_port=tx_port, rx_port=rx_port
+        )
+        if double_vlan:
+            self.verify(
+                rece_num == len(match_pkt)
+                and len(vlan_id_list) == len(match_pkt) * 2
+                and vlan_id in vlan_id_list,
+                "Failed to receive vlan pkts with vlan tag",
+            )
+        else:
+            self.verify(
+                rece_num == len(match_pkt)
+                and len(vlan_id_list) == len(match_pkt)
+                and vlan_id in vlan_id_list,
+                "Failed to receive vlan pkts with vlan tag",
+            )
+
+    def basic_vlan_insert_check(
+        self,
+        vlan_id,
+        insert_vlan,
+        match_pkt,
+        pmd_commands=None,
+        port_id=0,
+        double_vlan=None,
+        rx_port=0,
+        tx_port=0,
+    ):
+        """
+        single vlan insert: send normal pkt, the tx get the single vlan pkt
+        double vlan insert: send single vlan pkt, the vlan insert to outer vlan and the default vlan become inner vlan
+        """
+        rece_num, _, vlans, vlan_id_list = self.vlan_pkts_fwd_check(
+            match_pkt,
+            port_id=port_id,
+            pmd_commands=pmd_commands,
+            tx_port=tx_port,
+            rx_port=rx_port,
+        )
+        if double_vlan:
+            self.verify(
+                rece_num == len(match_pkt) and len(vlan_id_list) == len(match_pkt) * 2,
+                "Failed to receive vlan pkts with vlan tag",
+            )
+            self.verify(
+                all(
+                    [
+                        insert_vlan == _vlan["outer"] and vlan_id == _vlan["inner"]
+                        for _vlan in vlans
+                    ]
+                ),
+                "the insert vlan is incorrect",
+            )
+        else:
+            self.verify(
+                rece_num == len(match_pkt)
+                and len(vlan_id_list) == len(match_pkt)
+                and insert_vlan in vlan_id_list,
+                "Failed to receive vlan pkts with vlan tag",
+            )
+            self.verify(
+                all([insert_vlan == _vlan["outer"] for _vlan in vlans]),
+                "the insert vlan is incorrect",
+            )