[V1,2/2] framework/packet: Update packet module for new methods.

Message ID 20230704064726.455361-3-ke1.xu@intel.com (mailing list archive)
State Superseded
Headers
Series Updating packet module for new util functions and packet types. |

Checks

Context Check Description
ci/Intel-dts-format-test success Testing OK
ci/Intel-dts-pylama-test success Testing OK
ci/Intel-dts-suite-test success Testing OK

Commit Message

Ke Xu July 4, 2023, 6:47 a.m. UTC
  Added new util functions. Packet module introduced several util functions
 for writing packets into files, getting and increasing IP addresses,
 getting ether types and so on. As more packet types are introduced, some
 other util functions are required. This patch implemented a set of packet
 layer indexing, packet payload, segment and checksum checking. This patch
 also fixed some bad doc strings.

1. Add util method segment_bytes.

2. Add methods get_packet_layer_index_oip_ol4_tun_ip_l4, get_packet_layer_index_to_tso_seg,
 get_packet_layer_index_tunnel for packet layer locating.

3. Add doc string for strip_pktload.

4. Add methods for packet payload getting, payload segmentation and checking.

5. Add methods for packet checksum getting, correction, stating and offload flag checking.

Signed-off-by: Ke Xu <ke1.xu@intel.com>
---
 framework/packet.py | 856 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 854 insertions(+), 2 deletions(-)
  

Patch

diff --git a/framework/packet.py b/framework/packet.py
index c44c955f..7eeab18b 100644
--- a/framework/packet.py
+++ b/framework/packet.py
@@ -27,9 +27,9 @@  from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting
 from scapy.layers.l2 import ARP, GRE, Dot1Q, Ether
 from scapy.layers.sctp import SCTP
 from scapy.layers.vxlan import VXLAN
-from scapy.packet import Raw
+from scapy.packet import Padding, Raw
 from scapy.sendrecv import sendp
-from scapy.utils import hexstr, rdpcap, wrpcap
+from scapy.utils import hexstr, randstring, rdpcap, wrpcap
 
 from .utils import convert_int2ip, convert_ip2int, get_module_path
 
@@ -127,6 +127,152 @@  def write_raw_pkt(pkt_str, file_name):
         w.close()
 
 
