new file mode 100644
@@ -0,0 +1,219 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 Intel Corporation
+#
+
+import copy
+import os
+import re
+import time
+
+from framework.packet import Packet
+from framework.pmd_output import PmdOutput
+from framework.test_case import TestCase
+from framework.utils import GREEN, RED
+
+tv_packets_basic = {
+ "tv_mac": 'Ether(dst="68:05:CA:C1:BA:28")/("X"*480)',
+ "tv_mac_ipv4": 'Ether(dst="68:05:CA:C1:BA:28")/IP(src="192.168.0.2",dst="192.168.0.3")/("X"*480)',
+ "tv_mac_ipv6": 'Ether(dst="68:05:CA:C1:BA:28")/IPv6(src="2001::2", dst="2001::3")/("X"*480)',
+ "tv_mac_ipv4_udp": 'Ether(dst="68:05:CA:C1:BA:28")/IP(src="192.168.0.2", dst="192.168.0.3")/UDP(sport=1026, dport=1027)/("X"*480)',
+ "tv_mac_ipv6_udp": 'Ether(dst="68:05:CA:C1:BA:28")/IPv6(src="2001::2", dst="2001::3")/UDP(sport=1026, dport=1027)/("X"*480)',
+ "tv_mac_ipv4_tcp": 'Ether(dst="68:05:CA:C1:BA:28")/IP(src="192.168.0.2", dst="192.168.0.3")/TCP(sport=1026, dport=1027)/("X"*480)',
+ "tv_mac_ipv6_tcp": 'Ether(dst="68:05:CA:C1:BA:28")/IPv6(src="2001::2", dst="2001::3")/TCP(sport=1026, dport=1027)/("X"*480)',
+ "tv_mac_ipv4_sctp": 'Ether(dst="68:05:CA:C1:BA:28")/IP(src="192.168.0.2", dst="192.168.0.3")/SCTP(sport=1026, dport=1027)/("X"*480)',
+ "tv_mac_ipv6_sctp": 'Ether(dst="68:05:CA:C1:BA:28")/IPv6(src="2001::2", dst="2001::3")/SCTP(sport=1026, dport=1027)/("X"*480)',
+}
+
+command_line_option_with_timestamp = {
+ "casename": "command_line_option_with_timestamp",
+ "port_id": 0,
+ "test": [
+ {
+ "send_packet": tv_packets_basic["tv_mac"],
+ "action": {"check_timestamp": "ether"},
+ },
+ {
+ "send_packet": tv_packets_basic["tv_mac_ipv4"],
+ "action": {"check_timestamp": "ipv4"},
+ },
+ {
+ "send_packet": tv_packets_basic["tv_mac_ipv6"],
+ "action": {"check_timestamp": "ipv6"},
+ },
+ {
+ "send_packet": tv_packets_basic["tv_mac_ipv4_udp"],
+ "action": {"check_timestamp": "ipv4-udp"},
+ },
+ {
+ "send_packet": tv_packets_basic["tv_mac_ipv6_udp"],
+ "action": {"check_timestamp": "ipv6-udp"},
+ },
+ {
+ "send_packet": tv_packets_basic["tv_mac_ipv4_tcp"],
+ "action": {"check_timestamp": "ipv4-tcp"},
+ },
+ {
+ "send_packet": tv_packets_basic["tv_mac_ipv6_tcp"],
+ "action": {"check_timestamp": "ipv6-tcp"},
+ },
+ {
+ "send_packet": tv_packets_basic["tv_mac_ipv4_sctp"],
+ "action": {"check_timestamp": "ipv4-sctp"},
+ },
+ {
+ "send_packet": tv_packets_basic["tv_mac_ipv6_sctp"],
+ "action": {"check_timestamp": "ipv6-sctp"},
+ },
+ ],
+}
+
+
+class TimestampConfigureTest(TestCase):
+ def set_up_all(self):
+ """
+ Run at the start of each test suite.
+ Generic filter Prerequistites
+ """
+ self.verify(
+ self.nic in ["ICE_25G-E810C_SFP", "ICE_100G-E810C_QSFP"],
+ "%s nic not support timestamp" % self.nic,
+ )
+ self.dut_ports = self.dut.get_ports(self.nic)
+ # Verify that enough ports are available
+ self.verify(len(self.dut_ports) >= 1, "Insufficient ports")
+ self.tester_port0 = self.tester.get_local_port(self.dut_ports[0])
+ self.tester_iface0 = self.tester.get_interface(self.tester_port0)
+ self.dut.send_expect("ifconfig %s up" % self.tester_iface0, "# ")
+ self.pkt = Packet()
+ self.pmdout = PmdOutput(self.dut)
+
+ self.pf_interface = self.dut.ports_info[self.dut_ports[0]]["intf"]
+ self.pf_mac = self.dut.get_mac_address(0)
+ self.pf_pci = self.dut.ports_info[self.dut_ports[0]]["pci"]
+
+ self.current_saved_timestamp = ""
+ self.timestamp_records = {}
+ self.handle_output_methods = {"check_timestamp": self.check_timestamp}
+ self.error_msgs = []
+
+ def set_up(self):
+ """
+ Run before each test case.
+ """
+ pass
+
+ def launch_testpmd(self, line_option=""):
+ self.pmdout.start_testpmd(ports=[self.pf_pci], param=line_option)
+ self.pmdout.execute_cmd("set fwd rxonly")
+ self.pmdout.execute_cmd("set verbose 1")
+ self.pmdout.execute_cmd("start")
+
+ def check_timestamp(self, out, key="", port_id=0):
+ timestamps = self.get_timestamp(out, port_id)
+ if len(timestamps) == 0:
+ self.logger.info("There is no timestamp value")
+ return
+ if len(key) == 0:
+ for item in timestamps:
+ if item <= self.current_saved_timestamp:
+ error_msg = (
+ "timestamp value {} should be larger "
+ "than current saved timestamp {}".format(
+ item, self.current_saved_timestamp
+ )
+ )
+ self.logger.error(error_msg)
+ self.error_msgs.append(error_msg)
+ else:
+ self.timestamp_records[key] = timestamps
+ for item in timestamps:
+ if item == self.timestamp_records[key]:
+ error_msg = (
+ "timestamp value {} should be increment "
+ "with {} {}".format(item, key, self.timestamp_records[key])
+ )
+ self.logger.error(error_msg)
+ self.error_msgs.append(error_msg)
+
+ def get_timestamp(self, out, port_id=0):
+ timestamp_pattern = re.compile(
+ "port\s%s/queue\s\d+:\sreceived\s\d+\spackets.+?\n.*timestamp\s(\w+)"
+ % port_id
+ )
+ timestamp_infos = timestamp_pattern.findall(out)
+ self.logger.info("timestamp_infos: {}".format(timestamp_infos))
+ return timestamp_infos
+
+ def send_pkt_get_output(self, pkts, port_id=0, count=3):
+ self.logger.info("----------send packet-------------")
+ self.logger.info("{}".format(pkts))
+ self.pkt.update_pkt(pkts)
+ self.pkt.send_pkt(crb=self.tester, tx_port=self.tester_iface0, count=count)
+ out = self.pmdout.get_output(timeout=1)
+ pkt_pattern = (
+ "port\s%d/queue\s\d+:\sreceived\s(\d+)\spackets.+?\n.*length=\d{2,}\s"
+ % port_id
+ )
+ reveived_data = re.findall(pkt_pattern, out)
+ reveived_pkts = sum(map(int, [i[0] for i in reveived_data]))
+ return out
+
+ def send_pkt_get_timestamp(self, pkts, port_id=0, count=3):
+ output = self.send_pkt_get_output(pkts, port_id, count)
+ timestamps = self.get_timestamp(output, port_id)
+ return timestamps
+
+ def handle_actions(self, output, actions, port_id=0):
+ actions = [actions]
+ for action in actions: # [{}]
+ self.logger.info("action: {}\n".format(action))
+ for method in action: # {'save': ''}
+ if method in self.handle_output_methods:
+ self.handle_output_methods[method](
+ output, action[method], port_id=port_id
+ )
+
+ def handle_tests(self, tests, port_id=0):
+ out = ""
+ for test in tests:
+ if "send_packet" in test:
+ out = self.send_pkt_get_output(test["send_packet"], port_id)
+ if "action" in test:
+ self.handle_actions(out, test["action"])
+
+ def handle_timestamp_case(self, case_info):
+ # clear timestamp_records before each case
+ self.timestamp_records = {}
+ self.error_msgs = []
+ self.current_saved_timestamp = ""
+ port_id = case_info.get("port_id") if case_info.get("port_id") else 0
+ # handle tests
+ tests = case_info["test"]
+ self.logger.info("------------handle test--------------")
+ self.handle_tests(tests, port_id)
+ if self.error_msgs:
+ self.verify(False, str(self.error_msgs[:500]))
+
+ def test_without_timestamp(self):
+ self.launch_testpmd(line_option="--rxq=16 --txq=16")
+ self.handle_timestamp_case(command_line_option_with_timestamp)
+
+ def test_single_queue_with_timestamp(self):
+ self.launch_testpmd(line_option="--enable-rx-timestamp")
+ self.handle_timestamp_case(command_line_option_with_timestamp)
+
+ def test_multi_queues_with_timestamp(self):
+ self.launch_testpmd(line_option="--rxq=16 --txq=16 --enable-rx-timestamp")
+ self.handle_timestamp_case(command_line_option_with_timestamp)
+
+ def tear_down(self):
+ """
+ Run after each test case.
+ """
+ self.pmdout.execute_cmd("quit", "# ")
+
+ def tear_down_all(self):
+ """
+ Run after each test suite.
+ """
+ self.dut.kill_all()