@@ -4,9 +4,13 @@
# Copyright(c) 2022 University of New Hampshire
#
+import dataclasses
+import re
+from abc import ABC
from typing import Optional
-from .config import NodeConfiguration
+from framework.config import OS, NodeConfiguration
+
from .logger import DTSLOG, getLogger
from .settings import SETTINGS
from .ssh_connection import SSHConnection
@@ -16,22 +20,41 @@
"""
-class Node(object):
+@dataclasses.dataclass(slots=True, frozen=True)
+class CPUCore:
+ thread: str
+ socket: str
+ core: int
+
+
+class Node(ABC):
"""
Basic module for node management. This module implements methods that
manage a node, such as information gathering (of CPU/PCI/NIC) and
environment setup.
"""
- _config: NodeConfiguration
+ name: str
+ skip_setup: bool
+ sessions: list[SSHConnection]
+ default_hugepages_cleared: bool
+ prefix_list: list[str]
+ cores: list[CPUCore]
+ number_of_cores: int
logger: DTSLOG
main_session: SSHConnection
- name: str
+ _config: NodeConfiguration
_other_sessions: list[SSHConnection]
def __init__(self, node_config: NodeConfiguration):
self._config = node_config
self.name = node_config.name
+ self.skip_setup = SETTINGS.skip_setup
+ self.default_hugepages_cleared = False
+ self.prefix_list = []
+ self.cores = []
+ self.number_of_cores = 0
+ self._dpdk_dir = None
self.logger = getLogger(self.name)
self.logger.info(f"Created node: {self.name}")
@@ -42,22 +65,23 @@ def __init__(self, node_config: NodeConfiguration):
self.get_username(),
self.get_password(),
)
+ self._other_sessions = []
def get_ip_address(self) -> str:
"""
- Get SUT's ip address.
+ Get Node's ip address.
"""
return self._config.hostname
def get_password(self) -> Optional[str]:
"""
- Get SUT's login password.
+ Get Node's login password.
"""
return self._config.password
def get_username(self) -> str:
"""
- Get SUT's login username.
+ Get Node's login username.
"""
return self._config.user
@@ -66,6 +90,7 @@ def send_expect(
command: str,
expected: str,
timeout: float = SETTINGS.timeout,
+ alt_session: bool = False,
verify: bool = False,
trim_whitespace: bool = True,
) -> str | int:
@@ -81,19 +106,373 @@ def send_expect(
if trim_whitespace:
expected = expected.strip()
+ if alt_session and len(self._other_sessions):
+ return self._other_sessions[0].send_expect(
+ command, expected, timeout, verify
+ )
+
return self.main_session.send_expect(command, expected, timeout, verify)
- def send_command(self, cmds: str, timeout: float = SETTINGS.timeout) -> str:
+ def send_command(
+ self, cmds: str, timeout: float = SETTINGS.timeout, alt_session: bool = False
+ ) -> str:
"""
Send commands to node and return string before timeout.
"""
+ if alt_session and len(self._other_sessions):
+ return self._other_sessions[0].send_command(cmds, timeout)
+
return self.main_session.send_command(cmds, timeout)
+ def get_session_output(self, timeout: float = SETTINGS.timeout):
+ """
+ Get session output message before timeout
+ """
+ return self.main_session.get_session_before(timeout)
+
+ def get_total_huge_pages(self):
+ """
+ Get the huge page number of Node.
+ """
+ huge_pages = self.send_expect(
+ "awk '/HugePages_Total/ { print $2 }' /proc/meminfo", "# ", alt_session=True
+ )
+ if huge_pages != "":
+ return int(huge_pages.split()[0])
+ return 0
+
+ def mount_huge_pages(self):
+ """
+ Mount hugepage file system on Node.
+ """
+ self.send_expect("umount `awk '/hugetlbfs/ { print $2 }' /proc/mounts`", "# ")
+ out = self.send_expect("awk '/hugetlbfs/ { print $2 }' /proc/mounts", "# ")
+ # only mount hugepage when no hugetlbfs mounted
+ if not len(out):
+ self.send_expect("mkdir -p /mnt/huge", "# ")
+ self.send_expect("mount -t hugetlbfs nodev /mnt/huge", "# ")
+
+ def strip_hugepage_path(self):
+ mounts = self.send_expect("cat /proc/mounts |grep hugetlbfs", "# ")
+ infos = mounts.split()
+ if len(infos) >= 2:
+ return infos[1]
+ else:
+ return ""
+
+ def set_huge_pages(self, huge_pages, numa=""):
+ """
+ Set numbers of huge pages
+ """
+ page_size = self.send_expect(
+ "awk '/Hugepagesize/ {print $2}' /proc/meminfo", "# "
+ )
+
+ if not numa:
+ self.send_expect(
+ "echo %d > /sys/kernel/mm/hugepages/hugepages-%skB/nr_hugepages"
+ % (huge_pages, page_size),
+ "# ",
+ 5,
+ )
+ else:
+ # sometimes we set hugepage on kernel cmdline, so we clear it
+ if not self.default_hugepages_cleared:
+ self.send_expect(
+ "echo 0 > /sys/kernel/mm/hugepages/hugepages-%skB/nr_hugepages"
+ % (page_size),
+ "# ",
+ 5,
+ )
+ self.default_hugepages_cleared = True
+
+ # some platform not support numa, example VM SUT
+ try:
+ self.send_expect(
+ "echo %d > /sys/devices/system/node/%s/hugepages/hugepages-%skB/nr_hugepages"
+ % (huge_pages, numa, page_size),
+ "# ",
+ 5,
+ )
+        except Exception:
+ self.logger.warning("set %d hugepage on %s error" % (huge_pages, numa))
+ self.send_expect(
+ "echo %d > /sys/kernel/mm/hugepages/hugepages-%skB/nr_hugepages"
+                % (huge_pages, page_size),
+ "# ",
+ 5,
+ )
+
+ def get_dpdk_pids(self, prefix_list, alt_session):
+ """
+ get all dpdk applications on Node.
+ """
+ file_directories = [
+ "/var/run/dpdk/%s/config" % file_prefix for file_prefix in prefix_list
+ ]
+ pids = []
+ pid_reg = r"p(\d+)"
+ for config_file in file_directories:
+ # Covers case where the process is run as a unprivileged user and does not generate the file
+ isfile = self.send_expect(
+ "ls -l {}".format(config_file), "# ", 20, alt_session
+ )
+ if isfile:
+ cmd = "lsof -Fp %s" % config_file
+ out = self.send_expect(cmd, "# ", 20, alt_session)
+ if len(out):
+ lines = out.split("\r\n")
+ for line in lines:
+ m = re.match(pid_reg, line)
+ if m:
+ pids.append(m.group(1))
+ for pid in pids:
+ self.send_expect("kill -9 %s" % pid, "# ", 20, alt_session)
+ self.get_session_output(timeout=2)
+
+ hugepage_info = [
+ "/var/run/dpdk/%s/hugepage_info" % file_prefix
+ for file_prefix in prefix_list
+ ]
+ for hugepage in hugepage_info:
+ # Covers case where the process is run as a unprivileged user and does not generate the file
+ isfile = self.send_expect(
+ "ls -l {}".format(hugepage), "# ", 20, alt_session
+ )
+ if isfile:
+ cmd = "lsof -Fp %s" % hugepage
+ out = self.send_expect(cmd, "# ", 20, alt_session)
+ if len(out) and "No such file or directory" not in out:
+ self.logger.warning("There are some dpdk process not free hugepage")
+ self.logger.warning("**************************************")
+ self.logger.warning(out)
+ self.logger.warning("**************************************")
+
+ # remove directory
+ directorys = ["/var/run/dpdk/%s" % file_prefix for file_prefix in prefix_list]
+ for directory in directorys:
+ cmd = "rm -rf %s" % directory
+ self.send_expect(cmd, "# ", 20, alt_session)
+
+ # delete hugepage on mnt path
+ if getattr(self, "hugepage_path", None):
+ for file_prefix in prefix_list:
+ cmd = "rm -rf %s/%s*" % (self.hugepage_path, file_prefix)
+ self.send_expect(cmd, "# ", 20, alt_session)
+
+ def kill_all(self, alt_session=True):
+ """
+ Kill all dpdk applications on Node.
+ """
+ if "Traffic" in str(self):
+ self.logger.info("kill_all: called by tg")
+ pass
+ else:
+ if self.prefix_list:
+ self.logger.info("kill_all: called by SUT and prefix list has value.")
+ self.get_dpdk_pids(self.prefix_list, alt_session)
+ # init prefix_list
+ self.prefix_list = []
+ else:
+ self.logger.info("kill_all: called by SUT and has no prefix list.")
+ out = self.send_command(
+ "ls -l /var/run/dpdk |awk '/^d/ {print $NF}'",
+ timeout=0.5,
+ alt_session=True,
+ )
+ # the last directory is expect string, eg: [PEXPECT]#
+ if out != "":
+ dir_list = out.split("\r\n")
+ self.get_dpdk_pids(dir_list[:-1], alt_session)
+
+ def get_os(self) -> OS:
+ return self._config.os
+
+ def init_core_list(self):
+ """
+ Load or create core information of Node.
+ """
+ if not self.cores or not self.number_of_cores:
+ self.init_core_list_uncached()
+
+ def init_core_list_uncached(self):
+ """
+ Scan cores on Node and create core information list.
+ """
+ init_core_list_uncached = getattr(
+ self, "init_core_list_uncached_%s" % self.get_os()
+ )
+ init_core_list_uncached()
+
+ def init_core_list_uncached_linux(self):
+ """
+ Scan cores in linux and create core information list.
+ """
+ self.cores = []
+
+ cpuinfo = self.send_expect(
+            r"lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \#", "#", alt_session=True
+ )
+
+        cpuinfo = [i for i in cpuinfo.split() if re.match(r"^\d.+", i)]
+ # haswell cpu on cottonwood core id not correct
+ # need additional coremap for haswell cpu
+ core_id = 0
+ coremap = {}
+ for line in cpuinfo:
+ (thread, core, socket, node) = line.split(",")[0:4]
+
+ if core not in list(coremap.keys()):
+ coremap[core] = core_id
+ core_id += 1
+
+ if self._config.bypass_core0 and core == "0" and socket == "0":
+ self.logger.info("Core0 bypassed")
+ continue
+ if self._config.arch == "arm64" or self._config.arch == "ppc64":
+ self.cores.append(
+ CPUCore(thread=thread, socket=node, core=coremap[core])
+ )
+ else:
+ self.cores.append(
+ CPUCore(thread=thread, socket=socket, core=coremap[core])
+ )
+
+ self.number_of_cores = len(self.cores)
+
+ def get_core_list(self, config, socket=-1, from_last=False):
+ """
+ Get lcore array according to the core config like "all", "1S/1C/1T".
+ We can specify the physical CPU socket by the "socket" parameter.
+ """
+ if config == "all":
+ cores = []
+ if socket != -1:
+ for core in self.cores:
+ if int(core.socket) == socket:
+ cores.append(core.thread)
+ else:
+ cores = [core.thread for core in self.cores]
+ return cores
+
+ m = re.match("([1234])S/([0-9]+)C/([12])T", config)
+
+ if m:
+ nr_sockets = int(m.group(1))
+ nr_cores = int(m.group(2))
+ nr_threads = int(m.group(3))
+
+ partial_cores = self.cores
+
+ # If not specify socket sockList will be [0,1] in numa system
+ # If specify socket will just use the socket
+ if socket < 0:
+ sockList = set([int(core.socket) for core in partial_cores])
+ else:
+ for n in partial_cores:
+ if int(n.socket) == socket:
+ sockList = [int(n.socket)]
+
+ if from_last:
+ sockList = list(sockList)[-nr_sockets:]
+ else:
+ sockList = list(sockList)[:nr_sockets]
+ partial_cores = [n for n in partial_cores if int(n.socket) in sockList]
+ thread_list = set([int(n.thread) for n in partial_cores])
+ thread_list = list(thread_list)
+
+ # filter usable core to core_list
+ temp = []
+ for sock in sockList:
+ core_list = set(
+ [int(n.core) for n in partial_cores if int(n.socket) == sock]
+ )
+ if from_last:
+ core_list = list(core_list)[-nr_cores:]
+ else:
+ core_list = list(core_list)[:nr_cores]
+ temp.extend(core_list)
+
+ core_list = temp
+
+ # if system core less than request just use all cores in in socket
+ if len(core_list) < (nr_cores * nr_sockets):
+ partial_cores = self.cores
+ sockList = set([int(n.socket) for n in partial_cores])
+
+ if from_last:
+ sockList = list(sockList)[-nr_sockets:]
+ else:
+ sockList = list(sockList)[:nr_sockets]
+ partial_cores = [n for n in partial_cores if int(n.socket) in sockList]
+
+ temp = []
+ for sock in sockList:
+ core_list = list(
+ [int(n.thread) for n in partial_cores if int(n.socket) == sock]
+ )
+ if from_last:
+ core_list = core_list[-nr_cores:]
+ else:
+ core_list = core_list[:nr_cores]
+ temp.extend(core_list)
+
+ core_list = temp
+
+ partial_cores = [n for n in partial_cores if int(n.core) in core_list]
+ temp = []
+ if len(core_list) < nr_cores:
+ raise ValueError(
+ "Cannot get requested core configuration "
+ "requested {} have {}".format(config, self.cores)
+ )
+ if len(sockList) < nr_sockets:
+ raise ValueError(
+ "Cannot get requested core configuration "
+ "requested {} have {}".format(config, self.cores)
+ )
+ # recheck the core_list and create the thread_list
+ i = 0
+ for sock in sockList:
+ coreList_aux = [
+ int(core_list[n])
+ for n in range((nr_cores * i), (nr_cores * i + nr_cores))
+ ]
+ for core in coreList_aux:
+ thread_list = list(
+ [
+ int(n.thread)
+ for n in partial_cores
+ if ((int(n.core) == core) and (int(n.socket) == sock))
+ ]
+ )
+ if from_last:
+ thread_list = thread_list[-nr_threads:]
+ else:
+ thread_list = thread_list[:nr_threads]
+ temp.extend(thread_list)
+ thread_list = temp
+ i += 1
+ return list(map(str, thread_list))
+
+ def create_session(self, name: str) -> SSHConnection:
+ connection = SSHConnection(
+ self.get_ip_address(),
+ name,
+ getLogger(name, node=self.name),
+ self.get_username(),
+ self.get_password(),
+ )
+ self._other_sessions.append(connection)
+ return connection
+
def node_exit(self) -> None:
"""
Recover all resource before node exit
"""
if self.main_session:
self.main_session.close()
+ for session in self._other_sessions:
+ session.close()
self.logger.logger_exit()