Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Mellanox] run module initialization when any SFP related API is called #221

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 219 additions & 32 deletions platform/mellanox/mlnx-platform-api/sonic_platform/chassis.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import os
from functools import reduce
from .utils import extract_RJ45_ports_index
from . import module_host_mgmt_initializer
from . import utils
from .device_data import DeviceDataManager
import re
import queue
import select
import threading
import time
from sonic_platform import modules_mgmt
except ImportError as e:
raise ImportError (str(e) + "- required module not found")

Expand Down Expand Up @@ -132,9 +132,9 @@ def __init__(self):

Chassis.chassis_instance = self

self.modules_mgmt_thread = threading.Thread()
self.modules_changes_queue = queue.Queue()
self.modules_mgmt_task_stopping_event = threading.Event()
self.module_host_mgmt_initializer = module_host_mgmt_initializer.ModuleHostMgmtInitializer()
self.poll_obj = None
self.registered_fds = None

logger.log_info("Chassis loaded successfully")

Expand Down Expand Up @@ -344,8 +344,11 @@ def get_all_sfps(self):
Returns:
A list of objects derived from SfpBase representing all sfps
available on this chassis
"""
self.initialize_sfp()
"""
if DeviceDataManager.is_module_host_management_mode():
self.module_host_mgmt_initializer.initialize(self)
else:
self.initialize_sfp()
return self._sfp_list

def get_sfp(self, index):
Expand All @@ -362,7 +365,10 @@ def get_sfp(self, index):
An object dervied from SfpBase representing the specified sfp
"""
index = index - 1
self.initialize_single_sfp(index)
if DeviceDataManager.is_module_host_management_mode():
self.module_host_mgmt_initializer.initialize(self)
else:
self.initialize_single_sfp(index)
return super(Chassis, self).get_sfp(index)

def get_port_or_cage_type(self, index):
Expand Down Expand Up @@ -412,42 +418,223 @@ def get_change_event(self, timeout=0):
indicates that fan 0 has been removed, fan 2
has been inserted and sfp 11 has been removed.
"""
if not self.modules_mgmt_thread.is_alive():
# open new SFP change events thread
self.modules_mgmt_thread = modules_mgmt.ModulesMgmtTask(q=self.modules_changes_queue
, main_thread_stop_event = self.modules_mgmt_task_stopping_event)
# Set the thread as daemon so when pmon/xcvrd are shutting down, modules_mgmt will shut down immedietly.
self.modules_mgmt_thread.daemon = True
self.modules_mgmt_thread.start()
self.initialize_sfp()
wait_for_ever = (timeout == 0)
if DeviceDataManager.is_module_host_management_mode():
self.module_host_mgmt_initializer.initialize(self)
return self.get_change_event_for_module_host_management_mode(timeout)
else:
self.initialize_sfp()
return self.get_change_event_legacy(timeout)

def get_change_event_for_module_host_management_mode(self, timeout):
"""Get SFP change event when module host management mode is enabled.

Args:
timeout: Timeout in milliseconds (optional). If timeout == 0,
this method will block until a change is detected.

Returns:
(bool, dict):
- True if call successful, False if not; - Deprecated, will always return True
- A nested dictionary where key is a device type,
value is a dictionary with key:value pairs in the format of
{'device_id':'device_event'},
where device_id is the device ID for this device and
device_event,
status='1' represents device inserted,
status='0' represents device removed.
Ex. {'fan':{'0':'0', '2':'1'}, 'sfp':{'11':'0'}}
indicates that fan 0 has been removed, fan 2
has been inserted and sfp 11 has been removed.
"""
if not self.poll_obj:
self.poll_obj = select.poll()
self.registered_fds = {}
for s in self._sfp_list:
fds = s.get_fds_for_poling()
for fd_type, fd in fds.items():
self.poll_obj.register(fd, select.POLLERR | select.POLLPRI)
self.registered_fds[fd.fileno()] = (s.sdk_index, fd, fd_type)

logger.log_debug(f'Registered SFP file descriptors for polling: {self.registered_fds}')

from . import sfp

wait_forever = (timeout == 0)
# poll timeout should be no more than 1000ms to ensure fast shutdown flow
timeout = 1000.0 if timeout >= 1000 else float(timeout)
port_dict = {}
error_dict = {}
begin = time.time()
wait_ready_task = sfp.SFP.get_wait_ready_task()

