[v2,3/6] dts: traffic generator abstractions

Message ID 20230717110709.39220-4-juraj.linkes@pantheon.tech (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series dts: tg abstractions and scapy tg |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Juraj Linkeš July 17, 2023, 11:07 a.m. UTC
There are traffic abstractions for all traffic generators and for
traffic generators that can capture (not just count) packets.

There also related abstractions, such as TGNode where the traffic
generators reside and some related code.

Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
---
 doc/guides/tools/dts.rst                      |  31 ++++
 dts/framework/dts.py                          |  61 ++++----
 dts/framework/remote_session/linux_session.py |  78 ++++++++++
 dts/framework/remote_session/os_session.py    |  15 ++
 dts/framework/test_suite.py                   |   4 +-
 dts/framework/testbed_model/__init__.py       |   1 +
 .../capturing_traffic_generator.py            | 135 ++++++++++++++++++
 dts/framework/testbed_model/hw/port.py        |  60 ++++++++
 dts/framework/testbed_model/node.py           |  15 ++
 dts/framework/testbed_model/scapy.py          |  74 ++++++++++
 dts/framework/testbed_model/tg_node.py        |  99 +++++++++++++
 .../testbed_model/traffic_generator.py        |  72 ++++++++++
 dts/framework/utils.py                        |  13 ++
 13 files changed, 632 insertions(+), 26 deletions(-)
 create mode 100644 dts/framework/testbed_model/capturing_traffic_generator.py
 create mode 100644 dts/framework/testbed_model/hw/port.py
 create mode 100644 dts/framework/testbed_model/scapy.py
 create mode 100644 dts/framework/testbed_model/tg_node.py
 create mode 100644 dts/framework/testbed_model/traffic_generator.py
  

Comments

Jeremy Spewock July 18, 2023, 7:56 p.m. UTC | #1
Hey Juraj,

Just a couple of comments below, but very minor stuff. Just a few docstring
that I commented on and one question about the factory for traffic
generators that I was wondering what you thought about. More below:

On Mon, Jul 17, 2023 at 7:07 AM Juraj Linkeš <juraj.linkes@pantheon.tech>
wrote:

> There are traffic abstractions for all traffic generators and for
> traffic generators that can capture (not just count) packets.
>
> There also related abstractions, such as TGNode where the traffic
> generators reside and some related code.
>
> Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
> ---
>  doc/guides/tools/dts.rst                      |  31 ++++
>  dts/framework/dts.py                          |  61 ++++----
>  dts/framework/remote_session/linux_session.py |  78 ++++++++++
>  dts/framework/remote_session/os_session.py    |  15 ++
>  dts/framework/test_suite.py                   |   4 +-
>  dts/framework/testbed_model/__init__.py       |   1 +
>  .../capturing_traffic_generator.py            | 135 ++++++++++++++++++
>  dts/framework/testbed_model/hw/port.py        |  60 ++++++++
>  dts/framework/testbed_model/node.py           |  15 ++
>  dts/framework/testbed_model/scapy.py          |  74 ++++++++++
>  dts/framework/testbed_model/tg_node.py        |  99 +++++++++++++
>  .../testbed_model/traffic_generator.py        |  72 ++++++++++
>  dts/framework/utils.py                        |  13 ++
>  13 files changed, 632 insertions(+), 26 deletions(-)
>  create mode 100644
> dts/framework/testbed_model/capturing_traffic_generator.py
>  create mode 100644 dts/framework/testbed_model/hw/port.py
>  create mode 100644 dts/framework/testbed_model/scapy.py
>  create mode 100644 dts/framework/testbed_model/tg_node.py
>  create mode 100644 dts/framework/testbed_model/traffic_generator.py
>
> diff --git a/doc/guides/tools/dts.rst b/doc/guides/tools/dts.rst
> index c7b31623e4..2f97d1df6e 100644
> --- a/doc/guides/tools/dts.rst
> +++ b/doc/guides/tools/dts.rst
> @@ -153,6 +153,37 @@ There are two areas that need to be set up on a
> System Under Test:
>
>        sudo usermod -aG sudo <sut_user>
>
> +
> +Setting up Traffic Generator Node
> +---------------------------------
> +
> +These need to be set up on a Traffic Generator Node:
> +
> +#. **Traffic generator dependencies**
> +
> +   The traffic generator running on the traffic generator node must be
> installed beforehand.
> +   For Scapy traffic generator, only a few Python libraries need to be
> installed:
> +
> +   .. code-block:: console
> +
> +      sudo apt install python3-pip
> +      sudo pip install --upgrade pip
> +      sudo pip install scapy==2.5.0
> +
> +#. **Hardware dependencies**
> +
> +   The traffic generators, like DPDK, need a proper driver and firmware.
> +   The Scapy traffic generator doesn't have strict requirements - the
> drivers that come
> +   with most OS distributions will be satisfactory.
> +
> +
> +#. **User with administrator privileges**
> +
> +   Similarly to the System Under Test, traffic generators need
> administrator privileges
> +   to be able to use the devices.
> +   Refer to the `System Under Test section <sut_admin_user>` for details.
> +
> +
>  Running DTS
>  -----------
>
> diff --git a/dts/framework/dts.py b/dts/framework/dts.py
> index 372bc72787..265ed7fd5b 100644
> --- a/dts/framework/dts.py
> +++ b/dts/framework/dts.py
> @@ -15,7 +15,7 @@
>  from .logger import DTSLOG, getLogger
>  from .test_result import BuildTargetResult, DTSResult, ExecutionResult,
> Result
>  from .test_suite import get_test_suites
> -from .testbed_model import SutNode
> +from .testbed_model import SutNode, TGNode
>  from .utils import check_dts_python_version
>
>  dts_logger: DTSLOG = getLogger("DTSRunner")
> @@ -33,29 +33,31 @@ def run_all() -> None:
>      # check the python version of the server that run dts
>      check_dts_python_version()
>
> -    nodes: dict[str, SutNode] = {}
> +    sut_nodes: dict[str, SutNode] = {}
> +    tg_nodes: dict[str, TGNode] = {}
>      try:
>          # for all Execution sections
>          for execution in CONFIGURATION.executions:
> -            sut_node = None
> -            if execution.system_under_test_node.name in nodes:
> -                # a Node with the same name already exists
> -                sut_node = nodes[execution.system_under_test_node.name]
> -            else:
> -                # the SUT has not been initialized yet
> -                try:
> +            sut_node = sut_nodes.get(
> execution.system_under_test_node.name)
> +            tg_node = tg_nodes.get(execution.traffic_generator_node.name)
> +
> +            try:
> +                if not sut_node:
>                      sut_node = SutNode(execution.system_under_test_node)
> -                    result.update_setup(Result.PASS)
> -                except Exception as e:
> -                    dts_logger.exception(
> -                        f"Connection to node
> {execution.system_under_test_node} failed."
> -                    )
> -                    result.update_setup(Result.FAIL, e)
> -                else:
> -                    nodes[sut_node.name] = sut_node
> -
> -            if sut_node:
> -                _run_execution(sut_node, execution, result)
> +                    sut_nodes[sut_node.name] = sut_node
> +                if not tg_node:
> +                    tg_node = TGNode(execution.traffic_generator_node)
> +                    tg_nodes[tg_node.name] = tg_node
> +                result.update_setup(Result.PASS)
> +            except Exception as e:
> +                failed_node = execution.system_under_test_node.name
> +                if sut_node:
> +                    failed_node = execution.traffic_generator_node.name
> +                dts_logger.exception(f"Creation of node {failed_node}
> failed.")
> +                result.update_setup(Result.FAIL, e)
> +
> +            else:
> +                _run_execution(sut_node, tg_node, execution, result)
>
>      except Exception as e:
>          dts_logger.exception("An unexpected error has occurred.")
> @@ -64,7 +66,7 @@ def run_all() -> None:
>
>      finally:
>          try:
> -            for node in nodes.values():
> +            for node in (sut_nodes | tg_nodes).values():
>                  node.close()
>              result.update_teardown(Result.PASS)
>          except Exception as e:
> @@ -81,7 +83,10 @@ def run_all() -> None:
>
>
>  def _run_execution(
> -    sut_node: SutNode, execution: ExecutionConfiguration, result:
> DTSResult
> +    sut_node: SutNode,
> +    tg_node: TGNode,
> +    execution: ExecutionConfiguration,
> +    result: DTSResult,
>  ) -> None:
>      """
>      Run the given execution. This involves running the execution setup as
> well as
> @@ -101,7 +106,9 @@ def _run_execution(
>
>      else:
>          for build_target in execution.build_targets:
> -            _run_build_target(sut_node, build_target, execution,
> execution_result)
> +            _run_build_target(
> +                sut_node, tg_node, build_target, execution,
> execution_result
> +            )
>
>      finally:
>          try:
> @@ -114,6 +121,7 @@ def _run_execution(
>
>  def _run_build_target(
>      sut_node: SutNode,
> +    tg_node: TGNode,
>      build_target: BuildTargetConfiguration,
>      execution: ExecutionConfiguration,
>      execution_result: ExecutionResult,
> @@ -134,7 +142,7 @@ def _run_build_target(
>          build_target_result.update_setup(Result.FAIL, e)
>
>      else:
> -        _run_all_suites(sut_node, execution, build_target_result)
> +        _run_all_suites(sut_node, tg_node, execution, build_target_result)
>
>      finally:
>          try:
> @@ -147,6 +155,7 @@ def _run_build_target(
>
>  def _run_all_suites(
>      sut_node: SutNode,
> +    tg_node: TGNode,
>      execution: ExecutionConfiguration,
>      build_target_result: BuildTargetResult,
>  ) -> None:
> @@ -161,7 +170,7 @@ def _run_all_suites(
>      for test_suite_config in execution.test_suites:
>          try:
>              _run_single_suite(
> -                sut_node, execution, build_target_result,
> test_suite_config
> +                sut_node, tg_node, execution, build_target_result,
> test_suite_config
>              )
>          except BlockingTestSuiteError as e:
>              dts_logger.exception(
> @@ -177,6 +186,7 @@ def _run_all_suites(
>
>  def _run_single_suite(
>      sut_node: SutNode,
> +    tg_node: TGNode,
>      execution: ExecutionConfiguration,
>      build_target_result: BuildTargetResult,
>      test_suite_config: TestSuiteConfig,
> @@ -205,6 +215,7 @@ def _run_single_suite(
>          for test_suite_class in test_suite_classes:
>              test_suite = test_suite_class(
>                  sut_node,
> +                tg_node,
>                  test_suite_config.test_cases,
>                  execution.func,
>                  build_target_result,
> diff --git a/dts/framework/remote_session/linux_session.py
> b/dts/framework/remote_session/linux_session.py
> index f13f399121..284c74795d 100644
> --- a/dts/framework/remote_session/linux_session.py
> +++ b/dts/framework/remote_session/linux_session.py
> @@ -2,13 +2,47 @@
>  # Copyright(c) 2023 PANTHEON.tech s.r.o.
>  # Copyright(c) 2023 University of New Hampshire
>
> +import json
> +from typing import TypedDict
> +
> +from typing_extensions import NotRequired
> +
>  from framework.exception import RemoteCommandExecutionError
>  from framework.testbed_model import LogicalCore
> +from framework.testbed_model.hw.port import Port
>  from framework.utils import expand_range
>
>  from .posix_session import PosixSession
>
>
> +class LshwConfigurationOutput(TypedDict):
> +    link: str
> +
> +
> +class LshwOutput(TypedDict):
> +    """
> +    A model of the relevant information from json lshw output, e.g.:
> +    {
> +    ...
> +    "businfo" : "pci@0000:08:00.0",
> +    "logicalname" : "enp8s0",
> +    "version" : "00",
> +    "serial" : "52:54:00:59:e1:ac",
> +    ...
> +    "configuration" : {
> +      ...
> +      "link" : "yes",
> +      ...
> +    },
> +    ...
> +    """
> +
> +    businfo: str
> +    logicalname: NotRequired[str]
> +    serial: NotRequired[str]
> +    configuration: LshwConfigurationOutput
> +
> +
>  class LinuxSession(PosixSession):
>      """
>      The implementation of non-Posix compliant parts of Linux remote
> sessions.
> @@ -102,3 +136,47 @@ def _configure_huge_pages(
>          self.send_command(
>              f"echo {amount} | tee {hugepage_config_path}", privileged=True
>          )
> +
> +    def update_ports(self, ports: list[Port]) -> None:
> +        self._logger.debug("Gathering port info.")
> +        for port in ports:
> +            assert (
> +                port.node == self.name
> +            ), "Attempted to gather port info on the wrong node"
> +
> +        port_info_list = self._get_lshw_info()
> +        for port in ports:
> +            for port_info in port_info_list:
> +                if f"pci@{port.pci}" == port_info.get("businfo"):
> +                    self._update_port_attr(
> +                        port, port_info.get("logicalname"), "logical_name"
> +                    )
> +                    self._update_port_attr(port, port_info.get("serial"),
> "mac_address")
> +                    port_info_list.remove(port_info)
> +                    break
> +            else:
> +                self._logger.warning(f"No port at pci address {port.pci}
> found.")
> +
> +    def _get_lshw_info(self) -> list[LshwOutput]:
> +        output = self.send_command("lshw -quiet -json -C network",
> verify=True)
> +        return json.loads(output.stdout)
> +
> +    def _update_port_attr(
> +        self, port: Port, attr_value: str | None, attr_name: str
> +    ) -> None:
> +        if attr_value:
> +            setattr(port, attr_name, attr_value)
> +            self._logger.debug(
> +                f"Found '{attr_name}' of port {port.pci}: '{attr_value}'."
> +            )
> +        else:
> +            self._logger.warning(
> +                f"Attempted to get '{attr_name}' of port {port.pci}, "
> +                f"but it doesn't exist."
> +            )
> +
> +    def configure_port_state(self, port: Port, enable: bool) -> None:
> +        state = "up" if enable else "down"
> +        self.send_command(
> +            f"ip link set dev {port.logical_name} {state}",
> privileged=True
> +        )
> diff --git a/dts/framework/remote_session/os_session.py
> b/dts/framework/remote_session/os_session.py
> index cc13b02f16..633d06eb5d 100644
> --- a/dts/framework/remote_session/os_session.py
> +++ b/dts/framework/remote_session/os_session.py
> @@ -12,6 +12,7 @@
>  from framework.remote_session.remote import InteractiveShell, TestPmdShell
>  from framework.settings import SETTINGS
>  from framework.testbed_model import LogicalCore
> +from framework.testbed_model.hw.port import Port
>  from framework.utils import MesonArgs
>
>  from .remote import (
> @@ -255,3 +256,17 @@ def get_node_info(self) -> NodeInfo:
>          """
>          Collect information about the node
>          """
> +
> +    @abstractmethod
> +    def update_ports(self, ports: list[Port]) -> None:
> +        """
> +        Get additional information about ports:
> +            Logical name (e.g. enp7s0) if applicable
> +            Mac address
> +        """
> +
> +    @abstractmethod
> +    def configure_port_state(self, port: Port, enable: bool) -> None:
> +        """
> +        Enable/disable port.
> +        """
> diff --git a/dts/framework/test_suite.py b/dts/framework/test_suite.py
> index de94c9332d..056460dd05 100644
> --- a/dts/framework/test_suite.py
> +++ b/dts/framework/test_suite.py
> @@ -20,7 +20,7 @@
>  from .logger import DTSLOG, getLogger
>  from .settings import SETTINGS
>  from .test_result import BuildTargetResult, Result, TestCaseResult,
> TestSuiteResult
> -from .testbed_model import SutNode
> +from .testbed_model import SutNode, TGNode
>
>
>  class TestSuite(object):
> @@ -51,11 +51,13 @@ class TestSuite(object):
>      def __init__(
>          self,
>          sut_node: SutNode,
> +        tg_node: TGNode,
>          test_cases: list[str],
>          func: bool,
>          build_target_result: BuildTargetResult,
>      ):
>          self.sut_node = sut_node
> +        self.tg_node = tg_node
>          self._logger = getLogger(self.__class__.__name__)
>          self._test_cases_to_run = test_cases
>          self._test_cases_to_run.extend(SETTINGS.test_cases)
> diff --git a/dts/framework/testbed_model/__init__.py
> b/dts/framework/testbed_model/__init__.py
> index f54a947051..5cbb859e47 100644
> --- a/dts/framework/testbed_model/__init__.py
> +++ b/dts/framework/testbed_model/__init__.py
> @@ -20,3 +20,4 @@
>  )
>  from .node import Node
>  from .sut_node import SutNode
> +from .tg_node import TGNode
> diff --git a/dts/framework/testbed_model/capturing_traffic_generator.py
> b/dts/framework/testbed_model/capturing_traffic_generator.py
> new file mode 100644
> index 0000000000..1130d87f1e
> --- /dev/null
> +++ b/dts/framework/testbed_model/capturing_traffic_generator.py
> @@ -0,0 +1,135 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2022 University of New Hampshire
> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
> +
> +"""Traffic generator that can capture packets.
> +
> +In functional testing, we need to interrogate received packets to check
> their validity.
> +Here we define the interface common to all
> +traffic generators capable of capturing traffic.
>

Is there a reason for the line break here? Just to keep things consistent I
think it might make sense to extend this line to be the same length as the
one above.


> +"""
> +
> +import uuid
> +from abc import abstractmethod
> +
> +import scapy.utils  # type: ignore[import]
> +from scapy.packet import Packet  # type: ignore[import]
> +
> +from framework.settings import SETTINGS
> +from framework.utils import get_packet_summaries
> +
> +from .hw.port import Port
> +from .traffic_generator import TrafficGenerator
> +
> +
> +def _get_default_capture_name() -> str:
> +    """
> +    This is the function used for the default implementation of capture
> names.
> +    """
> +    return str(uuid.uuid4())
> +
> +
> +class CapturingTrafficGenerator(TrafficGenerator):
> +    """
> +    A mixin interface which enables a packet generator to declare that
> it can capture
> +    packets and return them to the user.
>

This is missing the one line summary at the top of the comment. Obviously
this is not a big issue, but we likely would want this to be uniform with
the rest of the module which does have the summary at the top.


> +
> +    The methods of capturing traffic generators obey the following
> workflow:
> +        1. send packets
> +        2. capture packets
> +        3. write the capture to a .pcap file
> +        4. return the received packets
> +    """
> +
> +    @property
> +    def is_capturing(self) -> bool:
> +        return True
> +
> +    def send_packet_and_capture(
> +        self,
> +        packet: Packet,
> +        send_port: Port,
> +        receive_port: Port,
> +        duration: float,
> +        capture_name: str = _get_default_capture_name(),
> +    ) -> list[Packet]:
> +        """Send a packet, return received traffic.
> +
> +        Send a packet on the send_port and then return all traffic
> captured
> +        on the receive_port for the given duration. Also record the
> captured traffic
> +        in a pcap file.
> +
>
+        Args:
> +            packet: The packet to send.
> +            send_port: The egress port on the TG node.
> +            receive_port: The ingress port in the TG node.
> +            duration: Capture traffic for this amount of time after
> sending the packet.
> +            capture_name: The name of the .pcap file where to store the
> capture.
> +
> +        Returns:
> +             A list of received packets. May be empty if no packets are
> captured.
> +        """
> +        return self.send_packets_and_capture(
> +            [packet], send_port, receive_port, duration, capture_name
> +        )
> +
> +    def send_packets_and_capture(
> +        self,
> +        packets: list[Packet],
> +        send_port: Port,
> +        receive_port: Port,
> +        duration: float,
> +        capture_name: str = _get_default_capture_name(),
> +    ) -> list[Packet]:
> +        """Send packets, return received traffic.
> +
> +        Send packets on the send_port and then return all traffic captured
> +        on the receive_port for the given duration. Also record the
> captured traffic
> +        in a pcap file.
> +
> +        Args:
> +            packets: The packets to send.
> +            send_port: The egress port on the TG node.
> +            receive_port: The ingress port in the TG node.
> +            duration: Capture traffic for this amount of time after
> sending the packets.
> +            capture_name: The name of the .pcap file where to store the
> capture.
> +
> +        Returns:
> +             A list of received packets. May be empty if no packets are
> captured.
> +        """
> +        self._logger.debug(get_packet_summaries(packets))
> +        self._logger.debug(
> +            f"Sending packet on {send_port.logical_name}, "
> +            f"receiving on {receive_port.logical_name}."
> +        )
> +        received_packets = self._send_packets_and_capture(
> +            packets,
> +            send_port,
> +            receive_port,
> +            duration,
> +        )
> +
> +        self._logger.debug(
> +            f"Received packets: {get_packet_summaries(received_packets)}"
> +        )
> +        self._write_capture_from_packets(capture_name, received_packets)
> +        return received_packets
> +
> +    @abstractmethod
> +    def _send_packets_and_capture(
> +        self,
> +        packets: list[Packet],
> +        send_port: Port,
> +        receive_port: Port,
> +        duration: float,
> +    ) -> list[Packet]:
> +        """
> +        The extended classes must implement this method which
> +        sends packets on send_port and receives packets on the
> receive_port
> +        for the specified duration. It must be able to handle no received
> packets.
> +        """
> +
> +    def _write_capture_from_packets(self, capture_name: str, packets:
> list[Packet]):
> +        file_name = f"{SETTINGS.output_dir}/{capture_name}.pcap"
> +        self._logger.debug(f"Writing packets to {file_name}.")
> +        scapy.utils.wrpcap(file_name, packets)
> diff --git a/dts/framework/testbed_model/hw/port.py
> b/dts/framework/testbed_model/hw/port.py
> new file mode 100644
> index 0000000000..680c29bfe3
> --- /dev/null
> +++ b/dts/framework/testbed_model/hw/port.py
> @@ -0,0 +1,60 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2022 University of New Hampshire
> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
> +
> +from dataclasses import dataclass
> +
> +from framework.config import PortConfig
> +
> +
> +@dataclass(slots=True, frozen=True)
> +class PortIdentifier:
> +    node: str
> +    pci: str
> +
> +
> +@dataclass(slots=True)
> +class Port:
> +    """
> +    identifier: The PCI address of the port on a node.
> +
> +    os_driver: The driver used by this port when the OS is controlling it.
> +        Example: i40e
> +    os_driver_for_dpdk: The driver the device must be bound to for DPDK
> to use it,
> +        Example: vfio-pci.
> +
> +    Note: os_driver and os_driver_for_dpdk may be the same thing.
> +        Example: mlx5_core
> +
> +    peer: The identifier of a port this port is connected with.
> +    """
> +
> +    identifier: PortIdentifier
> +    os_driver: str
> +    os_driver_for_dpdk: str
> +    peer: PortIdentifier
> +    mac_address: str = ""
> +    logical_name: str = ""
> +
> +    def __init__(self, node_name: str, config: PortConfig):
> +        self.identifier = PortIdentifier(
> +            node=node_name,
> +            pci=config.pci,
> +        )
> +        self.os_driver = config.os_driver
> +        self.os_driver_for_dpdk = config.os_driver_for_dpdk
> +        self.peer = PortIdentifier(node=config.peer_node,
> pci=config.peer_pci)
> +
> +    @property
> +    def node(self) -> str:
> +        return self.identifier.node
> +
> +    @property
> +    def pci(self) -> str:
> +        return self.identifier.pci
> +
> +
> +@dataclass(slots=True, frozen=True)
> +class PortLink:
> +    sut_port: Port
> +    tg_port: Port
> diff --git a/dts/framework/testbed_model/node.py
> b/dts/framework/testbed_model/node.py
> index d2d55d904e..e09931cedf 100644
> --- a/dts/framework/testbed_model/node.py
> +++ b/dts/framework/testbed_model/node.py
> @@ -25,6 +25,7 @@
>      LogicalCoreListFilter,
>      lcore_filter,
>  )
> +from .hw.port import Port
>
>
>  class Node(object):
> @@ -38,6 +39,7 @@ class Node(object):
>      config: NodeConfiguration
>      name: str
>      lcores: list[LogicalCore]
> +    ports: list[Port]
>      _logger: DTSLOG
>      _other_sessions: list[OSSession]
>      _execution_config: ExecutionConfiguration
> @@ -57,6 +59,13 @@ def __init__(self, node_config: NodeConfiguration):
>          ).filter()
>
>          self._other_sessions = []
> +        self._init_ports()
> +
> +    def _init_ports(self) -> None:
> +        self.ports = [Port(self.name, port_config) for port_config in
> self.config.ports]
> +        self.main_session.update_ports(self.ports)
> +        for port in self.ports:
> +            self.configure_port_state(port)
>
>      def set_up_execution(self, execution_config: ExecutionConfiguration)
> -> None:
>          """
> @@ -168,6 +177,12 @@ def _setup_hugepages(self):
>                  self.config.hugepages.amount,
> self.config.hugepages.force_first_numa
>              )
>
> +    def configure_port_state(self, port: Port, enable: bool = True) ->
> None:
> +        """
> +        Enable/disable port.
> +        """
> +        self.main_session.configure_port_state(port, enable)
> +
>      def close(self) -> None:
>          """
>          Close all connections and free other resources.
> diff --git a/dts/framework/testbed_model/scapy.py
> b/dts/framework/testbed_model/scapy.py
> new file mode 100644
> index 0000000000..1a23dc9fa3
> --- /dev/null
> +++ b/dts/framework/testbed_model/scapy.py
> @@ -0,0 +1,74 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2022 University of New Hampshire
> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
> +
> +"""Scapy traffic generator.
> +
> +Traffic generator used for functional testing, implemented using the
> Scapy library.
> +The traffic generator uses an XML-RPC server to run Scapy on the remote
> TG node.
> +
> +The XML-RPC server runs in an interactive remote SSH session running
> Python console,
> +where we start the server. The communication with the server is
> facilitated with
> +a local server proxy.
> +"""
> +
> +from scapy.packet import Packet  # type: ignore[import]
> +
> +from framework.config import OS, ScapyTrafficGeneratorConfig
> +from framework.logger import getLogger
> +
> +from .capturing_traffic_generator import (
> +    CapturingTrafficGenerator,
> +    _get_default_capture_name,
> +)
> +from .hw.port import Port
> +from .tg_node import TGNode
> +
> +
> +class ScapyTrafficGenerator(CapturingTrafficGenerator):
> +    """Provides access to scapy functions via an RPC interface.
> +
> +    The traffic generator first starts an XML-RPC on the remote TG node.
> +    Then it populates the server with functions which use the Scapy
> library
> +    to send/receive traffic.
> +
> +    Any packets sent to the remote server are first converted to bytes.
> +    They are received as xmlrpc.client.Binary objects on the server side.
> +    When the server sends the packets back, they are also received as
> +    xmlrpc.client.Binary object on the client side, are converted back to
> Scapy
> +    packets and only then returned from the methods.
> +
> +    Arguments:
> +        tg_node: The node where the traffic generator resides.
> +        config: The user configuration of the traffic generator.
> +    """
> +
> +    _config: ScapyTrafficGeneratorConfig
> +    _tg_node: TGNode
> +
> +    def __init__(self, tg_node: TGNode, config:
> ScapyTrafficGeneratorConfig):
> +        self._config = config
> +        self._tg_node = tg_node
> +        self._logger = getLogger(
> +            f"{self._tg_node.name} {self._config.traffic_generator_type}"
> +        )
> +
> +        assert (
> +            self._tg_node.config.os == OS.linux
> +        ), "Linux is the only supported OS for scapy traffic generation"
> +
> +    def _send_packets(self, packets: list[Packet], port: Port) -> None:
> +        raise NotImplementedError()
> +
> +    def _send_packets_and_capture(
> +        self,
> +        packets: list[Packet],
> +        send_port: Port,
> +        receive_port: Port,
> +        duration: float,
> +        capture_name: str = _get_default_capture_name(),
> +    ) -> list[Packet]:
> +        raise NotImplementedError()
> +
> +    def close(self):
> +        pass
> diff --git a/dts/framework/testbed_model/tg_node.py
> b/dts/framework/testbed_model/tg_node.py
> new file mode 100644
> index 0000000000..27025cfa31
> --- /dev/null
> +++ b/dts/framework/testbed_model/tg_node.py
> @@ -0,0 +1,99 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2010-2014 Intel Corporation
> +# Copyright(c) 2022 University of New Hampshire
> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
> +
> +"""Traffic generator node.
> +
> +This is the node where the traffic generator resides.
> +The distinction between a node and a traffic generator is as follows:
> +A node is a host that DTS connects to. It could be a baremetal server,
> +a VM or a container.
> +A traffic generator is software running on the node.
> +A traffic generator node is a node running a traffic generator.
> +A node can be a traffic generator node as well as system under test node.
> +"""
> +
> +from scapy.packet import Packet  # type: ignore[import]
> +
> +from framework.config import (
> +    ScapyTrafficGeneratorConfig,
> +    TGNodeConfiguration,
> +    TrafficGeneratorType,
> +)
> +from framework.exception import ConfigurationError
> +
> +from .capturing_traffic_generator import CapturingTrafficGenerator
> +from .hw.port import Port
> +from .node import Node
> +
> +
> +class TGNode(Node):
> +    """Manage connections to a node with a traffic generator.
> +
> +    Apart from basic node management capabilities, the Traffic Generator
> node has
> +    specialized methods for handling the traffic generator running on it.
> +
> +    Arguments:
> +        node_config: The user configuration of the traffic generator node.
> +
> +    Attributes:
> +        traffic_generator: The traffic generator running on the node.
> +    """
> +
> +    traffic_generator: CapturingTrafficGenerator
> +
> +    def __init__(self, node_config: TGNodeConfiguration):
> +        super(TGNode, self).__init__(node_config)
> +        self.traffic_generator = create_traffic_generator(
> +            self, node_config.traffic_generator
> +        )
> +        self._logger.info(f"Created node: {self.name}")
> +
> +    def send_packet_and_capture(
> +        self,
> +        packet: Packet,
> +        send_port: Port,
> +        receive_port: Port,
> +        duration: float = 1,
> +    ) -> list[Packet]:
> +        """Send a packet, return received traffic.
> +
> +        Send a packet on the send_port and then return all traffic
> captured
> +        on the receive_port for the given duration. Also record the
> captured traffic
> +        in a pcap file.
> +
> +        Args:
> +            packet: The packet to send.
> +            send_port: The egress port on the TG node.
> +            receive_port: The ingress port in the TG node.
> +            duration: Capture traffic for this amount of time after
> sending the packet.
> +
> +        Returns:
> +             A list of received packets. May be empty if no packets are
> captured.
> +        """
> +        return self.traffic_generator.send_packet_and_capture(
> +            packet, send_port, receive_port, duration
> +        )
> +
> +    def close(self) -> None:
> +        """Free all resources used by the node"""
> +        self.traffic_generator.close()
> +        super(TGNode, self).close()
> +
> +
> +def create_traffic_generator(
> +    tg_node: TGNode, traffic_generator_config: ScapyTrafficGeneratorConfig
> +) -> CapturingTrafficGenerator:
> +    """A factory function for creating traffic generator object from user
> config."""
> +
> +    from .scapy import ScapyTrafficGenerator
> +
> +    match traffic_generator_config.traffic_generator_type:
> +        case TrafficGeneratorType.SCAPY:
> +            return ScapyTrafficGenerator(tg_node,
> traffic_generator_config)
> +        case _:
> +            raise ConfigurationError(
> +                "Unknown traffic generator: "
> +                f"{traffic_generator_config.traffic_generator_type}"
> +            )


Would it be possible here to do something like what we did in
create_interactive_shell with a TypeVar where we can initialize it
directly? It would change from using the enum to setting the
traffic_generator_config.traffic_generator_type to a specific class in the
config (in this case, ScapyTrafficGenerator), but I think it would be
possible to change in the from_dict method where we could set this type to
the class directly instead of the enum (or maybe had the enum relate it's
values to the classes themselves).

I think this would make some things slightly more complicated (like how we
would map from conf.yaml to one of the classes and all of those needing to
be imported in config/__init__.py) but it would save developers in the
future from having to add to two different places  (the enum in
config/__init__.py and this match statement) and save this list from being
arbitrarily long. I think this is fine for this patch but maybe when we
expand the traffic generator or the scapy generator it could be worth
thinking about.

Do you think it would make sense to change it in this way or would that be
somewhat unnecessary in your eyes?


> diff --git a/dts/framework/testbed_model/traffic_generator.py
> b/dts/framework/testbed_model/traffic_generator.py
> new file mode 100644
> index 0000000000..28c35d3ce4
> --- /dev/null
> +++ b/dts/framework/testbed_model/traffic_generator.py
> @@ -0,0 +1,72 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2022 University of New Hampshire
> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
> +
> +"""The base traffic generator.
> +
> +These traffic generators can't capture received traffic,
> +only count the number of received packets.
> +"""
> +
> +from abc import ABC, abstractmethod
> +
> +from scapy.packet import Packet  # type: ignore[import]
> +
> +from framework.logger import DTSLOG
> +from framework.utils import get_packet_summaries
> +
> +from .hw.port import Port
> +
> +
> +class TrafficGenerator(ABC):
> +    """The base traffic generator.
> +
> +    Defines the few basic methods that each traffic generator must
> implement.
> +    """
> +
> +    _logger: DTSLOG
> +
> +    def send_packet(self, packet: Packet, port: Port) -> None:
> +        """Send a packet and block until it is fully sent.
> +
> +        What fully sent means is defined by the traffic generator.
> +
> +        Args:
> +            packet: The packet to send.
> +            port: The egress port on the TG node.
> +        """
> +        self.send_packets([packet], port)
> +
> +    def send_packets(self, packets: list[Packet], port: Port) -> None:
> +        """Send packets and block until they are fully sent.
> +
> +        What fully sent means is defined by the traffic generator.
> +
> +        Args:
> +            packets: The packets to send.
> +            port: The egress port on the TG node.
> +        """
> +        self._logger.info(f"Sending packet{'s' if len(packets) > 1 else
> ''}.")
> +        self._logger.debug(get_packet_summaries(packets))
> +        self._send_packets(packets, port)
> +
> +    @abstractmethod
> +    def _send_packets(self, packets: list[Packet], port: Port) -> None:
> +        """
> +        The extended classes must implement this method which
> +        sends packets on send_port. The method should block until all
> packets
> +        are fully sent.
> +        """
> +
> +    @property
> +    def is_capturing(self) -> bool:
> +        """Whether this traffic generator can capture traffic.
> +
> +        Returns:
> +            True if the traffic generator can capture traffic, False
> otherwise.
> +        """
> +        return False
> +
> +    @abstractmethod
> +    def close(self) -> None:
> +        """Free all resources used by the traffic generator."""
> diff --git a/dts/framework/utils.py b/dts/framework/utils.py
> index 60abe46edf..d27c2c5b5f 100644
> --- a/dts/framework/utils.py
> +++ b/dts/framework/utils.py
> @@ -4,6 +4,7 @@
>  # Copyright(c) 2022-2023 University of New Hampshire
>
>  import atexit
> +import json
>  import os
>  import subprocess
>  import sys
> @@ -11,6 +12,8 @@
>  from pathlib import Path
>  from subprocess import SubprocessError
>
> +from scapy.packet import Packet  # type: ignore[import]
> +
>  from .exception import ConfigurationError
>
>
> @@ -64,6 +67,16 @@ def expand_range(range_str: str) -> list[int]:
>      return expanded_range
>
>
> +def get_packet_summaries(packets: list[Packet]):
> +    if len(packets) == 1:
> +        packet_summaries = packets[0].summary()
> +    else:
> +        packet_summaries = json.dumps(
> +            list(map(lambda pkt: pkt.summary(), packets)), indent=4
> +        )
> +    return f"Packet contents: \n{packet_summaries}"
> +
> +
>  def RED(text: str) -> str:
>      return f"\u001B[31;1m{str(text)}\u001B[0m"
>
> --
> 2.34.1
>
>
  
Juraj Linkeš July 19, 2023, 1:23 p.m. UTC | #2
On Tue, Jul 18, 2023 at 9:56 PM Jeremy Spewock <jspewock@iol.unh.edu> wrote:
>
>
> Hey Juraj,
>
> Just a couple of comments below, but very minor stuff. Just a few docstring that I commented on and one question about the factory for traffic generators that I was wondering what you thought about. More below:
>

<snip>

>> diff --git a/dts/framework/testbed_model/capturing_traffic_generator.py b/dts/framework/testbed_model/capturing_traffic_generator.py
>> new file mode 100644
>> index 0000000000..1130d87f1e
>> --- /dev/null
>> +++ b/dts/framework/testbed_model/capturing_traffic_generator.py
>> @@ -0,0 +1,135 @@
>> +# SPDX-License-Identifier: BSD-3-Clause
>> +# Copyright(c) 2022 University of New Hampshire
>> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
>> +
>> +"""Traffic generator that can capture packets.
>> +
>> +In functional testing, we need to interrogate received packets to check their validity.
>> +Here we define the interface common to all
>> +traffic generators capable of capturing traffic.
>
>
> Is there a reason for the line break here? Just to keep things consistent I think it might make sense to extend this line to be the same length as the one above.
>

The reason is the full line is above the character limit. I was using
Thomas's heuristic of splitting the lines where it makes logical
sense, but it does look kinda wonky, so I'll try to reformat it more
sensibly.

>>
>> +"""
>> +
>> +import uuid
>> +from abc import abstractmethod
>> +
>> +import scapy.utils  # type: ignore[import]
>> +from scapy.packet import Packet  # type: ignore[import]
>> +
>> +from framework.settings import SETTINGS
>> +from framework.utils import get_packet_summaries
>> +
>> +from .hw.port import Port
>> +from .traffic_generator import TrafficGenerator
>> +
>> +
>> +def _get_default_capture_name() -> str:
>> +    """
>> +    This is the function used for the default implementation of capture names.
>> +    """
>> +    return str(uuid.uuid4())
>> +
>> +
>> +class CapturingTrafficGenerator(TrafficGenerator):
>> +    """
>> +    A mixin interface which enables a packet generator to declare that it can capture
>> +    packets and return them to the user.
>
>
> This is missing the one line summary at the top of the comment. Obviously this is not a big issue, but we likely would want this to be uniform with the rest of the module which does have the summary at the top.
>

Good catch. I plan on reviewing all of the docstring when updating the
format of the docstrings so this means less work in the future. :-)

>>
>> +
>> +    The methods of capturing traffic generators obey the following workflow:
>> +        1. send packets
>> +        2. capture packets
>> +        3. write the capture to a .pcap file
>> +        4. return the received packets
>> +    """
>> +
>> +    @property
>> +    def is_capturing(self) -> bool:
>> +        return True
>> +
>> +    def send_packet_and_capture(
>> +        self,
>> +        packet: Packet,
>> +        send_port: Port,
>> +        receive_port: Port,
>> +        duration: float,
>> +        capture_name: str = _get_default_capture_name(),
>> +    ) -> list[Packet]:
>> +        """Send a packet, return received traffic.
>> +
>> +        Send a packet on the send_port and then return all traffic captured
>> +        on the receive_port for the given duration. Also record the captured traffic
>> +        in a pcap file.
>> +
>>
>> +        Args:
>> +            packet: The packet to send.
>> +            send_port: The egress port on the TG node.
>> +            receive_port: The ingress port in the TG node.
>> +            duration: Capture traffic for this amount of time after sending the packet.
>> +            capture_name: The name of the .pcap file where to store the capture.
>> +
>> +        Returns:
>> +             A list of received packets. May be empty if no packets are captured.
>> +        """
>> +        return self.send_packets_and_capture(
>> +            [packet], send_port, receive_port, duration, capture_name
>> +        )
>> +
>> +    def send_packets_and_capture(
>> +        self,
>> +        packets: list[Packet],
>> +        send_port: Port,
>> +        receive_port: Port,
>> +        duration: float,
>> +        capture_name: str = _get_default_capture_name(),
>> +    ) -> list[Packet]:
>> +        """Send packets, return received traffic.
>> +
>> +        Send packets on the send_port and then return all traffic captured
>> +        on the receive_port for the given duration. Also record the captured traffic
>> +        in a pcap file.
>> +
>> +        Args:
>> +            packets: The packets to send.
>> +            send_port: The egress port on the TG node.
>> +            receive_port: The ingress port in the TG node.
>> +            duration: Capture traffic for this amount of time after sending the packets.
>> +            capture_name: The name of the .pcap file where to store the capture.
>> +
>> +        Returns:
>> +             A list of received packets. May be empty if no packets are captured.
>> +        """
>> +        self._logger.debug(get_packet_summaries(packets))
>> +        self._logger.debug(
>> +            f"Sending packet on {send_port.logical_name}, "
>> +            f"receiving on {receive_port.logical_name}."
>> +        )
>> +        received_packets = self._send_packets_and_capture(
>> +            packets,
>> +            send_port,
>> +            receive_port,
>> +            duration,
>> +        )
>> +
>> +        self._logger.debug(
>> +            f"Received packets: {get_packet_summaries(received_packets)}"
>> +        )
>> +        self._write_capture_from_packets(capture_name, received_packets)
>> +        return received_packets
>> +
>> +    @abstractmethod
>> +    def _send_packets_and_capture(
>> +        self,
>> +        packets: list[Packet],
>> +        send_port: Port,
>> +        receive_port: Port,
>> +        duration: float,
>> +    ) -> list[Packet]:
>> +        """
>> +        The extended classes must implement this method which
>> +        sends packets on send_port and receives packets on the receive_port
>> +        for the specified duration. It must be able to handle no received packets.
>> +        """
>> +
>> +    def _write_capture_from_packets(self, capture_name: str, packets: list[Packet]):
>> +        file_name = f"{SETTINGS.output_dir}/{capture_name}.pcap"
>> +        self._logger.debug(f"Writing packets to {file_name}.")
>> +        scapy.utils.wrpcap(file_name, packets)

<snip>

>> diff --git a/dts/framework/testbed_model/tg_node.py b/dts/framework/testbed_model/tg_node.py
>> new file mode 100644
>> index 0000000000..27025cfa31
>> --- /dev/null
>> +++ b/dts/framework/testbed_model/tg_node.py
>> @@ -0,0 +1,99 @@
>> +# SPDX-License-Identifier: BSD-3-Clause
>> +# Copyright(c) 2010-2014 Intel Corporation
>> +# Copyright(c) 2022 University of New Hampshire
>> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
>> +
>> +"""Traffic generator node.
>> +
>> +This is the node where the traffic generator resides.
>> +The distinction between a node and a traffic generator is as follows:
>> +A node is a host that DTS connects to. It could be a baremetal server,
>> +a VM or a container.
>> +A traffic generator is software running on the node.
>> +A traffic generator node is a node running a traffic generator.
>> +A node can be a traffic generator node as well as system under test node.
>> +"""
>> +
>> +from scapy.packet import Packet  # type: ignore[import]
>> +
>> +from framework.config import (
>> +    ScapyTrafficGeneratorConfig,
>> +    TGNodeConfiguration,
>> +    TrafficGeneratorType,
>> +)
>> +from framework.exception import ConfigurationError
>> +
>> +from .capturing_traffic_generator import CapturingTrafficGenerator
>> +from .hw.port import Port
>> +from .node import Node
>> +
>> +
>> +class TGNode(Node):
>> +    """Manage connections to a node with a traffic generator.
>> +
>> +    Apart from basic node management capabilities, the Traffic Generator node has
>> +    specialized methods for handling the traffic generator running on it.
>> +
>> +    Arguments:
>> +        node_config: The user configuration of the traffic generator node.
>> +
>> +    Attributes:
>> +        traffic_generator: The traffic generator running on the node.
>> +    """
>> +
>> +    traffic_generator: CapturingTrafficGenerator
>> +
>> +    def __init__(self, node_config: TGNodeConfiguration):
>> +        super(TGNode, self).__init__(node_config)
>> +        self.traffic_generator = create_traffic_generator(
>> +            self, node_config.traffic_generator
>> +        )
>> +        self._logger.info(f"Created node: {self.name}")
>> +
>> +    def send_packet_and_capture(
>> +        self,
>> +        packet: Packet,
>> +        send_port: Port,
>> +        receive_port: Port,
>> +        duration: float = 1,
>> +    ) -> list[Packet]:
>> +        """Send a packet, return received traffic.
>> +
>> +        Send a packet on the send_port and then return all traffic captured
>> +        on the receive_port for the given duration. Also record the captured traffic
>> +        in a pcap file.
>> +
>> +        Args:
>> +            packet: The packet to send.
>> +            send_port: The egress port on the TG node.
>> +            receive_port: The ingress port in the TG node.
>> +            duration: Capture traffic for this amount of time after sending the packet.
>> +
>> +        Returns:
>> +             A list of received packets. May be empty if no packets are captured.
>> +        """
>> +        return self.traffic_generator.send_packet_and_capture(
>> +            packet, send_port, receive_port, duration
>> +        )
>> +
>> +    def close(self) -> None:
>> +        """Free all resources used by the node"""
>> +        self.traffic_generator.close()
>> +        super(TGNode, self).close()
>> +
>> +
>> +def create_traffic_generator(
>> +    tg_node: TGNode, traffic_generator_config: ScapyTrafficGeneratorConfig
>> +) -> CapturingTrafficGenerator:
>> +    """A factory function for creating traffic generator object from user config."""
>> +
>> +    from .scapy import ScapyTrafficGenerator
>> +
>> +    match traffic_generator_config.traffic_generator_type:
>> +        case TrafficGeneratorType.SCAPY:
>> +            return ScapyTrafficGenerator(tg_node, traffic_generator_config)
>> +        case _:
>> +            raise ConfigurationError(
>> +                "Unknown traffic generator: "
>> +                f"{traffic_generator_config.traffic_generator_type}"
>> +            )
>
>
> Would it be possible here to do something like what we did in create_interactive_shell with a TypeVar where we can initialize it directly? It would change from using the enum to setting the traffic_generator_config.traffic_generator_type to a specific class in the config (in this case, ScapyTrafficGenerator), but I think it would be possible to change in the from_dict method where we could set this type to the class directly instead of the enum (or maybe had the enum relate it's values to the classes themselves).
>
> I think this would make some things slightly more complicated (like how we would map from conf.yaml to one of the classes and all of those needing to be imported in config/__init__.py) but it would save developers in the future from having to add to two different places  (the enum in  config/__init__.py and this match statement) and save this list from being arbitrarily long. I think this is fine for this patch but maybe when we expand the traffic generator or the scapy generator it could be worth thinking about.
>
> Do you think it would make sense to change it in this way or would that be somewhat unnecessary in your eyes?
>

That would be great, but the problem with using any object from the
framework in config is cyclical imports. Almost everything imports
config, so config can't import anything from the framework, at least
on the top level.
But it looks like it can be done with imports in the from_dict method.
This is an interesting approach, I'll think about it. The best would
probably be to leave this as is and use this approach for more things
than just the traffic generator.
There's one more thing to consider and that is will the imports work
with the constraints imposed by documentation generation. I'm thinking
let's leave this as is and try this when updating the code for doc's
needs.

>>
>> diff --git a/dts/framework/testbed_model/traffic_generator.py b/dts/framework/testbed_model/traffic_generator.py
>> new file mode 100644
>> index 0000000000..28c35d3ce4
>> --- /dev/null
>> +++ b/dts/framework/testbed_model/traffic_generator.py
>> @@ -0,0 +1,72 @@
>> +# SPDX-License-Identifier: BSD-3-Clause
>> +# Copyright(c) 2022 University of New Hampshire
>> +# Copyright(c) 2023 PANTHEON.tech s.r.o.
>> +
>> +"""The base traffic generator.
>> +
>> +These traffic generators can't capture received traffic,
>> +only count the number of received packets.
>> +"""
>> +
>> +from abc import ABC, abstractmethod
>> +
>> +from scapy.packet import Packet  # type: ignore[import]
>> +
>> +from framework.logger import DTSLOG
>> +from framework.utils import get_packet_summaries
>> +
>> +from .hw.port import Port
>> +
>> +
>> +class TrafficGenerator(ABC):
>> +    """The base traffic generator.
>> +
>> +    Defines the few basic methods that each traffic generator must implement.
>> +    """
>> +
>> +    _logger: DTSLOG
>> +
>> +    def send_packet(self, packet: Packet, port: Port) -> None:
>> +        """Send a packet and block until it is fully sent.
>> +
>> +        What fully sent means is defined by the traffic generator.
>> +
>> +        Args:
>> +            packet: The packet to send.
>> +            port: The egress port on the TG node.
>> +        """
>> +        self.send_packets([packet], port)
>> +
>> +    def send_packets(self, packets: list[Packet], port: Port) -> None:
>> +        """Send packets and block until they are fully sent.
>> +
>> +        What fully sent means is defined by the traffic generator.
>> +
>> +        Args:
>> +            packets: The packets to send.
>> +            port: The egress port on the TG node.
>> +        """
>> +        self._logger.info(f"Sending packet{'s' if len(packets) > 1 else ''}.")
>> +        self._logger.debug(get_packet_summaries(packets))
>> +        self._send_packets(packets, port)
>> +
>> +    @abstractmethod
>> +    def _send_packets(self, packets: list[Packet], port: Port) -> None:
>> +        """
>> +        The extended classes must implement this method which
>> +        sends packets on send_port. The method should block until all packets
>> +        are fully sent.
>> +        """
>> +
>> +    @property
>> +    def is_capturing(self) -> bool:
>> +        """Whether this traffic generator can capture traffic.
>> +
>> +        Returns:
>> +            True if the traffic generator can capture traffic, False otherwise.
>> +        """
>> +        return False
>> +
>> +    @abstractmethod
>> +    def close(self) -> None:
>> +        """Free all resources used by the traffic generator."""
>> diff --git a/dts/framework/utils.py b/dts/framework/utils.py
>> index 60abe46edf..d27c2c5b5f 100644
>> --- a/dts/framework/utils.py
>> +++ b/dts/framework/utils.py
>> @@ -4,6 +4,7 @@
>>  # Copyright(c) 2022-2023 University of New Hampshire
>>
>>  import atexit
>> +import json
>>  import os
>>  import subprocess
>>  import sys
>> @@ -11,6 +12,8 @@
>>  from pathlib import Path
>>  from subprocess import SubprocessError
>>
>> +from scapy.packet import Packet  # type: ignore[import]
>> +
>>  from .exception import ConfigurationError
>>
>>
>> @@ -64,6 +67,16 @@ def expand_range(range_str: str) -> list[int]:
>>      return expanded_range
>>
>>
>> +def get_packet_summaries(packets: list[Packet]):
>> +    if len(packets) == 1:
>> +        packet_summaries = packets[0].summary()
>> +    else:
>> +        packet_summaries = json.dumps(
>> +            list(map(lambda pkt: pkt.summary(), packets)), indent=4
>> +        )
>> +    return f"Packet contents: \n{packet_summaries}"
>> +
>> +
>>  def RED(text: str) -> str:
>>      return f"\u001B[31;1m{str(text)}\u001B[0m"
>>
>> --
>> 2.34.1
>>
  

Patch

diff --git a/doc/guides/tools/dts.rst b/doc/guides/tools/dts.rst
index c7b31623e4..2f97d1df6e 100644
--- a/doc/guides/tools/dts.rst
+++ b/doc/guides/tools/dts.rst
@@ -153,6 +153,37 @@  There are two areas that need to be set up on a System Under Test:
 
       sudo usermod -aG sudo <sut_user>
 
+
+Setting up Traffic Generator Node
+---------------------------------
+
+These need to be set up on a Traffic Generator Node:
+
+#. **Traffic generator dependencies**
+
+   The traffic generator running on the traffic generator node must be installed beforehand.
+   For Scapy traffic generator, only a few Python libraries need to be installed:
+
+   .. code-block:: console
+
+      sudo apt install python3-pip
+      sudo pip install --upgrade pip
+      sudo pip install scapy==2.5.0
+
+#. **Hardware dependencies**
+
+   The traffic generators, like DPDK, need a proper driver and firmware.
+   The Scapy traffic generator doesn't have strict requirements - the drivers that come
+   with most OS distributions will be satisfactory.
+
+
+#. **User with administrator privileges**
+
+   Similarly to the System Under Test, traffic generators need administrator privileges
+   to be able to use the devices.
+   Refer to the `System Under Test section <sut_admin_user>` for details.
+
+
 Running DTS
 -----------
 
diff --git a/dts/framework/dts.py b/dts/framework/dts.py
index 372bc72787..265ed7fd5b 100644
--- a/dts/framework/dts.py
+++ b/dts/framework/dts.py
@@ -15,7 +15,7 @@ 
 from .logger import DTSLOG, getLogger
 from .test_result import BuildTargetResult, DTSResult, ExecutionResult, Result
 from .test_suite import get_test_suites
-from .testbed_model import SutNode
+from .testbed_model import SutNode, TGNode
 from .utils import check_dts_python_version
 
 dts_logger: DTSLOG = getLogger("DTSRunner")
@@ -33,29 +33,31 @@  def run_all() -> None:
     # check the python version of the server that run dts
     check_dts_python_version()
 
-    nodes: dict[str, SutNode] = {}
+    sut_nodes: dict[str, SutNode] = {}
+    tg_nodes: dict[str, TGNode] = {}
     try:
         # for all Execution sections
         for execution in CONFIGURATION.executions:
-            sut_node = None
-            if execution.system_under_test_node.name in nodes:
-                # a Node with the same name already exists
-                sut_node = nodes[execution.system_under_test_node.name]
-            else:
-                # the SUT has not been initialized yet
-                try:
+            sut_node = sut_nodes.get(execution.system_under_test_node.name)
+            tg_node = tg_nodes.get(execution.traffic_generator_node.name)
+
+            try:
+                if not sut_node:
                     sut_node = SutNode(execution.system_under_test_node)
-                    result.update_setup(Result.PASS)
-                except Exception as e:
-                    dts_logger.exception(
-                        f"Connection to node {execution.system_under_test_node} failed."
-                    )
-                    result.update_setup(Result.FAIL, e)
-                else:
-                    nodes[sut_node.name] = sut_node
-
-            if sut_node:
-                _run_execution(sut_node, execution, result)
+                    sut_nodes[sut_node.name] = sut_node
+                if not tg_node:
+                    tg_node = TGNode(execution.traffic_generator_node)
+                    tg_nodes[tg_node.name] = tg_node
+                result.update_setup(Result.PASS)
+            except Exception as e:
+                failed_node = execution.system_under_test_node.name
+                if sut_node:
+                    failed_node = execution.traffic_generator_node.name
+                dts_logger.exception(f"Creation of node {failed_node} failed.")
+                result.update_setup(Result.FAIL, e)
+
+            else:
+                _run_execution(sut_node, tg_node, execution, result)
 
     except Exception as e:
         dts_logger.exception("An unexpected error has occurred.")
@@ -64,7 +66,7 @@  def run_all() -> None:
 
     finally:
         try:
-            for node in nodes.values():
+            for node in (sut_nodes | tg_nodes).values():
                 node.close()
             result.update_teardown(Result.PASS)
         except Exception as e:
@@ -81,7 +83,10 @@  def run_all() -> None:
 
 
 def _run_execution(
-    sut_node: SutNode, execution: ExecutionConfiguration, result: DTSResult
+    sut_node: SutNode,
+    tg_node: TGNode,
+    execution: ExecutionConfiguration,
+    result: DTSResult,
 ) -> None:
     """
     Run the given execution. This involves running the execution setup as well as
@@ -101,7 +106,9 @@  def _run_execution(
 
     else:
         for build_target in execution.build_targets:
-            _run_build_target(sut_node, build_target, execution, execution_result)
+            _run_build_target(
+                sut_node, tg_node, build_target, execution, execution_result
+            )
 
     finally:
         try:
@@ -114,6 +121,7 @@  def _run_execution(
 
 def _run_build_target(
     sut_node: SutNode,
+    tg_node: TGNode,
     build_target: BuildTargetConfiguration,
     execution: ExecutionConfiguration,
     execution_result: ExecutionResult,
@@ -134,7 +142,7 @@  def _run_build_target(
         build_target_result.update_setup(Result.FAIL, e)
 
     else:
-        _run_all_suites(sut_node, execution, build_target_result)
+        _run_all_suites(sut_node, tg_node, execution, build_target_result)
 
     finally:
         try:
@@ -147,6 +155,7 @@  def _run_build_target(
 
 def _run_all_suites(
     sut_node: SutNode,
+    tg_node: TGNode,
     execution: ExecutionConfiguration,
     build_target_result: BuildTargetResult,
 ) -> None:
@@ -161,7 +170,7 @@  def _run_all_suites(
     for test_suite_config in execution.test_suites:
         try:
             _run_single_suite(
-                sut_node, execution, build_target_result, test_suite_config
+                sut_node, tg_node, execution, build_target_result, test_suite_config
             )
         except BlockingTestSuiteError as e:
             dts_logger.exception(
@@ -177,6 +186,7 @@  def _run_all_suites(
 
 def _run_single_suite(
     sut_node: SutNode,
+    tg_node: TGNode,
     execution: ExecutionConfiguration,
     build_target_result: BuildTargetResult,
     test_suite_config: TestSuiteConfig,
@@ -205,6 +215,7 @@  def _run_single_suite(
         for test_suite_class in test_suite_classes:
             test_suite = test_suite_class(
                 sut_node,
+                tg_node,
                 test_suite_config.test_cases,
                 execution.func,
                 build_target_result,
diff --git a/dts/framework/remote_session/linux_session.py b/dts/framework/remote_session/linux_session.py
index f13f399121..284c74795d 100644
--- a/dts/framework/remote_session/linux_session.py
+++ b/dts/framework/remote_session/linux_session.py
@@ -2,13 +2,47 @@ 
 # Copyright(c) 2023 PANTHEON.tech s.r.o.
 # Copyright(c) 2023 University of New Hampshire
 
+import json
+from typing import TypedDict
+
+from typing_extensions import NotRequired
+
 from framework.exception import RemoteCommandExecutionError
 from framework.testbed_model import LogicalCore
+from framework.testbed_model.hw.port import Port
 from framework.utils import expand_range
 
 from .posix_session import PosixSession
 
 
+class LshwConfigurationOutput(TypedDict):
+    link: str
+
+
+class LshwOutput(TypedDict):
+    """
+    A model of the relevant information from json lshw output, e.g.:
+    {
+    ...
+    "businfo" : "pci@0000:08:00.0",
+    "logicalname" : "enp8s0",
+    "version" : "00",
+    "serial" : "52:54:00:59:e1:ac",
+    ...
+    "configuration" : {
+      ...
+      "link" : "yes",
+      ...
+    },
+    ...
+    """
+
+    businfo: str
+    logicalname: NotRequired[str]
+    serial: NotRequired[str]
+    configuration: LshwConfigurationOutput
+
+
 class LinuxSession(PosixSession):
     """
     The implementation of non-Posix compliant parts of Linux remote sessions.
@@ -102,3 +136,47 @@  def _configure_huge_pages(
         self.send_command(
             f"echo {amount} | tee {hugepage_config_path}", privileged=True
         )
+
+    def update_ports(self, ports: list[Port]) -> None:
+        self._logger.debug("Gathering port info.")
+        for port in ports:
+            assert (
+                port.node == self.name
+            ), "Attempted to gather port info on the wrong node"
+
+        port_info_list = self._get_lshw_info()
+        for port in ports:
+            for port_info in port_info_list:
+                if f"pci@{port.pci}" == port_info.get("businfo"):
+                    self._update_port_attr(
+                        port, port_info.get("logicalname"), "logical_name"
+                    )
+                    self._update_port_attr(port, port_info.get("serial"), "mac_address")
+                    port_info_list.remove(port_info)
+                    break
+            else:
+                self._logger.warning(f"No port at pci address {port.pci} found.")
+
+    def _get_lshw_info(self) -> list[LshwOutput]:
+        output = self.send_command("lshw -quiet -json -C network", verify=True)
+        return json.loads(output.stdout)
+
+    def _update_port_attr(
+        self, port: Port, attr_value: str | None, attr_name: str
+    ) -> None:
+        if attr_value:
+            setattr(port, attr_name, attr_value)
+            self._logger.debug(
+                f"Found '{attr_name}' of port {port.pci}: '{attr_value}'."
+            )
+        else:
+            self._logger.warning(
+                f"Attempted to get '{attr_name}' of port {port.pci}, "
+                f"but it doesn't exist."
+            )
+
+    def configure_port_state(self, port: Port, enable: bool) -> None:
+        state = "up" if enable else "down"
+        self.send_command(
+            f"ip link set dev {port.logical_name} {state}", privileged=True
+        )
diff --git a/dts/framework/remote_session/os_session.py b/dts/framework/remote_session/os_session.py
index cc13b02f16..633d06eb5d 100644
--- a/dts/framework/remote_session/os_session.py
+++ b/dts/framework/remote_session/os_session.py
@@ -12,6 +12,7 @@ 
 from framework.remote_session.remote import InteractiveShell, TestPmdShell
 from framework.settings import SETTINGS
 from framework.testbed_model import LogicalCore
+from framework.testbed_model.hw.port import Port
 from framework.utils import MesonArgs
 
 from .remote import (
@@ -255,3 +256,17 @@  def get_node_info(self) -> NodeInfo:
         """
         Collect information about the node
         """
+
+    @abstractmethod
+    def update_ports(self, ports: list[Port]) -> None:
+        """
+        Get additional information about ports:
+            Logical name (e.g. enp7s0) if applicable
+            Mac address
+        """
+
+    @abstractmethod
+    def configure_port_state(self, port: Port, enable: bool) -> None:
+        """
+        Enable/disable port.
+        """
diff --git a/dts/framework/test_suite.py b/dts/framework/test_suite.py
index de94c9332d..056460dd05 100644
--- a/dts/framework/test_suite.py
+++ b/dts/framework/test_suite.py
@@ -20,7 +20,7 @@ 
 from .logger import DTSLOG, getLogger
 from .settings import SETTINGS
 from .test_result import BuildTargetResult, Result, TestCaseResult, TestSuiteResult
-from .testbed_model import SutNode
+from .testbed_model import SutNode, TGNode
 
 
 class TestSuite(object):
@@ -51,11 +51,13 @@  class TestSuite(object):
     def __init__(
         self,
         sut_node: SutNode,
+        tg_node: TGNode,
         test_cases: list[str],
         func: bool,
         build_target_result: BuildTargetResult,
     ):
         self.sut_node = sut_node
+        self.tg_node = tg_node
         self._logger = getLogger(self.__class__.__name__)
         self._test_cases_to_run = test_cases
         self._test_cases_to_run.extend(SETTINGS.test_cases)
diff --git a/dts/framework/testbed_model/__init__.py b/dts/framework/testbed_model/__init__.py
index f54a947051..5cbb859e47 100644
--- a/dts/framework/testbed_model/__init__.py
+++ b/dts/framework/testbed_model/__init__.py
@@ -20,3 +20,4 @@ 
 )
 from .node import Node
 from .sut_node import SutNode
+from .tg_node import TGNode
diff --git a/dts/framework/testbed_model/capturing_traffic_generator.py b/dts/framework/testbed_model/capturing_traffic_generator.py
new file mode 100644
index 0000000000..1130d87f1e
--- /dev/null
+++ b/dts/framework/testbed_model/capturing_traffic_generator.py
@@ -0,0 +1,135 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Traffic generator that can capture packets.
+
+In functional testing, we need to interrogate received packets to check their validity.
+Here we define the interface common to all
+traffic generators capable of capturing traffic.
+"""
+
+import uuid
+from abc import abstractmethod
+
+import scapy.utils  # type: ignore[import]
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.settings import SETTINGS
+from framework.utils import get_packet_summaries
+
+from .hw.port import Port
+from .traffic_generator import TrafficGenerator
+
+
+def _get_default_capture_name() -> str:
+    """
+    This is the function used for the default implementation of capture names.
+    """
+    return str(uuid.uuid4())
+
+
+class CapturingTrafficGenerator(TrafficGenerator):
+    """
+    A mixin interface which enables a packet generator to declare that it can capture
+    packets and return them to the user.
+
+    The methods of capturing traffic generators obey the following workflow:
+        1. send packets
+        2. capture packets
+        3. write the capture to a .pcap file
+        4. return the received packets
+    """
+
+    @property
+    def is_capturing(self) -> bool:
+        return True
+
+    def send_packet_and_capture(
+        self,
+        packet: Packet,
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+        capture_name: str = _get_default_capture_name(),
+    ) -> list[Packet]:
+        """Send a packet, return received traffic.
+
+        Send a packet on the send_port and then return all traffic captured
+        on the receive_port for the given duration. Also record the captured traffic
+        in a pcap file.
+
+        Args:
+            packet: The packet to send.
+            send_port: The egress port on the TG node.
+            receive_port: The ingress port in the TG node.
+            duration: Capture traffic for this amount of time after sending the packet.
+            capture_name: The name of the .pcap file where to store the capture.
+
+        Returns:
+             A list of received packets. May be empty if no packets are captured.
+        """
+        return self.send_packets_and_capture(
+            [packet], send_port, receive_port, duration, capture_name
+        )
+
+    def send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+        capture_name: str = _get_default_capture_name(),
+    ) -> list[Packet]:
+        """Send packets, return received traffic.
+
+        Send packets on the send_port and then return all traffic captured
+        on the receive_port for the given duration. Also record the captured traffic
+        in a pcap file.
+
+        Args:
+            packets: The packets to send.
+            send_port: The egress port on the TG node.
+            receive_port: The ingress port in the TG node.
+            duration: Capture traffic for this amount of time after sending the packets.
+            capture_name: The name of the .pcap file where to store the capture.
+
+        Returns:
+             A list of received packets. May be empty if no packets are captured.
+        """
+        self._logger.debug(get_packet_summaries(packets))
+        self._logger.debug(
+            f"Sending packet on {send_port.logical_name}, "
+            f"receiving on {receive_port.logical_name}."
+        )
+        received_packets = self._send_packets_and_capture(
+            packets,
+            send_port,
+            receive_port,
+            duration,
+        )
+
+        self._logger.debug(
+            f"Received packets: {get_packet_summaries(received_packets)}"
+        )
+        self._write_capture_from_packets(capture_name, received_packets)
+        return received_packets
+
+    @abstractmethod
+    def _send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+    ) -> list[Packet]:
+        """
+        The extended classes must implement this method which
+        sends packets on send_port and receives packets on the receive_port
+        for the specified duration. It must be able to handle no received packets.
+        """
+
+    def _write_capture_from_packets(self, capture_name: str, packets: list[Packet]):
+        file_name = f"{SETTINGS.output_dir}/{capture_name}.pcap"
+        self._logger.debug(f"Writing packets to {file_name}.")
+        scapy.utils.wrpcap(file_name, packets)
diff --git a/dts/framework/testbed_model/hw/port.py b/dts/framework/testbed_model/hw/port.py
new file mode 100644
index 0000000000..680c29bfe3
--- /dev/null
+++ b/dts/framework/testbed_model/hw/port.py
@@ -0,0 +1,60 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+from dataclasses import dataclass
+
+from framework.config import PortConfig
+
+
+@dataclass(slots=True, frozen=True)
+class PortIdentifier:
+    node: str
+    pci: str
+
+
+@dataclass(slots=True)
+class Port:
+    """
+    identifier: The PCI address of the port on a node.
+
+    os_driver: The driver used by this port when the OS is controlling it.
+        Example: i40e
+    os_driver_for_dpdk: The driver the device must be bound to for DPDK to use it,
+        Example: vfio-pci.
+
+    Note: os_driver and os_driver_for_dpdk may be the same thing.
+        Example: mlx5_core
+
+    peer: The identifier of a port this port is connected with.
+    """
+
+    identifier: PortIdentifier
+    os_driver: str
+    os_driver_for_dpdk: str
+    peer: PortIdentifier
+    mac_address: str = ""
+    logical_name: str = ""
+
+    def __init__(self, node_name: str, config: PortConfig):
+        self.identifier = PortIdentifier(
+            node=node_name,
+            pci=config.pci,
+        )
+        self.os_driver = config.os_driver
+        self.os_driver_for_dpdk = config.os_driver_for_dpdk
+        self.peer = PortIdentifier(node=config.peer_node, pci=config.peer_pci)
+
+    @property
+    def node(self) -> str:
+        return self.identifier.node
+
+    @property
+    def pci(self) -> str:
+        return self.identifier.pci
+
+
+@dataclass(slots=True, frozen=True)
+class PortLink:
+    sut_port: Port
+    tg_port: Port
diff --git a/dts/framework/testbed_model/node.py b/dts/framework/testbed_model/node.py
index d2d55d904e..e09931cedf 100644
--- a/dts/framework/testbed_model/node.py
+++ b/dts/framework/testbed_model/node.py
@@ -25,6 +25,7 @@ 
     LogicalCoreListFilter,
     lcore_filter,
 )
+from .hw.port import Port
 
 
 class Node(object):
@@ -38,6 +39,7 @@  class Node(object):
     config: NodeConfiguration
     name: str
     lcores: list[LogicalCore]
+    ports: list[Port]
     _logger: DTSLOG
     _other_sessions: list[OSSession]
     _execution_config: ExecutionConfiguration
@@ -57,6 +59,13 @@  def __init__(self, node_config: NodeConfiguration):
         ).filter()
 
         self._other_sessions = []
+        self._init_ports()
+
+    def _init_ports(self) -> None:
+        self.ports = [Port(self.name, port_config) for port_config in self.config.ports]
+        self.main_session.update_ports(self.ports)
+        for port in self.ports:
+            self.configure_port_state(port)
 
     def set_up_execution(self, execution_config: ExecutionConfiguration) -> None:
         """
@@ -168,6 +177,12 @@  def _setup_hugepages(self):
                 self.config.hugepages.amount, self.config.hugepages.force_first_numa
             )
 
+    def configure_port_state(self, port: Port, enable: bool = True) -> None:
+        """
+        Enable/disable port.
+        """
+        self.main_session.configure_port_state(port, enable)
+
     def close(self) -> None:
         """
         Close all connections and free other resources.
diff --git a/dts/framework/testbed_model/scapy.py b/dts/framework/testbed_model/scapy.py
new file mode 100644
index 0000000000..1a23dc9fa3
--- /dev/null
+++ b/dts/framework/testbed_model/scapy.py
@@ -0,0 +1,74 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Scapy traffic generator.
+
+Traffic generator used for functional testing, implemented using the Scapy library.
+The traffic generator uses an XML-RPC server to run Scapy on the remote TG node.
+
+The XML-RPC server runs in an interactive remote SSH session running Python console,
+where we start the server. The communication with the server is facilitated with
+a local server proxy.
+"""
+
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.config import OS, ScapyTrafficGeneratorConfig
+from framework.logger import getLogger
+
+from .capturing_traffic_generator import (
+    CapturingTrafficGenerator,
+    _get_default_capture_name,
+)
+from .hw.port import Port
+from .tg_node import TGNode
+
+
+class ScapyTrafficGenerator(CapturingTrafficGenerator):
+    """Provides access to scapy functions via an RPC interface.
+
+    The traffic generator first starts an XML-RPC on the remote TG node.
+    Then it populates the server with functions which use the Scapy library
+    to send/receive traffic.
+
+    Any packets sent to the remote server are first converted to bytes.
+    They are received as xmlrpc.client.Binary objects on the server side.
+    When the server sends the packets back, they are also received as
+    xmlrpc.client.Binary object on the client side, are converted back to Scapy
+    packets and only then returned from the methods.
+
+    Arguments:
+        tg_node: The node where the traffic generator resides.
+        config: The user configuration of the traffic generator.
+    """
+
+    _config: ScapyTrafficGeneratorConfig
+    _tg_node: TGNode
+
+    def __init__(self, tg_node: TGNode, config: ScapyTrafficGeneratorConfig):
+        self._config = config
+        self._tg_node = tg_node
+        self._logger = getLogger(
+            f"{self._tg_node.name} {self._config.traffic_generator_type}"
+        )
+
+        assert (
+            self._tg_node.config.os == OS.linux
+        ), "Linux is the only supported OS for scapy traffic generation"
+
+    def _send_packets(self, packets: list[Packet], port: Port) -> None:
+        raise NotImplementedError()
+
+    def _send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+        capture_name: str = _get_default_capture_name(),
+    ) -> list[Packet]:
+        raise NotImplementedError()
+
+    def close(self):
+        pass
diff --git a/dts/framework/testbed_model/tg_node.py b/dts/framework/testbed_model/tg_node.py
new file mode 100644
index 0000000000..27025cfa31
--- /dev/null
+++ b/dts/framework/testbed_model/tg_node.py
@@ -0,0 +1,99 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Traffic generator node.
+
+This is the node where the traffic generator resides.
+The distinction between a node and a traffic generator is as follows:
+A node is a host that DTS connects to. It could be a baremetal server,
+a VM or a container.
+A traffic generator is software running on the node.
+A traffic generator node is a node running a traffic generator.
+A node can be a traffic generator node as well as system under test node.
+"""
+
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.config import (
+    ScapyTrafficGeneratorConfig,
+    TGNodeConfiguration,
+    TrafficGeneratorType,
+)
+from framework.exception import ConfigurationError
+
+from .capturing_traffic_generator import CapturingTrafficGenerator
+from .hw.port import Port
+from .node import Node
+
+
+class TGNode(Node):
+    """Manage connections to a node with a traffic generator.
+
+    Apart from basic node management capabilities, the Traffic Generator node has
+    specialized methods for handling the traffic generator running on it.
+
+    Arguments:
+        node_config: The user configuration of the traffic generator node.
+
+    Attributes:
+        traffic_generator: The traffic generator running on the node.
+    """
+
+    traffic_generator: CapturingTrafficGenerator
+
+    def __init__(self, node_config: TGNodeConfiguration):
+        super(TGNode, self).__init__(node_config)
+        self.traffic_generator = create_traffic_generator(
+            self, node_config.traffic_generator
+        )
+        self._logger.info(f"Created node: {self.name}")
+
+    def send_packet_and_capture(
+        self,
+        packet: Packet,
+        send_port: Port,
+        receive_port: Port,
+        duration: float = 1,
+    ) -> list[Packet]:
+        """Send a packet, return received traffic.
+
+        Send a packet on the send_port and then return all traffic captured
+        on the receive_port for the given duration. Also record the captured traffic
+        in a pcap file.
+
+        Args:
+            packet: The packet to send.
+            send_port: The egress port on the TG node.
+            receive_port: The ingress port in the TG node.
+            duration: Capture traffic for this amount of time after sending the packet.
+
+        Returns:
+             A list of received packets. May be empty if no packets are captured.
+        """
+        return self.traffic_generator.send_packet_and_capture(
+            packet, send_port, receive_port, duration
+        )
+
+    def close(self) -> None:
+        """Free all resources used by the node"""
+        self.traffic_generator.close()
+        super(TGNode, self).close()
+
+
+def create_traffic_generator(
+    tg_node: TGNode, traffic_generator_config: ScapyTrafficGeneratorConfig
+) -> CapturingTrafficGenerator:
+    """A factory function for creating traffic generator object from user config."""
+
+    from .scapy import ScapyTrafficGenerator
+
+    match traffic_generator_config.traffic_generator_type:
+        case TrafficGeneratorType.SCAPY:
+            return ScapyTrafficGenerator(tg_node, traffic_generator_config)
+        case _:
+            raise ConfigurationError(
+                "Unknown traffic generator: "
+                f"{traffic_generator_config.traffic_generator_type}"
+            )
diff --git a/dts/framework/testbed_model/traffic_generator.py b/dts/framework/testbed_model/traffic_generator.py
new file mode 100644
index 0000000000..28c35d3ce4
--- /dev/null
+++ b/dts/framework/testbed_model/traffic_generator.py
@@ -0,0 +1,72 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""The base traffic generator.
+
+These traffic generators can't capture received traffic,
+only count the number of received packets.
+"""
+
+from abc import ABC, abstractmethod
+
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.logger import DTSLOG
+from framework.utils import get_packet_summaries
+
+from .hw.port import Port
+
+
+class TrafficGenerator(ABC):
+    """The base traffic generator.
+
+    Defines the few basic methods that each traffic generator must implement.
+    """
+
+    _logger: DTSLOG
+
+    def send_packet(self, packet: Packet, port: Port) -> None:
+        """Send a packet and block until it is fully sent.
+
+        What fully sent means is defined by the traffic generator.
+
+        Args:
+            packet: The packet to send.
+            port: The egress port on the TG node.
+        """
+        self.send_packets([packet], port)
+
+    def send_packets(self, packets: list[Packet], port: Port) -> None:
+        """Send packets and block until they are fully sent.
+
+        What fully sent means is defined by the traffic generator.
+
+        Args:
+            packets: The packets to send.
+            port: The egress port on the TG node.
+        """
+        self._logger.info(f"Sending packet{'s' if len(packets) > 1 else ''}.")
+        self._logger.debug(get_packet_summaries(packets))
+        self._send_packets(packets, port)
+
+    @abstractmethod
+    def _send_packets(self, packets: list[Packet], port: Port) -> None:
+        """
+        The extended classes must implement this method which
+        sends packets on send_port. The method should block until all packets
+        are fully sent.
+        """
+
+    @property
+    def is_capturing(self) -> bool:
+        """Whether this traffic generator can capture traffic.
+
+        Returns:
+            True if the traffic generator can capture traffic, False otherwise.
+        """
+        return False
+
+    @abstractmethod
+    def close(self) -> None:
+        """Free all resources used by the traffic generator."""
diff --git a/dts/framework/utils.py b/dts/framework/utils.py
index 60abe46edf..d27c2c5b5f 100644
--- a/dts/framework/utils.py
+++ b/dts/framework/utils.py
@@ -4,6 +4,7 @@ 
 # Copyright(c) 2022-2023 University of New Hampshire
 
 import atexit
+import json
 import os
 import subprocess
 import sys
@@ -11,6 +12,8 @@ 
 from pathlib import Path
 from subprocess import SubprocessError
 
+from scapy.packet import Packet  # type: ignore[import]
+
 from .exception import ConfigurationError
 
 
@@ -64,6 +67,16 @@  def expand_range(range_str: str) -> list[int]:
     return expanded_range
 
 
+def get_packet_summaries(packets: list[Packet]):
+    if len(packets) == 1:
+        packet_summaries = packets[0].summary()
+    else:
+        packet_summaries = json.dumps(
+            list(map(lambda pkt: pkt.summary(), packets)), indent=4
+        )
+    return f"Packet contents: \n{packet_summaries}"
+
+
 def RED(text: str) -> str:
     return f"\u001B[31;1m{str(text)}\u001B[0m"