diff --git a/sonic-ledd/scripts/ledd b/sonic-ledd/scripts/ledd index 91e6162994a5..87cea62873a1 100644 --- a/sonic-ledd/scripts/ledd +++ b/sonic-ledd/scripts/ledd @@ -37,11 +37,12 @@ LEDUTIL_LOAD_ERROR = 1 class DaemonLedd(daemon_base.DaemonBase): - # Run daemon - def run(self): + def __init__(self): + daemon_base.DaemonBase.__init__(self, SYSLOG_IDENTIFIER) + # Load platform-specific LedControl module try: - led_control = self.load_platform_util(LED_MODULE_NAME, LED_CLASS_NAME) + self.led_control = self.load_platform_util(LED_MODULE_NAME, LED_CLASS_NAME) except Exception as e: self.log_error("Failed to load ledutil: %s" % (str(e)), True) sys.exit(LEDUTIL_LOAD_ERROR) @@ -55,45 +56,52 @@ class DaemonLedd(daemon_base.DaemonBase): namespaces = multi_asic.get_front_end_namespaces() # Subscribe to PORT table notifications in the Application DB - appl_db, sst = {}, {} - sel = swsscommon.Select() + appl_db = {} + self.sst = {} + self.sel = swsscommon.Select() for namespace in namespaces: # Open a handle to the Application database, in all namespaces appl_db[namespace] = daemon_base.db_connect("APPL_DB", namespace=namespace) - sst[namespace] = swsscommon.SubscriberStateTable(appl_db[namespace], swsscommon.APP_PORT_TABLE_NAME) - sel.addSelectable(sst[namespace]) + self.sst[namespace] = swsscommon.SubscriberStateTable(appl_db[namespace], swsscommon.APP_PORT_TABLE_NAME) + self.sel.addSelectable(self.sst[namespace]) - # Listen indefinitely for changes to the PORT table in the Application DB's - while True: - # Use timeout to prevent ignoring the signals we want to handle - # in signal_handler() (e.g. SIGTERM for graceful shutdown) - (state, selectableObj) = sel.select(SELECT_TIMEOUT) + # Run daemon + def run(self): + # Use timeout to prevent ignoring the signals we want to handle + # in signal_handler() (e.g. SIGTERM for graceful shutdown) + (state, selectableObj) = self.sel.select(SELECT_TIMEOUT) - if state == swsscommon.Select.TIMEOUT: - # Do not flood log when select times out - continue - if state != swsscommon.Select.OBJECT: - self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") - continue + if state == swsscommon.Select.TIMEOUT: + # Do not flood log when select times out + return 1 - # Get the redisselect object from selectable object - redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj) - # Get the corresponding namespace from redisselect db connector object - namespace = redisSelectObj.getDbConnector().getNamespace() + if state != swsscommon.Select.OBJECT: + self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") + return 2 - (key, op, fvp) = sst[namespace].pop() - if fvp: - # TODO: Once these flag entries have been removed from the DB, - # we can remove this check - if key in ["PortConfigDone", "PortInitDone"]: - continue + # Get the redisselect object from selectable object + redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj) - fvp_dict = dict(fvp) + # Get the corresponding namespace from redisselect db connector object + namespace = redisSelectObj.getDbConnector().getNamespace() - if op == "SET" and "oper_status" in fvp_dict: - if not key.startswith(backplane_prefix()): - led_control.port_link_state_change(key, fvp_dict["oper_status"]) + (key, op, fvp) = self.sst[namespace].pop() + if fvp: + # TODO: Once these flag entries have been removed from the DB, + # we can remove this check + if key in ["PortConfigDone", "PortInitDone"]: + return 3 + + fvp_dict = dict(fvp) + + if op == "SET" and "oper_status" in fvp_dict: + if not key.startswith(backplane_prefix()): + self.led_control.port_link_state_change(key, fvp_dict["oper_status"]) + else: + return 4 + + return 0 def main(): @@ -114,8 +122,11 @@ def main(): print('ledd version {}'.format(VERSION)) sys.exit(0) - ledd = DaemonLedd(SYSLOG_IDENTIFIER) - ledd.run() + ledd = DaemonLedd() + + # Listen indefinitely for changes to the PORT table in the Application DB's + while True: + ledd.run() if __name__ == '__main__': diff --git a/sonic-ledd/tests/test_ledd.py b/sonic-ledd/tests/test_ledd.py index f4be68357360..06861d93e384 100644 --- a/sonic-ledd/tests/test_ledd.py +++ b/sonic-ledd/tests/test_ledd.py @@ -5,29 +5,25 @@ import pytest # TODO: Clean this up once we no longer need to support Python 2 if sys.version_info.major == 3: - from unittest.mock import Mock, MagicMock, patch + from unittest import mock else: - from mock import Mock, MagicMock, patch + import mock from sonic_py_common import daemon_base -SYSLOG_IDENTIFIER = 'ledd_test' -NOT_AVAILABLE = 'N/A' - -daemon_base.db_connect = MagicMock() +daemon_base.db_connect = mock.MagicMock() test_path = os.path.dirname(os.path.abspath(__file__)) modules_path = os.path.dirname(test_path) scripts_path = os.path.join(modules_path, "scripts") sys.path.insert(0, modules_path) -os.environ["LEDD_UNIT_TESTING"] = "1" load_source('ledd', scripts_path + '/ledd') import ledd def test_help_args(capsys): for flag in ['-h', '--help']: - with patch.object(sys, 'argv', ['ledd', flag]): + with mock.patch.object(sys, 'argv', ['ledd', flag]): with pytest.raises(SystemExit) as pytest_wrapped_e: ledd.main() assert pytest_wrapped_e.type == SystemExit @@ -38,7 +34,7 @@ def test_help_args(capsys): def test_version_args(capsys): for flag in ['-v', '--version']: - with patch.object(sys, 'argv', ['ledd', flag]): + with mock.patch.object(sys, 'argv', ['ledd', flag]): with pytest.raises(SystemExit) as pytest_wrapped_e: ledd.main() assert pytest_wrapped_e.type == SystemExit @@ -49,7 +45,7 @@ def test_version_args(capsys): def test_bad_args(capsys): for flag in ['-n', '--nonexistent']: - with patch.object(sys, 'argv', ['ledd', flag]): + with mock.patch.object(sys, 'argv', ['ledd', flag]): with pytest.raises(SystemExit) as pytest_wrapped_e: ledd.main() assert pytest_wrapped_e.type == SystemExit @@ -63,6 +59,93 @@ class TestDaemonLedd(object): Test cases to cover functionality in DaemonLedd class """ - def test_run(self): - daemon_ledd = ledd.DaemonLedd(SYSLOG_IDENTIFIER) - # TODO: Add more coverage + def test_run_fail_load_platform_util(self): + with pytest.raises(SystemExit) as pytest_wrapped_e: + ledd.DaemonLedd() + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == ledd.LEDUTIL_LOAD_ERROR + + @mock.patch("ledd.DaemonLedd.load_platform_util") + @mock.patch("ledd.swsscommon.SubscriberStateTable") + @mock.patch("ledd.swsscommon.Select") + def test_run_select_timeout(self, mock_select, mock_sst, mock_load_plat_util): + select_instance = mock_select.return_value + select_instance.select.return_value = (ledd.swsscommon.Select.TIMEOUT, None) + + daemon_ledd = ledd.DaemonLedd() + ret = daemon_ledd.run() + assert ret == 1 + + @mock.patch("ledd.DaemonLedd.load_platform_util") + @mock.patch("ledd.swsscommon.SubscriberStateTable") + @mock.patch("ledd.swsscommon.Select") + def test_run_bad_select_return(self, mock_select, mock_sst, mock_load_plat_util): + select_instance = mock_select.return_value + select_instance.select.return_value = (ledd.swsscommon.Select.ERROR, mock.MagicMock()) + + daemon_ledd = ledd.DaemonLedd() + ret = daemon_ledd.run() + assert ret == 2 + + @mock.patch("ledd.DaemonLedd.load_platform_util") + @mock.patch("ledd.swsscommon.CastSelectableToRedisSelectObj") + @mock.patch("ledd.swsscommon.SubscriberStateTable") + @mock.patch("ledd.swsscommon.Select") + def test_run_ignore_keys(self, mock_select, mock_sst, mock_cstrso, mock_load_plat_util): + select_instance = mock_select.return_value + select_instance.select.return_value = (ledd.swsscommon.Select.OBJECT, mock.MagicMock()) + + mock_cstrso.return_value.getDbConnector.return_value.getNamespace.return_value = ledd.multi_asic.DEFAULT_NAMESPACE + + sst_instance = mock_sst.return_value + + for key in ['PortConfigDone', 'PortInitDone']: + sst_instance.pop.return_value = ('PortConfigDone', 'SET', {'not': 'applicable'}) + + daemon_ledd = ledd.DaemonLedd() + ret = daemon_ledd.run() + assert ret == 3 + + @mock.patch("ledd.DaemonLedd.load_platform_util") + @mock.patch("ledd.swsscommon.CastSelectableToRedisSelectObj") + @mock.patch("ledd.swsscommon.SubscriberStateTable") + @mock.patch("ledd.swsscommon.Select") + def test_run_bad_fvp(self, mock_select, mock_sst, mock_cstrso, mock_load_plat_util): + select_instance = mock_select.return_value + select_instance.select.return_value = (ledd.swsscommon.Select.OBJECT, mock.MagicMock()) + + mock_cstrso.return_value.getDbConnector.return_value.getNamespace.return_value = ledd.multi_asic.DEFAULT_NAMESPACE + + sst_instance = mock_sst.return_value + + for fvp in [None, {}]: + sst_instance.pop.return_value = ('Ethernet0', 'SET', fvp) + + daemon_ledd = ledd.DaemonLedd() + ret = daemon_ledd.run() + assert ret == 4 + + @mock.patch("ledd.DaemonLedd.load_platform_util") + @mock.patch("ledd.swsscommon.CastSelectableToRedisSelectObj") + @mock.patch("ledd.swsscommon.SubscriberStateTable") + @mock.patch("ledd.swsscommon.Select") + def test_run_good(self, mock_select, mock_sst, mock_cstrso, mock_led_control): + select_instance = mock_select.return_value + select_instance.select.return_value = (ledd.swsscommon.Select.OBJECT, mock.MagicMock()) + + mock_cstrso.return_value.getDbConnector.return_value.getNamespace.return_value = ledd.multi_asic.DEFAULT_NAMESPACE + + sst_instance = mock_sst.return_value + + led_control_instance = mock_led_control.return_value + + for port in ['Ethernet0', 'Ethernet4']: + for link_state in ['up', 'down']: + sst_instance.pop.return_value = (port, 'SET', {'oper_status': link_state}) + + daemon_ledd = ledd.DaemonLedd() + ret = daemon_ledd.run() + assert ret == 0 + assert led_control_instance.port_link_state_change.call_count == 1 + led_control_instance.port_link_state_change.assert_called_with(port, link_state) + led_control_instance.port_link_state_change.reset_mock()