-
Notifications
You must be signed in to change notification settings - Fork 661
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[sfputil] Firmware download/upgrade CLI support for QSFP-DD #1947
Changes from 5 commits
a65f974
2fe2671
f71a7f0
83d8b94
f53f590
e666291
716d3c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
import sys | ||
import natsort | ||
import ast | ||
import time | ||
|
||
import subprocess | ||
import click | ||
|
@@ -26,12 +27,17 @@ | |
PLATFORM_JSON = 'platform.json' | ||
PORT_CONFIG_INI = 'port_config.ini' | ||
|
||
EXIT_FAIL = -1 | ||
EXIT_SUCCESS = 0 | ||
ERROR_PERMISSIONS = 1 | ||
ERROR_CHASSIS_LOAD = 2 | ||
ERROR_SFPUTILHELPER_LOAD = 3 | ||
ERROR_PORT_CONFIG_LOAD = 4 | ||
ERROR_NOT_IMPLEMENTED = 5 | ||
ERROR_INVALID_PORT = 6 | ||
SMBUS_BLOCK_WRITE_SIZE = 32 | ||
# Default host password as per CMIS spec | ||
CDB_DEFAULT_HOST_PASSWORD = 0x00001011 | ||
|
||
# TODO: We should share these maps and the formatting functions between sfputil and sfpshow | ||
QSFP_DATA_MAP = { | ||
|
@@ -225,6 +231,17 @@ | |
# Global logger instance | ||
log = logger.Logger(SYSLOG_IDENTIFIER) | ||
|
||
def is_sfp_present(port_name): | ||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
|
||
try: | ||
presence = sfp.get_presence() | ||
except NotImplementedError: | ||
click.echo("sfp get_presence() NOT implemented!", err=True) | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
return bool(presence) | ||
|
||
# ========================== Methods for formatting output ========================== | ||
|
||
|
@@ -408,6 +425,19 @@ def logical_port_name_to_physical_port_list(port_name): | |
else: | ||
return [int(port_name)] | ||
|
||
def logical_port_to_physical_port_index(port_name): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this function available in some other library? can we reuse? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i couldn't find one in sonic-utilities |
||
if not platform_sfputil.is_logical_port(port_name): | ||
click.echo("Error: invalid port '{}'\n".format(port_name)) | ||
print_all_valid_port_values() | ||
sys.exit(ERROR_INVALID_PORT) | ||
|
||
physical_port = logical_port_name_to_physical_port_list(port_name)[0] | ||
if physical_port is None: | ||
click.echo("Error: No physical port found for logical port '{}'".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
return physical_port | ||
|
||
|
||
def print_all_valid_port_values(): | ||
click.echo("Valid values for port: {}\n".format(str(platform_sfputil.logical))) | ||
|
@@ -805,6 +835,37 @@ def lpmode(port): | |
|
||
click.echo(tabulate(output_table, table_header, tablefmt='simple')) | ||
|
||
def show_firmware_version(physical_port): | ||
try: | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
api = sfp.get_xcvr_api() | ||
out = api.get_module_fw_info() | ||
click.echo(out['info']) | ||
except NotImplementedError: | ||
click.echo("This functionality is currently not implemented for this platform") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
# 'fwversion' subcommand | ||
@show.command() | ||
@click.argument('port_name', metavar='<port_name>', required=True) | ||
def fwversion(port_name): | ||
"""Show firmware version of the transceiver""" | ||
|
||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
|
||
try: | ||
presence = sfp.get_presence() | ||
except NotImplementedError: | ||
click.echo("sfp get_presence() NOT implemented!") | ||
sys.exit(EXIT_FAIL) | ||
|
||
if not presence: | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
show_firmware_version(physical_port) | ||
sys.exit(EXIT_SUCCESS) | ||
|
||
# 'lpmode' subgroup | ||
@cli.group() | ||
|
@@ -903,6 +964,267 @@ def reset(port_name): | |
|
||
i += 1 | ||
|
||
# 'firmware' subgroup | ||
@cli.group() | ||
def firmware(): | ||
"""Download/Upgrade firmware on the transceiver""" | ||
pass | ||
|
||
def run_firmware(port_name, mode): | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Make the inactive firmware as the current running firmware | ||
@port_name: | ||
@mode: 0, 1, 2, 3 different modes to run the firmware | ||
Returns 1 on success, and exit_code = -1 on failure | ||
""" | ||
status = 0 | ||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
|
||
try: | ||
api = sfp.get_xcvr_api() | ||
except NotImplementedError: | ||
click.echo("This functionality is currently not implemented for this platform") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
if mode == 0: | ||
click.echo("Running firmare: Non-hitless Reset to Inactive Image") | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
elif mode == 1: | ||
click.echo("Running firmware: Hitless Reset to Inactive Image") | ||
elif mode == 2: | ||
click.echo("Running firmware: Attempt non-hitless Reset to Running Image") | ||
elif mode == 3: | ||
click.echo("Running firmware: Attempt Hitless Reset to Running Image") | ||
else: | ||
click.echo("Running firmware: Unknown mode {}".format(mode)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
try: | ||
status = api.cdb_run_firmware(mode) | ||
except NotImplementedError: | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
click.echo("This functionality is not applicable for this transceiver") | ||
sys.exit(EXIT_FAIL) | ||
|
||
return status | ||
|
||
def commit_firmware(port_name): | ||
status = 0 | ||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
|
||
try: | ||
api = sfp.get_xcvr_api() | ||
except NotImplementedError: | ||
click.echo("This functionality is currently not implemented for this platform") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
try: | ||
status = api.cdb_commit_firmware() | ||
except NotImplementedError: | ||
click.echo("This functionality is not applicable for this transceiver") | ||
|
||
return status | ||
|
||
def download_firmware(port_name, filepath): | ||
"""Download firmware on the transceiver""" | ||
try: | ||
fd = open(filepath, 'rb') | ||
fd.seek(0, 2) | ||
file_size = fd.tell() | ||
fd.seek(0, 0) | ||
except FileNotFoundError: | ||
click.echo("Firmware file {} NOT found".format(filepath)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
try: | ||
api = sfp.get_xcvr_api() | ||
except NotImplementedError: | ||
click.echo("This functionality is NOT applicable to this platform") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
try: | ||
fwinfo = api.get_module_fw_mgmt_feature() | ||
if fwinfo['status'] == True: | ||
startLPLsize, maxblocksize, lplonly_flag, autopaging_flag, writelength = fwinfo['feature'] | ||
else: | ||
click.echo("Failed to fetch CDB Firmware management features") | ||
sys.exit(EXIT_FAIL) | ||
except NotImplementedError: | ||
click.echo("This functionality is NOT applicable for this transceiver") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
click.echo('CDB: Starting firmware download') | ||
startdata = fd.read(startLPLsize) | ||
status = api.cdb_start_firmware_download(startLPLsize, startdata, file_size) | ||
if status != 1: | ||
click.echo('CDB: Start firmware download failed - status {}'.format(status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
# Increase the optoe driver's write max to speed up firmware download | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
sfp.set_optoe_write_max(SMBUS_BLOCK_WRITE_SIZE) | ||
|
||
with click.progressbar(length=file_size, label="Downloading ...") as bar: | ||
address = 0 | ||
BLOCK_SIZE = 116 if lplonly_flag else maxblocksize | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
remaining = file_size - startLPLsize | ||
while remaining > 0: | ||
count = BLOCK_SIZE if remaining >= BLOCK_SIZE else remaining | ||
data = fd.read(count) | ||
assert(len(data) == count) | ||
if lplonly_flag: | ||
status = api.cdb_lpl_block_write(address, data) | ||
else: | ||
status = api.cdb_epl_block_write(address, data, autopaging_flag, writelength) | ||
if (status != 1): | ||
click.echo("CDB: firmware download failed! - status {}".format(status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
bar.update(count) | ||
#time.sleep(0.1) | ||
address += count | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we make sure count matches len of the data read There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If that does not match, you trigger assert. However this is a runtime error. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now, exiting |
||
remaining -= count | ||
|
||
# Restore the optoe driver's write max to '1' (default value) | ||
sfp.set_optoe_write_max(1) | ||
|
||
#time.sleep(2) | ||
status = api.cdb_firmware_download_complete() | ||
click.echo('CDB: firmware download complete') | ||
return status | ||
|
||
# 'run' subcommand | ||
@firmware.command() | ||
@click.argument('port_name', required=True, default=None) | ||
@click.option('--mode', type=click.Choice(["0", "1", "2", "3"]), help="0 = Non-hitless Reset to Inactive Image\n \ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean click.option has a |
||
1 = Hitless Reset to Inactive Image (Default)\n \ | ||
2 = Attempt non-hitless Reset to Running Image\n \ | ||
3 = Attempt Hitless Reset to Running Image\n") | ||
def run(port_name, mode): | ||
"""Run the firmware with default mode=1""" | ||
|
||
if not is_sfp_present(port_name): | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
if mode is None: | ||
mode = 1 | ||
|
||
status = run_firmware(port_name, int(mode)) | ||
if status != 1: | ||
click.echo('Failed to run firmware in mode={}! CDB status: {}'.format(mode, status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
click.echo("Firmware run in mode={} success".format(mode)) | ||
|
||
# 'commit' subcommand | ||
@firmware.command() | ||
@click.argument('port_name', required=True, default=None) | ||
def commit(port_name): | ||
"""Commit the running firmware""" | ||
|
||
if not is_sfp_present(port_name): | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
status = commit_firmware(port_name) | ||
if status != 1: | ||
click.echo('Failed to commit firmware! CDB status: {}'.format(status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
click.echo("Firmware commit successful") | ||
|
||
# 'upgrade' subcommand | ||
@firmware.command() | ||
@click.argument('port_name', required=True, default=None) | ||
@click.argument('filepath', required=True, default=None) | ||
def upgrade(port_name, filepath): | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Upgrade firmware on the transceiver""" | ||
|
||
physical_port = logical_port_to_physical_port_index(port_name) | ||
|
||
if not is_sfp_present(port_name): | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
show_firmware_version(physical_port) | ||
|
||
status = download_firmware(port_name, filepath) | ||
if status == 1: | ||
click.echo("Firmware download complete success") | ||
else: | ||
click.echo("Firmware download complete failed! CDB status = {}".format(status)) | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
sys.exit(EXIT_FAIL) | ||
|
||
status = run_firmware(port_name, 1) | ||
if status != 1: | ||
click.echo('Failed to run firmware in mode=1 ! CDB status: {}'.format(status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
click.echo("Firmware run in mode 1 successful") | ||
|
||
status = commit_firmware(port_name) | ||
if status != 1: | ||
click.echo('Failed to commit firmware! CDB status: {}'.format(status)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
click.echo("Firmware commit successful") | ||
|
||
# 'download' subcommand | ||
@firmware.command() | ||
@click.argument('port_name', required=True, default=None) | ||
@click.argument('filepath', required=True, default=None) | ||
def download(port_name, filepath): | ||
"""Download firmware on the transceiver""" | ||
|
||
if not is_sfp_present(port_name): | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
start = time.time() | ||
status = download_firmware(port_name, filepath) | ||
if status == 1: | ||
click.echo("Firmware download complete success") | ||
else: | ||
click.echo("Firmware download complete failed! status = {}".format(status)) | ||
sys.exit(EXIT_FAIL) | ||
end = time.time() | ||
hours, rem = divmod(end-start, 3600) | ||
minutes, seconds = divmod(rem, 60) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it better just echo the seconds? #Closed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the firmware download can run into miniutes, so hh:mm:ss is more readable There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel seconds is good enough. |
||
click.echo("Total download Time: {:0>2}:{:0>2}:{:05.2f}".format(int(hours), int(minutes), seconds)) | ||
|
||
# 'unlock' subcommand | ||
@firmware.command() | ||
@click.argument('port_name', required=True, default=None) | ||
@click.option('--password', type=click.INT, help="Password in integer\n") | ||
def unlock(port_name, password): | ||
"""Unlock the firmware download feature via CDB host password""" | ||
physical_port = logical_port_to_physical_port_index(port_name) | ||
sfp = platform_chassis.get_sfp(physical_port) | ||
|
||
if not is_sfp_present(port_name): | ||
click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
sys.exit(EXIT_FAIL) | ||
|
||
try: | ||
api = sfp.get_xcvr_api() | ||
except NotImplementedError: | ||
click.echo("This functionality is currently not implemented for this platform") | ||
sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
||
if password is None: | ||
password = CDB_DEFAULT_HOST_PASSWORD | ||
try: | ||
status = api.cdb_enter_host_password(int(password)) | ||
except NotImplementedError: | ||
click.echo("This functionality is not applicable for this transceiver") | ||
sys.exit(EXIT_FAIL) | ||
|
||
if status == 1: | ||
click.echo("CDB: Host password accepted") | ||
else: | ||
click.echo("CDB: Host password NOT accepted! status = {}".format(status)) | ||
|
||
# 'version' subcommand | ||
@cli.command() | ||
|
@@ -912,4 +1234,4 @@ def version(): | |
|
||
|
||
if __name__ == '__main__': | ||
cli() | ||
cli() | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this spec public available? if yes, add as code comment? #Closed