From f687a9ecf7e520dbb70fdc7d52603bda12b60cf8 Mon Sep 17 00:00:00 2001 From: Oleksandr Ivantsiv Date: Tue, 8 Mar 2022 16:12:31 +0000 Subject: [PATCH 1/2] [Mellanox] Refactor SFP to use new APIs. --- .../sonic_platform/chassis.py | 20 +- .../mlnx-platform-api/sonic_platform/sfp.py | 1828 +---------------- .../mlnx-platform-api/tests/test_chassis.py | 4 +- .../mlnx-platform-api/tests/test_sfp.py | 2 +- 4 files changed, 38 insertions(+), 1816 deletions(-) diff --git a/platform/mellanox/mlnx-platform-api/sonic_platform/chassis.py b/platform/mellanox/mlnx-platform-api/sonic_platform/chassis.py index 6a712af68ff2..1cc3d0f75660 100644 --- a/platform/mellanox/mlnx-platform-api/sonic_platform/chassis.py +++ b/platform/mellanox/mlnx-platform-api/sonic_platform/chassis.py @@ -74,7 +74,7 @@ def __init__(self): # Initialize DMI data self.dmi_data = None - + # move the initialization of each components to their dedicated initializer # which will be called from platform # @@ -115,7 +115,7 @@ def __del__(self): if SFP.shared_sdk_handle: deinitialize_sdk_handle(SFP.shared_sdk_handle) - + ############################################## # PSU methods ############################################## @@ -238,7 +238,7 @@ def initialize_single_sfp(self, index): if index < sfp_count: if not self._sfp_list: self._sfp_list = [None] * sfp_count - + if not self._sfp_list[index]: from .sfp import SFP self._sfp_list[index] = SFP(index) @@ -295,7 +295,7 @@ def get_sfp(self, index): index = index - 1 self.initialize_single_sfp(index) return super(Chassis, self).get_sfp(index) - + def get_change_event(self, timeout=0): """ Returns a nested dictionary containing all devices which have @@ -310,7 +310,7 @@ def get_change_event(self, timeout=0): - True if call successful, False if not; - A nested dictionary where key is a device type, value is a dictionary with key:value pairs in the format of - {'device_id':'device_event'}, + {'device_id':'device_event'}, where device_id is the device ID for this device and device_event, status='1' represents device inserted, @@ -596,7 +596,7 @@ def get_watchdog(self): Note: We overload this method to ensure that watchdog is only initialized when it is referenced. Currently, only one daemon can open the watchdog. - To initialize watchdog in the constructor causes multiple daemon + To initialize watchdog in the constructor causes multiple daemon try opening watchdog when loading and constructing a chassis object and fail. By doing so we can eliminate that risk. """ @@ -609,11 +609,11 @@ def get_watchdog(self): return self._watchdog - + def get_revision(self): """ Retrieves the hardware revision of the device - + Returns: string: Revision value of device """ @@ -647,7 +647,7 @@ def _parse_dmi(self, filename): def _verify_reboot_cause(self, filename): ''' - Open and read the reboot cause file in + Open and read the reboot cause file in /var/run/hwmanagement/system (which is defined as REBOOT_CAUSE_ROOT) If a reboot cause file doesn't exists, returns '0'. ''' @@ -754,7 +754,7 @@ def initialize_single_module(self, index): if index < count: if not self._module_list: self._module_list = [None] * count - + if not self._module_list[index]: from .module import Module self._module_list[index] = Module(index + 1) diff --git a/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py b/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py index 59f71bebc9a5..92e6537bf492 100644 --- a/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py +++ b/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py @@ -37,12 +37,13 @@ from sonic_py_common.logger import Logger from . import utils from .device_data import DeviceDataManager + from sonic_platform_base.sonic_xcvr.sfp_optoe_base import SfpOptoeBase except ImportError as e: raise ImportError (str(e) + "- required module not found") try: - # python_sdk_api does not support python3 for now. Daemons like thermalctld or psud + # python_sdk_api does not support python3 for now. Daemons like thermalctld or psud # also import this file without actually use the sdk lib. So we catch the ImportError # and ignore it here. Meanwhile, we have to trigger xcvrd using python2 now because it # uses the sdk lib. @@ -63,171 +64,6 @@ except KeyError: pass -# definitions of the offset and width for values in XCVR info eeprom -XCVR_INTFACE_BULK_OFFSET = 0 -XCVR_INTFACE_BULK_WIDTH_QSFP = 20 -XCVR_INTFACE_BULK_WIDTH_SFP = 21 -XCVR_TYPE_OFFSET = 0 -XCVR_TYPE_WIDTH = 1 -XCVR_EXT_TYPE_OFFSET = 1 -XCVR_EXT_TYPE_WIDTH = 1 -XCVR_CONNECTOR_OFFSET = 2 -XCVR_CONNECTOR_WIDTH = 1 -XCVR_COMPLIANCE_CODE_OFFSET = 3 -XCVR_COMPLIANCE_CODE_WIDTH = 8 -XCVR_ENCODING_OFFSET = 11 -XCVR_ENCODING_WIDTH = 1 -XCVR_NBR_OFFSET = 12 -XCVR_NBR_WIDTH = 1 -XCVR_EXT_RATE_SEL_OFFSET = 13 -XCVR_EXT_RATE_SEL_WIDTH = 1 -XCVR_CABLE_LENGTH_OFFSET = 14 -XCVR_CABLE_LENGTH_WIDTH_QSFP = 5 -XCVR_CABLE_LENGTH_WIDTH_SFP = 6 -XCVR_VENDOR_NAME_OFFSET = 20 -XCVR_VENDOR_NAME_WIDTH = 16 -XCVR_VENDOR_OUI_OFFSET = 37 -XCVR_VENDOR_OUI_WIDTH = 3 -XCVR_VENDOR_PN_OFFSET = 40 -XCVR_VENDOR_PN_WIDTH = 16 -XCVR_HW_REV_OFFSET = 56 -XCVR_HW_REV_WIDTH_OSFP = 2 -XCVR_HW_REV_WIDTH_QSFP = 2 -XCVR_HW_REV_WIDTH_SFP = 4 -XCVR_EXT_SPECIFICATION_COMPLIANCE_OFFSET = 64 -XCVR_EXT_SPECIFICATION_COMPLIANCE_WIDTH = 1 -XCVR_VENDOR_SN_OFFSET = 68 -XCVR_VENDOR_SN_WIDTH = 16 -XCVR_VENDOR_DATE_OFFSET = 84 -XCVR_VENDOR_DATE_WIDTH = 8 -XCVR_DOM_CAPABILITY_OFFSET = 92 -XCVR_DOM_CAPABILITY_WIDTH = 2 - -# definitions of the offset and width for values in XCVR_QSFP_DD info eeprom -XCVR_EXT_TYPE_OFFSET_QSFP_DD = 72 -XCVR_EXT_TYPE_WIDTH_QSFP_DD = 2 -XCVR_CONNECTOR_OFFSET_QSFP_DD = 75 -XCVR_CONNECTOR_WIDTH_QSFP_DD = 1 -XCVR_CABLE_LENGTH_OFFSET_QSFP_DD = 74 -XCVR_CABLE_LENGTH_WIDTH_QSFP_DD = 1 -XCVR_HW_REV_OFFSET_QSFP_DD = 36 -XCVR_HW_REV_WIDTH_QSFP_DD = 2 -XCVR_VENDOR_DATE_OFFSET_QSFP_DD = 54 -XCVR_VENDOR_DATE_WIDTH_QSFP_DD = 8 -XCVR_DOM_CAPABILITY_OFFSET_QSFP_DD = 2 -XCVR_DOM_CAPABILITY_WIDTH_QSFP_DD = 1 -XCVR_MEDIA_TYPE_OFFSET_QSFP_DD = 85 -XCVR_MEDIA_TYPE_WIDTH_QSFP_DD = 1 -XCVR_FIRST_APPLICATION_LIST_OFFSET_QSFP_DD = 86 -XCVR_FIRST_APPLICATION_LIST_WIDTH_QSFP_DD = 32 -XCVR_SECOND_APPLICATION_LIST_OFFSET_QSFP_DD = 351 -XCVR_SECOND_APPLICATION_LIST_WIDTH_QSFP_DD = 28 - -# to improve performance we retrieve all eeprom data via a single ethtool command -# in function get_transceiver_info and get_transceiver_bulk_status -# XCVR_INTERFACE_DATA_SIZE stands for the max size to be read -# this variable is only used by get_transceiver_info. -# please be noted that each time some new value added to the function -# we should make sure that it falls into the area -# [XCVR_INTERFACE_DATA_START, XCVR_INTERFACE_DATA_SIZE] or -# adjust XCVR_INTERFACE_MAX_SIZE to contain the new data -# It's same for [QSFP_DOM_BULK_DATA_START, QSFP_DOM_BULK_DATA_SIZE] and -# [SFP_DOM_BULK_DATA_START, SFP_DOM_BULK_DATA_SIZE] which are used by -# get_transceiver_bulk_status -XCVR_INTERFACE_DATA_START = 0 -XCVR_INTERFACE_DATA_SIZE = 92 -SFP_MODULE_ADDRA2_OFFSET = 256 -SFP_MODULE_THRESHOLD_OFFSET = 0 -SFP_MODULE_THRESHOLD_WIDTH = 56 - -QSFP_DOM_BULK_DATA_START = 22 -QSFP_DOM_BULK_DATA_SIZE = 36 -SFP_DOM_BULK_DATA_START = 96 -SFP_DOM_BULK_DATA_SIZE = 10 - -QSFP_DD_DOM_BULK_DATA_START = 14 -QSFP_DD_DOM_BULK_DATA_SIZE = 4 - -# definitions of the offset for values in OSFP info eeprom -OSFP_TYPE_OFFSET = 0 -OSFP_VENDOR_NAME_OFFSET = 129 -OSFP_VENDOR_PN_OFFSET = 148 -OSFP_HW_REV_OFFSET = 164 -OSFP_VENDOR_SN_OFFSET = 166 - -# definitions of the offset for values in QSFP_DD info eeprom -QSFP_DD_TYPE_OFFSET = 0 -QSFP_DD_VENDOR_NAME_OFFSET = 1 -QSFP_DD_VENDOR_PN_OFFSET = 20 -QSFP_DD_VENDOR_SN_OFFSET = 38 -QSFP_DD_VENDOR_OUI_OFFSET = 17 - -#definitions of the offset and width for values in DOM info eeprom -QSFP_DOM_REV_OFFSET = 1 -QSFP_DOM_REV_WIDTH = 1 -QSFP_TEMPE_OFFSET = 22 -QSFP_TEMPE_WIDTH = 2 -QSFP_VOLT_OFFSET = 26 -QSFP_VOLT_WIDTH = 2 -QSFP_VERSION_COMPLIANCE_OFFSET = 1 -QSFP_VERSION_COMPLIANCE_WIDTH = 2 -QSFP_CHANNL_MON_OFFSET = 34 -QSFP_CHANNL_MON_WIDTH = 16 -QSFP_CHANNL_MON_WITH_TX_POWER_WIDTH = 24 -QSFP_CHANNL_DISABLE_STATUS_OFFSET = 86 -QSFP_CHANNL_DISABLE_STATUS_WIDTH = 1 -QSFP_CHANNL_RX_LOS_STATUS_OFFSET = 3 -QSFP_CHANNL_RX_LOS_STATUS_WIDTH = 1 -QSFP_CHANNL_TX_FAULT_STATUS_OFFSET = 4 -QSFP_CHANNL_TX_FAULT_STATUS_WIDTH = 1 -QSFP_CONTROL_OFFSET = 86 -QSFP_CONTROL_WIDTH = 8 -QSFP_MODULE_MONITOR_OFFSET = 0 -QSFP_MODULE_MONITOR_WIDTH = 9 -QSFP_POWEROVERRIDE_OFFSET = 93 -QSFP_POWEROVERRIDE_WIDTH = 1 -QSFP_POWEROVERRIDE_BIT = 0 -QSFP_POWERSET_BIT = 1 -QSFP_OPTION_VALUE_OFFSET = 192 -QSFP_OPTION_VALUE_WIDTH = 4 - -QSFP_MODULE_UPPER_PAGE3_START = 384 -QSFP_MODULE_THRESHOLD_OFFSET = 128 -QSFP_MODULE_THRESHOLD_WIDTH = 24 -QSFP_CHANNL_THRESHOLD_OFFSET = 176 -QSFP_CHANNL_THRESHOLD_WIDTH = 24 - -SFP_TEMPE_OFFSET = 96 -SFP_TEMPE_WIDTH = 2 -SFP_VOLT_OFFSET = 98 -SFP_VOLT_WIDTH = 2 -SFP_CHANNL_MON_OFFSET = 100 -SFP_CHANNL_MON_WIDTH = 6 -SFP_CHANNL_STATUS_OFFSET = 110 -SFP_CHANNL_STATUS_WIDTH = 1 - -QSFP_DD_TEMPE_OFFSET = 14 -QSFP_DD_TEMPE_WIDTH = 2 -QSFP_DD_VOLT_OFFSET = 16 -QSFP_DD_VOLT_WIDTH = 2 -QSFP_DD_TX_BIAS_OFFSET = 42 -QSFP_DD_TX_BIAS_WIDTH = 16 -QSFP_DD_RX_POWER_OFFSET = 58 -QSFP_DD_RX_POWER_WIDTH = 16 -QSFP_DD_TX_POWER_OFFSET = 26 -QSFP_DD_TX_POWER_WIDTH = 16 -QSFP_DD_CHANNL_MON_OFFSET = 26 -QSFP_DD_CHANNL_MON_WIDTH = 48 -QSFP_DD_CHANNL_DISABLE_STATUS_OFFSET = 86 -QSFP_DD_CHANNL_DISABLE_STATUS_WIDTH = 1 -QSFP_DD_CHANNL_RX_LOS_STATUS_OFFSET = 19 -QSFP_DD_CHANNL_RX_LOS_STATUS_WIDTH = 1 -QSFP_DD_CHANNL_TX_FAULT_STATUS_OFFSET = 7 -QSFP_DD_CHANNL_TX_FAULT_STATUS_WIDTH = 1 -QSFP_DD_MODULE_THRESHOLD_OFFSET = 0 -QSFP_DD_MODULE_THRESHOLD_WIDTH = 72 -QSFP_DD_CHANNL_STATUS_OFFSET = 26 -QSFP_DD_CHANNL_STATUS_WIDTH = 1 # identifier value of xSFP module which is in the first byte of the EEPROM # if the identifier value falls into SFP_TYPE_CODE_LIST the module is treated as a SFP module and parsed according to 8472 @@ -252,31 +88,6 @@ '18' # QSFP-DD Double Density 8X Pluggable Transceiver ] -qsfp_cable_length_tup = ('Length(km)', 'Length OM3(2m)', - 'Length OM2(m)', 'Length OM1(m)', - 'Length Cable Assembly(m)') - -sfp_cable_length_tup = ('LengthSMFkm-UnitsOfKm', 'LengthSMF(UnitsOf100m)', - 'Length50um(UnitsOf10m)', 'Length62.5um(UnitsOfm)', - 'LengthCable(UnitsOfm)', 'LengthOM3(UnitsOf10m)') - -sfp_compliance_code_tup = ('10GEthernetComplianceCode', 'InfinibandComplianceCode', - 'ESCONComplianceCodes', 'SONETComplianceCodes', - 'EthernetComplianceCodes','FibreChannelLinkLength', - 'FibreChannelTechnology', 'SFP+CableTechnology', - 'FibreChannelTransmissionMedia','FibreChannelSpeed') - -qsfp_compliance_code_tup = ('10/40G Ethernet Compliance Code', 'SONET Compliance codes', - 'SAS/SATA compliance codes', 'Gigabit Ethernet Compliant codes', - 'Fibre Channel link length/Transmitter Technology', - 'Fibre Channel transmission media', 'Fibre Channel Speed') - -SFP_PATH = "/var/run/hw-management/qsfp/" -SFP_TYPE = "SFP" -QSFP_TYPE = "QSFP" -OSFP_TYPE = "OSFP" -QSFP_DD_TYPE = "QSFP_DD" - #variables for sdk REGISTER_NUM = 1 DEVICE_ID = 1 @@ -331,6 +142,7 @@ def initialize_sdk_handle(): return sdk_handle + def deinitialize_sdk_handle(sdk_handle): if sdk_handle is not None: rc = sx_api_close(sdk_handle) @@ -355,23 +167,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): deinitialize_sdk_handle(self.sdk_handle) -class SfpCapability: - def __init__(self): - self.dom_supported = False - self.dom_temp_supported = False - self.dom_volt_supported = False - self.dom_rx_power_supported = False - self.dom_tx_bias_power_supported = False - self.dom_tx_power_supported = False - self.dom_tx_disable_supported = False - self.dom_thresholds_supported = False - self.dom_rx_tx_power_bias_supported = False - self.calibration = 0 - self.qsfp_page3_available = False - self.second_application_list = False - - -class SFP(SfpBase): +class SFP(SfpOptoeBase): """Platform-specific SFP class""" shared_sdk_handle = None SFP_MLNX_ERROR_DESCRIPTION_LONGRANGE_NON_MLNX_CABLE = 'Long range for non-Mellanox cable or module' @@ -388,7 +184,7 @@ class SFP(SfpBase): def __init__(self, sfp_index, slot_id=0, linecard_port_count=0, lc_name=None): super(SFP, self).__init__() - + if slot_id == 0: # For non-modular chassis self.index = sfp_index + 1 self.sdk_index = sfp_index @@ -406,9 +202,7 @@ def __init__(self, sfp_index, slot_id=0, linecard_port_count=0, lc_name=None): self._thermal_list = initialize_linecard_sfp_thermal(lc_name, slot_id, sfp_index) self.slot_id = slot_id - self._sfp_type = None - self._sfp_capability = None - + @property def sdk_handle(self): if not SFP.shared_sdk_handle: @@ -417,218 +211,12 @@ def sdk_handle(self): logger.log_error('Failed to open SDK handle') return SFP.shared_sdk_handle - @property - def sfp_type(self): - if not self._sfp_type: - eeprom_raw = [] - eeprom_raw = self._read_eeprom_specific_bytes(XCVR_TYPE_OFFSET, XCVR_TYPE_WIDTH) - if eeprom_raw: - if eeprom_raw[0] in SFP_TYPE_CODE_LIST: - self._sfp_type = SFP_TYPE - elif eeprom_raw[0] in QSFP_TYPE_CODE_LIST: - self._sfp_type = QSFP_TYPE - elif eeprom_raw[0] in QSFP_DD_TYPE_CODE_LIST: - self._sfp_type = QSFP_DD_TYPE - - # we don't regonize this identifier value, treat the xSFP module as the default type - if not self._sfp_type: - raise RuntimeError("Failed to detect SFP type for SFP {}".format(self.index)) - else: - return self._sfp_type - - def _dom_capability_detect(self): - if self._sfp_capability: - return - - self._sfp_capability = SfpCapability() - if not self.get_presence(): - return - - if self.sfp_type == QSFP_TYPE: - self._sfp_capability.calibration = 1 - sfpi_obj = sff8436InterfaceId() - if sfpi_obj is None: - self._sfp_capability.dom_supported = False - offset = 128 - - # QSFP capability byte parse, through this byte can know whether it support tx_power or not. - # TODO: in the future when decided to migrate to support SFF-8636 instead of SFF-8436, - # need to add more code for determining the capability and version compliance - # in SFF-8636 dom capability definitions evolving with the versions. - qsfp_dom_capability_raw = self._read_eeprom_specific_bytes((offset + XCVR_DOM_CAPABILITY_OFFSET), XCVR_DOM_CAPABILITY_WIDTH) - if qsfp_dom_capability_raw is not None: - qsfp_version_compliance_raw = self._read_eeprom_specific_bytes(QSFP_VERSION_COMPLIANCE_OFFSET, QSFP_VERSION_COMPLIANCE_WIDTH) - qsfp_version_compliance = int(qsfp_version_compliance_raw[0], 16) - dom_capability = sfpi_obj.parse_dom_capability(qsfp_dom_capability_raw, 0) - if qsfp_version_compliance >= 0x08: - self._sfp_capability.dom_temp_supported = dom_capability['data']['Temp_support']['value'] == 'On' - self._sfp_capability.dom_volt_supported = dom_capability['data']['Voltage_support']['value'] == 'On' - self._sfp_capability.dom_rx_power_supported = dom_capability['data']['Rx_power_support']['value'] == 'On' - self._sfp_capability.dom_tx_power_supported = dom_capability['data']['Tx_power_support']['value'] == 'On' - else: - self._sfp_capability.dom_temp_supported = True - self._sfp_capability.dom_volt_supported = True - self._sfp_capability.dom_rx_power_supported = dom_capability['data']['Rx_power_support']['value'] == 'On' - self._sfp_capability.dom_tx_power_supported = True - self._sfp_capability.dom_supported = True - self._sfp_capability.calibration = 1 - sfpd_obj = sff8436Dom() - if sfpd_obj is None: - return None - qsfp_option_value_raw = self._read_eeprom_specific_bytes(QSFP_OPTION_VALUE_OFFSET, QSFP_OPTION_VALUE_WIDTH) - if qsfp_option_value_raw is not None: - optional_capability = sfpd_obj.parse_option_params(qsfp_option_value_raw, 0) - self._sfp_capability.dom_tx_disable_supported = optional_capability['data']['TxDisable']['value'] == 'On' - dom_status_indicator = sfpd_obj.parse_dom_status_indicator(qsfp_version_compliance_raw, 1) - self._sfp_capability.qsfp_page3_available = dom_status_indicator['data']['FlatMem']['value'] == 'Off' - else: - self._sfp_capability.dom_supported = False - self._sfp_capability.dom_temp_supported = False - self._sfp_capability.dom_volt_supported = False - self._sfp_capability.dom_rx_power_supported = False - self._sfp_capability.dom_tx_power_supported = False - self._sfp_capability.calibration = 0 - self._sfp_capability.qsfp_page3_available = False - - elif self.sfp_type == QSFP_DD_TYPE: - sfpi_obj = qsfp_dd_InterfaceId() - if sfpi_obj is None: - self._sfp_capability.dom_supported = False - - offset = 0 - # two types of QSFP-DD cable types supported: Copper and Optical. - qsfp_dom_capability_raw = self._read_eeprom_specific_bytes((offset + XCVR_DOM_CAPABILITY_OFFSET_QSFP_DD), XCVR_DOM_CAPABILITY_WIDTH_QSFP_DD) - if qsfp_dom_capability_raw is not None: - self._sfp_capability.dom_temp_supported = True - self._sfp_capability.dom_volt_supported = True - dom_capability = sfpi_obj.parse_dom_capability(qsfp_dom_capability_raw, 0) - if dom_capability['data']['Flat_MEM']['value'] == 'Off': - self._sfp_capability.dom_supported = True - self._sfp_capability.second_application_list = True - self._sfp_capability.dom_rx_power_supported = True - self._sfp_capability.dom_tx_power_supported = True - self._sfp_capability.dom_tx_bias_power_supported = True - self._sfp_capability.dom_thresholds_supported = True - self._sfp_capability.dom_rx_tx_power_bias_supported = True - else: - self._sfp_capability.dom_supported = False - self._sfp_capability.second_application_list = False - self._sfp_capability.dom_rx_power_supported = False - self._sfp_capability.dom_tx_power_supported = False - self._sfp_capability.dom_tx_bias_power_supported = False - self._sfp_capability.dom_thresholds_supported = False - self._sfp_capability.dom_rx_tx_power_bias_supported = False - else: - self._sfp_capability.dom_supported = False - self._sfp_capability.dom_temp_supported = False - self._sfp_capability.dom_volt_supported = False - self._sfp_capability.dom_rx_power_supported = False - self._sfp_capability.dom_tx_power_supported = False - self._sfp_capability.dom_tx_bias_power_supported = False - self._sfp_capability.dom_thresholds_supported = False - self._sfp_capability.dom_rx_tx_power_bias_supported = False - - elif self.sfp_type == SFP_TYPE: - sfpi_obj = sff8472InterfaceId() - if sfpi_obj is None: - return None - sfp_dom_capability_raw = self._read_eeprom_specific_bytes(XCVR_DOM_CAPABILITY_OFFSET, XCVR_DOM_CAPABILITY_WIDTH) - if sfp_dom_capability_raw is not None: - sfp_dom_capability = int(sfp_dom_capability_raw[0], 16) - self._sfp_capability.dom_supported = (sfp_dom_capability & 0x40 != 0) - if self._sfp_capability.dom_supported: - self._sfp_capability.dom_temp_supported = True - self._sfp_capability.dom_volt_supported = True - self._sfp_capability.dom_rx_power_supported = True - self._sfp_capability.dom_tx_power_supported = True - if sfp_dom_capability & 0x20 != 0: - self._sfp_capability.calibration = 1 - elif sfp_dom_capability & 0x10 != 0: - self._sfp_capability.calibration = 2 - else: - self._sfp_capability.calibration = 0 - else: - self._sfp_capability.dom_temp_supported = False - self._sfp_capability.dom_volt_supported = False - self._sfp_capability.dom_rx_power_supported = False - self._sfp_capability.dom_tx_power_supported = False - self._sfp_capability.calibration = 0 - self._sfp_capability.dom_tx_disable_supported = (int(sfp_dom_capability_raw[1], 16) & 0x40 != 0) - else: - self._sfp_capability.dom_supported = False - self._sfp_capability.dom_temp_supported = False - self._sfp_capability.dom_volt_supported = False - self._sfp_capability.dom_rx_power_supported = False - self._sfp_capability.dom_tx_power_supported = False - - @property - @utils.pre_initialize(_dom_capability_detect) - def dom_supported(self): - return self._sfp_capability.dom_supported - - @property - @utils.pre_initialize(_dom_capability_detect) - def dom_temp_supported(self): - return self._sfp_capability.dom_temp_supported - - @property - @utils.pre_initialize(_dom_capability_detect) - def dom_volt_supported(self): - return self._sfp_capability.dom_volt_supported - - @property - @utils.pre_initialize(_dom_capability_detect) - def dom_rx_power_supported(self): - return self._sfp_capability.dom_rx_power_supported - - @property - @utils.pre_initialize(_dom_capability_detect) - def dom_tx_power_supported(self): - return self._sfp_capability.dom_tx_power_supported - - @property - @utils.pre_initialize(_dom_capability_detect) - def calibration(self): - return self._sfp_capability.calibration - - @property - @utils.pre_initialize(_dom_capability_detect) - def dom_tx_bias_power_supported(self): - return self._sfp_capability.dom_tx_bias_power_supported - - @property - @utils.pre_initialize(_dom_capability_detect) - def dom_tx_disable_supported(self): - return self._sfp_capability.dom_tx_disable_supported - - @property - @utils.pre_initialize(_dom_capability_detect) - def qsfp_page3_available(self): - return self._sfp_capability.qsfp_page3_available - - @property - @utils.pre_initialize(_dom_capability_detect) - def second_application_list(self): - return self._sfp_capability.second_application_list - - @property - @utils.pre_initialize(_dom_capability_detect) - def dom_thresholds_supported(self): - return self._sfp_capability.dom_thresholds_supported - - @property - @utils.pre_initialize(_dom_capability_detect) - def dom_rx_tx_power_bias_supported(self): - return self._sfp_capability.dom_rx_tx_power_bias_supported - def reinit(self): - """ Re-initialize this SFP object when a new SFP inserted - :return: + :return: """ - self._sfp_type = None - self._sfp_capability = None + self.refresh_xcvr_api() def get_presence(self): """ @@ -637,32 +225,17 @@ def get_presence(self): Returns: bool: True if device is present, False if not """ - presence = False - ethtool_cmd = "ethtool -m sfp{} hex on offset 0 length 1 2>/dev/null".format(self.index) - try: - proc = subprocess.Popen(ethtool_cmd, - stdout=subprocess.PIPE, - shell=True, - stderr=subprocess.STDOUT, - universal_newlines=True) - stdout = proc.communicate()[0] - proc.wait() - result = stdout.rstrip('\n') - if result != '': - presence = True - - except OSError as e: - raise OSError("Cannot detect sfp") - - return presence + eeprom_raw = self.read_eeprom(0, 1) + + return eeprom_raw is not None # Read out any bytes from any offset - def _read_eeprom_specific_bytes(self, offset, num_bytes): + def read_eeprom(self, offset, num_bytes): eeprom_raw = [] ethtool_cmd = "ethtool -m sfp{} hex on offset {} length {}".format(self.index, offset, num_bytes) try: - output = subprocess.check_output(ethtool_cmd, - shell=True, + output = subprocess.check_output(ethtool_cmd, + shell=True, universal_newlines=True) output_lines = output.splitlines() first_line_raw = output_lines[0] @@ -673,933 +246,8 @@ def _read_eeprom_specific_bytes(self, offset, num_bytes): except subprocess.CalledProcessError as e: return None - return eeprom_raw - - def _convert_string_to_num(self, value_str): - if "-inf" in value_str: - return 'N/A' - elif "Unknown" in value_str: - return 'N/A' - elif 'dBm' in value_str: - t_str = value_str.rstrip('dBm') - return float(t_str) - elif 'mA' in value_str: - t_str = value_str.rstrip('mA') - return float(t_str) - elif 'C' in value_str: - t_str = value_str.rstrip('C') - return float(t_str) - elif 'Volts' in value_str: - t_str = value_str.rstrip('Volts') - return float(t_str) - else: - return 'N/A' - - def get_transceiver_info(self): - """ - Retrieves transceiver info of this SFP - - Returns: - A dict which contains following keys/values : - ================================================================================ - keys |Value Format |Information - ---------------------------|---------------|---------------------------- - type |1*255VCHAR |type of SFP - vendor_rev |1*255VCHAR |vendor revision of SFP - serial |1*255VCHAR |serial number of the SFP - manufacturer |1*255VCHAR |SFP vendor name - model |1*255VCHAR |SFP model name - connector |1*255VCHAR |connector information - encoding |1*255VCHAR |encoding information - ext_identifier |1*255VCHAR |extend identifier - ext_rateselect_compliance |1*255VCHAR |extended rateSelect compliance - cable_length |INT |cable length in m - mominal_bit_rate |INT |nominal bit rate by 100Mbs - specification_compliance |1*255VCHAR |specification compliance - vendor_date |1*255VCHAR |vendor date - vendor_oui |1*255VCHAR |vendor OUI - application_advertisement |1*255VCHAR |supported applications advertisement - ================================================================================ - """ - transceiver_info_dict = {} - compliance_code_dict = {} - - # ToDo: OSFP tranceiver info parsing not fully supported. - # in inf8628.py lack of some memory map definition - # will be implemented when the inf8628 memory map ready - if self.sfp_type == OSFP_TYPE: - offset = 0 - vendor_rev_width = XCVR_HW_REV_WIDTH_OSFP - - sfpi_obj = inf8628InterfaceId() - if sfpi_obj is None: - return None - - sfp_type_raw = self._read_eeprom_specific_bytes((offset + OSFP_TYPE_OFFSET), XCVR_TYPE_WIDTH) - if sfp_type_raw is not None: - sfp_type_data = sfpi_obj.parse_sfp_type(sfp_type_raw, 0) - else: - return None - - sfp_vendor_name_raw = self._read_eeprom_specific_bytes((offset + OSFP_VENDOR_NAME_OFFSET), XCVR_VENDOR_NAME_WIDTH) - if sfp_vendor_name_raw is not None: - sfp_vendor_name_data = sfpi_obj.parse_vendor_name(sfp_vendor_name_raw, 0) - else: - return None - - sfp_vendor_pn_raw = self._read_eeprom_specific_bytes((offset + OSFP_VENDOR_PN_OFFSET), XCVR_VENDOR_PN_WIDTH) - if sfp_vendor_pn_raw is not None: - sfp_vendor_pn_data = sfpi_obj.parse_vendor_pn(sfp_vendor_pn_raw, 0) - else: - return None - - sfp_vendor_rev_raw = self._read_eeprom_specific_bytes((offset + OSFP_HW_REV_OFFSET), vendor_rev_width) - if sfp_vendor_rev_raw is not None: - sfp_vendor_rev_data = sfpi_obj.parse_vendor_rev(sfp_vendor_rev_raw, 0) - else: - return None - - sfp_vendor_sn_raw = self._read_eeprom_specific_bytes((offset + OSFP_VENDOR_SN_OFFSET), XCVR_VENDOR_SN_WIDTH) - if sfp_vendor_sn_raw is not None: - sfp_vendor_sn_data = sfpi_obj.parse_vendor_sn(sfp_vendor_sn_raw, 0) - else: - return None - - transceiver_info_dict['type'] = sfp_type_data['data']['type']['value'] - transceiver_info_dict['manufacturer'] = sfp_vendor_name_data['data']['Vendor Name']['value'] - transceiver_info_dict['model'] = sfp_vendor_pn_data['data']['Vendor PN']['value'] - transceiver_info_dict['vendor_rev'] = sfp_vendor_rev_data['data']['Vendor Rev']['value'] - transceiver_info_dict['serial'] = sfp_vendor_sn_data['data']['Vendor SN']['value'] - transceiver_info_dict['vendor_oui'] = 'N/A' - transceiver_info_dict['vendor_date'] = 'N/A' - transceiver_info_dict['connector'] = 'N/A' - transceiver_info_dict['encoding'] = 'N/A' - transceiver_info_dict['ext_identifier'] = 'N/A' - transceiver_info_dict['ext_rateselect_compliance'] = 'N/A' - transceiver_info_dict['cable_type'] = 'N/A' - transceiver_info_dict['cable_length'] = 'N/A' - transceiver_info_dict['specification_compliance'] = 'N/A' - transceiver_info_dict['nominal_bit_rate'] = 'N/A' - transceiver_info_dict['application_advertisement'] = 'N/A' - - elif self.sfp_type == QSFP_TYPE: - offset = 128 - vendor_rev_width = XCVR_HW_REV_WIDTH_QSFP - interface_info_bulk_width = XCVR_INTFACE_BULK_WIDTH_QSFP - - sfpi_obj = sff8436InterfaceId() - if sfpi_obj is None: - print("Error: sfp_object open failed") - return None - - elif self.sfp_type == QSFP_DD_TYPE: - offset = 128 - - sfpi_obj = qsfp_dd_InterfaceId() - if sfpi_obj is None: - print("Error: sfp_object open failed") - return None - - sfp_type_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_TYPE_OFFSET), XCVR_TYPE_WIDTH) - if sfp_type_raw is not None: - sfp_type_data = sfpi_obj.parse_sfp_type(sfp_type_raw, 0) - else: - return None - - sfp_vendor_name_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_VENDOR_NAME_OFFSET), XCVR_VENDOR_NAME_WIDTH) - if sfp_vendor_name_raw is not None: - sfp_vendor_name_data = sfpi_obj.parse_vendor_name(sfp_vendor_name_raw, 0) - else: - return None - - sfp_vendor_pn_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_VENDOR_PN_OFFSET), XCVR_VENDOR_PN_WIDTH) - if sfp_vendor_pn_raw is not None: - sfp_vendor_pn_data = sfpi_obj.parse_vendor_pn(sfp_vendor_pn_raw, 0) - else: - return None - - sfp_vendor_rev_raw = self._read_eeprom_specific_bytes((offset + XCVR_HW_REV_OFFSET_QSFP_DD), XCVR_HW_REV_WIDTH_QSFP_DD) - if sfp_vendor_rev_raw is not None: - sfp_vendor_rev_data = sfpi_obj.parse_vendor_rev(sfp_vendor_rev_raw, 0) - else: - return None - - sfp_vendor_sn_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_VENDOR_SN_OFFSET), XCVR_VENDOR_SN_WIDTH) - if sfp_vendor_sn_raw is not None: - sfp_vendor_sn_data = sfpi_obj.parse_vendor_sn(sfp_vendor_sn_raw, 0) - else: - return None - - sfp_vendor_oui_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_VENDOR_OUI_OFFSET), XCVR_VENDOR_OUI_WIDTH) - if sfp_vendor_oui_raw is not None: - sfp_vendor_oui_data = sfpi_obj.parse_vendor_oui(sfp_vendor_oui_raw, 0) - else: - return None - - sfp_vendor_date_raw = self._read_eeprom_specific_bytes((offset + XCVR_VENDOR_DATE_OFFSET_QSFP_DD), XCVR_VENDOR_DATE_WIDTH_QSFP_DD) - if sfp_vendor_date_raw is not None: - sfp_vendor_date_data = sfpi_obj.parse_vendor_date(sfp_vendor_date_raw, 0) - else: - return None - - sfp_connector_raw = self._read_eeprom_specific_bytes((offset + XCVR_CONNECTOR_OFFSET_QSFP_DD), XCVR_CONNECTOR_WIDTH_QSFP_DD) - if sfp_connector_raw is not None: - sfp_connector_data = sfpi_obj.parse_connector(sfp_connector_raw, 0) - else: - return None - - sfp_ext_identifier_raw = self._read_eeprom_specific_bytes((offset + XCVR_EXT_TYPE_OFFSET_QSFP_DD), XCVR_EXT_TYPE_WIDTH_QSFP_DD) - if sfp_ext_identifier_raw is not None: - sfp_ext_identifier_data = sfpi_obj.parse_ext_iden(sfp_ext_identifier_raw, 0) - else: - return None - - sfp_cable_len_raw = self._read_eeprom_specific_bytes((offset + XCVR_CABLE_LENGTH_OFFSET_QSFP_DD), XCVR_CABLE_LENGTH_WIDTH_QSFP_DD) - if sfp_cable_len_raw is not None: - sfp_cable_len_data = sfpi_obj.parse_cable_len(sfp_cable_len_raw, 0) - else: - return None - - sfp_media_type_raw = self._read_eeprom_specific_bytes(XCVR_MEDIA_TYPE_OFFSET_QSFP_DD, XCVR_MEDIA_TYPE_WIDTH_QSFP_DD) - if sfp_media_type_raw is not None: - sfp_media_type_dict = sfpi_obj.parse_media_type(sfp_media_type_raw, 0) - if sfp_media_type_dict is None: - return None - - host_media_list = "" - sfp_application_type_first_list = self._read_eeprom_specific_bytes((XCVR_FIRST_APPLICATION_LIST_OFFSET_QSFP_DD), XCVR_FIRST_APPLICATION_LIST_WIDTH_QSFP_DD) - if self.second_application_list: - possible_application_count = 15 - sfp_application_type_second_list = self._read_eeprom_specific_bytes((XCVR_SECOND_APPLICATION_LIST_OFFSET_QSFP_DD), XCVR_SECOND_APPLICATION_LIST_WIDTH_QSFP_DD) - if sfp_application_type_first_list is not None and sfp_application_type_second_list is not None: - sfp_application_type_list = sfp_application_type_first_list + sfp_application_type_second_list - else: - return None - else: - possible_application_count = 8 - if sfp_application_type_first_list is not None: - sfp_application_type_list = sfp_application_type_first_list - else: - return None - - for i in range(0, possible_application_count): - if sfp_application_type_list[i * 4] == 'ff': - break - host_electrical, media_interface = sfpi_obj.parse_application(sfp_media_type_dict, sfp_application_type_list[i * 4], sfp_application_type_list[i * 4 + 1]) - host_media_list = host_media_list + host_electrical + ' - ' + media_interface + '\n\t\t\t\t ' - else: - return None - - transceiver_info_dict['type'] = str(sfp_type_data['data']['type']['value']) - transceiver_info_dict['manufacturer'] = str(sfp_vendor_name_data['data']['Vendor Name']['value']) - transceiver_info_dict['model'] = str(sfp_vendor_pn_data['data']['Vendor PN']['value']) - transceiver_info_dict['vendor_rev'] = str(sfp_vendor_rev_data['data']['Vendor Rev']['value']) - transceiver_info_dict['serial'] = str(sfp_vendor_sn_data['data']['Vendor SN']['value']) - transceiver_info_dict['vendor_oui'] = str(sfp_vendor_oui_data['data']['Vendor OUI']['value']) - transceiver_info_dict['vendor_date'] = str(sfp_vendor_date_data['data']['VendorDataCode(YYYY-MM-DD Lot)']['value']) - transceiver_info_dict['connector'] = str(sfp_connector_data['data']['Connector']['value']) - transceiver_info_dict['encoding'] = "Not supported for CMIS cables" - transceiver_info_dict['ext_identifier'] = str(sfp_ext_identifier_data['data']['Extended Identifier']['value']) - transceiver_info_dict['ext_rateselect_compliance'] = "Not supported for CMIS cables" - transceiver_info_dict['specification_compliance'] = "Not supported for CMIS cables" - transceiver_info_dict['cable_type'] = "Length Cable Assembly(m)" - transceiver_info_dict['cable_length'] = str(sfp_cable_len_data['data']['Length Cable Assembly(m)']['value']) - transceiver_info_dict['nominal_bit_rate'] = "Not supported for CMIS cables" - transceiver_info_dict['application_advertisement'] = host_media_list - - else: - offset = 0 - vendor_rev_width = XCVR_HW_REV_WIDTH_SFP - interface_info_bulk_width = XCVR_INTFACE_BULK_WIDTH_SFP - - sfpi_obj = sff8472InterfaceId() - if sfpi_obj is None: - print("Error: sfp_object open failed") - return None - - if self.sfp_type != QSFP_DD_TYPE: - sfp_interface_bulk_raw = self._read_eeprom_specific_bytes(offset + XCVR_INTERFACE_DATA_START, XCVR_INTERFACE_DATA_SIZE) - if sfp_interface_bulk_raw is None: - return None - - start = XCVR_INTFACE_BULK_OFFSET - XCVR_INTERFACE_DATA_START - end = start + interface_info_bulk_width - sfp_interface_bulk_data = sfpi_obj.parse_sfp_info_bulk(sfp_interface_bulk_raw[start : end], 0) - - start = XCVR_VENDOR_NAME_OFFSET - XCVR_INTERFACE_DATA_START - end = start + XCVR_VENDOR_NAME_WIDTH - sfp_vendor_name_data = sfpi_obj.parse_vendor_name(sfp_interface_bulk_raw[start : end], 0) - - start = XCVR_VENDOR_PN_OFFSET - XCVR_INTERFACE_DATA_START - end = start + XCVR_VENDOR_PN_WIDTH - sfp_vendor_pn_data = sfpi_obj.parse_vendor_pn(sfp_interface_bulk_raw[start : end], 0) - - start = XCVR_HW_REV_OFFSET - XCVR_INTERFACE_DATA_START - end = start + vendor_rev_width - sfp_vendor_rev_data = sfpi_obj.parse_vendor_rev(sfp_interface_bulk_raw[start : end], 0) - - start = XCVR_VENDOR_SN_OFFSET - XCVR_INTERFACE_DATA_START - end = start + XCVR_VENDOR_SN_WIDTH - sfp_vendor_sn_data = sfpi_obj.parse_vendor_sn(sfp_interface_bulk_raw[start : end], 0) - - start = XCVR_VENDOR_OUI_OFFSET - XCVR_INTERFACE_DATA_START - end = start + XCVR_VENDOR_OUI_WIDTH - sfp_vendor_oui_data = sfpi_obj.parse_vendor_oui(sfp_interface_bulk_raw[start : end], 0) - - start = XCVR_VENDOR_DATE_OFFSET - XCVR_INTERFACE_DATA_START - end = start + XCVR_VENDOR_DATE_WIDTH - sfp_vendor_date_data = sfpi_obj.parse_vendor_date(sfp_interface_bulk_raw[start : end], 0) - - transceiver_info_dict['type'] = sfp_interface_bulk_data['data']['type']['value'] - transceiver_info_dict['manufacturer'] = sfp_vendor_name_data['data']['Vendor Name']['value'] - transceiver_info_dict['model'] = sfp_vendor_pn_data['data']['Vendor PN']['value'] - transceiver_info_dict['vendor_rev'] = sfp_vendor_rev_data['data']['Vendor Rev']['value'] - transceiver_info_dict['serial'] = sfp_vendor_sn_data['data']['Vendor SN']['value'] - transceiver_info_dict['vendor_oui'] = sfp_vendor_oui_data['data']['Vendor OUI']['value'] - transceiver_info_dict['vendor_date'] = sfp_vendor_date_data['data']['VendorDataCode(YYYY-MM-DD Lot)']['value'] - transceiver_info_dict['connector'] = sfp_interface_bulk_data['data']['Connector']['value'] - transceiver_info_dict['encoding'] = sfp_interface_bulk_data['data']['EncodingCodes']['value'] - transceiver_info_dict['ext_identifier'] = sfp_interface_bulk_data['data']['Extended Identifier']['value'] - transceiver_info_dict['ext_rateselect_compliance'] = sfp_interface_bulk_data['data']['RateIdentifier']['value'] - transceiver_info_dict['application_advertisement'] = 'N/A' - - if self.sfp_type == QSFP_TYPE: - for key in qsfp_cable_length_tup: - if key in sfp_interface_bulk_data['data']: - transceiver_info_dict['cable_type'] = key - transceiver_info_dict['cable_length'] = str(sfp_interface_bulk_data['data'][key]['value']) - - for key in qsfp_compliance_code_tup: - if key in sfp_interface_bulk_data['data']['Specification compliance']['value']: - compliance_code_dict[key] = sfp_interface_bulk_data['data']['Specification compliance']['value'][key]['value'] - sfp_ext_specification_compliance_raw = self._read_eeprom_specific_bytes(offset + XCVR_EXT_SPECIFICATION_COMPLIANCE_OFFSET, XCVR_EXT_SPECIFICATION_COMPLIANCE_WIDTH) - if sfp_ext_specification_compliance_raw is not None: - sfp_ext_specification_compliance_data = sfpi_obj.parse_ext_specification_compliance(sfp_ext_specification_compliance_raw[0 : 1], 0) - if sfp_ext_specification_compliance_data['data']['Extended Specification compliance']['value'] != "Unspecified": - compliance_code_dict['Extended Specification compliance'] = sfp_ext_specification_compliance_data['data']['Extended Specification compliance']['value'] - transceiver_info_dict['specification_compliance'] = str(compliance_code_dict) - - transceiver_info_dict['nominal_bit_rate'] = str(sfp_interface_bulk_data['data']['Nominal Bit Rate(100Mbs)']['value']) - else: - for key in sfp_cable_length_tup: - if key in sfp_interface_bulk_data['data']: - transceiver_info_dict['cable_type'] = key - transceiver_info_dict['cable_length'] = str(sfp_interface_bulk_data['data'][key]['value']) - - for key in sfp_compliance_code_tup: - if key in sfp_interface_bulk_data['data']['Specification compliance']['value']: - compliance_code_dict[key] = sfp_interface_bulk_data['data']['Specification compliance']['value'][key]['value'] - transceiver_info_dict['specification_compliance'] = str(compliance_code_dict) - - transceiver_info_dict['nominal_bit_rate'] = str(sfp_interface_bulk_data['data']['NominalSignallingRate(UnitsOf100Mbd)']['value']) - - return transceiver_info_dict - - - def get_transceiver_bulk_status(self): - """ - Retrieves transceiver bulk status of this SFP - - Returns: - A dict which contains following keys/values : - ======================================================================== - keys |Value Format |Information - ---------------------------|---------------|---------------------------- - RX LOS |BOOLEAN |RX lost-of-signal status, - | |True if has RX los, False if not. - TX FAULT |BOOLEAN |TX fault status, - | |True if has TX fault, False if not. - Reset status |BOOLEAN |reset status, - | |True if SFP in reset, False if not. - LP mode |BOOLEAN |low power mode status, - | |True in lp mode, False if not. - TX disable |BOOLEAN |TX disable status, - | |True TX disabled, False if not. - TX disabled channel |HEX |disabled TX channles in hex, - | |bits 0 to 3 represent channel 0 - | |to channel 3. - Temperature |INT |module temperature in Celsius - Voltage |INT |supply voltage in mV - TX bias |INT |TX Bias Current in mA - RX power |INT |received optical power in mW - TX power |INT |TX output power in mW - ======================================================================== - """ - transceiver_dom_info_dict = {} - - dom_info_dict_keys = ['temperature', 'voltage', - 'rx1power', 'rx2power', - 'rx3power', 'rx4power', - 'rx5power', 'rx6power', - 'rx7power', 'rx8power', - 'tx1bias', 'tx2bias', - 'tx3bias', 'tx4bias', - 'tx5bias', 'tx6bias', - 'tx7bias', 'tx8bias', - 'tx1power', 'tx2power', - 'tx3power', 'tx4power', - 'tx5power', 'tx6power', - 'tx7power', 'tx8power' - ] - transceiver_dom_info_dict = dict.fromkeys(dom_info_dict_keys, 'N/A') - - if self.sfp_type == OSFP_TYPE: - pass - - elif self.sfp_type == QSFP_TYPE: - if not self.dom_supported: - return transceiver_dom_info_dict - - offset = 0 - sfpd_obj = sff8436Dom() - if sfpd_obj is None: - return transceiver_dom_info_dict - - dom_data_raw = self._read_eeprom_specific_bytes((offset + QSFP_DOM_BULK_DATA_START), QSFP_DOM_BULK_DATA_SIZE) - if dom_data_raw is None: - return transceiver_dom_info_dict - - if self.dom_temp_supported: - start = QSFP_TEMPE_OFFSET - QSFP_DOM_BULK_DATA_START - end = start + QSFP_TEMPE_WIDTH - dom_temperature_data = sfpd_obj.parse_temperature(dom_data_raw[start : end], 0) - temp = self._convert_string_to_num(dom_temperature_data['data']['Temperature']['value']) - if temp is not None: - transceiver_dom_info_dict['temperature'] = temp - - if self.dom_volt_supported: - start = QSFP_VOLT_OFFSET - QSFP_DOM_BULK_DATA_START - end = start + QSFP_VOLT_WIDTH - dom_voltage_data = sfpd_obj.parse_voltage(dom_data_raw[start : end], 0) - volt = self._convert_string_to_num(dom_voltage_data['data']['Vcc']['value']) - if volt is not None: - transceiver_dom_info_dict['voltage'] = volt - - start = QSFP_CHANNL_MON_OFFSET - QSFP_DOM_BULK_DATA_START - end = start + QSFP_CHANNL_MON_WITH_TX_POWER_WIDTH - dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params_with_tx_power(dom_data_raw[start : end], 0) - - if self.dom_tx_power_supported: - transceiver_dom_info_dict['tx1power'] = self._convert_string_to_num(dom_channel_monitor_data['data']['TX1Power']['value']) - transceiver_dom_info_dict['tx2power'] = self._convert_string_to_num(dom_channel_monitor_data['data']['TX2Power']['value']) - transceiver_dom_info_dict['tx3power'] = self._convert_string_to_num(dom_channel_monitor_data['data']['TX3Power']['value']) - transceiver_dom_info_dict['tx4power'] = self._convert_string_to_num(dom_channel_monitor_data['data']['TX4Power']['value']) - - if self.dom_rx_power_supported: - transceiver_dom_info_dict['rx1power'] = self._convert_string_to_num(dom_channel_monitor_data['data']['RX1Power']['value']) - transceiver_dom_info_dict['rx2power'] = self._convert_string_to_num(dom_channel_monitor_data['data']['RX2Power']['value']) - transceiver_dom_info_dict['rx3power'] = self._convert_string_to_num(dom_channel_monitor_data['data']['RX3Power']['value']) - transceiver_dom_info_dict['rx4power'] = self._convert_string_to_num(dom_channel_monitor_data['data']['RX4Power']['value']) - - transceiver_dom_info_dict['tx1bias'] = dom_channel_monitor_data['data']['TX1Bias']['value'] - transceiver_dom_info_dict['tx2bias'] = dom_channel_monitor_data['data']['TX2Bias']['value'] - transceiver_dom_info_dict['tx3bias'] = dom_channel_monitor_data['data']['TX3Bias']['value'] - transceiver_dom_info_dict['tx4bias'] = dom_channel_monitor_data['data']['TX4Bias']['value'] - - elif self.sfp_type == QSFP_DD_TYPE: - - offset = 0 - sfpd_obj = qsfp_dd_Dom() - if sfpd_obj is None: - return transceiver_dom_info_dict - - dom_data_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_DOM_BULK_DATA_START), QSFP_DD_DOM_BULK_DATA_SIZE) - if dom_data_raw is None: - return transceiver_dom_info_dict - - if self.dom_temp_supported: - start = QSFP_DD_TEMPE_OFFSET - QSFP_DD_DOM_BULK_DATA_START - end = start + QSFP_DD_TEMPE_WIDTH - dom_temperature_data = sfpd_obj.parse_temperature(dom_data_raw[start : end], 0) - temp = self._convert_string_to_num(dom_temperature_data['data']['Temperature']['value']) - if temp is not None: - transceiver_dom_info_dict['temperature'] = temp - - if self.dom_volt_supported: - start = QSFP_DD_VOLT_OFFSET - QSFP_DD_DOM_BULK_DATA_START - end = start + QSFP_DD_VOLT_WIDTH - dom_voltage_data = sfpd_obj.parse_voltage(dom_data_raw[start : end], 0) - volt = self._convert_string_to_num(dom_voltage_data['data']['Vcc']['value']) - if volt is not None: - transceiver_dom_info_dict['voltage'] = volt - - if self.dom_rx_tx_power_bias_supported: - # page 11h - offset = 512 - dom_data_raw = self._read_eeprom_specific_bytes(offset + QSFP_DD_CHANNL_MON_OFFSET, QSFP_DD_CHANNL_MON_WIDTH) - if dom_data_raw is None: - return transceiver_dom_info_dict - dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params(dom_data_raw, 0) - - if self.dom_tx_power_supported: - transceiver_dom_info_dict['tx1power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['TX1Power']['value'])) - transceiver_dom_info_dict['tx2power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['TX2Power']['value'])) - transceiver_dom_info_dict['tx3power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['TX3Power']['value'])) - transceiver_dom_info_dict['tx4power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['TX4Power']['value'])) - transceiver_dom_info_dict['tx5power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['TX5Power']['value'])) - transceiver_dom_info_dict['tx6power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['TX6Power']['value'])) - transceiver_dom_info_dict['tx7power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['TX7Power']['value'])) - transceiver_dom_info_dict['tx8power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['TX8Power']['value'])) - - if self.dom_rx_power_supported: - transceiver_dom_info_dict['rx1power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['RX1Power']['value'])) - transceiver_dom_info_dict['rx2power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['RX2Power']['value'])) - transceiver_dom_info_dict['rx3power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['RX3Power']['value'])) - transceiver_dom_info_dict['rx4power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['RX4Power']['value'])) - transceiver_dom_info_dict['rx5power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['RX5Power']['value'])) - transceiver_dom_info_dict['rx6power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['RX6Power']['value'])) - transceiver_dom_info_dict['rx7power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['RX7Power']['value'])) - transceiver_dom_info_dict['rx8power'] = str(self._convert_string_to_num(dom_channel_monitor_data['data']['RX8Power']['value'])) - - if self.dom_tx_bias_power_supported: - transceiver_dom_info_dict['tx1bias'] = str(dom_channel_monitor_data['data']['TX1Bias']['value']) - transceiver_dom_info_dict['tx2bias'] = str(dom_channel_monitor_data['data']['TX2Bias']['value']) - transceiver_dom_info_dict['tx3bias'] = str(dom_channel_monitor_data['data']['TX3Bias']['value']) - transceiver_dom_info_dict['tx4bias'] = str(dom_channel_monitor_data['data']['TX4Bias']['value']) - transceiver_dom_info_dict['tx5bias'] = str(dom_channel_monitor_data['data']['TX5Bias']['value']) - transceiver_dom_info_dict['tx6bias'] = str(dom_channel_monitor_data['data']['TX6Bias']['value']) - transceiver_dom_info_dict['tx7bias'] = str(dom_channel_monitor_data['data']['TX7Bias']['value']) - transceiver_dom_info_dict['tx8bias'] = str(dom_channel_monitor_data['data']['TX8Bias']['value']) - - return transceiver_dom_info_dict - - else: - if not self.dom_supported: - return transceiver_dom_info_dict - - offset = 256 - sfpd_obj = sff8472Dom() - if sfpd_obj is None: - return transceiver_dom_info_dict - sfpd_obj._calibration_type = self.calibration - - dom_data_raw = self._read_eeprom_specific_bytes((offset + SFP_DOM_BULK_DATA_START), SFP_DOM_BULK_DATA_SIZE) - - start = SFP_TEMPE_OFFSET - SFP_DOM_BULK_DATA_START - end = start + SFP_TEMPE_WIDTH - dom_temperature_data = sfpd_obj.parse_temperature(dom_data_raw[start: end], 0) - - start = SFP_VOLT_OFFSET - SFP_DOM_BULK_DATA_START - end = start + SFP_VOLT_WIDTH - dom_voltage_data = sfpd_obj.parse_voltage(dom_data_raw[start: end], 0) - - start = SFP_CHANNL_MON_OFFSET - SFP_DOM_BULK_DATA_START - end = start + SFP_CHANNL_MON_WIDTH - dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params(dom_data_raw[start: end], 0) - - transceiver_dom_info_dict['temperature'] = self._convert_string_to_num(dom_temperature_data['data']['Temperature']['value']) - transceiver_dom_info_dict['voltage'] = self._convert_string_to_num(dom_voltage_data['data']['Vcc']['value']) - transceiver_dom_info_dict['rx1power'] = self._convert_string_to_num(dom_channel_monitor_data['data']['RXPower']['value']) - transceiver_dom_info_dict['tx1bias'] = self._convert_string_to_num(dom_channel_monitor_data['data']['TXBias']['value']) - transceiver_dom_info_dict['tx1power'] = self._convert_string_to_num(dom_channel_monitor_data['data']['TXPower']['value']) - - return transceiver_dom_info_dict - - - def get_transceiver_threshold_info(self): - """ - Retrieves transceiver threshold info of this SFP - - Returns: - A dict which contains following keys/values : - ======================================================================== - keys |Value Format |Information - ---------------------------|---------------|---------------------------- - temphighalarm |FLOAT |High Alarm Threshold value of temperature in Celsius. - templowalarm |FLOAT |Low Alarm Threshold value of temperature in Celsius. - temphighwarning |FLOAT |High Warning Threshold value of temperature in Celsius. - templowwarning |FLOAT |Low Warning Threshold value of temperature in Celsius. - vcchighalarm |FLOAT |High Alarm Threshold value of supply voltage in mV. - vcclowalarm |FLOAT |Low Alarm Threshold value of supply voltage in mV. - vcchighwarning |FLOAT |High Warning Threshold value of supply voltage in mV. - vcclowwarning |FLOAT |Low Warning Threshold value of supply voltage in mV. - rxpowerhighalarm |FLOAT |High Alarm Threshold value of received power in dBm. - rxpowerlowalarm |FLOAT |Low Alarm Threshold value of received power in dBm. - rxpowerhighwarning |FLOAT |High Warning Threshold value of received power in dBm. - rxpowerlowwarning |FLOAT |Low Warning Threshold value of received power in dBm. - txpowerhighalarm |FLOAT |High Alarm Threshold value of transmit power in dBm. - txpowerlowalarm |FLOAT |Low Alarm Threshold value of transmit power in dBm. - txpowerhighwarning |FLOAT |High Warning Threshold value of transmit power in dBm. - txpowerlowwarning |FLOAT |Low Warning Threshold value of transmit power in dBm. - txbiashighalarm |FLOAT |High Alarm Threshold value of tx Bias Current in mA. - txbiaslowalarm |FLOAT |Low Alarm Threshold value of tx Bias Current in mA. - txbiashighwarning |FLOAT |High Warning Threshold value of tx Bias Current in mA. - txbiaslowwarning |FLOAT |Low Warning Threshold value of tx Bias Current in mA. - ======================================================================== - """ - transceiver_dom_threshold_info_dict = {} - - dom_info_dict_keys = ['temphighalarm', 'temphighwarning', - 'templowalarm', 'templowwarning', - 'vcchighalarm', 'vcchighwarning', - 'vcclowalarm', 'vcclowwarning', - 'rxpowerhighalarm', 'rxpowerhighwarning', - 'rxpowerlowalarm', 'rxpowerlowwarning', - 'txpowerhighalarm', 'txpowerhighwarning', - 'txpowerlowalarm', 'txpowerlowwarning', - 'txbiashighalarm', 'txbiashighwarning', - 'txbiaslowalarm', 'txbiaslowwarning' - ] - transceiver_dom_threshold_info_dict = dict.fromkeys(dom_info_dict_keys, 'N/A') - - if self.sfp_type == OSFP_TYPE: - pass - - elif self.sfp_type == QSFP_TYPE: - if not self.dom_supported or not self.qsfp_page3_available: - return transceiver_dom_threshold_info_dict - - # Dom Threshold data starts from offset 384 - # Revert offset back to 0 once data is retrieved - offset = QSFP_MODULE_UPPER_PAGE3_START - sfpd_obj = sff8436Dom() - if sfpd_obj is None: - return transceiver_dom_threshold_info_dict - - dom_module_threshold_raw = self._read_eeprom_specific_bytes((offset + QSFP_MODULE_THRESHOLD_OFFSET), QSFP_MODULE_THRESHOLD_WIDTH) - if dom_module_threshold_raw is None: - return transceiver_dom_threshold_info_dict - - dom_module_threshold_data = sfpd_obj.parse_module_threshold_values(dom_module_threshold_raw, 0) - - dom_channel_threshold_raw = self._read_eeprom_specific_bytes((offset + QSFP_CHANNL_THRESHOLD_OFFSET), - QSFP_CHANNL_THRESHOLD_WIDTH) - if dom_channel_threshold_raw is None: - return transceiver_dom_threshold_info_dict - dom_channel_threshold_data = sfpd_obj.parse_channel_threshold_values(dom_channel_threshold_raw, 0) - - # Threshold Data - transceiver_dom_threshold_info_dict['temphighalarm'] = dom_module_threshold_data['data']['TempHighAlarm']['value'] - transceiver_dom_threshold_info_dict['temphighwarning'] = dom_module_threshold_data['data']['TempHighWarning']['value'] - transceiver_dom_threshold_info_dict['templowalarm'] = dom_module_threshold_data['data']['TempLowAlarm']['value'] - transceiver_dom_threshold_info_dict['templowwarning'] = dom_module_threshold_data['data']['TempLowWarning']['value'] - transceiver_dom_threshold_info_dict['vcchighalarm'] = dom_module_threshold_data['data']['VccHighAlarm']['value'] - transceiver_dom_threshold_info_dict['vcchighwarning'] = dom_module_threshold_data['data']['VccHighWarning']['value'] - transceiver_dom_threshold_info_dict['vcclowalarm'] = dom_module_threshold_data['data']['VccLowAlarm']['value'] - transceiver_dom_threshold_info_dict['vcclowwarning'] = dom_module_threshold_data['data']['VccLowWarning']['value'] - transceiver_dom_threshold_info_dict['rxpowerhighalarm'] = dom_channel_threshold_data['data']['RxPowerHighAlarm']['value'] - transceiver_dom_threshold_info_dict['rxpowerhighwarning'] = dom_channel_threshold_data['data']['RxPowerHighWarning']['value'] - transceiver_dom_threshold_info_dict['rxpowerlowalarm'] = dom_channel_threshold_data['data']['RxPowerLowAlarm']['value'] - transceiver_dom_threshold_info_dict['rxpowerlowwarning'] = dom_channel_threshold_data['data']['RxPowerLowWarning']['value'] - transceiver_dom_threshold_info_dict['txbiashighalarm'] = dom_channel_threshold_data['data']['TxBiasHighAlarm']['value'] - transceiver_dom_threshold_info_dict['txbiashighwarning'] = dom_channel_threshold_data['data']['TxBiasHighWarning']['value'] - transceiver_dom_threshold_info_dict['txbiaslowalarm'] = dom_channel_threshold_data['data']['TxBiasLowAlarm']['value'] - transceiver_dom_threshold_info_dict['txbiaslowwarning'] = dom_channel_threshold_data['data']['TxBiasLowWarning']['value'] - transceiver_dom_threshold_info_dict['txpowerhighalarm'] = dom_channel_threshold_data['data']['TxPowerHighAlarm']['value'] - transceiver_dom_threshold_info_dict['txpowerhighwarning'] = dom_channel_threshold_data['data']['TxPowerHighWarning']['value'] - transceiver_dom_threshold_info_dict['txpowerlowalarm'] = dom_channel_threshold_data['data']['TxPowerLowAlarm']['value'] - transceiver_dom_threshold_info_dict['txpowerlowwarning'] = dom_channel_threshold_data['data']['TxPowerLowWarning']['value'] - - elif self.sfp_type == QSFP_DD_TYPE: - if not self.dom_supported: - return transceiver_dom_threshold_info_dict - - if not self.dom_thresholds_supported: - return transceiver_dom_threshold_info_dict - - sfpd_obj = qsfp_dd_Dom() - if sfpd_obj is None: - return transceiver_dom_threshold_info_dict - - # page 02 - offset = 384 - dom_module_threshold_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_MODULE_THRESHOLD_OFFSET), QSFP_DD_MODULE_THRESHOLD_WIDTH) - if dom_module_threshold_raw is None: - return transceiver_dom_threshold_info_dict - - dom_module_threshold_data = sfpd_obj.parse_module_threshold_values(dom_module_threshold_raw, 0) - - # Threshold Data - transceiver_dom_threshold_info_dict['temphighalarm'] = dom_module_threshold_data['data']['TempHighAlarm']['value'] - transceiver_dom_threshold_info_dict['temphighwarning'] = dom_module_threshold_data['data']['TempHighWarning']['value'] - transceiver_dom_threshold_info_dict['templowalarm'] = dom_module_threshold_data['data']['TempLowAlarm']['value'] - transceiver_dom_threshold_info_dict['templowwarning'] = dom_module_threshold_data['data']['TempLowWarning']['value'] - transceiver_dom_threshold_info_dict['vcchighalarm'] = dom_module_threshold_data['data']['VccHighAlarm']['value'] - transceiver_dom_threshold_info_dict['vcchighwarning'] = dom_module_threshold_data['data']['VccHighWarning']['value'] - transceiver_dom_threshold_info_dict['vcclowalarm'] = dom_module_threshold_data['data']['VccLowAlarm']['value'] - transceiver_dom_threshold_info_dict['vcclowwarning'] = dom_module_threshold_data['data']['VccLowWarning']['value'] - transceiver_dom_threshold_info_dict['rxpowerhighalarm'] = dom_module_threshold_data['data']['RxPowerHighAlarm']['value'] - transceiver_dom_threshold_info_dict['rxpowerhighwarning'] = dom_module_threshold_data['data']['RxPowerHighWarning']['value'] - transceiver_dom_threshold_info_dict['rxpowerlowalarm'] = dom_module_threshold_data['data']['RxPowerLowAlarm']['value'] - transceiver_dom_threshold_info_dict['rxpowerlowwarning'] = dom_module_threshold_data['data']['RxPowerLowWarning']['value'] - transceiver_dom_threshold_info_dict['txbiashighalarm'] = dom_module_threshold_data['data']['TxBiasHighAlarm']['value'] - transceiver_dom_threshold_info_dict['txbiashighwarning'] = dom_module_threshold_data['data']['TxBiasHighWarning']['value'] - transceiver_dom_threshold_info_dict['txbiaslowalarm'] = dom_module_threshold_data['data']['TxBiasLowAlarm']['value'] - transceiver_dom_threshold_info_dict['txbiaslowwarning'] = dom_module_threshold_data['data']['TxBiasLowWarning']['value'] - transceiver_dom_threshold_info_dict['txpowerhighalarm'] = dom_module_threshold_data['data']['TxPowerHighAlarm']['value'] - transceiver_dom_threshold_info_dict['txpowerhighwarning'] = dom_module_threshold_data['data']['TxPowerHighWarning']['value'] - transceiver_dom_threshold_info_dict['txpowerlowalarm'] = dom_module_threshold_data['data']['TxPowerLowAlarm']['value'] - transceiver_dom_threshold_info_dict['txpowerlowwarning'] = dom_module_threshold_data['data']['TxPowerLowWarning']['value'] - - else: - offset = SFP_MODULE_ADDRA2_OFFSET - - if not self.dom_supported: - return transceiver_dom_threshold_info_dict - - sfpd_obj = sff8472Dom(None, self.calibration) - if sfpd_obj is None: - return transceiver_dom_threshold_info_dict - - dom_module_threshold_raw = self._read_eeprom_specific_bytes((offset + SFP_MODULE_THRESHOLD_OFFSET), - SFP_MODULE_THRESHOLD_WIDTH) - if dom_module_threshold_raw is not None: - dom_module_threshold_data = sfpd_obj.parse_alarm_warning_threshold(dom_module_threshold_raw, 0) - else: - return transceiver_dom_threshold_info_dict - - # Threshold Data - transceiver_dom_threshold_info_dict['temphighalarm'] = dom_module_threshold_data['data']['TempHighAlarm']['value'] - transceiver_dom_threshold_info_dict['templowalarm'] = dom_module_threshold_data['data']['TempLowAlarm']['value'] - transceiver_dom_threshold_info_dict['temphighwarning'] = dom_module_threshold_data['data']['TempHighWarning']['value'] - transceiver_dom_threshold_info_dict['templowwarning'] = dom_module_threshold_data['data']['TempLowWarning']['value'] - transceiver_dom_threshold_info_dict['vcchighalarm'] = dom_module_threshold_data['data']['VoltageHighAlarm']['value'] - transceiver_dom_threshold_info_dict['vcclowalarm'] = dom_module_threshold_data['data']['VoltageLowAlarm']['value'] - transceiver_dom_threshold_info_dict['vcchighwarning'] = dom_module_threshold_data['data']['VoltageHighWarning']['value'] - transceiver_dom_threshold_info_dict['vcclowwarning'] = dom_module_threshold_data['data']['VoltageLowWarning']['value'] - transceiver_dom_threshold_info_dict['txbiashighalarm'] = dom_module_threshold_data['data']['BiasHighAlarm']['value'] - transceiver_dom_threshold_info_dict['txbiaslowalarm'] = dom_module_threshold_data['data']['BiasLowAlarm']['value'] - transceiver_dom_threshold_info_dict['txbiashighwarning'] = dom_module_threshold_data['data']['BiasHighWarning']['value'] - transceiver_dom_threshold_info_dict['txbiaslowwarning'] = dom_module_threshold_data['data']['BiasLowWarning']['value'] - transceiver_dom_threshold_info_dict['txpowerhighalarm'] = dom_module_threshold_data['data']['TXPowerHighAlarm']['value'] - transceiver_dom_threshold_info_dict['txpowerlowalarm'] = dom_module_threshold_data['data']['TXPowerLowAlarm']['value'] - transceiver_dom_threshold_info_dict['txpowerhighwarning'] = dom_module_threshold_data['data']['TXPowerHighWarning']['value'] - transceiver_dom_threshold_info_dict['txpowerlowwarning'] = dom_module_threshold_data['data']['TXPowerLowWarning']['value'] - transceiver_dom_threshold_info_dict['rxpowerhighalarm'] = dom_module_threshold_data['data']['RXPowerHighAlarm']['value'] - transceiver_dom_threshold_info_dict['rxpowerlowalarm'] = dom_module_threshold_data['data']['RXPowerLowAlarm']['value'] - transceiver_dom_threshold_info_dict['rxpowerhighwarning'] = dom_module_threshold_data['data']['RXPowerHighWarning']['value'] - transceiver_dom_threshold_info_dict['rxpowerlowwarning'] = dom_module_threshold_data['data']['RXPowerLowWarning']['value'] - - return transceiver_dom_threshold_info_dict - - - def get_reset_status(self): - """ - Retrieves the reset status of SFP - - Returns: - A Boolean, True if reset enabled, False if disabled - - for QSFP, originally I would like to make use of Initialization complete flag bit - which is at Page a0 offset 6 bit 0 to test whether reset is complete. - However as unit testing was carried out I find this approach may fail because: - 1. we make use of ethtool to read data on I2C bus rather than to read directly - 2. ethtool is unable to access I2C during QSFP module being reset - In other words, whenever the flag is able to be retrived, the value is always be 1 - As a result, it doesn't make sense to retrieve that flag. Just treat successfully - retrieving data as "data ready". - for SFP it seems that there is not flag indicating whether reset succeed. However, - we can also do it in the way for QSFP. - """ - if not self.dom_supported: - return False - - if self.sfp_type == OSFP_TYPE: - return False - elif self.sfp_type == QSFP_TYPE: - offset = 0 - sfpd_obj = sff8436Dom() - dom_module_monitor_raw = self._read_eeprom_specific_bytes((offset + QSFP_MODULE_MONITOR_OFFSET), QSFP_MODULE_MONITOR_WIDTH) - - if dom_module_monitor_raw is not None: - return True - else: - return False - elif self.sfp_type == SFP_TYPE: - offset = 0 - sfpd_obj = sff8472Dom() - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + SFP_CHANNL_STATUS_OFFSET), SFP_CHANNL_STATUS_WIDTH) - - if dom_channel_monitor_raw is not None: - return True - else: - return False - elif self.sfp_type == QSFP_DD_TYPE: - offset = 0 - sfpd_obj = qsfp_dd_Dom() - dom_channel_status_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_CHANNL_STATUS_OFFSET), QSFP_DD_CHANNL_STATUS_WIDTH) - - if dom_channel_status_raw is None: - return False - - dom_channel_status_data = sfpd_obj.parse_dom_channel_status(dom_channel_status_raw, 0) - return dom_channel_status_data['data']['Status']['value'] == 'On' - - def get_rx_los(self): - """ - Retrieves the RX LOS (lost-of-signal) status of SFP - - Returns: - A Boolean, True if SFP has RX LOS, False if not. - Note : RX LOS status is latched until a call to get_rx_los or a reset. - """ - if not self.dom_supported: - return None - - rx_los_list = [] - if self.sfp_type == OSFP_TYPE: - return None - elif self.sfp_type == QSFP_TYPE: - offset = 0 - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + QSFP_CHANNL_RX_LOS_STATUS_OFFSET), QSFP_CHANNL_RX_LOS_STATUS_WIDTH) - if dom_channel_monitor_raw is not None: - rx_los_data = int(dom_channel_monitor_raw[0], 16) - rx_los_list.append(rx_los_data & 0x01 != 0) - rx_los_list.append(rx_los_data & 0x02 != 0) - rx_los_list.append(rx_los_data & 0x04 != 0) - rx_los_list.append(rx_los_data & 0x08 != 0) - - elif self.sfp_type == QSFP_DD_TYPE: - # page 11h - if self.dom_rx_tx_power_bias_supported: - offset = 512 - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_CHANNL_RX_LOS_STATUS_OFFSET), QSFP_DD_CHANNL_RX_LOS_STATUS_WIDTH) - if dom_channel_monitor_raw is not None: - rx_los_data = int(dom_channel_monitor_raw[0], 8) - rx_los_list.append(rx_los_data & 0x01 != 0) - rx_los_list.append(rx_los_data & 0x02 != 0) - rx_los_list.append(rx_los_data & 0x04 != 0) - rx_los_list.append(rx_los_data & 0x08 != 0) - rx_los_list.append(rx_los_data & 0x10 != 0) - rx_los_list.append(rx_los_data & 0x20 != 0) - rx_los_list.append(rx_los_data & 0x40 != 0) - rx_los_list.append(rx_los_data & 0x80 != 0) - - else: - offset = 256 - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + SFP_CHANNL_STATUS_OFFSET), SFP_CHANNL_STATUS_WIDTH) - if dom_channel_monitor_raw is not None: - rx_los_data = int(dom_channel_monitor_raw[0], 16) - rx_los_list.append(rx_los_data & 0x02 != 0) - else: - return None - return rx_los_list - - - def get_tx_fault(self): - """ - Retrieves the TX fault status of SFP - - Returns: - A Boolean, True if SFP has TX fault, False if not - Note : TX fault status is lached until a call to get_tx_fault or a reset. - """ - if not self.dom_supported: - return None - - tx_fault_list = [] - if self.sfp_type == OSFP_TYPE: - return None - elif self.sfp_type == QSFP_TYPE: - offset = 0 - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + QSFP_CHANNL_TX_FAULT_STATUS_OFFSET), QSFP_CHANNL_TX_FAULT_STATUS_WIDTH) - if dom_channel_monitor_raw is not None: - tx_fault_data = int(dom_channel_monitor_raw[0], 16) - tx_fault_list.append(tx_fault_data & 0x01 != 0) - tx_fault_list.append(tx_fault_data & 0x02 != 0) - tx_fault_list.append(tx_fault_data & 0x04 != 0) - tx_fault_list.append(tx_fault_data & 0x08 != 0) - - elif self.sfp_type == QSFP_DD_TYPE: - return None - # page 11h - if self.dom_rx_tx_power_bias_supported: - offset = 512 - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_CHANNL_TX_FAULT_STATUS_OFFSET), QSFP_DD_CHANNL_TX_FAULT_STATUS_WIDTH) - if dom_channel_monitor_raw is not None: - tx_fault_data = int(dom_channel_monitor_raw[0], 8) - tx_fault_list.append(tx_fault_data & 0x01 != 0) - tx_fault_list.append(tx_fault_data & 0x02 != 0) - tx_fault_list.append(tx_fault_data & 0x04 != 0) - tx_fault_list.append(tx_fault_data & 0x08 != 0) - tx_fault_list.append(tx_fault_data & 0x10 != 0) - tx_fault_list.append(tx_fault_data & 0x20 != 0) - tx_fault_list.append(tx_fault_data & 0x40 != 0) - tx_fault_list.append(tx_fault_data & 0x80 != 0) - - else: - offset = 256 - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + SFP_CHANNL_STATUS_OFFSET), SFP_CHANNL_STATUS_WIDTH) - if dom_channel_monitor_raw is not None: - tx_fault_data = int(dom_channel_monitor_raw[0], 16) - tx_fault_list.append(tx_fault_data & 0x04 != 0) - else: - return None - return tx_fault_list - - - def get_tx_disable(self): - """ - Retrieves the tx_disable status of this SFP - - Returns: - A Boolean, True if tx_disable is enabled, False if disabled - - for QSFP, the disable states of each channel which are the lower 4 bits in byte 85 page a0 - for SFP, the TX Disable State and Soft TX Disable Select is ORed as the tx_disable status returned - These two bits are bit 7 & 6 in byte 110 page a2 respectively - """ - if not self.dom_supported: - return None - - tx_disable_list = [] - if self.sfp_type == OSFP_TYPE: - return None - elif self.sfp_type == QSFP_TYPE: - offset = 0 - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + QSFP_CHANNL_DISABLE_STATUS_OFFSET), QSFP_CHANNL_DISABLE_STATUS_WIDTH) - if dom_channel_monitor_raw is not None: - tx_disable_data = int(dom_channel_monitor_raw[0], 16) - tx_disable_list.append(tx_disable_data & 0x01 != 0) - tx_disable_list.append(tx_disable_data & 0x02 != 0) - tx_disable_list.append(tx_disable_data & 0x04 != 0) - tx_disable_list.append(tx_disable_data & 0x08 != 0) - - elif self.sfp_type == QSFP_DD_TYPE: - if self.dom_rx_tx_power_bias_supported: - offset = 128 - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_CHANNL_DISABLE_STATUS_OFFSET), QSFP_DD_CHANNL_DISABLE_STATUS_WIDTH) - if dom_channel_monitor_raw is not None: - tx_disable_data = int(dom_channel_monitor_raw[0], 16) - tx_disable_list.append(tx_disable_data & 0x01 != 0) - tx_disable_list.append(tx_disable_data & 0x02 != 0) - tx_disable_list.append(tx_disable_data & 0x04 != 0) - tx_disable_list.append(tx_disable_data & 0x08 != 0) - tx_disable_list.append(tx_disable_data & 0x10 != 0) - tx_disable_list.append(tx_disable_data & 0x20 != 0) - tx_disable_list.append(tx_disable_data & 0x40 != 0) - tx_disable_list.append(tx_disable_data & 0x80 != 0) - - else: - offset = 256 - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + SFP_CHANNL_STATUS_OFFSET), SFP_CHANNL_STATUS_WIDTH) - if dom_channel_monitor_raw is not None: - tx_disable_data = int(dom_channel_monitor_raw[0], 16) - tx_disable_list.append(tx_disable_data & 0xC0 != 0) - else: - return None - return tx_disable_list - - - def get_tx_disable_channel(self): - """ - Retrieves the TX disabled channels in this SFP - - Returns: - A hex of 4 bits (bit 0 to bit 3 as channel 0 to channel 3) to represent - TX channels which have been disabled in this SFP. - As an example, a returned value of 0x5 indicates that channel 0 - and channel 2 have been disabled. - """ - tx_disable_list = self.get_tx_disable() - if tx_disable_list is None: - return 0 - tx_disabled = 0 - for i in range(len(tx_disable_list)): - if tx_disable_list[i]: - tx_disabled |= 1 << i - return tx_disabled - + eeprom_raw = list(map(lambda h: int(h, base=16), eeprom_raw)) + return bytearray(eeprom_raw) @classmethod def mgmt_phy_mod_pwr_attr_get(cls, power_attr_type, sdk_handle, sdk_index, slot_id): @@ -1619,8 +267,6 @@ def mgmt_phy_mod_pwr_attr_get(cls, power_attr_type, sdk_handle, sdk_index, slot_ finally: delete_sx_mgmt_phy_mod_pwr_attr_t_p(sx_mgmt_phy_mod_pwr_attr_p) - - def get_lpmode(self): """ Retrieves the lpmode (low power mode) status of this SFP @@ -1629,7 +275,7 @@ def get_lpmode(self): A Boolean, True if lpmode is enabled, False if disabled """ if utils.is_host(): - # To avoid performance issue, + # To avoid performance issue, # call class level method to avoid initialize the whole sonic platform API get_lpmode_code = 'from sonic_platform import sfp;\n' \ 'with sfp.SdkHandleContext() as sdk_handle:' \ @@ -1644,10 +290,9 @@ def get_lpmode(self): else: return self._get_lpmode(self.sdk_handle, self.sdk_index, self.slot_id) - @classmethod def _get_lpmode(cls, sdk_handle, sdk_index, slot_id): - """Class level method to get low power mode. + """Class level method to get low power mode. Args: sdk_handle: SDK handle @@ -1660,369 +305,6 @@ def _get_lpmode(cls, sdk_handle, sdk_index, slot_id): _, oper_pwr_mode = cls.mgmt_phy_mod_pwr_attr_get(SX_MGMT_PHY_MOD_PWR_ATTR_PWR_MODE_E, sdk_handle, sdk_index, slot_id) return oper_pwr_mode == SX_MGMT_PHY_MOD_PWR_MODE_LOW_E - - def get_power_override(self): - """ - Retrieves the power-override status of this SFP - - Returns: - A Boolean, True if power-override is enabled, False if disabled - """ - if self.sfp_type == QSFP_TYPE: - offset = 0 - sfpd_obj = sff8436Dom() - if sfpd_obj is None: - return False - - dom_control_raw = self._read_eeprom_specific_bytes((offset + QSFP_CONTROL_OFFSET), QSFP_CONTROL_WIDTH) - if dom_control_raw is not None: - dom_control_data = sfpd_obj.parse_control_bytes(dom_control_raw, 0) - return ('On' == dom_control_data['data']['PowerOverride']) - else: - return NotImplementedError - - - def get_temperature(self): - """ - Retrieves the temperature of this SFP - - Returns: - An integer number of current temperature in Celsius - """ - if not self.dom_supported: - return None - if self.sfp_type == QSFP_TYPE: - offset = 0 - offset_xcvr = 128 - - sfpd_obj = sff8436Dom() - if sfpd_obj is None: - return None - - if self.dom_temp_supported: - dom_temperature_raw = self._read_eeprom_specific_bytes((offset + QSFP_TEMPE_OFFSET), QSFP_TEMPE_WIDTH) - if dom_temperature_raw is not None: - dom_temperature_data = sfpd_obj.parse_temperature(dom_temperature_raw, 0) - temp = self._convert_string_to_num(dom_temperature_data['data']['Temperature']['value']) - return temp - else: - return None - else: - return None - - elif self.sfp_type == QSFP_DD_TYPE: - offset = 0 - - sfpd_obj = qsfp_dd_Dom() - if sfpd_obj is None: - return None - - if self.dom_temp_supported: - dom_temperature_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_TEMPE_OFFSET), QSFP_DD_TEMPE_WIDTH) - if dom_temperature_raw is not None: - dom_temperature_data = sfpd_obj.parse_temperature(dom_temperature_raw, 0) - temp = self._convert_string_to_num(dom_temperature_data['data']['Temperature']['value']) - return temp - return None - - else: - offset = 256 - sfpd_obj = sff8472Dom() - if sfpd_obj is None: - return None - sfpd_obj._calibration_type = 1 - - dom_temperature_raw = self._read_eeprom_specific_bytes((offset + SFP_TEMPE_OFFSET), SFP_TEMPE_WIDTH) - if dom_temperature_raw is not None: - dom_temperature_data = sfpd_obj.parse_temperature(dom_temperature_raw, 0) - temp = self._convert_string_to_num(dom_temperature_data['data']['Temperature']['value']) - return temp - else: - return None - - - def get_voltage(self): - """ - Retrieves the supply voltage of this SFP - - Returns: - An integer number of supply voltage in mV - """ - if not self.dom_supported: - return None - if self.sfp_type == QSFP_TYPE: - offset = 0 - offset_xcvr = 128 - - sfpd_obj = sff8436Dom() - if sfpd_obj is None: - return None - - if self.dom_volt_supported: - dom_voltage_raw = self._read_eeprom_specific_bytes((offset + QSFP_VOLT_OFFSET), QSFP_VOLT_WIDTH) - if dom_voltage_raw is not None: - dom_voltage_data = sfpd_obj.parse_voltage(dom_voltage_raw, 0) - voltage = self._convert_string_to_num(dom_voltage_data['data']['Vcc']['value']) - return voltage - else: - return None - return None - - if self.sfp_type == QSFP_DD_TYPE: - offset = 128 - - sfpd_obj = qsfp_dd_Dom() - if sfpd_obj is None: - return None - - if self.dom_volt_supported: - dom_voltage_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_VOLT_OFFSET), QSFP_DD_VOLT_WIDTH) - if dom_voltage_raw is not None: - dom_voltage_data = sfpd_obj.parse_voltage(dom_voltage_raw, 0) - voltage = self._convert_string_to_num(dom_voltage_data['data']['Vcc']['value']) - return voltage - return None - - else: - offset = 256 - - sfpd_obj = sff8472Dom() - if sfpd_obj is None: - return None - - sfpd_obj._calibration_type = self.calibration - - dom_voltage_raw = self._read_eeprom_specific_bytes((offset + SFP_VOLT_OFFSET), SFP_VOLT_WIDTH) - if dom_voltage_raw is not None: - dom_voltage_data = sfpd_obj.parse_voltage(dom_voltage_raw, 0) - voltage = self._convert_string_to_num(dom_voltage_data['data']['Vcc']['value']) - return voltage - else: - return None - - - def get_tx_bias(self): - """ - Retrieves the TX bias current of this SFP - - Returns: - A list of four integer numbers, representing TX bias in mA - for channel 0 to channel 4. - Ex. ['110.09', '111.12', '108.21', '112.09'] - """ - tx_bias_list = [] - if self.sfp_type == QSFP_TYPE: - offset = 0 - offset_xcvr = 128 - - sfpd_obj = sff8436Dom() - if sfpd_obj is None: - return None - - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + QSFP_CHANNL_MON_OFFSET), QSFP_CHANNL_MON_WITH_TX_POWER_WIDTH) - if dom_channel_monitor_raw is not None: - dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params_with_tx_power(dom_channel_monitor_raw, 0) - tx_bias_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['TX1Bias']['value'])) - tx_bias_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['TX2Bias']['value'])) - tx_bias_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['TX3Bias']['value'])) - tx_bias_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['TX4Bias']['value'])) - - elif self.sfp_type == QSFP_DD_TYPE: - # page 11h - if self.dom_rx_tx_power_bias_supported: - offset = 512 - sfpd_obj = qsfp_dd_Dom() - if sfpd_obj is None: - return None - - if self.dom_tx_bias_power_supported: - dom_tx_bias_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_TX_BIAS_OFFSET), QSFP_DD_TX_BIAS_WIDTH) - if dom_tx_bias_raw is not None: - dom_tx_bias_data = sfpd_obj.parse_dom_tx_bias(dom_tx_bias_raw, 0) - tx_bias_list.append(self._convert_string_to_num(dom_tx_bias_data['data']['TX1Bias']['value'])) - tx_bias_list.append(self._convert_string_to_num(dom_tx_bias_data['data']['TX2Bias']['value'])) - tx_bias_list.append(self._convert_string_to_num(dom_tx_bias_data['data']['TX3Bias']['value'])) - tx_bias_list.append(self._convert_string_to_num(dom_tx_bias_data['data']['TX4Bias']['value'])) - tx_bias_list.append(self._convert_string_to_num(dom_tx_bias_data['data']['TX5Bias']['value'])) - tx_bias_list.append(self._convert_string_to_num(dom_tx_bias_data['data']['TX6Bias']['value'])) - tx_bias_list.append(self._convert_string_to_num(dom_tx_bias_data['data']['TX7Bias']['value'])) - tx_bias_list.append(self._convert_string_to_num(dom_tx_bias_data['data']['TX8Bias']['value'])) - - else: - offset = 256 - - sfpd_obj = sff8472Dom() - if sfpd_obj is None: - return None - sfpd_obj._calibration_type = self.calibration - - if self.dom_supported: - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + SFP_CHANNL_MON_OFFSET), SFP_CHANNL_MON_WIDTH) - if dom_channel_monitor_raw is not None: - dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params(dom_channel_monitor_raw, 0) - tx_bias_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['TXBias']['value'])) - else: - return None - else: - return None - - return tx_bias_list - - - def get_rx_power(self): - """ - Retrieves the received optical power for this SFP - - Returns: - A list of four integer numbers, representing received optical - power in mW for channel 0 to channel 4. - Ex. ['1.77', '1.71', '1.68', '1.70'] - """ - rx_power_list = [] - if self.sfp_type == OSFP_TYPE: - # OSFP not supported on our platform yet. - return None - - elif self.sfp_type == QSFP_TYPE: - offset = 0 - offset_xcvr = 128 - - sfpd_obj = sff8436Dom() - if sfpd_obj is None: - return None - - if self.dom_rx_power_supported: - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + QSFP_CHANNL_MON_OFFSET), QSFP_CHANNL_MON_WITH_TX_POWER_WIDTH) - if dom_channel_monitor_raw is not None: - dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params_with_tx_power(dom_channel_monitor_raw, 0) - rx_power_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['RX1Power']['value'])) - rx_power_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['RX2Power']['value'])) - rx_power_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['RX3Power']['value'])) - rx_power_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['RX4Power']['value'])) - else: - return None - else: - return None - - elif self.sfp_type == QSFP_DD_TYPE: - # page 11 - if self.dom_rx_tx_power_bias_supported: - offset = 512 - sfpd_obj = qsfp_dd_Dom() - if sfpd_obj is None: - return None - - if self.dom_rx_power_supported: - dom_rx_power_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_RX_POWER_OFFSET), QSFP_DD_RX_POWER_WIDTH) - if dom_rx_power_raw is not None: - dom_rx_power_data = sfpd_obj.parse_dom_rx_power(dom_rx_power_raw, 0) - rx_power_list.append(self._convert_string_to_num(dom_rx_power_data['data']['RX1Power']['value'])) - rx_power_list.append(self._convert_string_to_num(dom_rx_power_data['data']['RX2Power']['value'])) - rx_power_list.append(self._convert_string_to_num(dom_rx_power_data['data']['RX3Power']['value'])) - rx_power_list.append(self._convert_string_to_num(dom_rx_power_data['data']['RX4Power']['value'])) - rx_power_list.append(self._convert_string_to_num(dom_rx_power_data['data']['RX5Power']['value'])) - rx_power_list.append(self._convert_string_to_num(dom_rx_power_data['data']['RX6Power']['value'])) - rx_power_list.append(self._convert_string_to_num(dom_rx_power_data['data']['RX7Power']['value'])) - rx_power_list.append(self._convert_string_to_num(dom_rx_power_data['data']['RX8Power']['value'])) - - else: - offset = 256 - - sfpd_obj = sff8472Dom() - if sfpd_obj is None: - return None - - if self.dom_supported: - sfpd_obj._calibration_type = self.calibration - - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + SFP_CHANNL_MON_OFFSET), SFP_CHANNL_MON_WIDTH) - if dom_channel_monitor_raw is not None: - dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params(dom_channel_monitor_raw, 0) - rx_power_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['RXPower']['value'])) - else: - return None - else: - return None - return rx_power_list - - - def get_tx_power(self): - """ - Retrieves the TX power of this SFP - - Returns: - A list of four integer numbers, representing TX power in mW - for channel 0 to channel 4. - Ex. ['1.86', '1.86', '1.86', '1.86'] - """ - tx_power_list = [] - if self.sfp_type == OSFP_TYPE: - # OSFP not supported on our platform yet. - return None - - elif self.sfp_type == QSFP_TYPE: - offset = 0 - offset_xcvr = 128 - - sfpd_obj = sff8436Dom() - if sfpd_obj is None: - return None - - if self.dom_tx_power_supported: - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + QSFP_CHANNL_MON_OFFSET), QSFP_CHANNL_MON_WITH_TX_POWER_WIDTH) - if dom_channel_monitor_raw is not None: - dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params_with_tx_power(dom_channel_monitor_raw, 0) - tx_power_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['TX1Power']['value'])) - tx_power_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['TX2Power']['value'])) - tx_power_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['TX3Power']['value'])) - tx_power_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['TX4Power']['value'])) - else: - return None - else: - return None - - elif self.sfp_type == QSFP_DD_TYPE: - return None - # page 11 - if self.dom_rx_tx_power_bias_supported: - offset = 512 - sfpd_obj = qsfp_dd_Dom() - if sfpd_obj is None: - return None - - if self.dom_tx_power_supported: - dom_tx_power_raw = self._read_eeprom_specific_bytes((offset + QSFP_DD_TX_POWER_OFFSET), QSFP_DD_TX_POWER_WIDTH) - if dom_tx_power_raw is not None: - dom_tx_power_data = sfpd_obj.parse_dom_tx_power(dom_tx_power_raw, 0) - tx_power_list.append(self._convert_string_to_num(dom_tx_power_data['data']['TX1Power']['value'])) - tx_power_list.append(self._convert_string_to_num(dom_tx_power_data['data']['TX2Power']['value'])) - tx_power_list.append(self._convert_string_to_num(dom_tx_power_data['data']['TX3Power']['value'])) - tx_power_list.append(self._convert_string_to_num(dom_tx_power_data['data']['TX4Power']['value'])) - tx_power_list.append(self._convert_string_to_num(dom_tx_power_data['data']['TX5Power']['value'])) - tx_power_list.append(self._convert_string_to_num(dom_tx_power_data['data']['TX6Power']['value'])) - tx_power_list.append(self._convert_string_to_num(dom_tx_power_data['data']['TX7Power']['value'])) - tx_power_list.append(self._convert_string_to_num(dom_tx_power_data['data']['TX8Power']['value'])) - - else: - offset = 256 - sfpd_obj = sff8472Dom() - if sfpd_obj is None: - return None - - if self.dom_supported: - sfpd_obj._calibration_type = self.calibration - - dom_channel_monitor_raw = self._read_eeprom_specific_bytes((offset + SFP_CHANNL_MON_OFFSET), SFP_CHANNL_MON_WIDTH) - if dom_channel_monitor_raw is not None: - dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params(dom_channel_monitor_raw, 0) - tx_power_list.append(self._convert_string_to_num(dom_channel_monitor_data['data']['TXPower']['value'])) - else: - return None - else: - return None - return tx_power_list - - def reset(self): """ Reset SFP and return all user module settings to their default state. @@ -2033,7 +315,7 @@ def reset(self): refer plugins/sfpreset.py """ if utils.is_host(): - # To avoid performance issue, + # To avoid performance issue, # call class level method to avoid initialize the whole sonic platform API reset_code = 'from sonic_platform import sfp;\n' \ 'with sfp.SdkHandleContext() as sdk_handle:' \ @@ -2050,7 +332,6 @@ def reset(self): else: return self._reset(self.sdk_handle, self.sdk_index, self.slot_id) - @classmethod def _reset(cls, sdk_handle, sdk_index, slot_id): module_id_info = sx_mgmt_module_id_info_t() @@ -2063,41 +344,6 @@ def _reset(cls, sdk_handle, sdk_index, slot_id): return rc == SX_STATUS_SUCCESS - def tx_disable(self, tx_disable): - """ - Disable SFP TX for all channels - - Args: - tx_disable : A Boolean, True to enable tx_disable mode, False to disable - tx_disable mode. - - Returns: - A boolean, True if tx_disable is set successfully, False if not - - for SFP, make use of bit 6 of byte at (offset 110, a2h (i2c addr 0x51)) to disable/enable tx - for QSFP, set all channels to disable/enable tx - """ - return NotImplementedError - - - def tx_disable_channel(self, channel, disable): - """ - Sets the tx_disable for specified SFP channels - - Args: - channel : A hex of 4 bits (bit 0 to bit 3) which represent channel 0 to 3, - e.g. 0x5 for channel 0 and channel 2. - disable : A boolean, True to disable TX channels specified in channel, - False to enable - - Returns: - A boolean, True if successful, False if not - - QSFP: page a0, address 86, lower 4 bits - """ - return NotImplementedError - - @classmethod def is_nve(cls, port): return (port & NVE_MASK) != 0 @@ -2117,7 +363,7 @@ def is_port_admin_status_up(cls, sdk_handle, log_port): assert rc == SXD_STATUS_SUCCESS, "sx_api_port_state_get failed, rc = %d" % rc admin_state = sx_port_admin_state_t_p_value(admin_state_p) - + delete_sx_port_oper_state_t_p(oper_state_p) delete_sx_port_admin_state_t_p(admin_state_p) delete_sx_port_module_state_t_p(module_state_p) @@ -2220,7 +466,7 @@ def set_lpmode(self, lpmode): A boolean, True if lpmode is set successfully, False if not """ if utils.is_host(): - # To avoid performance issue, + # To avoid performance issue, # call class level method to avoid initialize the whole sonic platform API set_lpmode_code = 'from sonic_platform import sfp;\n' \ 'with sfp.SdkHandleContext() as sdk_handle:' \ @@ -2238,42 +484,20 @@ def set_lpmode(self, lpmode): else: return self._set_lpmode(lpmode, self.sdk_handle, self.sdk_index, self.slot_id) - + @classmethod def _set_lpmode(cls, lpmode, sdk_handle, sdk_index, slot_id): log_port_list = cls.get_logical_ports(sdk_handle, sdk_index, slot_id) sdk_lpmode = SX_MGMT_PHY_MOD_PWR_MODE_LOW_E if lpmode else SX_MGMT_PHY_MOD_PWR_MODE_AUTO_E - cls._set_lpmode_raw(sdk_handle, - sdk_index, + cls._set_lpmode_raw(sdk_handle, + sdk_index, slot_id, - log_port_list, - SX_MGMT_PHY_MOD_PWR_ATTR_PWR_MODE_E, + log_port_list, + SX_MGMT_PHY_MOD_PWR_ATTR_PWR_MODE_E, sdk_lpmode) logger.log_info("{} low power mode for module {}, slot {}".format("Enabled" if lpmode else "Disabled", sdk_index, slot_id)) return True - - def set_power_override(self, power_override, power_set): - """ - Sets SFP power level using power_override and power_set - - Args: - power_override : - A Boolean, True to override set_lpmode and use power_set - to control SFP power, False to disable SFP power control - through power_override/power_set and use set_lpmode - to control SFP power. - power_set : - Only valid when power_override is True. - A Boolean, True to set SFP to low power mode, False to set - SFP to high power mode. - - Returns: - A boolean, True if power-override and power_set are set successfully, - False if not - """ - return NotImplementedError - def is_replaceable(self): """ Indicate whether this device is replaceable. diff --git a/platform/mellanox/mlnx-platform-api/tests/test_chassis.py b/platform/mellanox/mlnx-platform-api/tests/test_chassis.py index cfa2d8224718..a7eff5b08625 100644 --- a/platform/mellanox/mlnx-platform-api/tests/test_chassis.py +++ b/platform/mellanox/mlnx-platform-api/tests/test_chassis.py @@ -38,7 +38,7 @@ class TestChassis: 2. Fan drawer related API 3. SFP related API (Except modular chassis SFP related API) 4. Reboot cause related API - + Thermal, Eeprom, Watchdog, Component, System LED related API will be tested in seperate class """ @classmethod @@ -166,7 +166,6 @@ def test_sfp(self): @mock.patch('sonic_platform.sfp_event.sfp_event.check_sfp_status', MagicMock()) @mock.patch('sonic_platform.sfp_event.sfp_event.__init__', MagicMock(return_value=None)) @mock.patch('sonic_platform.sfp_event.sfp_event.initialize', MagicMock()) - @mock.patch('sonic_platform.sfp.SFP.reinit', MagicMock()) @mock.patch('sonic_platform.device_data.DeviceDataManager.get_sfp_count', MagicMock(return_value=3)) def test_change_event(self): from sonic_platform.sfp_event import sfp_event @@ -185,7 +184,6 @@ def mock_check_sfp_status(self, port_dict, error_dict, timeout): assert status is True assert 'sfp' in event_dict and event_dict['sfp'][1] == '1' assert len(chassis._sfp_list) == 3 - assert SFP.reinit.call_count == 1 # Call get_change_event with timeout=1.0 return_port_dict = {} diff --git a/platform/mellanox/mlnx-platform-api/tests/test_sfp.py b/platform/mellanox/mlnx-platform-api/tests/test_sfp.py index 0ad9537430b9..261a03b527aa 100644 --- a/platform/mellanox/mlnx-platform-api/tests/test_sfp.py +++ b/platform/mellanox/mlnx-platform-api/tests/test_sfp.py @@ -49,7 +49,7 @@ def test_sfp_index(self, mock_max_port): assert sfp.sdk_index == 1 assert sfp.index == 5 - @mock.patch('sonic_platform.sfp.SFP._read_eeprom_specific_bytes', mock.MagicMock(return_value=None)) + @mock.patch('sonic_platform.sfp.SFP.read_eeprom', mock.MagicMock(return_value=None)) @mock.patch('sonic_platform.sfp.SFP._get_error_code') @mock.patch('sonic_platform.chassis.Chassis.get_num_sfps', mock.MagicMock(return_value=2)) def test_sfp_get_error_status(self, mock_get_error_code): From 3377a4b706d9efec434275859a780f70ef9284dd Mon Sep 17 00:00:00 2001 From: Oleksandr Ivantsiv Date: Tue, 22 Mar 2022 14:57:17 +0000 Subject: [PATCH 2/2] Fix LGTM allerts. --- platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py b/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py index 92e6537bf492..29d1c7c081f6 100644 --- a/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py +++ b/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py @@ -25,15 +25,7 @@ try: import subprocess import os - from sonic_platform_base.sfp_base import SfpBase from sonic_platform_base.sonic_eeprom import eeprom_dts - from sonic_platform_base.sonic_sfp.sff8472 import sff8472InterfaceId - from sonic_platform_base.sonic_sfp.sff8472 import sff8472Dom - from sonic_platform_base.sonic_sfp.sff8436 import sff8436InterfaceId - from sonic_platform_base.sonic_sfp.sff8436 import sff8436Dom - from sonic_platform_base.sonic_sfp.inf8628 import inf8628InterfaceId - from sonic_platform_base.sonic_sfp.qsfp_dd import qsfp_dd_InterfaceId - from sonic_platform_base.sonic_sfp.qsfp_dd import qsfp_dd_Dom from sonic_py_common.logger import Logger from . import utils from .device_data import DeviceDataManager