From 4ab5d1d494734274e4763c6371c42e7bc5650cd0 Mon Sep 17 00:00:00 2001 From: Alexander Allen Date: Wed, 23 Feb 2022 20:27:20 +0000 Subject: [PATCH 1/8] Add infra to allow consumers of ConfigDBConnector to load table data for initialization without blackout --- common/configdb.h | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/common/configdb.h b/common/configdb.h index f8b77ee69..f274c586e 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -71,10 +71,14 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native return self.getDbName() ## Note: callback is difficult to implement by SWIG C++, so keep in python - def listen(self): + def listen(self, start=True): ## Start listen Redis keyspace events and will trigger corresponding handlers when content of a table changes. self.pubsub = self.get_redis_client(self.db_name).pubsub() self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name))) + + if start: self.process() + + def process(self, cache={}): while True: item = self.pubsub.listen_message() if item['type'] == 'pmessage': @@ -84,6 +88,11 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native if table in self.handlers: client = self.get_redis_client(self.db_name) data = self.raw_to_typed(client.hgetall(key)) + if table in cache and row in cache[table]: + if cache[table][row] == data: + continue + else: + del cache[table][row] self.__fire(table, row, data) except ValueError: pass #Ignore non table-formated redis entries From a75c2336d9560739f10a01c1c5a6947ed26fb728 Mon Sep 17 00:00:00 2001 From: Alexander Allen Date: Fri, 25 Feb 2022 23:22:13 +0000 Subject: [PATCH 2/8] Add explanatory comment --- common/configdb.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/configdb.h b/common/configdb.h index f274c586e..965dd6756 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -72,7 +72,9 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native ## Note: callback is difficult to implement by SWIG C++, so keep in python def listen(self, start=True): - ## Start listen Redis keyspace events and will trigger corresponding handlers when content of a table changes. + ## Start listen Redis keyspace event. Use start=False if you need to load in initial data to prevent blackout. + ## Then call process(cache) where cache is a dict of {table_name: data} of the initialized data to prevent + ## duplicate calls to the callback if the data has not changed. self.pubsub = self.get_redis_client(self.db_name).pubsub() self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name))) From ce9b47136804df91ea9ba1710023b468edb10a76 Mon Sep 17 00:00:00 2001 From: Alexander Allen Date: Wed, 16 Mar 2022 02:30:34 +0000 Subject: [PATCH 3/8] Respond to review feedback and refactor --- common/configdb.h | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/common/configdb.h b/common/configdb.h index 965dd6756..5bbca32c9 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -71,16 +71,16 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native return self.getDbName() ## Note: callback is difficult to implement by SWIG C++, so keep in python - def listen(self, start=True): - ## Start listen Redis keyspace event. Use start=False if you need to load in initial data to prevent blackout. - ## Then call process(cache) where cache is a dict of {table_name: data} of the initialized data to prevent - ## duplicate calls to the callback if the data has not changed. + def listen(self, init=None): + ## Start listen Redis keyspace event. Pass a callback function to `init` to handle initial table data. self.pubsub = self.get_redis_client(self.db_name).pubsub() self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name))) - if start: self.process() + init_data = {} + if init: + init_data = {tbl: self.get_table(tbl) for tbl, cb in self.handlers.items()} + init(init_data) - def process(self, cache={}): while True: item = self.pubsub.listen_message() if item['type'] == 'pmessage': @@ -90,11 +90,11 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native if table in self.handlers: client = self.get_redis_client(self.db_name) data = self.raw_to_typed(client.hgetall(key)) - if table in cache and row in cache[table]: - if cache[table][row] == data: + if table in init_data and row in init_data[table]: + if init_data[table][row] == data: continue else: - del cache[table][row] + del init_data[table][row] self.__fire(table, row, data) except ValueError: pass #Ignore non table-formated redis entries From 898554ea2cfda6bf1eac532e38f3b18b834cc207 Mon Sep 17 00:00:00 2001 From: Alexander Allen Date: Thu, 24 Mar 2022 04:59:49 +0000 Subject: [PATCH 4/8] respond to review feedback --- common/configdb.h | 26 ++++++++++++++++++------ tests/test_redis_ut.py | 45 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 6 deletions(-) diff --git a/common/configdb.h b/common/configdb.h index 5bbca32c9..c17dc336d 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -57,6 +57,7 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native ## Note: callback is difficult to implement by SWIG C++, so keep in python self.handlers = {} + self.fire_init_data = {} @property def KEY_SEPARATOR(self): @@ -71,15 +72,27 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native return self.getDbName() ## Note: callback is difficult to implement by SWIG C++, so keep in python - def listen(self, init=None): + def listen(self, init_data_handler=None): ## Start listen Redis keyspace event. Pass a callback function to `init` to handle initial table data. self.pubsub = self.get_redis_client(self.db_name).pubsub() self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name))) + + # Build a cache of data for all subscribed tables that will recieve the initial table data so we dont send duplicate event notifications + init_data = {tbl: self.get_table(tbl) for tbl in self.handlers if init_data_handler or self.fire_init_data[tbl]} - init_data = {} - if init: - init_data = {tbl: self.get_table(tbl) for tbl, cb in self.handlers.items()} - init(init_data) + # Function to send initial data as series of updates through individual table callback handlers + def load_data(tbl, data): + if self.fire_init_data[tbl]: + for row, x in data.items(): + self.__fire(tbl, row, x) + return False + return True + + init_callback_data = {tbl: data for tbl, data in init_data.items() if load_data(tbl, data)} + + # Pass all initial data that we DID NOT send as updates to handlers through the init callback if provided by caller + if init_data_handler: + init_data_handler(init_callback_data) while True: item = self.pubsub.listen_message() @@ -164,8 +177,9 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native handler = self.handlers[table] handler(table, key, data) - def subscribe(self, table, handler): + def subscribe(self, table, handler, fire_init_data=False): self.handlers[table] = handler + self.fire_init_data[table] = fire_init_data def unsubscribe(self, table): if table in self.handlers: diff --git a/tests/test_redis_ut.py b/tests/test_redis_ut.py index 1bebd2e04..1acffa701 100644 --- a/tests/test_redis_ut.py +++ b/tests/test_redis_ut.py @@ -1,6 +1,7 @@ import os import time import pytest +import multiprocessing from threading import Thread from pympler.tracker import SummaryTracker from swsscommon import swsscommon @@ -552,6 +553,50 @@ def thread_coming_entry(): config_db.unsubscribe(table_name) assert table_name not in config_db.handlers +def test_ConfigDBInit(): + table_name_1 = 'TEST_TABLE_1' + table_name_2 = 'TEST_TABLE_2' + test_key = 'key1' + test_data = {'field1': 'value1'} + test_data_update = {'field1': 'value2'} + + manager = multiprocessing.Manager() + ret_data = manager.dict() + + def test_handler(table, key, data, ret): + ret[table] = {key: data} + + def test_init_handler(data, ret): + ret.update(data) + + def thread_listen(ret): + config_db = ConfigDBConnector() + config_db.connect(wait_for_init=False) + + config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret), + fire_init_data=False) + config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret), + fire_init_data=True) + + config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret)) + + config_db = ConfigDBConnector() + config_db.connect(wait_for_init=False) + client = config_db.get_redis_client(config_db.CONFIG_DB) + client.flushdb() + + # Init table data + config_db.set_entry(table_name_1, test_key, test_data_1) + config_db.set_entry(table_name_2, test_key, test_data_2) + + thread = multiprocessing.Process(target=thread_listen, args=(ret_data,)) + thread.start() + time.sleep(5) + thread.terminate() + + assert ret_data[table_name_1] == {test_key: test_data_1} + assert ret_data[table_name_2] == {test_key: test_data_2} + def test_DBConnectFailure(): """ Verify that a DB connection failure will not cause a process abort From 6cda60d7b845986f36b8d04e4138c9a6537109ab Mon Sep 17 00:00:00 2001 From: Alexander Allen Date: Thu, 24 Mar 2022 20:14:52 +0000 Subject: [PATCH 5/8] Invalidate cache on any hit --- common/configdb.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/configdb.h b/common/configdb.h index c17dc336d..4847ed4ff 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -106,8 +106,9 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native if table in init_data and row in init_data[table]: if init_data[table][row] == data: continue - else: - del init_data[table][row] + del init_data[table][row] + if not init_data[table]: + del init_data[table] self.__fire(table, row, data) except ValueError: pass #Ignore non table-formated redis entries From 980ccd8f71fc58d84553bad11b00f9d706b71a44 Mon Sep 17 00:00:00 2001 From: Alexander Allen Date: Thu, 24 Mar 2022 20:50:35 +0000 Subject: [PATCH 6/8] Fix cache logic --- common/configdb.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/configdb.h b/common/configdb.h index 4847ed4ff..a46011fdb 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -104,11 +104,13 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native client = self.get_redis_client(self.db_name) data = self.raw_to_typed(client.hgetall(key)) if table in init_data and row in init_data[table]: + cache_hit = False if init_data[table][row] == data: - continue + cache_hit = True del init_data[table][row] if not init_data[table]: del init_data[table] + if cache_hit: continue self.__fire(table, row, data) except ValueError: pass #Ignore non table-formated redis entries From 53d3f03760da6816268d8b1b6199fca68fdb41db Mon Sep 17 00:00:00 2001 From: Alexander Allen Date: Thu, 24 Mar 2022 20:54:25 +0000 Subject: [PATCH 7/8] Optimize a bit --- common/configdb.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/common/configdb.h b/common/configdb.h index a46011fdb..012370748 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -104,9 +104,7 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native client = self.get_redis_client(self.db_name) data = self.raw_to_typed(client.hgetall(key)) if table in init_data and row in init_data[table]: - cache_hit = False - if init_data[table][row] == data: - cache_hit = True + cache_hit = init_data[table][row] == data del init_data[table][row] if not init_data[table]: del init_data[table] From 3216388a69de41373506a8b8824d609d97f2b3cf Mon Sep 17 00:00:00 2001 From: Alexander Allen Date: Wed, 30 Mar 2022 12:24:19 +0000 Subject: [PATCH 8/8] Fix unit test --- tests/test_redis_ut.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_redis_ut.py b/tests/test_redis_ut.py index 1acffa701..f7634c867 100644 --- a/tests/test_redis_ut.py +++ b/tests/test_redis_ut.py @@ -586,16 +586,16 @@ def thread_listen(ret): client.flushdb() # Init table data - config_db.set_entry(table_name_1, test_key, test_data_1) - config_db.set_entry(table_name_2, test_key, test_data_2) + config_db.set_entry(table_name_1, test_key, test_data) + config_db.set_entry(table_name_2, test_key, test_data) thread = multiprocessing.Process(target=thread_listen, args=(ret_data,)) thread.start() time.sleep(5) thread.terminate() - assert ret_data[table_name_1] == {test_key: test_data_1} - assert ret_data[table_name_2] == {test_key: test_data_2} + assert ret_data[table_name_1] == {test_key: test_data} + assert ret_data[table_name_2] == {test_key: test_data} def test_DBConnectFailure():