From 780444ae13ac01b054f6fc7b1ddcc8ab9c711515 Mon Sep 17 00:00:00 2001 From: junchao Date: Wed, 11 Aug 2021 18:32:03 +0800 Subject: [PATCH] [xcvrd] Subscribe port configuration change in xcvrd --- .gitignore | 6 +- sonic-xcvrd/tests/test_xcvrd.py | 467 +++++++++++- sonic-xcvrd/xcvrd/xcvrd.py | 666 ++++++++++++------ .../xcvrd/xcvrd_utilities/port_mapping.py | 77 ++ .../xcvrd/xcvrd_utilities/y_cable_helper.py | 178 +++-- 5 files changed, 1085 insertions(+), 309 deletions(-) create mode 100644 sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py diff --git a/.gitignore b/.gitignore index aebbe29ba..fa7dcba3c 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,8 @@ sonic-psud/scripts/psudc # Unit test / coverage reports coverage.xml .coverage -htmlcov/ \ No newline at end of file +htmlcov/ +test-results.xml +leddc +pciedc +syseepromdc diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index a8ad41c6f..376c5843e 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -2,7 +2,6 @@ import sys import time -import unittest if sys.version_info >= (3, 3): from unittest.mock import MagicMock, patch else: @@ -33,9 +32,9 @@ class TestXcvrdScript(object): def test_xcvrd_helper_class_run(self): - Y_cable_task = YCableTableUpdateTask() + Y_cable_task = YCableTableUpdateTask(None) - @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_info', MagicMock(return_value={'temperature': '22.75', 'voltage': '0.5', @@ -65,20 +64,21 @@ def test_xcvrd_helper_class_run(self): 'tx8power': '0.7', })) def test_post_port_dom_info_to_db(self): logical_port_name = "Ethernet0" + port_mapping = PortMapping() stop_event = threading.Event() dom_tbl = Table("STATE_DB", TRANSCEIVER_DOM_SENSOR_TABLE) - post_port_dom_info_to_db(logical_port_name, dom_tbl, stop_event) + post_port_dom_info_to_db(logical_port_name, port_mapping, dom_tbl, stop_event) - @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) def test_del_port_sfp_dom_info_from_db(self): logical_port_name = "Ethernet0" - stop_event = threading.Event() + port_mapping = PortMapping() dom_tbl = Table("STATE_DB", TRANSCEIVER_DOM_SENSOR_TABLE) init_tbl = Table("STATE_DB", TRANSCEIVER_INFO_TABLE) - del_port_sfp_dom_info_from_db(logical_port_name, init_tbl, dom_tbl) + del_port_sfp_dom_info_from_db(logical_port_name, port_mapping, init_tbl, dom_tbl) - @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_threshold_info', MagicMock(return_value={'temphighalarm': '22.75', 'temphighwarning': '0.5', @@ -102,11 +102,12 @@ def test_del_port_sfp_dom_info_from_db(self): 'txbiaslowwarning': '0.7', })) def test_post_port_dom_threshold_info_to_db(self): logical_port_name = "Ethernet0" + port_mapping = PortMapping() stop_event = threading.Event() dom_tbl = Table("STATE_DB", TRANSCEIVER_DOM_SENSOR_TABLE) - post_port_dom_threshold_info_to_db(logical_port_name, dom_tbl, stop_event) + post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, dom_tbl, stop_event) - @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_info', MagicMock(return_value={'type': '22.75', @@ -129,12 +130,13 @@ def test_post_port_dom_threshold_info_to_db(self): 'dom_capability': '0.7', })) def test_post_port_sfp_info_to_db(self): logical_port_name = "Ethernet0" + port_mapping = PortMapping() stop_event = threading.Event() dom_tbl = Table("STATE_DB", TRANSCEIVER_DOM_SENSOR_TABLE) transceiver_dict = {} - post_port_sfp_info_to_db(logical_port_name, dom_tbl, transceiver_dict, stop_event) + post_port_sfp_info_to_db(logical_port_name, port_mapping, dom_tbl, transceiver_dict, stop_event) - @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) @@ -203,20 +205,21 @@ def test_post_port_sfp_info_to_db(self): 'tx7power': '0.7', 'tx8power': '0.7', })) def test_post_port_sfp_dom_info_to_db(self): - logical_port_name = "Ethernet0" + port_mapping = PortMapping() stop_event = threading.Event() - post_port_sfp_dom_info_to_db(True, stop_event) + post_port_sfp_dom_info_to_db(True, port_mapping, stop_event) - @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) def test_init_port_sfp_status_tbl(self): + port_mapping = PortMapping() stop_event = threading.Event() - init_port_sfp_status_tbl(stop_event) + init_port_sfp_status_tbl(port_mapping, stop_event) @patch('xcvrd.xcvrd_utilities.y_cable_helper.y_cable_platform_sfputil', MagicMock(return_value=[0])) - @patch('xcvrd.xcvrd_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd_utilities.y_cable_helper._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd_utilities.y_cable_helper.get_muxcable_info', MagicMock(return_value={'tor_active': 'self', 'mux_direction': 'self', @@ -250,12 +253,13 @@ def test_init_port_sfp_status_tbl(self): 'version_peer_next': '1.7MS'})) def test_post_port_mux_info_to_db(self): logical_port_name = "Ethernet0" + port_mapping = PortMapping() mux_tbl = Table("STATE_DB", y_cable_helper.MUX_CABLE_INFO_TABLE) - rc = post_port_mux_info_to_db(logical_port_name, mux_tbl) + rc = post_port_mux_info_to_db(logical_port_name, port_mapping, mux_tbl) assert(rc != -1) @patch('xcvrd.xcvrd_utilities.y_cable_helper.y_cable_platform_sfputil', MagicMock(return_value=[0])) - @patch('xcvrd.xcvrd_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd_utilities.y_cable_helper._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd_utilities.y_cable_helper.get_muxcable_static_info', MagicMock(return_value={'read_side': 'self', 'nic_lane1_precursor1': '1', @@ -290,8 +294,9 @@ def test_post_port_mux_info_to_db(self): 'tor_peer_lane2_postcursor2': '17'})) def test_post_port_mux_static_info_to_db(self): logical_port_name = "Ethernet0" + port_mapping = PortMapping() mux_tbl = Table("STATE_DB", y_cable_helper.MUX_CABLE_STATIC_INFO_TABLE) - rc = post_port_mux_static_info_to_db(logical_port_name, mux_tbl) + rc = post_port_mux_static_info_to_db(logical_port_name, port_mapping, mux_tbl) assert(rc != -1) def test_get_media_settings_key(self): @@ -336,6 +341,413 @@ def test_is_error_sfp_status(self): assert not is_error_block_eeprom_reading(int(SFP_STATUS_INSERTED)) assert not is_error_block_eeprom_reading(int(SFP_STATUS_REMOVED)) + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table): + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('PortConfigDone', None, None)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.wait_for_port_config_done('') + assert swsscommon.Select.select.call_count == 2 + + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + def test_DaemonXcvrd_handle_port_config_change(self, mock_select, mock_sub_table): + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True]) + mock_observer = MagicMock() + mock_observer.notify_port_change_event = MagicMock() + xcvrd.subscribe_port_change_event(mock_observer) + + xcvrd.handle_port_config_change() + assert mock_observer.notify_port_change_event.call_count == 1 + assert xcvrd.port_mapping.logical_port_list.count('Ethernet0') + assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 + assert xcvrd.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] + assert xcvrd.port_mapping.get_logical_to_physical('Ethernet0') == [1] + + xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True]) + mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), )), (None, None, None)]) + xcvrd.handle_port_config_change() + assert mock_observer.notify_port_change_event.call_count == 2 + assert not xcvrd.port_mapping.logical_port_list + assert not xcvrd.port_mapping.logical_to_physical + assert not xcvrd.port_mapping.physical_to_logical + assert not xcvrd.port_mapping.logical_to_asic + + @patch('swsscommon.swsscommon.Table') + def test_DaemonXcvrd_init_port_mapping(self, mock_swsscommon_table): + mock_table = MagicMock() + mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) + mock_table.get = MagicMock(side_effect=[(True, (('index', 1), )), (True, (('index', 2), ))]) + mock_swsscommon_table.return_value = mock_table + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.init_port_mapping() + assert xcvrd.port_mapping.logical_port_list.count('Ethernet0') + assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 + assert xcvrd.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] + assert xcvrd.port_mapping.get_logical_to_physical('Ethernet0') == [1] + + assert xcvrd.port_mapping.logical_port_list.count('Ethernet4') + assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet4') == 0 + assert xcvrd.port_mapping.get_physical_to_logical(2) == ['Ethernet4'] + assert xcvrd.port_mapping.get_logical_to_physical('Ethernet4') == [2] + + @patch('xcvrd.xcvrd.DaemonXcvrd.load_platform_util', MagicMock()) + @patch('sonic_py_common.device_info.get_paths_to_platform_and_hwsku_dirs', MagicMock(return_value=('/tmp', None))) + @patch('swsscommon.swsscommon.WarmStart', MagicMock()) + @patch('xcvrd.xcvrd.DaemonXcvrd.wait_for_port_config_done', MagicMock()) + def test_DaemonXcvrd_init_deinit(self): + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.init() + xcvrd.deinit() + # TODO: fow now we only simply call xcvrd.init/deinit without any further check, it only makes sure that + # xcvrd.init/deinit will not raise unexpected exception. In future, probably more check will be added + + @patch('xcvrd.xcvrd.DaemonXcvrd.init') + @patch('xcvrd.xcvrd.DaemonXcvrd.deinit') + @patch('xcvrd.xcvrd.DaemonXcvrd.handle_port_config_change') + @patch('xcvrd.xcvrd.DomInfoUpdateTask.task_run') + @patch('xcvrd.xcvrd.SfpStateUpdateTask.task_run') + @patch('xcvrd.xcvrd.DomInfoUpdateTask.task_stop') + @patch('xcvrd.xcvrd.SfpStateUpdateTask.task_stop') + def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_handle_port_config_change, mock_deinit, mock_init): + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.run() + # TODO: more check + assert mock_task_stop1.call_count == 1 + assert mock_task_stop2.call_count == 1 + assert mock_task_run1.call_count == 1 + assert mock_task_run2.call_count == 1 + assert mock_handle_port_config_change.call_count == 1 + assert mock_deinit.call_count == 1 + assert mock_init.call_count == 1 + + @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + def test_DomInfoUpdateTask_handle_port_change_event(self): + port_mapping = PortMapping() + task = DomInfoUpdateTask(port_mapping) + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) + task.notify_port_change_event(port_change_event) + task.handle_port_change_event() + assert task.port_mapping.logical_port_list.count('Ethernet0') + assert task.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 + assert task.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] + assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1] + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) + task.notify_port_change_event(port_change_event) + task.handle_port_change_event() + assert not task.port_mapping.logical_port_list + assert not task.port_mapping.logical_to_physical + assert not task.port_mapping.physical_to_logical + assert not task.port_mapping.logical_to_asic + + def test_DomInfoUpdateTask_task_run_stop(self): + port_mapping = PortMapping() + task = DomInfoUpdateTask(port_mapping) + task.task_run([False]) + task.task_stop() + assert not task.task_thread.is_alive() + + @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + @patch('xcvrd.xcvrd_utilities.sfp_status_helper.detect_port_in_error_status') + @patch('xcvrd.xcvrd.post_port_dom_info_to_db') + @patch('xcvrd.xcvrd.post_port_dom_threshold_info_to_db') + def test_DomInfoUpdateTask_task_worker(self, mock_post_dom_th, mock_post_dom_info, mock_detect_error): + port_mapping = PortMapping() + task = DomInfoUpdateTask(port_mapping) + task.task_stopping_event.wait = MagicMock(side_effect=[False, True]) + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) + task.notify_port_change_event(port_change_event) + mock_detect_error.return_value = True + task.task_worker([False]) + assert task.port_mapping.logical_port_list.count('Ethernet0') + assert task.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 + assert task.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] + assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1] + assert mock_post_dom_th.call_count == 0 + assert mock_post_dom_info.call_count == 0 + mock_detect_error.return_value = False + task.task_stopping_event.wait = MagicMock(side_effect=[False, True]) + task.task_worker([False]) + assert mock_post_dom_th.call_count == 1 + assert mock_post_dom_info.call_count == 1 + + @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=False)) + @patch('xcvrd.xcvrd.xcvr_table_helper') + def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper): + mock_table = MagicMock() + mock_table.get = MagicMock(return_value=(False, None)) + mock_table_helper.get_status_tbl = MagicMock(return_value=mock_table) + mock_table_helper.get_int_tbl = MagicMock(return_value=mock_table) + mock_table_helper.get_dom_tbl = MagicMock(return_value=mock_table) + stopping_event = multiprocessing.Event() + port_mapping = PortMapping() + task = SfpStateUpdateTask(port_mapping) + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) + task.notify_port_change_event(port_change_event) + wait_time = 5 + while wait_time > 0: + task.handle_port_change_event(task.event_queue, stopping_event, [False]) + if task.port_mapping.logical_port_list: + break + wait_time -= 1 + time.sleep(1) + assert task.port_mapping.logical_port_list.count('Ethernet0') + assert task.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 + assert task.port_mapping.get_physical_to_logical(1) == ['Ethernet0'] + assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1] + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) + task.notify_port_change_event(port_change_event) + wait_time = 5 + while wait_time > 0: + task.handle_port_change_event(task.event_queue, stopping_event, [False]) + if not task.port_mapping.logical_port_list: + break + wait_time -= 1 + time.sleep(1) + assert not task.port_mapping.logical_port_list + assert not task.port_mapping.logical_to_physical + assert not task.port_mapping.physical_to_logical + assert not task.port_mapping.logical_to_asic + + def test_SfpStateUpdateTask_task_run_stop(self): + port_mapping = PortMapping() + task = SfpStateUpdateTask(port_mapping) + sfp_error_event = multiprocessing.Event() + task.task_run(sfp_error_event, [False]) + assert wait_until(5, 1, task.task_process.is_alive) + task.task_stop() + assert wait_until(5, 1, lambda: task.task_process.is_alive() is False) + + @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + @patch('xcvrd.xcvrd.post_port_sfp_info_to_db') + def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info): + port_mapping = PortMapping() + task = SfpStateUpdateTask(port_mapping) + task.retry_eeprom_reading() + assert mock_post_sfp_info.call_count == 0 + + task.retry_eeprom_set.add('Ethernet0') + task.last_retry_eeprom_time = time.time() + task.retry_eeprom_reading() + assert mock_post_sfp_info.call_count == 0 + + task.last_retry_eeprom_time = 0 + mock_post_sfp_info.return_value = SFP_EEPROM_NOT_READY + task.retry_eeprom_reading() + assert 'Ethernet0' in task.retry_eeprom_set + + task.last_retry_eeprom_time = 0 + mock_post_sfp_info.return_value = None + task.retry_eeprom_reading() + assert 'Ethernet0' not in task.retry_eeprom_set + + def test_SfpStateUpdateTask_mapping_event_from_change_event(self): + port_mapping = PortMapping() + task = SfpStateUpdateTask(port_mapping) + port_dict = {} + assert task._mapping_event_from_change_event(False, port_dict) == SYSTEM_FAIL + assert port_dict[EVENT_ON_ALL_SFP] == SYSTEM_FAIL + + port_dict = {EVENT_ON_ALL_SFP: SYSTEM_FAIL} + assert task._mapping_event_from_change_event(False, port_dict) == SYSTEM_FAIL + + port_dict = {} + assert task._mapping_event_from_change_event(True, port_dict) == SYSTEM_BECOME_READY + assert port_dict[EVENT_ON_ALL_SFP] == SYSTEM_BECOME_READY + + port_dict = {1, SFP_STATUS_INSERTED} + assert task._mapping_event_from_change_event(True, port_dict) == NORMAL_EVENT + + @patch('time.sleep', MagicMock()) + @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + @patch('xcvrd.xcvrd._wrapper_soak_sfp_insert_event', MagicMock()) + @patch('os.kill') + @patch('xcvrd.xcvrd.SfpStateUpdateTask._mapping_event_from_change_event') + @patch('xcvrd.xcvrd._wrapper_get_transceiver_change_event') + @patch('xcvrd.xcvrd.del_port_sfp_dom_info_from_db') + @patch('xcvrd.xcvrd.notify_media_setting') + @patch('xcvrd.xcvrd.post_port_dom_threshold_info_to_db') + @patch('xcvrd.xcvrd.post_port_dom_info_to_db') + @patch('xcvrd.xcvrd.post_port_sfp_info_to_db') + @patch('xcvrd.xcvrd.update_port_transceiver_status_table') + def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_info, mock_post_dom_info, mock_post_dom_th, mock_update_media_setting, mock_del_dom, mock_change_event, mock_mapping_event, mock_os_kill): + port_mapping = PortMapping() + task = SfpStateUpdateTask(port_mapping) + stop_event = multiprocessing.Event() + sfp_error_event = multiprocessing.Event() + mock_change_event.return_value = (True, {0:0}, {}) + mock_mapping_event.return_value = SYSTEM_NOT_READY + + # Test state machine: STATE_INIT + SYSTEM_NOT_READY event => STATE_INIT + SYSTEM_NOT_READY event ... => STATE_EXIT + task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + assert mock_os_kill.call_count == 1 + assert sfp_error_event.is_set() + + mock_mapping_event.return_value = SYSTEM_FAIL + mock_os_kill.reset_mock() + sfp_error_event.clear() + # Test state machine: STATE_INIT + SYSTEM_FAIL event => STATE_INIT + SYSTEM_FAIL event ... => STATE_EXIT + task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + assert mock_os_kill.call_count == 1 + assert sfp_error_event.is_set() + + mock_mapping_event.side_effect = [SYSTEM_BECOME_READY, SYSTEM_NOT_READY] + mock_os_kill.reset_mock() + sfp_error_event.clear() + # Test state machine: STATE_INIT + SYSTEM_BECOME_READY event => STATE_NORMAL + SYSTEM_NOT_READY event ... => STATE_EXIT + task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + assert mock_os_kill.call_count == 1 + assert not sfp_error_event.is_set() + + mock_mapping_event.side_effect = [SYSTEM_BECOME_READY, SYSTEM_FAIL] + [SYSTEM_FAIL] * (RETRY_TIMES_FOR_SYSTEM_READY + 1) + mock_os_kill.reset_mock() + sfp_error_event.clear() + # Test state machine: STATE_INIT + SYSTEM_BECOME_READY event => STATE_NORMAL + SYSTEM_FAIL event ... => STATE_INIT + # + SYSTEM_FAIL event ... => STATE_EXIT + task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + assert mock_os_kill.call_count == 1 + assert sfp_error_event.is_set() + + task.port_mapping.handle_port_change_event(PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)) + mock_change_event.return_value = (True, {1:SFP_STATUS_INSERTED}, {}) + mock_mapping_event.side_effect = None + mock_mapping_event.return_value = NORMAL_EVENT + mock_post_sfp_info.return_value = SFP_EEPROM_NOT_READY + stop_event.is_set = MagicMock(side_effect=[False, True]) + # Test state machine: handle SFP insert event, but EEPROM read failure + task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + assert mock_updata_status.call_count == 1 + assert mock_post_sfp_info.call_count == 2 # first call and retry call + assert mock_post_dom_info.call_count == 0 + assert mock_post_dom_th.call_count == 0 + assert mock_update_media_setting.call_count == 0 + assert 'Ethernet0' in task.retry_eeprom_set + task.retry_eeprom_set.clear() + + stop_event.is_set = MagicMock(side_effect=[False, True]) + mock_post_sfp_info.return_value = None + mock_updata_status.reset_mock() + mock_post_sfp_info.reset_mock() + # Test state machine: handle SFP insert event, and EEPROM read success + task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + assert mock_updata_status.call_count == 1 + assert mock_post_sfp_info.call_count == 1 + assert mock_post_dom_info.call_count == 1 + assert mock_post_dom_th.call_count == 1 + assert mock_update_media_setting.call_count == 1 + + stop_event.is_set = MagicMock(side_effect=[False, True]) + mock_change_event.return_value = (True, {1:SFP_STATUS_REMOVED}, {}) + mock_updata_status.reset_mock() + # Test state machine: handle SFP remove event + task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + assert mock_updata_status.call_count == 1 + assert mock_del_dom.call_count == 1 + + stop_event.is_set = MagicMock(side_effect=[False, True]) + error = int(SFP_STATUS_INSERTED) | SfpBase.SFP_ERROR_BIT_BLOCKING | SfpBase.SFP_ERROR_BIT_POWER_BUDGET_EXCEEDED + mock_change_event.return_value = (True, {1:error}, {}) + mock_updata_status.reset_mock() + mock_del_dom.reset_mock() + # Test state machine: handle SFP error event + task.task_worker(stop_event, sfp_error_event, [False], task.event_queue) + assert mock_updata_status.call_count == 1 + assert mock_del_dom.call_count == 1 + + @patch('xcvrd.xcvrd.xcvr_table_helper') + @patch('xcvrd.xcvrd._wrapper_get_presence') + @patch('xcvrd.xcvrd.notify_media_setting') + @patch('xcvrd.xcvrd.post_port_dom_threshold_info_to_db') + @patch('xcvrd.xcvrd.post_port_dom_info_to_db') + @patch('xcvrd.xcvrd.post_port_sfp_info_to_db') + @patch('xcvrd.xcvrd.update_port_transceiver_status_table') + def test_SfpStateUpdateTask_on_add_logical_port(self, mock_updata_status, mock_post_sfp_info, mock_post_dom_info, mock_post_dom_th, mock_update_media_setting, mock_get_presence, mock_table_helper): + class MockTable: + pass + + status_tbl = MockTable() + status_tbl.get = MagicMock(return_value=(True, (('status', SFP_STATUS_INSERTED),))) + status_tbl.set = MagicMock() + int_tbl = MockTable() + int_tbl.get = MagicMock(return_value=(True, (('key2', 'value2'),))) + int_tbl.set = MagicMock() + dom_tbl = MockTable() + dom_tbl.get = MagicMock(return_value=(True, (('key3', 'value3'),))) + dom_tbl.set = MagicMock() + mock_table_helper.get_status_tbl = MagicMock(return_value=status_tbl) + mock_table_helper.get_int_tbl = MagicMock(return_value=int_tbl) + mock_table_helper.get_dom_tbl = MagicMock(return_value=dom_tbl) + + port_mapping = PortMapping() + task = SfpStateUpdateTask(port_mapping) + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) + task.port_mapping.handle_port_change_event(port_change_event) + # SFP information is in the DB, copy the SFP information for the newly added logical port + task.on_add_logical_port(port_change_event) + status_tbl.get.assert_called_with('Ethernet0') + status_tbl.set.assert_called_with('Ethernet0', (('status', SFP_STATUS_INSERTED),)) + int_tbl.get.assert_called_with('Ethernet0') + int_tbl.set.assert_called_with('Ethernet0', (('key2', 'value2'),)) + dom_tbl.get.assert_called_with('Ethernet0') + dom_tbl.set.assert_called_with('Ethernet0', (('key3', 'value3'),)) + + status_tbl.get.return_value = (False, ()) + mock_get_presence.return_value = True + mock_post_sfp_info.return_value = SFP_EEPROM_NOT_READY + # SFP information is not in the DB, and SFP is present, and SFP has no error, but SFP EEPROM reading failed + task.on_add_logical_port(port_change_event) + assert mock_updata_status.call_count == 1 + mock_updata_status.assert_called_with('Ethernet0', status_tbl, SFP_STATUS_INSERTED, 'N/A') + assert mock_post_sfp_info.call_count == 1 + mock_post_sfp_info.assert_called_with('Ethernet0', task.port_mapping, int_tbl, {}) + assert mock_post_dom_info.call_count == 0 + assert mock_post_dom_th.call_count == 0 + assert mock_update_media_setting.call_count == 0 + assert 'Ethernet0' in task.retry_eeprom_set + task.retry_eeprom_set.clear() + + mock_post_sfp_info.return_value = None + mock_updata_status.reset_mock() + mock_post_sfp_info.reset_mock() + # SFP information is not in the DB, and SFP is present, and SFP has no error, and SFP EEPROM reading succeed + task.on_add_logical_port(port_change_event) + assert mock_updata_status.call_count == 1 + mock_updata_status.assert_called_with('Ethernet0', status_tbl, SFP_STATUS_INSERTED, 'N/A') + assert mock_post_sfp_info.call_count == 1 + mock_post_sfp_info.assert_called_with('Ethernet0', task.port_mapping, int_tbl, {}) + assert mock_post_dom_info.call_count == 1 + mock_post_dom_info.assert_called_with('Ethernet0', task.port_mapping, dom_tbl) + assert mock_post_dom_th.call_count == 1 + mock_post_dom_th.assert_called_with('Ethernet0', task.port_mapping, dom_tbl) + assert mock_update_media_setting.call_count == 1 + assert 'Ethernet0' not in task.retry_eeprom_set + + mock_get_presence.return_value = False + mock_updata_status.reset_mock() + # SFP information is not in DB and SFP is not present + task.on_add_logical_port(port_change_event) + assert mock_updata_status.call_count == 1 + mock_updata_status.assert_called_with('Ethernet0', status_tbl, SFP_STATUS_REMOVED, 'N/A') + + task.sfp_error_dict[1] = (str(SfpBase.SFP_ERROR_BIT_BLOCKING | SfpBase.SFP_ERROR_BIT_POWER_BUDGET_EXCEEDED), {}) + mock_updata_status.reset_mock() + # SFP information is not in DB, and SFP is not present, and SFP is in error status + task.on_add_logical_port(port_change_event) + assert mock_updata_status.call_count == 1 + mock_updata_status.assert_called_with('Ethernet0', status_tbl, task.sfp_error_dict[1][0], 'Blocking EEPROM from being read|Power budget exceeded') + def test_sfp_insert_events(self): from xcvrd.xcvrd import _wrapper_soak_sfp_insert_event sfp_insert_events = {} @@ -359,4 +771,17 @@ def test_sfp_remove_events(self): time.sleep(1) _wrapper_soak_sfp_insert_event(sfp_insert_events, removal) - assert port_dict == removal \ No newline at end of file + assert port_dict == removal + + +def wait_until(total_wait_time, interval, call_back, *args, **kwargs): + wait_time = 0 + while wait_time <= total_wait_time: + try: + if call_back(*args, **kwargs): + return True + except: + pass + time.sleep(interval) + wait_time += interval + return False diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 9b1eade74..be1d43ee0 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -7,9 +7,11 @@ try: import ast + import copy import json import multiprocessing import os + import queue import signal import sys import threading @@ -21,6 +23,7 @@ from .xcvrd_utilities import sfp_status_helper from .xcvrd_utilities import y_cable_helper + from .xcvrd_utilities.port_mapping import PortMapping, PortChangeEvent except ImportError as e: raise ImportError(str(e) + " - required module not found") @@ -46,8 +49,8 @@ SFP_INSERT_EVENT_POLL_PERIOD_MSECS = 1000 DOM_INFO_UPDATE_PERIOD_SECS = 60 +STATE_MACHINE_UPDATE_PERIOD_MSECS = 6000 TIME_FOR_SFP_READY_SECS = 1 -XCVRD_MAIN_THREAD_SLEEP_SECS = 60 EVENT_ON_ALL_SFP = '-1' # events definition @@ -79,12 +82,13 @@ POWER_UNIT = 'dBm' BIAS_UNIT = 'mA' -media_settings = '' g_dict = {} # Global platform specific sfputil class instance platform_sfputil = None # Global chassis object based on new platform api platform_chassis = None +# Global xcvr table helper +xcvr_table_helper = None # Global logger instance for helper functions and classes # TODO: Refactor so that we only need the logger inherited @@ -95,26 +99,11 @@ # Helper functions ============================================================= # -# Find out the underneath physical port list by logical name - - -def logical_port_name_to_physical_port_list(port_name): - try: - return [int(port_name)] - except ValueError: - if platform_sfputil.is_logical_port(port_name): - return platform_sfputil.get_logical_to_physical(port_name) - else: - helper_logger.log_error("Invalid port '{}'".format(port_name)) - return None - # Get physical port name def get_physical_port_name(logical_port, physical_port, ganged): - if logical_port == physical_port: - return logical_port - elif ganged: + if ganged: return logical_port + ":{} (ganged)".format(physical_port) else: return logical_port @@ -282,12 +271,13 @@ def beautify_dom_threshold_info_dict(dom_info_dict): # Update port sfp info in db -def post_port_sfp_info_to_db(logical_port_name, table, transceiver_dict, + +def post_port_sfp_info_to_db(logical_port_name, port_mapping, table, transceiver_dict, stop_event=threading.Event()): ganged_port = False ganged_member_num = 1 - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) + physical_port_list = port_mapping.logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: helper_logger.log_error("No physical ports found for logical port '{}'".format(logical_port_name)) return PHYSICAL_PORT_NOT_EXIST @@ -343,12 +333,12 @@ def post_port_sfp_info_to_db(logical_port_name, table, transceiver_dict, # Update port dom threshold info in db -def post_port_dom_threshold_info_to_db(logical_port_name, table, - stop=threading.Event()): +def post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, table, + stop=threading.Event(), dom_th_info_cache=None): ganged_port = False ganged_member_num = 1 - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) + physical_port_list = port_mapping.logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: helper_logger.log_error("No physical ports found for logical port '{}'".format(logical_port_name)) return PHYSICAL_PORT_NOT_EXIST @@ -368,7 +358,14 @@ def post_port_dom_threshold_info_to_db(logical_port_name, table, ganged_member_num += 1 try: - dom_info_dict = _wrapper_get_transceiver_dom_threshold_info(physical_port) + if dom_th_info_cache is not None and physical_port in dom_th_info_cache: + # If cache is enabled and there is a cache, no need read from EEPROM, just read from cache + dom_info_dict = dom_th_info_cache[physical_port] + else: + dom_info_dict = _wrapper_get_transceiver_dom_threshold_info(physical_port) + if dom_th_info_cache is not None: + # If cache is enabled, put dom threshold infomation to cache + dom_th_info_cache[physical_port] = dom_info_dict if dom_info_dict is not None: beautify_dom_threshold_info_dict(dom_info_dict) fvs = swsscommon.FieldValuePairs( @@ -404,11 +401,11 @@ def post_port_dom_threshold_info_to_db(logical_port_name, table, # Update port dom sensor info in db -def post_port_dom_info_to_db(logical_port_name, table, stop_event=threading.Event()): +def post_port_dom_info_to_db(logical_port_name, port_mapping, table, stop_event=threading.Event(), dom_info_cache=None): ganged_port = False ganged_member_num = 1 - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) + physical_port_list = port_mapping.logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: helper_logger.log_error("No physical ports found for logical port '{}'".format(logical_port_name)) return PHYSICAL_PORT_NOT_EXIST @@ -427,7 +424,14 @@ def post_port_dom_info_to_db(logical_port_name, table, stop_event=threading.Even ganged_member_num += 1 try: - dom_info_dict = _wrapper_get_transceiver_dom_info(physical_port) + if dom_info_cache is not None and physical_port in dom_info_cache: + # If cache is enabled and dom information is in cache, just read from cache, no need read from EEPROM + dom_info_dict = dom_info_cache[physical_port] + else: + dom_info_dict = _wrapper_get_transceiver_dom_info(physical_port) + if dom_info_cache is not None: + # If cache is enabled, put dom information to cache + dom_info_cache[physical_port] = dom_info_dict if dom_info_dict is not None: beautify_dom_info_dict(dom_info_dict, physical_port) if _wrapper_get_sfp_type(physical_port) == 'QSFP_DD': @@ -489,48 +493,38 @@ def post_port_dom_info_to_db(logical_port_name, table, stop_event=threading.Even # Update port dom/sfp info in db -def post_port_sfp_dom_info_to_db(is_warm_start, stop_event=threading.Event()): +def post_port_sfp_dom_info_to_db(is_warm_start, port_mapping, stop_event=threading.Event()): # Connect to STATE_DB and create transceiver dom/sfp info tables - transceiver_dict, state_db, appl_db, int_tbl, dom_tbl, app_port_tbl = {}, {}, {}, {}, {}, {} - - # Get the namespaces in the platform - namespaces = multi_asic.get_front_end_namespaces() - for namespace in namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) - appl_db[asic_id] = daemon_base.db_connect("APPL_DB", namespace) - int_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_INFO_TABLE) - dom_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) - app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db[asic_id], swsscommon.APP_PORT_TABLE_NAME) + transceiver_dict = {} # Post all the current interface dom/sfp info to STATE_DB - logical_port_list = platform_sfputil.logical + logical_port_list = port_mapping.logical_port_list for logical_port_name in logical_port_list: if stop_event.is_set(): break # Get the asic to which this port belongs - asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + asic_index = port_mapping.get_asic_id_for_logical_port(logical_port_name) if asic_index is None: - logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) + helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - post_port_sfp_info_to_db(logical_port_name, int_tbl[asic_index], transceiver_dict, stop_event) - post_port_dom_info_to_db(logical_port_name, dom_tbl[asic_index], stop_event) - post_port_dom_threshold_info_to_db(logical_port_name, dom_tbl[asic_index], stop_event) + post_port_sfp_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_int_tbl(asic_index), transceiver_dict, stop_event) + post_port_dom_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), stop_event) + post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), stop_event) # Do not notify media settings during warm reboot to avoid dataplane traffic impact if is_warm_start == False: - notify_media_setting(logical_port_name, transceiver_dict, app_port_tbl[asic_index]) + notify_media_setting(logical_port_name, transceiver_dict, xcvr_table_helper.get_app_port_tbl(asic_index), port_mapping) transceiver_dict.clear() # Delete port dom/sfp info from db -def del_port_sfp_dom_info_from_db(logical_port_name, int_tbl, dom_tbl): +def del_port_sfp_dom_info_from_db(logical_port_name, port_mapping, int_tbl, dom_tbl): ganged_port = False ganged_member_num = 1 - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) + physical_port_list = port_mapping.logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: helper_logger.log_error("No physical ports found for logical port '{}'".format(logical_port_name)) return PHYSICAL_PORT_NOT_EXIST @@ -552,27 +546,6 @@ def del_port_sfp_dom_info_from_db(logical_port_name, int_tbl, dom_tbl): helper_logger.log_error("This functionality is currently not implemented for this platform") sys.exit(NOT_IMPLEMENTED_ERROR) -# recover missing sfp table entries if any - - -def recover_missing_sfp_table_entries(sfp_util, int_tbl, status_tbl, stop_event): - transceiver_dict = {} - - logical_port_list = sfp_util.logical - for logical_port_name in logical_port_list: - if stop_event.is_set(): - break - - # Get the asic to which this port belongs - asic_index = sfp_util.get_asic_id_for_logical_port(logical_port_name) - if asic_index is None: - logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) - continue - - keys = int_tbl[asic_index].getKeys() - if logical_port_name not in keys and not sfp_status_helper.detect_port_in_error_status(logical_port_name, status_tbl[asic_index]): - post_port_sfp_info_to_db(logical_port_name, int_tbl[asic_index], transceiver_dict, stop_event) - def check_port_in_range(range_str, physical_port): RANGE_SEPARATOR = '-' @@ -747,14 +720,14 @@ def get_media_val_str(num_logical_ports, lane_dict, logical_idx): def notify_media_setting(logical_port_name, transceiver_dict, - app_port_tbl): - if len(media_settings) == 0: + app_port_tbl, port_mapping): + if not g_dict: return ganged_port = False ganged_member_num = 1 - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) + physical_port_list = port_mapping.logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: helper_logger.log_error("Error: No physical ports found for logical port '{}'".format(logical_port_name)) return PHYSICAL_PORT_NOT_EXIST @@ -763,7 +736,7 @@ def notify_media_setting(logical_port_name, transceiver_dict, ganged_port = True for physical_port in physical_port_list: - logical_port_list = platform_sfputil.get_physical_to_logical(physical_port) + logical_port_list = port_mapping.get_physical_to_logical(physical_port) num_logical_ports = len(logical_port_list) logical_idx = logical_port_list.index(logical_port_name) if not _wrapper_get_presence(physical_port): @@ -822,42 +795,32 @@ def delete_port_from_status_table(logical_port_name, status_tbl): # Init TRANSCEIVER_STATUS table -def init_port_sfp_status_tbl(stop_event=threading.Event()): - # Connect to STATE_DB and create transceiver status table - state_db, status_tbl = {}, {} - - # Get the namespaces in the platform - namespaces = multi_asic.get_front_end_namespaces() - for namespace in namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) - status_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_STATUS_TABLE) - +def init_port_sfp_status_tbl(port_mapping, stop_event=threading.Event()): # Init TRANSCEIVER_STATUS table - logical_port_list = platform_sfputil.logical + logical_port_list = port_mapping.logical_port_list for logical_port_name in logical_port_list: if stop_event.is_set(): break # Get the asic to which this port belongs - asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + asic_index = port_mapping.get_asic_id_for_logical_port(logical_port_name) if asic_index is None: - logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) + helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) + physical_port_list = port_mapping.logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: helper_logger.log_error("No physical ports found for logical port '{}'".format(logical_port_name)) - update_port_transceiver_status_table(logical_port_name, status_tbl[asic_index], sfp_status_helper.SFP_STATUS_REMOVED) + update_port_transceiver_status_table(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_REMOVED) for physical_port in physical_port_list: if stop_event.is_set(): break if not _wrapper_get_presence(physical_port): - update_port_transceiver_status_table(logical_port_name, status_tbl[asic_index], sfp_status_helper.SFP_STATUS_REMOVED) + update_port_transceiver_status_table(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_REMOVED) else: - update_port_transceiver_status_table(logical_port_name, status_tbl[asic_index], sfp_status_helper.SFP_STATUS_INSERTED) + update_port_transceiver_status_table(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_INSERTED) # # Helper classes =============================================================== @@ -867,40 +830,39 @@ def init_port_sfp_status_tbl(stop_event=threading.Event()): class DomInfoUpdateTask(object): - def __init__(self): + def __init__(self, port_mapping): self.task_thread = None self.task_stopping_event = threading.Event() + self.event_queue = queue.Queue() + self.port_mapping = copy.deepcopy(port_mapping) def task_worker(self, y_cable_presence): helper_logger.log_info("Start DOM monitoring loop") - - # Connect to STATE_DB and create transceiver dom info table - state_db, dom_tbl, status_tbl = {}, {}, {} mux_tbl = {} - - # Get the namespaces in the platform - namespaces = multi_asic.get_front_end_namespaces() - for namespace in namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) - dom_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) - status_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_STATUS_TABLE) + dom_info_cache = {} + dom_th_info_cache = {} # Start loop to update dom info in DB periodically while not self.task_stopping_event.wait(DOM_INFO_UPDATE_PERIOD_SECS): - logical_port_list = platform_sfputil.logical + # Clear the cache at the begin of the loop to make sure it will be clear each time + dom_info_cache.clear() + dom_th_info_cache.clear() + + # Handle port change event from main thread + self.handle_port_change_event() + logical_port_list = self.port_mapping.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs - asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + asic_index = self.port_mapping.get_asic_id_for_logical_port(logical_port_name) if asic_index is None: - logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) + helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - if not sfp_status_helper.detect_port_in_error_status(logical_port_name, status_tbl[asic_index]): - post_port_dom_info_to_db(logical_port_name, dom_tbl[asic_index], self.task_stopping_event) - post_port_dom_threshold_info_to_db(logical_port_name, dom_tbl[asic_index], self.task_stopping_event) + if not sfp_status_helper.detect_port_in_error_status(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index)): + post_port_dom_info_to_db(logical_port_name, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), self.task_stopping_event, dom_info_cache=dom_info_cache) + post_port_dom_threshold_info_to_db(logical_port_name, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), self.task_stopping_event, dom_th_info_cache=dom_th_info_cache) if y_cable_presence[0] is True: - y_cable_helper.check_identifier_presence_and_update_mux_info_entry(state_db, mux_tbl, asic_index, logical_port_name) + y_cable_helper.check_identifier_presence_and_update_mux_info_entry(xcvr_table_helper.get_state_db(asic_index), mux_tbl, asic_index, logical_port_name) helper_logger.log_info("Stop DOM monitoring loop") @@ -915,13 +877,59 @@ def task_stop(self): self.task_stopping_event.set() self.task_thread.join() + def notify_port_change_event(self, port_change_event): + """Called by main thread. When main thread detects a port configuration add/remove, it puts an event to + the event queue + + Args: + port_change_event (object): port change event + """ + self.event_queue.put_nowait(port_change_event) + + def handle_port_change_event(self): + """Handle port change event until the queue is empty, update local port mapping and DB data accordingly. + """ + while True: + try: + port_change_event = self.event_queue.get_nowait() + if port_change_event.event_type == PortChangeEvent.PORT_REMOVE: + self.on_remove_logical_port(port_change_event) + self.port_mapping.handle_port_change_event(port_change_event) + except queue.Empty: + break + + def on_remove_logical_port(self, port_change_event): + """Called when a logical port is removed from CONFIG_DB + + Args: + port_change_event (object): port change event + """ + # To avoid race condition, remove the entry TRANSCEIVER_DOM_INFO table. + # This thread only update TRANSCEIVER_DOM_INFO table, so we don't have to remove entries from + # TRANSCEIVER_INFO and TRANSCEIVER_STATUS_INFO + del_port_sfp_dom_info_from_db(port_change_event.port_name, + self.port_mapping, + None, + xcvr_table_helper.get_dom_tbl(port_change_event.asic_id)) + + # Process wrapper class to update sfp state info periodically class SfpStateUpdateTask(object): - def __init__(self): + RETRY_EEPROM_READING_INTERVAL = 60 + def __init__(self, port_mapping): self.task_process = None self.task_stopping_event = multiprocessing.Event() + self.event_queue = multiprocessing.Queue() + self.port_mapping = copy.deepcopy(port_mapping) + # A set to hold those SFPs who fail to read EEPROM + self.retry_eeprom_set = set() + # To avoid retry EEPROM read too fast, record the last EEPROM read timestamp in this member + self.last_retry_eeprom_time = 0 + # A dict to hold SFP error event, for SFP insert/remove event, it is not necessary to cache them + # because _wrapper_get_presence returns the SFP presence status + self.sfp_error_dict = {} self.sfp_insert_events = {} def _mapping_event_from_change_event(self, status, port_dict): @@ -948,26 +956,10 @@ def _mapping_event_from_change_event(self, status, port_dict): helper_logger.log_debug("mapping from {} {} to {}".format(status, port_dict, event)) return event - def task_worker(self, stopping_event, sfp_error_event, y_cable_presence): + def task_worker(self, stopping_event, sfp_error_event, y_cable_presence, event_queue): helper_logger.log_info("Start SFP monitoring loop") transceiver_dict = {} - # Connect to STATE_DB and create transceiver dom/sfp info tables - state_db, appl_db, int_tbl, dom_tbl, status_tbl, app_port_tbl = {}, {}, {}, {}, {}, {} - - # Get the namespaces in the platform - namespaces = multi_asic.get_front_end_namespaces() - for namespace in namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) - int_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_INFO_TABLE) - dom_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) - status_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_STATUS_TABLE) - - # Connect to APPL_DB to notify Media notifications - appl_db[asic_id] = daemon_base.db_connect("APPL_DB", namespace) - app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db[asic_id], swsscommon.APP_PORT_TABLE_NAME) - # Start main loop to listen to the SFP change event. # The state migrating sequence: # 1. When the system starts, it is in "INIT" state, calling get_transceiver_change_event @@ -1038,6 +1030,10 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence): timeout = RETRY_PERIOD_FOR_SYSTEM_READY_MSECS state = STATE_INIT while not stopping_event.is_set(): + self.handle_port_change_event(event_queue, stopping_event, y_cable_presence) + + # Retry those logical ports whose EEPROM reading failed or timeout when the SFP is inserted + self.retry_eeprom_reading() next_state = state time_start = time.time() # Ensure not to block for any event if sfp insert event is pending @@ -1093,41 +1089,53 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence): # 1. the state has been normal before got the event # 2. the state was init and transition to normal after got the event. # this is for the vendors who don't implement "system_not_ready/system_becom_ready" logic + logical_port_dict = {} for key, value in port_dict.items(): - logical_port_list = platform_sfputil.get_physical_to_logical(int(key)) + # SFP error event should be cached because: when a logical port is created, there is no way to + # detect the SFP error by platform API. + if value != sfp_status_helper.SFP_STATUS_INSERTED and value != sfp_status_helper.SFP_STATUS_REMOVED: + self.sfp_error_dict[key] = (value, error_dict) + else: + self.sfp_error_dict.pop(key, None) + logical_port_list = self.port_mapping.get_physical_to_logical(key) if logical_port_list is None: helper_logger.log_warning("Got unknown FP port index {}, ignored".format(key)) continue for logical_port in logical_port_list: - + logical_port_dict[logical_port] = value # Get the asic to which this port belongs - asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port) + asic_index = self.port_mapping.get_asic_id_for_logical_port(logical_port) if asic_index is None: - logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port)) + helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port)) continue if value == sfp_status_helper.SFP_STATUS_INSERTED: helper_logger.log_info("Got SFP inserted event") # A plugin event will clear the error state. update_port_transceiver_status_table( - logical_port, status_tbl[asic_index], sfp_status_helper.SFP_STATUS_INSERTED) + logical_port, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_INSERTED) helper_logger.log_info("receive plug in and update port sfp status table.") - rc = post_port_sfp_info_to_db(logical_port, int_tbl[asic_index], transceiver_dict) + rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), transceiver_dict) # If we didn't get the sfp info, assuming the eeprom is not ready, give a try again. if rc == SFP_EEPROM_NOT_READY: helper_logger.log_warning("SFP EEPROM is not ready. One more try...") time.sleep(TIME_FOR_SFP_READY_SECS) - post_port_sfp_info_to_db(logical_port, int_tbl[asic_index], transceiver_dict) - post_port_dom_info_to_db(logical_port, dom_tbl[asic_index]) - post_port_dom_threshold_info_to_db(logical_port, dom_tbl[asic_index]) - notify_media_setting(logical_port, transceiver_dict, app_port_tbl[asic_index]) - transceiver_dict.clear() + rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), transceiver_dict) + if rc == SFP_EEPROM_NOT_READY: + # If still failed to read EEPROM, put it to retry set + self.retry_eeprom_set.add(logical_port) + + if rc != SFP_EEPROM_NOT_READY: + post_port_dom_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index)) + post_port_dom_threshold_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index)) + notify_media_setting(logical_port, transceiver_dict, xcvr_table_helper.get_app_port_tbl(asic_index), self.port_mapping) + transceiver_dict.clear() elif value == sfp_status_helper.SFP_STATUS_REMOVED: helper_logger.log_info("Got SFP removed event") update_port_transceiver_status_table( - logical_port, status_tbl[asic_index], sfp_status_helper.SFP_STATUS_REMOVED) - helper_logger.log_info("receive plug out and update port sfp status table.") - del_port_sfp_dom_info_from_db(logical_port, int_tbl[asic_index], dom_tbl[asic_index]) + logical_port, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_REMOVED) + helper_logger.log_info("receive plug out and pdate port sfp status table.") + del_port_sfp_dom_info_from_db(logical_port, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), xcvr_table_helper.get_dom_tbl(asic_index)) else: try: error_bits = int(value) @@ -1144,18 +1152,18 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence): # Add error info to database # Any existing error will be replaced by the new one. - update_port_transceiver_status_table(logical_port, status_tbl[asic_index], value, '|'.join(error_descriptions)) + update_port_transceiver_status_table(logical_port, xcvr_table_helper.get_status_tbl(asic_index), value, '|'.join(error_descriptions)) helper_logger.log_info("Receive error update port sfp status table.") # In this case EEPROM is not accessible. The DOM info will be removed since it can be out-of-date. # The interface info remains in the DB since it is static. if sfp_status_helper.is_error_block_eeprom_reading(error_bits): - del_port_sfp_dom_info_from_db(logical_port, None, dom_tbl[asic_index]) + del_port_sfp_dom_info_from_db(logical_port, None, xcvr_table_helper.get_dom_tbl(asic_index)) except (TypeError, ValueError) as e: - logger.log_error("Got unrecognized event {}, ignored".format(value)) + helper_logger.log_error("Got unrecognized event {}, ignored".format(value)) # Since ports could be connected to a mux cable, if there is a change event process the change for being on a Y cable Port y_cable_helper.change_ports_status_for_y_cable_change_event( - port_dict, y_cable_presence, stopping_event) + logical_port_dict, self.port_mapping, y_cable_presence, stopping_event) else: next_state = STATE_EXIT elif event == SYSTEM_FAIL: @@ -1190,7 +1198,7 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence): os.kill(os.getppid(), signal.SIGTERM) break elif next_state == STATE_NORMAL: - timeout = 0 + timeout = STATE_MACHINE_UPDATE_PERIOD_MSECS helper_logger.log_info("Stop SFP monitoring loop") @@ -1199,13 +1207,189 @@ def task_run(self, sfp_error_event, y_cable_presence): return self.task_process = multiprocessing.Process(target=self.task_worker, args=( - self.task_stopping_event, sfp_error_event, y_cable_presence)) + self.task_stopping_event, sfp_error_event, y_cable_presence, self.event_queue)) self.task_process.start() def task_stop(self): self.task_stopping_event.set() os.kill(self.task_process.pid, signal.SIGKILL) + def notify_port_change_event(self, port_change_event): + """Called by main thread. When main thread detects a port configuration add/remove, it puts an event to + the event queue + + Args: + port_change_event (object): port change event + """ + self.event_queue.put_nowait(port_change_event) + + def handle_port_change_event(self, event_queue, stopping_event, y_cable_presence): + """Handle port change event until the queue is empty, update local port mapping and DB data accordingly. + """ + while True: + try: + port_change_event = event_queue.get_nowait() + if port_change_event.event_type == PortChangeEvent.PORT_REMOVE: + self.on_remove_logical_port(port_change_event) + # Update y_cable related database once a logical port is removed + y_cable_helper.change_ports_status_for_y_cable_change_event( + {port_change_event.port_name:sfp_status_helper.SFP_STATUS_REMOVED}, + self.port_mapping, + y_cable_presence, + stopping_event) + self.port_mapping.handle_port_change_event(port_change_event) + elif port_change_event.event_type == PortChangeEvent.PORT_ADD: + self.port_mapping.handle_port_change_event(port_change_event) + logical_port_event_dict = self.on_add_logical_port(port_change_event) + # Update y_cable related database once a logical port is added + y_cable_helper.change_ports_status_for_y_cable_change_event( + logical_port_event_dict, + self.port_mapping, + y_cable_presence, + stopping_event) + except queue.Empty: + break + + def on_remove_logical_port(self, port_change_event): + """Called when a logical port is removed from CONFIG_DB. + + Args: + port_change_event (object): port change event + """ + # To avoid race condition, remove the entry TRANSCEIVER_DOM_INFO, TRANSCEIVER_STATUS_INFO and TRANSCEIVER_INFO table. + # The operation to remove entry from TRANSCEIVER_DOM_INFO is duplicate with DomInfoUpdateTask.on_remove_logical_port, + # but it is necessary because TRANSCEIVER_DOM_INFO is also updated in this sub process when a new SFP is inserted. + del_port_sfp_dom_info_from_db(port_change_event.port_name, + self.port_mapping, + xcvr_table_helper.get_int_tbl(port_change_event.asic_id), + xcvr_table_helper.get_dom_tbl(port_change_event.asic_id)) + delete_port_from_status_table(port_change_event.port_name, xcvr_table_helper.get_status_tbl(port_change_event.asic_id)) + + # The logical port has been removed, no need retry EEPROM reading + if port_change_event.port_name in self.retry_eeprom_set: + self.retry_eeprom_set.remove(port_change_event.port_name) + + def on_add_logical_port(self, port_change_event): + """Called when a logical port is added + + Args: + port_change_event (object): port change event + + Returns: + dict: key is logical port name, value is SFP status + """ + # A logical port is created. There could be 3 cases: + # 1. SFP information is already in DB, which means that a logical port with the same physical index is in DB before. + # Need copy the data from existing logical port and insert it into TRANSCEIVER_DOM_INFO, TRANSCEIVER_STATUS_INFO + # and TRANSCEIVER_INFO table. + # 2. SFP information is not in DB and SFP is present. Need query the SFP status by platform API and insert the + # data to DB. + # 3. SFP information is not in DB and SFP is not present. Only update TRANSCEIVER_STATUS_INFO table. + logical_port_event_dict = {} + sfp_status = None + sibling_port = None + status_tbl = xcvr_table_helper.get_status_tbl(port_change_event.asic_id) + int_tbl = xcvr_table_helper.get_int_tbl(port_change_event.asic_id) + dom_tbl = xcvr_table_helper.get_dom_tbl(port_change_event.asic_id) + physical_port_list = self.port_mapping.logical_port_name_to_physical_port_list(port_change_event.port_name) + + # Try to find a logical port with same physical index in DB + for physical_port in physical_port_list: + logical_port_list = self.port_mapping.get_physical_to_logical(physical_port) + if not logical_port_list: + continue + + for logical_port in logical_port_list: + found, sfp_status = status_tbl.get(logical_port) + if found: + sibling_port = logical_port + break + + if sfp_status: + break + + if sfp_status: + # SFP information is in DB + status_tbl.set(port_change_event.port_name, sfp_status) + logical_port_event_dict[port_change_event.port_name] = dict(sfp_status)['status'] # TODO + found, sfp_info = int_tbl.get(sibling_port) + if found: + int_tbl.set(port_change_event.port_name, sfp_info) + found, dom_info = dom_tbl.get(sibling_port) + if found: + dom_tbl.set(port_change_event.port_name, dom_info) + else: + error_description = 'N/A' + status = None + read_eeprom = True + if port_change_event.port_index in self.sfp_error_dict: + value, error_dict = self.sfp_error_dict[port_change_event.port_index] + status = value + error_bits = int(value) + helper_logger.log_info("Got SFP error event {}".format(value)) + + error_descriptions = sfp_status_helper.fetch_generic_error_description(error_bits) + + if sfp_status_helper.has_vendor_specific_error(error_bits): + if error_dict: + vendor_specific_error_description = error_dict.get(port_change_event.port_index) + else: + vendor_specific_error_description = _wrapper_get_sfp_error_description(port_change_event.port_index) + error_descriptions.append(vendor_specific_error_description) + + error_description = '|'.join(error_descriptions) + helper_logger.log_info("Receive error update port sfp status table.") + if sfp_status_helper.is_error_block_eeprom_reading(error_bits): + read_eeprom = False + + # SFP information not in DB + if _wrapper_get_presence(port_change_event.port_index) and read_eeprom: + logical_port_event_dict[port_change_event.port_name] = sfp_status_helper.SFP_STATUS_INSERTED + transceiver_dict = {} + status = sfp_status_helper.SFP_STATUS_INSERTED if not status else status + rc = post_port_sfp_info_to_db(port_change_event.port_name, self.port_mapping, int_tbl, transceiver_dict) + if rc == SFP_EEPROM_NOT_READY: + # Failed to read EEPROM, put it to retry set + self.retry_eeprom_set.add(port_change_event.port_name) + else: + post_port_dom_info_to_db(port_change_event.port_name, self.port_mapping, dom_tbl) + post_port_dom_threshold_info_to_db(port_change_event.port_name, self.port_mapping, dom_tbl) + notify_media_setting(port_change_event.port_name, transceiver_dict, xcvr_table_helper.get_app_port_tbl(port_change_event.asic_id), self.port_mapping) + else: + status = sfp_status_helper.SFP_STATUS_REMOVED if not status else status + logical_port_event_dict[port_change_event.port_name] = status + update_port_transceiver_status_table(port_change_event.port_name, status_tbl, status, error_description) + return logical_port_event_dict + + def retry_eeprom_reading(self): + """Retry EEPROM reading, if retry succeed, remove the logical port from the retry set + """ + if not self.retry_eeprom_set: + return + + # Retry eeprom with an interval RETRY_EEPROM_READING_INTERVAL. No need to put sleep here + # because _wrapper_get_transceiver_change_event has a timeout argument. + now = time.time() + if now - self.last_retry_eeprom_time < self.RETRY_EEPROM_READING_INTERVAL: + return + + self.last_retry_eeprom_time = now + + transceiver_dict = {} + retry_success_set = set() + for logical_port in self.retry_eeprom_set: + asic_index = self.port_mapping.get_asic_id_for_logical_port(logical_port) + rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), transceiver_dict) + if rc != SFP_EEPROM_NOT_READY: + post_port_dom_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index)) + post_port_dom_threshold_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index)) + notify_media_setting(logical_port, transceiver_dict, xcvr_table_helper.get_app_port_tbl(asic_index), self.port_mapping) + transceiver_dict.clear() + retry_success_set.add(logical_port) + # Update retry EEPROM set + self.retry_eeprom_set -= retry_success_set + + # # Daemon ======================================================================= # @@ -1214,12 +1398,11 @@ def task_stop(self): class DaemonXcvrd(daemon_base.DaemonBase): def __init__(self, log_identifier): super(DaemonXcvrd, self).__init__(log_identifier) - - self.timeout = XCVRD_MAIN_THREAD_SLEEP_SECS - self.num_asics = multi_asic.get_num_asics() self.stop_event = threading.Event() self.sfp_error_event = multiprocessing.Event() self.y_cable_presence = [False] + self.port_change_event_observers = [] + self.port_mapping = PortMapping() # Signal handler def signal_handler(self, sig, frame): @@ -1256,24 +1439,103 @@ def wait_for_port_config_done(self, namespace): if key in ["PortConfigDone", "PortInitDone"]: break + def handle_port_config_change(self): + """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers + """ + sel = swsscommon.Select() + asic_context = {} + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + sst = swsscommon.SubscriberStateTable(config_db, swsscommon.CFG_PORT_TABLE_NAME) + asic_context[sst] = asic_id + sel.addSelectable(sst) + + while not self.stop_event.is_set(): + (state, _) = sel.select(SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + continue + if state != swsscommon.Select.OBJECT: + self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") + continue + + for sst in asic_context.keys(): + while True: + (key, op, fvp) = sst.pop() + if not key: + break + if op == swsscommon.SET_COMMAND: + # Only process new logical port, here we assume that the physical index for a logical port should + # never change, if user configure a new value for the "index" field for exsitng locial port, it is + # an error + if not self.port_mapping.is_logical_port(key): + fvp = dict(fvp) + if 'index' in fvp: + port_change_event = PortChangeEvent(key, fvp['index'], asic_context[sst], PortChangeEvent.PORT_ADD) + self.notify_port_change_event(port_change_event) + elif op == swsscommon.DEL_COMMAND: + if self.port_mapping.is_logical_port(key): + fvp = dict(fvp) + port_change_event = PortChangeEvent(key, + self.port_mapping.get_logical_to_physical(key)[0], + asic_context[sst], + PortChangeEvent.PORT_REMOVE) + self.notify_port_change_event(port_change_event) + else: + self.log_warning("Invalid DB operation: {}".format(op)) + + def notify_port_change_event(self, port_change_event): + """Notify observers that there is a port change event + + Args: + port_change_event (object): port change event + """ + + self.log_notice('Sending port change event {} to observers'.format(port_change_event)) + for observer in self.port_change_event_observers: + observer.notify_port_change_event(port_change_event) + self.port_mapping.handle_port_change_event(port_change_event) + + def subscribe_port_change_event(self, observer): + """Subscribe port change event + + Args: + observer (object): observer who listen to port change event + """ + self.port_change_event_observers.append(observer) + def load_media_settings(self): - global media_settings global g_dict - (platform_path, hwsku_path) = device_info.get_paths_to_platform_and_hwsku_dirs() + (platform_path, _) = device_info.get_paths_to_platform_and_hwsku_dirs() media_settings_file_path = os.path.join(platform_path, "media_settings.json") if not os.path.isfile(media_settings_file_path): self.log_info("xcvrd: No media file exists") return {} - media_file = open(media_settings_file_path, "r") - media_settings = media_file.read() - g_dict = json.loads(media_settings) + with open(media_settings_file_path, "r") as media_file: + g_dict = json.load(media_file) + def init_port_mapping(self): + """Initialize port mapping from CONFIG_DB for the first run + """ + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) + port_table = swsscommon.Table(config_db, swsscommon.CFG_PORT_TABLE_NAME) + for key in port_table.getKeys(): + _, port_config = port_table.get(key) + port_config_dict = dict(port_config) + port_change_event = PortChangeEvent(key, port_config_dict['index'], asic_id, PortChangeEvent.PORT_ADD) + self.port_mapping.handle_port_change_event(port_change_event) + # Initialize daemon def init(self): global platform_sfputil global platform_chassis + global xcvr_table_helper self.log_info("Start daemon init...") @@ -1304,31 +1566,8 @@ def init(self): # Load the namespace details first from the database_global.json file. swsscommon.SonicDBConfig.initializeGlobalConfig() - # Load port info - try: - if multi_asic.is_multi_asic(): - # For multi ASIC platforms we pass DIR of port_config_file_path and the number of asics - (platform_path, hwsku_path) = device_info.get_paths_to_platform_and_hwsku_dirs() - platform_sfputil.read_all_porttab_mappings(hwsku_path, self.num_asics) - else: - # For single ASIC platforms we pass port_config_file_path and the asic_inst as 0 - port_config_file_path = device_info.get_path_to_port_config_file() - platform_sfputil.read_porttab_mappings(port_config_file_path, 0) - except Exception as e: - self.log_error("Failed to read port info: {}".format(str(e)), True) - sys.exit(PORT_CONFIG_LOAD_ERROR) - - # Connect to STATE_DB and create transceiver dom/sfp info tables - state_db, self.int_tbl, self.dom_tbl, self.status_tbl = {}, {}, {}, {} - - # Get the namespaces in the platform - namespaces = multi_asic.get_front_end_namespaces() - for namespace in namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) - self.int_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_INFO_TABLE) - self.dom_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) - self.status_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_STATUS_TABLE) + # Initialize xcvr table helper + xcvr_table_helper = XcvrTableHelper() self.load_media_settings() warmstart = swsscommon.WarmStart() @@ -1338,39 +1577,41 @@ def init(self): # Make sure this daemon started after all port configured self.log_info("Wait for port config is done") - for namespace in namespaces: + for namespace in xcvr_table_helper.namespaces: self.wait_for_port_config_done(namespace) + self.init_port_mapping() + # Post all the current interface dom/sfp info to STATE_DB self.log_info("Post all port DOM/SFP info to DB") - post_port_sfp_dom_info_to_db(is_warm_start, self.stop_event) + post_port_sfp_dom_info_to_db(is_warm_start, self.port_mapping, self.stop_event) # Init port sfp status table self.log_info("Init port sfp status table") - init_port_sfp_status_tbl(self.stop_event) + init_port_sfp_status_tbl(self.port_mapping, self.stop_event) # Init port y_cable status table y_cable_helper.init_ports_status_for_y_cable( - platform_sfputil, platform_chassis, self.y_cable_presence, self.stop_event) + platform_sfputil, platform_chassis, self.y_cable_presence, self.port_mapping, self.stop_event) # Deinitialize daemon def deinit(self): self.log_info("Start daemon deinit...") # Delete all the information from DB and then exit - logical_port_list = platform_sfputil.logical + logical_port_list = self.port_mapping.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs - asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + asic_index = self.port_mapping.get_asic_id_for_logical_port(logical_port_name) if asic_index is None: - logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) + helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - del_port_sfp_dom_info_from_db(logical_port_name, self.int_tbl[asic_index], self.dom_tbl[asic_index]) - delete_port_from_status_table(logical_port_name, self.status_tbl[asic_index]) + del_port_sfp_dom_info_from_db(logical_port_name, self.port_mapping, xcvr_table_helper.get_int_tbl(asic_index), xcvr_table_helper.get_dom_tbl(asic_index)) + delete_port_from_status_table(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index)) if self.y_cable_presence[0] is True: - y_cable_helper.delete_ports_status_for_y_cable() + y_cable_helper.delete_ports_status_for_y_cable(self.port_mapping) del globals()['platform_chassis'] @@ -1383,25 +1624,25 @@ def run(self): self.init() # Start the dom sensor info update thread - dom_info_update = DomInfoUpdateTask() + dom_info_update = DomInfoUpdateTask(self.port_mapping) + self.subscribe_port_change_event(dom_info_update) dom_info_update.task_run(self.y_cable_presence) # Start the sfp state info update process - sfp_state_update = SfpStateUpdateTask() + sfp_state_update = SfpStateUpdateTask(self.port_mapping) + self.subscribe_port_change_event(sfp_state_update) sfp_state_update.task_run(self.sfp_error_event, self.y_cable_presence) # Start the Y-cable state info update process if Y cable presence established y_cable_state_update = None if self.y_cable_presence[0] is True: - y_cable_state_update = y_cable_helper.YCableTableUpdateTask() + y_cable_state_update = y_cable_helper.YCableTableUpdateTask(self.port_mapping) y_cable_state_update.task_run() # Start main loop self.log_info("Start daemon main loop") - while not self.stop_event.wait(self.timeout): - # Check the integrity of the sfp info table and recover the missing entries if any - recover_missing_sfp_table_entries(platform_sfputil, self.int_tbl, self.status_tbl, self.stop_event) + self.handle_port_config_change() self.log_info("Stop daemon main loop") @@ -1423,6 +1664,37 @@ def run(self): if self.sfp_error_event.is_set(): sys.exit(SFP_SYSTEM_ERROR) + +class XcvrTableHelper: + def __init__(self): + self.int_tbl, self.dom_tbl, self.status_tbl, self.app_port_tbl = {}, {}, {}, {} + self.state_db = {} + self.namespaces = multi_asic.get_front_end_namespaces() + for namespace in self.namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + self.state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + self.int_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_INFO_TABLE) + self.dom_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) + self.status_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_STATUS_TABLE) + appl_db = daemon_base.db_connect("APPL_DB", namespace) + self.app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) + + def get_int_tbl(self, asic_id): + return self.int_tbl[asic_id] + + def get_dom_tbl(self, asic_id): + return self.dom_tbl[asic_id] + + def get_status_tbl(self, asic_id): + return self.status_tbl[asic_id] + + def get_app_port_tbl(self, asic_id): + return self.app_port_tbl[asic_id] + + def get_state_db(self, asic_id): + return self.state_db[asic_id] + + # # Main ========================================================================= # diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py new file mode 100644 index 000000000..d2ecd7314 --- /dev/null +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py @@ -0,0 +1,77 @@ +class PortChangeEvent: + PORT_ADD = 0 + PORT_REMOVE = 1 + def __init__(self, port_name, port_index, asic_id, event_type): + # Logical port name, e.g. Ethernet0 + self.port_name = port_name + # Physical port index, equals to "index" field of PORT table in CONFIG_DB + self.port_index = int(port_index) + # ASIC ID, for multi ASIC + self.asic_id = asic_id + # Port change event type + self.event_type = event_type + + def __str__(self): + return '{} - name={} index={} asic_id={}'.format('Add' if self.event_type == self.PORT_ADD else 'Remove', + self.port_name, + self.port_index, + self.asic_id) + + +class PortMapping: + def __init__(self): + # A list of logical port name, e.g. ["Ethernet0", "Ethernet4" ...] + self.logical_port_list = [] + # Logical port name to physical port index mapping + self.logical_to_physical = {} + # Physical port index to logical port name mapping + self.physical_to_logical = {} + # Logical port name to ASIC ID mapping + self.logical_to_asic = {} + + def handle_port_change_event(self, port_change_event): + if port_change_event.event_type == PortChangeEvent.PORT_ADD: + self._handle_port_add(port_change_event) + elif port_change_event.event_type == PortChangeEvent.PORT_REMOVE: + self._handle_port_remove(port_change_event) + + def _handle_port_add(self, port_change_event): + port_name = port_change_event.port_name + self.logical_port_list.append(port_name) + self.logical_to_physical[port_name] = port_change_event.port_index + if port_change_event.port_index not in self.physical_to_logical: + self.physical_to_logical[port_change_event.port_index] = [port_name] + else: + self.physical_to_logical[port_change_event.port_index].append(port_name) + self.logical_to_asic[port_name] = port_change_event.asic_id + + def _handle_port_remove(self, port_change_event): + port_name = port_change_event.port_name + self.logical_port_list.remove(port_name) + self.logical_to_physical.pop(port_name) + self.physical_to_logical[port_change_event.port_index].remove(port_name) + if not self.physical_to_logical[port_change_event.port_index]: + self.physical_to_logical.pop(port_change_event.port_index) + self.logical_to_asic.pop(port_name) + + def get_asic_id_for_logical_port(self, port_name): + return self.logical_to_asic.get(port_name) + + def is_logical_port(self, port_name): + return port_name in self.logical_to_physical + + def get_logical_to_physical(self, port_name): + port_index = self.logical_to_physical.get(port_name) + return None if port_index is None else [port_index] + + def get_physical_to_logical(self, physical_port): + return self.physical_to_logical.get(physical_port) + + def logical_port_name_to_physical_port_list(self, port_name): + try: + return [int(port_name)] + except ValueError: + if self.is_logical_port(port_name): + return self.get_logical_to_physical(port_name) + else: + return None diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py index 7e41b62b1..c3c614e32 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/y_cable_helper.py @@ -3,7 +3,9 @@ helper utlities configuring y_cable for xcvrd daemon """ +import copy import datetime +import queue import threading from sonic_py_common import daemon_base, logger @@ -11,6 +13,7 @@ from sonic_y_cable import y_cable from swsscommon import swsscommon from . import sfp_status_helper +from .port_mapping import PortMapping SELECT_TIMEOUT = 1000 @@ -38,17 +41,6 @@ # Find out the underneath physical port list by logical name -def logical_port_name_to_physical_port_list(port_name): - if port_name.startswith("Ethernet"): - if y_cable_platform_sfputil.is_logical_port(port_name): - return y_cable_platform_sfputil.get_logical_to_physical(port_name) - else: - helper_logger.log_error("Invalid port '%s'" % port_name) - return None - else: - return [int(port_name)] - - def _wrapper_get_presence(physical_port): if y_cable_platform_chassis is not None: try: @@ -102,8 +94,8 @@ def y_cable_toggle_mux_torB(physical_port): return -1 -def update_tor_active_side(read_side, state, logical_port_name): - physical_port_list = logical_port_name_to_physical_port_list( +def update_tor_active_side(read_side, state, logical_port_name, port_mapping): + physical_port_list = port_mapping.logical_port_name_to_physical_port_list( logical_port_name) if len(physical_port_list) == 1: @@ -137,14 +129,14 @@ def update_tor_active_side(read_side, state, logical_port_name): return -1 -def update_appdb_port_mux_cable_response_table(logical_port_name, asic_index, appl_db, read_side): +def update_appdb_port_mux_cable_response_table(logical_port_name, port_mapping, asic_index, appl_db, read_side): status = None y_cable_response_tbl = {} y_cable_response_tbl[asic_index] = swsscommon.Table( appl_db[asic_index], "MUX_CABLE_RESPONSE_TABLE") - physical_port_list = logical_port_name_to_physical_port_list( + physical_port_list = port_mapping.logical_port_name_to_physical_port_list( logical_port_name) if len(physical_port_list) == 1: @@ -200,8 +192,8 @@ def update_appdb_port_mux_cable_response_table(logical_port_name, asic_index, ap "Error: Retreived multiple ports for a Y cable port {} while responding to command probe".format(logical_port_name)) -def read_y_cable_and_update_statedb_port_tbl(logical_port_name, mux_config_tbl): - physical_port_list = logical_port_name_to_physical_port_list( +def read_y_cable_and_update_statedb_port_tbl(logical_port_name, port_mapping, mux_config_tbl): + physical_port_list = port_mapping.logical_port_name_to_physical_port_list( logical_port_name) read_side = None @@ -259,7 +251,7 @@ def read_y_cable_and_update_statedb_port_tbl(logical_port_name, mux_config_tbl): "Error: Retreived multiple ports for a Y cable port {}".format(logical_port_name)) -def check_identifier_presence_and_update_mux_table_entry(state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence): +def check_identifier_presence_and_update_mux_table_entry(state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, port_mapping, y_cable_presence): (status, fvs) = port_tbl[asic_index].get(logical_port_name) if status is False: @@ -280,9 +272,9 @@ def check_identifier_presence_and_update_mux_table_entry(state_db, port_tbl, y_c if y_cable_presence[0] is True and y_cable_asic_table is not None and mux_asic_table is not None and static_mux_asic_table is not None: # fill in the newly found entry read_y_cable_and_update_statedb_port_tbl( - logical_port_name, y_cable_tbl[asic_index]) - post_port_mux_info_to_db(logical_port_name, mux_tbl[asic_index]) - post_port_mux_static_info_to_db(logical_port_name, static_tbl[asic_index]) + logical_port_name, port_mapping, y_cable_tbl[asic_index]) + post_port_mux_info_to_db(logical_port_name, port_mapping, mux_tbl[asic_index]) + post_port_mux_static_info_to_db(logical_port_name, port_mapping, static_tbl[asic_index]) else: # first create the state db y cable table and then fill in the entry @@ -299,9 +291,9 @@ def check_identifier_presence_and_update_mux_table_entry(state_db, port_tbl, y_c mux_tbl[asic_id] = swsscommon.Table(state_db[asic_id], MUX_CABLE_INFO_TABLE) # fill the newly found entry read_y_cable_and_update_statedb_port_tbl( - logical_port_name, y_cable_tbl[asic_index]) - post_port_mux_info_to_db(logical_port_name, mux_tbl[asic_index]) - post_port_mux_static_info_to_db(logical_port_name, static_tbl[asic_index]) + logical_port_name, port_mapping, y_cable_tbl[asic_index]) + post_port_mux_info_to_db(logical_port_name, port_mapping, mux_tbl[asic_index]) + post_port_mux_static_info_to_db(logical_port_name, port_mapping, static_tbl[asic_index]) else: helper_logger.log_warning( "Could not retreive active or auto value for state kvp for {}, inside MUX_CABLE table".format(logical_port_name)) @@ -348,7 +340,7 @@ def check_identifier_presence_and_delete_mux_table_entry(state_db, port_tbl, asi delete_change_event[:] = [True] -def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presence, stop_event=threading.Event()): +def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presence, port_mapping, stop_event=threading.Event()): global y_cable_platform_sfputil global y_cable_platform_chassis # Connect to CONFIG_DB and create port status table inside state_db @@ -372,13 +364,13 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen xcvrd_log_tbl[asic_id].set("Y_CABLE", fvs_updated ) # Init PORT_STATUS table if ports are on Y cable - logical_port_list = y_cable_platform_sfputil.logical + logical_port_list = port_mapping.logical_port_list for logical_port_name in logical_port_list: if stop_event.is_set(): break # Get the asic to which this port belongs - asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port( + asic_index = port_mapping.get_asic_id_for_logical_port( logical_port_name) if asic_index is None: helper_logger.log_warning( @@ -387,7 +379,7 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen if logical_port_name in port_table_keys[asic_index]: check_identifier_presence_and_update_mux_table_entry( - state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) + state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, port_mapping, y_cable_presence) else: # This port does not exist in Port table of config but is present inside # logical_ports after loading the port_mappings from port_config_file @@ -396,7 +388,7 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen "Could not retreive port inside config_db PORT table ".format(logical_port_name)) -def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, stop_event=threading.Event()): +def change_ports_status_for_y_cable_change_event(logical_port_dict, port_mapping, y_cable_presence, stop_event=threading.Event()): # Connect to CONFIG_DB and create port status table inside state_db config_db, state_db, port_tbl, y_cable_tbl = {}, {}, {}, {} static_tbl, mux_tbl = {}, {} @@ -413,43 +405,35 @@ def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, st port_table_keys[asic_id] = port_tbl[asic_id].getKeys() # Init PORT_STATUS table if ports are on Y cable and an event is received - for key, value in port_dict.items(): - if stop_event.is_set(): - break - logical_port_list = y_cable_platform_sfputil.get_physical_to_logical(int(key)) - if logical_port_list is None: - helper_logger.log_warning("Got unknown FP port index {}, ignored".format(key)) + for logical_port_name, value in logical_port_dict.items(): + # Get the asic to which this port belongs + asic_index = port_mapping.get_asic_id_for_logical_port(logical_port_name) + if asic_index is None: + helper_logger.log_warning( + "Got invalid asic index for {}, ignored".format(logical_port_name)) continue - for logical_port_name in logical_port_list: - # Get the asic to which this port belongs - asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port(logical_port_name) - if asic_index is None: - helper_logger.log_warning( - "Got invalid asic index for {}, ignored".format(logical_port_name)) - continue - - if logical_port_name in port_table_keys[asic_index]: - if value == sfp_status_helper.SFP_STATUS_INSERTED: - helper_logger.log_info("Got SFP inserted event") - check_identifier_presence_and_update_mux_table_entry( - state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) - elif value == sfp_status_helper.SFP_STATUS_REMOVED: - check_identifier_presence_and_delete_mux_table_entry( - state_db, port_tbl, asic_index, logical_port_name, y_cable_presence, delete_change_event) + if logical_port_name in port_table_keys[asic_index]: + if value == sfp_status_helper.SFP_STATUS_INSERTED: + helper_logger.log_info("Got SFP inserted event") + check_identifier_presence_and_update_mux_table_entry( + state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, port_mapping, y_cable_presence) + elif value == sfp_status_helper.SFP_STATUS_REMOVED: + check_identifier_presence_and_delete_mux_table_entry( + state_db, port_tbl, asic_index, logical_port_name, y_cable_presence, delete_change_event) - else: - try: - # Now that the value is in bitmap format, let's convert it to number - event_bits = int(value) - if sfp_status_helper.is_error_block_eeprom_reading(event_bits): - check_identifier_presence_and_delete_mux_table_entry( - state_db, port_tbl, asic_index, logical_port_name, y_cable_presence, delete_change_event) - except: - pass - # SFP return unkown event, just ignore for now. - helper_logger.log_warning("Got unknown event {}, ignored".format(value)) - continue + else: + try: + # Now that the value is in bitmap format, let's convert it to number + event_bits = int(value) + if sfp_status_helper.is_error_block_eeprom_reading(event_bits): + check_identifier_presence_and_delete_mux_table_entry( + state_db, port_tbl, asic_index, logical_port_name, y_cable_presence, delete_change_event) + except: + pass + # SFP return unkown event, just ignore for now. + helper_logger.log_warning("Got unknown event {}, ignored".format(value)) + continue # If there was a delete event and y_cable_presence was true, reaccess the y_cable presence if y_cable_presence[0] is True and delete_change_event[0] is True: @@ -466,7 +450,7 @@ def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, st break -def delete_ports_status_for_y_cable(): +def delete_ports_status_for_y_cable(port_mapping): state_db, port_tbl, y_cable_tbl = {}, {}, {} y_cable_tbl_keys = {} @@ -482,11 +466,11 @@ def delete_ports_status_for_y_cable(): mux_tbl[asic_id] = swsscommon.Table(state_db[asic_id], MUX_CABLE_INFO_TABLE) # delete PORTS on Y cable table if ports on Y cable - logical_port_list = y_cable_platform_sfputil.logical + logical_port_list = port_mapping.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs - asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port( + asic_index = port_mapping.get_asic_id_for_logical_port( logical_port_name) if asic_index is None: logger.log_warning( @@ -501,7 +485,7 @@ def delete_ports_status_for_y_cable(): logical_port_name, mux_tbl[asic_index]) -def check_identifier_presence_and_update_mux_info_entry(state_db, mux_tbl, asic_index, logical_port_name): +def check_identifier_presence_and_update_mux_info_entry(state_db, mux_tbl, asic_index, logical_port_name, port_mapping): # Get the namespaces in the platform config_db, port_tbl = {}, {} @@ -527,7 +511,7 @@ def check_identifier_presence_and_update_mux_info_entry(state_db, mux_tbl, asic_ if mux_tbl.get(asic_index, None) is not None: # fill in the newly found entry - post_port_mux_info_to_db(logical_port_name, mux_tbl[asic_index]) + post_port_mux_info_to_db(logical_port_name, port_mapping, mux_tbl[asic_index]) else: # first create the state db y cable table and then fill in the entry @@ -537,7 +521,7 @@ def check_identifier_presence_and_update_mux_info_entry(state_db, mux_tbl, asic_ namespace) mux_tbl[asic_id] = swsscommon.Table(state_db[asic_id], MUX_CABLE_INFO_TABLE) # fill the newly found entry - post_port_mux_info_to_db(logical_port_name, mux_tbl[asic_index]) + post_port_mux_info_to_db(logical_port_name, port_mapping, mux_tbl[asic_index]) else: helper_logger.log_warning( "Could not retreive active or auto value for state kvp for {}, inside MUX_CABLE table".format(logical_port_name)) @@ -558,7 +542,7 @@ def get_firmware_dict(physical_port, target, side, mux_info_dict): mux_info_dict[("version_{}_next".format(side))] = "N/A" -def get_muxcable_info(physical_port, logical_port_name): +def get_muxcable_info(physical_port, logical_port_name, port_mapping): mux_info_dict = {} y_cable_tbl, state_db = {}, {} @@ -570,7 +554,7 @@ def get_muxcable_info(physical_port, logical_port_name): y_cable_tbl[asic_id] = swsscommon.Table( state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) - asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port( + asic_index = port_mapping.get_asic_id_for_logical_port( logical_port_name) if asic_index is None: helper_logger.log_warning( @@ -735,7 +719,7 @@ def get_muxcable_info(physical_port, logical_port_name): return mux_info_dict -def get_muxcable_static_info(physical_port, logical_port_name): +def get_muxcable_static_info(physical_port, logical_port_name, port_mapping): mux_static_info_dict = {} y_cable_tbl, state_db = {}, {} @@ -747,7 +731,7 @@ def get_muxcable_static_info(physical_port, logical_port_name): y_cable_tbl[asic_id] = swsscommon.Table( state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) - asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port( + asic_index = port_mapping.get_asic_id_for_logical_port( logical_port_name) if asic_index is None: helper_logger.log_warning( @@ -828,9 +812,9 @@ def get_muxcable_static_info(physical_port, logical_port_name): return mux_static_info_dict -def post_port_mux_info_to_db(logical_port_name, table): +def post_port_mux_info_to_db(logical_port_name, port_mapping, table): - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) + physical_port_list = port_mapping.logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: helper_logger.log_error("No physical ports found for logical port '{}'".format(logical_port_name)) return -1 @@ -847,7 +831,7 @@ def post_port_mux_info_to_db(logical_port_name, table): "Error: trying to post mux info without presence of port {}".format(logical_port_name)) continue - mux_info_dict = get_muxcable_info(physical_port, logical_port_name) + mux_info_dict = get_muxcable_info(physical_port, logical_port_name, port_mapping) if mux_info_dict is not None and mux_info_dict is not -1: #transceiver_dict[physical_port] = port_info_dict fvs = swsscommon.FieldValuePairs( @@ -887,9 +871,9 @@ def post_port_mux_info_to_db(logical_port_name, table): return -1 -def post_port_mux_static_info_to_db(logical_port_name, static_table): +def post_port_mux_static_info_to_db(logical_port_name, port_mapping, static_table): - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) + physical_port_list = port_mapping.logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: helper_logger.log_error("No physical ports found for logical port '{}'".format(logical_port_name)) return -1 @@ -904,7 +888,7 @@ def post_port_mux_static_info_to_db(logical_port_name, static_table): if not _wrapper_get_presence(physical_port): continue - mux_static_info_dict = get_muxcable_static_info(physical_port, logical_port_name) + mux_static_info_dict = get_muxcable_static_info(physical_port, logical_port_name, port_mapping) if mux_static_info_dict is not None and mux_static_info_dict is not -1: #transceiver_dict[physical_port] = port_info_dict @@ -946,7 +930,7 @@ def post_port_mux_static_info_to_db(logical_port_name, static_table): return -1 -def post_mux_static_info_to_db(is_warm_start, stop_event=threading.Event()): +def post_mux_static_info_to_db(is_warm_start, port_mapping, stop_event=threading.Event()): # Connect to STATE_DB and create transceiver mux/static info tables state_db, static_tbl = {}, {} @@ -958,20 +942,20 @@ def post_mux_static_info_to_db(is_warm_start, stop_event=threading.Event()): static_tbl[asic_id] = swsscommon.Table(state_db[asic_id], MUX_CABLE_STATIC_INFO_TABLE) # Post all the current interface dom/sfp info to STATE_DB - logical_port_list = y_cable_platform_sfputil.logical + logical_port_list = port_mapping.logical_port_list for logical_port_name in logical_port_list: if stop_event.is_set(): break # Get the asic to which this port belongs - asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + asic_index = port_mapping.get_asic_id_for_logical_port(logical_port_name) if asic_index is None: logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - post_port_mux_static_info_to_db(logical_port_name, mux_tbl[asic_index]) + post_port_mux_static_info_to_db(logical_port_name, port_mapping, mux_tbl[asic_index]) -def post_mux_info_to_db(is_warm_start, stop_event=threading.Event()): +def post_mux_info_to_db(is_warm_start, port_mapping, stop_event=threading.Event()): # Connect to STATE_DB and create transceiver mux/static info tables state_db, mux_tbl, static_tbl = {}, {}, {} @@ -983,28 +967,41 @@ def post_mux_info_to_db(is_warm_start, stop_event=threading.Event()): mux_tbl[asic_id] = swsscommon.Table(state_db[asic_id], MUX_CABLE_INFO_TABLE) # Post all the current interface dom/sfp info to STATE_DB - logical_port_list = y_cable_platform_sfputil.logical + logical_port_list = port_mapping.logical_port_list for logical_port_name in logical_port_list: if stop_event.is_set(): break # Get the asic to which this port belongs - asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + asic_index = port_mapping.get_asic_id_for_logical_port(logical_port_name) if asic_index is None: logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - post_port_mux_info_to_db(logical_port_name, mux_tbl[asic_index]) + post_port_mux_info_to_db(logical_port_name, port_mapping, mux_tbl[asic_index]) # Thread wrapper class to update y_cable status periodically class YCableTableUpdateTask(object): - def __init__(self): + def __init__(self, port_mapping): self.task_thread = None + self.event_queue = queue.Queue() + self.port_mapping = copy.deepcopy(port_mapping) if multi_asic.is_multi_asic(): # Load the namespace details first from the database_global.json file. swsscommon.SonicDBConfig.initializeGlobalConfig() + def notify_port_change_event(self, port_change_event): + self.event_queue.put_nowait(port_change_event) + + def handle_port_change_event(self): + while True: + try: + port_change_event = self.event_queue.get_nowait() + self.port_mapping.handle_port_change_event(port_change_event) + except queue.Empty: + break + def task_worker(self): # Connect to STATE_DB and APPL_DB and get both the HW_MUX_STATUS_TABLE info @@ -1062,6 +1059,7 @@ def task_worker(self): # Get the corresponding namespace from redisselect db connector object namespace = redisSelectObj.getDbConnector().getNamespace() asic_index = multi_asic.get_asic_index_from_namespace(namespace) + self.handle_port_change_event() while True: (port, op, fvp) = status_tbl[asic_index].pop() @@ -1096,7 +1094,7 @@ def task_worker(self): read_side = mux_port_dict.get("read_side") # Now whatever is the state requested, toggle the mux appropriately helper_logger.log_debug("Y_CABLE_DEBUG: xcvrd trying to transition port {} from {} to {}".format(port, old_status, new_status)) - active_side = update_tor_active_side(read_side, new_status, port) + active_side = update_tor_active_side(read_side, new_status, port, self.port_mapping) if active_side == -1: helper_logger.log_warning("ERR: Got a change event for toggle but could not toggle the mux-direction for port {} state from {} to {}, writing unknown".format( port, old_status, new_status)) @@ -1144,7 +1142,7 @@ def task_worker(self): continue mux_port_dict = dict(fv) read_side = mux_port_dict.get("read_side") - update_appdb_port_mux_cable_response_table(port_m, asic_index, appl_db, int(read_side)) + update_appdb_port_mux_cable_response_table(port_m, self.port_mapping, asic_index, appl_db, int(read_side)) while True: (key, op_m, fvp_m) = xcvrd_log_tbl[asic_index].pop()