Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
Signed-off-by: Zhaohui Sun <zhaohuisun@microsoft.com>
  • Loading branch information
ZhaohuiS committed Dec 23, 2024
1 parent 821ae80 commit 16776fb
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 114 deletions.
209 changes: 99 additions & 110 deletions scripts/caclmgrd
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,6 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
while True:
# Sleep for our delay interval
time.sleep(self.UPDATE_DELAY_SECS)
self.log_info("After delay {}s, checking for ACL table changes in namespace '{}'".format(self.UPDATE_DELAY_SECS, namespace))

with self.lock[namespace]:
if self.num_changes[namespace] > num_changes:
Expand All @@ -897,14 +896,8 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
except Exception as e:
self.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e)))
exc_type, exc_value, exc_traceback = sys.exc_info()
msg = traceback.format_exception(exc_type, exc_value, exc_traceback)
for tb_line in msg:
for tb_line_split in tb_line.splitlines():
self.log_error(tb_line_split)
exc_info = traceback.format_exc()
exception_queue.put((namespace, repr(e), exc_info)) # Add the exception to the queue
self.log_error("Exiting thread {}, put it into exception_queue {}".format(
threading.current_thread().getName(), exception_queue))
full_traceback = traceback.format_exception(exc_type, exc_value, exc_traceback)
exception_queue.put((namespace, repr(e), full_traceback)) # Add the exception to the queue
finally:
new_config_db_connector.close("CONFIG_DB")

Expand Down Expand Up @@ -1047,121 +1040,117 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):

# Get the ACL rule table seprator
acl_rule_table_seprator = subscribe_acl_rule_table.getTableNameSeparator()
try:
# Loop on select to see if any event happen on state db or config db of any namespace
while True:
# Periodically check for exceptions from child threads
try:
namespace, error, _ = exception_queue.get_nowait() # Non-blocking
self.log_error(f"Exception in namespace '{namespace}': {error}")
self.log_error("Detect exception in Child thread {} , generating SIGKILL for main thread".format(self.update_thread[namespace].getName()))
os.kill(os.getpid(), signal.SIGKILL)
except Empty:
# No exceptions in the queue
pass
(state, selectableObj) = sel.select(SELECT_TIMEOUT_MS)
# Continue if select is timeout or selectable object is not return
if state != swsscommon.Select.OBJECT:
continue

# Get the redisselect object from selectable object
redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj)
# Loop on select to see if any event happen on state db or config db of any namespace
while True:
# Periodically check for exceptions from child threads
try:
namespace, error, exc_info = exception_queue.get_nowait() # Non-blocking
self.log_error("Exception in namespace '{}': {}".format(namespace, error))
self.log_error("Full traceback from child thread:")
import pdb; pdb.set_trace()
for tb_line in exc_info:
for tb_line_split in tb_line.splitlines():
self.log_error(tb_line_split)
self.log_error("Detect exception in Child thread {} , generating SIGKILL for main thread".format(self.update_thread[namespace].getName()))
os.kill(os.getpid(), signal.SIGKILL)
except Empty:
# No exceptions in the queue
pass
(state, selectableObj) = sel.select(SELECT_TIMEOUT_MS)
# Continue if select is timeout or selectable object is not return
if state != swsscommon.Select.OBJECT:
continue

# Get the redisselect object from selectable object
redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj)

# Get the corresponding namespace and db_id from redisselect
namespace = redisSelectObj.getDbConnector().getNamespace()
db_id = redisSelectObj.getDbConnector().getDbId()
# Get the corresponding namespace and db_id from redisselect
namespace = redisSelectObj.getDbConnector().getNamespace()
db_id = redisSelectObj.getDbConnector().getDbId()

if db_id == state_db_id:
if db_id == state_db_id:
while True:
key, op, fvs = subscribe_bfd_session.pop()
if not key:
break

if op == 'SET' and not self.bfdAllowed:
self.allow_bfd_protocol(namespace)
self.bfdAllowed = True
sel.removeSelectable(subscribe_bfd_session)

if self.DualToR:
'''dhcp packet mark update'''
while True:
key, op, fvs = subscribe_bfd_session.pop()
key, op, fvs = subscribe_dhcp_packet_mark.pop()
if not key:
break
self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs)))

if op == 'SET' and not self.bfdAllowed:
self.allow_bfd_protocol(namespace)
self.bfdAllowed = True
sel.removeSelectable(subscribe_bfd_session)

if self.DualToR:
'''dhcp packet mark update'''
while True:
key, op, fvs = subscribe_dhcp_packet_mark.pop()
if not key:
break
self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs)))

'''initial value is None'''
pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
cur_mark = None if op == 'DEL' else dict(fvs)['mark']
dhcp_packet_mark_tbl[key] = cur_mark
self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark)

'''mux cable update'''
while True:
key, op, fvs = subscribe_mux_cable.pop()
if not key:
break
self.log_info("mux cable update : '%s'" % str((key, op, fvs)))

mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
self.update_dhcp_acl(key, op, dict(fvs), mark)
continue
'''initial value is None'''
pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
cur_mark = None if op == 'DEL' else dict(fvs)['mark']
dhcp_packet_mark_tbl[key] = cur_mark
self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark)

if db_id == config_db_id:
'''mux cable update'''
while True:
key, op, fvs = subscribe_vxlan_table.pop()
key, op, fvs = subscribe_mux_cable.pop()
if not key:
break
if op == 'SET' and not self.VxlanAllowed:
self.allow_vxlan_port(namespace, fvs)
elif op == 'DEL' and self.VxlanAllowed:
self.block_vxlan_port(namespace)
self.log_info("mux cable update : '%s'" % str((key, op, fvs)))

ctrl_plane_acl_notification = set()
mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
self.update_dhcp_acl(key, op, dict(fvs), mark)
continue

# Pop data of both Subscriber Table object of namespace that got config db acl table event
for table in config_db_subscriber_table_map[namespace]:
while True:
(key, op, fvp) = table.pop()
# Pop of table that does not have data so break
if key == '':
break
# ACL Table notification. We will take Control Plane ACTION for any ACL Table Event
# This can be optimize further but we should not have many acl table set/del events in normal
# scenario
if acl_rule_table_seprator not in key:
if db_id == config_db_id:
while True:
key, op, fvs = subscribe_vxlan_table.pop()
if not key:
break
if op == 'SET' and not self.VxlanAllowed:
self.allow_vxlan_port(namespace, fvs)
elif op == 'DEL' and self.VxlanAllowed:
self.block_vxlan_port(namespace)

ctrl_plane_acl_notification = set()

# Pop data of both Subscriber Table object of namespace that got config db acl table event
for table in config_db_subscriber_table_map[namespace]:
while True:
(key, op, fvp) = table.pop()
# Pop of table that does not have data so break
if key == '':
break
# ACL Table notification. We will take Control Plane ACTION for any ACL Table Event
# This can be optimize further but we should not have many acl table set/del events in normal
# scenario
if acl_rule_table_seprator not in key:
ctrl_plane_acl_notification.add(namespace)
# Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane
else:
acl_table = key.split(acl_rule_table_seprator)[0]
if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE:
ctrl_plane_acl_notification.add(namespace)
# Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane
else:
acl_table = key.split(acl_rule_table_seprator)[0]
if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE:
ctrl_plane_acl_notification.add(namespace)

# Update the Control Plane ACL of the namespace that got config db acl table event
for namespace in ctrl_plane_acl_notification:
with self.lock[namespace]:
if self.num_changes[namespace] == 0:
self.log_info("ACL change detected for namespace '{}'".format(namespace))

# Increment the number of change events we've received for this namespace
self.num_changes[namespace] += 1

# If an update thread is not already spawned for the namespace which we received
# the ACL table update event, spawn one now
if not self.update_thread[namespace]:
self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace))
self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls,
args=(namespace, self.num_changes[namespace], exception_queue))
self.update_thread[namespace].start()
except Exception as e:
self.log_error("Exception occured at main thread due to {}".format(repr(e)))
exc_type, exc_value, exc_traceback = sys.exc_info()
msg = traceback.format_exception(exc_type, exc_value, exc_traceback)
for tb_line in msg:
for tb_line_split in tb_line.splitlines():
self.log_error(tb_line_split)
self.log_error("Catch exception in main thread, generating SIGKILL for main thread")
os.kill(os.getpid(), signal.SIGKILL)

# Update the Control Plane ACL of the namespace that got config db acl table event
for namespace in ctrl_plane_acl_notification:
with self.lock[namespace]:
if self.num_changes[namespace] == 0:
self.log_info("ACL change detected for namespace '{}'".format(namespace))

# Increment the number of change events we've received for this namespace
self.num_changes[namespace] += 1

# If an update thread is not already spawned for the namespace which we received
# the ACL table update event, spawn one now
if not self.update_thread[namespace]:
self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace))
self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls,
args=(namespace, self.num_changes[namespace], exception_queue))
self.update_thread[namespace].start()

# ============================= Functions =============================

Expand Down
13 changes: 9 additions & 4 deletions tests/caclmgrd/caclmgrd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import threading
import sys
from queue import Queue

import queue

DBCONFIG_PATH = "/var/run/redis/sonic-db/database_config.json"

Expand Down Expand Up @@ -113,7 +113,9 @@ def test_run(
("key5", "SET", [("mark", "0x11"), ("field4", "value4")]),
('', None, None),
("key6", "SET", [("mark", "0x11"), ("field5", "value5")]),
(None, None, None),
('', None, None),
("SSH_ONLY", "SET", (('policy_desc', 'SSH_ONLY'), ('services', 'SSH'), ('stage', 'ingress'), ('type', 'CTRLPLANE'))),
('', None, None),
("SSH_ONLY", "SET", (('policy_desc', 'SSH_ONLY'), ('services', 'SSH'), ('stage', 'ingress'), ('type', 'CTRLPLANE'))),
('', None, None),
]
Expand Down Expand Up @@ -153,8 +155,11 @@ def test_run(
manager.update_dhcp_acl_for_mark_change = MagicMock()
manager.update_dhcp_acl = MagicMock()
manager.setup_dhcp_chain = MagicMock()

manager.run()
try:
manager.run()
except StopIteration as e:
# This is expected to happen
pass

# Asserting the method calls
manager.update_control_plane_acls.assert_called()
Expand Down

0 comments on commit 16776fb

Please sign in to comment.