Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xcvrd should restart if any child thread crashes #326

Merged
merged 5 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 140 additions & 33 deletions sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,95 @@
global_media_settings = media_settings_with_comma_dict['GLOBAL_MEDIA_SETTINGS'].pop('1-32')
media_settings_with_comma_dict['GLOBAL_MEDIA_SETTINGS']['1-5,6,7-20,21-32'] = global_media_settings

class TestXcvrdThreadException(object):

@patch('xcvrd.xcvrd.platform_chassis', MagicMock())
def test_CmisManagerTask_task_run_with_exception(self):
port_mapping = PortMapping()
stop_event = threading.Event()
cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
cmis_manager.wait_for_port_config_done = MagicMock(side_effect = NotImplementedError)
exception_received = None
trace = None
try:
cmis_manager.start()
cmis_manager.join()
except Exception as e1:
exception_received = e1
trace = traceback.format_exc()

assert not cmis_manager.is_alive()
assert(type(exception_received) == NotImplementedError)
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace))
assert("wait_for_port_config_done" in str(trace))

@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError))
def test_DomInfoUpdateTask_task_run_with_exception(self):
port_mapping = PortMapping()
stop_event = threading.Event()
dom_info_update = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
exception_received = None
trace = None
try:
dom_info_update.start()
dom_info_update.join()
except Exception as e1:
exception_received = e1
trace = traceback.format_exc()

assert not dom_info_update.is_alive()
assert(type(exception_received) == NotImplementedError)
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace))
assert("subscribe_port_config_change" in str(trace))

@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError))
def test_SfpStateUpdateTask_task_run_with_exception(self):
port_mapping = PortMapping()
retry_eeprom_set = set()
stop_event = threading.Event()
sfp_error_event = threading.Event()
sfp_state_update = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
exception_received = None
trace = None
try:
sfp_state_update.start()
sfp_state_update.join()
except Exception as e1:
exception_received = e1
trace = traceback.format_exc()

assert not sfp_state_update.is_alive()
assert(type(exception_received) == NotImplementedError)
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace))
assert("subscribe_port_config_change" in str(trace))

@patch('xcvrd.xcvrd.SfpStateUpdateTask.is_alive', MagicMock(return_value = False))
@patch('xcvrd.xcvrd.DomInfoUpdateTask.is_alive', MagicMock(return_value = False))
@patch('xcvrd.xcvrd.CmisManagerTask.is_alive', MagicMock(return_value = False))
@patch('xcvrd.xcvrd.CmisManagerTask.join', MagicMock(side_effect = NotImplementedError))
@patch('xcvrd.xcvrd.CmisManagerTask.start', MagicMock())
@patch('xcvrd.xcvrd.DomInfoUpdateTask.start', MagicMock())
@patch('xcvrd.xcvrd.SfpStateUpdateTask.start', MagicMock())
@patch('xcvrd.xcvrd.DaemonXcvrd.deinit', MagicMock())
@patch('os.kill')
@patch('xcvrd.xcvrd.DaemonXcvrd.init')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.join')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.join')
def test_DaemonXcvrd_run_with_exception(self, mock_task_join1, mock_task_join2, mock_init, mock_os_kill):
mock_init.return_value = (PortMapping(), set())
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
xcvrd.stop_event.wait = MagicMock()
xcvrd.run()

assert len(xcvrd.threads) == 3
assert mock_init.call_count == 1
assert mock_task_join1.call_count == 1
assert mock_task_join2.call_count == 1
assert mock_os_kill.call_count == 1

class TestXcvrdScript(object):

@patch('xcvrd.xcvrd._wrapper_get_sfp_type')
Expand Down Expand Up @@ -482,10 +571,10 @@ def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table

@patch('xcvrd.xcvrd.DaemonXcvrd.init')
@patch('xcvrd.xcvrd.DaemonXcvrd.deinit')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.task_run')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.task_run')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.task_stop')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.task_stop')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.start')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.start')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.join')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.join')
def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init):
mock_init.return_value = (PortMapping(), set())
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
Expand All @@ -502,7 +591,8 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1,
@patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD'))
def test_CmisManagerTask_handle_port_change_event(self):
port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)

assert not task.isPortConfigDone
port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET)
Expand All @@ -529,7 +619,8 @@ def test_CmisManagerTask_handle_port_change_event(self):
@patch('xcvrd.xcvrd.XcvrTableHelper')
def test_CmisManagerTask_get_configured_freq(self, mock_table_helper):
port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
cfg_port_tbl = MagicMock()
cfg_port_tbl.get = MagicMock(return_value=(True, (('laser_freq', 193100),)))
mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl)
Expand All @@ -539,7 +630,8 @@ def test_CmisManagerTask_get_configured_freq(self, mock_table_helper):
@patch('xcvrd.xcvrd.XcvrTableHelper')
def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper):
port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
cfg_port_tbl = MagicMock()
cfg_port_tbl.get = MagicMock(return_value=(True, (('tx_power', -10),)))
mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl)
Expand All @@ -555,11 +647,12 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis):
mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object, mock_object])

