@@ -27,9 +27,9 @@ from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting
from scapy.layers.l2 import ARP, GRE, Dot1Q, Ether
from scapy.layers.sctp import SCTP
from scapy.layers.vxlan import VXLAN
-from scapy.packet import Raw
+from scapy.packet import Padding, Raw
from scapy.sendrecv import sendp
-from scapy.utils import hexstr, rdpcap, wrpcap
+from scapy.utils import hexstr, randstring, rdpcap, wrpcap
from .utils import convert_int2ip, convert_ip2int, get_module_path
@@ -127,6 +127,152 @@ def write_raw_pkt(pkt_str, file_name):
w.close()
+def segment_bytes(payload: bytes, seg_len: int) -> list:
+ """
+ Segment a bytes according to the seg_len.
+ payload: bytes, a sequence to be segmented.
+ seg_len: int, the length of segmentation.
+ return: list, a list of segmented bytes.
+ """
+ segments = []
+ if seg_len > 0:
+ while payload:
+ _s = payload[:seg_len]
+ payload = payload[seg_len:]
+ segments.append(_s)
+ else:
+ segments = [payload]
+ return segments
+
+
+def get_packet_layer_index_oip_ol4_tun_ip_l4(
+ pkt, support_rx_tunnel: bool = True
+) -> tuple:
+ """
+ Get the layer index of outer-ip, outer-l4, tunnel, inner-ip, inner-l4 in a packet.
+ The index counting follows the DPDK rule, that if a non-tunnel packet is counted,
+ the ip / l4 layer are considered the inner ip / l4 layers.
+ If a layer is not found, it will return None for that index.
+ This method asserts that a tunnel layer is to transfer a L3 tunneled packet over a L3
+ or higher network, which means a tunnel packet is laid between an outer IP layer and an
+ inner IP layer.
+ pkt: Packet, the Scapy Packet object to be counted.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ return: tuple, indexes of outer IP, outer L4, tunnel, inner IP, inner L4.
+ """
+ _LAYERS_L3 = {IP, IPv6}
+ _LAYERS_L4 = {UDP, TCP, SCTP}
+ _layers = pkt.layers()
+ _index_tunnel = get_packet_layer_index_tunnel(pkt)
+ _index_oip = None
+ _index_ol4 = None
+ _index_ip = None
+ _index_l4 = None
+ if _index_tunnel is None:
+ for _i, _l in enumerate(_layers):
+ if _l in _LAYERS_L3:
+ _index_ip = _i
+ if _l in _LAYERS_L4:
+ _index_l4 = _i
+ else:
+ for _i in range(0, _index_tunnel + 1):
+ _l = _layers[_i]
+ if _l in _LAYERS_L3:
+ _index_oip = _i
+ if _l in _LAYERS_L4:
+ _index_ol4 = _i
+ for _i in range(_index_tunnel + 1, len(_layers)):
+ _l = _layers[_i]
+ if _l in _LAYERS_L3:
+ _index_ip = _i
+ if _l in _LAYERS_L4:
+ _index_l4 = _i
+ if not support_rx_tunnel and not _index_tunnel is None:
+ (_index_oip, _index_ol4, _index_tunnel, _index_ip, _index_l4) = (
+ None,
+ None,
+ None,
+ _index_oip,
+ _index_ol4,
+ )
+ return _index_oip, _index_ol4, _index_tunnel, _index_ip, _index_l4
+
+
+def get_packet_layer_index_to_tso_seg(
+ pkt,
+ support_rx_tunnel: bool = True,
+ support_seg_non_tunnel: bool = True,
+ support_seg_tunnel: bool = True,
+ support_tso: bool = True,
+ support_ufo: bool = True,
+) -> int:
+ """
+ Get the layer index of TSO in a packet.
+ In DPDK TSO is considered Transmittion Segmentation Offload. That both TCP
+ and UDP are considered to be segemeted.
+ pkt: Packt, Scapy Packet object to be segmeted.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+ support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+ support_tso: bool, flag indicating if TCP Segmentation is enabled.
+ support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+ return: int, the index of the layer to be segmented in the packet.
+ """
+ (
+ _index_oip,
+ _index_ol4,
+ _index_tunnel,
+ _index_ip,
+ _index_l4,
+ ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+ pkt, support_rx_tunnel=support_rx_tunnel
+ )
+ if support_seg_tunnel and not _index_tunnel is None:
+ if not _index_l4 is None and support_tso and isinstance(pkt[_index_l4], TCP):
+ return _index_l4
+ if not _index_l4 is None and support_ufo and isinstance(pkt[_index_l4], UDP):
+ return _index_l4
+ if support_seg_non_tunnel and _index_tunnel is None:
+ if not _index_l4 is None and support_tso and isinstance(pkt[_index_l4], TCP):
+ return _index_l4
+ if not _index_l4 is None and support_ufo and isinstance(pkt[_index_l4], UDP):
+ return _index_l4
+ return None
+
+
+def get_packet_layer_index_tunnel(pkt):
+ """
+ Get the layer index of tunnel layer in a packet.
+ If a layer is not found, it will return None for that index.
+ This method asserts that a tunnel layer is to transfer a L3 tunneled packet over a L3
+ or higher network, which means a tunnel packet is laid between an outer IP layer and an
+ inner IP layer.
+ pkt: Packet, Scapy Packet object to be indexed.
+ return: int | None, the index of the tunnel layer in the packet.
+ """
+ # packet types
+ _LAYERS_TUNNEL = {VXLAN, GRE, GTP_U_Header, GENEVE}
+
+ _layers = pkt.layers()
+ for _i, _l in enumerate(_layers):
+ _layers_outer = _layers[: _i + 1]
+ _layers_inner = _layers[_i + 1 :]
+ if (
+ (_l in _LAYERS_TUNNEL)
+ and (IP in _layers_outer or IPv6 in _layers_outer)
+ and (IP in _layers_inner or IPv6 in _layers_inner)
+ ):
+ return _i
+ for _i, _l in enumerate(_layers):
+ if (_l is IP or _l is IPv6) and (
+ _layers[_i + 1] is IP or _layers[_i + 1] is IPv6
+ ):
+ return _i
+ return None
+
+
class scapy(object):
SCAPY_LAYERS = {
"ether": Ether(dst="ff:ff:ff:ff:ff:ff"),
@@ -1188,6 +1334,12 @@ def compare_pktload(pkt1=None, pkt2=None, layer="L2"):
def strip_pktload(pkt=None, layer="L2", p_index=0):
+ """
+ Strip the payload of a specific layer indicated by `layer` in a pakcet.
+ pkt: Packet, DTS Packet object to be stripped.
+ layer: str, the layer to be stripped.
+ return: str, the stripped layer formatted in hex string.
+ """
if layer == "L2":
l_idx = 0
elif layer == "L3":
@@ -1205,6 +1357,706 @@ def strip_pktload(pkt=None, layer="L2", p_index=0):
return load
+def get_packet_payload_of(pkt, layer: int) -> bytes:
+ """
+ Get the payload of a layer in a packet.
+ pkt: Packet, Scapy Packet object to get the payload of.
+ layer: int, the layer index to get the payload in the packet.
+ return: bytes, the payload of a layer.
+ """
+ if layer is None:
+ return None
+ pkt_payload = pkt.copy()
+ if Padding in pkt_payload:
+ _index_padding = pkt_payload.layers().index(Padding)
+ pkt_payload[_index_padding - 1].remove_payload()
+ if layer >= _index_padding:
+ return None
+ return bytes(pkt_payload[layer].payload)
+
+
+def get_packet_payload(
+ pkt,
+ support_rx_tunnel: bool = True,
+) -> bytes:
+ """
+ Get the payload of a packet.
+ pkt: Packet, Scapy Packet object to get the payload of.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ return: bytes, the payload of a packet. If the packet has an L4, the payload is the load of
+ L4, or it will be the load of L3 if L3 exists, else it returns None.
+ """
+ (
+ _index_oip,
+ _index_ol4,
+ _index_tunnel,
+ _index_ip,
+ _index_l4,
+ ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+ pkt, support_rx_tunnel=support_rx_tunnel
+ )
+ if not _index_l4 is None:
+ return get_packet_payload_of(pkt, _index_l4)
+ elif not _index_ip is None:
+ return get_packet_payload_of(pkt, _index_ip)
+ else:
+ return None
+
+
+def get_packet_payload_no_tso_seg(
+ pkt,
+ support_rx_tunnel: bool = True,
+ support_seg_non_tunnel: bool = True,
+ support_seg_tunnel: bool = True,
+ support_tso: bool = True,
+ support_ufo: bool = True,
+) -> bytes:
+ """
+ Get the payload of the layer to be segmented of a packet, not segmented.
+ pkt: Packet, Scapy Packet object to get the payload of.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+ support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+ support_tso: bool, flag indicating if TCP Segmentation is enabled.
+ support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+ return: bytes, the payload of the layer to be segmented of a packet. If no layer is to be
+ segmented, it returns None.
+ """
+ _index_seg = get_packet_layer_index_to_tso_seg(
+ pkt,
+ support_rx_tunnel=support_rx_tunnel,
+ support_seg_non_tunnel=support_seg_non_tunnel,
+ support_seg_tunnel=support_seg_tunnel,
+ support_tso=support_tso,
+ support_ufo=support_ufo,
+ )
+ if not _index_seg is None:
+ return get_packet_payload_of(pkt, _index_seg)
+ else:
+ return None
+
+
+def get_packet_payload_pre_tso_seg(
+ pkt,
+ support_rx_tunnel: bool = True,
+ support_seg_non_tunnel: bool = True,
+ support_seg_tunnel: bool = True,
+ support_tso: bool = True,
+ support_ufo: bool = True,
+) -> bytes:
+ """
+ Get the payload of a packet before considering it is segmented.
+ pkt: Packet, Scapy Packet object to get the payload of.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+ support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+ support_tso: bool, flag indicating if TCP Segmentation is enabled.
+ support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+ return: bytes, the payload of the packet. If it can be segmented then returns the not
+ segmented payload, else it returns the payload despite the segmentation flags.
+ """
+ _payload_no_seg = get_packet_payload_no_tso_seg(
+ pkt,
+ support_rx_tunnel=support_rx_tunnel,
+ support_seg_non_tunnel=support_seg_non_tunnel,
+ support_seg_tunnel=support_seg_tunnel,
+ support_tso=support_tso,
+ support_ufo=support_ufo,
+ )
+ _payload = get_packet_payload(
+ pkt,
+ support_rx_tunnel=support_rx_tunnel,
+ )
+ if not _payload_no_seg is None:
+ return _payload_no_seg
+ else:
+ return _payload
+
+
+def get_packet_payload_post_tso_seg(
+ pkt,
+ seg_len: int = 800,
+ support_rx_tunnel: bool = True,
+ support_seg_non_tunnel: bool = True,
+ support_seg_tunnel: bool = True,
+ support_tso: bool = True,
+ support_ufo: bool = True,
+) -> list:
+ """
+ Get the list of payloads of a packet after it is segmented.
+ pkt: Packet, Scapy Packet object to get the payload of.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+ support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+ support_tso: bool, flag indicating if TCP Segmentation is enabled.
+ support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+ return: list, sequence of the payloads after applying TSO to a packet. If a packet is not
+ to segment, it returns a list of only the payload. If the packet is not a proper packet, it
+ returns an empty list.
+ """
+ _payload_no_seg = get_packet_payload_no_tso_seg(
+ pkt,
+ support_rx_tunnel=support_rx_tunnel,
+ support_seg_non_tunnel=support_seg_non_tunnel,
+ support_seg_tunnel=support_seg_tunnel,
+ support_tso=support_tso,
+ support_ufo=support_ufo,
+ )
+ _payload = get_packet_payload(
+ pkt,
+ support_rx_tunnel=support_rx_tunnel,
+ )
+ if not _payload_no_seg is None:
+ _segments = segment_bytes(_payload_no_seg, seg_len)
+ elif not _payload is None:
+ _segments = segment_bytes(_payload, -1)
+ else:
+ _segments = []
+ return _segments
+
+
+def get_packet_tso_seg_tx_flag(
+ pkt,
+ seg_len: int = 800,
+ support_rx_tunnel: bool = True,
+ support_seg_non_tunnel: bool = True,
+ support_seg_tunnel: bool = True,
+ support_tso: bool = True,
+ support_ufo: bool = True,
+) -> list:
+ """
+ Get the list of TSO TX offload flags of a packet.
+ pkt: Packet, Scapy Packet object to get the offload flag of.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+ support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+ support_tso: bool, flag indicating if TCP Segmentation is enabled.
+ support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+ return: list, set of offload flags formatted in a list of str.
+ """
+ _index_seg = get_packet_layer_index_to_tso_seg(
+ pkt,
+ support_rx_tunnel=support_rx_tunnel,
+ support_seg_non_tunnel=support_seg_non_tunnel,
+ support_seg_tunnel=support_seg_tunnel,
+ support_tso=support_tso,
+ support_ufo=support_ufo,
+ )
+ _flags = []
+ if not _index_seg is None and len(get_packet_payload_of(pkt, _index_seg)) > seg_len:
+ if isinstance(pkt[_index_seg], TCP):
+ _flags.append("RTE_MBUF_F_TX_TCP_SEG")
+ elif isinstance(pkt[_index_seg], UDP):
+ _flags.append("RTE_MBUF_F_TX_UDP_SEG")
+ return _flags
+
+
+def get_packet_checksum_of(
+ pkt,
+ layer: int,
+ good_checksum: bool = False,
+) -> int:
+ """
+ Get the checksum of a specific layer in a packet.
+ pkt: Packet, Scapy Packet object to get the checksum of.
+ layer: int, the layer index to get the checksum in the packet.
+ good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.
+ return: int, the checksum of the packet at the layer.
+ """
+ _re_checksum_pattern = re.compile(r"chksum\s*=\s*(0x[0-9a-fA-F]+|None)")
+ if layer is None:
+ return None
+ p_chksum = pkt[layer].copy()
+ if "chksum" in p_chksum.fieldtype:
+ if not good_checksum:
+ checksum_list_str = _re_checksum_pattern.findall(p_chksum.show2(dump=True))
+ return int(checksum_list_str[0], 16)
+ else:
+ p_chksum.chksum = None
+ checksum_list_str = _re_checksum_pattern.findall(p_chksum.show2(dump=True))
+ return int(checksum_list_str[0], 16)
+ else:
+ return None
+
+
+def get_packet_checksums(
+ pkt,
+ good_checksum: bool = False,
+ support_rx_tunnel: bool = True,
+) -> tuple:
+ """
+ Get the checksum list of a packet.
+ This method calculates the checksum of the whole packet. This means if an inner layer has a
+ bad checksum, it will be corrected then to calculate the outer checksum if `good_checksum` is
+ set.
+ If a layer is not found, it will return None for that layer.
+ pkt: Packet, Scapy Packet object to get the checksum of.
+ good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ return: tuple, checksums of outer IP, outer L4, inner IP, inner L4 in the packet.
+ """
+ (
+ _index_oip,
+ _index_ol4,
+ _index_tunnel,
+ _index_ip,
+ _index_l4,
+ ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+ pkt, support_rx_tunnel=support_rx_tunnel
+ )
+ p_chksum = pkt.copy()
+ if good_checksum:
+ for _i in [_index_oip, _index_ol4, _index_ip, _index_l4]:
+ if not _i is None and "chksum" in p_chksum[_i].fieldtype:
+ p_chksum[_i].chksum = None
+ return (
+ get_packet_checksum_of(p_chksum, _index_oip),
+ get_packet_checksum_of(p_chksum, _index_ol4),
+ get_packet_checksum_of(p_chksum, _index_ip),
+ get_packet_checksum_of(p_chksum, _index_l4),
+ )
+
+
+def get_packet_checksum_of_each(
+ pkt,
+ good_checksum: bool = False,
+ support_rx_tunnel: bool = True,
+) -> tuple:
+ """
+ Get the checksum list of a packet.
+ This method calculates the checksums of each layer seperately. This means if an inner layer
+ has a bad checksum, an outer checksum will be calculated based on the bad inner checksum if
+ `good_checksum` is set.
+ If a layer is not found, it will return None for that layer.
+ pkt: Packet, Scapy Packet object to get the checksum of.
+ good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ return: tuple, checksums of outer IP, outer L4, inner IP, inner L4 in the packet.
+ """
+ (
+ _index_oip,
+ _index_ol4,
+ _index_tunnel,
+ _index_ip,
+ _index_l4,
+ ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+ pkt, support_rx_tunnel=support_rx_tunnel
+ )
+ return (
+ get_packet_checksum_of(pkt, _index_oip, good_checksum=good_checksum),
+ get_packet_checksum_of(pkt, _index_ol4, good_checksum=good_checksum),
+ get_packet_checksum_of(pkt, _index_ip, good_checksum=good_checksum),
+ get_packet_checksum_of(pkt, _index_l4, good_checksum=good_checksum),
+ )
+
+
+def get_packet_checksum_rx_stat(pkt, support_rx_tunnel: bool = True) -> tuple:
+ """
+ Get the checksum stats of a packet.
+ pkt: Packet, Scapy Packet object to get the checksum stats of.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ return: tuple, checksum stats of bad outer IP, bad outer L4, bad inner IP, bad inner L4.
+ """
+ (
+ _index_oip,
+ _index_ol4,
+ _index_tunnel,
+ _index_ip,
+ _index_l4,
+ ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+ pkt, support_rx_tunnel=support_rx_tunnel
+ )
+ _bad_oip, _bad_ol4, _bad_ip, _bad_l4 = 0, 0, 0, 0
+ if not _index_oip is None and get_packet_checksum_of(
+ pkt, _index_oip
+ ) != get_packet_checksum_of(pkt, _index_oip, good_checksum=True):
+ _bad_oip = 1
+ if not _index_oip is None and get_packet_checksum_of(
+ pkt, _index_ol4
+ ) != get_packet_checksum_of(pkt, _index_ol4, good_checksum=True):
+ _bad_ol4 = 1
+ if not _index_ip is None and get_packet_checksum_of(
+ pkt, _index_ip
+ ) != get_packet_checksum_of(pkt, _index_ip, good_checksum=True):
+ _bad_ip = 1
+ if not _index_l4 is None and get_packet_checksum_of(
+ pkt, _index_l4
+ ) != get_packet_checksum_of(pkt, _index_l4, good_checksum=True):
+ _bad_l4 = 1
+ return _bad_oip, _bad_ol4, _bad_ip, _bad_l4
+
+
+def get_packet_checksum_rx_olflag(pkt, support_rx_tunnel: bool = True) -> list:
+ """
+ Get the checksum RX offload flags of a packet.
+ pkt: Packet, Scapy Packet object to get the checksum RX offload flag of.
+ support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+ L4 layers will be considered the only L3 / L4 layers.
+ return: list, set of offload flags formated in a list of str.
+ """
+ _FLAG_PAT = "RTE_MBUF_F_RX_%s_CKSUM_%s"
+ (
+ _index_oip,
+ _index_ol4,
+ _index_tunnel,
+ _index_ip,
+ _index_l4,
+ ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+ pkt, support_rx_tunnel=support_rx_tunnel
+ )
+ _flags = []
+ if not _index_oip is None and get_packet_checksum_of(
+ pkt, _index_oip
+ ) != get_packet_checksum_of(pkt, _index_oip, good_checksum=True):
+ _flags.append(_FLAG_PAT % ("OUTER_IP", "BAD"))
+ else:
+ ## OUTER_IP_CKSUM_GOOD is default not shown
+ pass
+ if not _index_ol4 is None and get_packet_checksum_of(
+ pkt, _index_ol4
+ ) != get_packet_checksum_of(pkt, _index_ol4, good_checksum=True):
+ _flags.append(_FLAG_PAT % ("OUTER_L4", "BAD"))
+ elif support_rx_tunnel:
+ _flags.append(_FLAG_PAT % ("OUTER_L4", "GOOD"))
+ else:
+ pass
+ if get_packet_checksum_of(pkt, _index_ip) != get_packet_checksum_of(
+ pkt, _index_ip, good_checksum=True
+ ):
+ _flags.append(_FLAG_PAT % ("IP", "BAD"))
+ else:
+ _flags.append(_FLAG_PAT % ("IP", "GOOD"))
+ if get_packet_checksum_of(pkt, _index_l4) != get_packet_checksum_of(
+ pkt, _index_l4, good_checksum=True
+ ):
+ _flags.append(_FLAG_PAT % ("L4", "BAD"))
+ else:
+ _flags.append(_FLAG_PAT % ("L4", "GOOD"))
+ return _flags
+
+
+def get_packets_payload_pre_seg_list(
+ pkts: list,
+ support_rx_tunnel: bool = True,
+ support_seg_non_tunnel: bool = True,
+ support_seg_tunnel: bool = True,
+ support_tso: bool = True,
+ support_ufo: bool = True,
+) -> list:
+ """
+ Get the payload of a seq of packets before considering it is segmented.
+ """
+ return [
+ get_packet_payload_pre_tso_seg(
+ _p,
+ support_rx_tunnel=support_rx_tunnel,
+ support_seg_non_tunnel=support_seg_non_tunnel,
+ support_seg_tunnel=support_seg_tunnel,
+ support_tso=support_tso,
+ support_ufo=support_ufo,
+ )
+ for _p in pkts
+ ]
+
+
+def get_packets_payload_post_seg_list(
+ pkts: list,
+ seg_len: int = 800,
+ support_rx_tunnel: bool = True,
+ support_seg_non_tunnel: bool = True,
+ support_seg_tunnel: bool = True,
+ support_tso: bool = True,
+ support_ufo: bool = True,
+) -> list:
+ """
+ Get the list of payloads of a seq of packets after it is segmented.
+ """
+ return [
+ get_packet_payload_post_tso_seg(
+ _p,
+ seg_len=seg_len,
+ support_rx_tunnel=support_rx_tunnel,
+ support_seg_non_tunnel=support_seg_non_tunnel,
+ support_seg_tunnel=support_seg_tunnel,
+ support_tso=support_tso,
+ support_ufo=support_ufo,
+ )
+ for _p in pkts
+ ]
+
+
+def get_packets_checksum_rx_stat_list(
+ pkts: list, support_rx_tunnel: bool = True
+) -> list:
+ """
+ Get the checksum stats of a set of packet.
+ """
+ k_bad_ip = "Bad-ipcsum"
+ k_bad_l4 = "Bad-l4csum"
+ k_bad_oip = "Bad-outer-ipcsum"
+ k_bad_ol4 = "Bad-outer-l4csum"
+ k_rx = "RX-total"
+ stats = {
+ k_bad_ip: 0,
+ k_bad_l4: 0,
+ k_bad_oip: 0,
+ k_bad_ol4: 0,
+ k_rx: 0,
+ }
+ for pkt in pkts:
+ _bad_oip, _bad_ol4, _bad_ip, _bad_l4 = get_packet_checksum_rx_stat(
+ pkt, support_rx_tunnel=support_rx_tunnel
+ )
+ stats[k_bad_oip] += _bad_oip
+ stats[k_bad_ol4] += _bad_ol4
+ stats[k_bad_ip] += _bad_ip
+ stats[k_bad_l4] += _bad_l4
+ stats[k_rx] += 1
+ return stats
+
+
+def get_packets_tso_seg_tx_flag_list(
+ pkts: list,
+ seg_len: int = 800,
+ support_rx_tunnel: bool = True,
+ support_seg_non_tunnel: bool = True,
+ support_seg_tunnel: bool = True,
+ support_tso: bool = True,
+ support_ufo: bool = True,
+) -> list:
+ """
+ Get the list of TSO TX offload flags of a seq of packet.
+ """
+ flag_list = []
+ for pkt in pkts:
+ flag_list.append(
+ get_packet_tso_seg_tx_flag(
+ pkt,
+ seg_len=seg_len,
+ support_rx_tunnel=support_rx_tunnel,
+ support_seg_non_tunnel=support_seg_non_tunnel,
+ support_seg_tunnel=support_seg_tunnel,
+ support_tso=support_tso,
+ support_ufo=support_ufo,
+ )
+ )
+ return flag_list
+
+
+def get_packets_checksum_rx_olflag_list(
+ pkts: list, support_rx_tunnel: bool = True
+) -> list:
+ """
+ Get the checksum RX offload flags of a seq of packets.
+ """
+ flag_list = []
+ for pkt in pkts:
+ flag_list.append(
+ get_packet_checksum_rx_olflag(pkt, support_rx_tunnel=support_rx_tunnel)
+ )
+ return flag_list
+
+
+def check_packet_segment(
+ pkt,
+ segments: list,
+ seg_len: int,
+ support_rx_tunnel: bool = True,
+ support_seg_non_tunnel: bool = True,
+ support_seg_tunnel: bool = True,
+ support_tso: bool = True,
+ support_ufo: bool = True,
+) -> bool:
+ """
+ Check the packet segmentation.
+ """
+ payload_expected = get_packet_payload_pre_tso_seg(
+ pkt,
+ support_rx_tunnel=support_rx_tunnel,
+ support_seg_non_tunnel=support_seg_non_tunnel,
+ support_seg_tunnel=support_seg_tunnel,
+ support_tso=support_tso,
+ support_ufo=support_ufo,
+ )
+ segments_expected = get_packet_payload_post_tso_seg(
+ pkt,
+ seg_len=seg_len,
+ support_rx_tunnel=support_rx_tunnel,
+ support_seg_non_tunnel=support_seg_non_tunnel,
+ support_seg_tunnel=support_seg_tunnel,
+ support_tso=support_tso,
+ support_ufo=support_ufo,
+ )
+ if not segments:
+ return False
+ if segments == segments_expected:
+ return True
+ else:
+ payload_segmented = b""
+ for _i, _seg in enumerate(segments, 1):
+ if len(segments_expected) > 1 and len(_seg) > seg_len:
+ return False
+ payload_segmented += _seg
+ return payload_expected == payload_segmented
+
+
+def check_packet_checksum_of(pkt: Packet, layer: int, checksum_ref: int = None) -> bool:
+ """
+ Check the packet checksum of a specific layer indicated by `layer`.
+ """
+ if checksum_ref is None:
+ checksum_ref = get_packet_checksum_of(pkt, layer, good_checksum=True)
+ return get_packet_checksum_of(pkt, layer) == checksum_ref
+
+
+def check_packet_checksums(
+ pkt,
+ checksum_ref_list: list = None,
+ support_rx_tunnel: bool = True,
+) -> bool:
+ """
+ Check the packet checksum.
+ pkt: checked packet;
+ checksum_ref_list: refering checksum list, recalculate the whole packet for correct checksum in default;
+ return: if this packet is in good checksum.
+ """
+ if checksum_ref_list is None:
+ checksum_ref_list = get_packet_checksums(
+ pkt, support_rx_tunnel=support_rx_tunnel, good_checksum=True
+ )
+ return (
+ get_packet_checksums(pkt, support_rx_tunnel=support_rx_tunnel)
+ == checksum_ref_list
+ )
+
+
+def check_packet_checksum_of_each(
+ pkt,
+ checksum_ref_list: list = None,
+ support_rx_tunnel: bool = True,
+ allow_zero_outer_ip: bool = False,
+ allow_zero_outer_udp: bool = False,
+ allow_zero_inner_ip: bool = False,
+ allow_zero_inner_udp: bool = False,
+ allow_zero_inner_tcp: bool = False,
+ allow_zero_inner_sctp: bool = False,
+) -> tuple:
+ """
+ Check the packet checksum of each layer.
+ pkt: checked packet;
+ checksum_ref_list: refering checksum list, recalculate each layer seperately for correct checksum in default;
+ allow_zero_*: allowing zero checksum validated passed on specific layer;
+ return: if this packet is validated passed on outer-ip, outer-l4, inner-ip, inner-l4 layer checksum.
+ """
+ (
+ _index_oip,
+ _index_ol4,
+ _index_tunnel,
+ _index_ip,
+ _index_l4,
+ ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+ pkt, support_rx_tunnel=support_rx_tunnel
+ )
+ if checksum_ref_list and len(checksum_ref_list) == 2:
+ (_checksum_ref_ip, _checksum_ref_l4) = (
+ checksum_ref_list[0],
+ checksum_ref_list[1],
+ )
+ return (
+ True,
+ True,
+ check_packet_checksum_of(pkt, _index_ip, _checksum_ref_ip),
+ check_packet_checksum_of(pkt, _index_l4, _checksum_ref_l4),
+ )
+ elif checksum_ref_list and len(checksum_ref_list) == 4:
+ (_checksum_ref_oip, _checksum_ref_ol4, _checksum_ref_ip, _checksum_ref_l4) = (
+ checksum_ref_list[0],
+ checksum_ref_list[1],
+ checksum_ref_list[2],
+ checksum_ref_list[3],
+ )
+ return (
+ check_packet_checksum_of(pkt, _index_oip, _checksum_ref_oip),
+ check_packet_checksum_of(pkt, _index_ol4, _checksum_ref_ol4),
+ check_packet_checksum_of(pkt, _index_ip, _checksum_ref_ip),
+ check_packet_checksum_of(pkt, _index_l4, _checksum_ref_l4),
+ )
+ elif checksum_ref_list:
+ raise ValueError("Wrong checksum ref list length.")
+ else:
+ _verified_l4 = (
+ _index_l4 is None
+ or check_packet_checksum_of(pkt, _index_l4)
+ or (
+ not _index_l4 is None
+ and allow_zero_inner_udp
+ and isinstance(pkt[_index_l4], UDP)
+ and check_packet_checksum_of(pkt, _index_l4, 0x00)
+ )
+ or (
+ not _index_l4 is None
+ and allow_zero_inner_tcp
+ and isinstance(pkt[_index_l4], TCP)
+ and check_packet_checksum_of(pkt, _index_l4, 0x00)
+ )
+ or (
+ not _index_l4 is None
+ and allow_zero_inner_sctp
+ and isinstance(pkt[_index_l4], SCTP)
+ and check_packet_checksum_of(pkt, _index_l4, 0x0000)
+ )
+ )
+ _verified_ip = (
+ _index_ip is None
+ or check_packet_checksum_of(pkt, _index_ip)
+ or (
+ not _index_ip is None
+ and allow_zero_inner_ip
+ and isinstance(pkt[_index_ip], IP)
+ and check_packet_checksum_of(pkt, _index_ip, 0x00)
+ )
+ )
+ _verified_ol4 = (
+ _index_ol4 is None
+ or check_packet_checksum_of(pkt, _index_ol4)
+ or (
+ not _index_ol4 is None
+ and allow_zero_outer_udp
+ and isinstance(pkt[_index_ol4], UDP)
+ and get_packet_checksum_of(pkt, _index_ol4) == 0x00
+ )
+ )
+ _verified_oip = (
+ _index_oip is None
+ or check_packet_checksum_of(pkt, _index_oip)
+ or (
+ not _index_oip is None
+ and allow_zero_outer_ip
+ and isinstance(pkt[_index_oip], IP)
+ and get_packet_checksum_of(pkt, _index_oip) == 0x00
+ )
+ )
+ return _verified_oip, _verified_ol4, _verified_ip, _verified_l4
+
+
+def check_packet_verbose(verbose: str, verbose_ref_list: list) -> bool:
+ """
+ Check the verbose with the reference list.
+ """
+ for _ref in verbose_ref_list:
+ if not _ref in verbose:
+ return False
+ return True
+
+
###############################################################################
###############################################################################
if __name__ == "__main__":