[v1,2/2] dts: replace pexpect with fabric

Message ID 20230403114608.1423020-2-juraj.linkes@pantheon.tech (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series [v1,1/2] dts: fabric requirements |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/loongarch-compilation success Compilation OK
ci/loongarch-unit-testing success Unit Testing PASS
ci/Intel-compilation success Compilation OK
ci/iol-mellanox-Performance success Performance Testing PASS
ci/iol-broadcom-Performance success Performance Testing PASS
ci/iol-broadcom-Functional success Functional Testing PASS
ci/iol-x86_64-compile-testing success Testing PASS
ci/iol-aarch64-unit-testing success Testing PASS
ci/iol-intel-Functional success Functional Testing PASS
ci/iol-unit-testing success Testing PASS
ci/iol-intel-Performance success Performance Testing PASS
ci/github-robot: build success github build: passed
ci/intel-Testing success Testing PASS
ci/iol-testing success Testing PASS
ci/iol-x86_64-unit-testing success Testing PASS
ci/intel-Functional success Functional PASS
ci/iol-aarch64-compile-testing success Testing PASS
ci/iol-abi-testing success Testing PASS

Commit Message

Juraj Linkeš April 3, 2023, 11:46 a.m. UTC
Pexpect is not a dedicated SSH connection library while Fabric is. With
Fabric, all SSH-related logic is provided and we can just focus on
what's DTS specific.

Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
---
 doc/guides/tools/dts.rst                      |  29 +-
 dts/conf.yaml                                 |   2 +-
 dts/framework/exception.py                    |  10 +-
 dts/framework/remote_session/linux_session.py |  31 +-
 dts/framework/remote_session/os_session.py    |  51 +++-
 dts/framework/remote_session/posix_session.py |  48 +--
 .../remote_session/remote/remote_session.py   |  35 ++-
 .../remote_session/remote/ssh_session.py      | 287 ++++++------------
 dts/framework/testbed_model/sut_node.py       |  12 +-
 dts/framework/utils.py                        |   9 -
 10 files changed, 237 insertions(+), 277 deletions(-)
  

Patch

diff --git a/doc/guides/tools/dts.rst b/doc/guides/tools/dts.rst
index ebd6dceb6a..d15826c098 100644
--- a/doc/guides/tools/dts.rst
+++ b/doc/guides/tools/dts.rst
@@ -95,9 +95,14 @@  Setting up DTS environment
 
 #. **SSH Connection**
 
-   DTS uses Python pexpect for SSH connections between DTS environment and the other hosts.
-   The pexpect implementation is a wrapper around the ssh command in the DTS environment.
-   This means it'll use the SSH agent providing the ssh command and its keys.
+   DTS uses the Fabric Python library for SSH connections between DTS environment
+   and the other hosts.
+   The authentication method used is pubkey authentication.
+   Fabric tries to use a passed key/certificate,
+   then any key it can with through an SSH agent,
+   then any "id_rsa", "id_dsa" or "id_ecdsa" key discoverable in ``~/.ssh/``
+   (with any matching OpenSSH-style certificates).
+   DTS doesn't pass any keys, so Fabric tries to use the other two methods.
 
 
 Setting up System Under Test
@@ -132,6 +137,21 @@  There are two areas that need to be set up on a System Under Test:
      It's possible to use the hugepage configuration already present on the SUT.
      If you wish to do so, don't specify the hugepage configuration in the DTS config file.
 
+#. **User with administrator privileges**
+
+.. _sut_admin_user:
+
+   DTS needs administrator privileges to run DPDK applications (such as testpmd) on the SUT.
+   The SUT user must be able run commands in privileged mode without asking for password.
+   On most Linux distributions, it's a matter of setting up passwordless sudo:
+
+   #. Run ``sudo visudo`` and check that it contains ``%sudo   ALL=(ALL:ALL) ALL``.
+
+   #. Add the SUT user to the sudo group with:
+
+   .. code-block:: console
+
+      sudo usermod -aG sudo <sut_user>
 
 Running DTS
 -----------
@@ -151,7 +171,8 @@  which is a template that illustrates what can be configured in DTS:
      :start-at: executions:
 
 
-The user must be root or any other user with prompt starting with ``#``.
+The user must have :ref:`administrator privileges <sut_admin_user>`
+which don't require password authentication.
 The other fields are mostly self-explanatory
 and documented in more detail in ``dts/framework/config/conf_yaml_schema.json``.
 
diff --git a/dts/conf.yaml b/dts/conf.yaml
index a9bd8a3ecf..129801d87c 100644
--- a/dts/conf.yaml
+++ b/dts/conf.yaml
@@ -16,7 +16,7 @@  executions:
 nodes:
   - name: "SUT 1"
     hostname: sut1.change.me.localhost
-    user: root
+    user: dtsuser
     arch: x86_64
     os: linux
     lcores: ""
diff --git a/dts/framework/exception.py b/dts/framework/exception.py
index ca353d98fc..44ff4e979a 100644
--- a/dts/framework/exception.py
+++ b/dts/framework/exception.py
@@ -62,13 +62,19 @@  class SSHConnectionError(DTSError):
     """
 
     host: str
+    errors: list[str]
     severity: ClassVar[ErrorSeverity] = ErrorSeverity.SSH_ERR
 
-    def __init__(self, host: str):
+    def __init__(self, host: str, errors: list[str] | None = None):
         self.host = host
+        self.errors = [] if errors is None else errors
 
     def __str__(self) -> str:
-        return f"Error trying to connect with {self.host}"
+        message = f"Error trying to connect with {self.host}."
+        if self.errors:
+            message += f" Errors encountered while retrying: {', '.join(self.errors)}"
+
+        return message
 
 
 class SSHSessionDeadError(DTSError):
diff --git a/dts/framework/remote_session/linux_session.py b/dts/framework/remote_session/linux_session.py
index a1e3bc3a92..f13f399121 100644
--- a/dts/framework/remote_session/linux_session.py
+++ b/dts/framework/remote_session/linux_session.py
@@ -14,10 +14,11 @@  class LinuxSession(PosixSession):
     The implementation of non-Posix compliant parts of Linux remote sessions.
     """
 
+    def _get_privileged_command(self, command: str) -> str:
+        return f"sudo -- sh -c '{command}'"
+
     def get_remote_cpus(self, use_first_core: bool) -> list[LogicalCore]:
-        cpu_info = self.remote_session.send_command(
-            "lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \\#"
-        ).stdout
+        cpu_info = self.send_command("lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \\#").stdout
         lcores = []
         for cpu_line in cpu_info.splitlines():
             lcore, core, socket, node = map(int, cpu_line.split(","))
@@ -45,20 +46,20 @@  def setup_hugepages(self, hugepage_amount: int, force_first_numa: bool) -> None:
         self._mount_huge_pages()
 
     def _get_hugepage_size(self) -> int:
-        hugepage_size = self.remote_session.send_command(
+        hugepage_size = self.send_command(
             "awk '/Hugepagesize/ {print $2}' /proc/meminfo"
         ).stdout
         return int(hugepage_size)
 
     def _get_hugepages_total(self) -> int:
-        hugepages_total = self.remote_session.send_command(
+        hugepages_total = self.send_command(
             "awk '/HugePages_Total/ { print $2 }' /proc/meminfo"
         ).stdout
         return int(hugepages_total)
 
     def _get_numa_nodes(self) -> list[int]:
         try:
-            numa_count = self.remote_session.send_command(
+            numa_count = self.send_command(
                 "cat /sys/devices/system/node/online", verify=True
             ).stdout
             numa_range = expand_range(numa_count)
@@ -70,14 +71,12 @@  def _get_numa_nodes(self) -> list[int]:
     def _mount_huge_pages(self) -> None:
         self._logger.info("Re-mounting Hugepages.")
         hugapge_fs_cmd = "awk '/hugetlbfs/ { print $2 }' /proc/mounts"
-        self.remote_session.send_command(f"umount $({hugapge_fs_cmd})")
-        result = self.remote_session.send_command(hugapge_fs_cmd)
+        self.send_command(f"umount $({hugapge_fs_cmd})")
+        result = self.send_command(hugapge_fs_cmd)
         if result.stdout == "":
             remote_mount_path = "/mnt/huge"
-            self.remote_session.send_command(f"mkdir -p {remote_mount_path}")
-            self.remote_session.send_command(
-                f"mount -t hugetlbfs nodev {remote_mount_path}"
-            )
+            self.send_command(f"mkdir -p {remote_mount_path}")
+            self.send_command(f"mount -t hugetlbfs nodev {remote_mount_path}")
 
     def _supports_numa(self) -> bool:
         # the system supports numa if self._numa_nodes is non-empty and there are more
@@ -94,14 +93,12 @@  def _configure_huge_pages(
         )
         if force_first_numa and self._supports_numa():
             # clear non-numa hugepages
-            self.remote_session.send_command(
-                f"echo 0 | sudo tee {hugepage_config_path}"
-            )
+            self.send_command(f"echo 0 | tee {hugepage_config_path}", privileged=True)
             hugepage_config_path = (
                 f"/sys/devices/system/node/node{self._numa_nodes[0]}/hugepages"
                 f"/hugepages-{size}kB/nr_hugepages"
             )
 
-        self.remote_session.send_command(
-            f"echo {amount} | sudo tee {hugepage_config_path}"
+        self.send_command(
+            f"echo {amount} | tee {hugepage_config_path}", privileged=True
         )
diff --git a/dts/framework/remote_session/os_session.py b/dts/framework/remote_session/os_session.py
index 4c48ae2567..bfd70bd480 100644
--- a/dts/framework/remote_session/os_session.py
+++ b/dts/framework/remote_session/os_session.py
@@ -10,7 +10,7 @@ 
 from framework.logger import DTSLOG
 from framework.settings import SETTINGS
 from framework.testbed_model import LogicalCore
-from framework.utils import EnvVarsDict, MesonArgs
+from framework.utils import MesonArgs
 
 from .remote import CommandResult, RemoteSession, create_remote_session
 
@@ -53,17 +53,32 @@  def is_alive(self) -> bool:
     def send_command(
         self,
         command: str,
-        timeout: float,
+        timeout: float = SETTINGS.timeout,
+        privileged: bool = False,
         verify: bool = False,
-        env: EnvVarsDict | None = None,
+        env: dict | None = None,
     ) -> CommandResult:
         """
         An all-purpose API in case the command to be executed is already
         OS-agnostic, such as when the path to the executed command has been
         constructed beforehand.
         """
+        if privileged:
+            command = self._get_privileged_command(command)
+
         return self.remote_session.send_command(command, timeout, verify, env)
 
+    @abstractmethod
+    def _get_privileged_command(self, command: str) -> str:
+        """Modify the command so that it executes with administrative privileges.
+
+        Args:
+            command: The command to modify.
+
+        Returns:
+            The modified command that executes with administrative privileges.
+        """
+
     @abstractmethod
     def guess_dpdk_remote_dir(self, remote_dir) -> PurePath:
         """
@@ -90,17 +105,35 @@  def join_remote_path(self, *args: str | PurePath) -> PurePath:
         """
 
     @abstractmethod
-    def copy_file(
+    def copy_from(
         self,
         source_file: str | PurePath,
         destination_file: str | PurePath,
-        source_remote: bool = False,
     ) -> None:
+        """Copy a file from the remote Node to the local filesystem.
+
+        Copy source_file from the remote Node associated with this remote
+        session to destination_file on the local filesystem.
+
+        Args:
+            source_file: the file on the remote Node.
+            destination_file: a file or directory path on the local filesystem.
         """
+
+    @abstractmethod
+    def copy_to(
+        self,
+        source_file: str | PurePath,
+        destination_file: str | PurePath,
+    ) -> None:
+        """Copy a file from local filesystem to the remote Node.
+
         Copy source_file from local filesystem to destination_file
-        on the remote Node associated with the remote session.
-        If source_remote is True, reverse the direction - copy source_file from the
-        associated remote Node to destination_file on local storage.
+        on the remote Node associated with this remote session.
+
+        Args:
+            source_file: the file on the local filesystem.
+            destination_file: a file or directory path on the remote Node.
         """
 
     @abstractmethod
@@ -128,7 +161,7 @@  def extract_remote_tarball(
     @abstractmethod
     def build_dpdk(
         self,
-        env_vars: EnvVarsDict,
+        env_vars: dict,
         meson_args: MesonArgs,
         remote_dpdk_dir: str | PurePath,
         remote_dpdk_build_dir: str | PurePath,
diff --git a/dts/framework/remote_session/posix_session.py b/dts/framework/remote_session/posix_session.py
index d38062e8d6..8ca0acb429 100644
--- a/dts/framework/remote_session/posix_session.py
+++ b/dts/framework/remote_session/posix_session.py
@@ -9,7 +9,7 @@ 
 from framework.config import Architecture
 from framework.exception import DPDKBuildError, RemoteCommandExecutionError
 from framework.settings import SETTINGS
-from framework.utils import EnvVarsDict, MesonArgs
+from framework.utils import MesonArgs
 
 from .os_session import OSSession
 
@@ -34,7 +34,7 @@  def combine_short_options(**opts: bool) -> str:
 
     def guess_dpdk_remote_dir(self, remote_dir) -> PurePosixPath:
         remote_guess = self.join_remote_path(remote_dir, "dpdk-*")
-        result = self.remote_session.send_command(f"ls -d {remote_guess} | tail -1")
+        result = self.send_command(f"ls -d {remote_guess} | tail -1")
         return PurePosixPath(result.stdout)
 
     def get_remote_tmp_dir(self) -> PurePosixPath:
@@ -48,7 +48,7 @@  def get_dpdk_build_env_vars(self, arch: Architecture) -> dict:
         env_vars = {}
         if arch == Architecture.i686:
             # find the pkg-config path and store it in PKG_CONFIG_LIBDIR
-            out = self.remote_session.send_command("find /usr -type d -name pkgconfig")
+            out = self.send_command("find /usr -type d -name pkgconfig")
             pkg_path = ""
             res_path = out.stdout.split("\r\n")
             for cur_path in res_path:
@@ -65,13 +65,19 @@  def get_dpdk_build_env_vars(self, arch: Architecture) -> dict:
     def join_remote_path(self, *args: str | PurePath) -> PurePosixPath:
         return PurePosixPath(*args)
 
-    def copy_file(
+    def copy_from(
         self,
         source_file: str | PurePath,
         destination_file: str | PurePath,
-        source_remote: bool = False,
     ) -> None:
-        self.remote_session.copy_file(source_file, destination_file, source_remote)
+        self.remote_session.copy_from(source_file, destination_file)
+
+    def copy_to(
+        self,
+        source_file: str | PurePath,
+        destination_file: str | PurePath,
+    ) -> None:
+        self.remote_session.copy_to(source_file, destination_file)
 
     def remove_remote_dir(
         self,
@@ -80,24 +86,24 @@  def remove_remote_dir(
         force: bool = True,
     ) -> None:
         opts = PosixSession.combine_short_options(r=recursive, f=force)
-        self.remote_session.send_command(f"rm{opts} {remote_dir_path}")
+        self.send_command(f"rm{opts} {remote_dir_path}")
 
     def extract_remote_tarball(
         self,
         remote_tarball_path: str | PurePath,
         expected_dir: str | PurePath | None = None,
     ) -> None:
-        self.remote_session.send_command(
+        self.send_command(
             f"tar xfm {remote_tarball_path} "
             f"-C {PurePosixPath(remote_tarball_path).parent}",
             60,
         )
         if expected_dir:
-            self.remote_session.send_command(f"ls {expected_dir}", verify=True)
+            self.send_command(f"ls {expected_dir}", verify=True)
 
     def build_dpdk(
         self,
-        env_vars: EnvVarsDict,
+        env_vars: dict,
         meson_args: MesonArgs,
         remote_dpdk_dir: str | PurePath,
         remote_dpdk_build_dir: str | PurePath,
@@ -108,7 +114,7 @@  def build_dpdk(
             if rebuild:
                 # reconfigure, then build
                 self._logger.info("Reconfiguring DPDK build.")
-                self.remote_session.send_command(
+                self.send_command(
                     f"meson configure {meson_args} {remote_dpdk_build_dir}",
                     timeout,
                     verify=True,
@@ -118,7 +124,7 @@  def build_dpdk(
                 # fresh build - remove target dir first, then build from scratch
                 self._logger.info("Configuring DPDK build from scratch.")
                 self.remove_remote_dir(remote_dpdk_build_dir)
-                self.remote_session.send_command(
+                self.send_command(
                     f"meson setup "
                     f"{meson_args} {remote_dpdk_dir} {remote_dpdk_build_dir}",
                     timeout,
@@ -127,14 +133,14 @@  def build_dpdk(
                 )
 
             self._logger.info("Building DPDK.")
-            self.remote_session.send_command(
+            self.send_command(
                 f"ninja -C {remote_dpdk_build_dir}", timeout, verify=True, env=env_vars
             )
         except RemoteCommandExecutionError as e:
             raise DPDKBuildError(f"DPDK build failed when doing '{e.command}'.")
 
     def get_dpdk_version(self, build_dir: str | PurePath) -> str:
-        out = self.remote_session.send_command(
+        out = self.send_command(
             f"cat {self.join_remote_path(build_dir, 'VERSION')}", verify=True
         )
         return out.stdout
@@ -146,7 +152,7 @@  def kill_cleanup_dpdk_apps(self, dpdk_prefix_list: Iterable[str]) -> None:
             # kill and cleanup only if DPDK is running
             dpdk_pids = self._get_dpdk_pids(dpdk_runtime_dirs)
             for dpdk_pid in dpdk_pids:
-                self.remote_session.send_command(f"kill -9 {dpdk_pid}", 20)
+                self.send_command(f"kill -9 {dpdk_pid}", 20)
             self._check_dpdk_hugepages(dpdk_runtime_dirs)
             self._remove_dpdk_runtime_dirs(dpdk_runtime_dirs)
 
@@ -168,7 +174,7 @@  def _list_remote_dirs(self, remote_path: str | PurePath) -> list[str] | None:
         Return a list of directories of the remote_dir.
         If remote_path doesn't exist, return None.
         """
-        out = self.remote_session.send_command(
+        out = self.send_command(
             f"ls -l {remote_path} | awk '/^d/ {{print $NF}}'"
         ).stdout
         if "No such file or directory" in out:
@@ -182,9 +188,7 @@  def _get_dpdk_pids(self, dpdk_runtime_dirs: Iterable[str | PurePath]) -> list[in
         for dpdk_runtime_dir in dpdk_runtime_dirs:
             dpdk_config_file = PurePosixPath(dpdk_runtime_dir, "config")
             if self._remote_files_exists(dpdk_config_file):
-                out = self.remote_session.send_command(
-                    f"lsof -Fp {dpdk_config_file}"
-                ).stdout
+                out = self.send_command(f"lsof -Fp {dpdk_config_file}").stdout
                 if out and "No such file or directory" not in out:
                     for out_line in out.splitlines():
                         match = re.match(pid_regex, out_line)
@@ -193,7 +197,7 @@  def _get_dpdk_pids(self, dpdk_runtime_dirs: Iterable[str | PurePath]) -> list[in
         return pids
 
     def _remote_files_exists(self, remote_path: PurePath) -> bool:
-        result = self.remote_session.send_command(f"test -e {remote_path}")
+        result = self.send_command(f"test -e {remote_path}")
         return not result.return_code
 
     def _check_dpdk_hugepages(
@@ -202,9 +206,7 @@  def _check_dpdk_hugepages(
         for dpdk_runtime_dir in dpdk_runtime_dirs:
             hugepage_info = PurePosixPath(dpdk_runtime_dir, "hugepage_info")
             if self._remote_files_exists(hugepage_info):
-                out = self.remote_session.send_command(
-                    f"lsof -Fp {hugepage_info}"
-                ).stdout
+                out = self.send_command(f"lsof -Fp {hugepage_info}").stdout
                 if out and "No such file or directory" not in out:
                     self._logger.warning("Some DPDK processes did not free hugepages.")
                     self._logger.warning("*******************************************")
diff --git a/dts/framework/remote_session/remote/remote_session.py b/dts/framework/remote_session/remote/remote_session.py
index 91dee3cb4f..0647d93de4 100644
--- a/dts/framework/remote_session/remote/remote_session.py
+++ b/dts/framework/remote_session/remote/remote_session.py
@@ -11,7 +11,6 @@ 
 from framework.exception import RemoteCommandExecutionError
 from framework.logger import DTSLOG
 from framework.settings import SETTINGS
-from framework.utils import EnvVarsDict
 
 
 @dataclasses.dataclass(slots=True, frozen=True)
@@ -89,7 +88,7 @@  def send_command(
         command: str,
         timeout: float = SETTINGS.timeout,
         verify: bool = False,
-        env: EnvVarsDict | None = None,
+        env: dict | None = None,
     ) -> CommandResult:
         """
         Send a command to the connected node using optional env vars
@@ -114,7 +113,7 @@  def send_command(
 
     @abstractmethod
     def _send_command(
-        self, command: str, timeout: float, env: EnvVarsDict | None
+        self, command: str, timeout: float, env: dict | None
     ) -> CommandResult:
         """
         Use the underlying protocol to execute the command using optional env vars
@@ -141,15 +140,33 @@  def is_alive(self) -> bool:
         """
 
     @abstractmethod
-    def copy_file(
+    def copy_from(
         self,
         source_file: str | PurePath,
         destination_file: str | PurePath,
-        source_remote: bool = False,
     ) -> None:
+        """Copy a file from the remote Node to the local filesystem.
+
+        Copy source_file from the remote Node associated with this remote
+        session to destination_file on the local filesystem.
+
+        Args:
+            source_file: the file on the remote Node.
+            destination_file: a file or directory path on the local filesystem.
         """
-        Copy source_file from local filesystem to destination_file on the remote Node
-        associated with the remote session.
-        If source_remote is True, reverse the direction - copy source_file from the
-        associated Node to destination_file on local filesystem.
+
+    @abstractmethod
+    def copy_to(
+        self,
+        source_file: str | PurePath,
+        destination_file: str | PurePath,
+    ) -> None:
+        """Copy a file from local filesystem to the remote Node.
+
+        Copy source_file from local filesystem to destination_file
+        on the remote Node associated with this remote session.
+
+        Args:
+            source_file: the file on the local filesystem.
+            destination_file: a file or directory path on the remote Node.
         """
diff --git a/dts/framework/remote_session/remote/ssh_session.py b/dts/framework/remote_session/remote/ssh_session.py
index 42ff9498a2..8d127f1601 100644
--- a/dts/framework/remote_session/remote/ssh_session.py
+++ b/dts/framework/remote_session/remote/ssh_session.py
@@ -1,29 +1,49 @@ 
 # SPDX-License-Identifier: BSD-3-Clause
-# Copyright(c) 2010-2014 Intel Corporation
-# Copyright(c) 2022-2023 PANTHEON.tech s.r.o.
-# Copyright(c) 2022-2023 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
 
-import time
+import socket
+import traceback
 from pathlib import PurePath
 
-import pexpect  # type: ignore
-from pexpect import pxssh  # type: ignore
+from fabric import Connection  # type: ignore[import]
+from invoke.exceptions import (  # type: ignore[import]
+    CommandTimedOut,
+    ThreadException,
+    UnexpectedExit,
+)
+from paramiko.ssh_exception import (  # type: ignore[import]
+    AuthenticationException,
+    BadHostKeyException,
+    NoValidConnectionsError,
+    SSHException,
+)
 
 from framework.config import NodeConfiguration
 from framework.exception import SSHConnectionError, SSHSessionDeadError, SSHTimeoutError
 from framework.logger import DTSLOG
-from framework.utils import GREEN, RED, EnvVarsDict
 
 from .remote_session import CommandResult, RemoteSession
 
 
 class SSHSession(RemoteSession):
-    """
-    Module for creating Pexpect SSH remote sessions.
+    """A persistent SSH connection to a remote Node.
+
+    The connection is implemented with the Fabric Python library.
+
+    Args:
+        node_config: The configuration of the Node to connect to.
+        session_name: The name of the session.
+        logger: The logger used for logging.
+            This should be passed from the parent OSSession.
+
+    Attributes:
+        session: The underlying Fabric SSH connection.
+
+    Raises:
+        SSHConnectionError: The connection cannot be established.
     """
 
-    session: pxssh.pxssh
-    magic_prompt: str
+    session: Connection
 
     def __init__(
         self,
@@ -31,218 +51,91 @@  def __init__(
         session_name: str,
         logger: DTSLOG,
     ):
-        self.magic_prompt = "MAGIC PROMPT"
         super(SSHSession, self).__init__(node_config, session_name, logger)
 
     def _connect(self) -> None:
-        """
-        Create connection to assigned node.
-        """
+        errors = []
         retry_attempts = 10
         login_timeout = 20 if self.port else 10
-        password_regex = (
-            r"(?i)(?:password:)|(?:passphrase for key)|(?i)(password for .+:)"
-        )
-        try:
-            for retry_attempt in range(retry_attempts):
-                self.session = pxssh.pxssh(encoding="utf-8")
-                try:
-                    self.session.login(
-                        self.ip,
-                        self.username,
-                        self.password,
-                        original_prompt="[$#>]",
-                        port=self.port,
-                        login_timeout=login_timeout,
-                        password_regex=password_regex,
-                    )
-                    break
-                except Exception as e:
-                    self._logger.warning(e)
-                    time.sleep(2)
-                    self._logger.info(
-                        f"Retrying connection: retry number {retry_attempt + 1}."
-                    )
-            else:
-                raise Exception(f"Connection to {self.hostname} failed")
-
-            self.send_expect("stty -echo", "#")
-            self.send_expect("stty columns 1000", "#")
-            self.send_expect("bind 'set enable-bracketed-paste off'", "#")
-        except Exception as e:
-            self._logger.error(RED(str(e)))
-            if getattr(self, "port", None):
-                suggestion = (
-                    f"\nSuggestion: Check if the firewall on {self.hostname} is "
-                    f"stopped.\n"
+        for retry_attempt in range(retry_attempts):
+            try:
+                self.session = Connection(
+                    self.ip,
+                    user=self.username,
+                    port=self.port,
+                    connect_kwargs={"password": self.password},
+                    connect_timeout=login_timeout,
                 )
-                self._logger.info(GREEN(suggestion))
-
-            raise SSHConnectionError(self.hostname)
+                self.session.open()
 
-    def send_expect(
-        self, command: str, prompt: str, timeout: float = 15, verify: bool = False
-    ) -> str | int:
-        try:
-            ret = self.send_expect_base(command, prompt, timeout)
-            if verify:
-                ret_status = self.send_expect_base("echo $?", prompt, timeout)
-                try:
-                    retval = int(ret_status)
-                    if retval:
-                        self._logger.error(f"Command: {command} failure!")
-                        self._logger.error(ret)
-                        return retval
-                    else:
-                        return ret
-                except ValueError:
-                    return ret
-            else:
-                return ret
-        except Exception as e:
-            self._logger.error(
-                f"Exception happened in [{command}] and output is "
-                f"[{self._get_output()}]"
-            )
-            raise e
-
-    def send_expect_base(self, command: str, prompt: str, timeout: float) -> str:
-        self._clean_session()
-        original_prompt = self.session.PROMPT
-        self.session.PROMPT = prompt
-        self._send_line(command)
-        self._prompt(command, timeout)
-
-        before = self._get_output()
-        self.session.PROMPT = original_prompt
-        return before
-
-    def _clean_session(self) -> None:
-        self.session.PROMPT = self.magic_prompt
-        self.get_output(timeout=0.01)
-        self.session.PROMPT = self.session.UNIQUE_PROMPT
-
-    def _send_line(self, command: str) -> None:
-        if not self.is_alive():
-            raise SSHSessionDeadError(self.hostname)
-        if len(command) == 2 and command.startswith("^"):
-            self.session.sendcontrol(command[1])
-        else:
-            self.session.sendline(command)
+            except (ValueError, BadHostKeyException, AuthenticationException) as e:
+                self._logger.exception(e)
+                raise SSHConnectionError(self.hostname) from e
 
-    def _prompt(self, command: str, timeout: float) -> None:
-        if not self.session.prompt(timeout):
-            raise SSHTimeoutError(command, self._get_output()) from None
+            except (NoValidConnectionsError, socket.error, SSHException) as e:
+                self._logger.debug(traceback.format_exc())
+                self._logger.warning(e)
 
-    def get_output(self, timeout: float = 15) -> str:
-        """
-        Get all output before timeout
-        """
-        try:
-            self.session.prompt(timeout)
-        except Exception:
-            pass
-
-        before = self._get_output()
-        self._flush()
-
-        return before
+                error = repr(e)
+                if error not in errors:
+                    errors.append(error)
 
-    def _get_output(self) -> str:
-        if not self.is_alive():
-            raise SSHSessionDeadError(self.hostname)
-        before = self.session.before.rsplit("\r\n", 1)[0]
-        if before == "[PEXPECT]":
-            return ""
-        return before
+                self._logger.info(
+                    f"Retrying connection: retry number {retry_attempt + 1}."
+                )
 
-    def _flush(self) -> None:
-        """
-        Clear all session buffer
-        """
-        self.session.buffer = ""
-        self.session.before = ""
+            else:
+                break
+        else:
+            raise SSHConnectionError(self.hostname, errors)
 
     def is_alive(self) -> bool:
-        return self.session.isalive()
+        return self.session.is_connected
 
     def _send_command(
-        self, command: str, timeout: float, env: EnvVarsDict | None
+        self, command: str, timeout: float, env: dict | None
     ) -> CommandResult:
-        output = self._send_command_get_output(command, timeout, env)
-        return_code = int(self._send_command_get_output("echo $?", timeout, None))
+        """Send a command and return the result of the execution.
 
-        # we're capturing only stdout
-        return CommandResult(self.name, command, output, "", return_code)
+        Args:
+            command: The command to execute.
+            timeout: Wait at most this many seconds for the execution to complete.
+            env: Extra environment variables that will be used in command execution.
 
-    def _send_command_get_output(
-        self, command: str, timeout: float, env: EnvVarsDict | None
-    ) -> str:
+        Raises:
+            SSHSessionDeadError: The session died while executing the command.
+            SSHTimeoutError: The command execution timed out.
+        """
         try:
-            self._clean_session()
-            if env:
-                command = f"{env} {command}"
-            self._send_line(command)
-        except Exception as e:
-            raise e
+            output = self.session.run(
+                command, env=env, warn=True, hide=True, timeout=timeout
+            )
 
-        output = self.get_output(timeout=timeout)
-        self.session.PROMPT = self.session.UNIQUE_PROMPT
-        self.session.prompt(0.1)
+        except (UnexpectedExit, ThreadException) as e:
+            self._logger.exception(e)
+            raise SSHSessionDeadError(self.hostname) from e
 
-        return output
+        except CommandTimedOut as e:
+            self._logger.exception(e)
+            raise SSHTimeoutError(command, e.result.stderr) from e
 
-    def _close(self, force: bool = False) -> None:
-        if force is True:
-            self.session.close()
-        else:
-            if self.is_alive():
-                self.session.logout()
+        return CommandResult(
+            self.name, command, output.stdout, output.stderr, output.return_code
+        )
 
-    def copy_file(
+    def copy_from(
         self,
         source_file: str | PurePath,
         destination_file: str | PurePath,
-        source_remote: bool = False,
     ) -> None:
-        """
-        Send a local file to a remote host.
-        """
-        if source_remote:
-            source_file = f"{self.username}@{self.ip}:{source_file}"
-        else:
-            destination_file = f"{self.username}@{self.ip}:{destination_file}"
+        self.session.get(str(destination_file), str(source_file))
 
-        port = ""
-        if self.port:
-            port = f" -P {self.port}"
-
-        command = (
-            f"scp -v{port} -o NoHostAuthenticationForLocalhost=yes"
-            f" {source_file} {destination_file}"
-        )
-
-        self._spawn_scp(command)
+    def copy_to(
+        self,
+        source_file: str | PurePath,
+        destination_file: str | PurePath,
+    ) -> None:
+        self.session.put(str(source_file), str(destination_file))
 
-    def _spawn_scp(self, scp_cmd: str) -> None:
-        """
-        Transfer a file with SCP
-        """
-        self._logger.info(scp_cmd)
-        p: pexpect.spawn = pexpect.spawn(scp_cmd)
-        time.sleep(0.5)
-        ssh_newkey: str = "Are you sure you want to continue connecting"
-        i: int = p.expect(
-            [ssh_newkey, "[pP]assword", "# ", pexpect.EOF, pexpect.TIMEOUT], 120
-        )
-        if i == 0:  # add once in trust list
-            p.sendline("yes")
-            i = p.expect([ssh_newkey, "[pP]assword", pexpect.EOF], 2)
-
-        if i == 1:
-            time.sleep(0.5)
-            p.sendline(self.password)
-            p.expect("Exit status 0", 60)
-        if i == 4:
-            self._logger.error("SCP TIMEOUT error %d" % i)
-        p.close()
+    def _close(self, force: bool = False) -> None:
+        self.session.close()
diff --git a/dts/framework/testbed_model/sut_node.py b/dts/framework/testbed_model/sut_node.py
index 2b2b50d982..9dbc390848 100644
--- a/dts/framework/testbed_model/sut_node.py
+++ b/dts/framework/testbed_model/sut_node.py
@@ -10,7 +10,7 @@ 
 from framework.config import BuildTargetConfiguration, NodeConfiguration
 from framework.remote_session import CommandResult, OSSession
 from framework.settings import SETTINGS
-from framework.utils import EnvVarsDict, MesonArgs
+from framework.utils import MesonArgs
 
 from .hw import LogicalCoreCount, LogicalCoreList, VirtualDevice
 from .node import Node
@@ -27,7 +27,7 @@  class SutNode(Node):
     _dpdk_prefix_list: list[str]
     _dpdk_timestamp: str
     _build_target_config: BuildTargetConfiguration | None
-    _env_vars: EnvVarsDict
+    _env_vars: dict
     _remote_tmp_dir: PurePath
     __remote_dpdk_dir: PurePath | None
     _dpdk_version: str | None
@@ -38,7 +38,7 @@  def __init__(self, node_config: NodeConfiguration):
         super(SutNode, self).__init__(node_config)
         self._dpdk_prefix_list = []
         self._build_target_config = None
-        self._env_vars = EnvVarsDict()
+        self._env_vars = {}
         self._remote_tmp_dir = self.main_session.get_remote_tmp_dir()
         self.__remote_dpdk_dir = None
         self._dpdk_version = None
@@ -94,7 +94,7 @@  def _configure_build_target(
         """
         Populate common environment variables and set build target config.
         """
-        self._env_vars = EnvVarsDict()
+        self._env_vars = {}
         self._build_target_config = build_target_config
         self._env_vars.update(
             self.main_session.get_dpdk_build_env_vars(build_target_config.arch)
@@ -112,7 +112,7 @@  def _copy_dpdk_tarball(self) -> None:
         Copy to and extract DPDK tarball on the SUT node.
         """
         self._logger.info("Copying DPDK tarball to SUT.")
-        self.main_session.copy_file(SETTINGS.dpdk_tarball_path, self._remote_tmp_dir)
+        self.main_session.copy_to(SETTINGS.dpdk_tarball_path, self._remote_tmp_dir)
 
         # construct remote tarball path
         # the basename is the same on local host and on remote Node
@@ -259,7 +259,7 @@  def run_dpdk_app(
         Run DPDK application on the remote node.
         """
         return self.main_session.send_command(
-            f"{app_path} {eal_args}", timeout, verify=True
+            f"{app_path} {eal_args}", timeout, privileged=True, verify=True
         )
 
 
diff --git a/dts/framework/utils.py b/dts/framework/utils.py
index 55e0b0ef0e..8cfbc6a29d 100644
--- a/dts/framework/utils.py
+++ b/dts/framework/utils.py
@@ -42,19 +42,10 @@  def expand_range(range_str: str) -> list[int]:
     return expanded_range
 
 
-def GREEN(text: str) -> str:
-    return f"\u001B[32;1m{str(text)}\u001B[0m"
-
-
 def RED(text: str) -> str:
     return f"\u001B[31;1m{str(text)}\u001B[0m"
 
 
-class EnvVarsDict(dict):
-    def __str__(self) -> str:
-        return " ".join(["=".join(item) for item in self.items()])
-
-
 class MesonArgs(object):
     """
     Aggregate the arguments needed to build DPDK: