@@ -112,7 +112,9 @@ def __init__(
the name of the underlying node which it is running on.
**kwargs: Any additional arguments if any.
"""
+ node.register_shell(self)
self._node = node
+
if name is None:
name = type(self).__name__
self._logger = get_dts_logger(f"{node.name}.{name}")
@@ -460,6 +460,10 @@ def _run_test_suite(
DtsStage.test_suite_setup, Path(SETTINGS.output_dir, test_suite_name)
)
test_suite = test_suite_with_cases.test_suite_class(sut_node, tg_node, topology)
+
+ sut_node.enter_scope("suite")
+ tg_node.enter_scope("suite")
+
try:
self._logger.info(f"Starting test suite setup: {test_suite_name}")
test_suite.set_up_suite()
@@ -479,7 +483,6 @@ def _run_test_suite(
try:
self._logger.set_stage(DtsStage.test_suite_teardown)
test_suite.tear_down_suite()
- sut_node.kill_cleanup_dpdk_apps()
test_suite_result.update_teardown(Result.PASS)
except Exception as e:
self._logger.exception(f"Test suite teardown ERROR: {test_suite_name}")
@@ -488,6 +491,10 @@ def _run_test_suite(
"the next test suite may be affected."
)
test_suite_result.update_setup(Result.ERROR, e)
+
+ sut_node.exit_scope()
+ tg_node.exit_scope()
+
if len(test_suite_result.get_errors()) > 0 and test_suite.is_blocking:
raise BlockingTestSuiteError(test_suite_name)
@@ -511,6 +518,9 @@ def _execute_test_suite(
"""
self._logger.set_stage(DtsStage.test_suite)
for test_case in test_cases:
+ test_suite.sut_node.enter_scope("case")
+ test_suite.tg_node.enter_scope("case")
+
test_case_name = test_case.__name__
test_case_result = test_suite_result.add_test_case(test_case_name)
all_attempts = SETTINGS.re_run + 1
@@ -531,6 +541,9 @@ def _execute_test_suite(
)
test_case_result.update_setup(Result.SKIP)
+ test_suite.sut_node.exit_scope()
+ test_suite.tg_node.exit_scope()
+
def _run_test_case(
self,
test_suite: TestSuite,
@@ -221,24 +221,23 @@ def get_supported_capabilities(
)
if cls.capabilities_to_check:
capabilities_to_check_map = cls._get_decorated_capabilities_map()
- with TestPmdShell(
- sut_node, privileged=True, disable_device_start=True
- ) as testpmd_shell:
- for (
- conditional_capability_fn,
- capabilities,
- ) in capabilities_to_check_map.items():
- supported_capabilities: set[NicCapability] = set()
- unsupported_capabilities: set[NicCapability] = set()
- capability_fn = cls._reduce_capabilities(
- capabilities, supported_capabilities, unsupported_capabilities
- )
- if conditional_capability_fn:
- capability_fn = conditional_capability_fn(capability_fn)
- capability_fn(testpmd_shell)
- for capability in capabilities:
- if capability.nic_capability in supported_capabilities:
- supported_conditional_capabilities.add(capability)
+ testpmd_shell = TestPmdShell(sut_node, privileged=True, disable_device_start=True)
+ for (
+ conditional_capability_fn,
+ capabilities,
+ ) in capabilities_to_check_map.items():
+ supported_capabilities: set[NicCapability] = set()
+ unsupported_capabilities: set[NicCapability] = set()
+ capability_fn = cls._reduce_capabilities(
+ capabilities, supported_capabilities, unsupported_capabilities
+ )
+ if conditional_capability_fn:
+ capability_fn = conditional_capability_fn(capability_fn)
+ capability_fn(testpmd_shell)
+ for capability in capabilities:
+ if capability.nic_capability in supported_capabilities:
+ supported_conditional_capabilities.add(capability)
+ testpmd_shell._close()
logger.debug(f"Found supported capabilities {supported_conditional_capabilities}.")
return supported_conditional_capabilities
@@ -14,6 +14,7 @@
"""
from abc import ABC
+from typing import TYPE_CHECKING, Literal, TypeVar
from framework.config import (
OS,
@@ -21,7 +22,7 @@
NodeConfiguration,
TestRunConfiguration,
)
-from framework.exception import ConfigurationError
+from framework.exception import ConfigurationError, InternalError
from framework.logger import DTSLogger, get_dts_logger
from .cpu import (
@@ -35,6 +36,15 @@
from .os_session import OSSession
from .port import Port
+if TYPE_CHECKING:
+ from framework.remote_session.single_active_interactive_shell import (
+ SingleActiveInteractiveShell,
+ )
+
+T = TypeVar("T")
+Scope = Literal["unknown", "suite", "case"]
+ScopedShell = tuple[Scope, SingleActiveInteractiveShell]
+
class Node(ABC):
"""The base class for node management.
@@ -62,6 +72,8 @@ class Node(ABC):
_logger: DTSLogger
_other_sessions: list[OSSession]
_test_run_config: TestRunConfiguration
+ _active_shells: list[ScopedShell]
+ _scope_stack: list[Scope]
def __init__(self, node_config: NodeConfiguration):
"""Connect to the node and gather info during initialization.
@@ -90,6 +102,8 @@ def __init__(self, node_config: NodeConfiguration):
self._other_sessions = []
self._init_ports()
+ self._active_shells = []
+ self._scope_stack = []
def _init_ports(self) -> None:
self.ports = [Port(self.name, port_config) for port_config in self.config.ports]
@@ -119,6 +133,55 @@ def tear_down_test_run(self) -> None:
Additional steps can be added by extending the method in subclasses with the use of super().
"""
+ @property
+ def current_scope(self) -> Scope:
+ """The current scope of the test run."""
+ try:
+ return self._scope_stack[-1]
+ except IndexError:
+ return "unknown"
+
+ def enter_scope(self, next_scope: Scope) -> None:
+ """Prepare the node for a new testing scope."""
+ self._scope_stack.append(next_scope)
+
+ def exit_scope(self) -> Scope:
+ """Clean up the node after the current testing scope.
+
+ This method must guarantee to never fail from a Node failure during runtime.
+
+ Returns:
+ The scope before exiting.
+
+ Raises:
+ InternalError: If there was no scope to exit from.
+ """
+ try:
+ current_scope = self._scope_stack.pop()
+ except IndexError:
+ raise InternalError("Attempted to exit a scope when the node wasn't in any.")
+ else:
+ self.clean_up_shells(current_scope)
+ return current_scope
+
+ def register_shell(self, shell: SingleActiveInteractiveShell) -> None:
+ """Register a new shell to the pool of active shells."""
+ self._active_shells.append((self.current_scope, shell))
+
+ def find_active_shell(self, shell_class: type[T]) -> T | None:
+ """Retrieve an active shell of a specific class."""
+ return next(sh for _, sh in self._active_shells if type(sh) is shell_class)
+
+ def clean_up_shells(self, scope: Scope) -> None:
+ """Clean up shells from the given `scope`."""
+ zombie_shells_indices = [
+ i for i, (shell_scope, _) in enumerate(self._active_shells) if scope == shell_scope
+ ]
+
+ for i in reversed(zombie_shells_indices):
+ self._active_shells[i][1]._close()
+ del self._active_shells[i]
+
def create_session(self, name: str) -> OSSession:
"""Create and return a new OS-aware remote session.
@@ -33,7 +33,7 @@
from framework.remote_session.remote_session import CommandResult
from framework.utils import MesonArgs, TarCompressionFormat
-from .node import Node
+from .node import Node, Scope
from .os_session import OSSession, OSSessionInfo
from .virtual_device import VirtualDevice
@@ -458,7 +458,7 @@ def build_dpdk_app(self, app_name: str, **meson_dpdk_args: str | bool) -> PurePa
self.remote_dpdk_build_dir, "examples", f"dpdk-{app_name}"
)
- def kill_cleanup_dpdk_apps(self) -> None:
+ def _kill_cleanup_dpdk_apps(self) -> None:
"""Kill all dpdk applications on the SUT, then clean up hugepages."""
if self._dpdk_kill_session and self._dpdk_kill_session.is_alive():
# we can use the session if it exists and responds
@@ -468,6 +468,16 @@ def kill_cleanup_dpdk_apps(self) -> None:
self._dpdk_kill_session = self.create_session("dpdk_kill")
self.dpdk_prefix_list = []
+ def exit_scope(self) -> Scope:
+ """Extend :meth:`~.node.Node.exit_test_suite_scope`.
+
+ Add the DPDK apps clean up.
+ """
+ previous_scope = super().exit_scope()
+ if previous_scope == "suite":
+ self._kill_cleanup_dpdk_apps()
+ return previous_scope
+
def run_dpdk_app(
self, app_path: PurePath, eal_params: EalParams, timeout: float = 30
) -> CommandResult:
@@ -18,16 +18,16 @@ class TestBlocklist(TestSuite):
def verify_blocklisted_ports(self, ports_to_block: list[Port]):
"""Runs testpmd with the given ports blocklisted and verifies the ports."""
- with TestPmdShell(self.sut_node, allowed_ports=[], blocked_ports=ports_to_block) as testpmd:
- allowlisted_ports = {port.device_name for port in testpmd.show_port_info_all()}
- blocklisted_ports = {port.pci for port in ports_to_block}
+ testpmd = TestPmdShell(self.sut_node, allowed_ports=[], blocked_ports=ports_to_block)
+ allowlisted_ports = {port.device_name for port in testpmd.show_port_info_all()}
+ blocklisted_ports = {port.pci for port in ports_to_block}
- # sanity check
- allowed_len = len(allowlisted_ports - blocklisted_ports)
- self.verify(allowed_len > 0, "At least one port should have been allowed")
+ # sanity check
+ allowed_len = len(allowlisted_ports - blocklisted_ports)
+ self.verify(allowed_len > 0, "At least one port should have been allowed")
- blocked = not allowlisted_ports & blocklisted_ports
- self.verify(blocked, "At least one port was not blocklisted")
+ blocked = not allowlisted_ports & blocklisted_ports
+ self.verify(blocked, "At least one port was not blocklisted")
@func_test
def no_blocklisted(self):
@@ -117,16 +117,16 @@ def test_insert_checksums(self) -> None:
Ether(dst=mac_id) / IPv6(src="::1") / UDP() / Raw(payload),
Ether(dst=mac_id) / IPv6(src="::1") / TCP() / Raw(payload),
]
- with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
- testpmd.set_forward_mode(SimpleForwardingModes.csum)
- testpmd.set_verbose(level=1)
- self.setup_hw_offload(testpmd=testpmd)
- testpmd.start()
- self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
- for i in range(0, len(packet_list)):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
- )
+ testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+ testpmd.set_forward_mode(SimpleForwardingModes.csum)
+ testpmd.set_verbose(level=1)
+ self.setup_hw_offload(testpmd=testpmd)
+ testpmd.start()
+ self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
+ for i in range(0, len(packet_list)):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+ )
@func_test
def test_no_insert_checksums(self) -> None:
@@ -139,15 +139,15 @@ def test_no_insert_checksums(self) -> None:
Ether(dst=mac_id) / IPv6(src="::1") / UDP() / Raw(payload),
Ether(dst=mac_id) / IPv6(src="::1") / TCP() / Raw(payload),
]
- with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
- testpmd.set_forward_mode(SimpleForwardingModes.csum)
- testpmd.set_verbose(level=1)
- testpmd.start()
- self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
- for i in range(0, len(packet_list)):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
- )
+ testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+ testpmd.set_forward_mode(SimpleForwardingModes.csum)
+ testpmd.set_verbose(level=1)
+ testpmd.start()
+ self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
+ for i in range(0, len(packet_list)):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+ )
@func_test
def test_l4_rx_checksum(self) -> None:
@@ -159,18 +159,18 @@ def test_l4_rx_checksum(self) -> None:
Ether(dst=mac_id) / IP() / UDP(chksum=0xF),
Ether(dst=mac_id) / IP() / TCP(chksum=0xF),
]
- with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
- testpmd.set_forward_mode(SimpleForwardingModes.csum)
- testpmd.set_verbose(level=1)
- self.setup_hw_offload(testpmd=testpmd)
- for i in range(0, 2):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
- )
- for i in range(2, 4):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
- )
+ testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+ testpmd.set_forward_mode(SimpleForwardingModes.csum)
+ testpmd.set_verbose(level=1)
+ self.setup_hw_offload(testpmd=testpmd)
+ for i in range(0, 2):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+ )
+ for i in range(2, 4):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
+ )
@func_test
def test_l3_rx_checksum(self) -> None:
@@ -182,18 +182,18 @@ def test_l3_rx_checksum(self) -> None:
Ether(dst=mac_id) / IP(chksum=0xF) / UDP(),
Ether(dst=mac_id) / IP(chksum=0xF) / TCP(),
]
- with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
- testpmd.set_forward_mode(SimpleForwardingModes.csum)
- testpmd.set_verbose(level=1)
- self.setup_hw_offload(testpmd=testpmd)
- for i in range(0, 2):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
- )
- for i in range(2, 4):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=True, goodIP=False, testpmd=testpmd, id=mac_id
- )
+ testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+ testpmd.set_forward_mode(SimpleForwardingModes.csum)
+ testpmd.set_verbose(level=1)
+ self.setup_hw_offload(testpmd=testpmd)
+ for i in range(0, 2):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+ )
+ for i in range(2, 4):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=True, goodIP=False, testpmd=testpmd, id=mac_id
+ )
@func_test
def test_validate_rx_checksum(self) -> None:
@@ -209,22 +209,22 @@ def test_validate_rx_checksum(self) -> None:
Ether(dst=mac_id) / IPv6(src="::1") / UDP(chksum=0xF),
Ether(dst=mac_id) / IPv6(src="::1") / TCP(chksum=0xF),
]
- with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
- testpmd.set_forward_mode(SimpleForwardingModes.csum)
- testpmd.set_verbose(level=1)
- self.setup_hw_offload(testpmd=testpmd)
- for i in range(0, 4):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
- )
- for i in range(4, 6):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
- )
- for i in range(6, 8):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
- )
+ testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+ testpmd.set_forward_mode(SimpleForwardingModes.csum)
+ testpmd.set_verbose(level=1)
+ self.setup_hw_offload(testpmd=testpmd)
+ for i in range(0, 4):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+ )
+ for i in range(4, 6):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
+ )
+ for i in range(6, 8):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
+ )
@requires(NicCapability.RX_OFFLOAD_VLAN)
@func_test
@@ -238,20 +238,20 @@ def test_vlan_checksum(self) -> None:
Ether(dst=mac_id) / Dot1Q(vlan=1) / IPv6(src="::1") / UDP(chksum=0xF) / Raw(payload),
Ether(dst=mac_id) / Dot1Q(vlan=1) / IPv6(src="::1") / TCP(chksum=0xF) / Raw(payload),
]
- with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
- testpmd.set_forward_mode(SimpleForwardingModes.csum)
- testpmd.set_verbose(level=1)
- self.setup_hw_offload(testpmd=testpmd)
- testpmd.start()
- self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
- for i in range(0, 2):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
- )
- for i in range(2, 4):
- self.send_packet_and_verify_checksum(
- packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
- )
+ testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+ testpmd.set_forward_mode(SimpleForwardingModes.csum)
+ testpmd.set_verbose(level=1)
+ self.setup_hw_offload(testpmd=testpmd)
+ testpmd.start()
+ self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
+ for i in range(0, 2):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
+ )
+ for i in range(2, 4):
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
+ )
@requires(NicCapability.RX_OFFLOAD_SCTP_CKSUM)
@func_test
@@ -262,14 +262,14 @@ def test_validate_sctp_checksum(self) -> None:
Ether(dst=mac_id) / IP() / SCTP(),
Ether(dst=mac_id) / IP() / SCTP(chksum=0xF),
]
- with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
- testpmd.set_forward_mode(SimpleForwardingModes.csum)
- testpmd.set_verbose(level=1)
- testpmd.csum_set_hw(layers=ChecksumOffloadOptions.sctp)
- testpmd.start()
- self.send_packet_and_verify_checksum(
- packet=packet_list[0], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
- )
- self.send_packet_and_verify_checksum(
- packet=packet_list[1], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
- )
+ testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+ testpmd.set_forward_mode(SimpleForwardingModes.csum)
+ testpmd.set_verbose(level=1)
+ testpmd.csum_set_hw(layers=ChecksumOffloadOptions.sctp)
+ testpmd.start()
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[0], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+ )
+ self.send_packet_and_verify_checksum(
+ packet=packet_list[1], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
+ )
@@ -83,37 +83,37 @@ def wrap(self: "TestDynamicQueueConf", is_rx_testing: bool) -> None:
while len(queues_to_config) < self.num_ports_to_modify:
queues_to_config.add(random.randint(1, self.number_of_queues - 1))
unchanged_queues = set(range(self.number_of_queues)) - queues_to_config
- with TestPmdShell(
+ testpmd = TestPmdShell(
self.sut_node,
port_topology=PortTopology.chained,
rx_queues=self.number_of_queues,
tx_queues=self.number_of_queues,
- ) as testpmd:
- for q in queues_to_config:
- testpmd.stop_port_queue(port_id, q, is_rx_testing)
- testpmd.set_forward_mode(SimpleForwardingModes.mac)
-
- test_meth(
- self,
- port_id,
- queues_to_config,
- unchanged_queues,
- testpmd,
- is_rx_testing,
- )
-
- for queue_id in queues_to_config:
- testpmd.start_port_queue(port_id, queue_id, is_rx_testing)
+ )
+ for q in queues_to_config:
+ testpmd.stop_port_queue(port_id, q, is_rx_testing)
+ testpmd.set_forward_mode(SimpleForwardingModes.mac)
+
+ test_meth(
+ self,
+ port_id,
+ queues_to_config,
+ unchanged_queues,
+ testpmd,
+ is_rx_testing,
+ )
+
+ for queue_id in queues_to_config:
+ testpmd.start_port_queue(port_id, queue_id, is_rx_testing)
- testpmd.start()
- self.send_packets_with_different_addresses(self.number_of_packets_to_send)
- forwarding_stats = testpmd.stop()
- for queue_id in queues_to_config:
- self.verify(
- self.port_queue_in_stats(port_id, is_rx_testing, queue_id, forwarding_stats),
- f"Modified queue {queue_id} on port {port_id} failed to receive traffic after"
- "being started again.",
- )
+ testpmd.start()
+ self.send_packets_with_different_addresses(self.number_of_packets_to_send)
+ forwarding_stats = testpmd.stop()
+ for queue_id in queues_to_config:
+ self.verify(
+ self.port_queue_in_stats(port_id, is_rx_testing, queue_id, forwarding_stats),
+ f"Modified queue {queue_id} on port {port_id} failed to receive traffic after"
+ "being started again.",
+ )
return wrap
@@ -44,20 +44,20 @@ def l2fwd_integrity(self) -> None:
"""
queues = [1, 2, 4, 8]
- with TestPmdShell(
+ shell = TestPmdShell(
self.sut_node,
lcore_filter_specifier=LogicalCoreCount(cores_per_socket=4),
forward_mode=SimpleForwardingModes.mac,
eth_peer=[EthPeer(1, self.tg_node.ports[1].mac_address)],
disable_device_start=True,
- ) as shell:
- for queues_num in queues:
- self._logger.info(f"Testing L2 forwarding with {queues_num} queue(s)")
- shell.set_ports_queues(queues_num)
- shell.start()
+ )
+ for queues_num in queues:
+ self._logger.info(f"Testing L2 forwarding with {queues_num} queue(s)")
+ shell.set_ports_queues(queues_num)
+ shell.start()
- received_packets = self.send_packets_and_capture(self.packets)
- expected_packets = self.get_expected_packets(self.packets)
- self.match_all_packets(expected_packets, received_packets)
+ received_packets = self.send_packets_and_capture(self.packets)
+ expected_packets = self.get_expected_packets(self.packets)
+ self.match_all_packets(expected_packets, received_packets)
- shell.stop()
+ shell.stop()
@@ -101,22 +101,22 @@ def test_add_remove_mac_addresses(self) -> None:
Remove the fake mac address from the PMD's address pool.
Send a packet with the fake mac address to the PMD. (Should not receive)
"""
- with TestPmdShell(self.sut_node) as testpmd:
- testpmd.set_promisc(0, enable=False)
- testpmd.start()
- mac_address = self._sut_port_ingress.mac_address
-
- # Send a packet with NIC default mac address
- self.send_packet_and_verify(mac_address=mac_address, should_receive=True)
- # Send a packet with different mac address
- fake_address = "00:00:00:00:00:01"
- self.send_packet_and_verify(mac_address=fake_address, should_receive=False)
-
- # Add mac address to pool and rerun tests
- testpmd.set_mac_addr(0, mac_address=fake_address, add=True)
- self.send_packet_and_verify(mac_address=fake_address, should_receive=True)
- testpmd.set_mac_addr(0, mac_address=fake_address, add=False)
- self.send_packet_and_verify(mac_address=fake_address, should_receive=False)
+ testpmd = TestPmdShell(self.sut_node)
+ testpmd.set_promisc(0, enable=False)
+ testpmd.start()
+ mac_address = self._sut_port_ingress.mac_address
+
+ # Send a packet with NIC default mac address
+ self.send_packet_and_verify(mac_address=mac_address, should_receive=True)
+ # Send a packet with different mac address
+ fake_address = "00:00:00:00:00:01"
+ self.send_packet_and_verify(mac_address=fake_address, should_receive=False)
+
+ # Add mac address to pool and rerun tests
+ testpmd.set_mac_addr(0, mac_address=fake_address, add=True)
+ self.send_packet_and_verify(mac_address=fake_address, should_receive=True)
+ testpmd.set_mac_addr(0, mac_address=fake_address, add=False)
+ self.send_packet_and_verify(mac_address=fake_address, should_receive=False)
@func_test
def test_invalid_address(self) -> None:
@@ -137,44 +137,42 @@ def test_invalid_address(self) -> None:
Determine the device's mac address pool size, and fill the pool with fake addresses.
Attempt to add another fake mac address, overloading the address pool. (Should fail)
"""
- with TestPmdShell(self.sut_node) as testpmd:
- testpmd.start()
- mac_address = self._sut_port_ingress.mac_address
- try:
- testpmd.set_mac_addr(0, "00:00:00:00:00:00", add=True)
- self.verify(False, "Invalid mac address added.")
- except InteractiveCommandExecutionError:
- pass
- try:
- testpmd.set_mac_addr(0, mac_address, add=False)
- self.verify(False, "Default mac address removed.")
- except InteractiveCommandExecutionError:
- pass
- # Should be no errors adding this twice
- testpmd.set_mac_addr(0, "1" + mac_address[1:], add=True)
- testpmd.set_mac_addr(0, "1" + mac_address[1:], add=True)
- # Double check to see if default mac address can be removed
- try:
- testpmd.set_mac_addr(0, mac_address, add=False)
- self.verify(False, "Default mac address removed.")
- except InteractiveCommandExecutionError:
- pass
-
- for i in range(testpmd.show_port_info(0).max_mac_addresses_num - 1):
- # A0 fake address based on the index 'i'.
- fake_address = str(hex(i)[2:].zfill(12))
- # Insert ':' characters every two indexes to create a fake mac address.
- fake_address = ":".join(
- fake_address[x : x + 2] for x in range(0, len(fake_address), 2)
- )
- testpmd.set_mac_addr(0, fake_address, add=True, verify=False)
- try:
- testpmd.set_mac_addr(0, "E" + mac_address[1:], add=True)
- # We add an extra address to compensate for mac address pool inconsistencies.
- testpmd.set_mac_addr(0, "F" + mac_address[1:], add=True)
- self.verify(False, "Mac address limit exceeded.")
- except InteractiveCommandExecutionError:
- pass
+ testpmd = TestPmdShell(self.sut_node)
+ testpmd.start()
+ mac_address = self._sut_port_ingress.mac_address
+ try:
+ testpmd.set_mac_addr(0, "00:00:00:00:00:00", add=True)
+ self.verify(False, "Invalid mac address added.")
+ except InteractiveCommandExecutionError:
+ pass
+ try:
+ testpmd.set_mac_addr(0, mac_address, add=False)
+ self.verify(False, "Default mac address removed.")
+ except InteractiveCommandExecutionError:
+ pass
+ # Should be no errors adding this twice
+ testpmd.set_mac_addr(0, "1" + mac_address[1:], add=True)
+ testpmd.set_mac_addr(0, "1" + mac_address[1:], add=True)
+ # Double check to see if default mac address can be removed
+ try:
+ testpmd.set_mac_addr(0, mac_address, add=False)
+ self.verify(False, "Default mac address removed.")
+ except InteractiveCommandExecutionError:
+ pass
+
+ for i in range(testpmd.show_port_info(0).max_mac_addresses_num - 1):
+ # A0 fake address based on the index 'i'.
+ fake_address = str(hex(i)[2:].zfill(12))
+ # Insert ':' characters every two indexes to create a fake mac address.
+ fake_address = ":".join(fake_address[x : x + 2] for x in range(0, len(fake_address), 2))
+ testpmd.set_mac_addr(0, fake_address, add=True, verify=False)
+ try:
+ testpmd.set_mac_addr(0, "E" + mac_address[1:], add=True)
+ # We add an extra address to compensate for mac address pool inconsistencies.
+ testpmd.set_mac_addr(0, "F" + mac_address[1:], add=True)
+ self.verify(False, "Mac address limit exceeded.")
+ except InteractiveCommandExecutionError:
+ pass
@requires(NicCapability.MCAST_FILTERING)
@func_test
@@ -191,14 +189,14 @@ def test_multicast_filter(self) -> None:
Remove the fake multicast address from the PMDs multicast address filter.
Send a packet with the fake multicast address to the PMD. (Should not receive)
"""
- with TestPmdShell(self.sut_node) as testpmd:
- testpmd.start()
- testpmd.set_promisc(0, enable=False)
- multicast_address = "01:00:5E:00:00:00"
+ testpmd = TestPmdShell(self.sut_node)
+ testpmd.set_promisc(0, enable=False)
+ multicast_address = "01:00:5E:00:00:00"
- testpmd.set_multicast_mac_addr(0, multi_addr=multicast_address, add=True)
- self.send_packet_and_verify(multicast_address, should_receive=True)
+ testpmd.set_multicast_mac_addr(0, multi_addr=multicast_address, add=True)
+ testpmd.start()
+ self.send_packet_and_verify(multicast_address, should_receive=True)
- # Remove multicast filter and verify the packet was not received.
- testpmd.set_multicast_mac_addr(0, multicast_address, add=False)
- self.send_packet_and_verify(multicast_address, should_receive=False)
+ # Remove multicast filter and verify the packet was not received.
+ testpmd.set_multicast_mac_addr(0, multicast_address, add=False)
+ self.send_packet_and_verify(multicast_address, should_receive=False)
@@ -103,26 +103,24 @@ def pmd_scatter(self, mbsize: int) -> None:
Test:
Start testpmd and run functional test with preset mbsize.
"""
- with TestPmdShell(
+ testpmd = TestPmdShell(
self.sut_node,
forward_mode=SimpleForwardingModes.mac,
mbcache=200,
mbuf_size=[mbsize],
max_pkt_len=9000,
tx_offloads=0x00008000,
- ) as testpmd:
- testpmd.start()
-
- for offset in [-1, 0, 1, 4, 5]:
- recv_payload = self.scatter_pktgen_send_packet(mbsize + offset)
- self._logger.debug(
- f"Payload of scattered packet after forwarding: \n{recv_payload}"
- )
- self.verify(
- ("58 " * 8).strip() in recv_payload,
- "Payload of scattered packet did not match expected payload with offset "
- f"{offset}.",
- )
+ )
+ testpmd.start()
+
+ for offset in [-1, 0, 1, 4, 5]:
+ recv_payload = self.scatter_pktgen_send_packet(mbsize + offset)
+ self._logger.debug(f"Payload of scattered packet after forwarding: \n{recv_payload}")
+ self.verify(
+ ("58 " * 8).strip() in recv_payload,
+ "Payload of scattered packet did not match expected payload with offset "
+ f"{offset}.",
+ )
@requires(NicCapability.SCATTERED_RX_ENABLED)
@func_test
@@ -104,8 +104,8 @@ def test_devices_listed_in_testpmd(self) -> None:
Test:
List all devices found in testpmd and verify the configured devices are among them.
"""
- with TestPmdShell(self.sut_node) as testpmd:
- dev_list = [str(x) for x in testpmd.get_devices()]
+ testpmd = TestPmdShell(self.sut_node)
+ dev_list = [str(x) for x in testpmd.get_devices()]
for nic in self.nics_in_node:
self.verify(
nic.pci in dev_list,
@@ -124,10 +124,10 @@ def test_vlan_receipt_no_stripping(self) -> None:
Test:
Create an interactive testpmd shell and verify a VLAN packet.
"""
- with TestPmdShell(node=self.sut_node) as testpmd:
- self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
- testpmd.start()
- self.send_vlan_packet_and_verify(True, strip=False, vlan_id=1)
+ testpmd = TestPmdShell(node=self.sut_node)
+ self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
+ testpmd.start()
+ self.send_vlan_packet_and_verify(True, strip=False, vlan_id=1)
@requires(NicCapability.RX_OFFLOAD_VLAN_STRIP)
@func_test
@@ -137,11 +137,11 @@ def test_vlan_receipt_stripping(self) -> None:
Test:
Create an interactive testpmd shell and verify a VLAN packet.
"""
- with TestPmdShell(node=self.sut_node) as testpmd:
- self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
- testpmd.set_vlan_strip(port=0, enable=True)
- testpmd.start()
- self.send_vlan_packet_and_verify(should_receive=True, strip=True, vlan_id=1)
+ testpmd = TestPmdShell(node=self.sut_node)
+ self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
+ testpmd.set_vlan_strip(port=0, enable=True)
+ testpmd.start()
+ self.send_vlan_packet_and_verify(should_receive=True, strip=True, vlan_id=1)
@func_test
def test_vlan_no_receipt(self) -> None:
@@ -150,10 +150,10 @@ def test_vlan_no_receipt(self) -> None:
Test:
Create an interactive testpmd shell and verify a VLAN packet.
"""
- with TestPmdShell(node=self.sut_node) as testpmd:
- self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
- testpmd.start()
- self.send_vlan_packet_and_verify(should_receive=False, strip=False, vlan_id=2)
+ testpmd = TestPmdShell(node=self.sut_node)
+ self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
+ testpmd.start()
+ self.send_vlan_packet_and_verify(should_receive=False, strip=False, vlan_id=2)
@func_test
def test_vlan_header_insertion(self) -> None:
@@ -162,11 +162,11 @@ def test_vlan_header_insertion(self) -> None:
Test:
Create an interactive testpmd shell and verify a non-VLAN packet.
"""
- with TestPmdShell(node=self.sut_node) as testpmd:
- testpmd.set_forward_mode(SimpleForwardingModes.mac)
- testpmd.set_promisc(port=0, enable=False)
- testpmd.stop_all_ports()
- testpmd.tx_vlan_set(port=1, enable=True, vlan=51)
- testpmd.start_all_ports()
- testpmd.start()
- self.send_packet_and_verify_insertion(expected_id=51)
+ testpmd = TestPmdShell(node=self.sut_node)
+ testpmd.set_forward_mode(SimpleForwardingModes.mac)
+ testpmd.set_promisc(port=0, enable=False)
+ testpmd.stop_all_ports()
+ testpmd.tx_vlan_set(port=1, enable=True, vlan=51)
+ testpmd.start_all_ports()
+ testpmd.start()
+ self.send_packet_and_verify_insertion(expected_id=51)