while True:
fds_events = self.poll_obj.poll(timeout)
for fileno, _ in fds_events:
if fileno not in self.registered_fds:
logger.log_error(f'Unknown file no {fileno} from poll event, registered files are {self.registered_fds}')
continue

sfp_index, fd, fd_type = self.registered_fds[fileno]
s = self._sfp_list[sfp_index]
fd.seek(0)
fd_value = int(fd.read().strip())

# Detecting dummy event
if s.is_dummy_event(fd_type, fd_value):
# Ignore dummy event for the first poll, assume SDK only provide 1 dummy event
logger.log_debug(f'Ignore dummy event {fd_type}:{fd_value} for SFP {sfp_index}')
continue

logger.log_notice(f'Got SFP event: index={sfp_index}, type={fd_type}, value={fd_value}')
if fd_type == 'hw_present':
# event could be EVENT_NOT_PRESENT or EVENT_PRESENT
event = sfp.EVENT_NOT_PRESENT if fd_value == 0 else sfp.EVENT_PRESENT
s.on_event(event)
elif fd_type == 'present':
if str(fd_value) == sfp.SFP_STATUS_ERROR:
# FW control cable got an error, no need trigger state machine
sfp_status, error_desc = s.get_error_info_from_sdk_error_type()
port_dict[sfp_index + 1] = sfp_status
if error_desc:
error_dict[sfp_index + 1] = error_desc
continue
elif str(fd_value) == sfp.SFP_STATUS_INSERTED:
# FW control cable got present, only case is that the cable is recovering
# from an error. FW control cable has no transition from "Not Present" to "Present"
# because "Not Present" cable is always "software control" and should always poll
# hw_present sysfs instead of present sysfs.
port_dict[sfp_index + 1] = sfp.SFP_STATUS_INSERTED
continue
else:
s.on_event(sfp.EVENT_NOT_PRESENT)
else:
# event could be EVENT_POWER_GOOD or EVENT_POWER_BAD
event = sfp.EVENT_POWER_BAD if fd_value == 0 else sfp.EVENT_POWER_GOOD
s.on_event(event)

if s.in_stable_state():
s.fill_change_event(port_dict)
s.refresh_poll_obj(self.poll_obj, self.registered_fds)
else:
logger.log_debug(f'SFP {sfp_index} does not reach stable state, state={s.state}')

ready_sfp_set = wait_ready_task.get_ready_set()
for sfp_index in ready_sfp_set:
s = self._sfp_list[sfp_index]
s.on_event(sfp.EVENT_RESET_DONE)
if s.in_stable_state():
s.fill_change_event(port_dict)
s.refresh_poll_obj(self.poll_obj, self.registered_fds)
else:
logger.log_error(f'SFP {sfp_index} failed to reach stable state, state={s.state}')

if port_dict:
logger.log_notice(f'Sending SFP change event: {port_dict}, error event: {error_dict}')
self.reinit_sfps(port_dict)
return True, {
'sfp': port_dict,
'sfp_error': error_dict
}
else:
if not wait_forever:
elapse = time.time() - begin
if elapse * 1000 >= timeout:
return True, {'sfp': {}}

def get_change_event_legacy(self, timeout):
"""Get SFP change event when module host management is disabled.

Args:
timeout (int): polling timeout in ms

Returns:
(bool, dict):
- True if call successful, False if not; - Deprecated, will always return True
- A nested dictionary where key is a device type,
value is a dictionary with key:value pairs in the format of
{'device_id':'device_event'},
where device_id is the device ID for this device and
device_event,
status='1' represents device inserted,
status='0' represents device removed.
Ex. {'fan':{'0':'0', '2':'1'}, 'sfp':{'11':'0'}}
indicates that fan 0 has been removed, fan 2
has been inserted and sfp 11 has been removed.
"""
if not self.poll_obj:
self.poll_obj = select.poll()
self.registered_fds = {}
# SDK always sent event for the first time polling. Such event should not be sent to xcvrd.
# Store SFP state before first time polling so that we can detect dummy event.
self.sfp_states_before_first_poll = {}
for s in self._sfp_list:
fd = s.get_fd_for_polling_legacy()
self.poll_obj.register(fd, select.POLLERR | select.POLLPRI)
self.registered_fds[fd.fileno()] = (s.sdk_index, fd)
self.sfp_states_before_first_poll[s.sdk_index] = s.get_module_status()

logger.log_debug(f'Registered SFP file descriptors for polling: {self.registered_fds}')

from . import sfp

