[RFC,1/2] dts: add scoping and shell registration to Node

Message ID 20241220172429.2194811-1-luca.vizzarro@arm.com (mailing list archive)
State New
Delegated to: Paul Szczepanek
Series dts: add basic scope to improve shell handling |


Context Check Description
ci/loongarch-compilation warning apply patch failure
ci/checkpatch success coding style OK
ci/iol-testing warning apply patch failure

Commit Message

Luca Vizzarro Dec. 20, 2024, 5:24 p.m. UTC
Add a basic scoping mechanism to Nodes, to improve the control over
test suite led environmental changes. Moreover, keep a pool of active
shells based on scope, therefore allowing shells to register

Signed-off-by: Luca Vizzarro <luca.vizzarro@arm.com>
 .../single_active_interactive_shell.py        |   2 +
 dts/framework/runner.py                       |  15 +-
 dts/framework/testbed_model/capability.py     |  35 ++--
 dts/framework/testbed_model/node.py           |  65 ++++++-
 dts/framework/testbed_model/sut_node.py       |  14 +-
 dts/tests/TestSuite_blocklist.py              |  16 +-
 dts/tests/TestSuite_checksum_offload.py       | 168 +++++++++---------
 dts/tests/TestSuite_dynamic_queue_conf.py     |  52 +++---
 dts/tests/TestSuite_l2fwd.py                  |  20 +--
 dts/tests/TestSuite_mac_filter.py             | 124 +++++++------
 dts/tests/TestSuite_pmd_buffer_scatter.py     |  26 ++-
 dts/tests/TestSuite_smoke_tests.py            |   4 +-
 dts/tests/TestSuite_vlan.py                   |  42 ++---
 13 files changed, 333 insertions(+), 250 deletions(-)


