@@ -35,9 +35,7 @@ Base on scapy(python program for packet manipulation)
from importlib import import_module
from socket import AF_INET6
-
from scapy.all import *
-
from .utils import convert_int2ip, convert_ip2int
# load extension layers
@@ -50,20 +48,24 @@ if not os.path.exists(TMP_PATH):
scapy_modules_required = {'scapy.contrib.gtp': ['GTP_U_Header', 'GTPPDUSessionContainer'],
'scapy.contrib.lldp': ['LLDPDU', 'LLDPDUManagementAddress'],
- 'dep.scapy_modules.Dot1BR': ['Dot1BR'],
+ 'Dot1BR': ['Dot1BR'],
'scapy.contrib.pfcp': ['PFCP'],
'scapy.contrib.nsh': ['NSH'],
'scapy.contrib.igmp': ['IGMP'],
'scapy.contrib.mpls': ['MPLS'],
}
-for m in scapy_modules_required:
- try:
- module = import_module(m)
- for clazz in scapy_modules_required[m]:
- locals().update({clazz: getattr(module, clazz)})
- except Exception as e:
- print(e)
+
+def bind_scapy_modules(_module_path):
+ sys.path.append(_module_path)
+ for _module in scapy_modules_required:
+ try:
+ module = import_module(_module)
+ for clazz in scapy_modules_required[_module]:
+ setattr(sys.modules[__name__], clazz, getattr(module, clazz))
+ except Exception as e:
+ print(e)
+
def get_scapy_module_impcmd():
cmd_li = list()
@@ -71,11 +73,10 @@ def get_scapy_module_impcmd():
cmd_li.append(f'from {m} import {",".join(scapy_modules_required[m])}')
return ';'.join(cmd_li)
-SCAPY_IMP_CMD = get_scapy_module_impcmd()
+SCAPY_IMP_CMD = get_scapy_module_impcmd()
# packet generator type should be configured later
PACKETGEN = "scapy"
-
LayersTypes = {
"L2": ['ether', 'vlan', 'etag', '1588', 'arp', 'lldp', 'mpls', 'nsh'],
# ipv4_ext_unknown, ipv6_ext_unknown
@@ -96,17 +97,15 @@ LayersTypes = {
"INNER L4": ['inner_tcp', 'inner_udp', 'inner_frag', 'inner_sctp', 'inner_icmp', 'inner_nofrag'],
"PAYLOAD": ['raw']
}
-
# Saved background sniff process id
SNIFF_PIDS = {}
-
# Saved packet generator process id
# used in pktgen or tgen
PKTGEN_PIDS = {}
-
# default filter for LLDP packet
LLDP_FILTER = {'layer': 'ether', 'config': {'type': 'not lldp'}}
+
def write_raw_pkt(pkt_str, file_name):
tmp = eval(pkt_str)
tmp = bytearray(bytes(tmp))
@@ -114,6 +113,7 @@ def write_raw_pkt(pkt_str, file_name):
w.write(tmp)
w.close()
+
class scapy(object):
SCAPY_LAYERS = {
'ether': Ether(dst="ff:ff:ff:ff:ff:ff"),
@@ -1117,8 +1117,6 @@ def strip_pktload(pkt=None, layer="L2", p_index=0):
return load
-###############################################################################
-###############################################################################
if __name__ == "__main__":
pkt = Packet('Ether(type=0x894f)/NSH(Len=0x6,NextProto=0x0,NSP=0x000002,NSI=0xff)')
sendp(pkt, iface='lo')
@@ -52,6 +52,7 @@ from .packet import (
start_tcpdump,
stop_and_load_tcpdump_packets,
strip_pktload,
+ bind_scapy_modules
)
from .pktgen import getPacketGenerator
from .settings import (
@@ -106,7 +107,11 @@ class Tester(Crb):
session_name = 'tester_scapy' if not self.scapy_sessions_li else f'tester_scapy_{random.random()}'
session = self.create_session(session_name)
self.scapy_sessions_li.append(session)
+
+ # launch scapy
session.send_expect('scapy', '>>> ')
+
+ # copy scapy dep modules in dts to tester
file_dir = os.path.dirname(__file__).split(os.path.sep)
lib_path = os.path.sep.join(file_dir[:-1]) + '/dep/scapy_modules/'
exists_flag = self.alt_session.session.send_expect(f'ls {self.tmp_scapy_module_dir}', '# ', verify=True)
@@ -115,11 +120,16 @@ class Tester(Crb):
scapy_modules_path = [lib_path+i for i in os.listdir(lib_path) if i.endswith('.py')]
path = ' '.join(scapy_modules_path)
session.copy_file_to(src=path, dst=self.tmp_scapy_module_dir)
- session.session.send_expect(f"sys.path.append('{self.tmp_scapy_module_dir}')", ">>> ")
+ # bind scapy modules to packet module
+ bind_scapy_modules(self.tmp_scapy_module_dir)
+
+ # import scapy required modules to scapy env
+ session.session.send_expect(f"sys.path.append('{self.tmp_scapy_module_dir}')", ">>> ")
out = session.session.send_expect(SCAPY_IMP_CMD, '>>> ')
if 'ImportError' in out:
session.logger.warning(f'entering import error: {out}')
+
return session
def check_scapy_version(self):