Skip to content

Commit

Permalink
[sfputil] Firmware download/upgrade CLI support for QSFP-DD (#1947)
Browse files Browse the repository at this point in the history
* Firmware download/upgrade CLI support for QSFP-DD

Signed-off-by: Prince George <prgeor@microsoft.com>
  • Loading branch information
prgeor authored Dec 7, 2021
1 parent 7c34b79 commit 2e462ef
Show file tree
Hide file tree
Showing 2 changed files with 376 additions and 0 deletions.
324 changes: 324 additions & 0 deletions sfputil/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import sys
import natsort
import ast
import time
import datetime

import subprocess
import click
Expand All @@ -26,12 +28,20 @@
PLATFORM_JSON = 'platform.json'
PORT_CONFIG_INI = 'port_config.ini'

EXIT_FAIL = -1
EXIT_SUCCESS = 0
ERROR_PERMISSIONS = 1
ERROR_CHASSIS_LOAD = 2
ERROR_SFPUTILHELPER_LOAD = 3
ERROR_PORT_CONFIG_LOAD = 4
ERROR_NOT_IMPLEMENTED = 5
ERROR_INVALID_PORT = 6
SMBUS_BLOCK_WRITE_SIZE = 32
# Default host password as per CMIS spec:
# http://www.qsfp-dd.com/wp-content/uploads/2021/05/CMIS5p0.pdf
CDB_DEFAULT_HOST_PASSWORD = 0x00001011

MAX_LPL_FIRMWARE_BLOCK_SIZE = 116 #Bytes

# TODO: We should share these maps and the formatting functions between sfputil and sfpshow
QSFP_DATA_MAP = {
Expand Down Expand Up @@ -225,6 +235,17 @@
# Global logger instance
log = logger.Logger(SYSLOG_IDENTIFIER)

def is_sfp_present(port_name):
physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)

try:
presence = sfp.get_presence()
except NotImplementedError:
click.echo("sfp get_presence() NOT implemented!", err=True)
sys.exit(ERROR_NOT_IMPLEMENTED)

return bool(presence)

# ========================== Methods for formatting output ==========================

Expand Down Expand Up @@ -409,6 +430,19 @@ def logical_port_name_to_physical_port_list(port_name):
else:
return [int(port_name)]

def logical_port_to_physical_port_index(port_name):
if not platform_sfputil.is_logical_port(port_name):
click.echo("Error: invalid port '{}'\n".format(port_name))
print_all_valid_port_values()
sys.exit(ERROR_INVALID_PORT)

physical_port = logical_port_name_to_physical_port_list(port_name)[0]
if physical_port is None:
click.echo("Error: No physical port found for logical port '{}'".format(port_name))
sys.exit(EXIT_FAIL)

return physical_port


def print_all_valid_port_values():
click.echo("Valid values for port: {}\n".format(str(platform_sfputil.logical)))
Expand Down Expand Up @@ -806,6 +840,37 @@ def lpmode(port):

click.echo(tabulate(output_table, table_header, tablefmt='simple'))

def show_firmware_version(physical_port):
try:
sfp = platform_chassis.get_sfp(physical_port)
api = sfp.get_xcvr_api()
out = api.get_module_fw_info()
click.echo(out['info'])
except NotImplementedError:
click.echo("This functionality is currently not implemented for this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)

# 'fwversion' subcommand
@show.command()
@click.argument('port_name', metavar='<port_name>', required=True)
def fwversion(port_name):
"""Show firmware version of the transceiver"""

physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)

try:
presence = sfp.get_presence()
except NotImplementedError:
click.echo("sfp get_presence() NOT implemented!")
sys.exit(EXIT_FAIL)

if not presence:
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

show_firmware_version(physical_port)
sys.exit(EXIT_SUCCESS)

# 'lpmode' subgroup
@cli.group()
Expand Down Expand Up @@ -904,6 +969,265 @@ def reset(port_name):

i += 1

# 'firmware' subgroup
@cli.group()
def firmware():
"""Download/Upgrade firmware on the transceiver"""
pass

def run_firmware(port_name, mode):
"""
Make the inactive firmware as the current running firmware
@port_name:
@mode: 0, 1, 2, 3 different modes to run the firmware
Returns 1 on success, and exit_code = -1 on failure
"""
status = 0
physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)

