@@ -13,6 +13,7 @@ executions:
skip_smoke_tests: false # optional flag that allows you to skip smoke tests
test_suites:
- hello_world
+ - os_udp
system_under_test_node:
node_name: "SUT 1"
vdevs: # optional; if removed, vdevs won't be used in the execution
@@ -185,7 +185,8 @@
"test_suite": {
"type": "string",
"enum": [
- "hello_world"
+ "hello_world",
+ "os_udp"
]
},
"test_target": {
@@ -3,7 +3,8 @@
# Copyright(c) 2023 University of New Hampshire
import json
-from typing import TypedDict
+from ipaddress import IPv4Interface, IPv6Interface
+from typing import TypedDict, Union
from typing_extensions import NotRequired
@@ -181,3 +182,20 @@ def configure_port_state(self, port: Port, enable: bool) -> None:
self.send_command(
f"ip link set dev {port.logical_name} {state}", privileged=True
)
+
+ def configure_port_ip_address(
+ self,
+ address: Union[IPv4Interface, IPv6Interface],
+ port: Port,
+ delete: bool,
+ ) -> None:
+ command = "del" if delete else "add"
+ self.send_command(
+ f"ip address {command} {address} dev {port.logical_name}",
+ privileged=True,
+ verify=True,
+ )
+
+ def configure_ipv4_forwarding(self, enable: bool) -> None:
+ state = 1 if enable else 0
+ self.send_command(f"sysctl -w net.ipv4.ip_forward={state}", privileged=True)
@@ -4,8 +4,9 @@
from abc import ABC, abstractmethod
from collections.abc import Iterable
+from ipaddress import IPv4Interface, IPv6Interface
from pathlib import PurePath
-from typing import Type, TypeVar
+from typing import Type, TypeVar, Union
from framework.config import Architecture, NodeConfiguration, NodeInfo
from framework.logger import DTSLOG
@@ -264,3 +265,20 @@ def configure_port_state(self, port: Port, enable: bool) -> None:
"""
Enable/disable port.
"""
+
+ @abstractmethod
+ def configure_port_ip_address(
+ self,
+ address: Union[IPv4Interface, IPv6Interface],
+ port: Port,
+ delete: bool,
+ ) -> None:
+ """
+ Configure (add or delete) an IP address of the input port.
+ """
+
+ @abstractmethod
+ def configure_ipv4_forwarding(self, enable: bool) -> None:
+ """
+ Enable IPv4 forwarding in the underlying OS.
+ """
@@ -9,7 +9,13 @@
import importlib
import inspect
import re
+from ipaddress import IPv4Interface, IPv6Interface, ip_interface
from types import MethodType
+from typing import Union
+
+from scapy.layers.inet import IP # type: ignore[import]
+from scapy.layers.l2 import Ether # type: ignore[import]
+from scapy.packet import Packet, Padding # type: ignore[import]
from .exception import (
BlockingTestSuiteError,
@@ -21,6 +27,8 @@
from .settings import SETTINGS
from .test_result import BuildTargetResult, Result, TestCaseResult, TestSuiteResult
from .testbed_model import SutNode, TGNode
+from .testbed_model.hw.port import Port, PortLink
+from .utils import get_packet_summaries
class TestSuite(object):
@@ -47,6 +55,15 @@ class TestSuite(object):
_test_cases_to_run: list[str]
_func: bool
_result: TestSuiteResult
+ _port_links: list[PortLink]
+ _sut_port_ingress: Port
+ _sut_port_egress: Port
+ _sut_ip_address_ingress: Union[IPv4Interface, IPv6Interface]
+ _sut_ip_address_egress: Union[IPv4Interface, IPv6Interface]
+ _tg_port_ingress: Port
+ _tg_port_egress: Port
+ _tg_ip_address_ingress: Union[IPv4Interface, IPv6Interface]
+ _tg_ip_address_egress: Union[IPv4Interface, IPv6Interface]
def __init__(
self,
@@ -63,6 +80,31 @@ def __init__(
self._test_cases_to_run.extend(SETTINGS.test_cases)
self._func = func
self._result = build_target_result.add_test_suite(self.__class__.__name__)
+ self._port_links = []
+ self._process_links()
+ self._sut_port_ingress, self._tg_port_egress = (
+ self._port_links[0].sut_port,
+ self._port_links[0].tg_port,
+ )
+ self._sut_port_egress, self._tg_port_ingress = (
+ self._port_links[1].sut_port,
+ self._port_links[1].tg_port,
+ )
+ self._sut_ip_address_ingress = ip_interface("192.168.100.2/24")
+ self._sut_ip_address_egress = ip_interface("192.168.101.2/24")
+ self._tg_ip_address_egress = ip_interface("192.168.100.3/24")
+ self._tg_ip_address_ingress = ip_interface("192.168.101.3/24")
+
+ def _process_links(self) -> None:
+ for sut_port in self.sut_node.ports:
+ for tg_port in self.tg_node.ports:
+ if (sut_port.identifier, sut_port.peer) == (
+ tg_port.peer,
+ tg_port.identifier,
+ ):
+ self._port_links.append(
+ PortLink(sut_port=sut_port, tg_port=tg_port)
+ )
def set_up_suite(self) -> None:
"""
@@ -85,14 +127,181 @@ def tear_down_test_case(self) -> None:
Tear down the previously created test fixtures after each test case.
"""
+ def configure_testbed_ipv4(self, restore: bool = False) -> None:
+ delete = True if restore else False
+ enable = False if restore else True
+ self._configure_ipv4_forwarding(enable)
+ self.sut_node.configure_port_ip_address(
+ self._sut_ip_address_egress, self._sut_port_egress, delete
+ )
+ self.sut_node.configure_port_state(self._sut_port_egress, enable)
+ self.sut_node.configure_port_ip_address(
+ self._sut_ip_address_ingress, self._sut_port_ingress, delete
+ )
+ self.sut_node.configure_port_state(self._sut_port_ingress, enable)
+ self.tg_node.configure_port_ip_address(
+ self._tg_ip_address_ingress, self._tg_port_ingress, delete
+ )
+ self.tg_node.configure_port_state(self._tg_port_ingress, enable)
+ self.tg_node.configure_port_ip_address(
+ self._tg_ip_address_egress, self._tg_port_egress, delete
+ )
+ self.tg_node.configure_port_state(self._tg_port_egress, enable)
+
+ def _configure_ipv4_forwarding(self, enable: bool) -> None:
+ self.sut_node.configure_ipv4_forwarding(enable)
+
+ def send_packet_and_capture(
+ self, packet: Packet, duration: float = 1
+ ) -> list[Packet]:
+ """
+ Send a packet through the appropriate interface and
+ receive on the appropriate interface.
+ Modify the packet with l3/l2 addresses corresponding
+ to the testbed and desired traffic.
+ """
+ packet = self._adjust_addresses(packet)
+ return self.tg_node.send_packet_and_capture(
+ packet, self._tg_port_egress, self._tg_port_ingress, duration
+ )
+
+ def get_expected_packet(self, packet: Packet) -> Packet:
+ return self._adjust_addresses(packet, expected=True)
+
+ def _adjust_addresses(self, packet: Packet, expected: bool = False) -> Packet:
+ """
+ Assumptions:
+ Two links between SUT and TG, one link is TG -> SUT,
+ the other SUT -> TG.
+ """
+ if expected:
+ # The packet enters the TG from SUT
+ # update l2 addresses
+ packet.src = self._sut_port_egress.mac_address
+ packet.dst = self._tg_port_ingress.mac_address
+
+ # The packet is routed from TG egress to TG ingress
+ # update l3 addresses
+ packet.payload.src = self._tg_ip_address_egress.ip.exploded
+ packet.payload.dst = self._tg_ip_address_ingress.ip.exploded
+ else:
+ # The packet leaves TG towards SUT
+ # update l2 addresses
+ packet.src = self._tg_port_egress.mac_address
+ packet.dst = self._sut_port_ingress.mac_address
+
+ # The packet is routed from TG egress to TG ingress
+ # update l3 addresses
+ packet.payload.src = self._tg_ip_address_egress.ip.exploded
+ packet.payload.dst = self._tg_ip_address_ingress.ip.exploded
+
+ return Ether(packet.build())
+
def verify(self, condition: bool, failure_description: str) -> None:
if not condition:
+ self._fail_test_case_verify(failure_description)
+
+ def _fail_test_case_verify(self, failure_description: str) -> None:
+ self._logger.debug(
+ "A test case failed, showing the last 10 commands executed on SUT:"
+ )
+ for command_res in self.sut_node.main_session.remote_session.history[-10:]:
+ self._logger.debug(command_res.command)
+ self._logger.debug(
+ "A test case failed, showing the last 10 commands executed on TG:"
+ )
+ for command_res in self.tg_node.main_session.remote_session.history[-10:]:
+ self._logger.debug(command_res.command)
+ raise TestCaseVerifyError(failure_description)
+
+ def verify_packets(
+ self, expected_packet: Packet, received_packets: list[Packet]
+ ) -> None:
+ for received_packet in received_packets:
+ if self._compare_packets(expected_packet, received_packet):
+ break
+ else:
+ self._logger.debug(
+ f"The expected packet {get_packet_summaries(expected_packet)} "
+ f"not found among received {get_packet_summaries(received_packets)}"
+ )
+ self._fail_test_case_verify(
+ "An expected packet not found among received packets."
+ )
+
+ def _compare_packets(
+ self, expected_packet: Packet, received_packet: Packet
+ ) -> bool:
+ self._logger.debug(
+ "Comparing packets: \n"
+ f"{expected_packet.summary()}\n"
+ f"{received_packet.summary()}"
+ )
+
+ l3 = IP in expected_packet.layers()
+ self._logger.debug("Found l3 layer")
+
+ received_payload = received_packet
+ expected_payload = expected_packet
+ while received_payload and expected_payload:
+ self._logger.debug("Comparing payloads:")
+ self._logger.debug(f"Received: {received_payload}")
+ self._logger.debug(f"Expected: {expected_payload}")
+ if received_payload.__class__ == expected_payload.__class__:
+ self._logger.debug("The layers are the same.")
+ if received_payload.__class__ == Ether:
+ if not self._verify_l2_frame(received_payload, l3):
+ return False
+ elif received_payload.__class__ == IP:
+ if not self._verify_l3_packet(received_payload, expected_payload):
+ return False
+ else:
+ # Different layers => different packets
+ return False
+ received_payload = received_payload.payload
+ expected_payload = expected_payload.payload
+
+ if expected_payload:
self._logger.debug(
- "A test case failed, showing the last 10 commands executed on SUT:"
+ f"The expected packet did not contain {expected_payload}."
)
- for command_res in self.sut_node.main_session.remote_session.history[-10:]:
- self._logger.debug(command_res.command)
- raise TestCaseVerifyError(failure_description)
+ return False
+ if received_payload and received_payload.__class__ != Padding:
+ self._logger.debug(
+ "The received payload had extra layers which were not padding."
+ )
+ return False
+ return True
+
+ def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool:
+ self._logger.debug("Looking at the Ether layer.")
+ self._logger.debug(
+ f"Comparing received dst mac '{received_packet.dst}' "
+ f"with expected '{self._tg_port_ingress.mac_address}'."
+ )
+ if received_packet.dst != self._tg_port_ingress.mac_address:
+ return False
+
+ expected_src_mac = self._tg_port_egress.mac_address
+ if l3:
+ expected_src_mac = self._sut_port_egress.mac_address
+ self._logger.debug(
+ f"Comparing received src mac '{received_packet.src}' "
+ f"with expected '{expected_src_mac}'."
+ )
+ if received_packet.src != expected_src_mac:
+ return False
+
+ return True
+
+ def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool:
+ self._logger.debug("Looking at the IP layer.")
+ if (
+ received_packet.src != expected_packet.src
+ or received_packet.dst != expected_packet.dst
+ ):
+ return False
+ return True
def run(self) -> None:
"""
@@ -8,7 +8,8 @@
"""
from abc import ABC
-from typing import Any, Callable, Type
+from ipaddress import IPv4Interface, IPv6Interface
+from typing import Any, Callable, Type, Union
from framework.config import (
BuildTargetConfiguration,
@@ -221,6 +222,17 @@ def configure_port_state(self, port: Port, enable: bool = True) -> None:
"""
self.main_session.configure_port_state(port, enable)
+ def configure_port_ip_address(
+ self,
+ address: Union[IPv4Interface, IPv6Interface],
+ port: Port,
+ delete: bool = False,
+ ) -> None:
+ """
+ Configure the IP address of a port on this node.
+ """
+ self.main_session.configure_port_ip_address(address, port, delete)
+
def close(self) -> None:
"""
Close all connections and free other resources.
@@ -351,6 +351,9 @@ def run_dpdk_app(
f"{app_path} {eal_args}", timeout, privileged=True, verify=True
)
+ def configure_ipv4_forwarding(self, enable: bool) -> None:
+ self.main_session.configure_ipv4_forwarding(enable)
+
def create_interactive_shell(
self,
shell_cls: Type[InteractiveShellType],
new file mode 100644
@@ -0,0 +1,45 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""
+Configure SUT node to route traffic from if1 to if2.
+Send a packet to the SUT node, verify it comes back on the second port on the TG node.
+"""
+
+from scapy.layers.inet import IP, UDP # type: ignore[import]
+from scapy.layers.l2 import Ether # type: ignore[import]
+
+from framework.test_suite import TestSuite
+
+
+class TestOSUdp(TestSuite):
+ def set_up_suite(self) -> None:
+ """
+ Setup:
+ Configure SUT ports and SUT to route traffic from if1 to if2.
+ """
+
+ self.configure_testbed_ipv4()
+
+ def test_os_udp(self) -> None:
+ """
+ Steps:
+ Send a UDP packet.
+ Verify:
+ The packet with proper addresses arrives at the other TG port.
+ """
+
+ packet = Ether() / IP() / UDP()
+
+ received_packets = self.send_packet_and_capture(packet)
+
+ expected_packet = self.get_expected_packet(packet)
+
+ self.verify_packets(expected_packet, received_packets)
+
+ def tear_down_suite(self) -> None:
+ """
+ Teardown:
+ Remove the SUT port configuration configured in setup.
+ """
+ self.configure_testbed_ipv4(restore=True)