@@ -28,6 +28,21 @@ def get_output(self) -> str:
return self.output
+class VerifyFailure(Exception):
+ """
+ To be used within the test cases to verify if a command output
+ is as it was expected.
+ """
+
+ value: str
+
+ def __init__(self, value: str):
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
+
class SSHConnectionException(Exception):
"""
SSH connection error.
new file mode 100644
@@ -0,0 +1,274 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+# Copyright(c) 2022 PANTHEON.tech s.r.o.
+#
+
+"""
+A base class for creating DTS test cases.
+"""
+
+import re
+import time
+import traceback
+
+from .exception import TimeoutException, VerifyFailure
+from .logger import getLogger
+from .test_result import Result
+
+
+class TestCase(object):
+ def __init__(self, sut_nodes, tg_node, suitename, target, func):
+ self.sut_node = sut_nodes[0]
+ self.sut_nodes = sut_nodes
+ self.tg_node = tg_node
+ self.suite_name = suitename
+ self.target = target
+
+ # local variable
+ self._requested_tests = None
+ self._subtitle = None
+
+ # check session and reconnect if possible
+ for sut_node in self.sut_nodes:
+ self._check_and_reconnect(node=sut_node)
+ self._check_and_reconnect(node=self.tg_node)
+
+ # result object for save suite result
+ self._suite_result = Result()
+ self._suite_result.sut = self.sut_node.node["IP"]
+ self._suite_result.target = target
+ self._suite_result.test_suite = self.suite_name
+ if self._suite_result is None:
+ raise ValueError("Result object should not None")
+
+ self._enable_func = func
+
+ # command history
+ self.setup_history = list()
+ self.test_history = list()
+
+ def init_log(self):
+ # get log handler
+ class_name = self.__class__.__name__
+ self.logger = getLogger(class_name)
+
+ def _check_and_reconnect(self, node=None):
+ try:
+ result = node.session.check_available()
+ except:
+ result = False
+
+ if result is False:
+ node.reconnect_session()
+ if "sut" in str(type(node)):
+ node.send_expect("cd %s" % node.base_dir, "#")
+ node.set_env_variable()
+
+ try:
+ result = node.alt_session.check_available()
+ except:
+ result = False
+
+ if result is False:
+ node.reconnect_session(alt_session=True)
+
+ def set_up_all(self):
+ pass
+
+ def set_up(self):
+ pass
+
+ def tear_down(self):
+ pass
+
+ def tear_down_all(self):
+ pass
+
+ def verify(self, passed, description):
+ if not passed:
+ raise VerifyFailure(description)
+
+ def _get_functional_cases(self):
+ """
+ Get all functional test cases.
+ """
+ return self._get_test_cases(r"test_(?!perf_)")
+
+ def _has_it_been_requested(self, test_case, test_name_regex):
+ """
+ Check whether test case has been requested for validation.
+ """
+ name_matches = re.match(test_name_regex, test_case.__name__)
+
+ if self._requested_tests is not None:
+ return name_matches and test_case.__name__ in self._requested_tests
+
+ return name_matches
+
+ def set_requested_cases(self, case_list):
+ """
+ Pass down input cases list for check
+ """
+ if self._requested_tests is None:
+ self._requested_tests = case_list
+ elif case_list is not None:
+ self._requested_tests += case_list
+
+ def _get_test_cases(self, test_name_regex):
+ """
+ Return case list which name matched regex.
+ """
+ for test_case_name in dir(self):
+ test_case = getattr(self, test_case_name)
+ if callable(test_case) and self._has_it_been_requested(
+ test_case, test_name_regex
+ ):
+ yield test_case
+
+ def execute_setup_all(self):
+ """
+ Execute suite setup_all function before cases.
+ """
+ # clear all previous output
+ for sut_node in self.sut_nodes:
+ sut_node.get_session_output(timeout=0.1)
+ self.tg_node.get_session_output(timeout=0.1)
+
+ # save into setup history list
+ self.enable_history(self.setup_history)
+
+ try:
+ self.set_up_all()
+ return True
+ except Exception as v:
+ self.logger.error("set_up_all failed:\n" + traceback.format_exc())
+ # record all cases blocked
+ if self._enable_func:
+ for case_obj in self._get_functional_cases():
+ self._suite_result.test_case = case_obj.__name__
+ self._suite_result.test_case_blocked(
+ "set_up_all failed: {}".format(str(v))
+ )
+ return False
+
+ def _execute_test_case(self, case_obj):
+ """
+ Execute specified test case in specified suite. If any exception occurred in
+ validation process, save the result and tear down this case.
+ """
+ case_name = case_obj.__name__
+ self._suite_result.test_case = case_obj.__name__
+
+ # save into test command history
+ self.test_history = list()
+ self.enable_history(self.test_history)
+
+ case_result = True
+ try:
+ self.logger.info("Test Case %s Begin" % case_name)
+
+ self.running_case = case_name
+ # clean session
+ for sut_node in self.sut_nodes:
+ sut_node.get_session_output(timeout=0.1)
+ self.tg_node.get_session_output(timeout=0.1)
+ # run set_up function for each case
+ self.set_up()
+ # run test case
+ case_obj()
+
+ self._suite_result.test_case_passed()
+
+ self.logger.info("Test Case %s Result PASSED:" % case_name)
+
+ except VerifyFailure as v:
+ case_result = False
+ self._suite_result.test_case_failed(str(v))
+ self.logger.error("Test Case %s Result FAILED: " % (case_name) + str(v))
+ except KeyboardInterrupt:
+ self._suite_result.test_case_blocked("Skipped")
+ self.logger.error("Test Case %s SKIPPED: " % (case_name))
+ self.tear_down()
+ raise KeyboardInterrupt("Stop DTS")
+ except TimeoutException as e:
+ case_result = False
+ self._suite_result.test_case_failed(str(e))
+ self.logger.error("Test Case %s Result FAILED: " % (case_name) + str(e))
+ self.logger.error("%s" % (e.get_output()))
+ except Exception:
+ case_result = False
+ trace = traceback.format_exc()
+ self._suite_result.test_case_failed(trace)
+ self.logger.error("Test Case %s Result ERROR: " % (case_name) + trace)
+ finally:
+ self.execute_tear_down()
+ return case_result
+
+ def execute_test_cases(self):
+ """
+ Execute all test cases in one suite.
+ """
+ # prepare debugger rerun case environment
+ if self._enable_func:
+ for case_obj in self._get_functional_cases():
+ for i in range(self.tg_node.re_run_time + 1):
+ ret = self.execute_test_case(case_obj)
+
+ if ret is False and self.tg_node.re_run_time:
+ for sut_node in self.sut_nodes:
+ sut_node.get_session_output(timeout=0.5 * (i + 1))
+ self.tg_node.get_session_output(timeout=0.5 * (i + 1))
+ time.sleep(i + 1)
+ self.logger.info(
+ " Test case %s failed and re-run %d time"
+ % (case_obj.__name__, i + 1)
+ )
+ else:
+ break
+
+ def execute_test_case(self, case_obj):
+ """
+ Execute test case or enter into debug mode.
+ """
+ return self._execute_test_case(case_obj)
+
+ def get_result(self):
+ """
+ Return suite test result
+ """
+ return self._suite_result
+
+ def execute_tear_downall(self):
+ """
+ execute suite tear_down_all function
+ """
+ try:
+ self.tear_down_all()
+ except Exception:
+ self.logger.error("tear_down_all failed:\n" + traceback.format_exc())
+
+ for sut_node in self.sut_nodes:
+ sut_node.kill_all()
+ self.tg_node.kill_all()
+
+ def execute_tear_down(self):
+ """
+ execute suite tear_down function
+ """
+ try:
+ self.tear_down()
+ except Exception:
+ self.logger.error("tear_down failed:\n" + traceback.format_exc())
+ self.logger.warning(
+ "tear down %s failed, might iterfere next case's result!"
+ % self.running_case
+ )
+
+ def enable_history(self, history):
+ """
+ Enable history for all Node's default session
+ """
+ for sut_node in self.sut_nodes:
+ sut_node.session.set_history(history)
+
+ self.tg_node.session.set_history(history)
new file mode 100644
@@ -0,0 +1,218 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+# Copyright(c) 2022 PANTHEON.tech s.r.o.
+#
+
+"""
+Generic result container and reporters
+"""
+
+
+class Result(object):
+ """
+ Generic result container. Useful to store/retrieve results during
+ a DTF execution.
+
+ It manages and hide an internal complex structure like the one shown below.
+ This is presented to the user with a property based interface.
+
+ internals = [
+ 'sut1', [
+ 'kdriver',
+ 'firmware',
+ 'pkg',
+ 'driver',
+ 'dpdk_version',
+ 'target1', 'nic1', [
+ 'suite1', [
+ 'case1', ['PASSED', ''],
+ 'case2', ['PASSED', ''],
+ ],
+ ],
+ 'target2', 'nic1', [
+ 'suite2', [
+ 'case3', ['PASSED', ''],
+ 'case4', ['FAILED', 'message'],
+ ],
+ 'suite3', [
+ 'case5', ['BLOCKED', 'message'],
+ ],
+ ]
+ ]
+ ]
+
+ """
+
+ def __init__(self):
+ self.__sut = 0
+ self.__target = 0
+ self.__test_suite = 0
+ self.__test_case = 0
+ self.__test_result = None
+ self.__message = None
+ self.__internals = []
+ self.__failed_suts = {}
+ self.__failed_targets = {}
+
+ def __set_sut(self, sut):
+ if sut not in self.__internals:
+ self.__internals.append(sut)
+ self.__internals.append([])
+ self.__sut = self.__internals.index(sut)
+
+ def __get_sut(self):
+ return self.__internals[self.__sut]
+
+ def current_dpdk_version(self, sut):
+ """
+ Returns the dpdk version for a given SUT
+ """
+ try:
+ sut_idx = self.__internals.index(sut)
+ return self.__internals[sut_idx + 1][4]
+ except:
+ return ""
+
+ def __set_dpdk_version(self, dpdk_version):
+ if dpdk_version not in self.internals[self.__sut + 1]:
+ dpdk_current = self.__get_dpdk_version()
+ if dpdk_current:
+ if dpdk_version not in dpdk_current:
+ self.internals[self.__sut + 1][4] = (
+ dpdk_current + "/" + dpdk_version
+ )
+ else:
+ self.internals[self.__sut + 1].append(dpdk_version)
+
+ def __get_dpdk_version(self):
+ try:
+ return self.internals[self.__sut + 1][4]
+ except:
+ return ""
+
+ def __current_targets(self):
+ return self.internals[self.__sut + 1]
+
+ def __set_target(self, target):
+ targets = self.__current_targets()
+ if target not in targets:
+ targets.append(target)
+ targets.append("_nic_")
+ targets.append([])
+ self.__target = targets.index(target)
+
+ def __get_target(self):
+ return self.__current_targets()[self.__target]
+
+ def __current_suites(self):
+ return self.__current_targets()[self.__target + 2]
+
+ def __set_test_suite(self, test_suite):
+ suites = self.__current_suites()
+ if test_suite not in suites:
+ suites.append(test_suite)
+ suites.append([])
+ self.__test_suite = suites.index(test_suite)
+
+ def __get_test_suite(self):
+ return self.__current_suites()[self.__test_suite]
+
+ def __current_cases(self):
+ return self.__current_suites()[self.__test_suite + 1]
+
+ def __set_test_case(self, test_case):
+ cases = self.__current_cases()
+ cases.append(test_case)
+ cases.append([])
+ self.__test_case = cases.index(test_case)
+
+ def __get_test_case(self):
+ return self.__current_cases()[self.__test_case]
+
+ def __get_internals(self):
+ return self.__internals
+
+ def __current_result(self):
+ return self.__current_cases()[self.__test_case + 1]
+
+ def __set_test_case_result(self, result, message):
+ test_case = self.__current_result()
+ test_case.append(result)
+ test_case.append(message)
+ self.__test_result = result
+ self.__message = message
+
+ def copy_suite(self, suite_result):
+ self.__current_suites()[self.__test_suite + 1] = suite_result.__current_cases()
+
+ def test_case_passed(self):
+ """
+ Set last test case added as PASSED
+ """
+ self.__set_test_case_result(result="PASSED", message="")
+
+ def test_case_failed(self, message):
+ """
+ Set last test case added as FAILED
+ """
+ self.__set_test_case_result(result="FAILED", message=message)
+
+ def test_case_blocked(self, message):
+ """
+ Set last test case added as BLOCKED
+ """
+ self.__set_test_case_result(result="BLOCKED", message=message)
+
+ def all_suts(self):
+ """
+ Returns all the SUTs it's aware of.
+ """
+ return self.__internals[::2]
+
+ def all_targets(self, sut):
+ """
+ Returns the targets for a given SUT
+ """
+ try:
+ sut_idx = self.__internals.index(sut)
+ except:
+ return None
+ return self.__internals[sut_idx + 1][5::3]
+
+ def add_failed_sut(self, sut, msg):
+ """
+ Sets the given SUT as failing due to msg
+ """
+ self.__failed_suts[sut] = msg
+
+ def remove_failed_sut(self, sut):
+ """
+ Remove the given SUT from failed SUTs collection
+ """
+ if sut in self.__failed_suts:
+ self.__failed_suts.pop(sut)
+
+ def add_failed_target(self, sut, target, msg):
+ """
+ Sets the given SUT, target as failing due to msg
+ """
+ self.__failed_targets[sut + target] = msg
+
+ def remove_failed_target(self, sut, target):
+ """
+ Remove the given SUT, target from failed targets collection
+ """
+ key_word = sut + target
+ if key_word in self.__failed_targets:
+ self.__failed_targets.pop(key_word)
+
+ """
+ Attributes defined as properties to hide the implementation from the
+ presented interface.
+ """
+ sut = property(__get_sut, __set_sut)
+ dpdk_version = property(__get_dpdk_version, __set_dpdk_version)
+ target = property(__get_target, __set_target)
+ test_suite = property(__get_test_suite, __set_test_suite)
+ test_case = property(__get_test_case, __set_test_case)
+ internals = property(__get_internals)
@@ -4,6 +4,7 @@
# Copyright(c) 2022 University of New Hampshire
#
+import inspect
import sys
@@ -15,6 +16,19 @@ def GREEN(text: str) -> str:
return f"\u001B[32;1m{str(text)}\u001B[0m"
+def get_subclasses(module, clazz):
+ """
+ Get module attribute name and attribute.
+ """
+ for subclazz_name, subclazz in inspect.getmembers(module):
+ if (
+ hasattr(subclazz, "__bases__")
+ and subclazz.__bases__
+ and clazz in subclazz.__bases__
+ ):
+ yield (subclazz_name, subclazz)
+
+
def check_dts_python_version() -> None:
if sys.version_info.major < 3 or (
sys.version_info.major == 3 and sys.version_info.minor < 10