From: "Dong,JunX" <junx.dong@intel.com>
scapy required modules contain itself contained modules and dts self
contained module, dts self contained module that be in dep dir must
copy from platform to tester, so can delay import the modules after
tester copied it
Signed-off-by: Dong,JunX <junx.dong@intel.com>
---
framework/packet.py | 32 +++++++++++++++-----------------
framework/tester.py | 5 +++++
2 files changed, 20 insertions(+), 17 deletions(-)
@@ -35,9 +35,7 @@ Base on scapy(python program for packet manipulation)
from importlib import import_module
from socket import AF_INET6
-
from scapy.all import *
-
from .utils import convert_int2ip, convert_ip2int
# load extension layers
@@ -50,20 +48,24 @@ if not os.path.exists(TMP_PATH):
scapy_modules_required = {'scapy.contrib.gtp': ['GTP_U_Header', 'GTPPDUSessionContainer'],
'scapy.contrib.lldp': ['LLDPDU', 'LLDPDUManagementAddress'],
- 'dep.scapy_modules.Dot1BR': ['Dot1BR'],
+ 'Dot1BR': ['Dot1BR'],
'scapy.contrib.pfcp': ['PFCP'],
'scapy.contrib.nsh': ['NSH'],
'scapy.contrib.igmp': ['IGMP'],
'scapy.contrib.mpls': ['MPLS'],
}
-for m in scapy_modules_required:
- try:
- module = import_module(m)
- for clazz in scapy_modules_required[m]:
- locals().update({clazz: getattr(module, clazz)})
- except Exception as e:
- print(e)
+
+def bind_scapy_modules(_module_path):
+ sys.path.append(_module_path)
+ for _module in scapy_modules_required:
+ try:
+ module = import_module(_module)
+ for clazz in scapy_modules_required[_module]:
+ setattr(sys.modules[__name__], clazz, getattr(module, clazz))
+ except Exception as e:
+ print(e)
+
def get_scapy_module_impcmd():
cmd_li = list()
@@ -71,11 +73,10 @@ def get_scapy_module_impcmd():
cmd_li.append(f'from {m} import {",".join(scapy_modules_required[m])}')
return ';'.join(cmd_li)
-SCAPY_IMP_CMD = get_scapy_module_impcmd()
+SCAPY_IMP_CMD = get_scapy_module_impcmd()
# packet generator type should be configured later
PACKETGEN = "scapy"
-
LayersTypes = {
"L2": ['ether', 'vlan', 'etag', '1588', 'arp', 'lldp', 'mpls', 'nsh'],
# ipv4_ext_unknown, ipv6_ext_unknown
@@ -96,17 +97,15 @@ LayersTypes = {
"INNER L4": ['inner_tcp', 'inner_udp', 'inner_frag', 'inner_sctp', 'inner_icmp', 'inner_nofrag'],
"PAYLOAD": ['raw']
}
-
# Saved background sniff process id
SNIFF_PIDS = {}
-
# Saved packet generator process id
# used in pktgen or tgen
PKTGEN_PIDS = {}
-
# default filter for LLDP packet
LLDP_FILTER = {'layer': 'ether', 'config': {'type': 'not lldp'}}
+
def write_raw_pkt(pkt_str, file_name):
tmp = eval(pkt_str)
tmp = bytearray(bytes(tmp))
@@ -114,6 +113,7 @@ def write_raw_pkt(pkt_str, file_name):
w.write(tmp)
w.close()
+
class scapy(object):
SCAPY_LAYERS = {
'ether': Ether(dst="ff:ff:ff:ff:ff:ff"),
@@ -1117,8 +1117,6 @@ def strip_pktload(pkt=None, layer="L2", p_index=0):
return load
-###############################################################################
-###############################################################################
if __name__ == "__main__":
pkt = Packet('Ether(type=0x894f)/NSH(Len=0x6,NextProto=0x0,NSP=0x000002,NSI=0xff)')
sendp(pkt, iface='lo')
@@ -52,6 +52,7 @@ from .packet import (
start_tcpdump,
stop_and_load_tcpdump_packets,
strip_pktload,
+ bind_scapy_modules
)
from .pktgen import getPacketGenerator
from .settings import (
@@ -120,6 +121,10 @@ class Tester(Crb):
out = session.session.send_expect(SCAPY_IMP_CMD, '>>> ')
if 'ImportError' in out:
session.logger.warning(f'entering import error: {out}')
+
+ # bind scapy modules to packet module
+ bind_scapy_modules(self.tmp_scapy_module_dir)
+
return session
def check_scapy_version(self):