wait_forever = (timeout == 0)
# poll timeout should be no more than 1000ms to ensure fast shutdown flow
timeout = 1000.0 if timeout >= 1000 else float(timeout)
port_dict = {}
error_dict = {}
begin = time.time()
i = 0

while True:
try:
logger.log_info(f'get_change_event() trying to get changes from queue on iteration {i}')
port_dict = self.modules_changes_queue.get(timeout=timeout / 1000)
logger.log_info(f'get_change_event() iteration {i} port_dict: {port_dict}')
except queue.Empty:
logger.log_info(f"failed to get item from modules changes queue on itertaion {i}")
fds_events = self.poll_obj.poll(timeout)
for fileno, _ in fds_events:
if fileno not in self.registered_fds:
logger.log_error(f'Unknown file no {fileno} from poll event, registered files are {self.registered_fds}')
continue

sfp_index, fd = self.registered_fds[fileno]
fd.seek(0)
fd.read()
s = self._sfp_list[sfp_index]
sfp_status = s.get_module_status()

if sfp_index in self.sfp_states_before_first_poll:
# Detecting dummy event
sfp_state_before_poll = self.sfp_states_before_first_poll[sfp_index]
self.sfp_states_before_first_poll.pop(sfp_index)
if sfp_state_before_poll == sfp_status:
# Ignore dummy event for the first poll, assume SDK only provide 1 dummy event
logger.log_debug(f'Ignore dummy event {sfp_status} for SFP {sfp_index}')
continue

logger.log_notice(f'Got SFP event: index={sfp_index}, value={sfp_status}')
if sfp_status == sfp.SFP_STATUS_UNKNOWN:
# in the following sequence, STATUS_UNKNOWN can be returned.
# so we shouldn't raise exception here.
# 1. some sfp module is inserted
# 2. sfp_event gets stuck and fails to fetch the change event instantaneously
# 3. and then the sfp module is removed
# 4. sfp_event starts to try fetching the change event
logger.log_notice("unknown module state, maybe the port suffers two adjacent insertion/removal")
continue

if sfp_status == sfp.SFP_STATUS_ERROR:
sfp_status, error_desc = s.get_error_info_from_sdk_error_type()
if error_desc:
error_dict[sfp_index + 1] = error_desc
port_dict[sfp_index + 1] = sfp_status

if port_dict:
logger.log_notice(f'Sending SFP change event: {port_dict}, error event: {error_dict}')
self.reinit_sfps(port_dict)
result_dict = {'sfp': port_dict}
result_dict['sfp_error'] = error_dict
return True, result_dict
return True, {
'sfp': port_dict,
'sfp_error': error_dict
}
else:
if not wait_for_ever:
if not wait_forever:
elapse = time.time() - begin
logger.log_info(f"get_change_event: wait_for_ever {wait_for_ever} elapse {elapse} iteartion {i}")
if elapse * 1000 >= timeout:
logger.log_info(f"elapse {elapse} > timeout {timeout} iteartion {i} returning empty dict")
return True, {'sfp': {}}
i += 1

def reinit_sfps(self, port_dict):
"""
Expand All @@ -457,7 +644,7 @@ def reinit_sfps(self, port_dict):
"""
from . import sfp
for index, status in port_dict.items():
if status == sfp.SFP_STATUS_INSERTED:
if status == sfp.SFP_STATUS_REMOVED:
try:
self._sfp_list[int(index) - 1].reinit()
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2020-2023 NVIDIA CORPORATION & AFFILIATES.
# Copyright (c) 2020-2024 NVIDIA CORPORATION & AFFILIATES.
# Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -242,7 +242,7 @@ def get_cpld_component_list(cls):

@classmethod
@utils.read_only_cache()
def is_independent_mode(cls):
def is_module_host_management_mode(cls):
from sonic_py_common import device_info
_, hwsku_dir = device_info.get_paths_to_platform_and_hwsku_dirs()
sai_profile_file = os.path.join(hwsku_dir, 'sai.profile')
Expand All @@ -258,7 +258,7 @@ def wait_platform_ready(cls):
"""
conditions = []
sysfs_nodes = ['power_mode', 'power_mode_policy', 'present', 'reset', 'status', 'statuserror']
if cls.is_independent_mode():
if cls.is_module_host_management_mode():
sysfs_nodes.extend(['control', 'frequency', 'frequency_support', 'hw_present', 'hw_reset',
'power_good', 'power_limit', 'power_on', 'temperature/input'])
else:
Expand Down
Loading