Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[platform/test_sfp]Operate SFP only once for split ports when resetting or setting LPM #1338

Merged
merged 3 commits into from
Jan 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions tests/platform/files/getportmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env python
# This script runs on the DUT and is intended to retrieve the portmapping from logical interfaces to physical ones
# The way the port mapping retrieved is exactly the same as what xcvrd does

import sonic_platform_base.sonic_sfp.sfputilhelper
import json
from sonic_daemon_base.daemon_base import DaemonBase

# load and parse the port configuration file on DUT
db = DaemonBase()
port_config_path = db.get_path_to_port_config_file()
platform_sfputil = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper()
platform_sfputil.read_porttab_mappings(port_config_path)

# print the mapping to stdout in json format
print json.dumps(platform_sfputil.logical_to_physical)

# json will be loaded by sonic-mgmt
69 changes: 69 additions & 0 deletions tests/platform/test_sfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@
import os
import time
import copy
import json

import pytest

from platform_fixtures import conn_graph_facts

ans_host = None
port_mapping = None


def teardown_module():
logging.info("remove script to retrieve port mapping")
file_path = os.path.join('/usr/share/sonic/device', ans_host.facts['platform'], 'plugins/getportmap.py')
ans_host.file(path=file_path, state='absent')


def parse_output(output_lines):
"""
Expand Down Expand Up @@ -44,6 +54,38 @@ def parse_eeprom(output_lines):
return res


def get_port_map(testbed_devices):
"""
@summary: Get the port mapping info from the DUT
@return: a dictionary containing the port map
"""
global port_mapping
global ans_host

# we've already retrieve port mapping for the DUT, just return it
if not port_mapping is None:
logging.info("Return the previously retrievd port mapping")
return port_mapping

# this is the first running
logging.info("Retrieving port mapping from DUT")
# copy the helper to DUT
ans_host = testbed_devices["dut"]
src_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'files/getportmap.py')
dest_path = os.path.join('/usr/share/sonic/device', ans_host.facts['platform'], 'plugins/getportmap.py')
ans_host.copy(src=src_path, dest=dest_path)

# execute command on the DUT to get portmap
get_portmap_cmd = 'docker exec pmon python /usr/share/sonic/platform/plugins/getportmap.py'
portmap_json_string = ans_host.command(get_portmap_cmd)["stdout"]

# parse the json
port_mapping = json.loads(portmap_json_string)
assert port_mapping, "Retrieve port mapping from DUT failed"

return port_mapping


def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts):
"""
@summary: Check SFP status and configure SFP
Expand All @@ -65,6 +107,9 @@ def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts):
cmd_xcvr_presence = "show interface transceiver presence"
cmd_xcvr_eeprom = "show interface transceiver eeprom"

portmap = get_port_map(testbed_devices)
logging.info("Got portmap {}".format(portmap))

logging.info("Check output of '%s'" % cmd_sfp_presence)
sfp_presence = ans_host.command(cmd_sfp_presence)
parsed_presence = parse_output(sfp_presence["stdout_lines"][2:])
Expand Down Expand Up @@ -94,7 +139,14 @@ def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts):
assert parsed_eeprom[intf] == "SFP EEPROM detected"

logging.info("Test '%s <interface name>'" % cmd_sfp_reset)
tested_physical_ports = set()
for intf in conn_graph_facts["device_conn"]:
phy_intf = portmap[intf][0]
if phy_intf in tested_physical_ports:
logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf))
continue
tested_physical_ports.add(phy_intf)
logging.info("resetting {} physical interface {}".format(intf, phy_intf))
reset_result = ans_host.command("%s %s" % (cmd_sfp_reset, intf))
assert reset_result["rc"] == 0, "'%s %s' failed" % (cmd_sfp_reset, intf)
time.sleep(5)
Expand Down Expand Up @@ -130,6 +182,9 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts):
cmd_sfp_show_lpmode = "sudo sfputil show lpmode"
cmd_sfp_set_lpmode = "sudo sfputil lpmode"

portmap = get_port_map(testbed_devices)
logging.info("Got portmap {}".format(portmap))

logging.info("Check output of '%s'" % cmd_sfp_show_lpmode)
lpmode_show = ans_host.command(cmd_sfp_show_lpmode)
parsed_lpmode = parse_output(lpmode_show["stdout_lines"][2:])
Expand All @@ -139,7 +194,14 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts):
assert parsed_lpmode[intf].lower() == "on" or parsed_lpmode[intf].lower() == "off", "Unexpected SFP lpmode"

logging.info("Try to change SFP lpmode")
tested_physical_ports = set()
for intf in conn_graph_facts["device_conn"]:
phy_intf = portmap[intf][0]
if phy_intf in tested_physical_ports:
logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf))
continue
tested_physical_ports.add(phy_intf)
logging.info("setting {} physical interface {}".format(intf, phy_intf))
new_lpmode = "off" if original_lpmode[intf].lower() == "on" else "on"
lpmode_set_result = ans_host.command("%s %s %s" % (cmd_sfp_set_lpmode, new_lpmode, intf))
assert lpmode_set_result["rc"] == 0, "'%s %s %s' failed" % (cmd_sfp_set_lpmode, new_lpmode, intf)
Expand All @@ -153,7 +215,14 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts):
assert parsed_lpmode[intf].lower() == "on" or parsed_lpmode[intf].lower() == "off", "Unexpected SFP lpmode"

logging.info("Try to change SFP lpmode")
tested_physical_ports = set()
for intf in conn_graph_facts["device_conn"]:
phy_intf = portmap[intf][0]
if phy_intf in tested_physical_ports:
logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf))
continue
tested_physical_ports.add(phy_intf)
logging.info("restoring {} physical interface {}".format(intf, phy_intf))
new_lpmode = original_lpmode[intf].lower()
lpmode_set_result = ans_host.command("%s %s %s" % (cmd_sfp_set_lpmode, new_lpmode, intf))
assert lpmode_set_result["rc"] == 0, "'%s %s %s' failed" % (cmd_sfp_set_lpmode, new_lpmode, intf)
Expand Down