[RFC,v2,04/10] dts: add dpdk execution handling
Commit Message
Add methods for setting up and shutting down DPDK apps and for
constructing EAL parameters.
Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
---
dts/conf.yaml | 4 +
dts/framework/config/__init__.py | 85 ++++++++-
dts/framework/config/conf_yaml_schema.json | 22 +++
.../remote_session/os/linux_session.py | 15 ++
dts/framework/remote_session/os/os_session.py | 16 +-
.../remote_session/os/posix_session.py | 80 ++++++++
dts/framework/testbed_model/hw/__init__.py | 17 ++
dts/framework/testbed_model/hw/cpu.py | 164 ++++++++++++++++
dts/framework/testbed_model/node/node.py | 36 ++++
dts/framework/testbed_model/node/sut_node.py | 178 +++++++++++++++++-
dts/framework/utils.py | 20 ++
11 files changed, 634 insertions(+), 3 deletions(-)
create mode 100644 dts/framework/testbed_model/hw/__init__.py
create mode 100644 dts/framework/testbed_model/hw/cpu.py
Comments
On Mon, Nov 14, 2022 at 11:54 AM Juraj Linkeš <juraj.linkes@pantheon.tech>
wrote:
> Add methods for setting up and shutting down DPDK apps and for
> constructing EAL parameters.
>
> Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
> ---
> dts/conf.yaml | 4 +
> dts/framework/config/__init__.py | 85 ++++++++-
> dts/framework/config/conf_yaml_schema.json | 22 +++
> .../remote_session/os/linux_session.py | 15 ++
> dts/framework/remote_session/os/os_session.py | 16 +-
> .../remote_session/os/posix_session.py | 80 ++++++++
> dts/framework/testbed_model/hw/__init__.py | 17 ++
> dts/framework/testbed_model/hw/cpu.py | 164 ++++++++++++++++
> dts/framework/testbed_model/node/node.py | 36 ++++
> dts/framework/testbed_model/node/sut_node.py | 178 +++++++++++++++++-
> dts/framework/utils.py | 20 ++
> 11 files changed, 634 insertions(+), 3 deletions(-)
> create mode 100644 dts/framework/testbed_model/hw/__init__.py
> create mode 100644 dts/framework/testbed_model/hw/cpu.py
>
> diff --git a/dts/conf.yaml b/dts/conf.yaml
> index 6b0bc5c2bf..976888a88e 100644
> --- a/dts/conf.yaml
> +++ b/dts/conf.yaml
> @@ -12,4 +12,8 @@ nodes:
> - name: "SUT 1"
> hostname: sut1.change.me.localhost
> user: root
> + arch: x86_64
> os: linux
> + bypass_core0: true
> + cpus: ""
> + memory_channels: 4
> diff --git a/dts/framework/config/__init__.py
> b/dts/framework/config/__init__.py
> index 1b97dc3ab9..344d697a69 100644
> --- a/dts/framework/config/__init__.py
> +++ b/dts/framework/config/__init__.py
> @@ -11,12 +11,13 @@
> import pathlib
> from dataclasses import dataclass
> from enum import Enum, auto, unique
> -from typing import Any
> +from typing import Any, Iterable
>
> import warlock # type: ignore
> import yaml
>
> from framework.settings import SETTINGS
> +from framework.utils import expand_range
>
>
> class StrEnum(Enum):
> @@ -60,6 +61,80 @@ class Compiler(StrEnum):
> msvc = auto()
>
>
> +@dataclass(slots=True, frozen=True)
> +class CPU:
> + cpu: int
> + core: int
> + socket: int
> + node: int
> +
> + def __str__(self) -> str:
> + return str(self.cpu)
> +
> +
> +class CPUList(object):
> + """
> + Convert these options into a list of int cpus
> + cpu_list=[CPU1, CPU2] - a list of CPUs
> + cpu_list=[0,1,2,3] - a list of int indices
> + cpu_list=['0','1','2-3'] - a list of str indices; ranges are supported
> +    cpu_list='0,1,2-3' - a comma delimited str of indices; ranges are supported
> +
> +    The class creates a unified format used across the framework and allows
> +    the user to use either a str representation (using str(instance) or directly
> +    in f-strings) or a list representation (by accessing instance.cpu_list).
> + Empty cpu_list is allowed.
> + """
> +
> + _cpu_list: list[int]
> +
> + def __init__(self, cpu_list: list[int | str | CPU] | str):
> + self._cpu_list = []
> + if isinstance(cpu_list, str):
> + self._from_str(cpu_list.split(","))
> + else:
> + self._from_str((str(cpu) for cpu in cpu_list))
> +
> + # the input cpus may not be sorted
> + self._cpu_list.sort()
> +
> + @property
> + def cpu_list(self) -> list[int]:
> + return self._cpu_list
> +
> + def _from_str(self, cpu_list: Iterable[str]) -> None:
> + for cpu in cpu_list:
> + self._cpu_list.extend(expand_range(cpu))
> +
> +    def _get_consecutive_cpus_range(self, cpu_list: list[int]) -> list[str]:
> + formatted_core_list = []
> + tmp_cpus_list = list(sorted(cpu_list))
> + segment = tmp_cpus_list[:1]
> + for core_id in tmp_cpus_list[1:]:
> + if core_id - segment[-1] == 1:
> + segment.append(core_id)
> + else:
> + formatted_core_list.append(
> + f"{segment[0]}-{segment[-1]}"
> + if len(segment) > 1
> + else f"{segment[0]}"
> + )
> + current_core_index = tmp_cpus_list.index(core_id)
> + formatted_core_list.extend(
> +                    self._get_consecutive_cpus_range(tmp_cpus_list[current_core_index:])
> + )
> + segment.clear()
> + break
> + if len(segment) > 0:
> + formatted_core_list.append(
> +                f"{segment[0]}-{segment[-1]}" if len(segment) > 1 else f"{segment[0]}"
> + )
> + return formatted_core_list
> +
> + def __str__(self) -> str:
> +        return f'{",".join(self._get_consecutive_cpus_range(self._cpu_list))}'
> +
> +
>  # Slots enables some optimizations, by pre-allocating space for the defined
>  # attributes in the underlying data structure.
> #
> @@ -71,7 +146,11 @@ class NodeConfiguration:
> hostname: str
> user: str
> password: str | None
> + arch: Architecture
> os: OS
> + bypass_core0: bool
> + cpus: CPUList
> + memory_channels: int
>
> @staticmethod
> def from_dict(d: dict) -> "NodeConfiguration":
> @@ -80,7 +159,11 @@ def from_dict(d: dict) -> "NodeConfiguration":
> hostname=d["hostname"],
> user=d["user"],
> password=d.get("password"),
> + arch=Architecture(d["arch"]),
> os=OS(d["os"]),
> + bypass_core0=d.get("bypass_core0", False),
> + cpus=CPUList(d.get("cpus", "1")),
> + memory_channels=d.get("memory_channels", 1),
> )
>
>
> diff --git a/dts/framework/config/conf_yaml_schema.json
> b/dts/framework/config/conf_yaml_schema.json
> index 409ce7ac74..c59d3e30e6 100644
> --- a/dts/framework/config/conf_yaml_schema.json
> +++ b/dts/framework/config/conf_yaml_schema.json
> @@ -6,6 +6,12 @@
> "type": "string",
> "description": "A unique identifier for a node"
> },
> + "ARCH": {
> + "type": "string",
> + "enum": [
> + "x86_64"
>
arm64 and ppc64le should probably be included here. I think that we can
focus on 64 bit arches for now.
> + ]
> + },
> "OS": {
> "type": "string",
> "enum": [
> @@ -82,8 +88,23 @@
> "type": "string",
> "description": "The password to use on this node. Use only as
> a last resort. SSH keys are STRONGLY preferred."
> },
> + "arch": {
> + "$ref": "#/definitions/ARCH"
> + },
> "os": {
> "$ref": "#/definitions/OS"
> + },
> + "bypass_core0": {
> + "type": "boolean",
> + "description": "Indicate that DPDK should omit using the
> first core."
> + },
> + "cpus": {
> + "type": "string",
> + "description": "Optional comma-separated list of cpus to use,
> e.g.: 1,2,3,4,5,18-22. Defaults to 1. An empty string means use all cpus."
> + },
> + "memory_channels": {
> + "type": "integer",
> + "description": "How many memory channels to use. Optional,
> defaults to 1."
> }
> },
> "additionalProperties": false,
> @@ -91,6 +112,7 @@
> "name",
> "hostname",
> "user",
> + "arch",
> "os"
> ]
> },
> diff --git a/dts/framework/remote_session/os/linux_session.py
> b/dts/framework/remote_session/os/linux_session.py
> index 39e80631dd..21f117b714 100644
> --- a/dts/framework/remote_session/os/linux_session.py
> +++ b/dts/framework/remote_session/os/linux_session.py
> @@ -2,6 +2,8 @@
> # Copyright(c) 2022 PANTHEON.tech s.r.o.
> # Copyright(c) 2022 University of New Hampshire
>
> +from framework.config import CPU
> +
> from .posix_session import PosixSession
>
>
> @@ -9,3 +11,16 @@ class LinuxSession(PosixSession):
> """
> The implementation of non-Posix compliant parts of Linux remote
> sessions.
> """
> +
> + def get_remote_cpus(self, bypass_core0: bool) -> list[CPU]:
> + cpu_info = self.remote_session.send_command(
> + "lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \\#"
> + ).stdout
> + cpus = []
> + for cpu_line in cpu_info.splitlines():
> + cpu, core, socket, node = cpu_line.split(",")
> + if bypass_core0 and core == 0 and socket == 0:
> + self.logger.info("Core0 bypassed.")
> + continue
> + cpus.append(CPU(int(cpu), int(core), int(socket), int(node)))
> + return cpus
> diff --git a/dts/framework/remote_session/os/os_session.py
> b/dts/framework/remote_session/os/os_session.py
> index 57e2865282..6f6b6a979e 100644
> --- a/dts/framework/remote_session/os/os_session.py
> +++ b/dts/framework/remote_session/os/os_session.py
> @@ -3,9 +3,10 @@
> # Copyright(c) 2022 University of New Hampshire
>
> from abc import ABC, abstractmethod
> +from collections.abc import Iterable
> from pathlib import PurePath
>
> -from framework.config import Architecture, NodeConfiguration
> +from framework.config import CPU, Architecture, NodeConfiguration
> from framework.logger import DTSLOG
> from framework.remote_session.factory import create_remote_session
> from framework.remote_session.remote_session import RemoteSession
> @@ -130,3 +131,16 @@ def get_dpdk_version(self, version_path: str |
> PurePath) -> str:
> """
> Inspect DPDK version on the remote node from version_path.
> """
> +
> + @abstractmethod
> + def get_remote_cpus(self, bypass_core0: bool) -> list[CPU]:
> + """
> + Compose a list of CPUs present on the remote node.
> + """
> +
> + @abstractmethod
> + def kill_cleanup_dpdk_apps(self, dpdk_prefix_list: Iterable[str]) ->
> None:
> + """
> + Kill and cleanup all DPDK apps identified by dpdk_prefix_list. If
> + dpdk_prefix_list is empty, attempt to find running DPDK apps to
> kill and clean.
> + """
> diff --git a/dts/framework/remote_session/os/posix_session.py
> b/dts/framework/remote_session/os/posix_session.py
> index a36b8e8c1a..7151263c7a 100644
> --- a/dts/framework/remote_session/os/posix_session.py
> +++ b/dts/framework/remote_session/os/posix_session.py
> @@ -2,6 +2,8 @@
> # Copyright(c) 2022 PANTHEON.tech s.r.o.
> # Copyright(c) 2022 University of New Hampshire
>
> +import re
> +from collections.abc import Iterable
> from pathlib import PurePath, PurePosixPath
>
> from framework.config import Architecture
> @@ -138,3 +140,81 @@ def get_dpdk_version(self, build_dir: str | PurePath)
> -> str:
> f"cat {self.join_remote_path(build_dir, 'VERSION')}",
> verify=True
> )
> return out.stdout
> +
> + def kill_cleanup_dpdk_apps(self, dpdk_prefix_list: Iterable[str]) ->
> None:
> + self.logger.info("Cleaning up DPDK apps.")
> + dpdk_runtime_dirs = self._get_dpdk_runtime_dirs(dpdk_prefix_list)
> + if dpdk_runtime_dirs:
> + # kill and cleanup only if DPDK is running
> + dpdk_pids = self._get_dpdk_pids(dpdk_runtime_dirs)
> + for dpdk_pid in dpdk_pids:
> + self.remote_session.send_command(f"kill -9 {dpdk_pid}",
> 20)
> + self._check_dpdk_hugepages(dpdk_runtime_dirs)
> + self._remove_dpdk_runtime_dirs(dpdk_runtime_dirs)
> +
> + def _get_dpdk_runtime_dirs(
> + self, dpdk_prefix_list: Iterable[str]
> + ) -> list[PurePosixPath]:
> + prefix = PurePosixPath("/var", "run", "dpdk")
> + if not dpdk_prefix_list:
> + remote_prefixes = self._list_remote_dirs(prefix)
> + if not remote_prefixes:
> + dpdk_prefix_list = []
> + else:
> + dpdk_prefix_list = remote_prefixes
> +
> + return [PurePosixPath(prefix, dpdk_prefix) for dpdk_prefix in
> dpdk_prefix_list]
> +
> + def _list_remote_dirs(self, remote_path: str | PurePath) -> list[str]
> | None:
> + """
> + Return a list of directories of the remote_dir.
> + If remote_path doesn't exist, return None.
> + """
> + out = self.remote_session.send_command(
> + f"ls -l {remote_path} | awk '/^d/ {{print $NF}}'"
> + ).stdout
> + if "No such file or directory" in out:
> + return None
> + else:
> + return out.splitlines()
> +
> + def _get_dpdk_pids(self, dpdk_runtime_dirs: Iterable[str | PurePath])
> -> list[int]:
> + pids = []
> + pid_regex = r"p(\d+)"
> + for dpdk_runtime_dir in dpdk_runtime_dirs:
> + dpdk_config_file = PurePosixPath(dpdk_runtime_dir, "config")
> + if self._remote_files_exists(dpdk_config_file):
> + out = self.remote_session.send_command(
> + f"lsof -Fp {dpdk_config_file}"
> + ).stdout
> + if out and "No such file or directory" not in out:
> + for out_line in out.splitlines():
> + match = re.match(pid_regex, out_line)
> + if match:
> + pids.append(int(match.group(1)))
> + return pids
> +
> + def _remote_files_exists(self, remote_path: PurePath) -> bool:
> + result = self.remote_session.send_command(f"test -e
> {remote_path}")
> + return not result.return_code
> +
> + def _check_dpdk_hugepages(
> + self, dpdk_runtime_dirs: Iterable[str | PurePath]
> + ) -> None:
> + for dpdk_runtime_dir in dpdk_runtime_dirs:
> + hugepage_info = PurePosixPath(dpdk_runtime_dir,
> "hugepage_info")
> + if self._remote_files_exists(hugepage_info):
> + out = self.remote_session.send_command(
> + f"lsof -Fp {hugepage_info}"
> + ).stdout
> + if out and "No such file or directory" not in out:
> +                    self.logger.warning("Some DPDK processes did not free hugepages.")
> +                    self.logger.warning("*******************************************")
> +                    self.logger.warning(out)
> +                    self.logger.warning("*******************************************")
> +
> + def _remove_dpdk_runtime_dirs(
> + self, dpdk_runtime_dirs: Iterable[str | PurePath]
> + ) -> None:
> + for dpdk_runtime_dir in dpdk_runtime_dirs:
> + self.remove_remote_dir(dpdk_runtime_dir)
> diff --git a/dts/framework/testbed_model/hw/__init__.py
> b/dts/framework/testbed_model/hw/__init__.py
> new file mode 100644
> index 0000000000..7d79a7efd0
> --- /dev/null
> +++ b/dts/framework/testbed_model/hw/__init__.py
> @@ -0,0 +1,17 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2022 PANTHEON.tech s.r.o.
> +
> +from framework.config import CPU, CPUList
> +
> +from .cpu import CPUAmount, CPUAmountFilter, CPUFilter, CPUListFilter
> +
> +
> +def cpu_filter(
> +    core_list: list[CPU], filter_specifier: CPUAmount | CPUList, ascending: bool
> +) -> CPUFilter:
> + if isinstance(filter_specifier, CPUList):
> + return CPUListFilter(core_list, filter_specifier, ascending)
> + elif isinstance(filter_specifier, CPUAmount):
> + return CPUAmountFilter(core_list, filter_specifier, ascending)
> + else:
> + raise ValueError(f"Unsupported filter r{filter_specifier}")
> diff --git a/dts/framework/testbed_model/hw/cpu.py
> b/dts/framework/testbed_model/hw/cpu.py
> new file mode 100644
> index 0000000000..87e87bcb4e
> --- /dev/null
> +++ b/dts/framework/testbed_model/hw/cpu.py
> @@ -0,0 +1,164 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2022 PANTHEON.tech s.r.o.
> +
> +import dataclasses
> +from abc import ABC, abstractmethod
> +from collections.abc import Iterable
> +
> +from framework.config import CPU, CPUList
> +
> +
> +@dataclasses.dataclass(slots=True, frozen=True)
> +class CPUAmount:
> + """
> + Define the amounts of cpus to use. If sockets is not None,
> socket_amount
> + is ignored.
> + """
> +
> + cpus_per_core: int = 1
> + cores_per_socket: int = 2
> + socket_amount: int = 1
> + sockets: list[int] | None = None
> +
> +
> +class CPUFilter(ABC):
> + """
> + Filter according to the input filter specifier. Each filter needs to
> be
> + implemented in a derived class.
> + This class only implements operations common to all filters, such as
> sorting
> + the list to be filtered beforehand.
> + """
> +
> + _filter_specifier: CPUAmount | CPUList
> + _cpus_to_filter: list[CPU]
> +
> + def __init__(
> + self,
> + core_list: list[CPU],
> + filter_specifier: CPUAmount | CPUList,
> + ascending: bool = True,
> + ) -> None:
> + self._filter_specifier = filter_specifier
> +
> + # sorting by core is needed in case hyperthreading is enabled
> + self._cpus_to_filter = sorted(
> + core_list, key=lambda x: x.core, reverse=not ascending
> + )
> + self.filter()
> +
> + @abstractmethod
> + def filter(self) -> list[CPU]:
> + """
> +        Use the input self._filter_specifier to filter self._cpus_to_filter
> +        and return the list of filtered CPUs. self._cpus_to_filter is a
> +        sorted copy of the original list, so it may be modified.
> + """
> +
> +
> +class CPUAmountFilter(CPUFilter):
> + """
> + Filter the input list of CPUs according to specified rules:
> +    Use cores from the specified amount of sockets or from the specified socket ids.
> +    If sockets is specified, it takes precedence over socket_amount.
> +    From each of those sockets, use only cores_per_socket of cores.
> +    And for each core, use cpus_per_core of cpus. Hyperthreading
> +    must be enabled for this to take effect.
> + If ascending is True, use cores with the lowest numerical id first
> + and continue in ascending order. If False, start with the highest
> + id and continue in descending order. This ordering affects which
> + sockets to consider first as well.
> + """
> +
> + _filter_specifier: CPUAmount
> +
> + def filter(self) -> list[CPU]:
> +        return self._filter_cpus(self._filter_sockets(self._cpus_to_filter))
> +
> + def _filter_sockets(self, cpus_to_filter: Iterable[CPU]) -> list[CPU]:
> + allowed_sockets: set[int] = set()
> + socket_amount = self._filter_specifier.socket_amount
> + if self._filter_specifier.sockets:
> + socket_amount = len(self._filter_specifier.sockets)
> + allowed_sockets = set(self._filter_specifier.sockets)
> +
> + filtered_cpus = []
> + for cpu in cpus_to_filter:
> + if not self._filter_specifier.sockets:
> + if len(allowed_sockets) < socket_amount:
> + allowed_sockets.add(cpu.socket)
> + if cpu.socket in allowed_sockets:
> + filtered_cpus.append(cpu)
> +
> + if len(allowed_sockets) < socket_amount:
> + raise ValueError(
> + f"The amount of sockets from which to use cores "
> + f"({socket_amount}) exceeds the actual amount present "
> + f"on the node ({len(allowed_sockets)})"
> + )
> +
> + return filtered_cpus
> +
> + def _filter_cpus(self, cpus_to_filter: Iterable[CPU]) -> list[CPU]:
> + # no need to use ordered dict, from Python3.7 the dict
> + # insertion order is preserved (LIFO).
> + allowed_cpu_per_core_count_map: dict[int, int] = {}
> + filtered_cpus = []
> + for cpu in cpus_to_filter:
> + if cpu.core in allowed_cpu_per_core_count_map:
> + cpu_count = allowed_cpu_per_core_count_map[cpu.core]
> + if self._filter_specifier.cpus_per_core > cpu_count:
> + # only add cpus of the given core
> + allowed_cpu_per_core_count_map[cpu.core] += 1
> + filtered_cpus.append(cpu)
> + else:
> + raise ValueError(
> + f"The amount of CPUs per core to use "
> + f"({self._filter_specifier.cpus_per_core}) "
> +                        f"exceeds the actual amount present. Is hyperthreading enabled?"
> + )
> + elif self._filter_specifier.cores_per_socket > len(
> + allowed_cpu_per_core_count_map
> + ):
> + # only add cpus if we need more
> + allowed_cpu_per_core_count_map[cpu.core] = 1
> + filtered_cpus.append(cpu)
> + else:
> + # cpus are sorted by core, at this point we won't
> encounter new cores
> + break
> +
> + cores_per_socket = len(allowed_cpu_per_core_count_map)
> + if cores_per_socket < self._filter_specifier.cores_per_socket:
> + raise ValueError(
> + f"The amount of cores per socket to use "
> + f"({self._filter_specifier.cores_per_socket}) "
> + f"exceeds the actual amount present ({cores_per_socket})"
> + )
> +
> + return filtered_cpus
> +
> +
> +class CPUListFilter(CPUFilter):
> + """
> + Filter the input list of CPUs according to the input list of
> + core indices.
> + An empty CPUList won't filter anything.
> + """
> +
> + _filter_specifier: CPUList
> +
> + def filter(self) -> list[CPU]:
> + if not len(self._filter_specifier.cpu_list):
> + return self._cpus_to_filter
> +
> + filtered_cpus = []
> + for core in self._cpus_to_filter:
> + if core.cpu in self._filter_specifier.cpu_list:
> + filtered_cpus.append(core)
> +
> + if len(filtered_cpus) != len(self._filter_specifier.cpu_list):
> + raise ValueError(
> +                f"Not all cpus from {self._filter_specifier.cpu_list} were found"
> + f"among {self._cpus_to_filter}"
> + )
> +
> + return filtered_cpus
> diff --git a/dts/framework/testbed_model/node/node.py
> b/dts/framework/testbed_model/node/node.py
> index 86654e55ae..5ee7023335 100644
> --- a/dts/framework/testbed_model/node/node.py
> +++ b/dts/framework/testbed_model/node/node.py
> @@ -8,13 +8,16 @@
> """
>
> from framework.config import (
> + CPU,
> BuildTargetConfiguration,
> + CPUList,
> ExecutionConfiguration,
> NodeConfiguration,
> )
> from framework.exception import NodeCleanupError, NodeSetupError,
> convert_exception
> from framework.logger import DTSLOG, getLogger
> from framework.remote_session import OSSession, create_session
> +from framework.testbed_model.hw import CPUAmount, cpu_filter
>
>
> class Node(object):
> @@ -28,6 +31,7 @@ class Node(object):
> main_session: OSSession
> logger: DTSLOG
> config: NodeConfiguration
> + cpus: list[CPU]
> _other_sessions: list[OSSession]
>
> def __init__(self, node_config: NodeConfiguration):
> @@ -38,6 +42,7 @@ def __init__(self, node_config: NodeConfiguration):
> self.logger = getLogger(self.name)
> self.logger.info(f"Created node: {self.name}")
> self.main_session = create_session(self.config, self.name,
> self.logger)
> + self._get_remote_cpus()
>
> @convert_exception(NodeSetupError)
> def setup_execution(self, execution_config: ExecutionConfiguration)
> -> None:
> @@ -109,6 +114,37 @@ def create_session(self, name: str) -> OSSession:
> self._other_sessions.append(connection)
> return connection
>
> + def filter_cpus(
> + self,
> + filter_specifier: CPUAmount | CPUList,
> + ascending: bool = True,
> + ) -> list[CPU]:
> + """
> +        Filter the logical cpus found on the Node according to specified rules:
> +        Use cores from the specified amount of sockets or from the specified
> +        socket ids. If sockets is specified, it takes precedence over socket_amount.
> +        From each of those sockets, use only cores_per_socket of cores.
> +        And for each core, use cpus_per_core of cpus. Hyperthreading
> +        must be enabled for this to take effect.
> + If ascending is True, use cores with the lowest numerical id first
> + and continue in ascending order. If False, start with the highest
> + id and continue in descending order. This ordering affects which
> + sockets to consider first as well.
> + """
> + self.logger.info("Filtering ")
> + return cpu_filter(
> + self.cpus,
> + filter_specifier,
> + ascending,
> + ).filter()
> +
> + def _get_remote_cpus(self) -> None:
> + """
> + Scan cpus in the remote OS and store a list of CPUs.
> + """
> + self.logger.info("Getting CPU information.")
> +        self.cpus = self.main_session.get_remote_cpus(self.config.bypass_core0)
> +
> def close(self) -> None:
> """
> Close all connections and free other resources.
> diff --git a/dts/framework/testbed_model/node/sut_node.py
> b/dts/framework/testbed_model/node/sut_node.py
> index 53268a7565..ff3be845b4 100644
> --- a/dts/framework/testbed_model/node/sut_node.py
> +++ b/dts/framework/testbed_model/node/sut_node.py
> @@ -4,10 +4,13 @@
>
> import os
> import tarfile
> +import time
> from pathlib import PurePath
>
> -from framework.config import BuildTargetConfiguration, NodeConfiguration
> +from framework.config import CPU, BuildTargetConfiguration, CPUList,
> NodeConfiguration
> +from framework.remote_session import OSSession
> from framework.settings import SETTINGS
> +from framework.testbed_model.hw import CPUAmount, CPUListFilter
> from framework.utils import EnvVarsDict, skip_setup
>
> from .node import Node
> @@ -21,19 +24,31 @@ class SutNode(Node):
> Another key capability is building DPDK according to given build
> target.
> """
>
> + cpus: list[CPU]
> + dpdk_prefix_list: list[str]
> + dpdk_prefix_subfix: str
> _build_target_config: BuildTargetConfiguration | None
> _env_vars: EnvVarsDict
> _remote_tmp_dir: PurePath
> __remote_dpdk_dir: PurePath | None
> _app_compile_timeout: float
> + _dpdk_kill_session: OSSession | None
>
> def __init__(self, node_config: NodeConfiguration):
> super(SutNode, self).__init__(node_config)
> + self.dpdk_prefix_list = []
> self._build_target_config = None
> self._env_vars = EnvVarsDict()
> self._remote_tmp_dir = self.main_session.get_remote_tmp_dir()
> self.__remote_dpdk_dir = None
> self._app_compile_timeout = 90
> + self._dpdk_kill_session = None
> +
> + # filter the node cpus according to user config
> + self.cpus = CPUListFilter(self.cpus, self.config.cpus).filter()
> + self.dpdk_prefix_subfix = (
> +            f"{str(os.getpid())}_{time.strftime('%Y%m%d%H%M%S', time.localtime())}"
> + )
>
> @property
> def _remote_dpdk_dir(self) -> PurePath:
> @@ -142,3 +157,164 @@ def build_dpdk_app(self, app_name: str) -> PurePath:
> return self.main_session.join_remote_path(
> build_dir, "examples", f"dpdk-{app_name}"
> )
> +
> + def kill_cleanup_dpdk_apps(self) -> None:
> + """
> + Kill all dpdk applications on the SUT. Cleanup hugepages.
> + """
> + if self._dpdk_kill_session and self._dpdk_kill_session.is_alive():
> + # we can use the session if it exists and responds
> +            self._dpdk_kill_session.kill_cleanup_dpdk_apps(self.dpdk_prefix_list)
> + else:
> + # otherwise, we need to (re)create it
> + self._dpdk_kill_session = self.create_session("dpdk_kill")
> + self.dpdk_prefix_list = []
> +
> + def create_eal_parameters(
> + self,
> + fixed_prefix: bool = False,
> + core_filter_specifier: CPUAmount | CPUList = CPUAmount(),
> + ascending_cores: bool = True,
> + prefix: str = "",
> + no_pci: bool = False,
> + vdevs: list[str] = None,
>
I would prefer to have vdevs be a list of objects, even if for now that
class just takes a string in its constructor. Later on we can add
subclasses for specific vdevs that might see heavy use, such
as librte_net_pcap and crypto_openssl.
> + other_eal_param: str = "",
> + ) -> str:
> + """
> + Generate eal parameters character string;
> + :param fixed_prefix: use fixed file-prefix or not, when it is
> true,
> + the file-prefix will not be added a timestamp
> + :param core_filter_specifier: an amount of cpus/cores/sockets to
> use
> + or a list of cpu ids to use.
> + The default will select one cpu for each of two
> cores
> + on one socket, in ascending order of core ids.
> + :param ascending_cores: True, use cores with the lowest numerical
> id first
> + and continue in ascending order. If False, start
> with the
> + highest id and continue in descending order. This
> ordering
> + affects which sockets to consider first as well.
> + :param prefix: set file prefix string, eg:
> + prefix='vf';
> + :param no_pci: switch of disable PCI bus eg:
> + no_pci=True;
> + :param vdevs: virtual device list, eg:
> + vdevs=['net_ring0', 'net_ring1'];
> + :param other_eal_param: user defined DPDK eal parameters, eg:
> + other_eal_param='--single-file-segments';
> + :return: eal param string, eg:
> + '-c 0xf -a 0000:88:00.0
> --file-prefix=dpdk_1112_20190809143420';
> + if DPDK version < 20.11-rc4, eal_str eg:
> + '-c 0xf -w 0000:88:00.0
> --file-prefix=dpdk_1112_20190809143420';
> + """
> + if vdevs is None:
> + vdevs = []
> +
> + config = {
> + "core_filter_specifier": core_filter_specifier,
> + "ascending_cores": ascending_cores,
> + "prefix": prefix,
> + "no_pci": no_pci,
> + "vdevs": vdevs,
> + "other_eal_param": other_eal_param,
> + }
> +
> + eal_parameter_creator = _EalParameter(
> + sut_node=self, fixed_prefix=fixed_prefix, **config
> + )
> + eal_str = eal_parameter_creator.make_eal_param()
> +
> + return eal_str
> +
> +
> +class _EalParameter(object):
> + def __init__(
> + self,
> + sut_node: SutNode,
> + fixed_prefix: bool,
> + core_filter_specifier: CPUAmount | CPUList,
> + ascending_cores: bool,
> + prefix: str,
> + no_pci: bool,
> + vdevs: list[str],
> + other_eal_param: str,
> + ):
> + """
> + Generate eal parameters character string;
> + :param sut_node: SUT Node;
> +        :param fixed_prefix: use fixed file-prefix or not, when it is true,
> +            the file-prefix will not be added a timestamp
> + :param core_filter_specifier: an amount of cpus/cores/sockets to
> use
> + or a list of cpu ids to use.
> + :param ascending_cores: True, use cores with the lowest numerical
> id first
> + and continue in ascending order. If False, start
> with the
> + highest id and continue in descending order. This
> ordering
> + affects which sockets to consider first as well.
> + :param prefix: set file prefix string, eg:
> + prefix='vf';
> + :param no_pci: switch of disable PCI bus eg:
> + no_pci=True;
> + :param vdevs: virtual device list, eg:
> + vdevs=['net_ring0', 'net_ring1'];
> + :param other_eal_param: user defined DPDK eal parameters, eg:
> + other_eal_param='--single-file-segments';
> + """
> + self.os = sut_node.config.os
> + self.fixed_prefix = fixed_prefix
> + self.sut_node = sut_node
> + self.core_filter_specifier = core_filter_specifier
> + self.ascending_cores = ascending_cores
> + self.prefix = prefix
> + self.no_pci = no_pci
> + self.vdevs = vdevs
> + self.other_eal_param = other_eal_param
> +
> + def _make_lcores_param(self) -> str:
> + filtered_cpus = self.sut_node.filter_cpus(
> + self.core_filter_specifier, self.ascending_cores
> + )
> + return f"-l {CPUList(filtered_cpus)}"
> +
> + def _make_memory_channels(self) -> str:
> + param_template = "-n {}"
> + return param_template.format(self.sut_node.config.memory_channels)
> +
> + def _make_no_pci_param(self) -> str:
> + if self.no_pci is True:
> + return "--no-pci"
> + else:
> + return ""
> +
> + def _make_prefix_param(self) -> str:
> + if self.prefix == "":
> + fixed_file_prefix = f"dpdk_{self.sut_node.dpdk_prefix_subfix}"
> + else:
> + fixed_file_prefix = self.prefix
> + if not self.fixed_prefix:
> + fixed_file_prefix = (
> +
> f"{fixed_file_prefix}_{self.sut_node.dpdk_prefix_subfix}"
> + )
> + fixed_file_prefix =
> self._do_os_handle_with_prefix_param(fixed_file_prefix)
> + return fixed_file_prefix
> +
> + def _make_vdevs_param(self) -> str:
> + if len(self.vdevs) == 0:
> + return ""
> + else:
> + return " ".join(f"--vdev {vdev}" for vdev in self.vdevs)
> +
> + def _do_os_handle_with_prefix_param(self, file_prefix: str) -> str:
> + self.sut_node.dpdk_prefix_list.append(file_prefix)
> + return f"--file-prefix={file_prefix}"
> +
> + def make_eal_param(self) -> str:
> + _eal_str = " ".join(
> + [
> + self._make_lcores_param(),
> + self._make_memory_channels(),
> + self._make_prefix_param(),
> + self._make_no_pci_param(),
> + self._make_vdevs_param(),
> + # append user defined eal parameters
> + self.other_eal_param,
> + ]
> + )
> + return _eal_str
> diff --git a/dts/framework/utils.py b/dts/framework/utils.py
> index 91e58f3218..3c2f0adff9 100644
> --- a/dts/framework/utils.py
> +++ b/dts/framework/utils.py
> @@ -32,6 +32,26 @@ def skip_setup(func) -> Callable[..., None]:
> return func
>
>
> +def expand_range(range_str: str) -> list[int]:
> + """
> + Process range string into a list of integers. There are two possible
> formats:
> + n - a single integer
> + n-m - a range of integers
> +
> + The returned range includes both n and m. Empty string returns an
> empty list.
> + """
> + expanded_range: list[int] = []
> + if range_str:
> + range_boundaries = range_str.split("-")
> + # will throw an exception when items in range_boundaries can't be
> converted,
> + # serving as type check
> + expanded_range.extend(
> + range(int(range_boundaries[0]), int(range_boundaries[-1]) + 1)
> + )
> +
> + return expanded_range
> +
> +
> def GREEN(text: str) -> str:
> return f"\u001B[32;1m{str(text)}\u001B[0m"
>
> --
> 2.30.2
>
>
Again, apologies for removing recipients in my earlier reply.
From: Owen Hilyard <ohilyard@iol.unh.edu>
Sent: Monday, November 21, 2022 1:40 PM
To: Juraj Linkeš <juraj.linkes@pantheon.tech>
Subject: Re: [RFC PATCH v2 04/10] dts: add dpdk execution handling
On Fri, Nov 18, 2022 at 8:00 AM Juraj Linkeš <juraj.linkes@pantheon.tech> wrote:
diff --git a/dts/framework/config/conf_yaml_schema.json b/dts/framework/config/conf_yaml_schema.json
index 409ce7ac74..c59d3e30e6 100644
--- a/dts/framework/config/conf_yaml_schema.json
+++ b/dts/framework/config/conf_yaml_schema.json
@@ -6,6 +6,12 @@
"type": "string",
"description": "A unique identifier for a node"
},
+ "ARCH": {
+ "type": "string",
+ "enum": [
+ "x86_64"
arm64 and ppc64le should probably be included here. I think that we can focus on 64 bit arches for now.
[Juraj] Seems safe enough. At this point it doesn't matter, but when we have a number of testcases, we may need to revisit this (if we can't verify an architecture for example).
[Owen] The reason I want this is because I want there to always be an architecture that is not the one being developed on that developers need to handle properly. LoongArch might actually be a good candidate for this if support gets merged, since to my knowledge almost no one has access to their server-class CPUs yet. Essentially, I want to force anyone who does something that is architecture dependent to consider other architectures, not just have the "the entire world is x86" mentality.
Alright, good to know.
I have a semi-related point, we specify arch (and os as well) in both build target and SUT config. Are these even going to be different? I see cpu (or platform in meson config) being different, but not the other two and that could simplify the config a bit.
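As a rough sketch of that suggestion, the Python side could simply mirror the schema enum; the existing Architecture members are an assumption here, not something shown in this patch:

    from enum import auto, unique

    from framework.config import StrEnum  # helper already defined in dts/framework/config/__init__.py


    @unique
    class Architecture(StrEnum):
        # sketch only: the 64-bit arches proposed above
        x86_64 = auto()
        arm64 = auto()
        ppc64le = auto()

The "ARCH" enum in conf_yaml_schema.json would then list the same strings.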
<snip>
+ def kill_cleanup_dpdk_apps(self) -> None:
+ """
+ Kill all dpdk applications on the SUT. Cleanup hugepages.
+ """
+ if self._dpdk_kill_session and self._dpdk_kill_session.is_alive():
+ # we can use the session if it exists and responds
+ self._dpdk_kill_session.kill_cleanup_dpdk_apps(self.dpdk_prefix_list)
+ else:
+ # otherwise, we need to (re)create it
+ self._dpdk_kill_session = self.create_session("dpdk_kill")
+ self.dpdk_prefix_list = []
+
+ def create_eal_parameters(
+ self,
+ fixed_prefix: bool = False,
+ core_filter_specifier: CPUAmount | CPUList = CPUAmount(),
+ ascending_cores: bool = True,
+ prefix: str = "",
+ no_pci: bool = False,
+ vdevs: list[str] = None,
I would prefer to have vdevs be a list of objects, even if for now that class just takes a string in its constructor. Later on we can add subclasses for specific vdevs that might see heavy use, such as librte_net_pcap and crypto_openssl.
[Juraj] Ok, this is simple enough, I'll add it.
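A minimal sketch of what such a wrapper could look like (the class name is hypothetical and not part of this patch):

    class VirtualDevice(object):
        """
        Thin wrapper around a vdev string; subclasses for commonly used vdevs
        (e.g. net_pcap or crypto_openssl) can be added later.
        """

        def __init__(self, device: str):
            self.device = device

        def __str__(self) -> str:
            return self.device

Since _make_vdevs_param() formats each entry with an f-string, passing vdevs=[VirtualDevice("net_ring0"), VirtualDevice("net_ring1")] would still render as '--vdev net_ring0 --vdev net_ring1'.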
On Wed, Nov 23, 2022 at 8:03 AM Juraj Linkeš <juraj.linkes@pantheon.tech>
wrote:
> Again, apologies for removing recipients in my earlier reply.
>
>
>
> *From:* Owen Hilyard <ohilyard@iol.unh.edu>
> *Sent:* Monday, November 21, 2022 1:40 PM
> *To:* Juraj Linkeš <juraj.linkes@pantheon.tech>
> *Subject:* Re: [RFC PATCH v2 04/10] dts: add dpdk execution handling
>
>
>
> On Fri, Nov 18, 2022 at 8:00 AM Juraj Linkeš <juraj.linkes@pantheon.tech>
> wrote:
>
> diff --git a/dts/framework/config/conf_yaml_schema.json
> b/dts/framework/config/conf_yaml_schema.json
> index 409ce7ac74..c59d3e30e6 100644
> --- a/dts/framework/config/conf_yaml_schema.json
> +++ b/dts/framework/config/conf_yaml_schema.json
> @@ -6,6 +6,12 @@
> "type": "string",
> "description": "A unique identifier for a node"
> },
> + "ARCH": {
> + "type": "string",
> + "enum": [
> + "x86_64"
>
> arm64 and ppc64le should probably be included here. I think that we can
> focus on 64 bit arches for now.
>
> [Juraj] Seems safe enough. At this point it doesn't matter, but when we
> have a number of testcases, we may need to revisit this (if we can't verify
> an architecture for example).
>
>
>
> [Owen] The reason I want this is because I want there to always be an
> architecture that is not the one being developed on that developers need to
> handle properly. LoongArch might actually be a good candidate for this if
> support gets merged, since to my knowledge almost no one has access to
> their server-class CPUs yet. Essentially, I want to force anyone who does
> something that is architecture dependent to consider other architectures,
> not just have the "the entire world is x86" mentality.
>
>
>
> Alright, good to know.
>
> I have a semi-related point, we specify arch (and os as well) in both
> build target and SUT config. Are these even going to be different? I see
> cpu (or platform in meson config) being different, but not the other two
> and that could simplify the config a bit.
>
[Owen] If I remember correctly, the older DTS has i686 (32 bit x86)
support, and you might want to run i686 on an x86_64 cpu. That is the only
use case I can see for differing build arch and SUT arch. The community lab
doesn't have any 32 bit hardware, so any future 32 bit testing would need
to happen on a 64 bit system running in a compatibility mode.
> <snip>
>
> + def kill_cleanup_dpdk_apps(self) -> None:
> + """
> + Kill all dpdk applications on the SUT. Cleanup hugepages.
> + """
> + if self._dpdk_kill_session and self._dpdk_kill_session.is_alive():
> + # we can use the session if it exists and responds
> +            self._dpdk_kill_session.kill_cleanup_dpdk_apps(self.dpdk_prefix_list)
> + else:
> + # otherwise, we need to (re)create it
> + self._dpdk_kill_session = self.create_session("dpdk_kill")
> + self.dpdk_prefix_list = []
> +
> + def create_eal_parameters(
> + self,
> + fixed_prefix: bool = False,
> + core_filter_specifier: CPUAmount | CPUList = CPUAmount(),
> + ascending_cores: bool = True,
> + prefix: str = "",
> + no_pci: bool = False,
> + vdevs: list[str] = None,
>
> I would prefer to have vdevs be a list of objects, even if for now that
> class just takes a string in its constructor. Later on we can add
> subclasses for specific vdevs that might see heavy use, such
> as librte_net_pcap and crypto_openssl.
>
> [Juraj] Ok, this is simple enough, I'll add it.
>
>
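To make the cpu formatting and EAL parameter construction discussed above concrete, a short usage sketch follows; the exact lcores, memory channels and prefix are illustrative and depend on the node's topology and conf.yaml:

    from framework.config import CPUList

    cpus = CPUList("0,1,2-3")   # str, int and CPU inputs normalize to one format
    cpus.cpu_list               # [0, 1, 2, 3]
    str(cpus)                   # '0-3' - consecutive cpus collapse into a range
    str(CPUList([0, 2, 3]))     # '0,2-3'

    # With the default CPUAmount() (one cpu on each of two cores of one socket)
    # and memory_channels: 4 from the sample conf.yaml, create_eal_parameters()
    # returns something along the lines of:
    #     '-l 1-2 -n 4 --file-prefix=dpdk_<pid>_<timestamp>'
    # where the suffix comes from the node's dpdk_prefix_subfix.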
@@ -12,4 +12,8 @@ nodes:
- name: "SUT 1"
hostname: sut1.change.me.localhost
user: root
+ arch: x86_64
os: linux
+ bypass_core0: true
+ cpus: ""
+ memory_channels: 4
@@ -11,12 +11,13 @@
import pathlib
from dataclasses import dataclass
from enum import Enum, auto, unique
-from typing import Any
+from typing import Any, Iterable
import warlock # type: ignore
import yaml
from framework.settings import SETTINGS
+from framework.utils import expand_range
class StrEnum(Enum):
@@ -60,6 +61,80 @@ class Compiler(StrEnum):
msvc = auto()
+@dataclass(slots=True, frozen=True)
+class CPU:
+ cpu: int
+ core: int
+ socket: int
+ node: int
+
+ def __str__(self) -> str:
+ return str(self.cpu)
+
+
+class CPUList(object):
+ """
+ Convert these options into a list of int cpus
+ cpu_list=[CPU1, CPU2] - a list of CPUs
+ cpu_list=[0,1,2,3] - a list of int indices
+ cpu_list=['0','1','2-3'] - a list of str indices; ranges are supported
+ cpu_list='0,1,2-3' - a comma delimited str of indices; ranges are supported
+
+ The class creates a unified format used across the framework and allows
+ the user to use either a str representation (using str(instance) or directly
+ in f-strings) or a list representation (by accessing instance.cpu_list).
+ Empty cpu_list is allowed.
+ """
+
+ _cpu_list: list[int]
+
+ def __init__(self, cpu_list: list[int | str | CPU] | str):
+ self._cpu_list = []
+ if isinstance(cpu_list, str):
+ self._from_str(cpu_list.split(","))
+ else:
+ self._from_str((str(cpu) for cpu in cpu_list))
+
+ # the input cpus may not be sorted
+ self._cpu_list.sort()
+
+ @property
+ def cpu_list(self) -> list[int]:
+ return self._cpu_list
+
+ def _from_str(self, cpu_list: Iterable[str]) -> None:
+ for cpu in cpu_list:
+ self._cpu_list.extend(expand_range(cpu))
+
+ def _get_consecutive_cpus_range(self, cpu_list: list[int]) -> list[str]:
+ formatted_core_list = []
+ tmp_cpus_list = list(sorted(cpu_list))
+ segment = tmp_cpus_list[:1]
+ for core_id in tmp_cpus_list[1:]:
+ if core_id - segment[-1] == 1:
+ segment.append(core_id)
+ else:
+ formatted_core_list.append(
+ f"{segment[0]}-{segment[-1]}"
+ if len(segment) > 1
+ else f"{segment[0]}"
+ )
+ current_core_index = tmp_cpus_list.index(core_id)
+ formatted_core_list.extend(
+ self._get_consecutive_cpus_range(tmp_cpus_list[current_core_index:])
+ )
+ segment.clear()
+ break
+ if len(segment) > 0:
+ formatted_core_list.append(
+ f"{segment[0]}-{segment[-1]}" if len(segment) > 1 else f"{segment[0]}"
+ )
+ return formatted_core_list
+
+ def __str__(self) -> str:
+ return f'{",".join(self._get_consecutive_cpus_range(self._cpu_list))}'
+
+
# Slots enables some optimizations, by pre-allocating space for the defined
# attributes in the underlying data structure.
#
@@ -71,7 +146,11 @@ class NodeConfiguration:
hostname: str
user: str
password: str | None
+ arch: Architecture
os: OS
+ bypass_core0: bool
+ cpus: CPUList
+ memory_channels: int
@staticmethod
def from_dict(d: dict) -> "NodeConfiguration":
@@ -80,7 +159,11 @@ def from_dict(d: dict) -> "NodeConfiguration":
hostname=d["hostname"],
user=d["user"],
password=d.get("password"),
+ arch=Architecture(d["arch"]),
os=OS(d["os"]),
+ bypass_core0=d.get("bypass_core0", False),
+ cpus=CPUList(d.get("cpus", "1")),
+ memory_channels=d.get("memory_channels", 1),
)
@@ -6,6 +6,12 @@
"type": "string",
"description": "A unique identifier for a node"
},
+ "ARCH": {
+ "type": "string",
+ "enum": [
+ "x86_64"
+ ]
+ },
"OS": {
"type": "string",
"enum": [
@@ -82,8 +88,23 @@
"type": "string",
"description": "The password to use on this node. Use only as a last resort. SSH keys are STRONGLY preferred."
},
+ "arch": {
+ "$ref": "#/definitions/ARCH"
+ },
"os": {
"$ref": "#/definitions/OS"
+ },
+ "bypass_core0": {
+ "type": "boolean",
+ "description": "Indicate that DPDK should omit using the first core."
+ },
+ "cpus": {
+ "type": "string",
+ "description": "Optional comma-separated list of cpus to use, e.g.: 1,2,3,4,5,18-22. Defaults to 1. An empty string means use all cpus."
+ },
+ "memory_channels": {
+ "type": "integer",
+ "description": "How many memory channels to use. Optional, defaults to 1."
}
},
"additionalProperties": false,
@@ -91,6 +112,7 @@
"name",
"hostname",
"user",
+ "arch",
"os"
]
},
@@ -2,6 +2,8 @@
# Copyright(c) 2022 PANTHEON.tech s.r.o.
# Copyright(c) 2022 University of New Hampshire
+from framework.config import CPU
+
from .posix_session import PosixSession
@@ -9,3 +11,16 @@ class LinuxSession(PosixSession):
"""
The implementation of non-Posix compliant parts of Linux remote sessions.
"""
+
+ def get_remote_cpus(self, bypass_core0: bool) -> list[CPU]:
+ cpu_info = self.remote_session.send_command(
+ "lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \\#"
+ ).stdout
+ cpus = []
+ for cpu_line in cpu_info.splitlines():
+ cpu, core, socket, node = cpu_line.split(",")
+ if bypass_core0 and core == 0 and socket == 0:
+ self.logger.info("Core0 bypassed.")
+ continue
+ cpus.append(CPU(int(cpu), int(core), int(socket), int(node)))
+ return cpus
@@ -3,9 +3,10 @@
# Copyright(c) 2022 University of New Hampshire
from abc import ABC, abstractmethod
+from collections.abc import Iterable
from pathlib import PurePath
-from framework.config import Architecture, NodeConfiguration
+from framework.config import CPU, Architecture, NodeConfiguration
from framework.logger import DTSLOG
from framework.remote_session.factory import create_remote_session
from framework.remote_session.remote_session import RemoteSession
@@ -130,3 +131,16 @@ def get_dpdk_version(self, version_path: str | PurePath) -> str:
"""
Inspect DPDK version on the remote node from version_path.
"""
+
+ @abstractmethod
+ def get_remote_cpus(self, bypass_core0: bool) -> list[CPU]:
+ """
+ Compose a list of CPUs present on the remote node.
+ """
+
+ @abstractmethod
+ def kill_cleanup_dpdk_apps(self, dpdk_prefix_list: Iterable[str]) -> None:
+ """
+ Kill and cleanup all DPDK apps identified by dpdk_prefix_list. If
+ dpdk_prefix_list is empty, attempt to find running DPDK apps to kill and clean.
+ """
@@ -2,6 +2,8 @@
# Copyright(c) 2022 PANTHEON.tech s.r.o.
# Copyright(c) 2022 University of New Hampshire
+import re
+from collections.abc import Iterable
from pathlib import PurePath, PurePosixPath
from framework.config import Architecture
@@ -138,3 +140,81 @@ def get_dpdk_version(self, build_dir: str | PurePath) -> str:
f"cat {self.join_remote_path(build_dir, 'VERSION')}", verify=True
)
return out.stdout
+
+ def kill_cleanup_dpdk_apps(self, dpdk_prefix_list: Iterable[str]) -> None:
+ self.logger.info("Cleaning up DPDK apps.")
+ dpdk_runtime_dirs = self._get_dpdk_runtime_dirs(dpdk_prefix_list)
+ if dpdk_runtime_dirs:
+ # kill and cleanup only if DPDK is running
+ dpdk_pids = self._get_dpdk_pids(dpdk_runtime_dirs)
+ for dpdk_pid in dpdk_pids:
+ self.remote_session.send_command(f"kill -9 {dpdk_pid}", 20)
+ self._check_dpdk_hugepages(dpdk_runtime_dirs)
+ self._remove_dpdk_runtime_dirs(dpdk_runtime_dirs)
+
+ def _get_dpdk_runtime_dirs(
+ self, dpdk_prefix_list: Iterable[str]
+ ) -> list[PurePosixPath]:
+ prefix = PurePosixPath("/var", "run", "dpdk")
+ if not dpdk_prefix_list:
+ remote_prefixes = self._list_remote_dirs(prefix)
+ if not remote_prefixes:
+ dpdk_prefix_list = []
+ else:
+ dpdk_prefix_list = remote_prefixes
+
+ return [PurePosixPath(prefix, dpdk_prefix) for dpdk_prefix in dpdk_prefix_list]
+
+ def _list_remote_dirs(self, remote_path: str | PurePath) -> list[str] | None:
+ """
+ Return a list of directories of the remote_dir.
+ If remote_path doesn't exist, return None.
+ """
+ out = self.remote_session.send_command(
+ f"ls -l {remote_path} | awk '/^d/ {{print $NF}}'"
+ ).stdout
+ if "No such file or directory" in out:
+ return None
+ else:
+ return out.splitlines()
+
+ def _get_dpdk_pids(self, dpdk_runtime_dirs: Iterable[str | PurePath]) -> list[int]:
+ pids = []
+ pid_regex = r"p(\d+)"
+ for dpdk_runtime_dir in dpdk_runtime_dirs:
+ dpdk_config_file = PurePosixPath(dpdk_runtime_dir, "config")
+ if self._remote_files_exists(dpdk_config_file):
+ out = self.remote_session.send_command(
+ f"lsof -Fp {dpdk_config_file}"
+ ).stdout
+ if out and "No such file or directory" not in out:
+ for out_line in out.splitlines():
+ match = re.match(pid_regex, out_line)
+ if match:
+ pids.append(int(match.group(1)))
+ return pids
+
+ def _remote_files_exists(self, remote_path: PurePath) -> bool:
+ result = self.remote_session.send_command(f"test -e {remote_path}")
+ return not result.return_code
+
+ def _check_dpdk_hugepages(
+ self, dpdk_runtime_dirs: Iterable[str | PurePath]
+ ) -> None:
+ for dpdk_runtime_dir in dpdk_runtime_dirs:
+ hugepage_info = PurePosixPath(dpdk_runtime_dir, "hugepage_info")
+ if self._remote_files_exists(hugepage_info):
+ out = self.remote_session.send_command(
+ f"lsof -Fp {hugepage_info}"
+ ).stdout
+ if out and "No such file or directory" not in out:
+ self.logger.warning("Some DPDK processes did not free hugepages.")
+ self.logger.warning("*******************************************")
+ self.logger.warning(out)
+ self.logger.warning("*******************************************")
+
+ def _remove_dpdk_runtime_dirs(
+ self, dpdk_runtime_dirs: Iterable[str | PurePath]
+ ) -> None:
+ for dpdk_runtime_dir in dpdk_runtime_dirs:
+ self.remove_remote_dir(dpdk_runtime_dir)
new file mode 100644
@@ -0,0 +1,17 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 PANTHEON.tech s.r.o.
+
+from framework.config import CPU, CPUList
+
+from .cpu import CPUAmount, CPUAmountFilter, CPUFilter, CPUListFilter
+
+
+def cpu_filter(
+ core_list: list[CPU], filter_specifier: CPUAmount | CPUList, ascending: bool
+) -> CPUFilter:
+ if isinstance(filter_specifier, CPUList):
+ return CPUListFilter(core_list, filter_specifier, ascending)
+ elif isinstance(filter_specifier, CPUAmount):
+ return CPUAmountFilter(core_list, filter_specifier, ascending)
+ else:
+ raise ValueError(f"Unsupported filter r{filter_specifier}")
new file mode 100644
@@ -0,0 +1,164 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 PANTHEON.tech s.r.o.
+
+import dataclasses
+from abc import ABC, abstractmethod
+from collections.abc import Iterable
+
+from framework.config import CPU, CPUList
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class CPUAmount:
+ """
+ Define the amounts of cpus to use. If sockets is not None, socket_amount
+ is ignored.
+ """
+
+ cpus_per_core: int = 1
+ cores_per_socket: int = 2
+ socket_amount: int = 1
+ sockets: list[int] | None = None
+
+
+class CPUFilter(ABC):
+ """
+ Filter according to the input filter specifier. Each filter needs to be
+ implemented in a derived class.
+ This class only implements operations common to all filters, such as sorting
+ the list to be filtered beforehand.
+ """
+
+ _filter_specifier: CPUAmount | CPUList
+ _cpus_to_filter: list[CPU]
+
+ def __init__(
+ self,
+ core_list: list[CPU],
+ filter_specifier: CPUAmount | CPUList,
+ ascending: bool = True,
+ ) -> None:
+ self._filter_specifier = filter_specifier
+
+ # sorting by core is needed in case hyperthreading is enabled
+ self._cpus_to_filter = sorted(
+ core_list, key=lambda x: x.core, reverse=not ascending
+ )
+ self.filter()
+
+ @abstractmethod
+ def filter(self) -> list[CPU]:
+ """
+ Use the input self._filter_specifier to filter self._cpus_to_filter
+ and return the list of filtered CPUs. self._cpus_to_filter is a
+        sorted copy of the original list, so it may be modified.
+ """
+
+
+class CPUAmountFilter(CPUFilter):
+ """
+ Filter the input list of CPUs according to specified rules:
+ Use cores from the specified amount of sockets or from the specified socket ids.
+ If sockets is specified, it takes precedence over socket_amount.
+ From each of those sockets, use only cores_per_socket of cores.
+    And for each core, use cpus_per_core of cpus. Hyperthreading
+ must be enabled for this to take effect.
+ If ascending is True, use cores with the lowest numerical id first
+ and continue in ascending order. If False, start with the highest
+ id and continue in descending order. This ordering affects which
+ sockets to consider first as well.
+ """
+
+ _filter_specifier: CPUAmount
+
+ def filter(self) -> list[CPU]:
+ return self._filter_cpus(self._filter_sockets(self._cpus_to_filter))
+
+ def _filter_sockets(self, cpus_to_filter: Iterable[CPU]) -> list[CPU]:
+ allowed_sockets: set[int] = set()
+ socket_amount = self._filter_specifier.socket_amount
+ if self._filter_specifier.sockets:
+ socket_amount = len(self._filter_specifier.sockets)
+ allowed_sockets = set(self._filter_specifier.sockets)
+
+ filtered_cpus = []
+ for cpu in cpus_to_filter:
+ if not self._filter_specifier.sockets:
+ if len(allowed_sockets) < socket_amount:
+ allowed_sockets.add(cpu.socket)
+ if cpu.socket in allowed_sockets:
+ filtered_cpus.append(cpu)
+
+ if len(allowed_sockets) < socket_amount:
+ raise ValueError(
+ f"The amount of sockets from which to use cores "
+ f"({socket_amount}) exceeds the actual amount present "
+ f"on the node ({len(allowed_sockets)})"
+ )
+
+ return filtered_cpus
+
+ def _filter_cpus(self, cpus_to_filter: Iterable[CPU]) -> list[CPU]:
+ # no need to use ordered dict, from Python3.7 the dict
+ # insertion order is preserved (LIFO).
+ allowed_cpu_per_core_count_map: dict[int, int] = {}
+ filtered_cpus = []
+ for cpu in cpus_to_filter:
+ if cpu.core in allowed_cpu_per_core_count_map:
+ cpu_count = allowed_cpu_per_core_count_map[cpu.core]
+ if self._filter_specifier.cpus_per_core > cpu_count:
+ # only add cpus of the given core
+ allowed_cpu_per_core_count_map[cpu.core] += 1
+ filtered_cpus.append(cpu)
+ else:
+ raise ValueError(
+ f"The amount of CPUs per core to use "
+ f"({self._filter_specifier.cpus_per_core}) "
+ f"exceeds the actual amount present. Is hyperthreading enabled?"
+ )
+ elif self._filter_specifier.cores_per_socket > len(
+ allowed_cpu_per_core_count_map
+ ):
+ # only add cpus if we need more
+ allowed_cpu_per_core_count_map[cpu.core] = 1
+ filtered_cpus.append(cpu)
+ else:
+ # cpus are sorted by core, at this point we won't encounter new cores
+ break
+
+ cores_per_socket = len(allowed_cpu_per_core_count_map)
+ if cores_per_socket < self._filter_specifier.cores_per_socket:
+ raise ValueError(
+ f"The amount of cores per socket to use "
+ f"({self._filter_specifier.cores_per_socket}) "
+ f"exceeds the actual amount present ({cores_per_socket})"
+ )
+
+ return filtered_cpus
+
+
+class CPUListFilter(CPUFilter):
+ """
+ Filter the input list of CPUs according to the input list of
+ core indices.
+ An empty CPUList won't filter anything.
+ """
+
+ _filter_specifier: CPUList
+
+ def filter(self) -> list[CPU]:
+ if not len(self._filter_specifier.cpu_list):
+ return self._cpus_to_filter
+
+ filtered_cpus = []
+ for core in self._cpus_to_filter:
+ if core.cpu in self._filter_specifier.cpu_list:
+ filtered_cpus.append(core)
+
+ if len(filtered_cpus) != len(self._filter_specifier.cpu_list):
+ raise ValueError(
+ f"Not all cpus from {self._filter_specifier.cpu_list} were found"
+ f"among {self._cpus_to_filter}"
+ )
+
+ return filtered_cpus
@@ -8,13 +8,16 @@
"""
from framework.config import (
+ CPU,
BuildTargetConfiguration,
+ CPUList,
ExecutionConfiguration,
NodeConfiguration,
)
from framework.exception import NodeCleanupError, NodeSetupError, convert_exception
from framework.logger import DTSLOG, getLogger
from framework.remote_session import OSSession, create_session
+from framework.testbed_model.hw import CPUAmount, cpu_filter
class Node(object):
@@ -28,6 +31,7 @@ class Node(object):
main_session: OSSession
logger: DTSLOG
config: NodeConfiguration
+ cpus: list[CPU]
_other_sessions: list[OSSession]
def __init__(self, node_config: NodeConfiguration):
@@ -38,6 +42,7 @@ def __init__(self, node_config: NodeConfiguration):
self.logger = getLogger(self.name)
self.logger.info(f"Created node: {self.name}")
self.main_session = create_session(self.config, self.name, self.logger)
+ self._get_remote_cpus()
@convert_exception(NodeSetupError)
def setup_execution(self, execution_config: ExecutionConfiguration) -> None:
@@ -109,6 +114,37 @@ def create_session(self, name: str) -> OSSession:
self._other_sessions.append(connection)
return connection
+ def filter_cpus(
+ self,
+ filter_specifier: CPUAmount | CPUList,
+ ascending: bool = True,
+ ) -> list[CPU]:
+ """
+ Filter the logical cpus found on the Node according to specified rules:
+ Use cores from the specified amount of sockets or from the specified
+ socket ids. If sockets is specified, it takes precedence over socket_amount.
+        From each of those sockets, use only cores_per_socket of cores.
+        And for each core, use cpus_per_core of cpus. Hyperthreading
+ must be enabled for this to take effect.
+ If ascending is True, use cores with the lowest numerical id first
+ and continue in ascending order. If False, start with the highest
+ id and continue in descending order. This ordering affects which
+ sockets to consider first as well.
+ """
+ self.logger.info("Filtering ")
+ return cpu_filter(
+ self.cpus,
+ filter_specifier,
+ ascending,
+ ).filter()
+
+ def _get_remote_cpus(self) -> None:
+ """
+ Scan cpus in the remote OS and store a list of CPUs.
+ """
+ self.logger.info("Getting CPU information.")
+ self.cpus = self.main_session.get_remote_cpus(self.config.bypass_core0)
+
def close(self) -> None:
"""
Close all connections and free other resources.
@@ -4,10 +4,13 @@
import os
import tarfile
+import time
from pathlib import PurePath
-from framework.config import BuildTargetConfiguration, NodeConfiguration
+from framework.config import CPU, BuildTargetConfiguration, CPUList, NodeConfiguration
+from framework.remote_session import OSSession
from framework.settings import SETTINGS
+from framework.testbed_model.hw import CPUAmount, CPUListFilter
from framework.utils import EnvVarsDict, skip_setup
from .node import Node
@@ -21,19 +24,31 @@ class SutNode(Node):
Another key capability is building DPDK according to given build target.
"""
+ cpus: list[CPU]
+ dpdk_prefix_list: list[str]
+ dpdk_prefix_subfix: str
_build_target_config: BuildTargetConfiguration | None
_env_vars: EnvVarsDict
_remote_tmp_dir: PurePath
__remote_dpdk_dir: PurePath | None
_app_compile_timeout: float
+ _dpdk_kill_session: OSSession | None
def __init__(self, node_config: NodeConfiguration):
super(SutNode, self).__init__(node_config)
+ self.dpdk_prefix_list = []
self._build_target_config = None
self._env_vars = EnvVarsDict()
self._remote_tmp_dir = self.main_session.get_remote_tmp_dir()
self.__remote_dpdk_dir = None
self._app_compile_timeout = 90
+ self._dpdk_kill_session = None
+
+ # filter the node cpus according to user config
+ self.cpus = CPUListFilter(self.cpus, self.config.cpus).filter()
+ self.dpdk_prefix_subfix = (
+ f"{str(os.getpid())}_{time.strftime('%Y%m%d%H%M%S', time.localtime())}"
+ )
@property
def _remote_dpdk_dir(self) -> PurePath:
@@ -142,3 +157,164 @@ def build_dpdk_app(self, app_name: str) -> PurePath:
return self.main_session.join_remote_path(
build_dir, "examples", f"dpdk-{app_name}"
)
+
+ def kill_cleanup_dpdk_apps(self) -> None:
+ """
+ Kill all dpdk applications on the SUT. Cleanup hugepages.
+ """
+ if self._dpdk_kill_session and self._dpdk_kill_session.is_alive():
+ # we can use the session if it exists and responds
+ self._dpdk_kill_session.kill_cleanup_dpdk_apps(self.dpdk_prefix_list)
+ else:
+ # otherwise, we need to (re)create it
+ self._dpdk_kill_session = self.create_session("dpdk_kill")
+ self.dpdk_prefix_list = []
+
+ def create_eal_parameters(
+ self,
+ fixed_prefix: bool = False,
+ core_filter_specifier: CPUAmount | CPUList = CPUAmount(),
+ ascending_cores: bool = True,
+ prefix: str = "",
+ no_pci: bool = False,
+ vdevs: list[str] = None,
+ other_eal_param: str = "",
+ ) -> str:
+ """
+ Generate eal parameters character string;
+ :param fixed_prefix: use fixed file-prefix or not, when it is true,
+ the file-prefix will not be added a timestamp
+ :param core_filter_specifier: an amount of cpus/cores/sockets to use
+ or a list of cpu ids to use.
+ The default will select one cpu for each of two cores
+ on one socket, in ascending order of core ids.
+ :param ascending_cores: True, use cores with the lowest numerical id first
+ and continue in ascending order. If False, start with the
+ highest id and continue in descending order. This ordering
+ affects which sockets to consider first as well.
+ :param prefix: set file prefix string, eg:
+ prefix='vf';
+ :param no_pci: switch of disable PCI bus eg:
+ no_pci=True;
+ :param vdevs: virtual device list, eg:
+ vdevs=['net_ring0', 'net_ring1'];
+ :param other_eal_param: user defined DPDK eal parameters, eg:
+ other_eal_param='--single-file-segments';
+ :return: eal param string, eg:
+ '-c 0xf -a 0000:88:00.0 --file-prefix=dpdk_1112_20190809143420';
+ if DPDK version < 20.11-rc4, eal_str eg:
+ '-c 0xf -w 0000:88:00.0 --file-prefix=dpdk_1112_20190809143420';
+ """
+ if vdevs is None:
+ vdevs = []
+
+ config = {
+ "core_filter_specifier": core_filter_specifier,
+ "ascending_cores": ascending_cores,
+ "prefix": prefix,
+ "no_pci": no_pci,
+ "vdevs": vdevs,
+ "other_eal_param": other_eal_param,
+ }
+
+ eal_parameter_creator = _EalParameter(
+ sut_node=self, fixed_prefix=fixed_prefix, **config
+ )
+ eal_str = eal_parameter_creator.make_eal_param()
+
+ return eal_str
+
+
+class _EalParameter(object):
+ def __init__(
+ self,
+ sut_node: SutNode,
+ fixed_prefix: bool,
+ core_filter_specifier: CPUAmount | CPUList,
+ ascending_cores: bool,
+ prefix: str,
+ no_pci: bool,
+ vdevs: list[str],
+ other_eal_param: str,
+ ):
+ """
+ Generate eal parameters character string;
+ :param sut_node: SUT Node;
+ :param fixed_prefix: use fixed file-prefix or not, when it is true,
+            the file-prefix will not be added a timestamp
+ :param core_filter_specifier: an amount of cpus/cores/sockets to use
+ or a list of cpu ids to use.
+ :param ascending_cores: True, use cores with the lowest numerical id first
+ and continue in ascending order. If False, start with the
+ highest id and continue in descending order. This ordering
+ affects which sockets to consider first as well.
+ :param prefix: set file prefix string, eg:
+ prefix='vf';
+ :param no_pci: switch of disable PCI bus eg:
+ no_pci=True;
+ :param vdevs: virtual device list, eg:
+ vdevs=['net_ring0', 'net_ring1'];
+ :param other_eal_param: user defined DPDK eal parameters, eg:
+ other_eal_param='--single-file-segments';
+ """
+ self.os = sut_node.config.os
+ self.fixed_prefix = fixed_prefix
+ self.sut_node = sut_node
+ self.core_filter_specifier = core_filter_specifier
+ self.ascending_cores = ascending_cores
+ self.prefix = prefix
+ self.no_pci = no_pci
+ self.vdevs = vdevs
+ self.other_eal_param = other_eal_param
+
+ def _make_lcores_param(self) -> str:
+ filtered_cpus = self.sut_node.filter_cpus(
+ self.core_filter_specifier, self.ascending_cores
+ )
+ return f"-l {CPUList(filtered_cpus)}"
+
+ def _make_memory_channels(self) -> str:
+ param_template = "-n {}"
+ return param_template.format(self.sut_node.config.memory_channels)
+
+ def _make_no_pci_param(self) -> str:
+ if self.no_pci is True:
+ return "--no-pci"
+ else:
+ return ""
+
+ def _make_prefix_param(self) -> str:
+ if self.prefix == "":
+ fixed_file_prefix = f"dpdk_{self.sut_node.dpdk_prefix_subfix}"
+ else:
+ fixed_file_prefix = self.prefix
+ if not self.fixed_prefix:
+ fixed_file_prefix = (
+ f"{fixed_file_prefix}_{self.sut_node.dpdk_prefix_subfix}"
+ )
+ fixed_file_prefix = self._do_os_handle_with_prefix_param(fixed_file_prefix)
+ return fixed_file_prefix
+
+ def _make_vdevs_param(self) -> str:
+ if len(self.vdevs) == 0:
+ return ""
+ else:
+ return " ".join(f"--vdev {vdev}" for vdev in self.vdevs)
+
+ def _do_os_handle_with_prefix_param(self, file_prefix: str) -> str:
+ self.sut_node.dpdk_prefix_list.append(file_prefix)
+ return f"--file-prefix={file_prefix}"
+
+ def make_eal_param(self) -> str:
+ _eal_str = " ".join(
+ [
+ self._make_lcores_param(),
+ self._make_memory_channels(),
+ self._make_prefix_param(),
+ self._make_no_pci_param(),
+ self._make_vdevs_param(),
+ # append user defined eal parameters
+ self.other_eal_param,
+ ]
+ )
+ return _eal_str
@@ -32,6 +32,26 @@ def skip_setup(func) -> Callable[..., None]:
return func
+def expand_range(range_str: str) -> list[int]:
+ """
+ Process range string into a list of integers. There are two possible formats:
+ n - a single integer
+ n-m - a range of integers
+
+ The returned range includes both n and m. Empty string returns an empty list.
+ """
+ expanded_range: list[int] = []
+ if range_str:
+ range_boundaries = range_str.split("-")
+ # will throw an exception when items in range_boundaries can't be converted,
+ # serving as type check
+ expanded_range.extend(
+ range(int(range_boundaries[0]), int(range_boundaries[-1]) + 1)
+ )
+
+ return expanded_range
+
+
def GREEN(text: str) -> str:
return f"\u001B[32;1m{str(text)}\u001B[0m"