Skip to content

Commit

Permalink
Rebased and squashed
Browse files Browse the repository at this point in the history
Signed-off-by: Dante Su <dante.su@broadcom.com>
  • Loading branch information
ds952811 committed Dec 2, 2021
1 parent 2dff8cd commit 2a8bfa6
Show file tree
Hide file tree
Showing 8 changed files with 510 additions and 4 deletions.
14 changes: 14 additions & 0 deletions sonic_platform_base/sfp_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ class SfpBase(device_base.DeviceBase):
# Device type definition. Note, this is a constant.
DEVICE_TYPE = "sfp"

# SFP port type. Please note this is the cage type rather than transceiver type.
SFP_PORT_TYPE_UNSPECIFIED = "UNSPECIFIED"
SFP_PORT_TYPE_SFP = "SFP"
SFP_PORT_TYPE_QSFP = "QSFP"
SFP_PORT_TYPE_QSFPDD = "QSFP_DD"

# Generic error types definition
SFP_STATUS_INITIALIZING = 'Initializing'
SFP_STATUS_OK = 'OK'
Expand Down Expand Up @@ -59,6 +65,14 @@ def __init__(self):
self._xcvr_api_factory = XcvrApiFactory(self.read_eeprom, self.write_eeprom)
self._xcvr_api = None

def get_port_type(self):
"""
Retrieves the port/cage type of this SFP
Returns:
A string, the port/cage type of this SFP
"""
return self.SFP_PORT_TYPE_UNSPECIFIED

