@@ -10,6 +10,7 @@
from framework.logger import DTSLOG
from framework.settings import SETTINGS
from framework.testbed_model import LogicalCore
+from framework.testbed_model.hw.port import PortIdentifier
from framework.utils import EnvVarsDict, MesonArgs
from .remote import CommandResult, RemoteSession, create_remote_session
@@ -37,6 +38,7 @@ def __init__(
self.name = name
self._logger = logger
self.remote_session = create_remote_session(node_config, name, logger)
+ self._disable_terminal_colors()
def close(self, force: bool = False) -> None:
"""
@@ -53,7 +55,7 @@ def is_alive(self) -> bool:
def send_command(
self,
command: str,
- timeout: float,
+ timeout: float = SETTINGS.timeout,
verify: bool = False,
env: EnvVarsDict | None = None,
) -> CommandResult:
@@ -64,6 +66,12 @@ def send_command(
"""
return self.remote_session.send_command(command, timeout, verify, env)
+ @abstractmethod
+ def _disable_terminal_colors(self) -> None:
+ """
+ Disable the colors in the ssh session.
+ """
+
@abstractmethod
def guess_dpdk_remote_dir(self, remote_dir) -> PurePath:
"""
@@ -173,3 +181,15 @@ def setup_hugepages(self, hugepage_amount: int, force_first_numa: bool) -> None:
if needed and mount the hugepages if needed.
If force_first_numa is True, configure hugepages just on the first socket.
"""
+
+ @abstractmethod
+ def get_logical_name_of_port(self, id: PortIdentifier) -> str | None:
+ """
+ Gets the logical name (eno1, ens5, etc) of a port by the port's identifier.
+ """
+
+ @abstractmethod
+ def check_link_is_up(self, id: PortIdentifier) -> bool:
+ """
+ Check that the link is up.
+ """
@@ -219,3 +219,6 @@ def _remove_dpdk_runtime_dirs(
def get_dpdk_file_prefix(self, dpdk_prefix) -> str:
return ""
+
+ def _disable_terminal_colors(self) -> None:
+ self.remote_session.send_command("export TERM=xterm-mono")
new file mode 100644
@@ -0,0 +1,155 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+#
+
+import itertools
+import uuid
+from abc import abstractmethod
+
+import scapy.utils
+from scapy.packet import Packet
+
+from framework.testbed_model.hw.port import PortIdentifier
+from framework.settings import SETTINGS
+
+from .traffic_generator import TrafficGenerator
+
+
+def _get_default_capture_name() -> str:
+ """
+ This is the function used for the default implementation of capture names.
+ """
+ return str(uuid.uuid4())
+
+
+class CapturingTrafficGenerator(TrafficGenerator):
+ """
+ A mixin interface which enables a packet generator to declare that it can capture
+ packets and return them to the user.
+
+ All packet functions added by this class should write out the captured packets
+ to a pcap file in output, allowing for easier analysis of failed tests.
+ """
+
+ def is_capturing(self) -> bool:
+ return True
+
+ @abstractmethod
+ def send_packet_and_capture(
+ self,
+ send_port_id: PortIdentifier,
+ packet: Packet,
+ receive_port_id: PortIdentifier,
+ duration_s: int,
+ capture_name: str = _get_default_capture_name(),
+ ) -> list[Packet]:
+ """
+ Send a packet on the send port and then capture all traffic on receive port
+ for the given duration.
+
+ Captures packets and adds them to output/<capture_name>.pcap.
+
+ This function must handle no packets even being received.
+ """
+ raise NotImplementedError()
+
+ def send_packets_and_capture(
+ self,
+ send_port_id: PortIdentifier,
+ packets: list[Packet],
+ receive_port_id: PortIdentifier,
+ duration_s: int,
+ capture_name: str = _get_default_capture_name(),
+ ) -> list[Packet]:
+ """
+ Send a group of packets on the send port and then capture all traffic on the
+ receive port for the given duration.
+
+ This function must handle no packets even being received.
+ """
+ self.logger.info(
+ f"Incremental captures will be created at output/{capture_name}-<n>.pcap"
+ )
+ received_packets: list[list[Packet]] = []
+ for i, packet in enumerate(packets):
+ received_packets.append(
+ self.send_packet_and_capture(
+ send_port_id,
+ packet,
+ receive_port_id,
+ duration_s,
+ capture_name=f"{capture_name}-{i}",
+ )
+ )
+
+ flattened_received_packets = list(
+ itertools.chain.from_iterable(received_packets)
+ )
+ scapy.utils.wrpcap(f"output/{capture_name}.pcap", flattened_received_packets)
+ return flattened_received_packets
+
+ def send_packet_and_expect_packet(
+ self,
+ send_port_id: PortIdentifier,
+ packet: Packet,
+ receive_port_id: PortIdentifier,
+ expected_packet: Packet,
+ timeout: int = SETTINGS.timeout,
+ capture_name: str = _get_default_capture_name(),
+ ) -> None:
+ """
+ Sends the provided packet, capturing received packets. Then asserts that the
+ only 1 packet was received, and that the packet that was received is equal to
+ the expected packet.
+ """
+ packets: list[Packet] = self.send_packet_and_capture(
+ send_port_id, packet, receive_port_id, timeout
+ )
+
+ assert len(packets) != 0, "Expected a packet, but none were captured"
+ assert len(packets) == 1, (
+ "More packets than expected were received, "
+ f"capture written to output/{capture_name}.pcap"
+ )
+ assert packets[0] == expected_packet, (
+ f"Received packet differed from expected packet, capture written to "
+ f"output/{capture_name}.pcap"
+ )
+
+ def send_packets_and_expect_packets(
+ self,
+ send_port_id: PortIdentifier,
+ packets: list[Packet],
+ receive_port_id: PortIdentifier,
+ expected_packets: list[Packet],
+ timeout: int = SETTINGS.timeout,
+ capture_name: str = _get_default_capture_name(),
+ ) -> None:
+ """
+ Sends the provided packets, capturing received packets. Then asserts that the
+ correct number of packets was received, and that the packets that were received
+ are equal to the expected packet. This equality is done by comparing packets
+ at the same index.
+ """
+ packets: list[Packet] = self.send_packets_and_capture(
+ send_port_id, packets, receive_port_id, timeout
+ )
+
+ if len(expected_packets) > 0:
+ assert len(packets) != 0, "Expected packets, but none were captured"
+
+ assert len(packets) == len(expected_packets), (
+ "A different number of packets than expected were received, "
+ f"capture written to output/{capture_name}.pcap or split across "
+ f"output/{capture_name}-<n>.pcap"
+ )
+ for i, expected_packet in enumerate(expected_packets):
+ assert packets[i] == expected_packet, (
+ f"Received packet {i} differed from expected packet, capture written "
+ f"to output/{capture_name}.pcap or output/{capture_name}-{i}.pcap"
+ )
+
+ def _write_capture_from_packets(self, capture_name: str, packets: list[Packet]):
+ file_name = f"output/{capture_name}.pcap"
+ self.logger.debug(f"Writing packets to {file_name}")
+ scapy.utils.wrpcap(file_name, packets)
new file mode 100644
@@ -0,0 +1,55 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+#
+
+from dataclasses import dataclass
+
+from framework.config import PortConfig
+
+
+@dataclass(slots=True, frozen=True)
+class PortIdentifier:
+ node: str
+ pci: str
+
+
+@dataclass(slots=True, frozen=True)
+class Port:
+ """
+ identifier: The information that uniquely identifies this port.
+ pci: The PCI address of the port.
+
+ os_driver: The driver normally used by this port (ex: i40e)
+ dpdk_os_driver: The driver that the os should bind this device to for DPDK to use it. (ex: vfio-pci)
+
+ Note: os_driver and dpdk_os_driver may be the same thing, see mlx5_core
+
+ peer: The identifier for whatever this port is plugged into.
+ """
+
+ id: int
+ identifier: PortIdentifier
+ os_driver: str
+ dpdk_os_driver: str
+ peer: PortIdentifier
+
+ @property
+ def node(self) -> str:
+ return self.identifier.node
+
+ @property
+ def pci(self) -> str:
+ return self.identifier.pci
+
+ @staticmethod
+ def from_config(node_name: str, config: PortConfig) -> "Port":
+ return Port(
+ id=config.id,
+ identifier=PortIdentifier(
+ node=node_name,
+ pci=config.pci,
+ ),
+ os_driver=config.os_driver,
+ dpdk_os_driver=config.dpdk_os_driver,
+ peer=PortIdentifier(node=config.peer_node, pci=config.peer_pci),
+ )
@@ -47,6 +47,8 @@ def __init__(self, node_config: NodeConfiguration):
self._logger = getLogger(self.name)
self.main_session = create_session(self.config, self.name, self._logger)
+ self._logger.info(f"Connected to node: {self.name}")
+
self._get_remote_cpus()
# filter the node lcores according to user config
self.lcores = LogicalCoreListFilter(
@@ -55,8 +57,6 @@ def __init__(self, node_config: NodeConfiguration):
self._other_sessions = []
- self._logger.info(f"Created node: {self.name}")
-
def set_up_execution(self, execution_config: ExecutionConfiguration) -> None:
"""
Perform the execution setup that will be done for each execution
@@ -7,7 +7,7 @@
import time
from pathlib import PurePath
-from framework.config import BuildTargetConfiguration, NodeConfiguration
+from framework.config import BuildTargetConfiguration, SUTConfiguration
from framework.remote_session import CommandResult, OSSession
from framework.settings import SETTINGS
from framework.utils import EnvVarsDict, MesonArgs
@@ -34,7 +34,7 @@ class SutNode(Node):
_app_compile_timeout: float
_dpdk_kill_session: OSSession | None
- def __init__(self, node_config: NodeConfiguration):
+ def __init__(self, node_config: SUTConfiguration):
super(SutNode, self).__init__(node_config)
self._dpdk_prefix_list = []
self._build_target_config = None
@@ -47,6 +47,7 @@ def __init__(self, node_config: NodeConfiguration):
self._dpdk_timestamp = (
f"{str(os.getpid())}_{time.strftime('%Y%m%d%H%M%S', time.localtime())}"
)
+ self._logger.info(f"Created node: {self.name}")
@property
def _remote_dpdk_dir(self) -> PurePath:
new file mode 100644
@@ -0,0 +1,62 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+from scapy.layers.inet import IP, UDP
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+
+from framework.config import TGConfiguration
+from framework.testbed_model.hw.port import Port, PortIdentifier
+from framework.testbed_model.traffic_generator import TrafficGenerator
+
+from .node import Node
+
+
+class TGNode(Node):
+ """
+ A class for managing connections to the Traffic Generator node and managing
+ traffic generators residing within.
+ """
+
+ ports: list[Port]
+ traffic_generator: TrafficGenerator
+
+ def __init__(self, node_config: TGConfiguration):
+ super(TGNode, self).__init__(node_config)
+ self.ports = [
+ Port.from_config(self.name, port_config)
+ for port_config in node_config.ports
+ ]
+ self.traffic_generator = TrafficGenerator.from_config(
+ self, node_config.traffic_generator
+ )
+ self._logger.info(f"Created node: {self.name}")
+
+ def get_ports_with_loop_topology(
+ self, peer_node: "Node"
+ ) -> tuple[PortIdentifier, PortIdentifier] | None:
+ for port1 in self.ports:
+ if port1.peer.node == peer_node.name:
+ for port2 in self.ports:
+ if port2.peer.node == peer_node.name:
+ return (port1.identifier, port2.identifier)
+ self._logger.warning(
+ f"Attempted to find loop topology between {self.name} and {peer_node.name}, but none could be found"
+ )
+ return None
+
+ def verify(self) -> None:
+ for port in self.ports:
+ self.traffic_generator.assert_port_is_connected(port.identifier)
+ port = self.ports[0]
+ # Check that the traffic generator is working by sending a packet.
+ # send_packet should throw an error if something goes wrong.
+ self.traffic_generator.send_packet(
+ port.identifier, Ether() / IP() / UDP() / Raw(b"Hello World")
+ )
+
+ def close(self) -> None:
+ self.traffic_generator.close()
+ super(TGNode, self).close()
new file mode 100644
@@ -0,0 +1,59 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+#
+
+from abc import ABC, abstractmethod
+
+from scapy.packet import Packet
+
+from framework.config import TrafficGeneratorConfig, TrafficGeneratorType
+from framework.logger import DTSLOG
+from framework.testbed_model.hw.port import PortIdentifier
+
+
+class TrafficGenerator(ABC):
+ logger: DTSLOG
+
+ @abstractmethod
+ def send_packet(self, port: PortIdentifier, packet: Packet) -> None:
+ """
+ Sends a packet and blocks until it is fully sent.
+
+ What fully sent means is defined by the traffic generator.
+ """
+ raise NotImplementedError()
+
+ def send_packets(self, port: PortIdentifier, packets: list[Packet]) -> None:
+ """
+ Sends a list of packets and blocks until they are fully sent.
+
+ What "fully sent" means is defined by the traffic generator.
+ """
+ # default implementation, this should be overridden if there is a better
+ # way to do this on a specific packet generator
+ for packet in packets:
+ self.send_packet(port, packet)
+
+ def is_capturing(self) -> bool:
+ """
+ Whether this traffic generator can capture traffic
+ """
+ return False
+
+ @staticmethod
+ def from_config(
+ node: "Node", traffic_generator_config: TrafficGeneratorConfig
+ ) -> "TrafficGenerator":
+ from .scapy import ScapyTrafficGenerator
+
+ match traffic_generator_config.traffic_generator_type:
+ case TrafficGeneratorType.SCAPY:
+ return ScapyTrafficGenerator(node, node.ports)
+
+ @abstractmethod
+ def close(self):
+ pass
+
+ @abstractmethod
+ def assert_port_is_connected(self, id: PortIdentifier) -> None:
+ pass