Skip to content

Commit

Permalink
[platform/test_sfp]Operate SFP only once for split ports when resetti…
Browse files Browse the repository at this point in the history
…ng or setting LPM (#1338)

* [sfp]operating the logical interfaces sharing a physical interface only once

* [sfp]add teardown, beautify and optimize code

* [sfp]fix comments
  • Loading branch information
stephenxs authored and liat-grozovik committed Jan 19, 2020
1 parent 8c471a3 commit 08c3915
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
18 changes: 18 additions & 0 deletions tests/platform/files/getportmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env python
# This script runs on the DUT and is intended to retrieve the portmapping from logical interfaces to physical ones
# The way the port mapping retrieved is exactly the same as what xcvrd does

import sonic_platform_base.sonic_sfp.sfputilhelper
import json
from sonic_daemon_base.daemon_base import DaemonBase

# load and parse the port configuration file on DUT
db = DaemonBase()
port_config_path = db.get_path_to_port_config_file()
platform_sfputil = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper()
platform_sfputil.read_porttab_mappings(port_config_path)

# print the mapping to stdout in json format
print json.dumps(platform_sfputil.logical_to_physical)

# json will be loaded by sonic-mgmt
69 changes: 69 additions & 0 deletions tests/platform/test_sfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@
import os
import time
import copy
import json

import pytest

from platform_fixtures import conn_graph_facts

ans_host = None
port_mapping = None


def teardown_module():
logging.info("remove script to retrieve port mapping")
file_path = os.path.join('/usr/share/sonic/device', ans_host.facts['platform'], 'plugins/getportmap.py')
ans_host.file(path=file_path, state='absent')


def parse_output(output_lines):
"""
Expand Down Expand Up @@ -44,6 +54,38 @@ def parse_eeprom(output_lines):
return res


def get_port_map(testbed_devices):
"""
@summary: Get the port mapping info from the DUT
@return: a dictionary containing the port map
"""
global port_mapping
global ans_host

# we've already retrieve port mapping for the DUT, just return it
if not port_mapping is None:
logging.info("Return the previously retrievd port mapping")
return port_mapping

# this is the first running
logging.info("Retrieving port mapping from DUT")
# copy the helper to DUT
ans_host = testbed_devices["dut"]
src_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'files/getportmap.py')
dest_path = os.path.join('/usr/share/sonic/device', ans_host.facts['platform'], 'plugins/getportmap.py')
ans_host.copy(src=src_path, dest=dest_path)

# execute command on the DUT to get portmap
get_portmap_cmd = 'docker exec pmon python /usr/share/sonic/platform/plugins/getportmap.py'
portmap_json_string = ans_host.command(get_portmap_cmd)["stdout"]

# parse the json
port_mapping = json.loads(portmap_json_string)
assert port_mapping, "Retrieve port mapping from DUT failed"

return port_mapping


def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts):
"""
@summary: Check SFP status and configure SFP
Expand All @@ -65,6 +107,9 @@ def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts):
cmd_xcvr_presence = "show interface transceiver presence"
cmd_xcvr_eeprom = "show interface transceiver eeprom"

portmap = get_port_map(testbed_devices)
logging.info("Got portmap {}".format(portmap))

logging.info("Check output of '%s'" % cmd_sfp_presence)
sfp_presence = ans_host.command(cmd_sfp_presence)
parsed_presence = parse_output(sfp_presence["stdout_lines"][2:])
Expand Down Expand Up @@ -94,7 +139,14 @@ def test_check_sfp_status_and_configure_sfp(testbed_devices, conn_graph_facts):
assert parsed_eeprom[intf] == "SFP EEPROM detected"

logging.info("Test '%s <interface name>'" % cmd_sfp_reset)
tested_physical_ports = set()
for intf in conn_graph_facts["device_conn"]:
phy_intf = portmap[intf][0]
if phy_intf in tested_physical_ports:
logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf))
continue
tested_physical_ports.add(phy_intf)
logging.info("resetting {} physical interface {}".format(intf, phy_intf))
reset_result = ans_host.command("%s %s" % (cmd_sfp_reset, intf))
assert reset_result["rc"] == 0, "'%s %s' failed" % (cmd_sfp_reset, intf)
time.sleep(5)
Expand Down Expand Up @@ -130,6 +182,9 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts):
cmd_sfp_show_lpmode = "sudo sfputil show lpmode"
cmd_sfp_set_lpmode = "sudo sfputil lpmode"

portmap = get_port_map(testbed_devices)
logging.info("Got portmap {}".format(portmap))

logging.info("Check output of '%s'" % cmd_sfp_show_lpmode)
lpmode_show = ans_host.command(cmd_sfp_show_lpmode)
parsed_lpmode = parse_output(lpmode_show["stdout_lines"][2:])
Expand All @@ -139,7 +194,14 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts):
assert parsed_lpmode[intf].lower() == "on" or parsed_lpmode[intf].lower() == "off", "Unexpected SFP lpmode"

logging.info("Try to change SFP lpmode")
tested_physical_ports = set()
for intf in conn_graph_facts["device_conn"]:
phy_intf = portmap[intf][0]
if phy_intf in tested_physical_ports:
logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf))
continue
tested_physical_ports.add(phy_intf)
logging.info("setting {} physical interface {}".format(intf, phy_intf))
new_lpmode = "off" if original_lpmode[intf].lower() == "on" else "on"
lpmode_set_result = ans_host.command("%s %s %s" % (cmd_sfp_set_lpmode, new_lpmode, intf))
assert lpmode_set_result["rc"] == 0, "'%s %s %s' failed" % (cmd_sfp_set_lpmode, new_lpmode, intf)
Expand All @@ -153,7 +215,14 @@ def test_check_sfp_low_power_mode(testbed_devices, conn_graph_facts):
assert parsed_lpmode[intf].lower() == "on" or parsed_lpmode[intf].lower() == "off", "Unexpected SFP lpmode"

logging.info("Try to change SFP lpmode")
tested_physical_ports = set()
for intf in conn_graph_facts["device_conn"]:
phy_intf = portmap[intf][0]
if phy_intf in tested_physical_ports:
logging.info("skip tested SFPs {} to avoid repeating operating physical interface {}".format(intf, phy_intf))
continue
tested_physical_ports.add(phy_intf)
logging.info("restoring {} physical interface {}".format(intf, phy_intf))
new_lpmode = original_lpmode[intf].lower()
lpmode_set_result = ans_host.command("%s %s %s" % (cmd_sfp_set_lpmode, new_lpmode, intf))
assert lpmode_set_result["rc"] == 0, "'%s %s %s' failed" % (cmd_sfp_set_lpmode, new_lpmode, intf)
Expand Down

0 comments on commit 08c3915

Please sign in to comment.