@@ -34,35 +34,54 @@ Base on scapy(python program for packet manipulation)
"""
from socket import AF_INET6
+from importlib import import_module
from scapy.all import *
-from scapy.layers.sctp import SCTP, SCTPChunkData
-
# load extension layers
exec_file = os.path.realpath(__file__)
DTS_PATH = exec_file.replace('/framework/packet.py', '')
# exec_file might be .pyc file, if so, remove 'c'.
TMP_PATH = DTS_PATH[:-1] + '/output/tmp/pcap/' if exec_file.endswith('.pyc') else DTS_PATH + '/output/tmp/pcap/'
-
if not os.path.exists(TMP_PATH):
os.system('mkdir -p %s' % TMP_PATH)
+
DEP_FOLDER = DTS_PATH + '/dep'
sys.path.append(DEP_FOLDER)
-
-from vxlan import VXLAN
-from nvgre import NVGRE, IPPROTO_NVGRE
-from scapy.contrib.gtp import *
-from lldp import LLDP, LLDPManagementAddress
-from Dot1BR import Dot1BR
-from nsh import NSH
-from mpls import MPLS
-from igmp import IGMP
-from pfcp import PFCP
+sys.path.append(DEP_FOLDER + '/scapy_modules')
from utils import convert_ip2int
from utils import convert_int2ip
-# for saving command history
-from utils import get_backtrace_object
+scapy_modules_required = {'nvgre': ['NVGRE', 'IPPROTO_NVGRE'],
+ 'gtp': ['GTP_U_Header', 'GTP_PDUSession_ExtensionHeader'],
+ 'lldp': ['LLDP', 'LLDPManagementAddress'], 'Dot1BR': ['Dot1BR'], 'pfcp': ['PFCP'],
+ 'nsh': ['NSH'], 'igmp': ['IGMP'], 'mpls': ['MPLS'], 'sctp': ['SCTP', 'SCTPChunkData'], 'vxlan': ['VXLAN']}
+local_modules = [m[:-3] for m in os.listdir(DEP_FOLDER + '/scapy_modules') if (m.endswith('.py') and not m.startswith('__'))]
+
+for m in scapy_modules_required:
+ if m in local_modules:
+ module = import_module(m)
+ for clazz in scapy_modules_required[m]:
+ locals().update({clazz: getattr(module, clazz)})
+ else:
+ if m == 'sctp':
+ module = import_module(f'scapy.layers.{m}')
+ for clazz in scapy_modules_required[m]:
+ locals().update({clazz: getattr(module, clazz)})
+ else:
+ module = import_module(f'scapy.contrib.{m}')
+ for clazz in scapy_modules_required[m]:
+ locals().update({clazz: getattr(module, clazz)})
+
+def get_scapy_module_impcmd():
+ cmd_li = list()
+ for m in scapy_modules_required:
+ if m in local_modules:
+ cmd_li.append(f'from {m} import {",".join(scapy_modules_required[m])}')
+ else:
+ cmd_li.append(f'from scapy.contirb.{m} import {",".join(scapy_modules_required[m])}')
+ return ';'.join(cmd_li)
+
+SCAPY_IMP_CMD = get_scapy_module_impcmd()
# packet generator type should be configured later
PACKETGEN = "scapy"
@@ -139,7 +158,7 @@ class scapy(object):
def __init__(self):
self.pkt = None
- self.pkts = []
+ self.pkts = list()
def append_pkts(self):
self.pkts.append(self.pkt)
@@ -329,7 +348,7 @@ class scapy(object):
pkt_layer.vni = vni
def nsh(self, pkt_layer, ver=0, oam=0, critical=0, reserved=0, len=0, mdtype=1, nextproto=3,
- nsp=0x0, nsi=1, npc= 0x0, nsc= 0x0, spc= 0x0, ssc= 0x0):
+ nsp=0x0, nsi=1, npc=0x0, nsc=0x0, spc=0x0, ssc=0x0):
pkt_layer.Ver = ver
pkt_layer.OAM = oam
pkt_layer.Critical = critical
@@ -354,7 +373,6 @@ class scapy(object):
class Packet(object):
-
"""
Module for config/create packet
Based on scapy module
@@ -469,8 +487,7 @@ class Packet(object):
layer_li = [re.sub('\(.*?\)', '', i) for i in scapy_str.split('/')]
self.pkt_type = '_'.join(layer_li)
self._load_pkt_layers()
- pkt = eval(scapy_str)
- self.pktgen.assign_pkt(pkt)
+ self.pktgen.assign_pkt(scapy_str)
def append_pkt(self, args=None, **kwargs):
"""
@@ -641,67 +658,58 @@ class Packet(object):
self.pktgen.pkts.append(i)
return p
- def _send_pkt(self, crb, tx_port='', count=1, send_bg=False, loop=0, inter=0, timeout=15):
+ def send_pkt_bg_with_pcapfile(self, crb, tx_port='', count=1, loop=0, inter=0):
"""
-
+ send packet background with a pcap file, got an advantage in sending a large number of packets
:param crb: session or crb object
:param tx_port: ether to send packet
:param count: send times
- :param send_bg: send packet background
:param loop: send packet in a loop
- :param inter: interval time
- :return: None
+ :param inter: interval time per packet
+ :return: send session
"""
- # save pkts to local pcap file, then copy to remote tester tmp directory
-
- time_stamp = str(time.time())
- pcap_file = 'scapy_{}.pcap'.format(tx_port) + time_stamp
- self.save_pcapfile(crb, pcap_file)
- scapy_cmd = 'scapy_{}.cmd'.format(tx_port) + time_stamp
- cmd_str = 'from scapy.all import *\np=rdpcap("%s")\nprint("packet ready for sending...")\nfor i in p:\n\tprint(i.command())\nsendp(p, iface="%s", count=%d, loop=%d, inter=%0.3f, verbose=False)' % (
- crb.tmp_file + pcap_file, tx_port, count, loop, inter)
- # write send cmd file to local tmp directory then copy to remote tester tmp folder
- with open(TMP_PATH + scapy_cmd, 'w') as f:
- f.write(cmd_str)
- crb.session.copy_file_to(TMP_PATH + scapy_cmd, crb.tmp_file)
-
- if send_bg: # if send_bg create a new session to execute send action
- session_prefix = 'scapy_bg_session'
- scapy_session = crb.create_session(session_prefix + time_stamp)
- scapy_session.send_command('python3 %s' % crb.tmp_file + scapy_cmd)
+ if crb.name != 'tester':
+ raise Exception('crb should be tester')
+ wrpcap('_', self.pktgen.pkts)
+ file_path = '/tmp/%s.pcap' % tx_port
+ scapy_session_bg = crb.prepare_scapy_env()
+ scapy_session_bg.copy_file_to('_', file_path)
+ scapy_session_bg.send_expect('pkts = rdpcap("%s")' % file_path, '>>> ')
+ scapy_session_bg.send_command('sendp(pkts, iface="%s",count=%s,loop=%s,inter=%s)' % (tx_port, count, loop, inter))
+ return scapy_session_bg
+
+ def _recompose_pkts_str(self, pkts_str):
+ method_pattern = re.compile('<.+?>')
+ method_li = method_pattern.findall(pkts_str)
+ for i in method_li:
+ pkts_str = method_pattern.sub(i.strip('<>')+'()', pkts_str, count=1)
+ return pkts_str
+
+ def send_pkt(self, crb, tx_port='', count=1, interval=0, timeout=120):
+ p_str = '[' + ','.join([p.command() if not isinstance(p, str) else p for p in self.pktgen.pkts]) + ']'
+ pkts_str = self._recompose_pkts_str(pkts_str=p_str)
+ cmd = 'sendp(' + pkts_str + f',iface="{tx_port}",count={count},inter={interval},verbose=False)'
+ if crb.name == 'tester':
+ crb.scapy_session.send_expect(cmd, '>>> ', timeout=timeout)
+ elif crb.name.startswith("tester_scapy"):
+ crb.send_expect(cmd, '>>> ', timeout=timeout)
else:
- crb.send_expect('python3 %s' % crb.tmp_file + scapy_cmd, '# ', timeout=timeout)
- return crb.tmp_file + scapy_cmd
-
- def send_pkt(self, crb, tx_port='', count=1, interval=0, timeout=15):
- self._send_pkt(crb, tx_port, count, inter=interval, timeout=timeout)
-
- def send_pkt_bg(self, crb, tx_port='', count=-1, loop=1, interval=0, timeout=3):
- return self._send_pkt(crb, tx_port=tx_port, count=count, send_bg=True, loop=loop, inter=interval,
- timeout=timeout)
-
- def stop_send_pkt_bg(self, crb, filenames):
+ raise Exception("crb should be tester\'s session and initialized")
+
+ def send_pkt_bg(self, crb, tx_port='', count=-1, interval=0, loop=1):
+ if crb.name != 'tester':
+ raise Exception('crb should be tester')
+ scapy_session_bg = crb.prepare_scapy_env()
+ p_str = '[' + ','.join([p.command() if not isinstance(p, str) else p for p in self.pktgen.pkts]) + ']'
+ pkts_str = self._recompose_pkts_str(pkts_str=p_str)
+ cmd = 'sendp(' + pkts_str + f',iface="{tx_port}",count={count},inter={interval},loop={loop},verbose=False)'
+ scapy_session_bg.send_command(cmd)
+ return scapy_session_bg
+
+ @staticmethod
+ def stop_send_pkt_bg(session):
# stop sending action
- pids = []
- if isinstance(filenames, list):
- for file in filenames:
- out = crb.send_expect('ps -ef |grep %s|grep -v grep' % file, expected='# ')
- try:
- pids.append(re.search('\d+', out).group())
- except AttributeError as e:
- print((e, ' :%s not killed' % file))
- else:
- out = crb.send_expect('ps -ef |grep %s|grep -v grep' % filenames, expected='# ')
- try:
- pids.append(re.search('\d+', out).group())
- except AttributeError as e:
- print((e, ' :%s not killed' % filenames))
- pid = ' '.join(pids)
- if pid:
- crb.send_expect('kill -9 %s' % pid, expected='# ')
- for i in crb.sessions:
- if i.name.startswith('scapy_bg_session'):
- crb.destroy_session(i)
+ session.send_expect('^C', '>>> ')
def check_layer_config(self):
"""
@@ -835,7 +843,8 @@ class Packet(object):
if 'inner' in layer:
layer = layer[6:]
-
+ if isinstance(self.pktgen.pkt, str):
+ raise Exception('string type packet not support config layer')
pkt_layer = self.pktgen.pkt.getlayer(idx)
layer_conf = getattr(self.pktgen, layer)
setattr(self, 'configured_layer_%s' % layer, True)
@@ -1081,10 +1090,8 @@ def strip_pktload(pkt=None, layer="L2", p_index=0):
###############################################################################
###############################################################################
if __name__ == "__main__":
-
pkt = Packet('Ether(type=0x894f)/NSH(Len=0x6,NextProto=0x0,NSP=0x000002,NSI=0xff)')
sendp(pkt, iface='lo')
- pkt.config_layer('ipv4', {'dst': '192.168.8.8'})
pkt.append_pkt(pkt_type='IPv6_TCP', pkt_len=100)
pkt.append_pkt(pkt_type='TCP', pkt_len=100)
pkt.config_layer('tcp', config={'flags': 'A'})