[3/5] dts: add random packet generator

Message ID 20240806121417.2567708-4-Luca.Vizzarro@arm.com (mailing list archive)
State Superseded, archived
Delegated to: Juraj Linkeš
Headers
Series dts: add pktgen and testpmd changes |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Luca Vizzarro Aug. 6, 2024, 12:14 p.m. UTC
From: Luca Vizzarro <luca.vizzarro@arm.com>

Add a basic utility that can create random L3 and L4 packets with random
payloads and port numbers (if L4).

Signed-off-by: Luca Vizzarro <luca.vizzarro@arm.com>
Reviewed-by: Paul Szczpanek <paul.szczepanek@arm.com>
Reviewed-by: Alex Chapman <alex.chapman@arm.com>
---
 dts/framework/utils.py | 79 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 77 insertions(+), 2 deletions(-)
  

Patch

diff --git a/dts/framework/utils.py b/dts/framework/utils.py
index 6b5d5a805f..c768dd0c99 100644
--- a/dts/framework/utils.py
+++ b/dts/framework/utils.py
@@ -17,14 +17,16 @@ 
 import atexit
 import json
 import os
+import random
 import subprocess
-from enum import Enum
+from enum import Enum, Flag
 from pathlib import Path
 from subprocess import SubprocessError
 
+from scapy.layers.inet import IP, TCP, UDP, Ether  # type: ignore[import-untyped]
 from scapy.packet import Packet  # type: ignore[import-untyped]
 
-from .exception import ConfigurationError
+from .exception import ConfigurationError, InternalError
 
 REGEX_FOR_PCI_ADDRESS: str = "/[0-9a-fA-F]{4}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}.[0-9]{1}/"
 
@@ -244,3 +246,76 @@  def _delete_tarball(self) -> None:
     def __fspath__(self) -> str:
         """The os.PathLike protocol implementation."""
         return str(self._tarball_path)
+
+
+class PacketProtocols(Flag):
+    """Flag specifying which protocols to use for packet generation."""
+
+    #:
+    IP = 1
+    #:
+    TCP = 2 | IP
+    #:
+    UDP = 4 | IP
+    #:
+    ALL = TCP | UDP
+
+
+def generate_random_packets(
+    number_of: int,
+    payload_size: int = 1500,
+    protocols: PacketProtocols = PacketProtocols.ALL,
+    ports_range: range = range(1024, 49152),
+    mtu: int = 1500,
+) -> list[Packet]:
+    """Generate a number of random packets.
+
+    The payload of the packets will consist of random bytes. If `payload_size` is too big, then the
+    maximum payload size allowed for the specific packet type is used. The size is calculated based
+    on the specified `mtu`, therefore it is essential that `mtu` is set correctly to match the MTU
+    of the port that will send out the generated packets.
+
+    If `protocols` has any L4 protocol enabled then all the packets are generated with any of
+    the specified L4 protocols chosen at random. If only :attr:`~PacketProtocols.IP` is set, then
+    only L3 packets are generated.
+
+    If L4 packets will be generated, then the TCP/UDP ports to be used will be chosen at random from
+    `ports_range`.
+
+    Args:
+        number_of: The number of packets to generate.
+        payload_size: The packet payload size to generate, capped based on `mtu`.
+        protocols: The protocols to use for the generated packets.
+        ports_range: The range of L4 port numbers to use. Used only if `protocols` has L4 protocols.
+        mtu: The MTU of the NIC port that will send out the generated packets.
+
+    Raises:
+        InternalError: If the `payload_size` is invalid.
+
+    Returns:
+        A list containing the randomly generated packets.
+    """
+    if payload_size < 0:
+        raise InternalError(f"An invalid payload_size of {payload_size} was given.")
+
+    l4_factories = []
+    if protocols & PacketProtocols.TCP:
+        l4_factories.append(TCP)
+    if protocols & PacketProtocols.UDP:
+        l4_factories.append(UDP)
+
+    def _make_packet() -> Packet:
+        packet = Ether()
+
+        if protocols & PacketProtocols.IP:
+            packet /= IP()
+
+        if len(l4_factories) > 0:
+            src_port, dst_port = random.choices(ports_range, k=2)
+            packet /= random.choice(l4_factories)(sport=src_port, dport=dst_port)
+
+        max_payload_size = mtu - len(packet)
+        usable_payload_size = payload_size if payload_size < max_payload_size else max_payload_size
+        return packet / random.randbytes(usable_payload_size)
+
+    return [_make_packet() for _ in range(number_of)]