From patchwork Thu Jun 8 18:27:37 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Huang, ZhiminX" X-Patchwork-Id: 128416 Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 4F34642C5C; Thu, 8 Jun 2023 12:15:22 +0200 (CEST) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 4D04440EF0; Thu, 8 Jun 2023 12:15:22 +0200 (CEST) Received: from mga06.intel.com (mga06b.intel.com [134.134.136.31]) by mails.dpdk.org (Postfix) with ESMTP id 5127240042 for ; Thu, 8 Jun 2023 12:15:20 +0200 (CEST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=intel.com; i=@intel.com; q=dns/txt; s=Intel; t=1686219320; x=1717755320; h=from:to:cc:subject:date:message-id:in-reply-to: references:mime-version:content-transfer-encoding; bh=Nd9LimIjOMVFSWqRWMUJ+HnA2OE7Scz07nWSvwWBajs=; b=Q8i/9gZ9HziPhpRJvgqvBe1ixH20N/EBSu6cxE6OOB76/PnD29+2XKkv t3yQETPXyxg5MNRjZmtg6JA/yZh/VFptGwkSQDKPG1QSc6SBT1ArnYcLB bAL0IpKlZ2bv8mepsWCYmiLv6Psvwasv1AKHuNjGr1kfzvJmgpGJ7cWCH vjXBc1ocgsOc8qYq16L/RUNqFHGZTI4fLBxOLCIdsU7p7YS230jyAe/9E JfCZi6Txe017C2GJdZoIhcnVZa2AAU5RKwL4OBdRheJQBpMduHkTwPqER J97CBok4cQ5HdF1os64CT6x19aqldcE5fcL0DYzAClLG9PS2AymAcetW9 g==; X-IronPort-AV: E=McAfee;i="6600,9927,10734"; a="420832621" X-IronPort-AV: E=Sophos;i="6.00,226,1681196400"; d="scan'208";a="420832621" Received: from orsmga003.jf.intel.com ([10.7.209.27]) by orsmga104.jf.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 08 Jun 2023 03:15:16 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=McAfee;i="6600,9927,10734"; a="660327052" X-IronPort-AV: E=Sophos;i="6.00,226,1681196400"; d="scan'208";a="660327052" Received: from unknown (HELO localhost.localdomain) ([10.239.252.96]) by orsmga003-auth.jf.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 08 Jun 2023 03:15:11 -0700 From: Zhimin Huang To: dts@dpdk.org Cc: Zhimin Huang Subject: [dts][PATCH V1 1/6] tests/func_test_base:add new commom module to refactor func test cases Date: Thu, 8 Jun 2023 18:27:37 +0000 Message-Id: <20230608182742.360594-2-zhiminx.huang@intel.com> X-Mailer: git-send-email 2.25.1 In-Reply-To: <20230608182742.360594-1-zhiminx.huang@intel.com> References: <20230608182742.360594-1-zhiminx.huang@intel.com> MIME-Version: 1.0 X-BeenThere: dts@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: test suite reviews and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dts-bounces@dpdk.org for some vf test suites, there are duplicate methods implemented in suites. so add the public method into func_test_base,and encapsulate classes for the basic testing process. Signed-off-by: Zhimin Huang --- tests/func_test_base.py | 977 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 977 insertions(+) create mode 100644 tests/func_test_base.py diff --git a/tests/func_test_base.py b/tests/func_test_base.py new file mode 100644 index 00000000..8134d5a1 --- /dev/null +++ b/tests/func_test_base.py @@ -0,0 +1,977 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2023 Intel Corporation +# + +import re +import time +import traceback + +from framework.packet import Packet +from framework.pmd_output import PmdOutput +from framework.virt_common import VM + +supported_vf_driver = ["pci-stub", "vfio-pci"] + + +class FuncTestBase(object): + def __init__(self, test_case, tester_tx_interface, tester_rx_interface): + self.test_case = test_case + self.verify = self.test_case.verify + self.logger = test_case.logger + self.dut = self.test_case.dut + self.tester = self.test_case.tester + + self.vm_info = [] + self.pmd_session = None + self.tester_tx_interface = tester_tx_interface + self.tester_rx_interface = tester_rx_interface + + self.pkt = Packet() + self.vf_driver = self.test_case.get_suite_cfg()["vf_driver"] or "pci-stub" + self.vm_specified_driver = self.get_vm_specified_driver() + + def check_port_num_for_test(self, port_num: int): + """ + check the port num for test requirement + """ + dut_ports = self.dut.get_ports(self.test_case.nic) + self.verify(len(dut_ports) >= port_num, "Unsupported port num for test require") + + def get_vm_specified_driver(self): + """ + check the vf support driver and return vm driver + :return: vm_specified_driver + """ + self.verify(self.vf_driver in supported_vf_driver, "Unsupported vf driver") + if self.vf_driver == "pci-stub": + vm_specified_driver = "pci-assign" + else: + vm_specified_driver = "vfio-pci" + return vm_specified_driver + + def create_vf(self, pf_port, vfs_num: int, driver="default"): + """ + create vfs, support multi pfs to create multi vfs + :param pf_port: pf port or port list + :param vfs_num: create vf num + :param driver: set vf driver on dpdk + :return: vf net device object + """ + sriov_vfs_obj = [] + try: + self.test_case.bind_nic_driver( + self.dut.get_ports(self.test_case.nic), driver=self.test_case.kdriver + ) + except Exception as e: + self.logger.info(traceback.format_exc(e)) + if hasattr(self.test_case, "pf_config"): + self.test_case.pf_config() + if isinstance(pf_port, int): + pf_port = [pf_port] + for _port in pf_port: + self.dut.generate_sriov_vfs_by_port(_port, vfs_num, driver=driver) + sriov_vfs_obj.append(self.dut.ports_info[_port]["vfs_port"]) + self.dut.send_expect( + "ifconfig %s up" % self.dut.ports_info[_port]["intf"], "#" + ) + res = self.dut.is_interface_up(self.dut.ports_info[_port]["intf"]) + self.verify( + res, "%s link status is down" % self.dut.ports_info[_port]["intf"] + ) + if hasattr(self.test_case, "vf_config"): + self.test_case.vf_config() + for vf_port in sriov_vfs_obj: + for _vf in vf_port: + _vf.bind_driver(self.vf_driver) + return sriov_vfs_obj + + def destroy_vf(self, pf_port="all"): + """ + destroy vfs + :param pf_port: select the pf port to destroy vfs. + the default will destroy all vfs. + """ + pf_port = pf_port if isinstance(pf_port, list) else [pf_port] + if "all" in pf_port: + # destory all vfs + self.dut.destroy_all_sriov_vfs() + else: + for port in pf_port: + # destroy the vf on the specified port + self.dut.destroy_sriov_vfs_by_port(port) + + def setup_vm_env(self, vm_name: str, sriov_pci: list): + """ + passtrough the vfs into vm, start vm. + :param vm_name: select the vm to use + :param sriov_pci: pci list + :returns: vm object and vm dut session + """ + try: + vm_obj = VM(self.dut, vm_name, self.test_case.suite_name) + for _pci in sriov_pci: + vm_obj.set_vm_device( + driver=self.vm_specified_driver, **{"opt_host": _pci} + ) + vm_dut = vm_obj.start() + if vm_dut is None: + raise Exception("Set up VM ENV failed!") + self.vm_info.append((vm_obj, vm_dut)) + return vm_obj, vm_dut + except Exception as e: + self.destroy_vm_env(vm_obj_name=vm_name) + raise Exception(e) + + def destroy_vm_env(self, vm_obj_name=""): + """ + destroy the specified vm name or all vms + """ + try: + for vm_obj, vm_session in self.vm_info: + if vm_obj_name == "": + # destoty all vm + vm_session.kill_all() + vm_obj.stop() + self.vm_info = [] + elif vm_obj.vm_name == vm_obj_name: + # destroy the vm with the specified name + vm_session.kill_all() + vm_obj.stop() + self.vm_info.remove((vm_obj, vm_session)) + else: + self.logger.warning("VM %s not found!!!" % vm_obj_name) + except Exception as e: + self.dut.virt_exit() + time.sleep(3) + raise Exception(e) + + def init_pmd_session(self, dut_obj): + """ + init PMD session + """ + self.pmd_session = PmdOutput(dut_obj) + + def launch_testpmd(self, **kwargs): + """ + launch testpmd with testpmd session + the default session is self.pmd_session + :return: testpmd output + """ + pmd_session = kwargs.get("testpmd_obj") or self.pmd_session + out = pmd_session.start_testpmd(**kwargs) + return out + + def execute_pmd_cmd(self, cmd, **kwargs): + """ + execute multiple testpmd commands, return string output + :param cmd: testpmd cmd, support str and list + :return: testpmd output + """ + pmd_session = kwargs.get("pmd_session") or self.pmd_session + _cmds = [cmd] if isinstance(cmd, str) else cmd + output = "" + for _cmd in _cmds: + output += pmd_session.execute_cmd(_cmd) + return output + + def execute_host_cmd(self, cmd, **kwargs): + """ + execute multiple host commands + :param cmd: host commands, support str and list + :return: host output + """ + dut_obj = kwargs.get("dut_obj") or self.dut + _cmd = [cmd, "# ", 20] if isinstance(cmd, (str)) else cmd + return dut_obj.send_expect(*_cmd) + + def build_dpdk_apps(self, app_path): + """ + build dpdk apps and check the build status + """ + out = self.dut.build_dpdk_apps(app_path) + self.verify("Error" not in out, "Compilation error") + self.verify("No such" not in out, "Compilation error") + + def vf_test_preset_env_vm(self, pf_port, vfs_num, vm_name, driver="default"): + """ + create vfs and setup vm env + """ + if not isinstance(pf_port, list): + pf_port = [pf_port] + sriov_vfs_obj = self.create_vf(pf_port, vfs_num, driver=driver) + vf_list = [] + for pf_id in pf_port: + vf_list += [ + sriov_vfs_obj[pf_id][i].pci for i in range(len(sriov_vfs_obj[pf_id])) + ] + vm_obj, vm_dut = self.setup_vm_env(vm_name=vm_name, sriov_pci=vf_list) + return vm_obj, vm_dut + + def get_vf_mac_through_pf(self, pf_intf): + """ + use ip link show pf to get vf mac list + """ + out = self.dut.send_expect( + "ip link show {}".format(pf_intf), "# ", alt_session=True + ) + vf_mac_pattern = r"vf\s+\d+\s+.*\s+link\/ether\s+(\S+)\s+.*" + match = re.findall(vf_mac_pattern, out) + return match + + @staticmethod + def generate_using_packets(pkt_type=None, pkt_str=None, **kwargs): + """ + generate using pkts: + 1.select the protocol type and generate the pkts through packet module + 2.select customized pkt string + :return: pkt object + """ + pkt = Packet() + dst_mac = kwargs.get("dst_mac") + vlan_id = kwargs.get("vlan_id") + if pkt_type: + pkt = Packet(pkt_type=pkt_type) + pkt.config_layer("ether", {"dst": dst_mac}) + if vlan_id is not None: + pkt.config_layer("vlan", {"vlan": vlan_id}) + elif pkt_str: + pkt.update_pkt(pkt_str) + else: + raise Exception("wrong pkt value") + return pkt + + @staticmethod + def get_received_pkt_num(output, port_id=0): + """ + use the testpmd output to get the receive pkts num for port + """ + pkt_pattern = ( + "port\s%d/queue\s\d+:\sreceived\s(\d+)\spackets.+?\n.*length=\d{2,}\s" + % port_id + ) + received_data = re.findall(pkt_pattern, output) + received_pkts = sum(map(int, [i[0] for i in received_data])) + return received_pkts + + @staticmethod + def get_hash_and_queues(out, port_id=0): + """ + use testpmd output to get hash values and queues + """ + hash_pattern = re.compile( + "port\s%s/queue\s\d+:\sreceived\s\d+\spackets.+?\n.*RSS\shash=(\w+)\s-\sRSS\squeue=(\w+)" + % port_id + ) + hash_infos = hash_pattern.findall(out) + if len(hash_infos) == 0: + queue_pattern = re.compile("Receive\squeue=(\w+)") + queues = queue_pattern.findall(out) + return [], queues + hashes = [hash_info[0].strip() for hash_info in hash_infos] + queues = [hash_info[1].strip() for hash_info in hash_infos] + return hashes, queues + + @staticmethod + def get_pkts_vlan_layer(pkt_obj: Packet, vlan_layer): + """ + get pkts vlan layers + :param pkt_obj: pkt object + :param vlan_layer: vlan, prio etc... + :return: vlan id list, if pkt object consists of multiple vlan pkts + """ + vlans = [] + vlan_layers_list = [] + for i in range(len(pkt_obj)): + vlan_dict = {} + try: + outer_vlan = pkt_obj.strip_element_layer3(vlan_layer, p_index=i) + vlan_dict["outer"] = outer_vlan + except Exception: + pass + try: + inner_vlan = pkt_obj.strip_element_layer4(vlan_layer, p_index=i) + vlan_dict["inner"] = inner_vlan + except Exception: + pass + vlans.append(vlan_dict) + for _vlan in vlans: + vlan_layers_list += list(_vlan.values()) + return vlans, vlan_layers_list + + def get_pmd_port_infomation(self, port_id=0, **kwargs): + """ + use 'show ports info 0' to get port link status and port speed + """ + pmd_session = kwargs.get("pmd_session") or self.pmd_session + link_status = pmd_session.get_port_link_status(port_id) + link_speed = pmd_session.get_port_link_speed(port_id) + return link_status, link_speed + + def convert_driver_version_value(self, check_version): + """ + convert the driver version to int list + take the first three values in the list for comparison and limit intree driver + for example: + 6.0.7-060007-generic: [6, 0, 7-060007-generic] + 1.11.0_rc59: [1, 11, 0] + 1.11.11: [1, 11, 11] + """ + try: + value_list = list(map(int, re.split(r"[.|_]", check_version)[:3])) + except ValueError as e: + self.logger.warning(e) + # the intree-driver has character, so set the return value is null list as the lowest driver version + return [] + return value_list + + @staticmethod + def get_pmd_rece_pkt_len(output): + """ + get the pkt length in testpmd output + """ + pkt_length = re.findall("length=(\d+)", output) + return pkt_length + + def get_xstats_table(self, port_id_list): + """ + use 'show port xstats' to get xstats info dict + """ + xstats_data = dict() + if not isinstance(port_id_list, list): + port_id_list = [port_id_list] + for port_id in port_id_list: + out = self.execute_pmd_cmd("show port xstats %s" % port_id) + tmp_data = dict() + matches = re.findall(r"(\w+):\s+(\d+)", out) + for match in matches: + key = match[0] + value = int(match[1]) + tmp_data[key] = value + xstats_data[port_id] = tmp_data + return xstats_data + + @staticmethod + def generate_random_packets( + dstmac=None, + pktnum=100, + random_type=None, + ip_increase=True, + random_payload=False, + options=None, + ): + """ + generate the random packets, + """ + pkt = Packet() + pkt.generate_random_pkts( + dstmac=dstmac, + pktnum=pktnum, + random_type=random_type, + ip_increase=ip_increase, + random_payload=random_payload, + options=options, + ) + return pkt + + def start_tcpdump_output_pcap_file(self, port_inface, count=0, filters=None): + """ + start tcpdump to capture the pkts + :param port_inface: port interface name + :param count: limit capture the pkts num + :param filters: + add filter: [{"layer": "ether", "config": {"src": "xxxx"}}] + :return: + """ + index = self.tester.tcpdump_sniff_packets( + port_inface, count=count, filters=filters + ) + return index + + def stop_tcpdump_and_get_pkts(self, pcap_file): + """ + stop tcpdump and parse the pcap file + """ + pkts = self.tester.load_tcpdump_sniff_packets(pcap_file) + return pkts + + def set_pmd_fwd_mode(self, fwd_mode="mac", pmd_session=None): + """ + set testpmd fwd + """ + pmd_session = pmd_session or self.pmd_session + self.execute_pmd_cmd( + ["set fwd %s" % fwd_mode, "set verbose 1", "start"], pmd_session=pmd_session + ) + + def send_pkts( + self, + pkt_list: list, + tester_tx_interface=None, + packet_count=1, + packet_interval=0.01, + ): + """ + send pkts with packet obj + """ + tester_tx_interface = ( + self.tester_tx_interface + if tester_tx_interface is None + else tester_tx_interface + ) + for _pkt in pkt_list: + _pkt.send_pkt( + crb=self.tester, + tx_port=tester_tx_interface, + count=packet_count, + interval=packet_interval, + ) + + def execute_fwd_check_process( + self, + packets, + pmd_commands=None, + rx_port=0, + tx_port=0, + packet_interval=0.01, + packet_count=1, + tcpdump_filter=None, + tester_tx_interface=None, + tester_rx_interface=None, + ): + """ + pkt fwd flow: tcpdump ---> send pkt ---> testpmd output/tcpdump capture pkts + :return: + 1.capture the tcpdump pkts + 2.pmd output + 3.use pmd output to get the pkts stats + """ + if isinstance(packets, list): + pkt_list = packets + else: + pkt_list = [packets] + tester_rx_interface = ( + self.tester_rx_interface + if tester_rx_interface is None + else tester_rx_interface + ) + inst = self.start_tcpdump_output_pcap_file( + port_inface=tester_rx_interface, + count=len(packets) * packet_count, + filters=tcpdump_filter, + ) + time.sleep(3) + if pmd_commands: + self.execute_pmd_cmd(pmd_commands) + self.execute_pmd_cmd("clear port stats all") + self.send_pkts( + pkt_list=pkt_list, + packet_count=packet_count, + packet_interval=packet_interval, + tester_tx_interface=tester_tx_interface, + ) + time.sleep(packet_interval * len(pkt_list) * packet_count) + packets_captured = self.stop_tcpdump_and_get_pkts(inst) + self.logger.info("capture the pkt: {}".format(str(list(packets_captured)))) + pmdout = self.pmd_session.get_output() + tx_stats = self.pmd_session.get_pmd_stats(tx_port) + rx_stats = self.pmd_session.get_pmd_stats(rx_port) + stats = {tx_port: tx_stats, rx_port: rx_stats} + + return packets_captured, pmdout, stats + + +class RxTxBaseTest(FuncTestBase): + def basic_rx_check( + self, packets_num, packet_dst_mac=None, pmd_commands=None, rx_port=0, tx_port=0 + ): + """ + set fwd rxonly and check the rece pkts num + """ + self.set_pmd_fwd_mode(fwd_mode="rxonly") + random_pkt = self.generate_random_packets( + dstmac=packet_dst_mac, pktnum=packets_num + ) + _, pmdout, stats = self.execute_fwd_check_process( + packets=random_pkt, + pmd_commands=pmd_commands, + rx_port=rx_port, + tx_port=tx_port, + ) + self.verify(packet_dst_mac in pmdout, "receive packet fail") + rece_pkts_num = self.get_received_pkt_num(pmdout) + self.verify(rece_pkts_num == packets_num, "receive packet num is not match") + + def basic_tx_check(self): + """ + set fwd txonly, check: + 1.testpmd output tx-pkts != 0 + 2.the tcpdump can capture pkts + """ + self.execute_pmd_cmd("stop") + self.execute_pmd_cmd("set fwd txonly") + index = self.start_tcpdump_output_pcap_file(self.tester_rx_interface, count=100) + self.execute_pmd_cmd("start") + time.sleep(1) + self.execute_pmd_cmd("stop") + pkts_num = self.stop_tcpdump_and_get_pkts(index) + stats = self.pmd_session.get_pmd_stats(0) + self.verify( + stats["TX-packets"] != 0 + and len(pkts_num) == 100 + and stats["TX-packets"] > len(pkts_num), + "send packet num is not match", + ) + + def basic_macfwd_check( + self, + packet_num, + dst_mac=None, + check_miss=False, + pmd_commands=None, + rx_port=0, + tx_port=0, + ): + """ + mac fwd, check rx-pkts and tx-pkt num is correct + if check_miss is true, it will check rx/tx is 0 + """ + random_pkt = self.generate_random_packets(dstmac=dst_mac, pktnum=packet_num) + if not pmd_commands: + self.set_pmd_fwd_mode() + packets_captured, pmdout, stats = self.execute_fwd_check_process( + packets=random_pkt, + pmd_commands=pmd_commands, + rx_port=rx_port, + tx_port=tx_port, + ) + rece_pkts_num = self.get_received_pkt_num(pmdout) + if check_miss: + packet_num = 0 + self.verify( + stats[tx_port]["RX-packets"] == packet_num, + "receive packet num is not match", + ) + self.verify( + stats[tx_port]["RX-errors"] == 0, "some pkts have rx-errors in testpmd" + ) + self.verify( + stats[rx_port]["TX-packets"] == packet_num, + "receive packet num is not match", + ) + self.verify( + rece_pkts_num == packet_num == len(packets_captured), + "receive packet num is not match", + ) + + def basic_xstats_check( + self, packet_num, dst_mac=None, rx_port=0, tx_port=0, payload_size=64 + ): + """ + 1. default stats check + 2. send pkt and check testpmd stats and xstats + 3. send pkt and clear port stats and check xstats + 4. send pkt and clear xstats and check xstats + """ + random_pkt = self.generate_random_packets( + dstmac=dst_mac, + pktnum=packet_num, + options={ + "ip": {"src": "192.168.0.1", "dst": "192.168.1.1"}, + "layers_config": [("raw", {"payload": ["58"] * payload_size})], + }, + ) + self.execute_pmd_cmd("clear port xstats all") + xstats_table = self.get_xstats_table([rx_port, tx_port]) + for port in xstats_table.keys(): + self.verify( + not any(xstats_table[port].values()), + "xstats Initial value error! port {} xstats " + "data is {}".format(port, xstats_table[port]), + ) + _, _, stats_table = self.execute_fwd_check_process( + packets=random_pkt, + pmd_commands=[ + "port config all rss all", + "set fwd mac", + "clear port xstats all", + "start", + ], + rx_port=rx_port, + tx_port=tx_port, + ) + xstats_table = self.get_xstats_table([rx_port, tx_port]) + return stats_table, xstats_table + + def basic_promisc_check( + self, match_mac, unmatch_mac, pmd_commands=None, rx_port=0, tx_port=0 + ): + """ + use match and unmatch pkts to test promisc + test flow: default mode --> set promisc off --> set promisc on + note: if test vf promisc, confirm the vf primisc enable(kernel need to set trust on) + """ + unmatch_pkt = self.generate_random_packets(dstmac=unmatch_mac, pktnum=1) + match_pkt = self.generate_random_packets(dstmac=match_mac, pktnum=1) + self.set_pmd_fwd_mode(fwd_mode="mac") + self.logger.info("check the default promisc mode") + _, pmdout, _ = self.execute_fwd_check_process( + packets=[unmatch_pkt, match_pkt], + pmd_commands=pmd_commands, + rx_port=rx_port, + tx_port=tx_port, + ) + self.verify( + match_mac in pmdout and unmatch_mac in pmdout, + "enable promisc not receive all pkts", + ) + self.logger.info("check disable promisc mode") + self.execute_pmd_cmd("set promisc all off") + _, pmdout, _ = self.execute_fwd_check_process( + packets=[unmatch_pkt, match_pkt], rx_port=rx_port, tx_port=tx_port + ) + self.verify( + match_mac in pmdout and unmatch_mac not in pmdout, + "disable promisc should receive match pkt", + ) + self.logger.info("check re-enable promisc mode") + self.execute_pmd_cmd("set promisc all on") + _, pmdout, _ = self.execute_fwd_check_process( + packets=[unmatch_pkt, match_pkt], rx_port=rx_port, tx_port=tx_port + ) + self.verify( + match_mac in pmdout and unmatch_mac in pmdout, + "enable promisc should receive all pkt", + ) + + def basic_multicast_check( + self, normal_mac, multicast_mac, pmd_commands=None, rx_port=0, tx_port=0 + ): + """ + use normal mac and multicast mac to test + """ + normal_pkt = self.generate_random_packets(dstmac=normal_mac, pktnum=1) + multicast_pkt = self.generate_random_packets(dstmac=multicast_mac, pktnum=1) + self.execute_pmd_cmd( + [ + "set allmulti all off", + "set promisc all off", + ], + ) + self.set_pmd_fwd_mode(fwd_mode="mac") + self.logger.info("check the default pmd multicast") + _, pmdout, _ = self.execute_fwd_check_process( + packets=[normal_pkt, multicast_pkt], + pmd_commands=pmd_commands, + rx_port=rx_port, + tx_port=tx_port, + ) + self.verify( + normal_mac in pmdout and multicast_mac not in pmdout, + "the default can not receive multicast pkt", + ) + self.execute_pmd_cmd( + [ + "set allmulti all on", + "mcast_addr add 0 {}".format(multicast_mac), + ], + ) + self.logger.info("check enable pmd multicast") + _, pmdout, _ = self.execute_fwd_check_process( + packets=[normal_pkt, multicast_pkt], rx_port=rx_port, tx_port=tx_port + ) + self.verify( + normal_mac in pmdout and multicast_mac in pmdout, + "enable multicast not receive multicast pkt", + ) + + def basic_rss_check( + self, dst_mac, rss_type, queue_num, pmd_commands=None, rx_port=0, tx_port=0 + ): + """ + use pkt type mapping to rss type, and check rss func + """ + rss2pkt_dict = { + "ip": "IP_RAW", + "tcp": "TCP", + "udp": "UDP", + } + rss_pkts = self.generate_random_packets( + dstmac=dst_mac, pktnum=30, random_type=[rss2pkt_dict[rss_type]] + ) + self.set_pmd_fwd_mode(fwd_mode="mac") + self.execute_pmd_cmd("port config all rss %s" % rss_type) + _, pmdout, stats = self.execute_fwd_check_process( + packets=rss_pkts, + pmd_commands=pmd_commands, + rx_port=rx_port, + tx_port=tx_port, + ) + hashes, queues = self.get_hash_and_queues(pmdout) + self.verify( + len(set(queues)) == int(queue_num) + and len(queues) == len(hashes) == stats[tx_port]["RX-packets"], + "some pkt and queue can not get get the hash", + ) + return zip(hashes, queues) + + def rss_reta_config_check(self, rss_reta: list, port_id=0, reta_size=64): + """ + check the rss reta after setting new rss rate in testpmd + """ + reta_mask = "0x{}".format(int(reta_size / 4) * "f") + default_rss_reta = self.execute_pmd_cmd( + "show port {} rss reta {} ({})".format(port_id, reta_size, reta_mask) + ) + for i, j in zip(list(range(reta_size)), rss_reta): + self.execute_pmd_cmd("port config %d rss reta (%d,%d)" % (port_id, i, j)) + change_rss_reta = self.execute_pmd_cmd( + "show port {} rss reta {} ({})".format(port_id, reta_size, reta_mask) + ) + self.verify(default_rss_reta != change_rss_reta, "port config rss reta failed") + + def rss_reta_hit_check(self, hash_table, rss_reta: list, reta_size=64): + """ + check the rece pkts hash value can map the queue according to rss reta + """ + hit_hash = False + for rss_hash, rss_queue in hash_table: + for i, j in zip(list(range(reta_size)), rss_reta): + if int(rss_hash, 16) % reta_size == i and int(rss_queue, 16) == j: + hit_hash = True + break + else: + hit_hash = False + self.verify(hit_hash, "some pkt not directed by rss.") + + def basic_rss_hash_key_check( + self, dst_mac, hash_key, port_id=0, pmd_commands=None, rx_port=0, tx_port=0 + ): + """ + check the hash values is different after setting rss hash key + """ + pkt = self.generate_using_packets(pkt_type="UDP", dst_mac=dst_mac) + self.set_pmd_fwd_mode("mac") + _, pmdout, _ = self.execute_fwd_check_process( + packets=pkt, pmd_commands=pmd_commands, rx_port=rx_port, tx_port=tx_port + ) + hash_1, queue_1 = self.get_hash_and_queues(pmdout) + self.execute_pmd_cmd( + "port config {} rss-hash-key ipv4 {}".format(port_id, hash_key) + ) + out = self.execute_pmd_cmd("show port 0 rss-hash key") + self.verify(hash_key.upper() in out, "rss hash key update failed") + _, pmdout, _ = self.execute_fwd_check_process( + packets=pkt, rx_port=rx_port, tx_port=tx_port + ) + hash_2, queue_2 = self.get_hash_and_queues(pmdout) + self.verify(hash_1 != hash_2, "hash value should be different") + + def basic_pmd_info_check(self, port_obj, port_id=0): + """ + check the link speed and link status + """ + link_status, link_speed = self.get_pmd_port_infomation(port_id) + link_speed_host = int(port_obj.get_nic_speed()) // 1000 + self.verify(link_status == "up", "link stats has error") + self.verify( + int(link_speed) == link_speed_host, + "link speed has error", + ) + + +class VlanFuncBaseTest(FuncTestBase): + def vlan_pkts_fwd_check( + self, pkts, pmd_commands=None, port_id=0, rx_port=0, tx_port=0 + ): + """ + send pkts and return rece num, vlan dict and vlan id list + """ + packets_captured, pmdout, _ = self.execute_fwd_check_process( + packets=pkts, pmd_commands=pmd_commands, rx_port=rx_port, tx_port=tx_port + ) + rece_num = self.get_received_pkt_num(pmdout, port_id=port_id) + vlans, vlan_id_list = self.get_pkts_vlan_layer(packets_captured, "vlan") + self.logger.info("capture the TX pkts vlans: {}".format(vlans)) + return rece_num, packets_captured, vlans, vlan_id_list + + def vlan_offload_flag_check(self, port_id=0, **kwargs): + """ + check the vlan offload flag status: + filter="on" + strip="on" + ... + """ + out = self.execute_pmd_cmd("show port info %d" % port_id) + for flag in kwargs.keys(): + p = "VLAN offload.*\n.*?%s (\w+)" % flag + vlan_stats = re.search(p, out).group(1) + self.logger.info("{} flag is {}".format(flag, vlan_stats)) + self.verify( + vlan_stats == kwargs[flag], "the vlan offload flag is incorrect" + ) + + def vlan_prio_check(self, pkts, **kwargs): + """ + + :param pkts: + :param kwargs: + :return: + """ + packets_captured, _, _ = self.execute_fwd_check_process(packets=pkts) + vlans, _ = self.get_pkts_vlan_layer(packets_captured, "prio") + self.logger.info("vlan prio: {}".format(vlans)) + for _prio in kwargs.keys(): + for _vlan in vlans: + self.verify( + _vlan[_prio] == kwargs[_prio], "the vlan prio values not matched" + ) + + def set_pvid_from_pf(self, pf_intf, vf_id=0, vlan_id=0): + """ + + :return: + """ + self.execute_host_cmd( + "ip link set {} vf {} vlan {}".format(pf_intf, vf_id, vlan_id) + ) + output = self.execute_host_cmd("ip link show {}".format(pf_intf)) + if vlan_id != 0: + self.verify("vlan %d" % vlan_id in output, "Failed to add pvid on VF") + else: + self.verify("vlan" not in output, "Failed to add pvid on VF") + + def basic_vlan_filter_check( + self, + vlan_id, + match_pkt, + unmatch_pkt, + pmd_commands=None, + port_id=0, + double_vlan=False, + rx_port=0, + tx_port=0, + ): + """ + send match and unmatch pkt to check vlan filter + double vlan have 2 vlan id + """ + if not isinstance(match_pkt, list): + match_pkt = [match_pkt] + if not isinstance(unmatch_pkt, list): + unmatch_pkt = [unmatch_pkt] + rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check( + pkts=match_pkt + unmatch_pkt, + port_id=port_id, + pmd_commands=pmd_commands, + rx_port=rx_port, + tx_port=tx_port, + ) + self.verify( + rece_num == len(match_pkt) and vlan_id in vlan_id_list, + "failed receive vlan pkts", + ) + if double_vlan: + self.verify( + len(vlan_id_list) == len(match_pkt) * 2, "failed receive vlan pkts" + ) + else: + self.verify(len(vlan_id_list) == len(match_pkt), "failed receive vlan pkts") + + def basic_vlan_strip_check( + self, + vlan_id, + match_pkt, + pmd_commands=None, + port_id=0, + double_vlan=False, + rx_port=0, + tx_port=0, + ): + """ + send vlan pkts to check vlan strip + single vlan: after strip, the rece pkt not have vlan id + double vlan: after strip, the rece pkt have single vlan + """ + rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check( + match_pkt, + port_id=port_id, + pmd_commands=pmd_commands, + tx_port=tx_port, + rx_port=rx_port, + ) + if double_vlan: + self.verify( + rece_num == len(match_pkt) and len(vlan_id_list) == len(match_pkt), + "Failed to strip double vlan tag", + ) + else: + self.verify( + rece_num == len(match_pkt) and len(vlan_id_list) == 0, + "Failed to strip vlan tag", + ) + self.execute_pmd_cmd("vlan set strip off %s" % port_id) + rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check( + match_pkt, port_id=port_id, tx_port=tx_port, rx_port=rx_port + ) + if double_vlan: + self.verify( + rece_num == len(match_pkt) + and len(vlan_id_list) == len(match_pkt) * 2 + and vlan_id in vlan_id_list, + "Failed to receive vlan pkts with vlan tag", + ) + else: + self.verify( + rece_num == len(match_pkt) + and len(vlan_id_list) == len(match_pkt) + and vlan_id in vlan_id_list, + "Failed to receive vlan pkts with vlan tag", + ) + + def basic_vlan_insert_check( + self, + vlan_id, + insert_vlan, + match_pkt, + pmd_commands=None, + port_id=0, + double_vlan=None, + rx_port=0, + tx_port=0, + ): + """ + single vlan insert: send normal pkt, the tx get the single vlan pkt + double vlan insert: send single vlan pkt, the vlan insert to outer vlan and the default vlan become inner vlan + """ + rece_num, _, vlans, vlan_id_list = self.vlan_pkts_fwd_check( + match_pkt, + port_id=port_id, + pmd_commands=pmd_commands, + tx_port=tx_port, + rx_port=rx_port, + ) + if double_vlan: + self.verify( + rece_num == len(match_pkt) and len(vlan_id_list) == len(match_pkt) * 2, + "Failed to receive vlan pkts with vlan tag", + ) + self.verify( + all( + [ + insert_vlan == _vlan["outer"] and vlan_id == _vlan["inner"] + for _vlan in vlans + ] + ), + "the insert vlan is incorrect", + ) + else: + self.verify( + rece_num == len(match_pkt) + and len(vlan_id_list) == len(match_pkt) + and insert_vlan in vlan_id_list, + "Failed to receive vlan pkts with vlan tag", + ) + self.verify( + all([insert_vlan == _vlan["outer"] for _vlan in vlans]), + "the insert vlan is incorrect", + )