diff --git a/dts/framework/remote_session/single_active_interactive_shell.py b/dts/framework/remote_session/single_active_interactive_shell.py
index c43c54e457..910af8f655 100644
--- a/dts/framework/remote_session/single_active_interactive_shell.py
+++ b/dts/framework/remote_session/single_active_interactive_shell.py
@@ -112,7 +112,9 @@  def __init__(
                 the name of the underlying node which it is running on.
             **kwargs: Any additional arguments if any.
+        node.register_shell(self)
         self._node = node
         if name is None:
             name = type(self).__name__
         self._logger = get_dts_logger(f"{node.name}.{name}")
diff --git a/dts/framework/runner.py b/dts/framework/runner.py
index 510be1a870..fd3a934a9f 100644
--- a/dts/framework/runner.py
+++ b/dts/framework/runner.py
@@ -460,6 +460,10 @@  def _run_test_suite(
             DtsStage.test_suite_setup, Path(SETTINGS.output_dir, test_suite_name)
         test_suite = test_suite_with_cases.test_suite_class(sut_node, tg_node, topology)
+        sut_node.enter_scope("suite")
+        tg_node.enter_scope("suite")
             self._logger.info(f"Starting test suite setup: {test_suite_name}")
@@ -479,7 +483,6 @@  def _run_test_suite(
-                sut_node.kill_cleanup_dpdk_apps()
             except Exception as e:
                 self._logger.exception(f"Test suite teardown ERROR: {test_suite_name}")
@@ -488,6 +491,10 @@  def _run_test_suite(
                     "the next test suite may be affected."
                 test_suite_result.update_setup(Result.ERROR, e)
+            sut_node.exit_scope()
+            tg_node.exit_scope()
             if len(test_suite_result.get_errors()) > 0 and test_suite.is_blocking:
                 raise BlockingTestSuiteError(test_suite_name)
@@ -511,6 +518,9 @@  def _execute_test_suite(
         for test_case in test_cases:
+            test_suite.sut_node.enter_scope("case")
+            test_suite.tg_node.enter_scope("case")
             test_case_name = test_case.__name__
             test_case_result = test_suite_result.add_test_case(test_case_name)
             all_attempts = SETTINGS.re_run + 1
@@ -531,6 +541,9 @@  def _execute_test_suite(
+            test_suite.sut_node.exit_scope()
+            test_suite.tg_node.exit_scope()
     def _run_test_case(
         test_suite: TestSuite,
diff --git a/dts/framework/testbed_model/capability.py b/dts/framework/testbed_model/capability.py
index 6a7a1f5b6c..e883f59d11 100644
--- a/dts/framework/testbed_model/capability.py
+++ b/dts/framework/testbed_model/capability.py
@@ -221,24 +221,23 @@  def get_supported_capabilities(
         if cls.capabilities_to_check:
             capabilities_to_check_map = cls._get_decorated_capabilities_map()
-            with TestPmdShell(
-                sut_node, privileged=True, disable_device_start=True
-            ) as testpmd_shell:
-                for (
-                    conditional_capability_fn,
-                    capabilities,
-                ) in capabilities_to_check_map.items():
-                    supported_capabilities: set[NicCapability] = set()
-                    unsupported_capabilities: set[NicCapability] = set()
-                    capability_fn = cls._reduce_capabilities(
-                        capabilities, supported_capabilities, unsupported_capabilities
-                    )
-                    if conditional_capability_fn:
-                        capability_fn = conditional_capability_fn(capability_fn)
-                    capability_fn(testpmd_shell)
-                    for capability in capabilities:
-                        if capability.nic_capability in supported_capabilities:
-                            supported_conditional_capabilities.add(capability)
+            testpmd_shell = TestPmdShell(sut_node, privileged=True, disable_device_start=True)
+            for (
+                conditional_capability_fn,
+                capabilities,
+            ) in capabilities_to_check_map.items():
+                supported_capabilities: set[NicCapability] = set()
+                unsupported_capabilities: set[NicCapability] = set()
+                capability_fn = cls._reduce_capabilities(
+                    capabilities, supported_capabilities, unsupported_capabilities
+                )
+                if conditional_capability_fn:
+                    capability_fn = conditional_capability_fn(capability_fn)
+                capability_fn(testpmd_shell)
+                for capability in capabilities:
+                    if capability.nic_capability in supported_capabilities:
+                        supported_conditional_capabilities.add(capability)
+            testpmd_shell._close()
         logger.debug(f"Found supported capabilities {supported_conditional_capabilities}.")
         return supported_conditional_capabilities
diff --git a/dts/framework/testbed_model/node.py b/dts/framework/testbed_model/node.py
index c6f12319ca..4f06968adc 100644
--- a/dts/framework/testbed_model/node.py
+++ b/dts/framework/testbed_model/node.py
@@ -14,6 +14,7 @@ 
 from abc import ABC
+from typing import TYPE_CHECKING, Literal, TypeVar
 from framework.config import (
@@ -21,7 +22,7 @@ 
-from framework.exception import ConfigurationError
+from framework.exception import ConfigurationError, InternalError
 from framework.logger import DTSLogger, get_dts_logger
 from .cpu import (
@@ -35,6 +36,15 @@ 
 from .os_session import OSSession
 from .port import Port
+    from framework.remote_session.single_active_interactive_shell import (
+        SingleActiveInteractiveShell,
+    )
+T = TypeVar("T")
+Scope = Literal["unknown", "suite", "case"]
+ScopedShell = tuple[Scope, SingleActiveInteractiveShell]
 class Node(ABC):
     """The base class for node management.
@@ -62,6 +72,8 @@  class Node(ABC):
     _logger: DTSLogger
     _other_sessions: list[OSSession]
     _test_run_config: TestRunConfiguration
+    _active_shells: list[ScopedShell]
+    _scope_stack: list[Scope]
     def __init__(self, node_config: NodeConfiguration):
         """Connect to the node and gather info during initialization.
@@ -90,6 +102,8 @@  def __init__(self, node_config: NodeConfiguration):
         self._other_sessions = []
+        self._active_shells = []
+        self._scope_stack = []
     def _init_ports(self) -> None:
         self.ports = [Port(self.name, port_config) for port_config in self.config.ports]
@@ -119,6 +133,55 @@  def tear_down_test_run(self) -> None:
         Additional steps can be added by extending the method in subclasses with the use of super().
+    @property
+    def current_scope(self) -> Scope:
+        """The current scope of the test run."""
+        try:
+            return self._scope_stack[-1]
+        except IndexError:
+            return "unknown"
+    def enter_scope(self, next_scope: Scope) -> None:
+        """Prepare the node for a new testing scope."""
+        self._scope_stack.append(next_scope)
+    def exit_scope(self) -> Scope:
+        """Clean up the node after the current testing scope.
+        This method must guarantee to never fail from a Node failure during runtime.
+        Returns:
+            The scope before exiting.
+        Raises:
+            InternalError: If there was no scope to exit from.
+        """
+        try:
+            current_scope = self._scope_stack.pop()
+        except IndexError:
+            raise InternalError("Attempted to exit a scope when the node wasn't in any.")
+        else:
+            self.clean_up_shells(current_scope)
+            return current_scope
+    def register_shell(self, shell: SingleActiveInteractiveShell) -> None:
+        """Register a new shell to the pool of active shells."""
+        self._active_shells.append((self.current_scope, shell))
+    def find_active_shell(self, shell_class: type[T]) -> T | None:
+        """Retrieve an active shell of a specific class."""
+        return next(sh for _, sh in self._active_shells if type(sh) is shell_class)
+    def clean_up_shells(self, scope: Scope) -> None:
+        """Clean up shells from the given `scope`."""
+        zombie_shells_indices = [
+            i for i, (shell_scope, _) in enumerate(self._active_shells) if scope == shell_scope
+        ]
+        for i in reversed(zombie_shells_indices):
+            self._active_shells[i][1]._close()
+            del self._active_shells[i]
     def create_session(self, name: str) -> OSSession:
         """Create and return a new OS-aware remote session.
diff --git a/dts/framework/testbed_model/sut_node.py b/dts/framework/testbed_model/sut_node.py
index a9dc0a474a..3427596bd0 100644
--- a/dts/framework/testbed_model/sut_node.py
+++ b/dts/framework/testbed_model/sut_node.py
@@ -33,7 +33,7 @@ 
 from framework.remote_session.remote_session import CommandResult
 from framework.utils import MesonArgs, TarCompressionFormat
-from .node import Node
+from .node import Node, Scope
 from .os_session import OSSession, OSSessionInfo
 from .virtual_device import VirtualDevice
@@ -458,7 +458,7 @@  def build_dpdk_app(self, app_name: str, **meson_dpdk_args: str | bool) -> PurePa
             self.remote_dpdk_build_dir, "examples", f"dpdk-{app_name}"
-    def kill_cleanup_dpdk_apps(self) -> None:
+    def _kill_cleanup_dpdk_apps(self) -> None:
         """Kill all dpdk applications on the SUT, then clean up hugepages."""
         if self._dpdk_kill_session and self._dpdk_kill_session.is_alive():
             # we can use the session if it exists and responds
@@ -468,6 +468,16 @@  def kill_cleanup_dpdk_apps(self) -> None:
             self._dpdk_kill_session = self.create_session("dpdk_kill")
         self.dpdk_prefix_list = []
+    def exit_scope(self) -> Scope:
+        """Extend :meth:`~.node.Node.exit_test_suite_scope`.
+        Add the DPDK apps clean up.
+        """
+        previous_scope = super().exit_scope()
+        if previous_scope == "suite":
+            self._kill_cleanup_dpdk_apps()
+        return previous_scope
     def run_dpdk_app(
         self, app_path: PurePath, eal_params: EalParams, timeout: float = 30
     ) -> CommandResult:
diff --git a/dts/tests/TestSuite_blocklist.py b/dts/tests/TestSuite_blocklist.py
index b9e9cd1d1a..edce042f38 100644
--- a/dts/tests/TestSuite_blocklist.py
+++ b/dts/tests/TestSuite_blocklist.py
@@ -18,16 +18,16 @@  class TestBlocklist(TestSuite):
     def verify_blocklisted_ports(self, ports_to_block: list[Port]):
         """Runs testpmd with the given ports blocklisted and verifies the ports."""
-        with TestPmdShell(self.sut_node, allowed_ports=[], blocked_ports=ports_to_block) as testpmd:
-            allowlisted_ports = {port.device_name for port in testpmd.show_port_info_all()}
-            blocklisted_ports = {port.pci for port in ports_to_block}
+        testpmd = TestPmdShell(self.sut_node, allowed_ports=[], blocked_ports=ports_to_block)
+        allowlisted_ports = {port.device_name for port in testpmd.show_port_info_all()}
+        blocklisted_ports = {port.pci for port in ports_to_block}
-            # sanity check
-            allowed_len = len(allowlisted_ports - blocklisted_ports)
-            self.verify(allowed_len > 0, "At least one port should have been allowed")
+        # sanity check
+        allowed_len = len(allowlisted_ports - blocklisted_ports)
+        self.verify(allowed_len > 0, "At least one port should have been allowed")
-            blocked = not allowlisted_ports & blocklisted_ports
-            self.verify(blocked, "At least one port was not blocklisted")
+        blocked = not allowlisted_ports & blocklisted_ports
+        self.verify(blocked, "At least one port was not blocklisted")
     def no_blocklisted(self):
diff --git a/dts/tests/TestSuite_checksum_offload.py b/dts/tests/TestSuite_checksum_offload.py
index c1680bd388..58b9609849 100644
--- a/dts/tests/TestSuite_checksum_offload.py
+++ b/dts/tests/TestSuite_checksum_offload.py
@@ -117,16 +117,16 @@  def test_insert_checksums(self) -> None:
             Ether(dst=mac_id) / IPv6(src="::1") / UDP() / Raw(payload),
             Ether(dst=mac_id) / IPv6(src="::1") / TCP() / Raw(payload),
-        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
-            testpmd.set_forward_mode(SimpleForwardingModes.csum)
-            testpmd.set_verbose(level=1)
-            self.setup_hw_offload(testpmd=testpmd)
-            testpmd.start()
-            self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
-            for i in range(0, len(packet_list)):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
-                )
+        testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+        testpmd.set_forward_mode(SimpleForwardingModes.csum)
+        testpmd.set_verbose(level=1)
+        self.setup_hw_offload(testpmd=testpmd)
+        testpmd.start()
+        self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
+        for i in range(0, len(packet_list)):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+            )
     def test_no_insert_checksums(self) -> None:
@@ -139,15 +139,15 @@  def test_no_insert_checksums(self) -> None:
             Ether(dst=mac_id) / IPv6(src="::1") / UDP() / Raw(payload),
             Ether(dst=mac_id) / IPv6(src="::1") / TCP() / Raw(payload),
-        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
-            testpmd.set_forward_mode(SimpleForwardingModes.csum)
-            testpmd.set_verbose(level=1)
-            testpmd.start()
-            self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
-            for i in range(0, len(packet_list)):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
-                )
+        testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+        testpmd.set_forward_mode(SimpleForwardingModes.csum)
+        testpmd.set_verbose(level=1)
+        testpmd.start()
+        self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
+        for i in range(0, len(packet_list)):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+            )
     def test_l4_rx_checksum(self) -> None:
@@ -159,18 +159,18 @@  def test_l4_rx_checksum(self) -> None:
             Ether(dst=mac_id) / IP() / UDP(chksum=0xF),
             Ether(dst=mac_id) / IP() / TCP(chksum=0xF),
-        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
-            testpmd.set_forward_mode(SimpleForwardingModes.csum)
-            testpmd.set_verbose(level=1)
-            self.setup_hw_offload(testpmd=testpmd)
-            for i in range(0, 2):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
-                )
-            for i in range(2, 4):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
-                )
+        testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+        testpmd.set_forward_mode(SimpleForwardingModes.csum)
+        testpmd.set_verbose(level=1)
+        self.setup_hw_offload(testpmd=testpmd)
+        for i in range(0, 2):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+            )
+        for i in range(2, 4):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
+            )
     def test_l3_rx_checksum(self) -> None:
@@ -182,18 +182,18 @@  def test_l3_rx_checksum(self) -> None:
             Ether(dst=mac_id) / IP(chksum=0xF) / UDP(),
             Ether(dst=mac_id) / IP(chksum=0xF) / TCP(),
-        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
-            testpmd.set_forward_mode(SimpleForwardingModes.csum)
-            testpmd.set_verbose(level=1)
-            self.setup_hw_offload(testpmd=testpmd)
-            for i in range(0, 2):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
-                )
-            for i in range(2, 4):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=True, goodIP=False, testpmd=testpmd, id=mac_id
-                )
+        testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+        testpmd.set_forward_mode(SimpleForwardingModes.csum)
+        testpmd.set_verbose(level=1)
+        self.setup_hw_offload(testpmd=testpmd)
+        for i in range(0, 2):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+            )
+        for i in range(2, 4):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=True, goodIP=False, testpmd=testpmd, id=mac_id
+            )
     def test_validate_rx_checksum(self) -> None:
@@ -209,22 +209,22 @@  def test_validate_rx_checksum(self) -> None:
             Ether(dst=mac_id) / IPv6(src="::1") / UDP(chksum=0xF),
             Ether(dst=mac_id) / IPv6(src="::1") / TCP(chksum=0xF),
-        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
-            testpmd.set_forward_mode(SimpleForwardingModes.csum)
-            testpmd.set_verbose(level=1)
-            self.setup_hw_offload(testpmd=testpmd)
-            for i in range(0, 4):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
-                )
-            for i in range(4, 6):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
-                )
-            for i in range(6, 8):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
-                )
+        testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+        testpmd.set_forward_mode(SimpleForwardingModes.csum)
+        testpmd.set_verbose(level=1)
+        self.setup_hw_offload(testpmd=testpmd)
+        for i in range(0, 4):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+            )
+        for i in range(4, 6):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
+            )
+        for i in range(6, 8):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
+            )
@@ -238,20 +238,20 @@  def test_vlan_checksum(self) -> None:
             Ether(dst=mac_id) / Dot1Q(vlan=1) / IPv6(src="::1") / UDP(chksum=0xF) / Raw(payload),
             Ether(dst=mac_id) / Dot1Q(vlan=1) / IPv6(src="::1") / TCP(chksum=0xF) / Raw(payload),
-        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
-            testpmd.set_forward_mode(SimpleForwardingModes.csum)
-            testpmd.set_verbose(level=1)
-            self.setup_hw_offload(testpmd=testpmd)
-            testpmd.start()
-            self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
-            for i in range(0, 2):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
-                )
-            for i in range(2, 4):
-                self.send_packet_and_verify_checksum(
-                    packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
-                )
+        testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+        testpmd.set_forward_mode(SimpleForwardingModes.csum)
+        testpmd.set_verbose(level=1)
+        self.setup_hw_offload(testpmd=testpmd)
+        testpmd.start()
+        self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
+        for i in range(0, 2):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
+            )
+        for i in range(2, 4):
+            self.send_packet_and_verify_checksum(
+                packet=packet_list[i], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
+            )
@@ -262,14 +262,14 @@  def test_validate_sctp_checksum(self) -> None:
             Ether(dst=mac_id) / IP() / SCTP(),
             Ether(dst=mac_id) / IP() / SCTP(chksum=0xF),
-        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
-            testpmd.set_forward_mode(SimpleForwardingModes.csum)
-            testpmd.set_verbose(level=1)
-            testpmd.csum_set_hw(layers=ChecksumOffloadOptions.sctp)
-            testpmd.start()
-            self.send_packet_and_verify_checksum(
-                packet=packet_list[0], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
-            )
-            self.send_packet_and_verify_checksum(
-                packet=packet_list[1], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
-            )
+        testpmd = TestPmdShell(node=self.sut_node, enable_rx_cksum=True)
+        testpmd.set_forward_mode(SimpleForwardingModes.csum)
+        testpmd.set_verbose(level=1)
+        testpmd.csum_set_hw(layers=ChecksumOffloadOptions.sctp)
+        testpmd.start()
+        self.send_packet_and_verify_checksum(
+            packet=packet_list[0], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
+        )
+        self.send_packet_and_verify_checksum(
+            packet=packet_list[1], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
+        )
diff --git a/dts/tests/TestSuite_dynamic_queue_conf.py b/dts/tests/TestSuite_dynamic_queue_conf.py
index e55716f545..caf820151e 100644
--- a/dts/tests/TestSuite_dynamic_queue_conf.py
+++ b/dts/tests/TestSuite_dynamic_queue_conf.py
@@ -83,37 +83,37 @@  def wrap(self: "TestDynamicQueueConf", is_rx_testing: bool) -> None:
         while len(queues_to_config) < self.num_ports_to_modify:
             queues_to_config.add(random.randint(1, self.number_of_queues - 1))
         unchanged_queues = set(range(self.number_of_queues)) - queues_to_config
-        with TestPmdShell(
+        testpmd = TestPmdShell(
-        ) as testpmd:
-            for q in queues_to_config:
-                testpmd.stop_port_queue(port_id, q, is_rx_testing)
-            testpmd.set_forward_mode(SimpleForwardingModes.mac)
-            test_meth(
-                self,
-                port_id,
-                queues_to_config,
-                unchanged_queues,
-                testpmd,
-                is_rx_testing,
-            )
-            for queue_id in queues_to_config:
-                testpmd.start_port_queue(port_id, queue_id, is_rx_testing)
+        )
+        for q in queues_to_config:
+            testpmd.stop_port_queue(port_id, q, is_rx_testing)
+        testpmd.set_forward_mode(SimpleForwardingModes.mac)
+        test_meth(
+            self,
+            port_id,
+            queues_to_config,
+            unchanged_queues,
+            testpmd,
+            is_rx_testing,
+        )
+        for queue_id in queues_to_config:
+            testpmd.start_port_queue(port_id, queue_id, is_rx_testing)
-            testpmd.start()
-            self.send_packets_with_different_addresses(self.number_of_packets_to_send)
-            forwarding_stats = testpmd.stop()
-            for queue_id in queues_to_config:
-                self.verify(
-                    self.port_queue_in_stats(port_id, is_rx_testing, queue_id, forwarding_stats),
-                    f"Modified queue {queue_id} on port {port_id} failed to receive traffic after"
-                    "being started again.",
-                )
+        testpmd.start()
+        self.send_packets_with_different_addresses(self.number_of_packets_to_send)
+        forwarding_stats = testpmd.stop()
+        for queue_id in queues_to_config:
+            self.verify(
+                self.port_queue_in_stats(port_id, is_rx_testing, queue_id, forwarding_stats),
+                f"Modified queue {queue_id} on port {port_id} failed to receive traffic after"
+                "being started again.",
+            )
     return wrap
diff --git a/dts/tests/TestSuite_l2fwd.py b/dts/tests/TestSuite_l2fwd.py
index 0f6ff18907..9acc4365ea 100644
--- a/dts/tests/TestSuite_l2fwd.py
+++ b/dts/tests/TestSuite_l2fwd.py
@@ -44,20 +44,20 @@  def l2fwd_integrity(self) -> None:
         queues = [1, 2, 4, 8]
-        with TestPmdShell(
+        shell = TestPmdShell(
             eth_peer=[EthPeer(1, self.tg_node.ports[1].mac_address)],
-        ) as shell:
-            for queues_num in queues:
-                self._logger.info(f"Testing L2 forwarding with {queues_num} queue(s)")
-                shell.set_ports_queues(queues_num)
-                shell.start()
+        )
+        for queues_num in queues:
+            self._logger.info(f"Testing L2 forwarding with {queues_num} queue(s)")
+            shell.set_ports_queues(queues_num)
+            shell.start()
-                received_packets = self.send_packets_and_capture(self.packets)
-                expected_packets = self.get_expected_packets(self.packets)
-                self.match_all_packets(expected_packets, received_packets)
+            received_packets = self.send_packets_and_capture(self.packets)
+            expected_packets = self.get_expected_packets(self.packets)
+            self.match_all_packets(expected_packets, received_packets)
-                shell.stop()
+            shell.stop()
diff --git a/dts/tests/TestSuite_mac_filter.py b/dts/tests/TestSuite_mac_filter.py
index 11e4b595c7..ac9ceefd85 100644
--- a/dts/tests/TestSuite_mac_filter.py
+++ b/dts/tests/TestSuite_mac_filter.py
@@ -101,22 +101,22 @@  def test_add_remove_mac_addresses(self) -> None:
             Remove the fake mac address from the PMD's address pool.
             Send a packet with the fake mac address to the PMD. (Should not receive)
-        with TestPmdShell(self.sut_node) as testpmd:
-            testpmd.set_promisc(0, enable=False)
-            testpmd.start()
-            mac_address = self._sut_port_ingress.mac_address
-            # Send a packet with NIC default mac address
-            self.send_packet_and_verify(mac_address=mac_address, should_receive=True)
-            # Send a packet with different mac address
-            fake_address = "00:00:00:00:00:01"
-            self.send_packet_and_verify(mac_address=fake_address, should_receive=False)
-            # Add mac address to pool and rerun tests
-            testpmd.set_mac_addr(0, mac_address=fake_address, add=True)
-            self.send_packet_and_verify(mac_address=fake_address, should_receive=True)
-            testpmd.set_mac_addr(0, mac_address=fake_address, add=False)
-            self.send_packet_and_verify(mac_address=fake_address, should_receive=False)
+        testpmd = TestPmdShell(self.sut_node)
+        testpmd.set_promisc(0, enable=False)
+        testpmd.start()
+        mac_address = self._sut_port_ingress.mac_address
+        # Send a packet with NIC default mac address
+        self.send_packet_and_verify(mac_address=mac_address, should_receive=True)
+        # Send a packet with different mac address
+        fake_address = "00:00:00:00:00:01"
+        self.send_packet_and_verify(mac_address=fake_address, should_receive=False)
+        # Add mac address to pool and rerun tests
+        testpmd.set_mac_addr(0, mac_address=fake_address, add=True)
+        self.send_packet_and_verify(mac_address=fake_address, should_receive=True)
+        testpmd.set_mac_addr(0, mac_address=fake_address, add=False)
+        self.send_packet_and_verify(mac_address=fake_address, should_receive=False)
     def test_invalid_address(self) -> None:
@@ -137,44 +137,42 @@  def test_invalid_address(self) -> None:
             Determine the device's mac address pool size, and fill the pool with fake addresses.
             Attempt to add another fake mac address, overloading the address pool. (Should fail)
-        with TestPmdShell(self.sut_node) as testpmd:
-            testpmd.start()
-            mac_address = self._sut_port_ingress.mac_address
-            try:
-                testpmd.set_mac_addr(0, "00:00:00:00:00:00", add=True)
-                self.verify(False, "Invalid mac address added.")
-            except InteractiveCommandExecutionError:
-                pass
-            try:
-                testpmd.set_mac_addr(0, mac_address, add=False)
-                self.verify(False, "Default mac address removed.")
-            except InteractiveCommandExecutionError:
-                pass
-            # Should be no errors adding this twice
-            testpmd.set_mac_addr(0, "1" + mac_address[1:], add=True)
-            testpmd.set_mac_addr(0, "1" + mac_address[1:], add=True)
-            # Double check to see if default mac address can be removed
-            try:
-                testpmd.set_mac_addr(0, mac_address, add=False)
-                self.verify(False, "Default mac address removed.")
-            except InteractiveCommandExecutionError:
-                pass
-            for i in range(testpmd.show_port_info(0).max_mac_addresses_num - 1):
-                # A0 fake address based on the index 'i'.
-                fake_address = str(hex(i)[2:].zfill(12))
-                # Insert ':' characters every two indexes to create a fake mac address.
-                fake_address = ":".join(
-                    fake_address[x : x + 2] for x in range(0, len(fake_address), 2)
-                )
-                testpmd.set_mac_addr(0, fake_address, add=True, verify=False)
-            try:
-                testpmd.set_mac_addr(0, "E" + mac_address[1:], add=True)
-                # We add an extra address to compensate for mac address pool inconsistencies.
-                testpmd.set_mac_addr(0, "F" + mac_address[1:], add=True)
-                self.verify(False, "Mac address limit exceeded.")
-            except InteractiveCommandExecutionError:
-                pass
+        testpmd = TestPmdShell(self.sut_node)
+        testpmd.start()
+        mac_address = self._sut_port_ingress.mac_address
+        try:
+            testpmd.set_mac_addr(0, "00:00:00:00:00:00", add=True)
+            self.verify(False, "Invalid mac address added.")
+        except InteractiveCommandExecutionError:
+            pass
+        try:
+            testpmd.set_mac_addr(0, mac_address, add=False)
+            self.verify(False, "Default mac address removed.")
+        except InteractiveCommandExecutionError:
+            pass
+        # Should be no errors adding this twice
+        testpmd.set_mac_addr(0, "1" + mac_address[1:], add=True)
+        testpmd.set_mac_addr(0, "1" + mac_address[1:], add=True)
+        # Double check to see if default mac address can be removed
+        try:
+            testpmd.set_mac_addr(0, mac_address, add=False)
+            self.verify(False, "Default mac address removed.")
+        except InteractiveCommandExecutionError:
+            pass
+        for i in range(testpmd.show_port_info(0).max_mac_addresses_num - 1):
+            # A0 fake address based on the index 'i'.
+            fake_address = str(hex(i)[2:].zfill(12))
+            # Insert ':' characters every two indexes to create a fake mac address.
+            fake_address = ":".join(fake_address[x : x + 2] for x in range(0, len(fake_address), 2))
+            testpmd.set_mac_addr(0, fake_address, add=True, verify=False)
+        try:
+            testpmd.set_mac_addr(0, "E" + mac_address[1:], add=True)
+            # We add an extra address to compensate for mac address pool inconsistencies.
+            testpmd.set_mac_addr(0, "F" + mac_address[1:], add=True)
+            self.verify(False, "Mac address limit exceeded.")
+        except InteractiveCommandExecutionError:
+            pass
@@ -191,14 +189,14 @@  def test_multicast_filter(self) -> None:
             Remove the fake multicast address from the PMDs multicast address filter.
             Send a packet with the fake multicast address to the PMD. (Should not receive)
-        with TestPmdShell(self.sut_node) as testpmd:
-            testpmd.start()
-            testpmd.set_promisc(0, enable=False)
-            multicast_address = "01:00:5E:00:00:00"
+        testpmd = TestPmdShell(self.sut_node)
+        testpmd.set_promisc(0, enable=False)
+        multicast_address = "01:00:5E:00:00:00"
-            testpmd.set_multicast_mac_addr(0, multi_addr=multicast_address, add=True)
-            self.send_packet_and_verify(multicast_address, should_receive=True)
+        testpmd.set_multicast_mac_addr(0, multi_addr=multicast_address, add=True)
+        testpmd.start()
+        self.send_packet_and_verify(multicast_address, should_receive=True)
-            # Remove multicast filter and verify the packet was not received.
-            testpmd.set_multicast_mac_addr(0, multicast_address, add=False)
-            self.send_packet_and_verify(multicast_address, should_receive=False)
+        # Remove multicast filter and verify the packet was not received.
+        testpmd.set_multicast_mac_addr(0, multicast_address, add=False)
+        self.send_packet_and_verify(multicast_address, should_receive=False)
diff --git a/dts/tests/TestSuite_pmd_buffer_scatter.py b/dts/tests/TestSuite_pmd_buffer_scatter.py
index b2f42425d4..ffb345ae4e 100644
--- a/dts/tests/TestSuite_pmd_buffer_scatter.py
+++ b/dts/tests/TestSuite_pmd_buffer_scatter.py
@@ -103,26 +103,24 @@  def pmd_scatter(self, mbsize: int) -> None:
             Start testpmd and run functional test with preset mbsize.
-        with TestPmdShell(
+        testpmd = TestPmdShell(
-        ) as testpmd:
-            testpmd.start()
-            for offset in [-1, 0, 1, 4, 5]:
-                recv_payload = self.scatter_pktgen_send_packet(mbsize + offset)
-                self._logger.debug(
-                    f"Payload of scattered packet after forwarding: \n{recv_payload}"
-                )
-                self.verify(
-                    ("58 " * 8).strip() in recv_payload,
-                    "Payload of scattered packet did not match expected payload with offset "
-                    f"{offset}.",
-                )
+        )
+        testpmd.start()
+        for offset in [-1, 0, 1, 4, 5]:
+            recv_payload = self.scatter_pktgen_send_packet(mbsize + offset)
+            self._logger.debug(f"Payload of scattered packet after forwarding: \n{recv_payload}")
+            self.verify(
+                ("58 " * 8).strip() in recv_payload,
+                "Payload of scattered packet did not match expected payload with offset "
+                f"{offset}.",
+            )
diff --git a/dts/tests/TestSuite_smoke_tests.py b/dts/tests/TestSuite_smoke_tests.py
index bc3a2a6bf9..0681ad4ea7 100644
--- a/dts/tests/TestSuite_smoke_tests.py
+++ b/dts/tests/TestSuite_smoke_tests.py
@@ -104,8 +104,8 @@  def test_devices_listed_in_testpmd(self) -> None:
             List all devices found in testpmd and verify the configured devices are among them.
-        with TestPmdShell(self.sut_node) as testpmd:
-            dev_list = [str(x) for x in testpmd.get_devices()]
+        testpmd = TestPmdShell(self.sut_node)
+        dev_list = [str(x) for x in testpmd.get_devices()]
         for nic in self.nics_in_node:
                 nic.pci in dev_list,
diff --git a/dts/tests/TestSuite_vlan.py b/dts/tests/TestSuite_vlan.py
index c67520baef..ede1f69495 100644
--- a/dts/tests/TestSuite_vlan.py
+++ b/dts/tests/TestSuite_vlan.py
@@ -124,10 +124,10 @@  def test_vlan_receipt_no_stripping(self) -> None:
             Create an interactive testpmd shell and verify a VLAN packet.
-        with TestPmdShell(node=self.sut_node) as testpmd:
-            self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
-            testpmd.start()
-            self.send_vlan_packet_and_verify(True, strip=False, vlan_id=1)
+        testpmd = TestPmdShell(node=self.sut_node)
+        self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
+        testpmd.start()
+        self.send_vlan_packet_and_verify(True, strip=False, vlan_id=1)
@@ -137,11 +137,11 @@  def test_vlan_receipt_stripping(self) -> None:
             Create an interactive testpmd shell and verify a VLAN packet.
-        with TestPmdShell(node=self.sut_node) as testpmd:
-            self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
-            testpmd.set_vlan_strip(port=0, enable=True)
-            testpmd.start()
-            self.send_vlan_packet_and_verify(should_receive=True, strip=True, vlan_id=1)
+        testpmd = TestPmdShell(node=self.sut_node)
+        self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
+        testpmd.set_vlan_strip(port=0, enable=True)
+        testpmd.start()
+        self.send_vlan_packet_and_verify(should_receive=True, strip=True, vlan_id=1)
     def test_vlan_no_receipt(self) -> None:
@@ -150,10 +150,10 @@  def test_vlan_no_receipt(self) -> None:
             Create an interactive testpmd shell and verify a VLAN packet.
-        with TestPmdShell(node=self.sut_node) as testpmd:
-            self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
-            testpmd.start()
-            self.send_vlan_packet_and_verify(should_receive=False, strip=False, vlan_id=2)
+        testpmd = TestPmdShell(node=self.sut_node)
+        self.vlan_setup(testpmd=testpmd, port_id=0, filtered_id=1)
+        testpmd.start()
+        self.send_vlan_packet_and_verify(should_receive=False, strip=False, vlan_id=2)
     def test_vlan_header_insertion(self) -> None:
@@ -162,11 +162,11 @@  def test_vlan_header_insertion(self) -> None:
             Create an interactive testpmd shell and verify a non-VLAN packet.
-        with TestPmdShell(node=self.sut_node) as testpmd:
-            testpmd.set_forward_mode(SimpleForwardingModes.mac)
-            testpmd.set_promisc(port=0, enable=False)
-            testpmd.stop_all_ports()
-            testpmd.tx_vlan_set(port=1, enable=True, vlan=51)
-            testpmd.start_all_ports()
-            testpmd.start()
-            self.send_packet_and_verify_insertion(expected_id=51)
+        testpmd = TestPmdShell(node=self.sut_node)
+        testpmd.set_forward_mode(SimpleForwardingModes.mac)
+        testpmd.set_promisc(port=0, enable=False)
+        testpmd.stop_all_ports()
+        testpmd.tx_vlan_set(port=1, enable=True, vlan=51)
+        testpmd.start_all_ports()
+        testpmd.start()
+        self.send_packet_and_verify_insertion(expected_id=51)