def get_num_thermals(self):
"""
Retrieves the number of thermals available on this SFP
Expand Down
302 changes: 299 additions & 3 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from ...fields import consts
from ..xcvr_api import XcvrApi

import ast
import logging
from ...codes.public.cmis import CmisCodes
from ...fields import consts
from ..xcvr_api import XcvrApi
from .cmisCDB import CmisCdbApi
Expand Down Expand Up @@ -135,8 +137,7 @@ def get_transceiver_info(self):
"specification_compliance": admin_info[consts.MEDIA_TYPE_FIELD],
"vendor_date": admin_info[consts.VENDOR_DATE_FIELD],
"vendor_oui": admin_info[consts.VENDOR_OUI_FIELD],
# TODO
"application_advertisement": "N/A",
"application_advertisement": admin_info.get(consts.APPLS_ADVT_FIELD, "N/A")
}
xcvr_info['host_electrical_interface'] = self.get_host_electrical_interface()
xcvr_info['media_interface_code'] = self.get_module_media_interface()
Expand Down Expand Up @@ -903,6 +904,9 @@ def set_lpmode(self, lpmode):

lpmode_val = self.xcvr_eeprom.read(consts.MODULE_LEVEL_CONTROL)
if lpmode_val is not None:
# Turn on software control mode
if self.xcvr_eeprom.read(consts.CMIS_MAJOR_REVISION) >= 4:
lpmode_val = lpmode_val & ~(1 << 6) # LowPwrAllowRequestHW
if lpmode is True:
lpmode_val = lpmode_val | (1 << 4)
self.xcvr_eeprom.write(consts.MODULE_LEVEL_CONTROL, lpmode_val)
Expand All @@ -914,7 +918,7 @@ def set_lpmode(self, lpmode):
time.sleep(1)
lpmode = self.xcvr_eeprom.read(consts.TRANS_MODULE_STATUS_FIELD)
if lpmode is not None:
if lpmode.get('ModuleState') == 'ModuleReady':
if lpmode.get('ModuleState') in ['ModulePwrUp', 'ModuleReady']:
return True
return False
return False
Expand Down Expand Up @@ -1745,4 +1749,296 @@ def get_transceiver_loopback(self):
for lane in range(1, self.NUM_CHANNELS+1):
trans_loopback['host_input_loopback_lane%d' % lane] = 'N/A'
return trans_loopback

def set_datapath_init(self, host_lanemask):
"""
Put the datapath into the initialized state
Args:
host_lanemask: Integer, a bitmask of the lanes on the system/host side
e.g. 0x5 for lane 0 and lane 2.
Returns:
Boolean, true if success otherwise false
"""
cmis_major = self.xcvr_eeprom.read(consts.CMIS_MAJOR_REVISION)
data = self.xcvr_eeprom.read(consts.DATAPATH_DEINIT_FIELD)
for lane in range(self.NUM_CHANNELS):
if (1 << lane) & host_lanemask == 0:
continue
if cmis_major >= 4: # CMIS v4 onwards
data &= ~(1 << lane)
else: # CMIS v3
data |= (1 << lane)
self.xcvr_eeprom.write(consts.DATAPATH_DEINIT_FIELD, data)

def set_datapath_deinit(self, host_lanemask):
"""
Put the datapath into the de-initialized state
Args:
host_lanemask: Integer, a bitmask of the lanes on the system/host side
e.g. 0x5 for lane 0 and lane 2.
Returns:
Boolean, true if success otherwise false
"""
cmis_major = self.xcvr_eeprom.read(consts.CMIS_MAJOR_REVISION)
data = self.xcvr_eeprom.read(consts.DATAPATH_DEINIT_FIELD)
for lane in range(self.NUM_CHANNELS):
if (1 << lane) & host_lanemask == 0:
continue
if cmis_major >= 4: # CMIS v4 onwards
data |= (1 << lane)
else: # CMIS v3
data &= ~(1 << lane)
self.xcvr_eeprom.write(consts.DATAPATH_DEINIT_FIELD, data)

def get_host_speed(self, ifname):
"""
Get the port speed from the host interface name
Args:
ifname: String, host interface name
Returns:
Integer, the port speed if success otherwise 0
"""
# see HOST_ELECTRICAL_INTERFACE of sff8024.py
speed = 0
if '400G' in ifname:
speed = 400000
elif '200G' in ifname:
speed = 200000
elif '100G' in ifname or 'CAUI-4' in ifname:
speed = 100000
elif '50G' in ifname or 'LAUI-2' in ifname:
speed = 50000
elif '40G' in ifname or 'XLAUI' in ifname or 'XLPPI' in ifname:
speed = 40000
elif '25G' in ifname:
speed = 25000
elif '10G' in ifname or 'SFI' in ifname or 'XFI' in ifname:
speed = 10000
elif '1000BASE' in ifname:
speed = 1000
return speed

def get_cmis_state(self):
"""
Get the CMIS states
Returns:
Dictionary, the states of module, config error and datapath
"""
state = {
'module_state': self.get_module_state(),
'config_state': self.get_config_datapath_hostlane_status(),
'datapath_state': self.get_datapath_state()
}
return state

def get_cmis_application_selected(self, host_lane):
"""
Get the CMIS selected application code of a host lane
Args:
host_lane:
Integer, the lane id on the host/system side
Returns:
Integer, the transceiver-specific application code
"""
ap_code = 0
if host_lane in range(self.NUM_CHANNELS) and not self.is_flat_memory():
field = "{}_{}_{}".format(consts.STAGED_CTRL_APSEL_FIELD, 0, host_lane + 1)
ap_code = self.xcvr_eeprom.read(field) >> 4

return (ap_code & 0xf)

def get_cmis_application_matched(self, host_speed, host_lanemask):
"""
Get the CMIS application code that matches the specified host side configurations
Args:
host_speed:
Integer, the port speed of the host interface
host_lanemask:
Integer, a bitmask of the lanes on the host side
e.g. 0x5 for lane 0 and lane 2.
Returns:
Integer, the transceiver-specific application code
"""
if host_speed == 0 or host_lanemask == 0:
return 0

host_lane_count = 0
for lane in range(self.NUM_CHANNELS):
if (1 << lane) & host_lanemask == 0:
continue
host_lane_count += 1

appl_code = 0
appl_dict = self.xcvr_eeprom.read(consts.APPLS_ADVT_FIELD)
if appl_dict is None or appl_dict.strip() in ["None", "{}", ""]:
return 0
appl_dict = ast.literal_eval(appl_dict)
for c in appl_dict.keys():
d = appl_dict[c]
if d.get('host_lane_count') != host_lane_count:
continue
if self.get_host_speed(d.get('host_electrical_interface_id')) != host_speed:
continue
appl_code = c
break

return (appl_code & 0xf)

def has_cmis_application_update(self, host_speed, host_lanemask):
"""
Check for CMIS application update and retrieve the new application code
Args:
host_speed:
Integer, the port speed of the host interface
host_lanemask:
Integer, a bitmask of the lanes on the host side
e.g. 0x5 for lane 0 and lane 2.
Returns:
(has_update, new_appl)
"""
if host_speed == 0 or host_lanemask == 0 or self.is_flat_memory():
return (False, 1)

app_new = self.get_cmis_application_matched(host_speed, host_lanemask)
if app_new != 1 or host_lanemask != (1 << self.NUM_CHANNELS) - 1:
logger.info("Non-default application is not supported")
return (False, 1)

app_old = 0
for lane in range(self.NUM_CHANNELS):
if (1 << lane) & host_lanemask == 0:
continue
if app_old == 0:
app_old = self.get_cmis_application_selected(lane)
elif app_old != self.get_cmis_application_selected(lane):
logger.info("Not all the lanes are in the same application mode")
logger.info("Forcing application update...")
return (True, app_new)

if app_old == app_new:
skip = True
dp_state = self.get_datapath_state()
conf_state = self.get_config_datapath_hostlane_status()
for lane in range(self.NUM_CHANNELS):
if (1 << lane) & host_lanemask == 0:
continue
name = "DP{}State".format(lane + 1)
if dp_state[name] != CmisCodes.DATAPATH_STATE[4]:
skip = False
break
name = "ConfigStatusLane{}".format(lane + 1)
if conf_state[name] != CmisCodes.CONFIG_STATUS[1]:
skip = False
break
if skip:
return (False, app_old)

return (True, app_new)

def set_cmis_application_stop(self, host_lanemask):
"""
Deinitialize the datapath and turn off Tx power to the associated line lanes
Args:
host_lanemask:
Integer, a bitmask of the lanes on the host side
e.g. 0x5 for lane 0 and lane 2.
Returns:
Boolean, true if success otherwise false
"""
# D.2.2 Software Deinitialization
self.set_datapath_deinit(host_lanemask)
self.set_lpmode(True)

# D.1.3 Software Configuration and Initialization
self.tx_disable_channel(host_lanemask, True)
self.set_lpmode(False)
return True

def set_cmis_application_apsel(self, host_lanemask, appl_code):
"""
Update the selected application code to the specified host lanes
Args:
host_lanemask:
Integer, a bitmask of the lanes on the host side
e.g. 0x5 for lane 0 and lane 2.
appl_code:
Integer, the desired application code
Returns:
Boolean, true if success otherwise false
"""
# Update the application selection
for lane in range(self.NUM_CHANNELS):
if (1 << lane) & host_lanemask == 0:
continue
addr = "{}_{}_{}".format(consts.STAGED_CTRL_APSEL_FIELD, 0, lane + 1)
data = appl_code << 4
self.xcvr_eeprom.write(addr, data)

# Apply DataPathInit
return self.xcvr_eeprom.write("%s_%d" % (consts.STAGED_CTRL_APPLY_DPINIT_FIELD, 0), host_lanemask)

def set_cmis_application_start(self, host_lanemask):
"""
Initialize the datapath associated with the specified host lanes, while the Tx power
state of the line side will not be updated.
Args:
host_lanemask:
Integer, a bitmask of the lanes on the host side
e.g. 0x5 for lane 0 and lane 2.
Returns:
Boolean, true if success otherwise false
"""
return self.set_datapath_init(host_lanemask)

def set_cmis_application_txon(self, host_lanemask):
"""
Turn on Tx power of the lanes on the line side associated with the specified host lanes
Args:
host_lanemask:
Integer, a bitmask of the lanes on the host side
e.g. 0x5 for lane 0 and lane 2.
Returns:
Boolean, true if success otherwise false
"""
self.tx_disable_channel(host_lanemask, False)

def get_error_description(self):
dp_state = self.get_datapath_state()
conf_state = self.get_config_datapath_hostlane_status()
for lane in range(self.NUM_CHANNELS):
name = "DP{}State".format(lane + 1)
if dp_state[name] != CmisCodes.DATAPATH_STATE[4]:
return dp_state[name]

name = "ConfigStatusLane{}".format(lane + 1)
if conf_state[name] != CmisCodes.CONFIG_STATUS[1]:
return conf_state[name]

state = self.get_module_state()
if state != CmisCodes.MODULE_STATE[3]:
return state

return None

# TODO: other XcvrApi methods
2 changes: 1 addition & 1 deletion sonic_platform_base/sonic_xcvr/api/public/cmisCDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, xcvr_eeprom):
super(CmisCdbApi, self).__init__(xcvr_eeprom)
self.cdb_instance_supported = self.xcvr_eeprom.read(consts.CDB_SUPPORT)
self.failed_status_dict = self.xcvr_eeprom.mem_map.codes.CDB_FAIL_STATUS
assert self.cdb_instance_supported != 0
#assert self.cdb_instance_supported != 0

def cdb1_chkflags(self):
'''
Expand Down
Loading

0 comments on commit 2a8bfa6

Please sign in to comment.