+def segment_bytes(payload: bytes, seg_len: int) -> list:
+    """
+    Segment a bytes according to the seg_len.
+    payload: bytes, a sequence to be segmented.
+    seg_len: int, the length of segmentation.
+    return: list, a list of segmented bytes.
+    """
+    segments = []
+    if seg_len > 0:
+        while payload:
+            _s = payload[:seg_len]
+            payload = payload[seg_len:]
+            segments.append(_s)
+    else:
+        segments = [payload]
+    return segments
+
+
+def get_packet_layer_index_oip_ol4_tun_ip_l4(
+    pkt, support_rx_tunnel: bool = True
+) -> tuple:
+    """
+    Get the layer index of outer-ip, outer-l4, tunnel, inner-ip, inner-l4 in a packet.
+    The index counting follows the DPDK rule, that if a non-tunnel packet is counted,
+    the ip / l4 layer are considered the inner ip / l4 layers.
+    If a layer is not found, it will return None for that index.
+    This method asserts that a tunnel layer is to transfer a L3 tunneled packet over a L3
+    or higher network, which means a tunnel packet is laid between an outer IP layer and an
+    inner IP layer.
+    pkt: Packet, the Scapy Packet object to be counted.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: tuple, indexes of outer IP, outer L4, tunnel, inner IP, inner L4.
+    """
+    _LAYERS_L3 = {IP, IPv6}
+    _LAYERS_L4 = {UDP, TCP, SCTP}
+    _layers = pkt.layers()
+    _index_tunnel = get_packet_layer_index_tunnel(pkt)
+    _index_oip = None
+    _index_ol4 = None
+    _index_ip = None
+    _index_l4 = None
+    if _index_tunnel is None:
+        for _i, _l in enumerate(_layers):
+            if _l in _LAYERS_L3:
+                _index_ip = _i
+            if _l in _LAYERS_L4:
+                _index_l4 = _i
+    else:
+        for _i in range(0, _index_tunnel + 1):
+            _l = _layers[_i]
+            if _l in _LAYERS_L3:
+                _index_oip = _i
+            if _l in _LAYERS_L4:
+                _index_ol4 = _i
+        for _i in range(_index_tunnel + 1, len(_layers)):
+            _l = _layers[_i]
+            if _l in _LAYERS_L3:
+                _index_ip = _i
+            if _l in _LAYERS_L4:
+                _index_l4 = _i
+    if not support_rx_tunnel and not _index_tunnel is None:
+        (_index_oip, _index_ol4, _index_tunnel, _index_ip, _index_l4) = (
+            None,
+            None,
+            None,
+            _index_oip,
+            _index_ol4,
+        )
+    return _index_oip, _index_ol4, _index_tunnel, _index_ip, _index_l4
+
+
+def get_packet_layer_index_to_tso_seg(
+    pkt,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> int:
+    """
+    Get the layer index of TSO in a packet.
+    In DPDK TSO is considered Transmittion Segmentation Offload. That both TCP
+    and UDP are considered to be segemeted.
+    pkt: Packt, Scapy Packet object to be segmeted.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+    support_tso: bool, flag indicating if TCP Segmentation is enabled.
+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+    return: int, the index of the layer to be segmented in the packet.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    if support_seg_tunnel and not _index_tunnel is None:
+        if not _index_l4 is None and support_tso and isinstance(pkt[_index_l4], TCP):
+            return _index_l4
+        if not _index_l4 is None and support_ufo and isinstance(pkt[_index_l4], UDP):
+            return _index_l4
+    if support_seg_non_tunnel and _index_tunnel is None:
+        if not _index_l4 is None and support_tso and isinstance(pkt[_index_l4], TCP):
+            return _index_l4
+        if not _index_l4 is None and support_ufo and isinstance(pkt[_index_l4], UDP):
+            return _index_l4
+    return None
+
+
+def get_packet_layer_index_tunnel(pkt):
+    """
+    Get the layer index of tunnel layer in a packet.
+    If a layer is not found, it will return None for that index.
+    This method asserts that a tunnel layer is to transfer a L3 tunneled packet over a L3
+    or higher network, which means a tunnel packet is laid between an outer IP layer and an
+    inner IP layer.
+    pkt: Packet, Scapy Packet object to be indexed.
+    return: int | None, the index of the tunnel layer in the packet.
+    """
+    # packet types
+    _LAYERS_TUNNEL = {VXLAN, GRE, GTP_U_Header, GENEVE}
+
+    _layers = pkt.layers()
+    for _i, _l in enumerate(_layers):
+        _layers_outer = _layers[: _i + 1]
+        _layers_inner = _layers[_i + 1 :]
+        if (
+            (_l in _LAYERS_TUNNEL)
+            and (IP in _layers_outer or IPv6 in _layers_outer)
+            and (IP in _layers_inner or IPv6 in _layers_inner)
+        ):
+            return _i
+    for _i, _l in enumerate(_layers):
+        if (_l is IP or _l is IPv6) and (
+            _layers[_i + 1] is IP or _layers[_i + 1] is IPv6
+        ):
+            return _i
+    return None
+
+
 class scapy(object):
     SCAPY_LAYERS = {
         "ether": Ether(dst="ff:ff:ff:ff:ff:ff"),
@@ -1188,6 +1334,12 @@  def compare_pktload(pkt1=None, pkt2=None, layer="L2"):
 
 
 def strip_pktload(pkt=None, layer="L2", p_index=0):
+    """
+    Strip the payload of a specific layer indicated by `layer` in a pakcet.
+    pkt: Packet, DTS Packet object to be stripped.
+    layer: str, the layer to be stripped.
+    return: str, the stripped layer formatted in hex string.
+    """
     if layer == "L2":
         l_idx = 0
     elif layer == "L3":
@@ -1205,6 +1357,706 @@  def strip_pktload(pkt=None, layer="L2", p_index=0):
     return load
 
 
+def get_packet_payload_of(pkt, layer: int) -> bytes:
+    """
+    Get the payload of a layer in a packet.
+    pkt: Packet, Scapy Packet object to get the payload of.
+    layer: int, the layer index to get the payload in the packet.
+    return: bytes, the payload of a layer.
+    """
+    if layer is None:
+        return None
+    pkt_payload = pkt.copy()
+    if Padding in pkt_payload:
+        _index_padding = pkt_payload.layers().index(Padding)
+        pkt_payload[_index_padding - 1].remove_payload()
+        if layer >= _index_padding:
+            return None
+    return bytes(pkt_payload[layer].payload)
+
+
+def get_packet_payload(
+    pkt,
+    support_rx_tunnel: bool = True,
+) -> bytes:
+    """
+    Get the payload of a packet.
+    pkt: Packet, Scapy Packet object to get the payload of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: bytes, the payload of a packet. If the packet has an L4, the payload is the load of
+    L4, or it will be the load of L3 if L3 exists, else it returns None.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    if not _index_l4 is None:
+        return get_packet_payload_of(pkt, _index_l4)
+    elif not _index_ip is None:
+        return get_packet_payload_of(pkt, _index_ip)
+    else:
+        return None
+
+
+def get_packet_payload_no_tso_seg(
+    pkt,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> bytes:
+    """
+    Get the payload of the layer to be segmented of a packet, not segmented.
+    pkt: Packet, Scapy Packet object to get the payload of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+    support_tso: bool, flag indicating if TCP Segmentation is enabled.
+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+    return: bytes, the payload of the layer to be segmented of a packet. If no layer is to be
+    segmented, it returns None.
+    """
+    _index_seg = get_packet_layer_index_to_tso_seg(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    if not _index_seg is None:
+        return get_packet_payload_of(pkt, _index_seg)
+    else:
+        return None
+
+
+def get_packet_payload_pre_tso_seg(
+    pkt,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> bytes:
+    """
+    Get the payload of a packet before considering it is segmented.
+    pkt: Packet, Scapy Packet object to get the payload of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+    support_tso: bool, flag indicating if TCP Segmentation is enabled.
+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+    return: bytes, the payload of the packet. If it can be segmented then returns the not
+    segmented payload, else it returns the payload despite the segmentation flags.
+    """
+    _payload_no_seg = get_packet_payload_no_tso_seg(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    _payload = get_packet_payload(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+    )
+    if not _payload_no_seg is None:
+        return _payload_no_seg
+    else:
+        return _payload
+
+
+def get_packet_payload_post_tso_seg(
+    pkt,
+    seg_len: int = 800,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> list:
+    """
+    Get the list of payloads of a packet after it is segmented.
+    pkt: Packet, Scapy Packet object to get the payload of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+    support_tso: bool, flag indicating if TCP Segmentation is enabled.
+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+    return: list, sequence of the payloads after applying TSO to a packet. If a packet is not
+    to segment, it returns a list of only the payload. If the packet is not a proper packet, it
+    returns an empty list.
+    """
+    _payload_no_seg = get_packet_payload_no_tso_seg(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    _payload = get_packet_payload(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+    )
+    if not _payload_no_seg is None:
+        _segments = segment_bytes(_payload_no_seg, seg_len)
+    elif not _payload is None:
+        _segments = segment_bytes(_payload, -1)
+    else:
+        _segments = []
+    return _segments
+
+
+def get_packet_tso_seg_tx_flag(
+    pkt,
+    seg_len: int = 800,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> list:
+    """
+    Get the list of TSO TX offload flags of a packet.
+    pkt: Packet, Scapy Packet object to get the offload flag of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+    support_tso: bool, flag indicating if TCP Segmentation is enabled.
+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+    return: list, set of offload flags formatted in a list of str.
+    """
+    _index_seg = get_packet_layer_index_to_tso_seg(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    _flags = []
+    if not _index_seg is None and len(get_packet_payload_of(pkt, _index_seg)) > seg_len:
+        if isinstance(pkt[_index_seg], TCP):
+            _flags.append("RTE_MBUF_F_TX_TCP_SEG")
+        elif isinstance(pkt[_index_seg], UDP):
+            _flags.append("RTE_MBUF_F_TX_UDP_SEG")
+    return _flags
+
+
+def get_packet_checksum_of(
+    pkt,
+    layer: int,
+    good_checksum: bool = False,
+) -> int:
+    """
+    Get the checksum of a specific layer in a packet.
+    pkt: Packet, Scapy Packet object to get the checksum of.
+    layer: int, the layer index to get the checksum in the packet.
+    good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.
+    return: int, the checksum of the packet at the layer.
+    """
+    _re_checksum_pattern = re.compile(r"chksum\s*=\s*(0x[0-9a-fA-F]+|None)")
+    if layer is None:
+        return None
+    p_chksum = pkt[layer].copy()
+    if "chksum" in p_chksum.fieldtype:
+        if not good_checksum:
+            checksum_list_str = _re_checksum_pattern.findall(p_chksum.show2(dump=True))
+            return int(checksum_list_str[0], 16)
+        else:
+            p_chksum.chksum = None
+            checksum_list_str = _re_checksum_pattern.findall(p_chksum.show2(dump=True))
+            return int(checksum_list_str[0], 16)
+    else:
+        return None
+
+
+def get_packet_checksums(
+    pkt,
+    good_checksum: bool = False,
+    support_rx_tunnel: bool = True,
+) -> tuple:
+    """
+    Get the checksum list of a packet.
+    This method calculates the checksum of the whole packet. This means if an inner layer has a
+    bad checksum, it will be corrected then to calculate the outer checksum if `good_checksum` is
+    set.
+    If a layer is not found, it will return None for that layer.
+    pkt: Packet, Scapy Packet object to get the checksum of.
+    good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: tuple, checksums of outer IP, outer L4, inner IP, inner L4 in the packet.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    p_chksum = pkt.copy()
+    if good_checksum:
+        for _i in [_index_oip, _index_ol4, _index_ip, _index_l4]:
+            if not _i is None and "chksum" in p_chksum[_i].fieldtype:
+                p_chksum[_i].chksum = None
+    return (
+        get_packet_checksum_of(p_chksum, _index_oip),
+        get_packet_checksum_of(p_chksum, _index_ol4),
+        get_packet_checksum_of(p_chksum, _index_ip),
+        get_packet_checksum_of(p_chksum, _index_l4),
+    )
+
+
+def get_packet_checksum_of_each(
+    pkt,
+    good_checksum: bool = False,
+    support_rx_tunnel: bool = True,
+) -> tuple:
+    """
+    Get the checksum list of a packet.
+    This method calculates the checksums of each layer seperately. This means if an inner layer
+    has a bad checksum, an outer checksum will be calculated based on the bad inner checksum if
+    `good_checksum` is set.
+    If a layer is not found, it will return None for that layer.
+    pkt: Packet, Scapy Packet object to get the checksum of.
+    good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: tuple, checksums of outer IP, outer L4, inner IP, inner L4 in the packet.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    return (
+        get_packet_checksum_of(pkt, _index_oip, good_checksum=good_checksum),
+        get_packet_checksum_of(pkt, _index_ol4, good_checksum=good_checksum),
+        get_packet_checksum_of(pkt, _index_ip, good_checksum=good_checksum),
+        get_packet_checksum_of(pkt, _index_l4, good_checksum=good_checksum),
+    )
+
+
+def get_packet_checksum_rx_stat(pkt, support_rx_tunnel: bool = True) -> tuple:
+    """
+    Get the checksum stats of a packet.
+    pkt: Packet, Scapy Packet object to get the checksum stats of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: tuple, checksum stats of bad outer IP, bad outer L4, bad inner IP, bad inner L4.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    _bad_oip, _bad_ol4, _bad_ip, _bad_l4 = 0, 0, 0, 0
+    if not _index_oip is None and get_packet_checksum_of(
+        pkt, _index_oip
+    ) != get_packet_checksum_of(pkt, _index_oip, good_checksum=True):
+        _bad_oip = 1
+    if not _index_oip is None and get_packet_checksum_of(
+        pkt, _index_ol4
+    ) != get_packet_checksum_of(pkt, _index_ol4, good_checksum=True):
+        _bad_ol4 = 1
+    if not _index_ip is None and get_packet_checksum_of(
+        pkt, _index_ip
+    ) != get_packet_checksum_of(pkt, _index_ip, good_checksum=True):
+        _bad_ip = 1
+    if not _index_l4 is None and get_packet_checksum_of(
+        pkt, _index_l4
+    ) != get_packet_checksum_of(pkt, _index_l4, good_checksum=True):
+        _bad_l4 = 1
+    return _bad_oip, _bad_ol4, _bad_ip, _bad_l4
+
+
+def get_packet_checksum_rx_olflag(pkt, support_rx_tunnel: bool = True) -> list:
+    """
+    Get the checksum RX offload flags of a packet.
+    pkt: Packet, Scapy Packet object to get the checksum RX offload flag of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: list, set of offload flags formated in a list of str.
+    """
+    _FLAG_PAT = "RTE_MBUF_F_RX_%s_CKSUM_%s"
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    _flags = []
+    if not _index_oip is None and get_packet_checksum_of(
+        pkt, _index_oip
+    ) != get_packet_checksum_of(pkt, _index_oip, good_checksum=True):
+        _flags.append(_FLAG_PAT % ("OUTER_IP", "BAD"))
+    else:
+        ## OUTER_IP_CKSUM_GOOD is default not shown
+        pass
+    if not _index_ol4 is None and get_packet_checksum_of(
+        pkt, _index_ol4
+    ) != get_packet_checksum_of(pkt, _index_ol4, good_checksum=True):
+        _flags.append(_FLAG_PAT % ("OUTER_L4", "BAD"))
+    elif support_rx_tunnel:
+        _flags.append(_FLAG_PAT % ("OUTER_L4", "GOOD"))
+    else:
+        pass
+    if get_packet_checksum_of(pkt, _index_ip) != get_packet_checksum_of(
+        pkt, _index_ip, good_checksum=True
+    ):
+        _flags.append(_FLAG_PAT % ("IP", "BAD"))
+    else:
+        _flags.append(_FLAG_PAT % ("IP", "GOOD"))
+    if get_packet_checksum_of(pkt, _index_l4) != get_packet_checksum_of(
+        pkt, _index_l4, good_checksum=True
+    ):
+        _flags.append(_FLAG_PAT % ("L4", "BAD"))
+    else:
+        _flags.append(_FLAG_PAT % ("L4", "GOOD"))
+    return _flags
+
+
+def get_packets_payload_pre_seg_list(
+    pkts: list,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> list:
+    """
+    Get the payload of a seq of packets before considering it is segmented.
+    """
+    return [
+        get_packet_payload_pre_tso_seg(
+            _p,
+            support_rx_tunnel=support_rx_tunnel,
+            support_seg_non_tunnel=support_seg_non_tunnel,
+            support_seg_tunnel=support_seg_tunnel,
+            support_tso=support_tso,
+            support_ufo=support_ufo,
+        )
+        for _p in pkts
+    ]
+
+
+def get_packets_payload_post_seg_list(
+    pkts: list,
+    seg_len: int = 800,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> list:
+    """
+    Get the list of payloads of a seq of packets after it is segmented.
+    """
+    return [
+        get_packet_payload_post_tso_seg(
+            _p,
+            seg_len=seg_len,
+            support_rx_tunnel=support_rx_tunnel,
+            support_seg_non_tunnel=support_seg_non_tunnel,
+            support_seg_tunnel=support_seg_tunnel,
+            support_tso=support_tso,
+            support_ufo=support_ufo,
+        )
+        for _p in pkts
+    ]
+
+
+def get_packets_checksum_rx_stat_list(
+    pkts: list, support_rx_tunnel: bool = True
+) -> list:
+    """
+    Get the checksum stats of a set of packet.
+    """
+    k_bad_ip = "Bad-ipcsum"
+    k_bad_l4 = "Bad-l4csum"
+    k_bad_oip = "Bad-outer-ipcsum"
+    k_bad_ol4 = "Bad-outer-l4csum"
+    k_rx = "RX-total"
+    stats = {
+        k_bad_ip: 0,
+        k_bad_l4: 0,
+        k_bad_oip: 0,
+        k_bad_ol4: 0,
+        k_rx: 0,
+    }
+    for pkt in pkts:
+        _bad_oip, _bad_ol4, _bad_ip, _bad_l4 = get_packet_checksum_rx_stat(
+            pkt, support_rx_tunnel=support_rx_tunnel
+        )
+        stats[k_bad_oip] += _bad_oip
+        stats[k_bad_ol4] += _bad_ol4
+        stats[k_bad_ip] += _bad_ip
+        stats[k_bad_l4] += _bad_l4
+        stats[k_rx] += 1
+    return stats
+
+
+def get_packets_tso_seg_tx_flag_list(
+    pkts: list,
+    seg_len: int = 800,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> list:
+    """
+    Get the list of TSO TX offload flags of a seq of packet.
+    """
+    flag_list = []
+    for pkt in pkts:
+        flag_list.append(
+            get_packet_tso_seg_tx_flag(
+                pkt,
+                seg_len=seg_len,
+                support_rx_tunnel=support_rx_tunnel,
+                support_seg_non_tunnel=support_seg_non_tunnel,
+                support_seg_tunnel=support_seg_tunnel,
+                support_tso=support_tso,
+                support_ufo=support_ufo,
+            )
+        )
+    return flag_list
+
+
+def get_packets_checksum_rx_olflag_list(
+    pkts: list, support_rx_tunnel: bool = True
+) -> list:
+    """
+    Get the checksum RX offload flags of a seq of packets.
+    """
+    flag_list = []
+    for pkt in pkts:
+        flag_list.append(
+            get_packet_checksum_rx_olflag(pkt, support_rx_tunnel=support_rx_tunnel)
+        )
+    return flag_list
+
+
+def check_packet_segment(
+    pkt,
+    segments: list,
+    seg_len: int,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> bool:
+    """
+    Check the packet segmentation.
+    """
+    payload_expected = get_packet_payload_pre_tso_seg(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    segments_expected = get_packet_payload_post_tso_seg(
+        pkt,
+        seg_len=seg_len,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    if not segments:
+        return False
+    if segments == segments_expected:
+        return True
+    else:
+        payload_segmented = b""
+        for _i, _seg in enumerate(segments, 1):
+            if len(segments_expected) > 1 and len(_seg) > seg_len:
+                return False
+            payload_segmented += _seg
+        return payload_expected == payload_segmented
+
+
+def check_packet_checksum_of(pkt: Packet, layer: int, checksum_ref: int = None) -> bool:
+    """
+    Check the packet checksum of a specific layer indicated by `layer`.
+    """
+    if checksum_ref is None:
+        checksum_ref = get_packet_checksum_of(pkt, layer, good_checksum=True)
+    return get_packet_checksum_of(pkt, layer) == checksum_ref
+
+
+def check_packet_checksums(
+    pkt,
+    checksum_ref_list: list = None,
+    support_rx_tunnel: bool = True,
+) -> bool:
+    """
+    Check the packet checksum.
+    pkt: checked packet;
+    checksum_ref_list: refering checksum list, recalculate the whole packet for correct checksum in default;
+    return: if this packet is in good checksum.
+    """
+    if checksum_ref_list is None:
+        checksum_ref_list = get_packet_checksums(
+            pkt, support_rx_tunnel=support_rx_tunnel, good_checksum=True
+        )
+    return (
+        get_packet_checksums(pkt, support_rx_tunnel=support_rx_tunnel)
+        == checksum_ref_list
+    )
+
+
+def check_packet_checksum_of_each(
+    pkt,
+    checksum_ref_list: list = None,
+    support_rx_tunnel: bool = True,
+    allow_zero_outer_ip: bool = False,
+    allow_zero_outer_udp: bool = False,
+    allow_zero_inner_ip: bool = False,
+    allow_zero_inner_udp: bool = False,
+    allow_zero_inner_tcp: bool = False,
+    allow_zero_inner_sctp: bool = False,
+) -> tuple:
+    """
+    Check the packet checksum of each layer.
+    pkt: checked packet;
+    checksum_ref_list: refering checksum list, recalculate each layer seperately for correct checksum in default;
+    allow_zero_*: allowing zero checksum validated passed on specific layer;
+    return: if this packet is validated passed on outer-ip, outer-l4, inner-ip, inner-l4 layer checksum.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    if checksum_ref_list and len(checksum_ref_list) == 2:
+        (_checksum_ref_ip, _checksum_ref_l4) = (
+            checksum_ref_list[0],
+            checksum_ref_list[1],
+        )
+        return (
+            True,
+            True,
+            check_packet_checksum_of(pkt, _index_ip, _checksum_ref_ip),
+            check_packet_checksum_of(pkt, _index_l4, _checksum_ref_l4),
+        )
+    elif checksum_ref_list and len(checksum_ref_list) == 4:
+        (_checksum_ref_oip, _checksum_ref_ol4, _checksum_ref_ip, _checksum_ref_l4) = (
+            checksum_ref_list[0],
+            checksum_ref_list[1],
+            checksum_ref_list[2],
+            checksum_ref_list[3],
+        )
+        return (
+            check_packet_checksum_of(pkt, _index_oip, _checksum_ref_oip),
+            check_packet_checksum_of(pkt, _index_ol4, _checksum_ref_ol4),
+            check_packet_checksum_of(pkt, _index_ip, _checksum_ref_ip),
+            check_packet_checksum_of(pkt, _index_l4, _checksum_ref_l4),
+        )
+    elif checksum_ref_list:
+        raise ValueError("Wrong checksum ref list length.")
+    else:
+        _verified_l4 = (
+            _index_l4 is None
+            or check_packet_checksum_of(pkt, _index_l4)
+            or (
+                not _index_l4 is None
+                and allow_zero_inner_udp
+                and isinstance(pkt[_index_l4], UDP)
+                and check_packet_checksum_of(pkt, _index_l4, 0x00)
+            )
+            or (
+                not _index_l4 is None
+                and allow_zero_inner_tcp
+                and isinstance(pkt[_index_l4], TCP)
+                and check_packet_checksum_of(pkt, _index_l4, 0x00)
+            )
+            or (
+                not _index_l4 is None
+                and allow_zero_inner_sctp
+                and isinstance(pkt[_index_l4], SCTP)
+                and check_packet_checksum_of(pkt, _index_l4, 0x0000)
+            )
+        )
+        _verified_ip = (
+            _index_ip is None
+            or check_packet_checksum_of(pkt, _index_ip)
+            or (
+                not _index_ip is None
+                and allow_zero_inner_ip
+                and isinstance(pkt[_index_ip], IP)
+                and check_packet_checksum_of(pkt, _index_ip, 0x00)
+            )
+        )
+        _verified_ol4 = (
+            _index_ol4 is None
+            or check_packet_checksum_of(pkt, _index_ol4)
+            or (
+                not _index_ol4 is None
+                and allow_zero_outer_udp
+                and isinstance(pkt[_index_ol4], UDP)
+                and get_packet_checksum_of(pkt, _index_ol4) == 0x00
+            )
+        )
+        _verified_oip = (
+            _index_oip is None
+            or check_packet_checksum_of(pkt, _index_oip)
+            or (
+                not _index_oip is None
+                and allow_zero_outer_ip
+                and isinstance(pkt[_index_oip], IP)
+                and get_packet_checksum_of(pkt, _index_oip) == 0x00
+            )
+        )
+        return _verified_oip, _verified_ol4, _verified_ip, _verified_l4
+
+
+def check_packet_verbose(verbose: str, verbose_ref_list: list) -> bool:
+    """
+    Check the verbose with the reference list.
+    """
+    for _ref in verbose_ref_list:
+        if not _ref in verbose:
+            return False
+    return True
+
+
 ###############################################################################
 ###############################################################################
 if __name__ == "__main__":