@@ -27,6 +27,7 @@
from .settings import SETTINGS
from .test_result import BuildTargetResult, Result, TestCaseResult, TestSuiteResult
from .testbed_model import SutNode, TGNode
+from .testbed_model.capturing_traffic_generator import PacketFilteringConfig
from .testbed_model.hw.port import Port, PortLink
from .utils import get_packet_summaries
@@ -149,7 +150,12 @@ def configure_testbed_ipv4(self, restore: bool = False) -> None:
def _configure_ipv4_forwarding(self, enable: bool) -> None:
self.sut_node.configure_ipv4_forwarding(enable)
- def send_packet_and_capture(self, packet: Packet, duration: float = 1) -> list[Packet]:
+ def send_packet_and_capture(
+ self,
+ packet: Packet,
+ filter_config: PacketFilteringConfig = PacketFilteringConfig(),
+ duration: float = 1,
+ ) -> list[Packet]:
"""
Send a packet through the appropriate interface and
receive on the appropriate interface.
@@ -158,7 +164,11 @@ def send_packet_and_capture(self, packet: Packet, duration: float = 1) -> list[P
"""
packet = self._adjust_addresses(packet)
return self.tg_node.send_packet_and_capture(
- packet, self._tg_port_egress, self._tg_port_ingress, duration
+ packet,
+ self._tg_port_egress,
+ self._tg_port_ingress,
+ filter_config,
+ duration,
)
def get_expected_packet(self, packet: Packet) -> Packet:
@@ -11,6 +11,7 @@
import uuid
from abc import abstractmethod
+from dataclasses import dataclass
import scapy.utils # type: ignore[import]
from scapy.packet import Packet # type: ignore[import]
@@ -29,6 +30,19 @@ def _get_default_capture_name() -> str:
return str(uuid.uuid4())
+@dataclass(slots=True)
+class PacketFilteringConfig:
+ """The supported filtering options for :class:`CapturingTrafficGenerator`.
+
+ Attributes:
+ no_lldp: If :data:`True`, LLDP packets will be filtered out when capturing.
+ no_arp: If :data:`True`, ARP packets will be filtered out when capturing.
+ """
+
+ no_lldp: bool = True
+ no_arp: bool = True
+
+
class CapturingTrafficGenerator(TrafficGenerator):
"""Capture packets after sending traffic.
@@ -51,6 +65,7 @@ def send_packet_and_capture(
packet: Packet,
send_port: Port,
receive_port: Port,
+ filter_config: PacketFilteringConfig,
duration: float,
capture_name: str = _get_default_capture_name(),
) -> list[Packet]:
@@ -64,6 +79,7 @@ def send_packet_and_capture(
packet: The packet to send.
send_port: The egress port on the TG node.
receive_port: The ingress port in the TG node.
+ filter_config: Filters to apply when capturing packets.
duration: Capture traffic for this amount of time after sending the packet.
capture_name: The name of the .pcap file where to store the capture.
@@ -71,7 +87,7 @@ def send_packet_and_capture(
A list of received packets. May be empty if no packets are captured.
"""
return self.send_packets_and_capture(
- [packet], send_port, receive_port, duration, capture_name
+ [packet], send_port, receive_port, filter_config, duration, capture_name
)
def send_packets_and_capture(
@@ -79,6 +95,7 @@ def send_packets_and_capture(
packets: list[Packet],
send_port: Port,
receive_port: Port,
+ filter_config: PacketFilteringConfig,
duration: float,
capture_name: str = _get_default_capture_name(),
) -> list[Packet]:
@@ -92,6 +109,7 @@ def send_packets_and_capture(
packets: The packets to send.
send_port: The egress port on the TG node.
receive_port: The ingress port in the TG node.
+ filter_config: Filters to apply when capturing packets.
duration: Capture traffic for this amount of time after sending the packets.
capture_name: The name of the .pcap file where to store the capture.
@@ -106,6 +124,7 @@ def send_packets_and_capture(
packets,
send_port,
receive_port,
+ filter_config,
duration,
)
@@ -119,6 +138,7 @@ def _send_packets_and_capture(
packets: list[Packet],
send_port: Port,
receive_port: Port,
+ filter_config: PacketFilteringConfig,
duration: float,
) -> list[Packet]:
"""
@@ -30,6 +30,7 @@
from .capturing_traffic_generator import (
CapturingTrafficGenerator,
+ PacketFilteringConfig,
_get_default_capture_name,
)
from .hw.port import Port
@@ -69,6 +70,7 @@ def scapy_send_packets_and_capture(
send_iface: str,
recv_iface: str,
duration: float,
+ sniff_filter: str,
) -> list[bytes]:
"""RPC function to send and capture packets.
@@ -90,6 +92,7 @@ def scapy_send_packets_and_capture(
iface=recv_iface,
store=True,
started_callback=lambda *args: scapy.all.sendp(scapy_packets, iface=send_iface),
+ filter=sniff_filter,
)
sniffer.start()
time.sleep(duration)
@@ -249,16 +252,38 @@ def _send_packets(self, packets: list[Packet], port: Port) -> None:
packets = [packet.build() for packet in packets]
self.rpc_server_proxy.scapy_send_packets(packets, port.logical_name)
+ def _create_packet_filter(self, filter_config: PacketFilteringConfig) -> str:
+ """Combines filter settings from `filter_config` into a BPF that scapy can use.
+
+ Scapy allows for the use of Berkeley Packet Filters (BPFs) to filter what packets are
+ collected based on various attributes of the packet.
+
+ Args:
+ filter_config: Config class that specifies which filters should be applied.
+
+ Returns:
+ A string representing the combination of BPF filters to be passed to scapy. For
+ example:
+
+ "ether[12:2] != 0x88cc && ether[12:2] != 0x0806"
+ """
+ bpf_filter: list[str] = []
+ if filter_config.no_arp:
+ bpf_filter.append("ether[12:2] != 0x0806")
+ if filter_config.no_lldp:
+ bpf_filter.append("ether[12:2] != 0x88cc")
+ return " && ".join(bpf_filter)
+
def _send_packets_and_capture(
self,
packets: list[Packet],
send_port: Port,
receive_port: Port,
+ filter_config: PacketFilteringConfig,
duration: float,
capture_name: str = _get_default_capture_name(),
) -> list[Packet]:
binary_packets = [packet.build() for packet in packets]
-
xmlrpc_packets: list[
xmlrpc.client.Binary
] = self.rpc_server_proxy.scapy_send_packets_and_capture(
@@ -266,6 +291,7 @@ def _send_packets_and_capture(
send_port.logical_name,
receive_port.logical_name,
duration,
+ self._create_packet_filter(filter_config),
) # type: ignore[assignment]
scapy_packets = [Ether(packet.data) for packet in xmlrpc_packets]
@@ -23,7 +23,10 @@
)
from framework.exception import ConfigurationError
-from .capturing_traffic_generator import CapturingTrafficGenerator
+from .capturing_traffic_generator import (
+ CapturingTrafficGenerator,
+ PacketFilteringConfig,
+)
from .hw.port import Port
from .node import Node
@@ -53,6 +56,7 @@ def send_packet_and_capture(
packet: Packet,
send_port: Port,
receive_port: Port,
+ filter_config: PacketFilteringConfig = PacketFilteringConfig(),
duration: float = 1,
) -> list[Packet]:
"""Send a packet, return received traffic.
@@ -71,7 +75,11 @@ def send_packet_and_capture(
A list of received packets. May be empty if no packets are captured.
"""
return self.traffic_generator.send_packet_and_capture(
- packet, send_port, receive_port, duration
+ packet,
+ send_port,
+ receive_port,
+ filter_config,
+ duration,
)
def close(self) -> None: