Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the issue of ignoring callback calls for removed keys. #789

Merged
merged 4 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/configdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
## Dynamic typed functions used in python
@staticmethod
def raw_to_typed(raw_data):
if raw_data is None:
if not raw_data or not raw_data.keys():
return None
typed_data = {}
for raw_key in raw_data:
Expand Down
45 changes: 34 additions & 11 deletions tests/test_redis_ut.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,29 +634,40 @@ def thread_coming_entry():
def test_ConfigDBInit():
table_name_1 = 'TEST_TABLE_1'
table_name_2 = 'TEST_TABLE_2'
table_name_3 = 'TEST_TABLE_3'
test_key = 'key1'
test_data = {'field1': 'value1'}
test_data_update = {'field1': 'value2'}

queue = multiprocessing.Queue()

manager = multiprocessing.Manager()
ret_data = manager.dict()

def test_handler(table, key, data, ret):
ret[table] = {key: data}
def test_handler(table, key, data, ret, q=None):
if data is None:
ret[table] = {k: v for k, v in ret[table].items() if k != key}
else:
ret[table] = {key: data}

if q:
q.put(ret[table])

def test_init_handler(data, ret):
def test_init_handler(data, ret, queue):
ret.update(data)
queue.put(ret)

def thread_listen(ret):
def thread_listen(ret, queue):
config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)

config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret),
fire_init_data=False)
config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret),
fire_init_data=True)
config_db.subscribe(table_name_3, lambda table, key, data: test_handler(table, key, data, ret, queue),
fire_init_data=False)

config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret))
config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret, queue))

config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)
Expand All @@ -666,14 +677,26 @@ def thread_listen(ret):
# Init table data
config_db.set_entry(table_name_1, test_key, test_data)
config_db.set_entry(table_name_2, test_key, test_data)
config_db.set_entry(table_name_3, test_key, {})

thread = multiprocessing.Process(target=thread_listen, args=(ret_data,))
thread = multiprocessing.Process(target=thread_listen, args=(ret_data, queue))
thread.start()
time.sleep(5)
thread.terminate()

assert ret_data[table_name_1] == {test_key: test_data}
assert ret_data[table_name_2] == {test_key: test_data}
init_data = queue.get(5)

# Verify that all tables initialized correctly
assert init_data[table_name_1] == {test_key: test_data}
assert init_data[table_name_2] == {test_key: test_data}
assert init_data[table_name_3] == {test_key: {}}

# Remove the entry (with no attributes) from the table.
# Verify that the update is received and a callback is called
config_db.set_entry(table_name_3, test_key, None)

table_3_data = queue.get(5)
assert test_key not in table_3_data

thread.terminate()


def test_DBConnectFailure():
Expand Down