@@ -13,12 +13,13 @@
* Test case verification.
"""
+from collections import Counter
from ipaddress import IPv4Interface, IPv6Interface, ip_interface
from typing import ClassVar, Union
from scapy.layers.inet import IP # type: ignore[import-untyped]
from scapy.layers.l2 import Ether # type: ignore[import-untyped]
-from scapy.packet import Packet, Padding # type: ignore[import-untyped]
+from scapy.packet import Packet, Padding, raw # type: ignore[import-untyped]
from framework.testbed_model.port import Port, PortLink
from framework.testbed_model.sut_node import SutNode
@@ -199,9 +200,34 @@ def send_packet_and_capture(
Returns:
A list of received packets.
"""
- packet = self._adjust_addresses(packet)
- return self.tg_node.send_packet_and_capture(
- packet,
+ return self.send_packets_and_capture(
+ [packet],
+ filter_config,
+ duration,
+ )
+
+ def send_packets_and_capture(
+ self,
+ packets: list[Packet],
+ filter_config: PacketFilteringConfig = PacketFilteringConfig(),
+ duration: float = 1,
+ ) -> list[Packet]:
+ """Send and receive `packets` using the associated TG.
+
+ Send `packets` through the appropriate interface and receive on the appropriate interface.
+ Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic.
+
+ Args:
+ packets: The packets to send.
+ filter_config: The filter to use when capturing packets.
+ duration: Capture traffic for this amount of time after sending `packet`.
+
+ Returns:
+ A list of received packets.
+ """
+ packets = [self._adjust_addresses(packet) for packet in packets]
+ return self.tg_node.send_packets_and_capture(
+ packets,
self._tg_port_egress,
self._tg_port_ingress,
filter_config,
@@ -303,6 +329,40 @@ def verify_packets(self, expected_packet: Packet, received_packets: list[Packet]
)
self._fail_test_case_verify("An expected packet not found among received packets.")
+ def match_all_packets(
+ self, expected_packets: list[Packet], received_packets: list[Packet]
+ ) -> None:
+ """Matches all the expected packets against the received ones.
+
+ Matching is performed by counting down the occurrences in a dictionary which keys are the
+ raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise)
+ are automatically ignored.
+
+ Args:
+ expected_packets: The packets we are expecting to receive.
+ received_packets: All the packets that were received.
+
+ Raises:
+ TestCaseVerifyError: if and not all the `expected_packets` were found in
+ `received_packets`.
+ """
+ expected_packets_counters = Counter(map(raw, expected_packets))
+ received_packets_counters = Counter(map(raw, received_packets))
+ # The number of expected packets is subtracted by the number of received packets, ignoring
+ # any unexpected packets and capping at zero.
+ missing_packets_counters = expected_packets_counters - received_packets_counters
+ missing_packets_count = missing_packets_counters.total()
+ self._logger.debug(
+ f"match_all_packets: expected {len(expected_packets)}, "
+ f"received {len(received_packets)}, missing {missing_packets_count}"
+ )
+
+ if missing_packets_count != 0:
+ self._fail_test_case_verify(
+ f"Not all packets were received, expected {len(expected_packets)} "
+ f"but {missing_packets_count} were missing."
+ )
+
def _compare_packets(self, expected_packet: Packet, received_packet: Packet) -> bool:
self._logger.debug(
f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}"
@@ -51,22 +51,22 @@ def __init__(self, node_config: TGNodeConfiguration):
self.traffic_generator = create_traffic_generator(self, node_config.traffic_generator)
self._logger.info(f"Created node: {self.name}")
- def send_packet_and_capture(
+ def send_packets_and_capture(
self,
- packet: Packet,
+ packets: list[Packet],
send_port: Port,
receive_port: Port,
filter_config: PacketFilteringConfig = PacketFilteringConfig(),
duration: float = 1,
) -> list[Packet]:
- """Send `packet`, return received traffic.
+ """Send `packets`, return received traffic.
- Send `packet` on `send_port` and then return all traffic captured
+ Send `packets` on `send_port` and then return all traffic captured
on `receive_port` for the given duration. Also record the captured traffic
in a pcap file.
Args:
- packet: The packet to send.
+ packets: The packets to send.
send_port: The egress port on the TG node.
receive_port: The ingress port in the TG node.
filter_config: The filter to use when capturing packets.
@@ -75,8 +75,8 @@ def send_packet_and_capture(
Returns:
A list of received packets. May be empty if no packets are captured.
"""
- return self.traffic_generator.send_packet_and_capture(
- packet,
+ return self.traffic_generator.send_packets_and_capture(
+ packets,
send_port,
receive_port,
filter_config,
@@ -63,37 +63,6 @@ def is_capturing(self) -> bool:
"""This traffic generator can capture traffic."""
return True
- def send_packet_and_capture(
- self,
- packet: Packet,
- send_port: Port,
- receive_port: Port,
- filter_config: PacketFilteringConfig,
- duration: float,
- capture_name: str = _get_default_capture_name(),
- ) -> list[Packet]:
- """Send `packet` and capture received traffic.
-
- Send `packet` on `send_port` and then return all traffic captured
- on `receive_port` for the given `duration`.
-
- The captured traffic is recorded in the `capture_name`.pcap file.
-
- Args:
- packet: The packet to send.
- send_port: The egress port on the TG node.
- receive_port: The ingress port in the TG node.
- filter_config: Filters to apply when capturing packets.
- duration: Capture traffic for this amount of time after sending the packet.
- capture_name: The name of the .pcap file where to store the capture.
-
- Returns:
- The received packets. May be empty if no packets are captured.
- """
- return self.send_packets_and_capture(
- [packet], send_port, receive_port, filter_config, duration, capture_name
- )
-
def send_packets_and_capture(
self,
packets: list[Packet],