Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sfputil] Configure the debug loopback mode only on the relevant lanes of the logical port #3485

Merged
merged 9 commits into from
Sep 11, 2024
Merged
6 changes: 3 additions & 3 deletions doc/Command-Reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3106,19 +3106,19 @@ This command is the standard CMIS diagnostic control used for troubleshooting li

- Usage:
```
sfputil debug loopback PORT_NAME LOOPBACK_MODE
sfputil debug loopback PORT_NAME LOOPBACK_MODE <enable/disable>

Set the loopback mode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Set the loopback mode
Valid values for LOOPBACK_MODE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mihirpat1 Thanks! It has been revised.

host-side-input: host side input loopback mode
host-side-output: host side output loopback mode
media-side-input: media side input loopback mode
media-side-output: media side output loopback mode
none: disable loopback mode
```

- Example:
```
admin@sonic:~$ sfputil debug loopback Ethernet88 host-side-input
admin@sonic:~$ sfputil debug loopback Ethernet88 host-side-input enable
admin@sonic:~$ sfputil debug loopback Ethernet88 media-side-output disable
```

## DHCP Relay
Expand Down
82 changes: 74 additions & 8 deletions sfputil/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import sonic_platform
import sonic_platform_base.sonic_sfp.sfputilhelper
from sonic_platform_base.sfp_base import SfpBase
from swsscommon.swsscommon import SonicV2Connector
from swsscommon.swsscommon import SonicV2Connector, ConfigDBConnector
from natsort import natsorted
from sonic_py_common import device_info, logger, multi_asic
from utilities_common.sfp_helper import covert_application_advertisement_to_output_string
Expand Down Expand Up @@ -1967,11 +1967,12 @@ def debug():

# 'loopback' subcommand
@debug.command()
@click.argument('port_name', required=True, default=None)
@click.argument('loopback_mode', required=True, default="none",
type=click.Choice(["none", "host-side-input", "host-side-output",
@click.argument('port_name', required=True)
@click.argument('loopback_mode', required=True,
type=click.Choice(["host-side-input", "host-side-output",
"media-side-input", "media-side-output"]))
def loopback(port_name, loopback_mode):
@click.argument('enable', required=True, type=click.Choice(["enable", "disable"]))
def loopback(port_name, loopback_mode, enable):
"""Set module diagnostic loopback mode
"""
physical_port = logical_port_to_physical_port_index(port_name)
Expand All @@ -1991,17 +1992,82 @@ def loopback(port_name, loopback_mode):
click.echo("{}: This functionality is not implemented".format(port_name))
sys.exit(ERROR_NOT_IMPLEMENTED)

namespace = multi_asic.get_namespace_for_port(port_name)
config_db = ConfigDBConnector(use_unix_socket_path=True, namespace=namespace)
if config_db is not None:
config_db.connect()
try:
subport = int(config_db.get(config_db.CONFIG_DB, f'PORT|{port_name}', 'subport'))
except TypeError:
click.echo(f"{port_name}: subport is not present in CONFIG_DB")
sys.exit(EXIT_FAIL)

# If subport is set to 0, assign a default value of 1 to ensure valid subport configuration
if subport == 0:
subport = 1
else:
click.echo(f"{port_name}: Failed to connect to CONFIG_DB")
sys.exit(EXIT_FAIL)

state_db = SonicV2Connector(use_unix_socket_path=False, namespace=namespace)
if state_db is not None:
state_db.connect(state_db.STATE_DB)
try:
host_lane_count = int(state_db.get(state_db.STATE_DB,
f'TRANSCEIVER_INFO|{port_name}',
'host_lane_count'))
except TypeError:
click.echo(f"{port_name}: host_lane_count is not present in STATE_DB")
sys.exit(EXIT_FAIL)

try:
media_lane_count = int(state_db.get(state_db.STATE_DB,
f'TRANSCEIVER_INFO|{port_name}',
'media_lane_count'))
except TypeError:
click.echo(f"{port_name}: media_lane_count is not present in STATE_DB")
sys.exit(EXIT_FAIL)
else:
click.echo(f"{port_name}: Failed to connect to STATE_DB")
sys.exit(EXIT_FAIL)

if 'host-side' in loopback_mode:
lane_mask = get_subport_lane_mask(subport, host_lane_count)
elif 'media-side' in loopback_mode:
lane_mask = get_subport_lane_mask(subport, media_lane_count)
else:
lane_mask = 0

try:
status = api.set_loopback_mode(loopback_mode)
status = api.set_loopback_mode(loopback_mode,
lane_mask=lane_mask,
enable=enable == 'enable')
except AttributeError:
click.echo("{}: Set loopback mode is not applicable for this module".format(port_name))
sys.exit(ERROR_NOT_IMPLEMENTED)
except TypeError:
click.echo("{}: Set loopback mode failed. Parameter is not supported".format(port_name))
sys.exit(EXIT_FAIL)

if status:
click.echo("{}: Set {} loopback".format(port_name, loopback_mode))
click.echo("{}: {} {} loopback".format(port_name, enable, loopback_mode))
else:
click.echo("{}: Set {} loopback failed".format(port_name, loopback_mode))
click.echo("{}: {} {} loopback failed".format(port_name, enable, loopback_mode))
sys.exit(EXIT_FAIL)


def get_subport_lane_mask(subport, lane_count):
"""Get the lane mask for the given subport and lane count

Args:
subport (int): Subport number
lane_count (int): Lane count for the subport

Returns:
int: Lane mask for the given subport and lane count
"""
return ((1 << lane_count) - 1) << ((subport - 1) * lane_count)


if __name__ == '__main__':
cli()
65 changes: 57 additions & 8 deletions tests/sfputil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1631,43 +1631,92 @@ def test_load_port_config(self, mock_is_multi_asic):

@patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=False))
@patch('sfputil.main.platform_chassis')
@patch('sfputil.main.ConfigDBConnector')
@patch('sfputil.main.SonicV2Connector')
@patch('sfputil.main.platform_sfputil', MagicMock(is_logical_port=MagicMock(return_value=1)))
@patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1))
def test_debug_loopback(self, mock_chassis):
@patch('sonic_py_common.multi_asic.get_front_end_namespaces', MagicMock(return_value=['']))
def test_debug_loopback(self, mock_sonic_v2_connector, mock_config_db_connector, mock_chassis):
mock_sfp = MagicMock()
mock_api = MagicMock()
mock_config_db_connector.return_value = MagicMock()
mock_sonic_v2_connector.return_value = MagicMock()
mock_chassis.get_sfp = MagicMock(return_value=mock_sfp)
mock_sfp.get_presence.return_value = True
mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api)