port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
task.wait_for_port_config_done = MagicMock()
task.task_run()
task.task_stop()
assert task.task_process is None
stop_event = threading.Event()
cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
cmis_manager.wait_for_port_config_done = MagicMock()
cmis_manager.start()
cmis_manager.join()
assert not cmis_manager.is_alive()

@patch('xcvrd.xcvrd.platform_chassis')
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None)))
Expand Down Expand Up @@ -658,7 +751,8 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
mock_chassis.get_sfp = MagicMock(return_value=mock_sfp)

port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)

port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET)
task.on_port_update_event(port_change_event)
Expand Down Expand Up @@ -709,7 +803,8 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
@patch('xcvrd.xcvrd.delete_port_from_status_table_hw')
def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw):
port_mapping = PortMapping()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
task.on_port_config_change(port_change_event)
Expand All @@ -731,10 +826,11 @@ def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock())
def test_DomInfoUpdateTask_task_run_stop(self):
port_mapping = PortMapping()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping)
task.task_run()
task.task_stop()
assert not task.task_thread.is_alive()
stop_event = threading.Event()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
task.start()
task.join()
assert not task.is_alive()

@patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock())
@patch('xcvrd.xcvrd_utilities.sfp_status_helper.detect_port_in_error_status')
Expand All @@ -755,7 +851,8 @@ def test_DomInfoUpdateTask_task_worker(self, mock_post_pm_info, mock_update_stat
mock_sub_table.return_value = mock_selectable

port_mapping = PortMapping()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
task.task_stopping_event.wait = MagicMock(side_effect=[False, True])
mock_detect_error.return_value = True
Expand Down Expand Up @@ -786,10 +883,11 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw
mock_table_helper.get_int_tbl = MagicMock(return_value=mock_table)
mock_table_helper.get_dom_tbl = MagicMock(return_value=mock_table)
mock_table_helper.get_dom_threshold_tbl = MagicMock(return_value=mock_table)
stopping_event = multiprocessing.Event()
stop_event = threading.Event()
sfp_error_event = threading.Event()
port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
wait_time = 5
Expand Down Expand Up @@ -819,15 +917,18 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw
assert not task.port_mapping.logical_to_asic
assert mock_update_status_hw.call_count == 1

@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None)))
def test_SfpStateUpdateTask_task_run_stop(self):
port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
sfp_error_event = multiprocessing.Event()
task.task_run(sfp_error_event)
assert wait_until(5, 1, task.task_process.is_alive)
task.task_stop()
assert wait_until(5, 1, lambda: task.task_process.is_alive() is False)
stop_event = threading.Event()
sfp_error_event = threading.Event()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
task.start()
assert wait_until(5, 1, task.is_alive)
task.raise_exception()
task.join()
assert wait_until(5, 1, lambda: task.is_alive() is False)

@patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock())
@patch('xcvrd.xcvrd.post_port_sfp_info_to_db')
Expand All @@ -838,7 +939,9 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_update_status_hw, mo

port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
stop_event = threading.Event()
sfp_error_event = threading.Event()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
task.xcvr_table_helper.get_intf_tbl = MagicMock(return_value=mock_table)
task.xcvr_table_helper.get_dom_tbl = MagicMock(return_value=mock_table)
Expand Down Expand Up @@ -870,7 +973,9 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_update_status_hw, mo
def test_SfpStateUpdateTask_mapping_event_from_change_event(self):
port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
stop_event = threading.Event()
sfp_error_event = threading.Event()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
port_dict = {}
assert task._mapping_event_from_change_event(False, port_dict) == SYSTEM_FAIL
assert port_dict[EVENT_ON_ALL_SFP] == SYSTEM_FAIL
Expand Down Expand Up @@ -907,10 +1012,10 @@ def test_SfpStateUpdateTask_task_worker(self, mock_post_pm_info, mock_del_status
mock_del_dom, mock_change_event, mock_mapping_event, mock_os_kill):
port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
stop_event = threading.Event()
sfp_error_event = threading.Event()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
stop_event = multiprocessing.Event()
sfp_error_event = multiprocessing.Event()
mock_change_event.return_value = (True, {0: 0}, {})
mock_mapping_event.return_value = SYSTEM_NOT_READY

Expand Down Expand Up @@ -1033,7 +1138,9 @@ class MockTable:

port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
stop_event = threading.Event()
sfp_error_event = threading.Event()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl
task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl
Expand Down
Loading