try:
api = sfp.get_xcvr_api()
except NotImplementedError:
click.echo("This functionality is currently not implemented for this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)

if mode == 0:
click.echo("Running firmware: Non-hitless Reset to Inactive Image")
elif mode == 1:
click.echo("Running firmware: Hitless Reset to Inactive Image")
elif mode == 2:
click.echo("Running firmware: Attempt non-hitless Reset to Running Image")
elif mode == 3:
click.echo("Running firmware: Attempt Hitless Reset to Running Image")
else:
click.echo("Running firmware: Unknown mode {}".format(mode))
sys.exit(EXIT_FAIL)

try:
status = api.cdb_run_firmware(mode)
except NotImplementedError:
click.echo("This functionality is not applicable for this transceiver")
sys.exit(EXIT_FAIL)

return status

def commit_firmware(port_name):
status = 0
physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)

try:
api = sfp.get_xcvr_api()
except NotImplementedError:
click.echo("This functionality is currently not implemented for this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)

try:
status = api.cdb_commit_firmware()
except NotImplementedError:
click.echo("This functionality is not applicable for this transceiver")

return status

def download_firmware(port_name, filepath):
"""Download firmware on the transceiver"""
try:
fd = open(filepath, 'rb')
fd.seek(0, 2)
file_size = fd.tell()
fd.seek(0, 0)
except FileNotFoundError:
click.echo("Firmware file {} NOT found".format(filepath))
sys.exit(EXIT_FAIL)

physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)
try:
api = sfp.get_xcvr_api()
except NotImplementedError:
click.echo("This functionality is NOT applicable to this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)

try:
fwinfo = api.get_module_fw_mgmt_feature()
if fwinfo['status'] == True:
startLPLsize, maxblocksize, lplonly_flag, autopaging_flag, writelength = fwinfo['feature']
else:
click.echo("Failed to fetch CDB Firmware management features")
sys.exit(EXIT_FAIL)
except NotImplementedError:
click.echo("This functionality is NOT applicable for this transceiver")
sys.exit(ERROR_NOT_IMPLEMENTED)

click.echo('CDB: Starting firmware download')
startdata = fd.read(startLPLsize)
status = api.cdb_start_firmware_download(startLPLsize, startdata, file_size)
if status != 1:
click.echo('CDB: Start firmware download failed - status {}'.format(status))
sys.exit(EXIT_FAIL)

# Increase the optoe driver's write max to speed up firmware download
sfp.set_optoe_write_max(SMBUS_BLOCK_WRITE_SIZE)

with click.progressbar(length=file_size, label="Downloading ...") as bar:
address = 0
BLOCK_SIZE = MAX_LPL_FIRMWARE_BLOCK_SIZE if lplonly_flag else maxblocksize
remaining = file_size - startLPLsize
while remaining > 0:
count = BLOCK_SIZE if remaining >= BLOCK_SIZE else remaining
data = fd.read(count)
if len(data) != count:
click.echo("Firmware file read failed!")
sys.exit(EXIT_FAIL)

if lplonly_flag:
status = api.cdb_lpl_block_write(address, data)
else:
status = api.cdb_epl_block_write(address, data, autopaging_flag, writelength)
if (status != 1):
click.echo("CDB: firmware download failed! - status {}".format(status))
sys.exit(EXIT_FAIL)

bar.update(count)
address += count
remaining -= count

# Restore the optoe driver's write max to '1' (default value)
sfp.set_optoe_write_max(1)

status = api.cdb_firmware_download_complete()
click.echo('CDB: firmware download complete')
return status

# 'run' subcommand
@firmware.command()
@click.argument('port_name', required=True, default=None)
@click.option('--mode', default="1", type=click.Choice(["0", "1", "2", "3"]), show_default=True,
help="0 = Non-hitless Reset to Inactive Image\n \
1 = Hitless Reset to Inactive Image (Default)\n \
2 = Attempt non-hitless Reset to Running Image\n \
3 = Attempt Hitless Reset to Running Image\n")
def run(port_name, mode):
"""Run the firmware with default mode=1"""

if not is_sfp_present(port_name):
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

status = run_firmware(port_name, int(mode))
if status != 1:
click.echo('Failed to run firmware in mode={}! CDB status: {}'.format(mode, status))
sys.exit(EXIT_FAIL)

click.echo("Firmware run in mode={} success".format(mode))

# 'commit' subcommand
@firmware.command()
@click.argument('port_name', required=True, default=None)
def commit(port_name):
"""Commit the running firmware"""

if not is_sfp_present(port_name):
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

status = commit_firmware(port_name)
if status != 1:
click.echo('Failed to commit firmware! CDB status: {}'.format(status))
sys.exit(EXIT_FAIL)

click.echo("Firmware commit successful")

# 'upgrade' subcommand
@firmware.command()
@click.argument('port_name', required=True, default=None)
@click.argument('filepath', required=True, default=None)
def upgrade(port_name, filepath):
"""Upgrade firmware on the transceiver"""

physical_port = logical_port_to_physical_port_index(port_name)

if not is_sfp_present(port_name):
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

show_firmware_version(physical_port)

status = download_firmware(port_name, filepath)
if status == 1:
click.echo("Firmware download complete success")
else:
click.echo("Firmware download complete failed! CDB status = {}".format(status))
sys.exit(EXIT_FAIL)

status = run_firmware(port_name, 1)
if status != 1:
click.echo('Failed to run firmware in mode=1 ! CDB status: {}'.format(status))
sys.exit(EXIT_FAIL)

click.echo("Firmware run in mode 1 successful")

status = commit_firmware(port_name)
if status != 1:
click.echo('Failed to commit firmware! CDB status: {}'.format(status))
sys.exit(EXIT_FAIL)

click.echo("Firmware commit successful")

# 'download' subcommand
@firmware.command()
@click.argument('port_name', required=True, default=None)
@click.argument('filepath', required=True, default=None)
def download(port_name, filepath):
"""Download firmware on the transceiver"""

if not is_sfp_present(port_name):
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

start = time.time()
status = download_firmware(port_name, filepath)
if status == 1:
click.echo("Firmware download complete success")
else:
click.echo("Firmware download complete failed! status = {}".format(status))
sys.exit(EXIT_FAIL)
end = time.time()
click.echo("Total download Time: {}".format(str(datetime.timedelta(seconds=end-start))))


# 'unlock' subcommand
@firmware.command()
@click.argument('port_name', required=True, default=None)
@click.option('--password', type=click.INT, help="Password in integer\n")
def unlock(port_name, password):
"""Unlock the firmware download feature via CDB host password"""
physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)

if not is_sfp_present(port_name):
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

try:
api = sfp.get_xcvr_api()
except NotImplementedError:
click.echo("This functionality is currently not implemented for this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)

if password is None:
password = CDB_DEFAULT_HOST_PASSWORD
try:
status = api.cdb_enter_host_password(int(password))
except NotImplementedError:
click.echo("This functionality is not applicable for this transceiver")
sys.exit(EXIT_FAIL)

if status == 1:
click.echo("CDB: Host password accepted")
else:
click.echo("CDB: Host password NOT accepted! status = {}".format(status))

# 'version' subcommand
@cli.command()
Expand Down
Loading

0 comments on commit 2e462ef

Please sign in to comment.