get:
Show a patch.

patch:
Update a patch.

put:
Update a patch.

GET /api/patches/129509/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 129509,
    "url": "http://patchwork.dpdk.org/api/patches/129509/?format=api",
    "web_url": "http://patchwork.dpdk.org/project/dts/patch/20230712193126.1994361-3-ke1.xu@intel.com/",
    "project": {
        "id": 3,
        "url": "http://patchwork.dpdk.org/api/projects/3/?format=api",
        "name": "DTS",
        "link_name": "dts",
        "list_id": "dts.dpdk.org",
        "list_email": "dts@dpdk.org",
        "web_url": "",
        "scm_url": "git://dpdk.org/tools/dts",
        "webscm_url": "http://git.dpdk.org/tools/dts/",
        "list_archive_url": "https://inbox.dpdk.org/dts",
        "list_archive_url_format": "https://inbox.dpdk.org/dts/{}",
        "commit_url_format": ""
    },
    "msgid": "<20230712193126.1994361-3-ke1.xu@intel.com>",
    "list_archive_url": "https://inbox.dpdk.org/dts/20230712193126.1994361-3-ke1.xu@intel.com",
    "date": "2023-07-12T19:31:24",
    "name": "[V1,2/4] framework/packet: Update packet module for new methods.",
    "commit_ref": null,
    "pull_url": null,
    "state": "new",
    "archived": false,
    "hash": "18ea219fac78fcd73305ce8a5dda0d9f973efbab",
    "submitter": {
        "id": 2810,
        "url": "http://patchwork.dpdk.org/api/people/2810/?format=api",
        "name": "Ke Xu",
        "email": "ke1.xu@intel.com"
    },
    "delegate": null,
    "mbox": "http://patchwork.dpdk.org/project/dts/patch/20230712193126.1994361-3-ke1.xu@intel.com/mbox/",
    "series": [
        {
            "id": 28925,
            "url": "http://patchwork.dpdk.org/api/series/28925/?format=api",
            "web_url": "http://patchwork.dpdk.org/project/dts/list/?series=28925",
            "date": "2023-07-12T19:31:22",
            "name": "Updating packet module and introducing a new common module.",
            "version": 1,
            "mbox": "http://patchwork.dpdk.org/series/28925/mbox/"
        }
    ],
    "comments": "http://patchwork.dpdk.org/api/patches/129509/comments/",
    "check": "pending",
    "checks": "http://patchwork.dpdk.org/api/patches/129509/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dts-bounces@dpdk.org>",
        "X-Original-To": "patchwork@inbox.dpdk.org",
        "Delivered-To": "patchwork@inbox.dpdk.org",
        "Received": [
            "from mails.dpdk.org (mails.dpdk.org [217.70.189.124])\n\tby inbox.dpdk.org (Postfix) with ESMTP id 28A9B42E57;\n\tWed, 12 Jul 2023 21:33:24 +0200 (CEST)",
            "from mails.dpdk.org (localhost [127.0.0.1])\n\tby mails.dpdk.org (Postfix) with ESMTP id 23C8140EE7;\n\tWed, 12 Jul 2023 21:33:24 +0200 (CEST)",
            "from mga06.intel.com (mga06b.intel.com [134.134.136.31])\n by mails.dpdk.org (Postfix) with ESMTP id 9157E400D5\n for <dts@dpdk.org>; Wed, 12 Jul 2023 21:33:21 +0200 (CEST)",
            "from fmsmga004.fm.intel.com ([10.253.24.48])\n by orsmga104.jf.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384;\n 12 Jul 2023 12:33:20 -0700",
            "from dpdk-xuke-host.sh.intel.com ([10.67.114.220])\n by fmsmga004.fm.intel.com with ESMTP; 12 Jul 2023 12:33:19 -0700"
        ],
        "DKIM-Signature": "v=1; a=rsa-sha256; c=relaxed/simple;\n d=intel.com; i=@intel.com; q=dns/txt; s=Intel;\n t=1689190402; x=1720726402;\n h=from:to:cc:subject:date:message-id:in-reply-to:\n references:mime-version:content-transfer-encoding;\n bh=3/cjcLq0mbnwq9mJKmDsdMNzo102YORPF1rZg+IA5YY=;\n b=XG0srs4lCacSSx/Nfr40aHtVI+SBO/wJvOctahHOcQu2qXsuwdWOxuFI\n u4IY/MCfdrV1Db/Wy3LAxAAD75cIarEnkXn8Dh97tCC3T9bqNlvht0dAV\n IaNLXu4getn/YPXZLmup6m4LIWbXDJw0FOPnHPOiVhsueXLef1gn5+1zL\n 93ZEdcW6kXD6ud7VFPOVZ1cIm6LnaNMgKjUMXDUg7Tn+gz+7iLU7coTvu\n j8mZPzW0NY3fRvr3AbQ0p7565J+b4yEdMcMvSZtP3dM34YRkC+MgNoY0w\n R+1z9tjamVd2cdskDCFCSteBABs8PRzasdKLoi+fMUgpfx25KvmEnefsa A==;",
        "X-IronPort-AV": [
            "E=McAfee;i=\"6600,9927,10769\"; a=\"428728894\"",
            "E=Sophos;i=\"6.01,200,1684825200\"; d=\"scan'208\";a=\"428728894\"",
            "E=McAfee;i=\"6600,9927,10769\"; a=\"791740108\"",
            "E=Sophos;i=\"6.01,200,1684825200\"; d=\"scan'208\";a=\"791740108\""
        ],
        "X-ExtLoop1": "1",
        "From": "Ke Xu <ke1.xu@intel.com>",
        "To": "dts@dpdk.org",
        "Cc": "ke1.xu@intel.com,\n\ttarcadia@qq.com",
        "Subject": "[DTS][Patch V1 2/4] framework/packet: Update packet module for new\n methods.",
        "Date": "Wed, 12 Jul 2023 19:31:24 +0000",
        "Message-Id": "<20230712193126.1994361-3-ke1.xu@intel.com>",
        "X-Mailer": "git-send-email 2.34.1",
        "In-Reply-To": "<20230712193126.1994361-1-ke1.xu@intel.com>",
        "References": "<20230712193126.1994361-1-ke1.xu@intel.com>",
        "MIME-Version": "1.0",
        "Content-Transfer-Encoding": "8bit",
        "X-BeenThere": "dts@dpdk.org",
        "X-Mailman-Version": "2.1.29",
        "Precedence": "list",
        "List-Id": "test suite reviews and discussions <dts.dpdk.org>",
        "List-Unsubscribe": "<https://mails.dpdk.org/options/dts>,\n <mailto:dts-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://mails.dpdk.org/archives/dts/>",
        "List-Post": "<mailto:dts@dpdk.org>",
        "List-Help": "<mailto:dts-request@dpdk.org?subject=help>",
        "List-Subscribe": "<https://mails.dpdk.org/listinfo/dts>,\n <mailto:dts-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dts-bounces@dpdk.org"
    },
    "content": "1. Add util method segment_bytes.\n\n2. Add methods get_packet_layer_index_oip_ol4_tun_ip_l4, get_packet_layer_index_to_tso_seg, get_packet_layer_index_tunnel for packet layer locating.\n\n3. Add doc string for strip_pktload.\n\n4. Add methods for packet payload getting, payload segmentation and checking.\n\n5. Add methods for packet checksum getting, correction, stating and offload flag checking.\n\nSigned-off-by: Ke Xu <ke1.xu@intel.com>\n---\n framework/packet.py | 858 +++++++++++++++++++++++++++++++++++++++++++-\n 1 file changed, 856 insertions(+), 2 deletions(-)",
    "diff": "diff --git a/framework/packet.py b/framework/packet.py\nindex c44c955f..02b9f885 100644\n--- a/framework/packet.py\n+++ b/framework/packet.py\n@@ -27,9 +27,9 @@ from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting\n from scapy.layers.l2 import ARP, GRE, Dot1Q, Ether\n from scapy.layers.sctp import SCTP\n from scapy.layers.vxlan import VXLAN\n-from scapy.packet import Raw\n+from scapy.packet import Padding, Raw\n from scapy.sendrecv import sendp\n-from scapy.utils import hexstr, rdpcap, wrpcap\n+from scapy.utils import hexstr, randstring, rdpcap, wrpcap\n \n from .utils import convert_int2ip, convert_ip2int, get_module_path\n \n@@ -127,6 +127,152 @@ def write_raw_pkt(pkt_str, file_name):\n         w.close()\n \n \n+def segment_bytes(payload: bytes, seg_len: int) -> list:\n+    \"\"\"\n+    Segment a bytes according to the seg_len.\n+    payload: bytes, a sequence to be segmented.\n+    seg_len: int, the length of segmentation.\n+    return: list, a list of segmented bytes.\n+    \"\"\"\n+    segments = []\n+    if seg_len > 0:\n+        while payload:\n+            _s = payload[:seg_len]\n+            payload = payload[seg_len:]\n+            segments.append(_s)\n+    else:\n+        segments = [payload]\n+    return segments\n+\n+\n+def get_packet_layer_index_oip_ol4_tun_ip_l4(\n+    pkt, support_rx_tunnel: bool = True\n+) -> tuple:\n+    \"\"\"\n+    Get the layer index of outer-ip, outer-l4, tunnel, inner-ip, inner-l4 in a packet.\n+    The index counting follows the DPDK rule, that if a non-tunnel packet is counted,\n+    the ip / l4 layer are considered the inner ip / l4 layers.\n+    If a layer is not found, it will return None for that index.\n+    This method asserts that a tunnel layer is to transfer a L3 tunneled packet over a L3\n+    or higher network, which means a tunnel packet is laid between an outer IP layer and an\n+    inner IP layer.\n+    pkt: Packet, the Scapy Packet object to be counted.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    return: tuple, indexes of outer IP, outer L4, tunnel, inner IP, inner L4.\n+    \"\"\"\n+    _LAYERS_L3 = {IP, IPv6}\n+    _LAYERS_L4 = {UDP, TCP, SCTP}\n+    _layers = pkt.layers()\n+    _index_tunnel = get_packet_layer_index_tunnel(pkt)\n+    _index_oip = None\n+    _index_ol4 = None\n+    _index_ip = None\n+    _index_l4 = None\n+    if _index_tunnel is None:\n+        for _i, _l in enumerate(_layers):\n+            if _l in _LAYERS_L3:\n+                _index_ip = _i\n+            if _l in _LAYERS_L4:\n+                _index_l4 = _i\n+    else:\n+        for _i in range(0, _index_tunnel + 1):\n+            _l = _layers[_i]\n+            if _l in _LAYERS_L3:\n+                _index_oip = _i\n+            if _l in _LAYERS_L4:\n+                _index_ol4 = _i\n+        for _i in range(_index_tunnel + 1, len(_layers)):\n+            _l = _layers[_i]\n+            if _l in _LAYERS_L3:\n+                _index_ip = _i\n+            if _l in _LAYERS_L4:\n+                _index_l4 = _i\n+    if not support_rx_tunnel and not _index_tunnel is None:\n+        (_index_oip, _index_ol4, _index_tunnel, _index_ip, _index_l4) = (\n+            None,\n+            None,\n+            None,\n+            _index_oip,\n+            _index_ol4,\n+        )\n+    return _index_oip, _index_ol4, _index_tunnel, _index_ip, _index_l4\n+\n+\n+def get_packet_layer_index_to_tso_seg(\n+    pkt,\n+    support_rx_tunnel: bool = True,\n+    support_seg_non_tunnel: bool = True,\n+    support_seg_tunnel: bool = True,\n+    support_tso: bool = True,\n+    support_ufo: bool = True,\n+) -> int:\n+    \"\"\"\n+    Get the layer index of TSO in a packet.\n+    In DPDK TSO is considered Transmittion Segmentation Offload. That both TCP\n+    and UDP are considered to be segemeted.\n+    pkt: Packt, Scapy Packet object to be segmeted.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.\n+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.\n+    support_tso: bool, flag indicating if TCP Segmentation is enabled.\n+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.\n+    return: int, the index of the layer to be segmented in the packet.\n+    \"\"\"\n+    (\n+        _index_oip,\n+        _index_ol4,\n+        _index_tunnel,\n+        _index_ip,\n+        _index_l4,\n+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(\n+        pkt, support_rx_tunnel=support_rx_tunnel\n+    )\n+    if support_seg_tunnel and not _index_tunnel is None:\n+        if not _index_l4 is None and support_tso and isinstance(pkt[_index_l4], TCP):\n+            return _index_l4\n+        if not _index_l4 is None and support_ufo and isinstance(pkt[_index_l4], UDP):\n+            return _index_l4\n+    if support_seg_non_tunnel and _index_tunnel is None:\n+        if not _index_l4 is None and support_tso and isinstance(pkt[_index_l4], TCP):\n+            return _index_l4\n+        if not _index_l4 is None and support_ufo and isinstance(pkt[_index_l4], UDP):\n+            return _index_l4\n+    return None\n+\n+\n+def get_packet_layer_index_tunnel(pkt):\n+    \"\"\"\n+    Get the layer index of tunnel layer in a packet.\n+    If a layer is not found, it will return None for that index.\n+    This method asserts that a tunnel layer is to transfer a L3 tunneled packet over a L3\n+    or higher network, which means a tunnel packet is laid between an outer IP layer and an\n+    inner IP layer.\n+    pkt: Packet, Scapy Packet object to be indexed.\n+    return: int | None, the index of the tunnel layer in the packet.\n+    \"\"\"\n+    # packet types\n+    _LAYERS_TUNNEL = {VXLAN, GRE, GTP_U_Header, GENEVE}\n+\n+    _layers = pkt.layers()\n+    for _i, _l in enumerate(_layers):\n+        _layers_outer = _layers[: _i + 1]\n+        _layers_inner = _layers[_i + 1 :]\n+        if (\n+            (_l in _LAYERS_TUNNEL)\n+            and (IP in _layers_outer or IPv6 in _layers_outer)\n+            and (IP in _layers_inner or IPv6 in _layers_inner)\n+        ):\n+            return _i\n+    for _i, _l in enumerate(_layers):\n+        if (_l is IP or _l is IPv6) and (\n+            _layers[_i + 1] is IP or _layers[_i + 1] is IPv6\n+        ):\n+            return _i\n+    return None\n+\n+\n class scapy(object):\n     SCAPY_LAYERS = {\n         \"ether\": Ether(dst=\"ff:ff:ff:ff:ff:ff\"),\n@@ -1188,6 +1334,12 @@ def compare_pktload(pkt1=None, pkt2=None, layer=\"L2\"):\n \n \n def strip_pktload(pkt=None, layer=\"L2\", p_index=0):\n+    \"\"\"\n+    Strip the payload of a specific layer indicated by `layer` in a pakcet.\n+    pkt: Packet, DTS Packet object to be stripped.\n+    layer: str, the layer to be stripped.\n+    return: str, the stripped layer formatted in hex string.\n+    \"\"\"\n     if layer == \"L2\":\n         l_idx = 0\n     elif layer == \"L3\":\n@@ -1205,6 +1357,708 @@ def strip_pktload(pkt=None, layer=\"L2\", p_index=0):\n     return load\n \n \n+def get_packet_payload_of(pkt, layer: int) -> bytes:\n+    \"\"\"\n+    Get the payload of a layer in a packet.\n+    pkt: Packet, Scapy Packet object to get the payload of.\n+    layer: int, the layer index to get the payload in the packet.\n+    return: bytes, the payload of a layer.\n+    \"\"\"\n+    if layer is None:\n+        return None\n+    pkt_payload = pkt.copy()\n+    if Padding in pkt_payload:\n+        _index_padding = pkt_payload.layers().index(Padding)\n+        pkt_payload[_index_padding - 1].remove_payload()\n+        if layer >= _index_padding:\n+            return None\n+    return bytes(pkt_payload[layer].payload)\n+\n+\n+def get_packet_payload(\n+    pkt,\n+    support_rx_tunnel: bool = True,\n+) -> bytes:\n+    \"\"\"\n+    Get the payload of a packet.\n+    pkt: Packet, Scapy Packet object to get the payload of.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    return: bytes, the payload of a packet. If the packet has an L4, the payload is the load of\n+    L4, or it will be the load of L3 if L3 exists, else it returns None.\n+    \"\"\"\n+    (\n+        _index_oip,\n+        _index_ol4,\n+        _index_tunnel,\n+        _index_ip,\n+        _index_l4,\n+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(\n+        pkt, support_rx_tunnel=support_rx_tunnel\n+    )\n+    if not _index_l4 is None:\n+        return get_packet_payload_of(pkt, _index_l4)\n+    elif not _index_ip is None:\n+        return get_packet_payload_of(pkt, _index_ip)\n+    else:\n+        return None\n+\n+\n+def get_packet_payload_no_tso_seg(\n+    pkt,\n+    support_rx_tunnel: bool = True,\n+    support_seg_non_tunnel: bool = True,\n+    support_seg_tunnel: bool = True,\n+    support_tso: bool = True,\n+    support_ufo: bool = True,\n+) -> bytes:\n+    \"\"\"\n+    Get the payload of the layer to be segmented of a packet, not segmented.\n+    pkt: Packet, Scapy Packet object to get the payload of.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.\n+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.\n+    support_tso: bool, flag indicating if TCP Segmentation is enabled.\n+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.\n+    return: bytes, the payload of the layer to be segmented of a packet. If no layer is to be\n+    segmented, it returns None.\n+    \"\"\"\n+    _index_seg = get_packet_layer_index_to_tso_seg(\n+        pkt,\n+        support_rx_tunnel=support_rx_tunnel,\n+        support_seg_non_tunnel=support_seg_non_tunnel,\n+        support_seg_tunnel=support_seg_tunnel,\n+        support_tso=support_tso,\n+        support_ufo=support_ufo,\n+    )\n+    if not _index_seg is None:\n+        return get_packet_payload_of(pkt, _index_seg)\n+    else:\n+        return None\n+\n+\n+def get_packet_payload_pre_tso_seg(\n+    pkt,\n+    support_rx_tunnel: bool = True,\n+    support_seg_non_tunnel: bool = True,\n+    support_seg_tunnel: bool = True,\n+    support_tso: bool = True,\n+    support_ufo: bool = True,\n+) -> bytes:\n+    \"\"\"\n+    Get the payload of a packet before considering it is segmented.\n+    pkt: Packet, Scapy Packet object to get the payload of.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.\n+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.\n+    support_tso: bool, flag indicating if TCP Segmentation is enabled.\n+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.\n+    return: bytes, the payload of the packet. If it can be segmented then returns the not\n+    segmented payload, else it returns the payload despite the segmentation flags.\n+    \"\"\"\n+    _payload_no_seg = get_packet_payload_no_tso_seg(\n+        pkt,\n+        support_rx_tunnel=support_rx_tunnel,\n+        support_seg_non_tunnel=support_seg_non_tunnel,\n+        support_seg_tunnel=support_seg_tunnel,\n+        support_tso=support_tso,\n+        support_ufo=support_ufo,\n+    )\n+    _payload = get_packet_payload(\n+        pkt,\n+        support_rx_tunnel=support_rx_tunnel,\n+    )\n+    if not _payload_no_seg is None:\n+        return _payload_no_seg\n+    else:\n+        return _payload\n+\n+\n+def get_packet_payload_post_tso_seg(\n+    pkt,\n+    seg_len: int = 800,\n+    support_rx_tunnel: bool = True,\n+    support_seg_non_tunnel: bool = True,\n+    support_seg_tunnel: bool = True,\n+    support_tso: bool = True,\n+    support_ufo: bool = True,\n+) -> list:\n+    \"\"\"\n+    Get the list of payloads of a packet after it is segmented.\n+    pkt: Packet, Scapy Packet object to get the payload of.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.\n+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.\n+    support_tso: bool, flag indicating if TCP Segmentation is enabled.\n+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.\n+    return: list, sequence of the payloads after applying TSO to a packet. If a packet is not\n+    to segment, it returns a list of only the payload. If the packet is not a proper packet, it\n+    returns an empty list.\n+    \"\"\"\n+    _payload_no_seg = get_packet_payload_no_tso_seg(\n+        pkt,\n+        support_rx_tunnel=support_rx_tunnel,\n+        support_seg_non_tunnel=support_seg_non_tunnel,\n+        support_seg_tunnel=support_seg_tunnel,\n+        support_tso=support_tso,\n+        support_ufo=support_ufo,\n+    )\n+    _payload = get_packet_payload(\n+        pkt,\n+        support_rx_tunnel=support_rx_tunnel,\n+    )\n+    if not _payload_no_seg is None:\n+        _segments = segment_bytes(_payload_no_seg, seg_len)\n+    elif not _payload is None:\n+        _segments = segment_bytes(_payload, -1)\n+    else:\n+        _segments = []\n+    return _segments\n+\n+\n+def get_packet_tso_seg_tx_flag(\n+    pkt,\n+    seg_len: int = 800,\n+    support_rx_tunnel: bool = True,\n+    support_seg_non_tunnel: bool = True,\n+    support_seg_tunnel: bool = True,\n+    support_tso: bool = True,\n+    support_ufo: bool = True,\n+) -> list:\n+    \"\"\"\n+    Get the list of TSO TX offload flags of a packet.\n+    pkt: Packet, Scapy Packet object to get the offload flag of.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.\n+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.\n+    support_tso: bool, flag indicating if TCP Segmentation is enabled.\n+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.\n+    return: list, set of offload flags formatted in a list of str.\n+    \"\"\"\n+    _index_seg = get_packet_layer_index_to_tso_seg(\n+        pkt,\n+        support_rx_tunnel=support_rx_tunnel,\n+        support_seg_non_tunnel=support_seg_non_tunnel,\n+        support_seg_tunnel=support_seg_tunnel,\n+        support_tso=support_tso,\n+        support_ufo=support_ufo,\n+    )\n+    _flags = []\n+    if not _index_seg is None and len(get_packet_payload_of(pkt, _index_seg)) > seg_len:\n+        if isinstance(pkt[_index_seg], TCP):\n+            _flags.append(\"RTE_MBUF_F_TX_TCP_SEG\")\n+        elif isinstance(pkt[_index_seg], UDP):\n+            _flags.append(\"RTE_MBUF_F_TX_UDP_SEG\")\n+    return _flags\n+\n+\n+def get_packet_checksum_of(\n+    pkt,\n+    layer: int,\n+    good_checksum: bool = False,\n+) -> int:\n+    \"\"\"\n+    Get the checksum of a specific layer in a packet.\n+    pkt: Packet, Scapy Packet object to get the checksum of.\n+    layer: int, the layer index to get the checksum in the packet.\n+    good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.\n+    return: int, the checksum of the packet at the layer.\n+    \"\"\"\n+    _re_checksum_pattern = re.compile(r\"chksum\\s*=\\s*(0x[0-9a-fA-F]+|None)\")\n+    if layer is None:\n+        return None\n+    p_chksum = pkt[layer].copy()\n+    if \"chksum\" in p_chksum.fieldtype:\n+        if not good_checksum:\n+            checksum_list_str = _re_checksum_pattern.findall(p_chksum.show2(dump=True))\n+            return int(checksum_list_str[0], 16)\n+        else:\n+            p_chksum.chksum = None\n+            checksum_list_str = _re_checksum_pattern.findall(p_chksum.show2(dump=True))\n+            return int(checksum_list_str[0], 16)\n+    else:\n+        return None\n+\n+\n+def get_packet_checksums(\n+    pkt,\n+    good_checksum: bool = False,\n+    support_rx_tunnel: bool = True,\n+) -> tuple:\n+    \"\"\"\n+    Get the checksum list of a packet.\n+    This method calculates the checksum of the whole packet. This means if an inner layer has a\n+    bad checksum, it will be corrected then to calculate the outer checksum if `good_checksum` is\n+    set.\n+    If a layer is not found, it will return None for that layer.\n+    pkt: Packet, Scapy Packet object to get the checksum of.\n+    good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    return: tuple, checksums of outer IP, outer L4, inner IP, inner L4 in the packet.\n+    \"\"\"\n+    (\n+        _index_oip,\n+        _index_ol4,\n+        _index_tunnel,\n+        _index_ip,\n+        _index_l4,\n+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(\n+        pkt, support_rx_tunnel=support_rx_tunnel\n+    )\n+    p_chksum = pkt.copy()\n+    if good_checksum:\n+        for _i in [_index_oip, _index_ol4, _index_ip, _index_l4]:\n+            if not _i is None and \"chksum\" in p_chksum[_i].fieldtype:\n+                p_chksum[_i].chksum = None\n+    return (\n+        get_packet_checksum_of(p_chksum, _index_oip),\n+        get_packet_checksum_of(p_chksum, _index_ol4),\n+        get_packet_checksum_of(p_chksum, _index_ip),\n+        get_packet_checksum_of(p_chksum, _index_l4),\n+    )\n+\n+\n+def get_packet_checksum_of_each(\n+    pkt,\n+    good_checksum: bool = False,\n+    support_rx_tunnel: bool = True,\n+) -> tuple:\n+    \"\"\"\n+    Get the checksum list of a packet.\n+    This method calculates the checksums of each layer seperately. This means if an inner layer\n+    has a bad checksum, an outer checksum will be calculated based on the bad inner checksum if\n+    `good_checksum` is set.\n+    If a layer is not found, it will return None for that layer.\n+    pkt: Packet, Scapy Packet object to get the checksum of.\n+    good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    return: tuple, checksums of outer IP, outer L4, inner IP, inner L4 in the packet.\n+    \"\"\"\n+    (\n+        _index_oip,\n+        _index_ol4,\n+        _index_tunnel,\n+        _index_ip,\n+        _index_l4,\n+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(\n+        pkt, support_rx_tunnel=support_rx_tunnel\n+    )\n+    return (\n+        get_packet_checksum_of(pkt, _index_oip, good_checksum=good_checksum),\n+        get_packet_checksum_of(pkt, _index_ol4, good_checksum=good_checksum),\n+        get_packet_checksum_of(pkt, _index_ip, good_checksum=good_checksum),\n+        get_packet_checksum_of(pkt, _index_l4, good_checksum=good_checksum),\n+    )\n+\n+\n+def get_packet_checksum_rx_stat(pkt, support_rx_tunnel: bool = True) -> tuple:\n+    \"\"\"\n+    Get the checksum stats of a packet.\n+    pkt: Packet, Scapy Packet object to get the checksum stats of.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    return: tuple, checksum stats of bad outer IP, bad outer L4, bad inner IP, bad inner L4.\n+    \"\"\"\n+    (\n+        _index_oip,\n+        _index_ol4,\n+        _index_tunnel,\n+        _index_ip,\n+        _index_l4,\n+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(\n+        pkt, support_rx_tunnel=support_rx_tunnel\n+    )\n+    _bad_oip, _bad_ol4, _bad_ip, _bad_l4 = 0, 0, 0, 0\n+    if not _index_oip is None and get_packet_checksum_of(\n+        pkt, _index_oip\n+    ) != get_packet_checksum_of(pkt, _index_oip, good_checksum=True):\n+        _bad_oip = 1\n+    if not _index_oip is None and get_packet_checksum_of(\n+        pkt, _index_ol4\n+    ) != get_packet_checksum_of(pkt, _index_ol4, good_checksum=True):\n+        _bad_ol4 = 1\n+    if not _index_ip is None and get_packet_checksum_of(\n+        pkt, _index_ip\n+    ) != get_packet_checksum_of(pkt, _index_ip, good_checksum=True):\n+        _bad_ip = 1\n+    if not _index_l4 is None and get_packet_checksum_of(\n+        pkt, _index_l4\n+    ) != get_packet_checksum_of(pkt, _index_l4, good_checksum=True):\n+        _bad_l4 = 1\n+    return _bad_oip, _bad_ol4, _bad_ip, _bad_l4\n+\n+\n+def get_packet_checksum_rx_olflag(pkt, support_rx_tunnel: bool = True) -> list:\n+    \"\"\"\n+    Get the checksum RX offload flags of a packet.\n+    pkt: Packet, Scapy Packet object to get the checksum RX offload flag of.\n+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /\n+    L4 layers will be considered the only L3 / L4 layers.\n+    return: list, set of offload flags formated in a list of str.\n+    \"\"\"\n+    _FLAG_PAT = \"RTE_MBUF_F_RX_%s_CKSUM_%s\"\n+    (\n+        _index_oip,\n+        _index_ol4,\n+        _index_tunnel,\n+        _index_ip,\n+        _index_l4,\n+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(\n+        pkt, support_rx_tunnel=support_rx_tunnel\n+    )\n+    _flags = []\n+    if not _index_oip is None and get_packet_checksum_of(\n+        pkt, _index_oip\n+    ) != get_packet_checksum_of(pkt, _index_oip, good_checksum=True):\n+        _flags.append(_FLAG_PAT % (\"OUTER_IP\", \"BAD\"))\n+    else:\n+        ## OUTER_IP_CKSUM_GOOD is default not shown\n+        pass\n+    if not _index_ol4 is None and get_packet_checksum_of(\n+        pkt, _index_ol4\n+    ) != get_packet_checksum_of(pkt, _index_ol4, good_checksum=True):\n+        _flags.append(_FLAG_PAT % (\"OUTER_L4\", \"BAD\"))\n+    elif not _index_ol4 is None:\n+        _flags.append(_FLAG_PAT % (\"OUTER_L4\", \"GOOD\"))\n+    elif _index_ol4 is None:\n+        pass\n+    else:\n+        pass\n+    if get_packet_checksum_of(pkt, _index_ip) != get_packet_checksum_of(\n+        pkt, _index_ip, good_checksum=True\n+    ):\n+        _flags.append(_FLAG_PAT % (\"IP\", \"BAD\"))\n+    else:\n+        _flags.append(_FLAG_PAT % (\"IP\", \"GOOD\"))\n+    if get_packet_checksum_of(pkt, _index_l4) != get_packet_checksum_of(\n+        pkt, _index_l4, good_checksum=True\n+    ):\n+        _flags.append(_FLAG_PAT % (\"L4\", \"BAD\"))\n+    else:\n+        _flags.append(_FLAG_PAT % (\"L4\", \"GOOD\"))\n+    return _flags\n+\n+\n+def get_packets_payload_pre_seg_list(\n+    pkts: list,\n+    support_rx_tunnel: bool = True,\n+    support_seg_non_tunnel: bool = True,\n+    support_seg_tunnel: bool = True,\n+    support_tso: bool = True,\n+    support_ufo: bool = True,\n+) -> list:\n+    \"\"\"\n+    Get the payload of a seq of packets before considering it is segmented.\n+    \"\"\"\n+    return [\n+        get_packet_payload_pre_tso_seg(\n+            _p,\n+            support_rx_tunnel=support_rx_tunnel,\n+            support_seg_non_tunnel=support_seg_non_tunnel,\n+            support_seg_tunnel=support_seg_tunnel,\n+            support_tso=support_tso,\n+            support_ufo=support_ufo,\n+        )\n+        for _p in pkts\n+    ]\n+\n+\n+def get_packets_payload_post_seg_list(\n+    pkts: list,\n+    seg_len: int = 800,\n+    support_rx_tunnel: bool = True,\n+    support_seg_non_tunnel: bool = True,\n+    support_seg_tunnel: bool = True,\n+    support_tso: bool = True,\n+    support_ufo: bool = True,\n+) -> list:\n+    \"\"\"\n+    Get the list of payloads of a seq of packets after it is segmented.\n+    \"\"\"\n+    return [\n+        get_packet_payload_post_tso_seg(\n+            _p,\n+            seg_len=seg_len,\n+            support_rx_tunnel=support_rx_tunnel,\n+            support_seg_non_tunnel=support_seg_non_tunnel,\n+            support_seg_tunnel=support_seg_tunnel,\n+            support_tso=support_tso,\n+            support_ufo=support_ufo,\n+        )\n+        for _p in pkts\n+    ]\n+\n+\n+def get_packets_checksum_rx_stat_list(\n+    pkts: list, support_rx_tunnel: bool = True\n+) -> list:\n+    \"\"\"\n+    Get the checksum stats of a set of packet.\n+    \"\"\"\n+    k_bad_ip = \"Bad-ipcsum\"\n+    k_bad_l4 = \"Bad-l4csum\"\n+    k_bad_oip = \"Bad-outer-ipcsum\"\n+    k_bad_ol4 = \"Bad-outer-l4csum\"\n+    k_rx = \"RX-total\"\n+    stats = {\n+        k_bad_ip: 0,\n+        k_bad_l4: 0,\n+        k_bad_oip: 0,\n+        k_bad_ol4: 0,\n+        k_rx: 0,\n+    }\n+    for pkt in pkts:\n+        _bad_oip, _bad_ol4, _bad_ip, _bad_l4 = get_packet_checksum_rx_stat(\n+            pkt, support_rx_tunnel=support_rx_tunnel\n+        )\n+        stats[k_bad_oip] += _bad_oip\n+        stats[k_bad_ol4] += _bad_ol4\n+        stats[k_bad_ip] += _bad_ip\n+        stats[k_bad_l4] += _bad_l4\n+        stats[k_rx] += 1\n+    return stats\n+\n+\n+def get_packets_tso_seg_tx_flag_list(\n+    pkts: list,\n+    seg_len: int = 800,\n+    support_rx_tunnel: bool = True,\n+    support_seg_non_tunnel: bool = True,\n+    support_seg_tunnel: bool = True,\n+    support_tso: bool = True,\n+    support_ufo: bool = True,\n+) -> list:\n+    \"\"\"\n+    Get the list of TSO TX offload flags of a seq of packet.\n+    \"\"\"\n+    flag_list = []\n+    for pkt in pkts:\n+        flag_list.append(\n+            get_packet_tso_seg_tx_flag(\n+                pkt,\n+                seg_len=seg_len,\n+                support_rx_tunnel=support_rx_tunnel,\n+                support_seg_non_tunnel=support_seg_non_tunnel,\n+                support_seg_tunnel=support_seg_tunnel,\n+                support_tso=support_tso,\n+                support_ufo=support_ufo,\n+            )\n+        )\n+    return flag_list\n+\n+\n+def get_packets_checksum_rx_olflag_list(\n+    pkts: list, support_rx_tunnel: bool = True\n+) -> list:\n+    \"\"\"\n+    Get the checksum RX offload flags of a seq of packets.\n+    \"\"\"\n+    flag_list = []\n+    for pkt in pkts:\n+        flag_list.append(\n+            get_packet_checksum_rx_olflag(pkt, support_rx_tunnel=support_rx_tunnel)\n+        )\n+    return flag_list\n+\n+\n+def check_packet_segment(\n+    pkt,\n+    segments: list,\n+    seg_len: int,\n+    support_rx_tunnel: bool = True,\n+    support_seg_non_tunnel: bool = True,\n+    support_seg_tunnel: bool = True,\n+    support_tso: bool = True,\n+    support_ufo: bool = True,\n+) -> bool:\n+    \"\"\"\n+    Check the packet segmentation.\n+    \"\"\"\n+    payload_expected = get_packet_payload_pre_tso_seg(\n+        pkt,\n+        support_rx_tunnel=support_rx_tunnel,\n+        support_seg_non_tunnel=support_seg_non_tunnel,\n+        support_seg_tunnel=support_seg_tunnel,\n+        support_tso=support_tso,\n+        support_ufo=support_ufo,\n+    )\n+    segments_expected = get_packet_payload_post_tso_seg(\n+        pkt,\n+        seg_len=seg_len,\n+        support_rx_tunnel=support_rx_tunnel,\n+        support_seg_non_tunnel=support_seg_non_tunnel,\n+        support_seg_tunnel=support_seg_tunnel,\n+        support_tso=support_tso,\n+        support_ufo=support_ufo,\n+    )\n+    if not segments:\n+        return False\n+    if segments == segments_expected:\n+        return True\n+    else:\n+        payload_segmented = b\"\"\n+        for _i, _seg in enumerate(segments, 1):\n+            if len(segments_expected) > 1 and len(_seg) > seg_len:\n+                return False\n+            payload_segmented += _seg\n+        return payload_expected == payload_segmented\n+\n+\n+def check_packet_checksum_of(pkt: Packet, layer: int, checksum_ref: int = None) -> bool:\n+    \"\"\"\n+    Check the packet checksum of a specific layer indicated by `layer`.\n+    \"\"\"\n+    if checksum_ref is None:\n+        checksum_ref = get_packet_checksum_of(pkt, layer, good_checksum=True)\n+    return get_packet_checksum_of(pkt, layer) == checksum_ref\n+\n+\n+def check_packet_checksums(\n+    pkt,\n+    checksum_ref_list: list = None,\n+    support_rx_tunnel: bool = True,\n+) -> bool:\n+    \"\"\"\n+    Check the packet checksum.\n+    pkt: checked packet;\n+    checksum_ref_list: refering checksum list, recalculate the whole packet for correct checksum in default;\n+    return: if this packet is in good checksum.\n+    \"\"\"\n+    if checksum_ref_list is None:\n+        checksum_ref_list = get_packet_checksums(\n+            pkt, support_rx_tunnel=support_rx_tunnel, good_checksum=True\n+        )\n+    return (\n+        get_packet_checksums(pkt, support_rx_tunnel=support_rx_tunnel)\n+        == checksum_ref_list\n+    )\n+\n+\n+def check_packet_checksum_of_each(\n+    pkt,\n+    checksum_ref_list: list = None,\n+    support_rx_tunnel: bool = True,\n+    allow_zero_outer_ip: bool = False,\n+    allow_zero_outer_udp: bool = False,\n+    allow_zero_inner_ip: bool = False,\n+    allow_zero_inner_udp: bool = False,\n+    allow_zero_inner_tcp: bool = False,\n+    allow_zero_inner_sctp: bool = False,\n+) -> tuple:\n+    \"\"\"\n+    Check the packet checksum of each layer.\n+    pkt: checked packet;\n+    checksum_ref_list: refering checksum list, recalculate each layer seperately for correct checksum in default;\n+    allow_zero_*: allowing zero checksum validated passed on specific layer;\n+    return: if this packet is validated passed on outer-ip, outer-l4, inner-ip, inner-l4 layer checksum.\n+    \"\"\"\n+    (\n+        _index_oip,\n+        _index_ol4,\n+        _index_tunnel,\n+        _index_ip,\n+        _index_l4,\n+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(\n+        pkt, support_rx_tunnel=support_rx_tunnel\n+    )\n+    if checksum_ref_list and len(checksum_ref_list) == 2:\n+        (_checksum_ref_ip, _checksum_ref_l4) = (\n+            checksum_ref_list[0],\n+            checksum_ref_list[1],\n+        )\n+        return (\n+            True,\n+            True,\n+            check_packet_checksum_of(pkt, _index_ip, _checksum_ref_ip),\n+            check_packet_checksum_of(pkt, _index_l4, _checksum_ref_l4),\n+        )\n+    elif checksum_ref_list and len(checksum_ref_list) == 4:\n+        (_checksum_ref_oip, _checksum_ref_ol4, _checksum_ref_ip, _checksum_ref_l4) = (\n+            checksum_ref_list[0],\n+            checksum_ref_list[1],\n+            checksum_ref_list[2],\n+            checksum_ref_list[3],\n+        )\n+        return (\n+            check_packet_checksum_of(pkt, _index_oip, _checksum_ref_oip),\n+            check_packet_checksum_of(pkt, _index_ol4, _checksum_ref_ol4),\n+            check_packet_checksum_of(pkt, _index_ip, _checksum_ref_ip),\n+            check_packet_checksum_of(pkt, _index_l4, _checksum_ref_l4),\n+        )\n+    elif checksum_ref_list:\n+        raise ValueError(\"Wrong checksum ref list length.\")\n+    else:\n+        _verified_l4 = (\n+            _index_l4 is None\n+            or check_packet_checksum_of(pkt, _index_l4)\n+            or (\n+                not _index_l4 is None\n+                and allow_zero_inner_udp\n+                and isinstance(pkt[_index_l4], UDP)\n+                and check_packet_checksum_of(pkt, _index_l4, 0x00)\n+            )\n+            or (\n+                not _index_l4 is None\n+                and allow_zero_inner_tcp\n+                and isinstance(pkt[_index_l4], TCP)\n+                and check_packet_checksum_of(pkt, _index_l4, 0x00)\n+            )\n+            or (\n+                not _index_l4 is None\n+                and allow_zero_inner_sctp\n+                and isinstance(pkt[_index_l4], SCTP)\n+                and check_packet_checksum_of(pkt, _index_l4, 0x0000)\n+            )\n+        )\n+        _verified_ip = (\n+            _index_ip is None\n+            or check_packet_checksum_of(pkt, _index_ip)\n+            or (\n+                not _index_ip is None\n+                and allow_zero_inner_ip\n+                and isinstance(pkt[_index_ip], IP)\n+                and check_packet_checksum_of(pkt, _index_ip, 0x00)\n+            )\n+        )\n+        _verified_ol4 = (\n+            _index_ol4 is None\n+            or check_packet_checksum_of(pkt, _index_ol4)\n+            or (\n+                not _index_ol4 is None\n+                and allow_zero_outer_udp\n+                and isinstance(pkt[_index_ol4], UDP)\n+                and get_packet_checksum_of(pkt, _index_ol4) == 0x00\n+            )\n+        )\n+        _verified_oip = (\n+            _index_oip is None\n+            or check_packet_checksum_of(pkt, _index_oip)\n+            or (\n+                not _index_oip is None\n+                and allow_zero_outer_ip\n+                and isinstance(pkt[_index_oip], IP)\n+                and get_packet_checksum_of(pkt, _index_oip) == 0x00\n+            )\n+        )\n+        return _verified_oip, _verified_ol4, _verified_ip, _verified_l4\n+\n+\n+def check_packet_verbose(verbose: str, verbose_ref_list: list) -> bool:\n+    \"\"\"\n+    Check the verbose with the reference list.\n+    \"\"\"\n+    for _ref in verbose_ref_list:\n+        if not _ref in verbose:\n+            return False\n+    return True\n+\n+\n ###############################################################################\n ###############################################################################\n if __name__ == \"__main__\":\n",
    "prefixes": [
        "V1",
        "2/4"
    ]
}