[RFC,v1,04/10] dts: add basic node management methods

Message ID 20220824162454.394285-5-juraj.linkes@pantheon.tech (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Series: dts: add hello world testcase

Checks

Context        Check    Description
ci/checkpatch  warning  coding style issues

Commit Message

Juraj Linkeš Aug. 24, 2022, 4:24 p.m. UTC
  The nodes DTS works with are either a system under test node (where
DPDK runs) or a traffic generator node.
The added methods are common to both system under test nodes and traffic
generator nodes.

Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
---
 dts/framework/node.py | 395 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 387 insertions(+), 8 deletions(-)
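
For reviewers, a minimal usage sketch of the common methods added by this
patch. It assumes a NodeConfiguration from the existing dts/framework/config
module; SutNode is a hypothetical concrete subclass used only for
illustration (Node becomes abstract here) and is not part of this patch:

    from framework.config import NodeConfiguration
    from framework.node import Node

    class SutNode(Node):  # hypothetical subclass, for illustration only
        pass

    def example(node_config: NodeConfiguration) -> None:
        node = SutNode(node_config)
        # Run a command over the main SSH session.
        node.send_command("uname -a")
        # Open a second session and use it via alt_session=True.
        node.create_session(f"{node.name}_alt")
        node.send_expect("echo ready", "# ", alt_session=True)
        # Hugepage helpers shared by SUT and TG nodes.
        if node.get_total_huge_pages() == 0:
            node.set_huge_pages(1024)
            node.mount_huge_pages()
        node.node_exit()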
  

Patch

diff --git a/dts/framework/node.py b/dts/framework/node.py
index e5c5454ebe..c08c79cca3 100644
--- a/dts/framework/node.py
+++ b/dts/framework/node.py
@@ -4,9 +4,13 @@ 
 # Copyright(c) 2022 University of New Hampshire
 #
 
+import dataclasses
+import re
+from abc import ABC
 from typing import Optional
 
-from .config import NodeConfiguration
+from framework.config import OS, NodeConfiguration
+
 from .logger import DTSLOG, getLogger
 from .settings import SETTINGS
 from .ssh_connection import SSHConnection
@@ -16,22 +20,41 @@ 
 """
 
 
-class Node(object):
+@dataclasses.dataclass(slots=True, frozen=True)
+class CPUCore:
+    thread: str
+    socket: str
+    core: int
+
+
+class Node(ABC):
     """
     Basic module for node management. This module implements methods that
     manage a node, such as information gathering (of CPU/PCI/NIC) and
     environment setup.
     """
 
-    _config: NodeConfiguration
+    name: str
+    skip_setup: bool
+    sessions: list[SSHConnection]
+    default_hugepages_cleared: bool
+    prefix_list: list[str]
+    cores: list[CPUCore]
+    number_of_cores: int
     logger: DTSLOG
     main_session: SSHConnection
-    name: str
+    _config: NodeConfiguration
     _other_sessions: list[SSHConnection]
 
     def __init__(self, node_config: NodeConfiguration):
         self._config = node_config
         self.name = node_config.name
+        self.skip_setup = SETTINGS.skip_setup
+        self.default_hugepages_cleared = False
+        self.prefix_list = []
+        self.cores = []
+        self.number_of_cores = 0
+        self._dpdk_dir = None
 
         self.logger = getLogger(self.name)
         self.logger.info(f"Created node: {self.name}")
@@ -42,22 +65,23 @@  def __init__(self, node_config: NodeConfiguration):
             self.get_username(),
             self.get_password(),
         )
+        self._other_sessions = []
 
     def get_ip_address(self) -> str:
         """
-        Get SUT's ip address.
+        Get Node's ip address.
         """
         return self._config.hostname
 
     def get_password(self) -> Optional[str]:
         """
-        Get SUT's login password.
+        Get Node's login password.
         """
         return self._config.password
 
     def get_username(self) -> str:
         """
-        Get SUT's login username.
+        Get Node's login username.
         """
         return self._config.user
 
@@ -66,6 +90,7 @@  def send_expect(
         command: str,
         expected: str,
         timeout: float = SETTINGS.timeout,
+        alt_session: bool = False,
         verify: bool = False,
         trim_whitespace: bool = True,
     ) -> str | int:
@@ -81,19 +106,373 @@  def send_expect(
         if trim_whitespace:
             expected = expected.strip()
 
+        if alt_session and self._other_sessions:
+            return self._other_sessions[0].send_expect(
+                command, expected, timeout, verify
+            )
+
         return self.main_session.send_expect(command, expected, timeout, verify)
 
-    def send_command(self, cmds: str, timeout: float = SETTINGS.timeout) -> str:
+    def send_command(
+        self, cmds: str, timeout: float = SETTINGS.timeout, alt_session: bool = False
+    ) -> str:
         """
         Send commands to node and return string before timeout.
         """
 
+        if alt_session and self._other_sessions:
+            return self._other_sessions[0].send_command(cmds, timeout)
+
         return self.main_session.send_command(cmds, timeout)
 
+    def get_session_output(self, timeout: float = SETTINGS.timeout):
+        """
+        Get the session output produced before the timeout.
+        """
+        return self.main_session.get_session_before(timeout)
+
+    def get_total_huge_pages(self):
+        """
+        Get the total number of hugepages on the Node.
+        """
+        huge_pages = self.send_expect(
+            "awk '/HugePages_Total/ { print $2 }' /proc/meminfo", "# ", alt_session=True
+        )
+        if huge_pages != "":
+            return int(huge_pages.split()[0])
+        return 0
+
+    def mount_huge_pages(self):
+        """
+        Mount the hugepage filesystem on the Node.
+        """
+        self.send_expect("umount `awk '/hugetlbfs/ { print $2 }' /proc/mounts`", "# ")
+        out = self.send_expect("awk '/hugetlbfs/ { print $2 }' /proc/mounts", "# ")
+        # only mount hugepages when no hugetlbfs is mounted yet
+        if not len(out):
+            self.send_expect("mkdir -p /mnt/huge", "# ")
+            self.send_expect("mount -t hugetlbfs nodev /mnt/huge", "# ")
+
+    def strip_hugepage_path(self):
+        """
+        Return the hugetlbfs mount point, or an empty string if none.
+        """
+        mounts = self.send_expect("grep hugetlbfs /proc/mounts", "# ")
+        infos = mounts.split()
+        if len(infos) >= 2:
+            return infos[1]
+        else:
+            return ""
+
+    def set_huge_pages(self, huge_pages, numa=""):
+        """
+        Set the number of hugepages, optionally on a specific NUMA node.
+        """
+        page_size = self.send_expect(
+            "awk '/Hugepagesize/ {print $2}' /proc/meminfo", "# "
+        )
+
+        if not numa:
+            self.send_expect(
+                "echo %d > /sys/kernel/mm/hugepages/hugepages-%skB/nr_hugepages"
+                % (huge_pages, page_size),
+                "# ",
+                5,
+            )
+        else:
+            # hugepages may be configured on the kernel cmdline, so clear them first
+            if not self.default_hugepages_cleared:
+                self.send_expect(
+                    "echo 0 > /sys/kernel/mm/hugepages/hugepages-%skB/nr_hugepages"
+                    % (page_size),
+                    "# ",
+                    5,
+                )
+                self.default_hugepages_cleared = True
+
+            # some platforms don't support NUMA, e.g. a VM SUT
+            try:
+                self.send_expect(
+                    "echo %d > /sys/devices/system/node/%s/hugepages/hugepages-%skB/nr_hugepages"
+                    % (huge_pages, numa, page_size),
+                    "# ",
+                    5,
+                )
+            except Exception:
+                self.logger.warning(
+                    "failed to set %d hugepages on %s" % (huge_pages, numa)
+                )
+                self.send_expect(
+                    "echo %d > /sys/kernel/mm/hugepages/hugepages-%skB/nr_hugepages"
+                    % (huge_pages, page_size),
+                    "# ",
+                    5,
+                )
+
+    def get_dpdk_pids(self, prefix_list, alt_session):
+        """
+        Find all DPDK applications on the Node, kill them and clean up
+        their runtime files and hugepages.
+        """
+        file_directories = [
+            "/var/run/dpdk/%s/config" % file_prefix for file_prefix in prefix_list
+        ]
+        pids = []
+        pid_reg = r"p(\d+)"
+        for config_file in file_directories:
+            # Covers the case where the process is run as an unprivileged
+            # user and does not generate the file
+            isfile = self.send_expect(
+                "ls -l {}".format(config_file), "# ", 20, alt_session
+            )
+            if isfile:
+                cmd = "lsof -Fp %s" % config_file
+                out = self.send_expect(cmd, "# ", 20, alt_session)
+                if len(out):
+                    lines = out.split("\r\n")
+                    for line in lines:
+                        m = re.match(pid_reg, line)
+                        if m:
+                            pids.append(m.group(1))
+                for pid in pids:
+                    self.send_expect("kill -9 %s" % pid, "# ", 20, alt_session)
+                    self.get_session_output(timeout=2)
+
+        hugepage_info = [
+            "/var/run/dpdk/%s/hugepage_info" % file_prefix
+            for file_prefix in prefix_list
+        ]
+        for hugepage in hugepage_info:
+            # Covers the case where the process is run as an unprivileged
+            # user and does not generate the file
+            isfile = self.send_expect(
+                "ls -l {}".format(hugepage), "# ", 20, alt_session
+            )
+            if isfile:
+                cmd = "lsof -Fp %s" % hugepage
+                out = self.send_expect(cmd, "# ", 20, alt_session)
+                if len(out) and "No such file or directory" not in out:
+                    self.logger.warning("There are some dpdk process not free hugepage")
+                    self.logger.warning("**************************************")
+                    self.logger.warning(out)
+                    self.logger.warning("**************************************")
+
+        # remove directories
+        directories = ["/var/run/dpdk/%s" % file_prefix for file_prefix in prefix_list]
+        for directory in directories:
+            cmd = "rm -rf %s" % directory
+            self.send_expect(cmd, "# ", 20, alt_session)
+
+        # delete hugepage files on the mount path
+        if getattr(self, "hugepage_path", None):
+            for file_prefix in prefix_list:
+                cmd = "rm -rf %s/%s*" % (self.hugepage_path, file_prefix)
+                self.send_expect(cmd, "# ", 20, alt_session)
+
+    def kill_all(self, alt_session=True):
+        """
+        Kill all dpdk applications on Node.
+        """
+        if "Traffic" in str(self):
+            self.logger.info("kill_all: called by tg")
+            pass
+        else:
+            if self.prefix_list:
+                self.logger.info("kill_all: called by SUT and prefix list has value.")
+                self.get_dpdk_pids(self.prefix_list, alt_session)
+                # init prefix_list
+                self.prefix_list = []
+            else:
+                self.logger.info("kill_all: called by SUT and has no prefix list.")
+                out = self.send_command(
+                    "ls -l /var/run/dpdk |awk '/^d/ {print $NF}'",
+                    timeout=0.5,
+                    alt_session=True,
+                )
+                # the last entry is the expect prompt (e.g. [PEXPECT]#), so drop it
+                if out != "":
+                    dir_list = out.split("\r\n")
+                    self.get_dpdk_pids(dir_list[:-1], alt_session)
+
+    def get_os(self) -> OS:
+        return self._config.os
+
+    def init_core_list(self):
+        """
+        Load or create core information of Node.
+        """
+        if not self.cores or not self.number_of_cores:
+            self.init_core_list_uncached()
+
+    def init_core_list_uncached(self):
+        """
+        Scan cores on Node and create core information list.
+        """
+        init_core_list_uncached = getattr(
+            self, "init_core_list_uncached_%s" % self.get_os()
+        )
+        init_core_list_uncached()
+
+    def init_core_list_uncached_linux(self):
+        """
+        Scan cores in linux and create core information list.
+        """
+        self.cores = []
+
+        cpuinfo = self.send_expect(
+            "lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \#", "#", alt_session=True
+        )
+
+        cpuinfo = [i for i in cpuinfo.split() if re.match(r"^\d.+", i)]
+        # Haswell CPUs on the Cottonwood platform report incorrect core ids,
+        # so an additional core map is needed
+        core_id = 0
+        coremap = {}
+        for line in cpuinfo:
+            (thread, core, socket, node) = line.split(",")[0:4]
+
+            if core not in list(coremap.keys()):
+                coremap[core] = core_id
+                core_id += 1
+
+            if self._config.bypass_core0 and core == "0" and socket == "0":
+                self.logger.info("Core0 bypassed")
+                continue
+            if self._config.arch == "arm64" or self._config.arch == "ppc64":
+                self.cores.append(
+                    CPUCore(thread=thread, socket=node, core=coremap[core])
+                )
+            else:
+                self.cores.append(
+                    CPUCore(thread=thread, socket=socket, core=coremap[core])
+                )
+
+        self.number_of_cores = len(self.cores)
+
+    def get_core_list(self, config, socket=-1, from_last=False):
+        """
+        Get lcore array according to the core config like "all", "1S/1C/1T".
+        We can specify the physical CPU socket by the "socket" parameter.
+        """
+        if config == "all":
+            cores = []
+            if socket != -1:
+                for core in self.cores:
+                    if int(core.socket) == socket:
+                        cores.append(core.thread)
+            else:
+                cores = [core.thread for core in self.cores]
+            return cores
+
+        m = re.match("([1234])S/([0-9]+)C/([12])T", config)
+
+        if m:
+            nr_sockets = int(m.group(1))
+            nr_cores = int(m.group(2))
+            nr_threads = int(m.group(3))
+
+            partial_cores = self.cores
+
+            # If no socket is specified, sockList will be e.g. [0, 1] on a
+            # NUMA system; otherwise use just the specified socket
+            if socket < 0:
+                sockList = set([int(core.socket) for core in partial_cores])
+            else:
+                sockList = set(
+                    [int(n.socket) for n in partial_cores if int(n.socket) == socket]
+                )
+
+            if from_last:
+                sockList = list(sockList)[-nr_sockets:]
+            else:
+                sockList = list(sockList)[:nr_sockets]
+            partial_cores = [n for n in partial_cores if int(n.socket) in sockList]
+            thread_list = set([int(n.thread) for n in partial_cores])
+            thread_list = list(thread_list)
+
+            # filter usable core to core_list
+            temp = []
+            for sock in sockList:
+                core_list = set(
+                    [int(n.core) for n in partial_cores if int(n.socket) == sock]
+                )
+                if from_last:
+                    core_list = list(core_list)[-nr_cores:]
+                else:
+                    core_list = list(core_list)[:nr_cores]
+                temp.extend(core_list)
+
+            core_list = temp
+
+            # if the system has fewer cores than requested, use all cores in the socket
+            if len(core_list) < (nr_cores * nr_sockets):
+                partial_cores = self.cores
+                sockList = set([int(n.socket) for n in partial_cores])
+
+                if from_last:
+                    sockList = list(sockList)[-nr_sockets:]
+                else:
+                    sockList = list(sockList)[:nr_sockets]
+                partial_cores = [n for n in partial_cores if int(n.socket) in sockList]
+
+                temp = []
+                for sock in sockList:
+                    core_list = list(
+                        [int(n.thread) for n in partial_cores if int(n.socket) == sock]
+                    )
+                    if from_last:
+                        core_list = core_list[-nr_cores:]
+                    else:
+                        core_list = core_list[:nr_cores]
+                    temp.extend(core_list)
+
+                core_list = temp
+
+            partial_cores = [n for n in partial_cores if int(n.core) in core_list]
+            temp = []
+            if len(core_list) < nr_cores:
+                raise ValueError(
+                    "Cannot get requested core configuration "
+                    "requested {} have {}".format(config, self.cores)
+                )
+            if len(sockList) < nr_sockets:
+                raise ValueError(
+                    "Cannot get requested core configuration "
+                    "requested {} have {}".format(config, self.cores)
+                )
+            # recheck the core_list and create the thread_list
+            i = 0
+            for sock in sockList:
+                coreList_aux = [
+                    int(core_list[n])
+                    for n in range((nr_cores * i), (nr_cores * i + nr_cores))
+                ]
+                for core in coreList_aux:
+                    thread_list = list(
+                        [
+                            int(n.thread)
+                            for n in partial_cores
+                            if ((int(n.core) == core) and (int(n.socket) == sock))
+                        ]
+                    )
+                    if from_last:
+                        thread_list = thread_list[-nr_threads:]
+                    else:
+                        thread_list = thread_list[:nr_threads]
+                    temp.extend(thread_list)
+                    thread_list = temp
+                i += 1
+            return list(map(str, thread_list))
+
+    def create_session(self, name: str) -> SSHConnection:
+        """
+        Create an additional SSH session to the Node, tracked so that it is
+        closed on node_exit.
+        """
+        connection = SSHConnection(
+            self.get_ip_address(),
+            name,
+            getLogger(name, node=self.name),
+            self.get_username(),
+            self.get_password(),
+        )
+        self._other_sessions.append(connection)
+        return connection
+
     def node_exit(self) -> None:
         """
         Recover all resource before node exit
         """
         if self.main_session:
             self.main_session.close()
+        for session in self._other_sessions:
+            session.close()
         self.logger.logger_exit()
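
A note for reviewers on the core scanning above: init_core_list_uncached_linux()
renumbers cores so that ids are contiguous. A standalone sketch with
hypothetical lscpu output (the variable names mirror the patch; the sample
data is made up):

    import dataclasses

    @dataclasses.dataclass(slots=True, frozen=True)
    class CPUCore:  # same shape as the dataclass in the patch
        thread: str
        socket: str
        core: int

    # Two hypothetical hyperthreads of one physical core, as printed by
    # `lscpu -p=CPU,CORE,SOCKET,NODE` (columns: thread,core,socket,node):
    sample = ["0,0,0,0", "1,0,0,0"]
    coremap: dict[str, int] = {}
    cores = []
    core_id = 0
    for line in sample:
        thread, core, socket, node = line.split(",")[0:4]
        if core not in coremap:  # renumber cores to be contiguous
            coremap[core] = core_id
            core_id += 1
        cores.append(CPUCore(thread=thread, socket=socket, core=coremap[core]))
    # cores == [CPUCore(thread='0', socket='0', core=0),
    #           CPUCore(thread='1', socket='0', core=0)]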
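
And on the core-config notation accepted by get_core_list(): strings such as
"1S/2C/1T" request <sockets>S/<cores per socket>C/<threads per core>T. A
standalone sketch of just the parsing step (parse_core_config is illustrative
and not part of the patch):

    import re

    # Same pattern the patch matches: up to 4 sockets, any core count,
    # 1 or 2 threads per core.
    CORE_CONFIG_RE = re.compile(r"([1234])S/([0-9]+)C/([12])T")

    def parse_core_config(config: str) -> tuple[int, int, int]:
        """Return (sockets, cores per socket, threads per core)."""
        m = CORE_CONFIG_RE.fullmatch(config)
        if m is None:
            raise ValueError(f"Invalid core config: {config!r}")
        return int(m.group(1)), int(m.group(2)), int(m.group(3))

    # e.g. parse_core_config("1S/2C/1T") == (1, 2, 1): two physical cores
    # on one socket, one hyperthread each.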