Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[platform/test_sfp]Backport #1338 "Operate SFP only once for split ports when resetting or setting LPM" to 201811 #1378

Merged
merged 1 commit into from
Feb 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions tests/platform/files/getportmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env python
# This script runs on the DUT and is intended to retrieve the portmapping from logical interfaces to physical ones
# The way the port mapping retrieved is exactly the same as what xcvrd does

import sfputil
import json
import subprocess

PLATFORM_ROOT_PATH = '/usr/share/sonic/device'
SONIC_CFGGEN_PATH = '/usr/local/bin/sonic-cfggen'
HWSKU_KEY = 'DEVICE_METADATA.localhost.hwsku'
PLATFORM_KEY = 'DEVICE_METADATA.localhost.platform'
PLATFORM_ROOT_DOCKER = "/usr/share/sonic/platform"

platform_sfputil = sfputil.SfpUtil()

# Returns platform and HW SKU
def get_hwsku():
proc = subprocess.Popen([SONIC_CFGGEN_PATH, '-d', '-v', HWSKU_KEY],
stdout=subprocess.PIPE,
shell=False,
stderr=subprocess.STDOUT)
stdout = proc.communicate()[0]
proc.wait()
hwsku = stdout.rstrip('\n')

return hwsku


# Returns path to port config file
def get_path_to_port_config_file():
# Get platform and hwsku
hwsku = get_hwsku()

# Load platform module from source
platform_path = PLATFORM_ROOT_DOCKER
hwsku_path = "/".join([platform_path, hwsku])

port_config_file_path = "/".join([hwsku_path, "port_config.ini"])

return port_config_file_path


port_config_path = get_path_to_port_config_file()
platform_sfputil.read_porttab_mappings(port_config_path)

# print the mapping to stdout in json format
print json.dumps(platform_sfputil.logical_to_physical)

# json will be loaded by sonic-mgmt
69 changes: 69 additions & 0 deletions tests/platform/test_sfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import os
import time
import copy
import json

import pytest

Expand All @@ -19,6 +20,15 @@
pytest.mark.disable_loganalyzer # disable automatic loganalyzer
]

ans_host = None
port_mapping = None


def teardown_module():
logging.info("remove script to retrieve port mapping")
file_path = os.path.join('/usr/share/sonic/device', ans_host.facts['platform'], 'plugins/getportmap.py')
ans_host.file(path=file_path, state='absent')

def parse_output(output_lines):
"""
@summary: For parsing command output. The output lines should have format 'key value'.
Expand Down Expand Up @@ -48,6 +58,38 @@ def parse_eeprom(output_lines):
return res


def get_port_map(testbed_devices):
"""
@summary: Get the port mapping info from the DUT
@return: a dictionary containing the port map
"""
global port_mapping
global ans_host

# we've already retrieve port mapping for the DUT, just return it
if not port_mapping is None:
logging.info("Return the previously retrievd port mapping")
return port_mapping

# this is the first running
logging.info("Retrieving port mapping from DUT")
# copy the helper to DUT
ans_host = testbed_devices["dut"]
src_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'files/getportmap.py')
dest_path = os.path.join('/usr/share/sonic/device', ans_host.facts['platform'], 'plugins/getportmap.py')
ans_host.copy(src=src_path, dest=dest_path)

# execute command on the DUT to get portmap
get_portmap_cmd = 'docker exec pmon python /usr/share/sonic/platform/plugins/getportmap.py'
portmap_json_string = ans_host.command(get_portmap_cmd)["stdout"]

# parse the json
port_mapping = json.loads(portmap_json_string)
assert port_mapping, "Retrieve port mapping from DUT failed"

return port_mapping


def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts):
"""
@summary: Check SFP status and configure SFP
Expand Down Expand Up @@ -75,6 +117,9 @@ def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts):
cmd_xcvr_presence = "show interface transceiver presence"
cmd_xcvr_eeprom = "show interface transceiver eeprom"

portmap = get_port_map(testbed_devices)
logging.info("Got portmap {}".format(portmap))

logging.info("Check output of '%s'" % cmd_sfp_presence)
sfp_presence = ans_host.command(cmd_sfp_presence)
parsed_presence = parse_output(sfp_presence["stdout_lines"][2:])
Expand Down Expand Up @@ -104,7 +149,14 @@ def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts):
assert parsed_eeprom[intf] == "SFP EEPROM detected"

logging.info("Test '%s <interface name>'" % cmd_sfp_reset)
tested_physical_ports = set()
for intf in conn_graph_facts["device_conn"]:
phy_intf = portmap[intf][0]
if phy_intf in tested_physical_ports:
logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf))
continue
tested_physical_ports.add(phy_intf)
logging.info("resetting {} physical interface {}".format(intf, phy_intf))
reset_result = ans_host.command("%s %s" % (cmd_sfp_reset, intf))
assert reset_result["rc"] == 0, "'%s %s' failed" % (cmd_sfp_reset, intf)
time.sleep(5)
Expand Down Expand Up @@ -150,6 +202,9 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts):
cmd_sfp_show_lpmode = "sudo sfputil show lpmode"
cmd_sfp_set_lpmode = "sudo sfputil lpmode"

portmap = get_port_map(testbed_devices)
logging.info("Got portmap {}".format(portmap))

logging.info("Check output of '%s'" % cmd_sfp_show_lpmode)
lpmode_show = ans_host.command(cmd_sfp_show_lpmode)
parsed_lpmode = parse_output(lpmode_show["stdout_lines"][2:])
Expand All @@ -159,7 +214,14 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts):
assert parsed_lpmode[intf].lower() == "on" or parsed_lpmode[intf].lower() == "off", "Unexpected SFP lpmode"

logging.info("Try to change SFP lpmode")
tested_physical_ports = set()
for intf in conn_graph_facts["device_conn"]:
phy_intf = portmap[intf][0]
if phy_intf in tested_physical_ports:
logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf))
continue
tested_physical_ports.add(phy_intf)
logging.info("setting {} physical interface {}".format(intf, phy_intf))
new_lpmode = "off" if original_lpmode[intf].lower() == "on" else "on"
lpmode_set_result = ans_host.command("%s %s %s" % (cmd_sfp_set_lpmode, new_lpmode, intf))
assert lpmode_set_result["rc"] == 0, "'%s %s %s' failed" % (cmd_sfp_set_lpmode, new_lpmode, intf)
Expand All @@ -173,7 +235,14 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts):
assert parsed_lpmode[intf].lower() == "on" or parsed_lpmode[intf].lower() == "off", "Unexpected SFP lpmode"

logging.info("Try to change SFP lpmode")
tested_physical_ports = set()
for intf in conn_graph_facts["device_conn"]:
phy_intf = portmap[intf][0]
if phy_intf in tested_physical_ports:
logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf))
continue
tested_physical_ports.add(phy_intf)
logging.info("restoring {} physical interface {}".format(intf, phy_intf))
new_lpmode = original_lpmode[intf].lower()
lpmode_set_result = ans_host.command("%s %s %s" % (cmd_sfp_set_lpmode, new_lpmode, intf))
assert lpmode_set_result["rc"] == 0, "'%s %s %s' failed" % (cmd_sfp_set_lpmode, new_lpmode, intf)
Expand Down