From 1aff8bfc88f0c6de65a189c7fe1b698562e2d37a Mon Sep 17 00:00:00 2001 From: Jianquan Ye Date: Tue, 17 Dec 2024 18:38:24 +1000 Subject: [PATCH] Fix redis memory leak issue in PhysicalEntityCacheUpdater --- .gitignore | 1 + src/ax_interface/mib.py | 2 +- src/sonic_ax_impl/mibs/__init__.py | 15 +++ src/sonic_ax_impl/mibs/ietf/rfc2737.py | 43 +++++++- tests/mock_tables/dbconnector.py | 3 + tests/test_rfc1213.py | 2 +- tests/test_rfc2737.py | 138 +++++++++++++++++++++++++ 7 files changed, 201 insertions(+), 3 deletions(-) create mode 100644 tests/test_rfc2737.py diff --git a/.gitignore b/.gitignore index b9caeccda..8db9a37d1 100644 --- a/.gitignore +++ b/.gitignore @@ -136,3 +136,4 @@ fabric.properties gh-release.patch tests/test_cpuUtilizationHandler.py +tests/test-results.xml diff --git a/src/ax_interface/mib.py b/src/ax_interface/mib.py index 404e2916a..43bf9b478 100644 --- a/src/ax_interface/mib.py +++ b/src/ax_interface/mib.py @@ -49,7 +49,7 @@ async def start(self): redis_exception_happen = False except RuntimeError: # Any unexpected exception or error, log it and keep running - logger.exception("MIBUpdater.start() caught an unexpected exception during update_data()") + logger.exception("MIBUpdater.start() caught a RuntimeError during update_data(), will reinitialize the connections") # When redis server restart, swsscommon will throw swsscommon.RedisError, redis connection need re-initialize in reinit_data() # TODO: change to swsscommon.RedisError redis_exception_happen = True diff --git a/src/sonic_ax_impl/mibs/__init__.py b/src/sonic_ax_impl/mibs/__init__.py index 38c70b113..1fe6264ec 100644 --- a/src/sonic_ax_impl/mibs/__init__.py +++ b/src/sonic_ax_impl/mibs/__init__.py @@ -496,6 +496,21 @@ def get_redis_pubsub(db_conn, db_name, pattern): return pubsub +def cancel_redis_pubsub(pubsub, db_conn, db_name, pattern): + db = db_conn.get_dbid(db_name) + logger.debug(f"Cancel subscription {db} {pattern}") + pubsub.punsubscribe("__keyspace@{}__:{}".format(db, 
pattern)) + return pubsub + + +def clear_pubsub_msg(pubsub): + while True: + msg = pubsub.get_message() + logger.debug("Clearing pubsub {}, get and drop message {}".format(pubsub, msg)) + if not msg: + break + + class RedisOidTreeUpdater(MIBUpdater): def __init__(self, prefix_str): super().__init__() diff --git a/src/sonic_ax_impl/mibs/ietf/rfc2737.py b/src/sonic_ax_impl/mibs/ietf/rfc2737.py index d9e5fdec2..f6fa6d1c0 100644 --- a/src/sonic_ax_impl/mibs/ietf/rfc2737.py +++ b/src/sonic_ax_impl/mibs/ietf/rfc2737.py @@ -318,8 +318,34 @@ def reinit_data(self): self.physical_name_map[chassis_mgmt_sub_id] = name self.physical_fru_map[chassis_mgmt_sub_id] = self.NOT_REPLACEABLE + exceptions = [] + has_runtime_err = False + # Catch exception in the iteration + # This makes sure if any exception is raised in the mid of loop + # every updater's reinit_data function will be always called + # So that the redis subscriptions always get chance to be cleaned, + # Otherwise if the exception never recover, + # the redis subscription keeps increasing but never got consumed, + # this causes redis memory leak. 
for updater in self.physical_entity_updaters: - updater.reinit_data() + try: + updater.reinit_data() + except BaseException as e: + mibs.logger.error(f"Jianquan {e}, type{type(e)}") + if isinstance(e, RuntimeError): + has_runtime_err = True + # Log traceback so that we know the original error details + mibs.logger.error(e, exc_info=True) + exceptions.append(e) + + # The RuntimeError will be considered as Redis connection error + # And will trigger re-init connection, if the exceptions contain any RuntimeError + # We raise runtime error + if exceptions: + if has_runtime_err: + raise RuntimeError(exceptions) + else: + raise Exception(exceptions) def update_data(self): # This code is not executed in unit test, since mockredis @@ -648,6 +674,21 @@ def __init__(self, mib_updater): self.entity_to_oid_map = {} def reinit_data(self): + + # Redis subscriptions are established and consumed in update_data, + # but if there's stable exception during update logic, + # the reinit_data will be called, but the update_data is never called. + # The message is sent into subscription queue, but never got consumed, + # this causes Redis memory leaking. 
+ # Hence clear the message in the subscription and cancel the subscription during reinit_data + for db_index in list(self.pub_sub_dict): + pubsub = self.pub_sub_dict[db_index] + db_conn = self.mib_updater.statedb[db_index] + # clear message in the subscription and cancel the subscription + mibs.clear_pubsub_msg(pubsub) + mibs.cancel_redis_pubsub(pubsub, db_conn, db_conn.STATE_DB, self.get_key_pattern()) + del self.pub_sub_dict[db_index] + self.entity_to_oid_map.clear() # retrieve the initial list of entity in db key_info = Namespace.dbs_keys(self.mib_updater.statedb, mibs.STATE_DB, self.get_key_pattern()) diff --git a/tests/mock_tables/dbconnector.py b/tests/mock_tables/dbconnector.py index 6a5cbd997..e4cf95daa 100644 --- a/tests/mock_tables/dbconnector.py +++ b/tests/mock_tables/dbconnector.py @@ -83,6 +83,9 @@ def get_message(self): def psubscribe(self, *args, **kwargs): pass + def punsubscribe(self, *args, **kwargs): + pass + def __call__(self, *args, **kwargs): return self diff --git a/tests/test_rfc1213.py b/tests/test_rfc1213.py index af5c2c936..0f8e7e2a6 100644 --- a/tests/test_rfc1213.py +++ b/tests/test_rfc1213.py @@ -77,7 +77,7 @@ def test_NextHopUpdater_redis_exception(self): # check warning expected = [ - mock.call("MIBUpdater.start() caught an unexpected exception during update_data()") + mock.call("MIBUpdater.start() caught a RuntimeError during update_data(), will reinitialize the connections") ] mocked_exception.assert_has_calls(expected) diff --git a/tests/test_rfc2737.py b/tests/test_rfc2737.py new file mode 100644 index 000000000..8054df2c9 --- /dev/null +++ b/tests/test_rfc2737.py @@ -0,0 +1,138 @@ +import os +import sys +from unittest import TestCase + +import pytest +from sonic_ax_impl.mibs.ietf.rfc2737 import PhysicalTableMIBUpdater + + +if sys.version_info.major == 3: + from unittest import mock +else: + import mock + +modules_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, os.path.join(modules_path, 
'src')) + + +class TestPhysicalTableMIBUpdater(TestCase): + + # Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater + # When: The first updater(XcvrCacheUpdater) raises exception in the reinit + # Then: The remaining updaters should execute reinit without any affection, + # and the redis un-subscription should be called + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=Exception('mocked error')) + def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_first(self, mocked_xcvr_reinit_data): + updater = PhysicalTableMIBUpdater() + + with (pytest.raises(Exception) as excinfo, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data, + mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub): + updater.reinit_data() + mocked_xcvr_reinit_data.assert_called() + mocked_psu_reinit_data.assert_called() + mocked_fan_drawer_reinit_data.assert_called() + mocked_fan_cache_reinit_data.assert_called() + mocked_thermal_reinit_data.assert_called() + mocked_cancel_redis_pubsub.assert_called() + assert str(excinfo.value) == "[Exception('mocked error')]" + + # Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater + # When: The last updater(ThermalCacheUpdater) raises exception in the reinit + # Then: The remaining updaters should execute reinit without any affection, + # and the redis un-subscription should be called + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error')) + def 
test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_last(self, mocked_thermal_reinit_data): + updater = PhysicalTableMIBUpdater() + + with (pytest.raises(Exception) as excinfo, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data, + mock.patch( + 'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data, + mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub): + updater.reinit_data() + mocked_xcvr_reinit_data.assert_called() + mocked_psu_reinit_data.assert_called() + mocked_fan_drawer_reinit_data.assert_called() + mocked_fan_cache_reinit_data.assert_called() + mocked_thermal_reinit_data.assert_called() + mocked_cancel_redis_pubsub.assert_called() + assert str(excinfo.value) == "[Exception('mocked error')]" + + # Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater + # When: The last updater(ThermalCacheUpdater) raises RuntimeError in the reinit + # Then: The remaining updaters should execute reinit without any affection, + # and the redis un-subscription should be called + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error')) + def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_first(self, mocked_thermal_reinit_data): + updater = PhysicalTableMIBUpdater() + + with (pytest.raises(RuntimeError) as excinfo, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data, + mock.patch( + 
'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data, + mock.patch( + 'sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data, + mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub): + updater.reinit_data() + mocked_thermal_reinit_data.assert_called() + mocked_xcvr_reinit_data.assert_called() + mocked_psu_reinit_data.assert_called() + mocked_fan_drawer_reinit_data.assert_called() + mocked_fan_cache_reinit_data.assert_called() + mocked_cancel_redis_pubsub.assert_called() + assert str(excinfo.value) == "[RuntimeError('mocked runtime error')]" + + # Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater + # When: The first updater(XcvrCacheUpdater) raises RuntimeError in the reinit + # Then: The remaining updaters should execute reinit without any affection, + # and the redis un-subscription should be called + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error')) + def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_last(self, mocked_xcvr_reinit_data): + updater = PhysicalTableMIBUpdater() + + with (pytest.raises(RuntimeError) as exc_info, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data, + mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub): + updater.reinit_data() + mocked_xcvr_reinit_data.assert_called() + mocked_psu_reinit_data.assert_called() + mocked_fan_drawer_reinit_data.assert_called() + 
mocked_fan_cache_reinit_data.assert_called() + mocked_thermal_reinit_data.assert_called() + mocked_cancel_redis_pubsub.assert_called() + assert str(exc_info.value) == "[RuntimeError('mocked runtime error')]" + + # Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater + # When: The first(XcvrCacheUpdater) and last updater(ThermalCacheUpdater) + # raises Runtime exception and Exception in the reinit + # Then: The remaining updaters should execute reinit without any affection, + # and the redis un-subscription should be called + # Both the RuntimeError and Exception should be caught and combined as RuntimeError then been raised + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error')) + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error')) + def test_PhysicalTableMIBUpdater_multi_exception(self, mocked_xcvr_reinit_data, mocked_thermal_reinit_data): + updater = PhysicalTableMIBUpdater() + + with (pytest.raises(RuntimeError) as exc_info, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data, + mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub): + updater.reinit_data() + mocked_xcvr_reinit_data.assert_called() + mocked_psu_reinit_data.assert_called() + mocked_fan_drawer_reinit_data.assert_called() + mocked_fan_cache_reinit_data.assert_called() + mocked_thermal_reinit_data.assert_called() + mocked_cancel_redis_pubsub.assert_called() + assert str(exc_info.value) == "[RuntimeError('mocked runtime error'), Exception('mocked error')]"