@@ -1,4 +1,4 @@
-#BSD LICENSE
+# BSD LICENSE
#
# Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
# All rights reserved.
@@ -42,10 +42,12 @@ from test_case import TestCase
from packet import Packet
import os
+
class TestPortHotPlug(TestCase):
"""
This feature supports igb_uio, vfio-pci and vfio-pci:noiommu now and not support freebsd
"""
+
def set_up_all(self):
"""
Run at the start of each test suite.
@@ -59,59 +61,62 @@ class TestPortHotPlug(TestCase):
self.driver_name = "vfio-pci"
else:
self.driver_name = self.drivername
+ self.path=self.dut.apps_name['test-pmd']
def set_up(self):
"""
Run before each test case.
"""
- self.dut.send_expect("./usertools/dpdk-devbind.py -u %s" % self.dut.ports_info[self.port]['pci'],"#",60)
+ self.dut.send_expect("./usertools/dpdk-devbind.py -u %s" % self.dut.ports_info[self.port]['pci'], "#", 60)
def attach(self, port):
"""
attach port
"""
# dpdk hotplug discern NIC by pci bus and include domid
- self.dut.send_expect("port attach %s" % self.dut.ports_info[port]['pci'],"is attached",60)
- self.dut.send_expect("port start %s" % port,"Configuring Port",120)
+ self.dut.send_expect("port attach %s" % self.dut.ports_info[port]['pci'], "is attached", 60)
+ self.dut.send_expect("port start %s" % port, "Configuring Port", 120)
# sleep 10 seconds for fortville update link stats
time.sleep(10)
- self.dut.send_expect("show port info %s" % port,"testpmd>",60)
+ self.dut.send_expect("show port info %s" % port, "testpmd>", 60)
def detach(self, port):
"""
- detach port
+ detach port
"""
- self.dut.send_expect("port stop %s" % port,"Stopping ports",60)
+ self.dut.send_expect("port stop %s" % port, "Stopping ports", 60)
# sleep 10 seconds for fortville update link stats
time.sleep(10)
- self.dut.send_expect("port detach %s" % port,"is detached",60)
+ self.dut.send_expect("port detach %s" % port, "is detached", 60)
def test_after_attach(self):
"""
first run testpmd after attach port
"""
- cmd = "./%s/app/testpmd -c %s -n %s -- -i" % (self.target,self.coremask,self.dut.get_memory_channels())
- self.dut.send_expect(cmd,"testpmd>",60)
+ cmd = "%s -c %s -n %s -- -i" % (self.path, self.coremask, self.dut.get_memory_channels())
+ self.dut.send_expect(cmd, "testpmd>", 60)
session_secondary = self.dut.new_session()
- session_secondary.send_expect("./usertools/dpdk-devbind.py --bind=%s %s" % (self.driver_name, self.dut.ports_info[self.port]['pci']), "#", 60)
+ session_secondary.send_expect(
+ "./usertools/dpdk-devbind.py --bind=%s %s" % (self.driver_name, self.dut.ports_info[self.port]['pci']), "#",
+ 60)
self.dut.close_session(session_secondary)
self.attach(self.port)
- self.dut.send_expect("start","testpmd>",60)
- self.dut.send_expect("port detach %s" % self.port,"Port not stopped",60)
- self.dut.send_expect("stop","testpmd>",60)
+ self.dut.send_expect("start", "testpmd>", 60)
+ self.dut.send_expect("port detach %s" % self.port, "Port not stopped", 60)
+ self.dut.send_expect("stop", "testpmd>", 60)
self.detach(self.port)
self.attach(self.port)
-
- self.dut.send_expect("start","testpmd>",60)
- self.dut.send_expect("port detach %s" % self.port,"Port not stopped",60)
- self.dut.send_expect("clear port stats %s" % self.port ,"testpmd>",60)
+
+ self.dut.send_expect("start", "testpmd>", 60)
+ self.dut.send_expect("port detach %s" % self.port, "Port not stopped", 60)
+ self.dut.send_expect("clear port stats %s" % self.port, "testpmd>", 60)
self.send_packet(self.port)
- out = self.dut.send_expect("show port stats %s" % self.port ,"testpmd>",60)
- packet = re.search("RX-packets:\s*(\d*)",out)
+ out = self.dut.send_expect("show port stats %s" % self.port, "testpmd>", 60)
+ packet = re.search("RX-packets:\s*(\d*)", out)
sum_packet = packet.group(1)
self.verify(int(sum_packet) == 1, "Insufficient the received package")
- self.dut.send_expect("quit","#",60)
-
+ self.dut.send_expect("quit", "#", 60)
+
def send_packet(self, port):
"""
Send a packet to port
@@ -120,30 +125,32 @@ class TestPortHotPlug(TestCase):
txport = self.tester.get_local_port(port)
self.txItf = self.tester.get_interface(txport)
pkt = Packet(pkt_type='UDP')
- pkt.config_layer('ether', {'dst': self.dmac,})
+ pkt.config_layer('ether', {'dst': self.dmac, })
pkt.send_pkt(self.tester, tx_port=self.txItf)
-
+
def test_before_attach(self):
"""
first attach port after run testpmd
"""
session_secondary = self.dut.new_session()
- session_secondary.send_expect("./usertools/dpdk-devbind.py --bind=%s %s" % (self.driver_name, self.dut.ports_info[self.port]['pci']), "#", 60)
+ session_secondary.send_expect(
+ "./usertools/dpdk-devbind.py --bind=%s %s" % (self.driver_name, self.dut.ports_info[self.port]['pci']), "#",
+ 60)
self.dut.close_session(session_secondary)
- cmd = "./%s/app/testpmd -c %s -n %s -- -i" % (self.target,self.coremask,self.dut.get_memory_channels())
- self.dut.send_expect(cmd,"testpmd>",60)
+ cmd = "%s -c %s -n %s -- -i" % (self.path, self.coremask, self.dut.get_memory_channels())
+ self.dut.send_expect(cmd, "testpmd>", 60)
self.detach(self.port)
self.attach(self.port)
- self.dut.send_expect("start","testpmd>",60)
- self.dut.send_expect("port detach %s" % self.port, "Port not stopped",60)
- self.dut.send_expect("clear port stats %s" % self.port ,"testpmd>",60)
+ self.dut.send_expect("start", "testpmd>", 60)
+ self.dut.send_expect("port detach %s" % self.port, "Port not stopped", 60)
+ self.dut.send_expect("clear port stats %s" % self.port, "testpmd>", 60)
self.send_packet(self.port)
- out = self.dut.send_expect("show port stats %s" % self.port ,"testpmd>",60)
- packet = re.search("RX-packets:\s*(\d*)",out)
+ out = self.dut.send_expect("show port stats %s" % self.port, "testpmd>", 60)
+ packet = re.search("RX-packets:\s*(\d*)", out)
sum_packet = packet.group(1)
self.verify(int(sum_packet) == 1, "Insufficient the received package")
- self.dut.send_expect("quit","#",60)
+ self.dut.send_expect("quit", "#", 60)
def test_port_detach_attach_for_vhost_user_virtio_user(self):
vdev = "eth_vhost0,iface=vhost-net,queues=1"
@@ -154,25 +161,27 @@ class TestPortHotPlug(TestCase):
cores = self.dut.get_core_list("all")
self.verify(len(cores) > 8, "insufficient cores for this case")
eal_param = self.dut.create_eal_parameters(no_pci=True, cores=cores[1:5], vdevs=[vdev], prefix="vhost")
- testpmd_cmd = "./%s/app/testpmd " % self.target + eal_param + ' -- -i'
+ testpmd_cmd = "%s " % self.path + eal_param + ' -- -i'
self.dut.send_expect(testpmd_cmd, "testpmd>", timeout=60)
self.dut.send_expect("port stop 0", "testpmd>")
self.dut.send_expect("port detach 0", "Device is detached")
stats = self.dut.send_expect("ls %s" % path, "#", timeout=3,
- alt_session=True, verify=True)
- self.verify(stats==2, 'port detach failed')
+ alt_session=True, verify=True)
+ self.verify(stats == 2, 'port detach failed')
time.sleep(1)
self.dut.send_expect("port attach eth_vhost1,iface=%s,queues=1" % iface, "Port 0 is attached.")
self.dut.send_expect("port start 0", "testpmd>")
out = self.dut.send_expect("ls %s" % path, "#", timeout=3,
alt_session=True, verify=True)
- self.verify(iface in out , 'port attach failed')
+ self.verify(iface in out, 'port attach failed')
self.session2 = self.dut.create_session(name="virtio_user")
eal_param = self.dut.create_eal_parameters(no_pci=True, fixed_prefix="virtio1", cores=cores[5:9])
- testpmd_cmd2 = "%s/%s/app/testpmd " % (self.dut.base_dir,self.target) + eal_param + ' -- -i'
+ testpmd_cmd2 = "%s " % (self.path) + eal_param + ' -- -i'
self.session2.send_expect(testpmd_cmd2, "testpmd>", timeout=60)
- self.session2.send_expect("port attach net_virtio_user1,mac=00:01:02:03:04:05,path=%s,queues=1,packed_vq=1,mrg_rxbuf=1,in_order=0" % path, "testpmd")
+ self.session2.send_expect(
+ "port attach net_virtio_user1,mac=00:01:02:03:04:05,path=%s,queues=1,packed_vq=1,mrg_rxbuf=1,in_order=0" % path,
+ "testpmd")
self.session2.send_expect("port start 0", "testpmd>", timeout=60)
out = self.dut.send_expect("ls %s" % path, "#", timeout=3,
alt_session=True, verify=True)
@@ -197,9 +206,10 @@ class TestPortHotPlug(TestCase):
Run after each test case.
"""
self.dut.kill_all()
- self.dut.send_expect("./usertools/dpdk-devbind.py --bind=%s %s" % (self.driver_name, self.dut.ports_info[self.port]['pci']), "#", 60)
+ self.dut.send_expect(
+ "./usertools/dpdk-devbind.py --bind=%s %s" % (self.driver_name, self.dut.ports_info[self.port]['pci']), "#",
+ 60)
time.sleep(2)
-
def tear_down_all(self):
"""