get:
Show a patch.

patch:
Update a patch.

put:
Update a patch.

GET /api/patches/129264/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 129264,
    "url": "http://patchwork.dpdk.org/api/patches/129264/?format=api",
    "web_url": "http://patchwork.dpdk.org/project/dts/patch/20230705105344.1031168-2-zhiminx.huang@intel.com/",
    "project": {
        "id": 3,
        "url": "http://patchwork.dpdk.org/api/projects/3/?format=api",
        "name": "DTS",
        "link_name": "dts",
        "list_id": "dts.dpdk.org",
        "list_email": "dts@dpdk.org",
        "web_url": "",
        "scm_url": "git://dpdk.org/tools/dts",
        "webscm_url": "http://git.dpdk.org/tools/dts/",
        "list_archive_url": "https://inbox.dpdk.org/dts",
        "list_archive_url_format": "https://inbox.dpdk.org/dts/{}",
        "commit_url_format": ""
    },
    "msgid": "<20230705105344.1031168-2-zhiminx.huang@intel.com>",
    "list_archive_url": "https://inbox.dpdk.org/dts/20230705105344.1031168-2-zhiminx.huang@intel.com",
    "date": "2023-07-05T10:53:39",
    "name": "[V2,1/6] tests/func_test_base:add new commom module to refactor func test cases",
    "commit_ref": null,
    "pull_url": null,
    "state": "new",
    "archived": false,
    "hash": "001c3012e8db9a906abfa97c120a782b3921c265",
    "submitter": {
        "id": 1685,
        "url": "http://patchwork.dpdk.org/api/people/1685/?format=api",
        "name": "Huang, ZhiminX",
        "email": "zhiminx.huang@intel.com"
    },
    "delegate": null,
    "mbox": "http://patchwork.dpdk.org/project/dts/patch/20230705105344.1031168-2-zhiminx.huang@intel.com/mbox/",
    "series": [
        {
            "id": 28816,
            "url": "http://patchwork.dpdk.org/api/series/28816/?format=api",
            "web_url": "http://patchwork.dpdk.org/project/dts/list/?series=28816",
            "date": "2023-07-05T10:53:38",
            "name": "add new common module and add new suites",
            "version": 2,
            "mbox": "http://patchwork.dpdk.org/series/28816/mbox/"
        }
    ],
    "comments": "http://patchwork.dpdk.org/api/patches/129264/comments/",
    "check": "pending",
    "checks": "http://patchwork.dpdk.org/api/patches/129264/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dts-bounces@dpdk.org>",
        "X-Original-To": "patchwork@inbox.dpdk.org",
        "Delivered-To": "patchwork@inbox.dpdk.org",
        "Received": [
            "from mails.dpdk.org (mails.dpdk.org [217.70.189.124])\n\tby inbox.dpdk.org (Postfix) with ESMTP id ECAE442DAB;\n\tWed,  5 Jul 2023 04:38:19 +0200 (CEST)",
            "from mails.dpdk.org (localhost [127.0.0.1])\n\tby mails.dpdk.org (Postfix) with ESMTP id E771C40A8B;\n\tWed,  5 Jul 2023 04:38:19 +0200 (CEST)",
            "from mga12.intel.com (mga12.intel.com [192.55.52.136])\n by mails.dpdk.org (Postfix) with ESMTP id E203040FAE\n for <dts@dpdk.org>; Wed,  5 Jul 2023 04:38:17 +0200 (CEST)",
            "from fmsmga008.fm.intel.com ([10.253.24.58])\n by fmsmga106.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384;\n 04 Jul 2023 19:38:17 -0700",
            "from unknown (HELO localhost.localdomain) ([10.239.252.96])\n by fmsmga008-auth.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384;\n 04 Jul 2023 19:38:08 -0700"
        ],
        "DKIM-Signature": "v=1; a=rsa-sha256; c=relaxed/simple;\n d=intel.com; i=@intel.com; q=dns/txt; s=Intel;\n t=1688524698; x=1720060698;\n h=from:to:cc:subject:date:message-id:in-reply-to:\n references:mime-version:content-transfer-encoding;\n bh=PwVz7sGLHG10d/3xOtAddipaxgP4PnE9vrObpvd0IX0=;\n b=KduUbDS6/CPR2D1AO9kA7GOnKgo+3oS5X2scW9ttIIvkJkKkok+SHNyo\n qxXKdxbsxNWVVW7OOP5fAoe9q2PlLk47X9PHOrFzq0YkanX6zLhS59Kqx\n G+UdF7ipsFikEu7U80ih/zG1IEFajbtpNOnZ7pe0K7whxcerwZ688zNBh\n lFRSSKB0fvzzslwzFCdOHEsGIHhxb2BD1Wxr3L9OCgRAFDRYIfmcsyn5S\n NY5vJEtfRM42m2s7wVHjdeS10uJoKAP+TGDnpRvkC5mCci0fPpww2Xmjb\n ZcPa89YdeYKlsfY1CK/mcdhrXNVc4/FRE4yidxKnEO/f/+G1jt7zxqlkL w==;",
        "X-IronPort-AV": [
            "E=McAfee;i=\"6600,9927,10761\"; a=\"342821689\"",
            "E=Sophos;i=\"6.01,181,1684825200\"; d=\"scan'208\";a=\"342821689\"",
            "E=McAfee;i=\"6600,9927,10761\"; a=\"784364003\"",
            "E=Sophos;i=\"6.01,181,1684825200\"; d=\"scan'208\";a=\"784364003\""
        ],
        "X-ExtLoop1": "1",
        "From": "Zhimin Huang <zhiminx.huang@intel.com>",
        "To": "dts@dpdk.org",
        "Cc": "Zhimin Huang <zhiminx.huang@intel.com>",
        "Subject": "[dts][PATCH V2 1/6] tests/func_test_base:add new commom module to\n refactor func test cases",
        "Date": "Wed,  5 Jul 2023 10:53:39 +0000",
        "Message-Id": "<20230705105344.1031168-2-zhiminx.huang@intel.com>",
        "X-Mailer": "git-send-email 2.25.1",
        "In-Reply-To": "<20230705105344.1031168-1-zhiminx.huang@intel.com>",
        "References": "<20230705105344.1031168-1-zhiminx.huang@intel.com>",
        "MIME-Version": "1.0",
        "Content-Transfer-Encoding": "8bit",
        "X-BeenThere": "dts@dpdk.org",
        "X-Mailman-Version": "2.1.29",
        "Precedence": "list",
        "List-Id": "test suite reviews and discussions <dts.dpdk.org>",
        "List-Unsubscribe": "<https://mails.dpdk.org/options/dts>,\n <mailto:dts-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://mails.dpdk.org/archives/dts/>",
        "List-Post": "<mailto:dts@dpdk.org>",
        "List-Help": "<mailto:dts-request@dpdk.org?subject=help>",
        "List-Subscribe": "<https://mails.dpdk.org/listinfo/dts>,\n <mailto:dts-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dts-bounces@dpdk.org"
    },
    "content": "for some vf test suites, there are duplicate methods implemented in\nsuite.\nso add the public method into func_test_base,and encapsulate classes for\nthe basic testing process.\n\nSigned-off-by: Zhimin Huang <zhiminx.huang@intel.com>\n---\n tests/func_test_base.py | 977 ++++++++++++++++++++++++++++++++++++++++\n 1 file changed, 977 insertions(+)\n create mode 100644 tests/func_test_base.py",
    "diff": "diff --git a/tests/func_test_base.py b/tests/func_test_base.py\nnew file mode 100644\nindex 00000000..8b946950\n--- /dev/null\n+++ b/tests/func_test_base.py\n@@ -0,0 +1,977 @@\n+# SPDX-License-Identifier: BSD-3-Clause\n+# Copyright(c) 2023 Intel Corporation\n+#\n+\n+import re\n+import time\n+import traceback\n+\n+from framework.packet import Packet\n+from framework.pmd_output import PmdOutput\n+from framework.virt_common import VM\n+\n+supported_vf_driver = [\"pci-stub\", \"vfio-pci\"]\n+\n+\n+class FuncTestBase(object):\n+    def __init__(self, test_case, tester_tx_interface, tester_rx_interface):\n+        self.test_case = test_case\n+        self.verify = self.test_case.verify\n+        self.logger = test_case.logger\n+        self.dut = self.test_case.dut\n+        self.tester = self.test_case.tester\n+\n+        self.vm_info = []\n+        self.pmd_session = None\n+        self.tester_tx_interface = tester_tx_interface\n+        self.tester_rx_interface = tester_rx_interface\n+\n+        self.pkt = Packet()\n+        self.vf_driver = self.test_case.get_suite_cfg()[\"vf_driver\"] or \"pci-stub\"\n+        self.vm_specified_driver = self.get_vm_specified_driver()\n+\n+    def check_port_num_for_test(self, port_num: int):\n+        \"\"\"\n+        check the port num for test requirement\n+        \"\"\"\n+        dut_ports = self.dut.get_ports(self.test_case.nic)\n+        self.verify(len(dut_ports) >= port_num, \"Unsupported port num for test require\")\n+\n+    def get_vm_specified_driver(self):\n+        \"\"\"\n+        check the vf support driver and return vm driver\n+        :return: vm_specified_driver\n+        \"\"\"\n+        self.verify(self.vf_driver in supported_vf_driver, \"Unsupported vf driver\")\n+        if self.vf_driver == \"pci-stub\":\n+            vm_specified_driver = \"pci-assign\"\n+        else:\n+            vm_specified_driver = \"vfio-pci\"\n+        return vm_specified_driver\n+\n+    def create_vf(self, pf_port, vfs_num: int, driver=\"default\"):\n+        \"\"\"\n+        create vfs, support multi pfs to create multi vfs\n+        :param pf_port: pf port or port list\n+        :param vfs_num: create vf num\n+        :param driver: set vf driver on dpdk\n+        :return: vf net device object\n+        \"\"\"\n+        sriov_vfs_obj = []\n+        try:\n+            self.test_case.bind_nic_driver(\n+                self.dut.get_ports(self.test_case.nic), driver=self.test_case.kdriver\n+            )\n+        except Exception as e:\n+            self.logger.info(traceback.format_exc(e))\n+        if hasattr(self.test_case, \"pf_config\"):\n+            self.test_case.pf_config()\n+        if isinstance(pf_port, int):\n+            pf_port = [pf_port]\n+        for _port in pf_port:\n+            self.dut.generate_sriov_vfs_by_port(_port, vfs_num, driver=driver)\n+            sriov_vfs_obj.append(self.dut.ports_info[_port][\"vfs_port\"])\n+            self.dut.send_expect(\n+                \"ifconfig %s up\" % self.dut.ports_info[_port][\"intf\"], \"#\"\n+            )\n+            res = self.dut.is_interface_up(self.dut.ports_info[_port][\"intf\"])\n+            self.verify(\n+                res, \"%s link status is down\" % self.dut.ports_info[_port][\"intf\"]\n+            )\n+        if hasattr(self.test_case, \"vf_config\"):\n+            self.test_case.vf_config()\n+        for vf_port in sriov_vfs_obj:\n+            for _vf in vf_port:\n+                _vf.bind_driver(self.vf_driver)\n+        return sriov_vfs_obj\n+\n+    def destroy_vf(self, pf_port=\"all\"):\n+        \"\"\"\n+        destroy vfs\n+        :param pf_port: select the pf port to destroy vfs.\n+        the default will destroy all vfs.\n+        \"\"\"\n+        pf_port = pf_port if isinstance(pf_port, list) else [pf_port]\n+        if \"all\" in pf_port:\n+            # destory all vfs\n+            self.dut.destroy_all_sriov_vfs()\n+        else:\n+            for port in pf_port:\n+                # destroy the vf on the specified port\n+                self.dut.destroy_sriov_vfs_by_port(port)\n+\n+    def setup_vm_env(self, vm_name: str, sriov_pci: list):\n+        \"\"\"\n+        passtrough the vfs into vm, start vm.\n+        :param vm_name: select the vm to use\n+        :param sriov_pci: pci list\n+        :returns: vm object and vm dut session\n+        \"\"\"\n+        try:\n+            vm_obj = VM(self.dut, vm_name, self.test_case.suite_name)\n+            for _pci in sriov_pci:\n+                vm_obj.set_vm_device(\n+                    driver=self.vm_specified_driver, **{\"opt_host\": _pci}\n+                )\n+            vm_dut = vm_obj.start()\n+            if vm_dut is None:\n+                raise Exception(\"Set up VM ENV failed!\")\n+            self.vm_info.append((vm_obj, vm_dut))\n+            return vm_obj, vm_dut\n+        except Exception as e:\n+            self.destroy_vm_env(vm_obj_name=vm_name)\n+            raise Exception(e)\n+\n+    def destroy_vm_env(self, vm_obj_name=\"\"):\n+        \"\"\"\n+        destroy the specified vm name or all vms\n+        \"\"\"\n+        try:\n+            for vm_obj, vm_session in self.vm_info:\n+                if vm_obj_name == \"\":\n+                    # destoty all vm\n+                    vm_session.kill_all()\n+                    vm_obj.stop()\n+                    self.vm_info = []\n+                elif vm_obj.vm_name == vm_obj_name:\n+                    # destroy the vm with the specified name\n+                    vm_session.kill_all()\n+                    vm_obj.stop()\n+                    self.vm_info.remove((vm_obj, vm_session))\n+                else:\n+                    self.logger.warning(\"VM %s not found!!!\" % vm_obj_name)\n+        except Exception as e:\n+            self.dut.virt_exit()\n+            time.sleep(3)\n+            raise Exception(e)\n+\n+    def init_pmd_session(self, dut_obj):\n+        \"\"\"\n+        init PMD session\n+        \"\"\"\n+        self.pmd_session = PmdOutput(dut_obj)\n+\n+    def launch_testpmd(self, **kwargs):\n+        \"\"\"\n+        launch testpmd with testpmd session\n+        the default session is self.pmd_session\n+        :return: testpmd output\n+        \"\"\"\n+        pmd_session = kwargs.get(\"testpmd_obj\") or self.pmd_session\n+        out = pmd_session.start_testpmd(**kwargs)\n+        return out\n+\n+    def execute_pmd_cmd(self, cmd, **kwargs):\n+        \"\"\"\n+        execute multiple testpmd commands, return string output\n+        :param cmd: testpmd cmd, support str and list\n+        :return: testpmd output\n+        \"\"\"\n+        pmd_session = kwargs.get(\"pmd_session\") or self.pmd_session\n+        _cmds = [cmd] if isinstance(cmd, str) else cmd\n+        output = \"\"\n+        for _cmd in _cmds:\n+            output += pmd_session.execute_cmd(_cmd)\n+        return output\n+\n+    def execute_host_cmd(self, cmd, **kwargs):\n+        \"\"\"\n+        execute multiple host commands\n+        :param cmd: host commands, support str and list\n+        :return: host output\n+        \"\"\"\n+        dut_obj = kwargs.get(\"dut_obj\") or self.dut\n+        _cmd = [cmd, \"# \", 20] if isinstance(cmd, (str)) else cmd\n+        return dut_obj.send_expect(*_cmd)\n+\n+    def build_dpdk_apps(self, app_path):\n+        \"\"\"\n+        build dpdk apps and check the build status\n+        \"\"\"\n+        out = self.dut.build_dpdk_apps(app_path)\n+        self.verify(\"Error\" not in out, \"Compilation error\")\n+        self.verify(\"No such\" not in out, \"Compilation error\")\n+\n+    def vf_test_preset_env_vm(self, pf_port, vfs_num, vm_name, driver=\"default\"):\n+        \"\"\"\n+        create vfs and setup vm env\n+        \"\"\"\n+        if not isinstance(pf_port, list):\n+            pf_port = [pf_port]\n+        sriov_vfs_obj = self.create_vf(pf_port, vfs_num, driver=driver)\n+        vf_list = []\n+        for pf_id in pf_port:\n+            vf_list += [\n+                sriov_vfs_obj[pf_id][i].pci for i in range(len(sriov_vfs_obj[pf_id]))\n+            ]\n+        vm_obj, vm_dut = self.setup_vm_env(vm_name=vm_name, sriov_pci=vf_list)\n+        return vm_obj, vm_dut\n+\n+    def get_vf_mac_through_pf(self, pf_intf):\n+        \"\"\"\n+        use ip link show pf to get vf mac list\n+        \"\"\"\n+        out = self.dut.send_expect(\n+            \"ip link show {}\".format(pf_intf), \"# \", alt_session=True\n+        )\n+        vf_mac_pattern = r\"vf\\s+\\d+\\s+.*\\s+link\\/ether\\s+(\\S+)\\s+.*\"\n+        match = re.findall(vf_mac_pattern, out)\n+        return match\n+\n+    @staticmethod\n+    def generate_using_packets(pkt_type=None, pkt_str=None, **kwargs):\n+        \"\"\"\n+        generate using pkts:\n+            1.select the protocol type and generate the pkts through packet module\n+            2.select customized pkt string\n+        :return: pkt object\n+        \"\"\"\n+        pkt = Packet()\n+        dst_mac = kwargs.get(\"dst_mac\")\n+        vlan_id = kwargs.get(\"vlan_id\")\n+        if pkt_type:\n+            pkt = Packet(pkt_type=pkt_type)\n+            pkt.config_layer(\"ether\", {\"dst\": dst_mac})\n+            if vlan_id is not None:\n+                pkt.config_layer(\"vlan\", {\"vlan\": vlan_id})\n+        elif pkt_str:\n+            pkt.update_pkt(pkt_str)\n+        else:\n+            raise Exception(\"wrong pkt value\")\n+        return pkt\n+\n+    @staticmethod\n+    def get_received_pkt_num(output, port_id=0):\n+        \"\"\"\n+        use the testpmd output to get the receive pkts num for port\n+        \"\"\"\n+        pkt_pattern = (\n+            \"port\\s%d/queue\\s\\d+:\\sreceived\\s(\\d+)\\spackets.+?\\n.*length=\\d{2,}\\s\"\n+            % port_id\n+        )\n+        received_data = re.findall(pkt_pattern, output)\n+        received_pkts = sum(map(int, [i[0] for i in received_data]))\n+        return received_pkts\n+\n+    @staticmethod\n+    def get_hash_and_queues(out, port_id=0):\n+        \"\"\"\n+        use testpmd output to get hash values and queues\n+        \"\"\"\n+        hash_pattern = re.compile(\n+            \"port\\s%s/queue\\s\\d+:\\sreceived\\s\\d+\\spackets.+?\\n.*RSS\\shash=(\\w+)\\s-\\sRSS\\squeue=(\\w+)\"\n+            % port_id\n+        )\n+        hash_infos = hash_pattern.findall(out)\n+        if len(hash_infos) == 0:\n+            queue_pattern = re.compile(\"Receive\\squeue=(\\w+)\")\n+            queues = queue_pattern.findall(out)\n+            return [], queues\n+        hashes = [hash_info[0].strip() for hash_info in hash_infos]\n+        queues = [hash_info[1].strip() for hash_info in hash_infos]\n+        return hashes, queues\n+\n+    @staticmethod\n+    def get_pkts_vlan_layer(pkt_obj: Packet, vlan_layer):\n+        \"\"\"\n+        get pkts vlan layers\n+        :param pkt_obj: pkt object\n+        :param vlan_layer: vlan, prio etc...\n+        :return: vlan id list, if pkt object consists of multiple vlan pkts\n+        \"\"\"\n+        vlans = []\n+        vlan_layers_list = []\n+        for i in range(len(pkt_obj)):\n+            vlan_dict = {}\n+            try:\n+                outer_vlan = pkt_obj.strip_element_layer3(vlan_layer, p_index=i)\n+                vlan_dict[\"outer\"] = outer_vlan\n+            except Exception:\n+                pass\n+            try:\n+                inner_vlan = pkt_obj.strip_element_layer4(vlan_layer, p_index=i)\n+                vlan_dict[\"inner\"] = inner_vlan\n+            except Exception:\n+                pass\n+            vlans.append(vlan_dict)\n+        for _vlan in vlans:\n+            vlan_layers_list += list(_vlan.values())\n+        return vlans, vlan_layers_list\n+\n+    def get_pmd_port_infomation(self, port_id=0, **kwargs):\n+        \"\"\"\n+        use 'show ports info 0' to get port link status and port speed\n+        \"\"\"\n+        pmd_session = kwargs.get(\"pmd_session\") or self.pmd_session\n+        link_status = pmd_session.get_port_link_status(port_id)\n+        link_speed = pmd_session.get_port_link_speed(port_id)\n+        return link_status, link_speed\n+\n+    def convert_driver_version_value(self, check_version):\n+        \"\"\"\n+        convert the driver version to int list\n+        take the first three values in the list for comparison and limit intree driver\n+        for example:\n+            6.0.7-060007-generic: [6, 0, 7-060007-generic]\n+            1.11.0_rc59: [1, 11, 0]\n+            1.11.11: [1, 11, 11]\n+        \"\"\"\n+        try:\n+            value_list = list(map(int, re.split(r\"[.|_]\", check_version)[:3]))\n+        except ValueError as e:\n+            self.logger.warning(e)\n+            # the intree-driver has character, so set the return value is null list as the lowest driver version\n+            return []\n+        return value_list\n+\n+    @staticmethod\n+    def get_pmd_rece_pkt_len(output):\n+        \"\"\"\n+        get the pkt length in testpmd output\n+        \"\"\"\n+        pkt_length = re.findall(\"length=(\\d+)\", output)\n+        return pkt_length\n+\n+    def get_xstats_table(self, port_id_list):\n+        \"\"\"\n+        use 'show port xstats' to get xstats info dict\n+        \"\"\"\n+        xstats_data = dict()\n+        if not isinstance(port_id_list, list):\n+            port_id_list = [port_id_list]\n+        for port_id in port_id_list:\n+            out = self.execute_pmd_cmd(\"show port xstats %s\" % port_id)\n+            tmp_data = dict()\n+            matches = re.findall(r\"(\\w+):\\s+(\\d+)\", out)\n+            for match in matches:\n+                key = match[0]\n+                value = int(match[1])\n+                tmp_data[key] = value\n+            xstats_data[port_id] = tmp_data\n+        return xstats_data\n+\n+    @staticmethod\n+    def generate_random_packets(\n+        dstmac=None,\n+        pktnum=100,\n+        random_type=None,\n+        ip_increase=True,\n+        random_payload=False,\n+        options=None,\n+    ):\n+        \"\"\"\n+        generate the random packets,\n+        \"\"\"\n+        pkt = Packet()\n+        pkt.generate_random_pkts(\n+            dstmac=dstmac,\n+            pktnum=pktnum,\n+            random_type=random_type,\n+            ip_increase=ip_increase,\n+            random_payload=random_payload,\n+            options=options,\n+        )\n+        return pkt\n+\n+    def start_tcpdump_output_pcap_file(self, port_inface, count=0, filters=None):\n+        \"\"\"\n+        start tcpdump to capture the pkts\n+        :param port_inface: port interface name\n+        :param count: limit capture the pkts num\n+        :param filters:\n+            add filter: [{\"layer\": \"ether\", \"config\": {\"src\": \"xxxx\"}}]\n+        :return:\n+        \"\"\"\n+        index = self.tester.tcpdump_sniff_packets(\n+            port_inface, count=count, filters=filters\n+        )\n+        return index\n+\n+    def stop_tcpdump_and_get_pkts(self, pcap_file):\n+        \"\"\"\n+        stop tcpdump and parse the pcap file\n+        \"\"\"\n+        pkts = self.tester.load_tcpdump_sniff_packets(pcap_file)\n+        return pkts\n+\n+    def set_pmd_fwd_mode(self, fwd_mode=\"mac\", pmd_session=None):\n+        \"\"\"\n+        set testpmd fwd\n+        \"\"\"\n+        pmd_session = pmd_session or self.pmd_session\n+        self.execute_pmd_cmd(\n+            [\"set fwd %s\" % fwd_mode, \"set verbose 1\", \"start\"], pmd_session=pmd_session\n+        )\n+\n+    def send_pkts(\n+        self,\n+        pkt_list: list,\n+        tester_tx_interface=None,\n+        packet_count=1,\n+        packet_interval=0.01,\n+    ):\n+        \"\"\"\n+        send pkts with packet obj\n+        \"\"\"\n+        tester_tx_interface = (\n+            self.tester_tx_interface\n+            if tester_tx_interface is None\n+            else tester_tx_interface\n+        )\n+        for _pkt in pkt_list:\n+            _pkt.send_pkt(\n+                crb=self.tester,\n+                tx_port=tester_tx_interface,\n+                count=packet_count,\n+                interval=packet_interval,\n+            )\n+\n+    def execute_fwd_check_process(\n+        self,\n+        packets,\n+        pmd_commands=None,\n+        rx_port=0,\n+        tx_port=0,\n+        packet_interval=0.01,\n+        packet_count=1,\n+        tcpdump_filter=None,\n+        tester_tx_interface=None,\n+        tester_rx_interface=None,\n+    ):\n+        \"\"\"\n+        pkt fwd flow: tcpdump ---> send pkt ---> testpmd output/tcpdump capture pkts\n+        :return:\n+            1.capture the tcpdump pkts\n+            2.pmd output\n+            3.use pmd output to get the pkts stats\n+        \"\"\"\n+        if isinstance(packets, list):\n+            pkt_list = packets\n+        else:\n+            pkt_list = [packets]\n+        tester_rx_interface = (\n+            self.tester_rx_interface\n+            if tester_rx_interface is None\n+            else tester_rx_interface\n+        )\n+        inst = self.start_tcpdump_output_pcap_file(\n+            port_inface=tester_rx_interface,\n+            count=len(packets) * packet_count,\n+            filters=tcpdump_filter,\n+        )\n+        time.sleep(3)\n+        if pmd_commands:\n+            self.execute_pmd_cmd(pmd_commands)\n+        self.execute_pmd_cmd(\"clear port stats all\")\n+        self.send_pkts(\n+            pkt_list=pkt_list,\n+            packet_count=packet_count,\n+            packet_interval=packet_interval,\n+            tester_tx_interface=tester_tx_interface,\n+        )\n+        time.sleep(packet_interval * len(pkt_list) * packet_count)\n+        packets_captured = self.stop_tcpdump_and_get_pkts(inst)\n+        self.logger.info(\"capture the pkt: {}\".format(str(list(packets_captured))))\n+        pmdout = self.pmd_session.get_output()\n+        tx_stats = self.pmd_session.get_pmd_stats(tx_port)\n+        rx_stats = self.pmd_session.get_pmd_stats(rx_port)\n+        stats = {tx_port: tx_stats, rx_port: rx_stats}\n+\n+        return packets_captured, pmdout, stats\n+\n+\n+class RxTxBaseTest(FuncTestBase):\n+    def basic_rx_check(\n+        self, packets_num, packet_dst_mac=None, pmd_commands=None, rx_port=0, tx_port=0\n+    ):\n+        \"\"\"\n+        set fwd rxonly and check the rece pkts num\n+        \"\"\"\n+        self.set_pmd_fwd_mode(fwd_mode=\"rxonly\")\n+        random_pkt = self.generate_random_packets(\n+            dstmac=packet_dst_mac, pktnum=packets_num\n+        )\n+        _, pmdout, stats = self.execute_fwd_check_process(\n+            packets=random_pkt,\n+            pmd_commands=pmd_commands,\n+            rx_port=rx_port,\n+            tx_port=tx_port,\n+        )\n+        self.verify(packet_dst_mac in pmdout, \"receive packet fail\")\n+        rece_pkts_num = self.get_received_pkt_num(pmdout)\n+        self.verify(rece_pkts_num == packets_num, \"receive packet num is not match\")\n+\n+    def basic_tx_check(self):\n+        \"\"\"\n+        set fwd txonly, check:\n+            1.testpmd output tx-pkts != 0\n+            2.the tcpdump can capture pkts\n+        \"\"\"\n+        self.execute_pmd_cmd(\"stop\")\n+        self.execute_pmd_cmd(\"set fwd txonly\")\n+        index = self.start_tcpdump_output_pcap_file(self.tester_rx_interface, count=100)\n+        self.execute_pmd_cmd(\"start\")\n+        time.sleep(1)\n+        self.execute_pmd_cmd(\"stop\")\n+        pkts_num = self.stop_tcpdump_and_get_pkts(index)\n+        stats = self.pmd_session.get_pmd_stats(0)\n+        self.verify(\n+            stats[\"TX-packets\"] != 0\n+            and len(pkts_num) == 100\n+            and stats[\"TX-packets\"] > len(pkts_num),\n+            \"send packet num is not match\",\n+        )\n+\n+    def basic_macfwd_check(\n+        self,\n+        packet_num,\n+        dst_mac=None,\n+        check_miss=False,\n+        pmd_commands=None,\n+        rx_port=0,\n+        tx_port=0,\n+    ):\n+        \"\"\"\n+        mac fwd, check rx-pkts and tx-pkt num is correct\n+        if check_miss is true, it will check rx/tx is 0\n+        \"\"\"\n+        random_pkt = self.generate_random_packets(dstmac=dst_mac, pktnum=packet_num)\n+        if not pmd_commands:\n+            self.set_pmd_fwd_mode()\n+        packets_captured, pmdout, stats = self.execute_fwd_check_process(\n+            packets=random_pkt,\n+            pmd_commands=pmd_commands,\n+            rx_port=rx_port,\n+            tx_port=tx_port,\n+        )\n+        rece_pkts_num = self.get_received_pkt_num(pmdout)\n+        if check_miss:\n+            packet_num = 0\n+        self.verify(\n+            stats[tx_port][\"RX-packets\"] == packet_num,\n+            \"receive packet num is not match\",\n+        )\n+        self.verify(\n+            stats[tx_port][\"RX-errors\"] == 0, \"some pkts have rx-errors in testpmd\"\n+        )\n+        self.verify(\n+            stats[rx_port][\"TX-packets\"] == packet_num,\n+            \"receive packet num is not match\",\n+        )\n+        self.verify(\n+            rece_pkts_num == packet_num == len(packets_captured),\n+            \"receive packet num is not match\",\n+        )\n+\n+    def basic_xstats_check(\n+        self, packet_num, dst_mac=None, rx_port=0, tx_port=0, payload_size=64\n+    ):\n+        \"\"\"\n+        1. default stats check\n+        2. send pkt and check testpmd stats and xstats\n+        3. send pkt and clear port stats and check xstats\n+        4. send pkt and clear xstats and check xstats\n+        \"\"\"\n+        random_pkt = self.generate_random_packets(\n+            dstmac=dst_mac,\n+            pktnum=packet_num,\n+            options={\n+                \"ip\": {\"src\": \"192.168.0.1\", \"dst\": \"192.168.1.1\"},\n+                \"layers_config\": [(\"raw\", {\"payload\": [\"58\"] * payload_size})],\n+            },\n+        )\n+        self.execute_pmd_cmd(\"clear port xstats all\")\n+        xstats_table = self.get_xstats_table([rx_port, tx_port])\n+        for port in xstats_table.keys():\n+            self.verify(\n+                not any(xstats_table[port].values()),\n+                \"xstats Initial value error! port {} xstats \"\n+                \"data is {}\".format(port, xstats_table[port]),\n+            )\n+        _, _, stats_table = self.execute_fwd_check_process(\n+            packets=random_pkt,\n+            pmd_commands=[\n+                \"port config all rss all\",\n+                \"set fwd mac\",\n+                \"clear port xstats all\",\n+                \"start\",\n+            ],\n+            rx_port=rx_port,\n+            tx_port=tx_port,\n+        )\n+        xstats_table = self.get_xstats_table([rx_port, tx_port])\n+        return stats_table, xstats_table\n+\n+    def basic_promisc_check(\n+        self, match_mac, unmatch_mac, pmd_commands=None, rx_port=0, tx_port=0\n+    ):\n+        \"\"\"\n+        use match and unmatch pkts to test promisc\n+        test flow: default mode --> set promisc off --> set promisc on\n+        note: if test vf promisc, confirm the vf primisc enable(kernel need to set trust on)\n+        \"\"\"\n+        unmatch_pkt = self.generate_random_packets(dstmac=unmatch_mac, pktnum=1)\n+        match_pkt = self.generate_random_packets(dstmac=match_mac, pktnum=1)\n+        self.set_pmd_fwd_mode(fwd_mode=\"mac\")\n+        self.logger.info(\"check the default promisc mode\")\n+        _, pmdout, _ = self.execute_fwd_check_process(\n+            packets=[unmatch_pkt, match_pkt],\n+            pmd_commands=pmd_commands,\n+            rx_port=rx_port,\n+            tx_port=tx_port,\n+        )\n+        self.verify(\n+            match_mac in pmdout and unmatch_mac in pmdout,\n+            \"enable promisc not receive all pkts\",\n+        )\n+        self.logger.info(\"check disable promisc mode\")\n+        self.execute_pmd_cmd(\"set promisc all off\")\n+        _, pmdout, _ = self.execute_fwd_check_process(\n+            packets=[unmatch_pkt, match_pkt], rx_port=rx_port, tx_port=tx_port\n+        )\n+        self.verify(\n+            match_mac in pmdout and unmatch_mac not in pmdout,\n+            \"disable promisc should receive match pkt\",\n+        )\n+        self.logger.info(\"check re-enable promisc mode\")\n+        self.execute_pmd_cmd(\"set promisc all on\")\n+        _, pmdout, _ = self.execute_fwd_check_process(\n+            packets=[unmatch_pkt, match_pkt], rx_port=rx_port, tx_port=tx_port\n+        )\n+        self.verify(\n+            match_mac in pmdout and unmatch_mac in pmdout,\n+            \"enable promisc should receive all pkt\",\n+        )\n+\n+    def basic_multicast_check(\n+        self, normal_mac, multicast_mac, pmd_commands=None, rx_port=0, tx_port=0\n+    ):\n+        \"\"\"\n+        use normal mac and multicast mac to test\n+        \"\"\"\n+        normal_pkt = self.generate_random_packets(dstmac=normal_mac, pktnum=1)\n+        multicast_pkt = self.generate_random_packets(dstmac=multicast_mac, pktnum=1)\n+        self.execute_pmd_cmd(\n+            [\n+                \"set allmulti all off\",\n+                \"set promisc all off\",\n+            ],\n+        )\n+        self.set_pmd_fwd_mode(fwd_mode=\"mac\")\n+        self.logger.info(\"check the default pmd multicast\")\n+        _, pmdout, _ = self.execute_fwd_check_process(\n+            packets=[normal_pkt, multicast_pkt],\n+            pmd_commands=pmd_commands,\n+            rx_port=rx_port,\n+            tx_port=tx_port,\n+        )\n+        self.verify(\n+            normal_mac in pmdout and multicast_mac not in pmdout,\n+            \"the default can not receive multicast pkt\",\n+        )\n+        self.execute_pmd_cmd(\n+            [\n+                \"set allmulti all on\",\n+                \"mcast_addr add 0 {}\".format(multicast_mac),\n+            ],\n+        )\n+        self.logger.info(\"check enable pmd multicast\")\n+        _, pmdout, _ = self.execute_fwd_check_process(\n+            packets=[normal_pkt, multicast_pkt], rx_port=rx_port, tx_port=tx_port\n+        )\n+        self.verify(\n+            normal_mac in pmdout and multicast_mac in pmdout,\n+            \"enable multicast not receive multicast pkt\",\n+        )\n+\n+    def basic_rss_check(\n+        self, dst_mac, rss_type, queue_num, pmd_commands=None, rx_port=0, tx_port=0\n+    ):\n+        \"\"\"\n+        use pkt type mapping to rss type, and check rss func\n+        \"\"\"\n+        rss2pkt_dict = {\n+            \"ip\": \"IP_RAW\",\n+            \"tcp\": \"TCP\",\n+            \"udp\": \"UDP\",\n+        }\n+        rss_pkts = self.generate_random_packets(\n+            dstmac=dst_mac, pktnum=30, random_type=[rss2pkt_dict[rss_type]]\n+        )\n+        self.execute_pmd_cmd(\"port config all rss %s\" % rss_type)\n+        self.set_pmd_fwd_mode(fwd_mode=\"mac\")\n+        _, pmdout, stats = self.execute_fwd_check_process(\n+            packets=rss_pkts,\n+            pmd_commands=pmd_commands,\n+            rx_port=rx_port,\n+            tx_port=tx_port,\n+        )\n+        hashes, queues = self.get_hash_and_queues(pmdout)\n+        self.verify(\n+            len(set(queues)) == int(queue_num)\n+            or len(queues) == len(hashes) == stats[tx_port][\"RX-packets\"],\n+            \"some pkt and queue can not get get the hash\",\n+        )\n+        return zip(hashes, queues)\n+\n+    def rss_reta_config_check(self, rss_reta: list, port_id=0, reta_size=64):\n+        \"\"\"\n+        check the rss reta after setting new rss rate in testpmd\n+        \"\"\"\n+        reta_mask = \"0x{}\".format(int(reta_size / 4) * \"f\")\n+        default_rss_reta = self.execute_pmd_cmd(\n+            \"show port {} rss reta {} ({})\".format(port_id, reta_size, reta_mask)\n+        )\n+        for i, j in zip(list(range(reta_size)), rss_reta):\n+            self.execute_pmd_cmd(\"port config %d rss reta (%d,%d)\" % (port_id, i, j))\n+        change_rss_reta = self.execute_pmd_cmd(\n+            \"show port {} rss reta {} ({})\".format(port_id, reta_size, reta_mask)\n+        )\n+        self.verify(default_rss_reta != change_rss_reta, \"port config rss reta failed\")\n+\n+    def rss_reta_hit_check(self, hash_table, rss_reta: list, reta_size=64):\n+        \"\"\"\n+        check the rece pkts hash value can map the queue according to rss reta\n+        \"\"\"\n+        hit_hash = False\n+        for rss_hash, rss_queue in hash_table:\n+            for i, j in zip(list(range(reta_size)), rss_reta):\n+                if int(rss_hash, 16) % reta_size == i and int(rss_queue, 16) == j:\n+                    hit_hash = True\n+                    break\n+                else:\n+                    hit_hash = False\n+            self.verify(hit_hash, \"some pkt not directed by rss.\")\n+\n+    def basic_rss_hash_key_check(\n+        self, dst_mac, hash_key, port_id=0, pmd_commands=None, rx_port=0, tx_port=0\n+    ):\n+        \"\"\"\n+        check the hash values is different after setting rss hash key\n+        \"\"\"\n+        pkt = self.generate_using_packets(pkt_type=\"UDP\", dst_mac=dst_mac)\n+        self.set_pmd_fwd_mode(\"mac\")\n+        _, pmdout, _ = self.execute_fwd_check_process(\n+            packets=pkt, pmd_commands=pmd_commands, rx_port=rx_port, tx_port=tx_port\n+        )\n+        hash_1, queue_1 = self.get_hash_and_queues(pmdout)\n+        self.execute_pmd_cmd(\n+            \"port config {} rss-hash-key ipv4 {}\".format(port_id, hash_key)\n+        )\n+        out = self.execute_pmd_cmd(\"show port 0 rss-hash key\")\n+        self.verify(hash_key.upper() in out, \"rss hash key update failed\")\n+        _, pmdout, _ = self.execute_fwd_check_process(\n+            packets=pkt, rx_port=rx_port, tx_port=tx_port\n+        )\n+        hash_2, queue_2 = self.get_hash_and_queues(pmdout)\n+        self.verify(hash_1 != hash_2, \"hash value should be different\")\n+\n+    def basic_pmd_info_check(self, port_obj, port_id=0):\n+        \"\"\"\n+        check the link speed and link status\n+        \"\"\"\n+        link_status, link_speed = self.get_pmd_port_infomation(port_id)\n+        link_speed_host = int(port_obj.get_nic_speed()) // 1000\n+        self.verify(link_status == \"up\", \"link stats has error\")\n+        self.verify(\n+            int(link_speed) == link_speed_host,\n+            \"link speed has error\",\n+        )\n+\n+\n+class VlanFuncBaseTest(FuncTestBase):\n+    def vlan_pkts_fwd_check(\n+        self, pkts, pmd_commands=None, port_id=0, rx_port=0, tx_port=0\n+    ):\n+        \"\"\"\n+        send pkts and return rece num, vlan dict and vlan id list\n+        \"\"\"\n+        packets_captured, pmdout, _ = self.execute_fwd_check_process(\n+            packets=pkts, pmd_commands=pmd_commands, rx_port=rx_port, tx_port=tx_port\n+        )\n+        rece_num = self.get_received_pkt_num(pmdout, port_id=port_id)\n+        vlans, vlan_id_list = self.get_pkts_vlan_layer(packets_captured, \"vlan\")\n+        self.logger.info(\"capture the TX pkts vlans: {}\".format(vlans))\n+        return rece_num, packets_captured, vlans, vlan_id_list\n+\n+    def vlan_offload_flag_check(self, port_id=0, **kwargs):\n+        \"\"\"\n+        check the vlan offload flag status:\n+            filter=\"on\"\n+            strip=\"on\"\n+              ...\n+        \"\"\"\n+        out = self.execute_pmd_cmd(\"show port info %d\" % port_id)\n+        for flag in kwargs.keys():\n+            p = \"VLAN offload.*\\n.*?%s (\\w+)\" % flag\n+            vlan_stats = re.search(p, out).group(1)\n+            self.logger.info(\"{} flag is {}\".format(flag, vlan_stats))\n+            self.verify(\n+                vlan_stats == kwargs[flag], \"the vlan offload flag is incorrect\"\n+            )\n+\n+    def vlan_prio_check(self, pkts, **kwargs):\n+        \"\"\"\n+\n+        :param pkts:\n+        :param kwargs:\n+        :return:\n+        \"\"\"\n+        packets_captured, _, _ = self.execute_fwd_check_process(packets=pkts)\n+        vlans, _ = self.get_pkts_vlan_layer(packets_captured, \"prio\")\n+        self.logger.info(\"vlan prio: {}\".format(vlans))\n+        for _prio in kwargs.keys():\n+            for _vlan in vlans:\n+                self.verify(\n+                    _vlan[_prio] == kwargs[_prio], \"the vlan prio values not matched\"\n+                )\n+\n+    def set_pvid_from_pf(self, pf_intf, vf_id=0, vlan_id=0):\n+        \"\"\"\n+\n+        :return:\n+        \"\"\"\n+        self.execute_host_cmd(\n+            \"ip link set {} vf {} vlan {}\".format(pf_intf, vf_id, vlan_id)\n+        )\n+        output = self.execute_host_cmd(\"ip link show {}\".format(pf_intf))\n+        if vlan_id != 0:\n+            self.verify(\"vlan %d\" % vlan_id in output, \"Failed to add pvid on VF\")\n+        else:\n+            self.verify(\"vlan\" not in output, \"Failed to add pvid on VF\")\n+\n+    def basic_vlan_filter_check(\n+        self,\n+        vlan_id,\n+        match_pkt,\n+        unmatch_pkt,\n+        pmd_commands=None,\n+        port_id=0,\n+        double_vlan=False,\n+        rx_port=0,\n+        tx_port=0,\n+    ):\n+        \"\"\"\n+        send match and unmatch pkt to check vlan filter\n+        double vlan have 2 vlan id\n+        \"\"\"\n+        if not isinstance(match_pkt, list):\n+            match_pkt = [match_pkt]\n+        if not isinstance(unmatch_pkt, list):\n+            unmatch_pkt = [unmatch_pkt]\n+        rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check(\n+            pkts=match_pkt + unmatch_pkt,\n+            port_id=port_id,\n+            pmd_commands=pmd_commands,\n+            rx_port=rx_port,\n+            tx_port=tx_port,\n+        )\n+        self.verify(\n+            rece_num == len(match_pkt) and vlan_id in vlan_id_list,\n+            \"failed receive vlan pkts\",\n+        )\n+        if double_vlan:\n+            self.verify(\n+                len(vlan_id_list) == len(match_pkt) * 2, \"failed receive vlan pkts\"\n+            )\n+        else:\n+            self.verify(len(vlan_id_list) == len(match_pkt), \"failed receive vlan pkts\")\n+\n+    def basic_vlan_strip_check(\n+        self,\n+        vlan_id,\n+        match_pkt,\n+        pmd_commands=None,\n+        port_id=0,\n+        double_vlan=False,\n+        rx_port=0,\n+        tx_port=0,\n+    ):\n+        \"\"\"\n+        send vlan pkts to check vlan strip\n+        single vlan: after strip, the rece pkt not have vlan id\n+        double vlan: after strip, the rece pkt have single vlan\n+        \"\"\"\n+        rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check(\n+            match_pkt,\n+            port_id=port_id,\n+            pmd_commands=pmd_commands,\n+            tx_port=tx_port,\n+            rx_port=rx_port,\n+        )\n+        if double_vlan:\n+            self.verify(\n+                rece_num == len(match_pkt) and len(vlan_id_list) == len(match_pkt),\n+                \"Failed to strip double vlan tag\",\n+            )\n+        else:\n+            self.verify(\n+                rece_num == len(match_pkt) and len(vlan_id_list) == 0,\n+                \"Failed to strip vlan tag\",\n+            )\n+        self.execute_pmd_cmd(\"vlan set strip off %s\" % port_id)\n+        rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check(\n+            match_pkt, port_id=port_id, tx_port=tx_port, rx_port=rx_port\n+        )\n+        if double_vlan:\n+            self.verify(\n+                rece_num == len(match_pkt)\n+                and len(vlan_id_list) == len(match_pkt) * 2\n+                and vlan_id in vlan_id_list,\n+                \"Failed to receive vlan pkts with vlan tag\",\n+            )\n+        else:\n+            self.verify(\n+                rece_num == len(match_pkt)\n+                and len(vlan_id_list) == len(match_pkt)\n+                and vlan_id in vlan_id_list,\n+                \"Failed to receive vlan pkts with vlan tag\",\n+            )\n+\n+    def basic_vlan_insert_check(\n+        self,\n+        vlan_id,\n+        insert_vlan,\n+        match_pkt,\n+        pmd_commands=None,\n+        port_id=0,\n+        double_vlan=None,\n+        rx_port=0,\n+        tx_port=0,\n+    ):\n+        \"\"\"\n+        single vlan insert: send normal pkt, the tx get the single vlan pkt\n+        double vlan insert: send single vlan pkt, the vlan insert to outer vlan and the default vlan become inner vlan\n+        \"\"\"\n+        rece_num, _, vlans, vlan_id_list = self.vlan_pkts_fwd_check(\n+            match_pkt,\n+            port_id=port_id,\n+            pmd_commands=pmd_commands,\n+            tx_port=tx_port,\n+            rx_port=rx_port,\n+        )\n+        if double_vlan:\n+            self.verify(\n+                rece_num == len(match_pkt) and len(vlan_id_list) == len(match_pkt) * 2,\n+                \"Failed to receive vlan pkts with vlan tag\",\n+            )\n+            self.verify(\n+                all(\n+                    [\n+                        insert_vlan == _vlan[\"outer\"] and vlan_id == _vlan[\"inner\"]\n+                        for _vlan in vlans\n+                    ]\n+                ),\n+                \"the insert vlan is incorrect\",\n+            )\n+        else:\n+            self.verify(\n+                rece_num == len(match_pkt)\n+                and len(vlan_id_list) == len(match_pkt)\n+                and insert_vlan in vlan_id_list,\n+                \"Failed to receive vlan pkts with vlan tag\",\n+            )\n+            self.verify(\n+                all([insert_vlan == _vlan[\"outer\"] for _vlan in vlans]),\n+                \"the insert vlan is incorrect\",\n+            )\n",
    "prefixes": [
        "V2",
        "1/6"
    ]
}