[v5,04/10] dts: add dpdk execution handling

Message ID 20230223152840.634183-5-juraj.linkes@pantheon.tech (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Series: dts: add hello world testcase

Checks

ci/checkpatch: success (coding style OK)

Commit Message

Juraj Linkeš Feb. 23, 2023, 3:28 p.m. UTC
Add methods for setting up and shutting down DPDK apps and for
constructing EAL parameters.

Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
---
 dts/conf.yaml                                 |   4 +
 dts/framework/config/__init__.py              |   8 +
 dts/framework/config/conf_yaml_schema.json    |  25 ++
 dts/framework/remote_session/linux_session.py |  18 ++
 dts/framework/remote_session/os_session.py    |  23 +-
 dts/framework/remote_session/posix_session.py |  83 ++++++
 dts/framework/testbed_model/__init__.py       |   8 +
 dts/framework/testbed_model/dpdk.py           |  45 +++
 dts/framework/testbed_model/hw/__init__.py    |  27 ++
 dts/framework/testbed_model/hw/cpu.py         | 274 ++++++++++++++++++
 .../testbed_model/hw/virtual_device.py        |  16 +
 dts/framework/testbed_model/node.py           |  43 +++
 dts/framework/testbed_model/sut_node.py       |  81 +++++-
 dts/framework/utils.py                        |  20 ++
 14 files changed, 673 insertions(+), 2 deletions(-)
 create mode 100644 dts/framework/testbed_model/hw/__init__.py
 create mode 100644 dts/framework/testbed_model/hw/cpu.py
 create mode 100644 dts/framework/testbed_model/hw/virtual_device.py
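
For orientation, a minimal usage sketch of the API added below, assuming node_config is a NodeConfiguration parsed from dts/conf.yaml (the test runner wiring is not shown):

    from framework.testbed_model import SutNode

    sut = SutNode(node_config)       # scans remote lcores via lscpu on creation
    eal = sut.create_eal_parameters()
    # str(eal) renders the EAL arguments, e.g.:
    # "-l 2-3 -n 4 --file-prefix=dpdk_<pid>_<timestamp>"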
  

Patch

diff --git a/dts/conf.yaml b/dts/conf.yaml
index 03696d2bab..1648e5c3c5 100644
--- a/dts/conf.yaml
+++ b/dts/conf.yaml
@@ -13,4 +13,8 @@  nodes:
   - name: "SUT 1"
     hostname: sut1.change.me.localhost
     user: root
+    arch: x86_64
     os: linux
+    lcores: ""
+    use_first_core: false
+    memory_channels: 4
diff --git a/dts/framework/config/__init__.py b/dts/framework/config/__init__.py
index ca61cb10fe..17b917f3b3 100644
--- a/dts/framework/config/__init__.py
+++ b/dts/framework/config/__init__.py
@@ -72,7 +72,11 @@  class NodeConfiguration:
     hostname: str
     user: str
     password: str | None
+    arch: Architecture
     os: OS
+    lcores: str
+    use_first_core: bool
+    memory_channels: int
 
     @staticmethod
     def from_dict(d: dict) -> "NodeConfiguration":
@@ -81,7 +85,11 @@  def from_dict(d: dict) -> "NodeConfiguration":
             hostname=d["hostname"],
             user=d["user"],
             password=d.get("password"),
+            arch=Architecture(d["arch"]),
             os=OS(d["os"]),
+            lcores=d.get("lcores", "1"),
+            use_first_core=d.get("use_first_core", False),
+            memory_channels=d.get("memory_channels", 1),
         )
 
 
diff --git a/dts/framework/config/conf_yaml_schema.json b/dts/framework/config/conf_yaml_schema.json
index 9170307fbe..334b4bd8ab 100644
--- a/dts/framework/config/conf_yaml_schema.json
+++ b/dts/framework/config/conf_yaml_schema.json
@@ -6,6 +6,14 @@ 
       "type": "string",
       "description": "A unique identifier for a node"
     },
+    "ARCH": {
+      "type": "string",
+      "enum": [
+        "x86_64",
+        "arm64",
+        "ppc64le"
+      ]
+    },
     "OS": {
       "type": "string",
       "enum": [
@@ -92,8 +100,24 @@ 
             "type": "string",
             "description": "The password to use on this node. Use only as a last resort. SSH keys are STRONGLY preferred."
           },
+          "arch": {
+            "$ref": "#/definitions/ARCH"
+          },
           "os": {
             "$ref": "#/definitions/OS"
+          },
+          "lcores": {
+            "type": "string",
+            "pattern": "^(([0-9]+|([0-9]+-[0-9]+))(,([0-9]+|([0-9]+-[0-9]+)))*)?$",
+            "description": "Optional comma-separated list of logical cores to use, e.g.: 1,2,3,4,5,18-22. Defaults to 1. An empty string means use all lcores."
+          },
+          "use_first_core": {
+            "type": "boolean",
+            "description": "Indicate whether DPDK should use the first physical core. It won't be used by default."
+          },
+          "memory_channels": {
+            "type": "integer",
+            "description": "How many memory channels to use. Optional, defaults to 1."
           }
         },
         "additionalProperties": false,
@@ -101,6 +125,7 @@ 
           "name",
           "hostname",
           "user",
+          "arch",
           "os"
         ]
       },
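
Example values accepted by the lcores pattern above:

    lcores: ""            # empty string: use all lcores
    lcores: "1,2,3,4"     # individual lcore ids
    lcores: "1-4,18-22"   # ranges, optionally mixed with single ids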
diff --git a/dts/framework/remote_session/linux_session.py b/dts/framework/remote_session/linux_session.py
index 9d14166077..c49b6bb1d7 100644
--- a/dts/framework/remote_session/linux_session.py
+++ b/dts/framework/remote_session/linux_session.py
@@ -2,6 +2,8 @@ 
 # Copyright(c) 2023 PANTHEON.tech s.r.o.
 # Copyright(c) 2023 University of New Hampshire
 
+from framework.testbed_model import LogicalCore
+
 from .posix_session import PosixSession
 
 
@@ -9,3 +11,19 @@  class LinuxSession(PosixSession):
     """
     The implementation of non-Posix compliant parts of Linux remote sessions.
     """
+
+    def get_remote_cpus(self, use_first_core: bool) -> list[LogicalCore]:
+        cpu_info = self.remote_session.send_command(
+            "lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \\#"
+        ).stdout
+        lcores = []
+        for cpu_line in cpu_info.splitlines():
+            lcore, core, socket, node = map(int, cpu_line.split(","))
+            if core == 0 and socket == 0 and not use_first_core:
+                self._logger.info("Not using the first physical core.")
+                continue
+            lcores.append(LogicalCore(lcore, core, socket, node))
+        return lcores
+
+    def get_dpdk_file_prefix(self, dpdk_prefix: str) -> str:
+        return dpdk_prefix
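
For reference, get_remote_cpus() consumes lscpu -p output of this shape (example values) and skips every lcore sitting on physical core 0 of socket 0 unless use_first_core is set:

    # CPU,CORE,SOCKET,NODE
    # 0,0,0,0     -> skipped (first physical core)
    # 16,0,0,0    -> skipped (sibling lcore of the first physical core)
    # 1,1,0,0     -> LogicalCore(lcore=1, core=1, socket=0, node=0)
    # 17,1,0,0    -> LogicalCore(lcore=17, core=1, socket=0, node=0)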
diff --git a/dts/framework/remote_session/os_session.py b/dts/framework/remote_session/os_session.py
index 06d1ffefdd..c30753e0b8 100644
--- a/dts/framework/remote_session/os_session.py
+++ b/dts/framework/remote_session/os_session.py
@@ -3,12 +3,13 @@ 
 # Copyright(c) 2023 University of New Hampshire
 
 from abc import ABC, abstractmethod
+from collections.abc import Iterable
 from pathlib import PurePath
 
 from framework.config import Architecture, NodeConfiguration
 from framework.logger import DTSLOG
 from framework.settings import SETTINGS
-from framework.testbed_model import MesonArgs
+from framework.testbed_model import LogicalCore, MesonArgs
 from framework.utils import EnvVarsDict
 
 from .remote import RemoteSession, create_remote_session
@@ -130,3 +131,23 @@  def get_dpdk_version(self, version_path: str | PurePath) -> str:
         """
         Inspect DPDK version on the remote node from version_path.
         """
+
+    @abstractmethod
+    def get_remote_cpus(self, use_first_core: bool) -> list[LogicalCore]:
+        """
+        Compose a list of LogicalCores present on the remote node.
+        If use_first_core is False, the first physical core won't be used.
+        """
+
+    @abstractmethod
+    def kill_cleanup_dpdk_apps(self, dpdk_prefix_list: Iterable[str]) -> None:
+        """
+        Kill and clean up all DPDK apps identified by dpdk_prefix_list. If
+        dpdk_prefix_list is empty, attempt to find running DPDK apps to kill and clean.
+        """
+
+    @abstractmethod
+    def get_dpdk_file_prefix(self, dpdk_prefix: str) -> str:
+        """
+        Get the DPDK file prefix that will be used when running DPDK apps.
+        """
diff --git a/dts/framework/remote_session/posix_session.py b/dts/framework/remote_session/posix_session.py
index 7a5c38c36e..9b05464d65 100644
--- a/dts/framework/remote_session/posix_session.py
+++ b/dts/framework/remote_session/posix_session.py
@@ -2,6 +2,8 @@ 
 # Copyright(c) 2023 PANTHEON.tech s.r.o.
 # Copyright(c) 2023 University of New Hampshire
 
+import re
+from collections.abc import Iterable
 from pathlib import PurePath, PurePosixPath
 
 from framework.config import Architecture
@@ -137,3 +139,84 @@  def get_dpdk_version(self, build_dir: str | PurePath) -> str:
             f"cat {self.join_remote_path(build_dir, 'VERSION')}", verify=True
         )
         return out.stdout
+
+    def kill_cleanup_dpdk_apps(self, dpdk_prefix_list: Iterable[str]) -> None:
+        self._logger.info("Cleaning up DPDK apps.")
+        dpdk_runtime_dirs = self._get_dpdk_runtime_dirs(dpdk_prefix_list)
+        if dpdk_runtime_dirs:
+            # kill and cleanup only if DPDK is running
+            dpdk_pids = self._get_dpdk_pids(dpdk_runtime_dirs)
+            for dpdk_pid in dpdk_pids:
+                self.remote_session.send_command(f"kill -9 {dpdk_pid}", 20)
+            self._check_dpdk_hugepages(dpdk_runtime_dirs)
+            self._remove_dpdk_runtime_dirs(dpdk_runtime_dirs)
+
+    def _get_dpdk_runtime_dirs(
+        self, dpdk_prefix_list: Iterable[str]
+    ) -> list[PurePosixPath]:
+        prefix = PurePosixPath("/var", "run", "dpdk")
+        if not dpdk_prefix_list:
+            remote_prefixes = self._list_remote_dirs(prefix)
+            if not remote_prefixes:
+                dpdk_prefix_list = []
+            else:
+                dpdk_prefix_list = remote_prefixes
+
+        return [PurePosixPath(prefix, dpdk_prefix) for dpdk_prefix in dpdk_prefix_list]
+
+    def _list_remote_dirs(self, remote_path: str | PurePath) -> list[str] | None:
+        """
+        Return a list of directories found under remote_path.
+        If remote_path doesn't exist, return None.
+        """
+        out = self.remote_session.send_command(
+            f"ls -l {remote_path} | awk '/^d/ {{print $NF}}'"
+        ).stdout
+        if "No such file or directory" in out:
+            return None
+        else:
+            return out.splitlines()
+
+    def _get_dpdk_pids(self, dpdk_runtime_dirs: Iterable[str | PurePath]) -> list[int]:
+        pids = []
+        pid_regex = r"p(\d+)"
+        for dpdk_runtime_dir in dpdk_runtime_dirs:
+            dpdk_config_file = PurePosixPath(dpdk_runtime_dir, "config")
+            if self._remote_files_exists(dpdk_config_file):
+                out = self.remote_session.send_command(
+                    f"lsof -Fp {dpdk_config_file}"
+                ).stdout
+                if out and "No such file or directory" not in out:
+                    for out_line in out.splitlines():
+                        match = re.match(pid_regex, out_line)
+                        if match:
+                            pids.append(int(match.group(1)))
+        return pids
+
+    def _remote_files_exists(self, remote_path: PurePath) -> bool:
+        result = self.remote_session.send_command(f"test -e {remote_path}")
+        return not result.return_code
+
+    def _check_dpdk_hugepages(
+        self, dpdk_runtime_dirs: Iterable[str | PurePath]
+    ) -> None:
+        for dpdk_runtime_dir in dpdk_runtime_dirs:
+            hugepage_info = PurePosixPath(dpdk_runtime_dir, "hugepage_info")
+            if self._remote_files_exists(hugepage_info):
+                out = self.remote_session.send_command(
+                    f"lsof -Fp {hugepage_info}"
+                ).stdout
+                if out and "No such file or directory" not in out:
+                    self._logger.warning("Some DPDK processes did not free hugepages.")
+                    self._logger.warning("*******************************************")
+                    self._logger.warning(out)
+                    self._logger.warning("*******************************************")
+
+    def _remove_dpdk_runtime_dirs(
+        self, dpdk_runtime_dirs: Iterable[str | PurePath]
+    ) -> None:
+        for dpdk_runtime_dir in dpdk_runtime_dirs:
+            self.remove_remote_dir(dpdk_runtime_dir)
+
+    def get_dpdk_file_prefix(self, dpdk_prefix: str) -> str:
+        return ""
diff --git a/dts/framework/testbed_model/__init__.py b/dts/framework/testbed_model/__init__.py
index 96e2ab7c3f..22c7c16708 100644
--- a/dts/framework/testbed_model/__init__.py
+++ b/dts/framework/testbed_model/__init__.py
@@ -10,5 +10,13 @@ 
 # pylama:ignore=W0611
 
 from .dpdk import MesonArgs
+from .hw import (
+    LogicalCore,
+    LogicalCoreCount,
+    LogicalCoreList,
+    LogicalCoreListFilter,
+    VirtualDevice,
+    lcore_filter,
+)
 from .node import Node
 from .sut_node import SutNode
diff --git a/dts/framework/testbed_model/dpdk.py b/dts/framework/testbed_model/dpdk.py
index 0526974f72..9b3a9e7381 100644
--- a/dts/framework/testbed_model/dpdk.py
+++ b/dts/framework/testbed_model/dpdk.py
@@ -6,6 +6,8 @@ 
 Various utilities used for configuring, building and running DPDK.
 """
 
+from .hw import LogicalCoreList, VirtualDevice
+
 
 class MesonArgs(object):
     """
@@ -31,3 +33,46 @@  def __init__(self, default_library: str | None = None, **dpdk_args: str | bool):
 
     def __str__(self) -> str:
         return " ".join(f"{self.default_library} {self.dpdk_args}".split())
+
+
+class EalParameters(object):
+    def __init__(
+        self,
+        lcore_list: LogicalCoreList,
+        memory_channels: int,
+        prefix: str,
+        no_pci: bool,
+        vdevs: list[VirtualDevice],
+        other_eal_param: str,
+    ):
+        """
+        Generate an EAL parameter string.
+        :param lcore_list: the list of logical cores to use.
+        :param memory_channels: the number of memory channels to use.
+        :param prefix: set the file prefix string, e.g.:
+                        prefix='vf'
+        :param no_pci: switch to disable the PCI bus, e.g.:
+                        no_pci=True
+        :param vdevs: virtual device list, e.g.:
+                        vdevs=['net_ring0', 'net_ring1']
+        :param other_eal_param: user-defined DPDK EAL parameters, e.g.:
+                        other_eal_param='--single-file-segments'
+        """
+        self._lcore_list = f"-l {lcore_list}"
+        self._memory_channels = f"-n {memory_channels}"
+        self._prefix = prefix
+        if prefix:
+            self._prefix = f"--file-prefix={prefix}"
+        self._no_pci = "--no-pci" if no_pci else ""
+        self._vdevs = " ".join(f"--vdev {vdev}" for vdev in vdevs)
+        self._other_eal_param = other_eal_param
+
+    def __str__(self) -> str:
+        return (
+            f"{self._lcore_list} "
+            f"{self._memory_channels} "
+            f"{self._prefix} "
+            f"{self._no_pci} "
+            f"{self._vdevs} "
+            f"{self._other_eal_param}"
+        )
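
For example (all values assumed), an EalParameters instance renders as a flat argument string:

    from framework.testbed_model import LogicalCoreList, VirtualDevice
    from framework.testbed_model.dpdk import EalParameters

    params = EalParameters(
        lcore_list=LogicalCoreList("1,2,3"),
        memory_channels=4,
        prefix="dpdk_vf",
        no_pci=True,
        vdevs=[VirtualDevice("net_ring0")],
        other_eal_param="--single-file-segments",
    )
    str(params)
    # '-l 1-3 -n 4 --file-prefix=dpdk_vf --no-pci --vdev net_ring0 --single-file-segments'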
diff --git a/dts/framework/testbed_model/hw/__init__.py b/dts/framework/testbed_model/hw/__init__.py
new file mode 100644
index 0000000000..88ccac0b0e
--- /dev/null
+++ b/dts/framework/testbed_model/hw/__init__.py
@@ -0,0 +1,27 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+# pylama:ignore=W0611
+
+from .cpu import (
+    LogicalCore,
+    LogicalCoreCount,
+    LogicalCoreCountFilter,
+    LogicalCoreFilter,
+    LogicalCoreList,
+    LogicalCoreListFilter,
+)
+from .virtual_device import VirtualDevice
+
+
+def lcore_filter(
+    core_list: list[LogicalCore],
+    filter_specifier: LogicalCoreCount | LogicalCoreList,
+    ascending: bool,
+) -> LogicalCoreFilter:
+    if isinstance(filter_specifier, LogicalCoreList):
+        return LogicalCoreListFilter(core_list, filter_specifier, ascending)
+    elif isinstance(filter_specifier, LogicalCoreCount):
+        return LogicalCoreCountFilter(core_list, filter_specifier, ascending)
+    else:
+        raise ValueError(f"Unsupported filter r{filter_specifier}")
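
lcore_filter() only dispatches on the specifier type; usage mirrors Node.filter_lcores() later in this patch (core_list assumed to come from get_remote_cpus()):

    filtered = lcore_filter(
        core_list,
        LogicalCoreCount(lcores_per_core=1, cores_per_socket=2, socket_count=1),
        ascending=True,
    ).filter()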
diff --git a/dts/framework/testbed_model/hw/cpu.py b/dts/framework/testbed_model/hw/cpu.py
new file mode 100644
index 0000000000..d1918a12dc
--- /dev/null
+++ b/dts/framework/testbed_model/hw/cpu.py
@@ -0,0 +1,274 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+import dataclasses
+from abc import ABC, abstractmethod
+from collections.abc import Iterable, ValuesView
+from dataclasses import dataclass
+
+from framework.utils import expand_range
+
+
+@dataclass(slots=True, frozen=True)
+class LogicalCore(object):
+    """
+    Representation of a CPU core. A physical core is represented in OS
+    by multiple logical cores (lcores) if CPU multithreading is enabled.
+    """
+
+    lcore: int
+    core: int
+    socket: int
+    node: int
+
+    def __int__(self) -> int:
+        return self.lcore
+
+
+class LogicalCoreList(object):
+    """
+    Convert these options into a list of logical core ids.
+    lcore_list=[LogicalCore1, LogicalCore2] - a list of LogicalCores
+    lcore_list=[0,1,2,3] - a list of int indices
+    lcore_list=['0','1','2-3'] - a list of str indices; ranges are supported
+    lcore_list='0,1,2-3' - a comma delimited str of indices; ranges are supported
+
+    The class creates a unified format used across the framework and allows
+    the user to use either a str representation (using str(instance) or directly
+    in f-strings) or a list representation (by accessing instance.lcore_list).
+    Empty lcore_list is allowed.
+    """
+
+    _lcore_list: list[int]
+    _lcore_str: str
+
+    def __init__(self, lcore_list: list[int] | list[str] | list[LogicalCore] | str):
+        self._lcore_list = []
+        if isinstance(lcore_list, str):
+            lcore_list = lcore_list.split(",")
+        for lcore in lcore_list:
+            if isinstance(lcore, str):
+                self._lcore_list.extend(expand_range(lcore))
+            else:
+                self._lcore_list.append(int(lcore))
+
+        # the input lcores may not be sorted
+        self._lcore_list.sort()
+        self._lcore_str = (
+            f'{",".join(self._get_consecutive_lcores_range(self._lcore_list))}'
+        )
+
+    @property
+    def lcore_list(self) -> list[int]:
+        return self._lcore_list
+
+    def _get_consecutive_lcores_range(self, lcore_ids_list: list[int]) -> list[str]:
+        formatted_core_list = []
+        segment = lcore_ids_list[:1]
+        for lcore_id in lcore_ids_list[1:]:
+            if lcore_id - segment[-1] == 1:
+                segment.append(lcore_id)
+            else:
+                formatted_core_list.append(
+                    f"{segment[0]}-{segment[-1]}"
+                    if len(segment) > 1
+                    else f"{segment[0]}"
+                )
+                current_core_index = lcore_ids_list.index(lcore_id)
+                formatted_core_list.extend(
+                    self._get_consecutive_lcores_range(
+                        lcore_ids_list[current_core_index:]
+                    )
+                )
+                segment.clear()
+                break
+        if len(segment) > 0:
+            formatted_core_list.append(
+                f"{segment[0]}-{segment[-1]}" if len(segment) > 1 else f"{segment[0]}"
+            )
+        return formatted_core_list
+
+    def __str__(self) -> str:
+        return self._lcore_str
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class LogicalCoreCount(object):
+    """
+    Define the number of logical cores to use.
+    If sockets is not None, socket_count is ignored.
+    """
+
+    lcores_per_core: int = 1
+    cores_per_socket: int = 2
+    socket_count: int = 1
+    sockets: list[int] | None = None
+
+
+class LogicalCoreFilter(ABC):
+    """
+    Filter according to the input filter specifier. Each filter needs to be
+    implemented in a derived class.
+    This class only implements operations common to all filters, such as sorting
+    the list to be filtered beforehand.
+    """
+
+    _filter_specifier: LogicalCoreCount | LogicalCoreList
+    _lcores_to_filter: list[LogicalCore]
+
+    def __init__(
+        self,
+        lcore_list: list[LogicalCore],
+        filter_specifier: LogicalCoreCount | LogicalCoreList,
+        ascending: bool = True,
+    ):
+        self._filter_specifier = filter_specifier
+
+        # sorting by core is needed in case hyperthreading is enabled
+        self._lcores_to_filter = sorted(
+            lcore_list, key=lambda x: x.core, reverse=not ascending
+        )
+        self.filter()
+
+    @abstractmethod
+    def filter(self) -> list[LogicalCore]:
+        """
+        Use self._filter_specifier to filter self._lcores_to_filter
+        and return the list of filtered LogicalCores.
+        self._lcores_to_filter is a sorted copy of the original list,
+        so it may be modified.
+        """
+
+
+class LogicalCoreCountFilter(LogicalCoreFilter):
+    """
+    Filter the input list of LogicalCores according to specified rules:
+    Use cores from the specified number of sockets or from the specified socket ids.
+    If sockets is specified, it takes precedence over socket_count.
+    From each of those sockets, use only cores_per_socket cores.
+    And for each core, use lcores_per_core logical cores. Hyperthreading
+    must be enabled for this to take effect.
+    If ascending is True, use cores with the lowest numerical id first
+    and continue in ascending order. If False, start with the highest
+    id and continue in descending order. This ordering affects which
+    sockets to consider first as well.
+    """
+
+    _filter_specifier: LogicalCoreCount
+
+    def filter(self) -> list[LogicalCore]:
+        sockets_to_filter = self._filter_sockets(self._lcores_to_filter)
+        filtered_lcores = []
+        for socket_to_filter in sockets_to_filter:
+            filtered_lcores.extend(self._filter_cores_from_socket(socket_to_filter))
+        return filtered_lcores
+
+    def _filter_sockets(
+        self, lcores_to_filter: Iterable[LogicalCore]
+    ) -> ValuesView[list[LogicalCore]]:
+        """
+        Remove all lcores that don't match the specified socket(s).
+        If self._filter_specifier.sockets is not None, keep lcores from those sockets,
+        otherwise keep lcores from the first
+        self._filter_specifier.socket_count sockets.
+        """
+        allowed_sockets: set[int] = set()
+        socket_count = self._filter_specifier.socket_count
+        if self._filter_specifier.sockets:
+            socket_count = len(self._filter_specifier.sockets)
+            allowed_sockets = set(self._filter_specifier.sockets)
+
+        filtered_lcores: dict[int, list[LogicalCore]] = {}
+        for lcore in lcores_to_filter:
+            if not self._filter_specifier.sockets:
+                if len(allowed_sockets) < socket_count:
+                    allowed_sockets.add(lcore.socket)
+            if lcore.socket in allowed_sockets:
+                if lcore.socket in filtered_lcores:
+                    filtered_lcores[lcore.socket].append(lcore)
+                else:
+                    filtered_lcores[lcore.socket] = [lcore]
+
+        if len(allowed_sockets) < socket_count:
+            raise ValueError(
+                f"The actual number of sockets from which to use cores "
+                f"({len(allowed_sockets)}) is lower than required ({socket_count})."
+            )
+
+        return filtered_lcores.values()
+
+    def _filter_cores_from_socket(
+        self, lcores_to_filter: Iterable[LogicalCore]
+    ) -> list[LogicalCore]:
+        """
+        Keep only the first self._filter_specifier.cores_per_socket cores.
+        In multithreaded environments, keep only
+        the first self._filter_specifier.lcores_per_core lcores of those cores.
+        """
+
+        # no need to use an ordered dict; since Python 3.7 regular dicts
+        # preserve insertion order.
+        lcore_count_per_core_map: dict[int, int] = {}
+        filtered_lcores = []
+        for lcore in lcores_to_filter:
+            if lcore.core in lcore_count_per_core_map:
+                current_core_lcore_count = lcore_count_per_core_map[lcore.core]
+                if self._filter_specifier.lcores_per_core > current_core_lcore_count:
+                    # only add lcores of the given core
+                    lcore_count_per_core_map[lcore.core] += 1
+                    filtered_lcores.append(lcore)
+                else:
+                    # we have enough lcores per this core
+                    continue
+            elif self._filter_specifier.cores_per_socket > len(
+                lcore_count_per_core_map
+            ):
+                # only add cores if we need more
+                lcore_count_per_core_map[lcore.core] = 1
+                filtered_lcores.append(lcore)
+            else:
+                # we have enough cores
+                break
+
+        cores_per_socket = len(lcore_count_per_core_map)
+        if cores_per_socket < self._filter_specifier.cores_per_socket:
+            raise ValueError(
+                f"The actual number of cores per socket ({cores_per_socket}) "
+                f"is lower than required ({self._filter_specifier.cores_per_socket})."
+            )
+
+        lcores_per_core = lcore_count_per_core_map[filtered_lcores[-1].core]
+        if lcores_per_core < self._filter_specifier.lcores_per_core:
+            raise ValueError(
+                f"The actual number of logical cores per core ({lcores_per_core}) "
+                f"is lower than required ({self._filter_specifier.lcores_per_core})."
+            )
+
+        return filtered_lcores
+
+
+class LogicalCoreListFilter(LogicalCoreFilter):
+    """
+    Filter the input list of Logical Cores according to the input list of
+    lcore indices.
+    An empty LogicalCoreList won't filter anything.
+    """
+
+    _filter_specifier: LogicalCoreList
+
+    def filter(self) -> list[LogicalCore]:
+        if not len(self._filter_specifier.lcore_list):
+            return self._lcores_to_filter
+
+        filtered_lcores = []
+        for core in self._lcores_to_filter:
+            if core.lcore in self._filter_specifier.lcore_list:
+                filtered_lcores.append(core)
+
+        if len(filtered_lcores) != len(self._filter_specifier.lcore_list):
+            raise ValueError(
+                f"Not all logical cores from {self._filter_specifier.lcore_list} "
+                f"were found among {self._lcores_to_filter}"
+            )
+
+        return filtered_lcores
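
A short illustration of the two specifier types (values assumed):

    lcores = LogicalCoreList([0, 1, 2, 3, 18, 19, 20, 21, 22])
    str(lcores)        # '0-3,18-22'
    lcores.lcore_list  # [0, 1, 2, 3, 18, 19, 20, 21, 22]

    # LogicalCoreCount(lcores_per_core=1, cores_per_socket=2, socket_count=1)
    # keeps one lcore from each of the first two physical cores on one socket.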
diff --git a/dts/framework/testbed_model/hw/virtual_device.py b/dts/framework/testbed_model/hw/virtual_device.py
new file mode 100644
index 0000000000..eb664d9f17
--- /dev/null
+++ b/dts/framework/testbed_model/hw/virtual_device.py
@@ -0,0 +1,16 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+
+class VirtualDevice(object):
+    """
+    Base class for virtual devices used by DPDK.
+    """
+
+    name: str
+
+    def __init__(self, name: str):
+        self.name = name
+
+    def __str__(self) -> str:
+        return self.name
diff --git a/dts/framework/testbed_model/node.py b/dts/framework/testbed_model/node.py
index a37f7921e0..b93b9d238e 100644
--- a/dts/framework/testbed_model/node.py
+++ b/dts/framework/testbed_model/node.py
@@ -15,6 +15,14 @@ 
 from framework.logger import DTSLOG, getLogger
 from framework.remote_session import OSSession, create_session
 
+from .hw import (
+    LogicalCore,
+    LogicalCoreCount,
+    LogicalCoreList,
+    LogicalCoreListFilter,
+    lcore_filter,
+)
+
 
 class Node(object):
     """
@@ -26,6 +34,7 @@  class Node(object):
     main_session: OSSession
     config: NodeConfiguration
     name: str
+    lcores: list[LogicalCore]
     _logger: DTSLOG
     _other_sessions: list[OSSession]
 
@@ -35,6 +44,12 @@  def __init__(self, node_config: NodeConfiguration):
         self._logger = getLogger(self.name)
         self.main_session = create_session(self.config, self.name, self._logger)
 
+        self._get_remote_cpus()
+        # filter the node lcores according to user config
+        self.lcores = LogicalCoreListFilter(
+            self.lcores, LogicalCoreList(self.config.lcores)
+        ).filter()
+
         self._other_sessions = []
 
         self._logger.info(f"Created node: {self.name}")
@@ -107,6 +122,34 @@  def create_session(self, name: str) -> OSSession:
         self._other_sessions.append(connection)
         return connection
 
+    def filter_lcores(
+        self,
+        filter_specifier: LogicalCoreCount | LogicalCoreList,
+        ascending: bool = True,
+    ) -> list[LogicalCore]:
+        """
+        Filter the LogicalCores found on the Node according to
+        a LogicalCoreCount or a LogicalCoreList.
+
+        If ascending is True, use cores with the lowest numerical id first
+        and continue in ascending order. If False, start with the highest
+        id and continue in descending order. This ordering affects which
+        sockets to consider first as well.
+        """
+        self._logger.debug(f"Filtering {filter_specifier} from {self.lcores}.")
+        return lcore_filter(
+            self.lcores,
+            filter_specifier,
+            ascending,
+        ).filter()
+
+    def _get_remote_cpus(self) -> None:
+        """
+        Scan CPUs in the remote OS and store a list of LogicalCores.
+        """
+        self._logger.info("Getting CPU information.")
+        self.lcores = self.main_session.get_remote_cpus(self.config.use_first_core)
+
     def close(self) -> None:
         """
         Close all connections and free other resources.
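
The constructor filtering ties the lcores string from conf.yaml to the cores discovered on the node, e.g. (assumed values):

    # lcores: "2-5"  -> node.lcores keeps only lcores 2, 3, 4 and 5
    # lcores: ""     -> the LogicalCoreList is empty and nothing is filtered out
    # requesting an lcore the node does not have raises a ValueError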
diff --git a/dts/framework/testbed_model/sut_node.py b/dts/framework/testbed_model/sut_node.py
index 442a41bdc8..a9ae2e4a6f 100644
--- a/dts/framework/testbed_model/sut_node.py
+++ b/dts/framework/testbed_model/sut_node.py
@@ -4,13 +4,16 @@ 
 
 import os
 import tarfile
+import time
 from pathlib import PurePath
 
 from framework.config import BuildTargetConfiguration, NodeConfiguration
+from framework.remote_session import OSSession
 from framework.settings import SETTINGS
 from framework.utils import EnvVarsDict, skip_setup
 
-from .dpdk import MesonArgs
+from .dpdk import EalParameters, MesonArgs
+from .hw import LogicalCoreCount, LogicalCoreList, VirtualDevice
 from .node import Node
 
 
@@ -22,21 +25,29 @@  class SutNode(Node):
     Another key capability is building DPDK according to given build target.
     """
 
+    _dpdk_prefix_list: list[str]
+    _dpdk_timestamp: str
     _build_target_config: BuildTargetConfiguration | None
     _env_vars: EnvVarsDict
     _remote_tmp_dir: PurePath
     __remote_dpdk_dir: PurePath | None
     _dpdk_version: str | None
     _app_compile_timeout: float
+    _dpdk_kill_session: OSSession | None
 
     def __init__(self, node_config: NodeConfiguration):
         super(SutNode, self).__init__(node_config)
+        self._dpdk_prefix_list = []
         self._build_target_config = None
         self._env_vars = EnvVarsDict()
         self._remote_tmp_dir = self.main_session.get_remote_tmp_dir()
         self.__remote_dpdk_dir = None
         self._dpdk_version = None
         self._app_compile_timeout = 90
+        self._dpdk_kill_session = None
+        self._dpdk_timestamp = (
+            f"{str(os.getpid())}_{time.strftime('%Y%m%d%H%M%S', time.localtime())}"
+        )
 
     @property
     def _remote_dpdk_dir(self) -> PurePath:
@@ -169,3 +180,71 @@  def build_dpdk_app(self, app_name: str, **meson_dpdk_args: str | bool) -> PurePa
         return self.main_session.join_remote_path(
             self.remote_dpdk_build_dir, "examples", f"dpdk-{app_name}"
         )
+
+    def kill_cleanup_dpdk_apps(self) -> None:
+        """
+        Kill all DPDK applications on the SUT and clean up leftover hugepages.
+        """
+        if self._dpdk_kill_session and self._dpdk_kill_session.is_alive():
+            # we can use the session if it exists and responds
+            self._dpdk_kill_session.kill_cleanup_dpdk_apps(self._dpdk_prefix_list)
+        else:
+            # otherwise, we need to (re)create it
+            self._dpdk_kill_session = self.create_session("dpdk_kill")
+        self._dpdk_prefix_list = []
+
+    def create_eal_parameters(
+        self,
+        lcore_filter_specifier: LogicalCoreCount | LogicalCoreList = LogicalCoreCount(),
+        ascending_cores: bool = True,
+        prefix: str = "dpdk",
+        append_prefix_timestamp: bool = True,
+        no_pci: bool = False,
+        vdevs: list[VirtualDevice] | None = None,
+        other_eal_param: str = "",
+    ) -> EalParameters:
+        """
+        Generate EAL parameters for running DPDK apps.
+        :param lcore_filter_specifier: a number of lcores/cores/sockets to use
+                        or a list of lcore ids to use.
+                        The default will select one lcore for each of two cores
+                        on one socket, in ascending order of core ids.
+        :param ascending_cores: True, use cores with the lowest numerical id first
+                        and continue in ascending order. If False, start with the
+                        highest id and continue in descending order. This ordering
+                        affects which sockets to consider first as well.
+        :param prefix: set the file prefix string, e.g.:
+                        prefix='vf'
+        :param append_prefix_timestamp: if True, append a timestamp to the
+                        DPDK file prefix.
+        :param no_pci: switch to disable the PCI bus, e.g.:
+                        no_pci=True
+        :param vdevs: virtual device list, e.g.:
+                        vdevs=['net_ring0', 'net_ring1']
+        :param other_eal_param: user-defined DPDK EAL parameters, e.g.:
+                        other_eal_param='--single-file-segments'
+        :return: an EalParameters object which renders as an EAL string, e.g.:
+                '-l 0-3 -n 4 --file-prefix=dpdk_1112_20190809143420'
+        """
+
+        lcore_list = LogicalCoreList(
+            self.filter_lcores(lcore_filter_specifier, ascending_cores)
+        )
+
+        if append_prefix_timestamp:
+            prefix = f"{prefix}_{self._dpdk_timestamp}"
+        prefix = self.main_session.get_dpdk_file_prefix(prefix)
+        if prefix:
+            self._dpdk_prefix_list.append(prefix)
+
+        if vdevs is None:
+            vdevs = []
+
+        return EalParameters(
+            lcore_list=lcore_list,
+            memory_channels=self.config.memory_channels,
+            prefix=prefix,
+            no_pci=no_pci,
+            vdevs=vdevs,
+            other_eal_param=other_eal_param,
+        )
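
To tie create_eal_parameters() to the cleanup path, a sketch assuming a Linux SUT (where get_dpdk_file_prefix() keeps the prefix as-is):

    eal = sut_node.create_eal_parameters(prefix="pmd")
    # the prefix becomes e.g. "pmd_1234_20230223152840", is rendered as
    # --file-prefix=pmd_1234_20230223152840 and is recorded in
    # sut_node._dpdk_prefix_list, which kill_cleanup_dpdk_apps() hands to the
    # dedicated "dpdk_kill" session to kill the apps and remove
    # /var/run/dpdk/pmd_1234_20230223152840.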
diff --git a/dts/framework/utils.py b/dts/framework/utils.py
index 611071604b..eebe76f16c 100644
--- a/dts/framework/utils.py
+++ b/dts/framework/utils.py
@@ -32,6 +32,26 @@  def skip_setup(func) -> Callable[..., None]:
         return func
 
 
+def expand_range(range_str: str) -> list[int]:
+    """
+    Process range string into a list of integers. There are two possible formats:
+    n - a single integer
+    n-m - a range of integers
+
+    The returned range includes both n and m. Empty string returns an empty list.
+    """
+    expanded_range: list[int] = []
+    if range_str:
+        range_boundaries = range_str.split("-")
+        # will throw an exception when items in range_boundaries can't be converted,
+        # serving as type check
+        expanded_range.extend(
+            range(int(range_boundaries[0]), int(range_boundaries[-1]) + 1)
+        )
+
+    return expanded_range
+
+
 def GREEN(text: str) -> str:
     return f"\u001B[32;1m{str(text)}\u001B[0m"