Skip to content

Commit

Permalink
Add API endpoints to ConfigDBConnector to support pre-loading data wi…
Browse files Browse the repository at this point in the history
…thout blackout (#587)

Currently if someone wishes to use ConfigDBConnector to operate on updates coming from CONFIG_DB AND they wish to pull all the initial data from the table to initialize their only option is to pull the data and then run the `listen()` method on ConfigDBConnector.

This creates a "blackout" period where between the time the tables are downloaded and the `listen()` method is called, any new updates to CONFIG_DB will not be handled.

To resolve this I have changed two things in ConfigDBConnector. All of which are 100% backwards compatible...
1. I split `listen()` into `listen()` and `process()` where `listen()` may now be called with `start=False` as an argument which does not start immediately processing updates from CONFIG_DB and calling handlers. This allows you to call `listen()` *first* and then download table data and then call `process()` to start processing updates. This way no updates will be missed as they will be queued by redis while the table data is being downloaded by the consumer. 
2. I added a `cache` argument to `process` in which you pass any initial table data your system is operating on. When a table update is processed the system checks if that keys value is different than what is stored in the cache (if it is present) and only fires the callback handler in the case that the data has changes (added / deleted / updated). This prevents the same data from being processed multiple times.
  • Loading branch information
alexrallen authored Mar 30, 2022
1 parent 556f4ad commit 36e1f61
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 3 deletions.
32 changes: 29 additions & 3 deletions common/configdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native

## Note: callback is difficult to implement by SWIG C++, so keep in python
self.handlers = {}
self.fire_init_data = {}

@property
def KEY_SEPARATOR(self):
Expand All @@ -71,10 +72,28 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
return self.getDbName()

## Note: callback is difficult to implement by SWIG C++, so keep in python
def listen(self):
## Start listen Redis keyspace events and will trigger corresponding handlers when content of a table changes.
def listen(self, init_data_handler=None):
## Start listen Redis keyspace event. Pass a callback function to `init` to handle initial table data.
self.pubsub = self.get_redis_client(self.db_name).pubsub()
self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name)))

# Build a cache of data for all subscribed tables that will recieve the initial table data so we dont send duplicate event notifications
init_data = {tbl: self.get_table(tbl) for tbl in self.handlers if init_data_handler or self.fire_init_data[tbl]}

# Function to send initial data as series of updates through individual table callback handlers
def load_data(tbl, data):
if self.fire_init_data[tbl]:
for row, x in data.items():
self.__fire(tbl, row, x)
return False
return True

init_callback_data = {tbl: data for tbl, data in init_data.items() if load_data(tbl, data)}

# Pass all initial data that we DID NOT send as updates to handlers through the init callback if provided by caller
if init_data_handler:
init_data_handler(init_callback_data)

while True:
item = self.pubsub.listen_message()
if item['type'] == 'pmessage':
Expand All @@ -84,6 +103,12 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
if table in self.handlers:
client = self.get_redis_client(self.db_name)
data = self.raw_to_typed(client.hgetall(key))
if table in init_data and row in init_data[table]:
cache_hit = init_data[table][row] == data
del init_data[table][row]
if not init_data[table]:
del init_data[table]
if cache_hit: continue
self.__fire(table, row, data)
except ValueError:
pass #Ignore non table-formated redis entries
Expand Down Expand Up @@ -153,8 +178,9 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
handler = self.handlers[table]
handler(table, key, data)

def subscribe(self, table, handler):
def subscribe(self, table, handler, fire_init_data=False):
self.handlers[table] = handler
self.fire_init_data[table] = fire_init_data

def unsubscribe(self, table):
if table in self.handlers:
Expand Down
45 changes: 45 additions & 0 deletions tests/test_redis_ut.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import time
import pytest
import multiprocessing
from threading import Thread
from pympler.tracker import SummaryTracker
from swsscommon import swsscommon
Expand Down Expand Up @@ -621,6 +622,50 @@ def thread_coming_entry():
config_db.unsubscribe(table_name)
assert table_name not in config_db.handlers

def test_ConfigDBInit():
table_name_1 = 'TEST_TABLE_1'
table_name_2 = 'TEST_TABLE_2'
test_key = 'key1'
test_data = {'field1': 'value1'}
test_data_update = {'field1': 'value2'}

manager = multiprocessing.Manager()
ret_data = manager.dict()

def test_handler(table, key, data, ret):
ret[table] = {key: data}

def test_init_handler(data, ret):
ret.update(data)

def thread_listen(ret):
config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)

config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret),
fire_init_data=False)
config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret),
fire_init_data=True)

config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret))

config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)
client = config_db.get_redis_client(config_db.CONFIG_DB)
client.flushdb()

# Init table data
config_db.set_entry(table_name_1, test_key, test_data)
config_db.set_entry(table_name_2, test_key, test_data)

thread = multiprocessing.Process(target=thread_listen, args=(ret_data,))
thread.start()
time.sleep(5)
thread.terminate()

assert ret_data[table_name_1] == {test_key: test_data}
assert ret_data[table_name_2] == {test_key: test_data}


def test_DBConnectFailure():
""" Verify that a DB connection failure will not cause a process abort
Expand Down

0 comments on commit 36e1f61

Please sign in to comment.