runner = CliRunner()
mock_sfp.get_presence.return_value = False
result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'],
["Ethernet0", "host-side-input"])
["Ethernet0", "host-side-input", "enable"])
assert result.output == 'Ethernet0: SFP EEPROM not detected\n'
mock_sfp.get_presence.return_value = True

mock_sfp.get_xcvr_api = MagicMock(side_effect=NotImplementedError)
result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'],
["Ethernet0", "host-side-input"])
["Ethernet0", "host-side-input", "enable"])
assert result.output == 'Ethernet0: This functionality is not implemented\n'
assert result.exit_code == ERROR_NOT_IMPLEMENTED

mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api)
result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'],
["Ethernet0", "host-side-input"])
assert result.output == 'Ethernet0: Set host-side-input loopback\n'
["Ethernet0", "host-side-input", "enable"])
assert result.output == 'Ethernet0: enable host-side-input loopback\n'
assert result.exit_code != ERROR_NOT_IMPLEMENTED

mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api)
result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'],
["Ethernet0", "media-side-input", "enable"])
assert result.output == 'Ethernet0: enable media-side-input loopback\n'
assert result.exit_code != ERROR_NOT_IMPLEMENTED

mock_api.set_loopback_mode.return_value = False
result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'],
["Ethernet0", "none"])
assert result.output == 'Ethernet0: Set none loopback failed\n'
["Ethernet0", "media-side-output", "enable"])
assert result.output == 'Ethernet0: enable media-side-output loopback failed\n'
assert result.exit_code == EXIT_FAIL

mock_api.set_loopback_mode.return_value = True
mock_api.set_loopback_mode.side_effect = AttributeError
result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'],
["Ethernet0", "none"])
["Ethernet0", "host-side-input", "enable"])
assert result.output == 'Ethernet0: Set loopback mode is not applicable for this module\n'
assert result.exit_code == ERROR_NOT_IMPLEMENTED

mock_api.set_loopback_mode.side_effect = [TypeError, True]
result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'],
["Ethernet0", "host-side-input", "enable"])
assert result.output == 'Ethernet0: Set loopback mode failed. Parameter is not supported\n'
assert result.exit_code == EXIT_FAIL

mock_config_db = MagicMock()
mock_config_db.get.side_effect = TypeError
mock_config_db_connector.return_value = mock_config_db
result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'],
["Ethernet0", "media-side-input", "enable"])
assert result.output == 'Ethernet0: subport is not present in CONFIG_DB\n'
assert result.exit_code == EXIT_FAIL

mock_config_db_connector.return_value = None
result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'],
["Ethernet0", "media-side-input", "enable"])
assert result.output == 'Ethernet0: Failed to connect to CONFIG_DB\n'
assert result.exit_code == EXIT_FAIL

mock_config_db_connector.return_value = MagicMock()
mock_sonic_v2_connector.return_value = None
result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'],
["Ethernet0", "media-side-input", "enable"])
assert result.output == 'Ethernet0: Failed to connect to STATE_DB\n'
assert result.exit_code == EXIT_FAIL

@pytest.mark.parametrize("subport, lane_count, expected_mask", [
(1, 1, 0x1),
(1, 4, 0xf),
(2, 1, 0x2),
(2, 4, 0xf0),
(3, 2, 0x30),
(4, 1, 0x8),
])
def test_get_subport_lane_mask(self, subport, lane_count, expected_mask):
assert sfputil.get_subport_lane_mask(subport, lane_count) == expected_mask
Loading