diff --git a/tests/platform/files/getportmap.py b/tests/platform/files/getportmap.py new file mode 100755 index 0000000000..de7ea740f9 --- /dev/null +++ b/tests/platform/files/getportmap.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# This script runs on the DUT and is intended to retrieve the portmapping from logical interfaces to physical ones +# The way the port mapping retrieved is exactly the same as what xcvrd does + +import sfputil +import json +import subprocess + +PLATFORM_ROOT_PATH = '/usr/share/sonic/device' +SONIC_CFGGEN_PATH = '/usr/local/bin/sonic-cfggen' +HWSKU_KEY = 'DEVICE_METADATA.localhost.hwsku' +PLATFORM_KEY = 'DEVICE_METADATA.localhost.platform' +PLATFORM_ROOT_DOCKER = "/usr/share/sonic/platform" + +platform_sfputil = sfputil.SfpUtil() + +# Returns platform and HW SKU +def get_hwsku(): + proc = subprocess.Popen([SONIC_CFGGEN_PATH, '-d', '-v', HWSKU_KEY], + stdout=subprocess.PIPE, + shell=False, + stderr=subprocess.STDOUT) + stdout = proc.communicate()[0] + proc.wait() + hwsku = stdout.rstrip('\n') + + return hwsku + + +# Returns path to port config file +def get_path_to_port_config_file(): + # Get platform and hwsku + hwsku = get_hwsku() + + # Load platform module from source + platform_path = PLATFORM_ROOT_DOCKER + hwsku_path = "/".join([platform_path, hwsku]) + + port_config_file_path = "/".join([hwsku_path, "port_config.ini"]) + + return port_config_file_path + + +port_config_path = get_path_to_port_config_file() +platform_sfputil.read_porttab_mappings(port_config_path) + +# print the mapping to stdout in json format +print json.dumps(platform_sfputil.logical_to_physical) + +# json will be loaded by sonic-mgmt diff --git a/tests/platform/test_sfp.py b/tests/platform/test_sfp.py index d6b057daff..f9eba6b075 100644 --- a/tests/platform/test_sfp.py +++ b/tests/platform/test_sfp.py @@ -9,6 +9,7 @@ import os import time import copy +import json import pytest @@ -19,6 +20,15 @@ pytest.mark.disable_loganalyzer # disable automatic loganalyzer ] +ans_host = None +port_mapping = None + + +def teardown_module(): + logging.info("remove script to retrieve port mapping") + file_path = os.path.join('/usr/share/sonic/device', ans_host.facts['platform'], 'plugins/getportmap.py') + ans_host.file(path=file_path, state='absent') + def parse_output(output_lines): """ @summary: For parsing command output. The output lines should have format 'key value'. @@ -48,6 +58,38 @@ def parse_eeprom(output_lines): return res +def get_port_map(testbed_devices): + """ + @summary: Get the port mapping info from the DUT + @return: a dictionary containing the port map + """ + global port_mapping + global ans_host + + # we've already retrieve port mapping for the DUT, just return it + if not port_mapping is None: + logging.info("Return the previously retrievd port mapping") + return port_mapping + + # this is the first running + logging.info("Retrieving port mapping from DUT") + # copy the helper to DUT + ans_host = testbed_devices["dut"] + src_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'files/getportmap.py') + dest_path = os.path.join('/usr/share/sonic/device', ans_host.facts['platform'], 'plugins/getportmap.py') + ans_host.copy(src=src_path, dest=dest_path) + + # execute command on the DUT to get portmap + get_portmap_cmd = 'docker exec pmon python /usr/share/sonic/platform/plugins/getportmap.py' + portmap_json_string = ans_host.command(get_portmap_cmd)["stdout"] + + # parse the json + port_mapping = json.loads(portmap_json_string) + assert port_mapping, "Retrieve port mapping from DUT failed" + + return port_mapping + + def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts): """ @summary: Check SFP status and configure SFP @@ -75,6 +117,9 @@ def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts): cmd_xcvr_presence = "show interface transceiver presence" cmd_xcvr_eeprom = "show interface transceiver eeprom" + portmap = get_port_map(testbed_devices) + logging.info("Got portmap {}".format(portmap)) + logging.info("Check output of '%s'" % cmd_sfp_presence) sfp_presence = ans_host.command(cmd_sfp_presence) parsed_presence = parse_output(sfp_presence["stdout_lines"][2:]) @@ -104,7 +149,14 @@ def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts): assert parsed_eeprom[intf] == "SFP EEPROM detected" logging.info("Test '%s '" % cmd_sfp_reset) + tested_physical_ports = set() for intf in conn_graph_facts["device_conn"]: + phy_intf = portmap[intf][0] + if phy_intf in tested_physical_ports: + logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf)) + continue + tested_physical_ports.add(phy_intf) + logging.info("resetting {} physical interface {}".format(intf, phy_intf)) reset_result = ans_host.command("%s %s" % (cmd_sfp_reset, intf)) assert reset_result["rc"] == 0, "'%s %s' failed" % (cmd_sfp_reset, intf) time.sleep(5) @@ -150,6 +202,9 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts): cmd_sfp_show_lpmode = "sudo sfputil show lpmode" cmd_sfp_set_lpmode = "sudo sfputil lpmode" + portmap = get_port_map(testbed_devices) + logging.info("Got portmap {}".format(portmap)) + logging.info("Check output of '%s'" % cmd_sfp_show_lpmode) lpmode_show = ans_host.command(cmd_sfp_show_lpmode) parsed_lpmode = parse_output(lpmode_show["stdout_lines"][2:]) @@ -159,7 +214,14 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts): assert parsed_lpmode[intf].lower() == "on" or parsed_lpmode[intf].lower() == "off", "Unexpected SFP lpmode" logging.info("Try to change SFP lpmode") + tested_physical_ports = set() for intf in conn_graph_facts["device_conn"]: + phy_intf = portmap[intf][0] + if phy_intf in tested_physical_ports: + logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf)) + continue + tested_physical_ports.add(phy_intf) + logging.info("setting {} physical interface {}".format(intf, phy_intf)) new_lpmode = "off" if original_lpmode[intf].lower() == "on" else "on" lpmode_set_result = ans_host.command("%s %s %s" % (cmd_sfp_set_lpmode, new_lpmode, intf)) assert lpmode_set_result["rc"] == 0, "'%s %s %s' failed" % (cmd_sfp_set_lpmode, new_lpmode, intf) @@ -173,7 +235,14 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts): assert parsed_lpmode[intf].lower() == "on" or parsed_lpmode[intf].lower() == "off", "Unexpected SFP lpmode" logging.info("Try to change SFP lpmode") + tested_physical_ports = set() for intf in conn_graph_facts["device_conn"]: + phy_intf = portmap[intf][0] + if phy_intf in tested_physical_ports: + logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf)) + continue + tested_physical_ports.add(phy_intf) + logging.info("restoring {} physical interface {}".format(intf, phy_intf)) new_lpmode = original_lpmode[intf].lower() lpmode_set_result = ans_host.command("%s %s %s" % (cmd_sfp_set_lpmode, new_lpmode, intf)) assert lpmode_set_result["rc"] == 0, "'%s %s %s' failed" % (cmd_sfp_set_lpmode, new_lpmode, intf)