From edd6b847e9b4100e4daf5417d3230de15669f942 Mon Sep 17 00:00:00 2001 From: Vivek Reddy Date: Mon, 22 Nov 2021 21:57:07 -0800 Subject: [PATCH] [hostcfgd] [202012] Fixed the brief blackout in hostcfgd using SubscriberStateTable (#9228) #### Why I did it Backporting https://github.com/Azure/sonic-buildimage/pull/8861 to 202012 --- src/sonic-host-services/scripts/hostcfgd | 459 ++++++++++-------- src/sonic-host-services/setup.py | 3 +- .../tests/common/mock_configdb.py | 85 ++++ .../tests/hostcfgd/hostcfgd_test.py | 187 ++++++- .../tests/hostcfgd/test_vectors.py | 59 +++ 5 files changed, 562 insertions(+), 231 deletions(-) diff --git a/src/sonic-host-services/scripts/hostcfgd b/src/sonic-host-services/scripts/hostcfgd index 06506f483eb9..a14d4cf026c6 100755 --- a/src/sonic-host-services/scripts/hostcfgd +++ b/src/sonic-host-services/scripts/hostcfgd @@ -4,12 +4,15 @@ import ast import copy import ipaddress import os +import sys import subprocess import syslog +import signal import jinja2 from sonic_py_common import device_info -from swsscommon.swsscommon import ConfigDBConnector +from swsscommon.swsscommon import SubscriberStateTable, DBConnector, Select +from swsscommon.swsscommon import ConfigDBConnector, TableConsumable # FILE PAM_AUTH_CONF = "/etc/pam.d/common-auth-sonic" @@ -23,6 +26,34 @@ TACPLUS_SERVER_PASSKEY_DEFAULT = "" TACPLUS_SERVER_TIMEOUT_DEFAULT = "5" TACPLUS_SERVER_AUTH_TYPE_DEFAULT = "pap" +# MISC Constants +CFG_DB = "CONFIG_DB" +HOSTCFGD_MAX_PRI = 10 # Used to enforce ordering b/w daemons under Hostcfgd +DEFAULT_SELECT_TIMEOUT = 1000 + + +def safe_eval(val, default_value=False): + """ Safely evaluate the expression, without raising an exception """ + try: + ret = ast.literal_eval(val) + except ValueError: + ret = default_value + return ret + + +def signal_handler(sig, frame): + if sig == signal.SIGHUP: + syslog.syslog(syslog.LOG_INFO, "HostCfgd: signal 'SIGHUP' is caught and ignoring..") + elif sig == signal.SIGINT: + syslog.syslog(syslog.LOG_INFO, "HostCfgd: signal 'SIGINT' is caught and exiting...") + sys.exit(128 + sig) + elif sig == signal.SIGTERM: + syslog.syslog(syslog.LOG_INFO, "HostCfgd: signal 'SIGTERM' is caught and exiting...") + sys.exit(128 + sig) + else: + syslog.syslog(syslog.LOG_INFO, "HostCfgd: invalid signal - ignoring..") + + def run_cmd(cmd, log_err = True): try: subprocess.check_call(cmd, shell = True) @@ -31,6 +62,7 @@ def run_cmd(cmd, log_err = True): syslog.syslog(syslog.LOG_ERR, "{} - failed: return code - {}, output:\n{}" .format(err.cmd, err.returncode, err.output)) + def is_true(val): if val == 'True' or val == 'true': return True @@ -74,10 +106,6 @@ class Iptables(object): ''' return (isinstance(key, tuple)) - def load(self, lpbk_table): - for row in lpbk_table: - self.iptables_handler(row, lpbk_table[row]) - def command(self, chain, ip, ver, op): cmd = 'iptables' if ver == '4' else 'ip6tables' cmd += ' -t mangle --{} {} -p tcp --tcp-flags SYN SYN'.format(op, chain) @@ -128,11 +156,8 @@ class Iptables(object): for cmd in iptables_cmds: syslog.syslog(syslog.LOG_INFO, "Running cmd - {}".format(cmd)) - try: - subprocess.check_call(cmd, shell=True) - except subprocess.CalledProcessError as err: - syslog.syslog(syslog.LOG_ERR, "'{}' failed. RC: {}, output: {}" - .format(err.cmd, err.returncode, err.output)) + run_cmd(cmd) + class AaaCfg(object): def __init__(self): @@ -248,19 +273,17 @@ class KdumpCfg(object): "num_dumps": "3" } def load(self, kdump_table): - syslog.syslog(syslog.LOG_INFO, "KdumpCfg load ...") - data = {} + """ + Set the KDUMP table in CFG DB to kdump_defaults if not set by the user + """ + syslog.syslog(syslog.LOG_INFO, "KdumpCfg init...") kdump_conf = kdump_table.get("config", {}) for row in self.kdump_defaults: value = self.kdump_defaults.get(row) - if kdump_conf.get(row) is not None: - value = kdump_conf.get(row) - else: + if not kdump_conf.get(row): self.config_db.mod_entry("KDUMP", "config", { row : value}) - data[row] = value - self.kdump_update("config", data, True) - def kdump_update(self, key, data, isLoad): + def kdump_update(self, key, data): syslog.syslog(syslog.LOG_INFO, "Kdump global configuration update") if key == "config": # Admin mode @@ -280,127 +303,109 @@ class KdumpCfg(object): memory = self.kdump_defaults["memory"] if data.get("memory") is not None: memory = data.get("memory") - if isLoad or data.get("memory") is not None: + if data.get("memory") is not None: run_cmd("sonic-kdump-config --memory " + memory) # Num dumps num_dumps = self.kdump_defaults["num_dumps"] if data.get("num_dumps") is not None: num_dumps = data.get("num_dumps") - if isLoad or data.get("num_dumps") is not None: + if data.get("num_dumps") is not None: run_cmd("sonic-kdump-config --num_dumps " + num_dumps) + class NtpCfg(object): - def __init__(self, CfgDb): - self.config_db = CfgDb + """ + NtpCfg Config Daemon + 1) ntp-config.service handles the configuration updates and then starts ntp.service + 2) Both of them start after all the feature services start + 3) Purpose of this daemon is to propagate runtime config changes in + NTP, NTP_SERVER and LOOPBACK_INTERFACE + """ + def __init__(self): self.ntp_global = {} - self.has_ntp_servers = False + self.ntp_servers = set() - def load(self, ntp_global_conf, ntp_server_conf): - syslog.syslog(syslog.LOG_INFO, "NtpCfg load ...") - - for row in ntp_global_conf: - self.ntp_global_update(row, ntp_global_conf[row], True) - - self.ntp_server_update(0, ntp_server_conf, True) - - def handle_ntp_source_intf_chg (self, key): + def handle_ntp_source_intf_chg(self, intf_name): # if no ntp server configured, do nothing - if self.has_ntp_servers == False: + if not self.ntp_servers: return # check only the intf configured as source interface - if (len(self.ntp_global) == 0): - return - - if 'src_intf' not in self.ntp_global: - return - - if key[0] != self.ntp_global['src_intf']: + if intf_name not in self.ntp_global.get('src_intf', '').split(';'): return else: # just restart ntp config cmd = 'systemctl restart ntp-config' run_cmd(cmd) - def ntp_global_update(self, key, data, isLoad): - syslog.syslog(syslog.LOG_INFO, "ntp global configuration update") - - new_src = new_vrf = orig_src = orig_vrf = "" - - if 'src_intf' in data: - new_src = data['src_intf'] - - if 'vrf' in data: - new_vrf = data['vrf'] - - if (len(self.ntp_global) != 0): - - if 'src_intf' in self.ntp_global: - orig_src = self.ntp_global['src_intf'] - - if 'vrf' in self.ntp_global: - orig_vrf = self.ntp_global['vrf'] - + def ntp_global_update(self, key, data): + syslog.syslog(syslog.LOG_INFO, 'NTP GLOBAL Update') + orig_src = self.ntp_global.get('src_intf', '') + orig_src_set = set(orig_src.split(";")) + orig_vrf = self.ntp_global.get('vrf', '') + + new_src = data.get('src_intf', '') + new_src_set = set(new_src.split(";")) + new_vrf = data.get('vrf', '') + + # Update the Local Cache self.ntp_global = data - # during initial load of ntp configuration, ntp server configuration decides if to restart ntp-config - if (isLoad): - syslog.syslog(syslog.LOG_INFO, "ntp global update in load") - return - # check if ntp server configured, if not, do nothing - if self.has_ntp_servers == False: - syslog.syslog(syslog.LOG_INFO, "no ntp server when global config change, do nothing") - return + if not self.ntp_servers: + syslog.syslog(syslog.LOG_INFO, "No ntp server when global config change, do nothing") + return - if (new_src != orig_src): + if orig_src_set != new_src_set: syslog.syslog(syslog.LOG_INFO, "ntp global update for source intf old {} new {}, restarting ntp-config" - .format(orig_src, new_src)) + .format(orig_src_set, new_src_set)) cmd = 'systemctl restart ntp-config' run_cmd(cmd) - else: - if (new_vrf != orig_vrf): - syslog.syslog(syslog.LOG_INFO, "ntp global update for vrf old {} new {}, restarting ntp service" - .format(orig_vrf, new_vrf)) - cmd = 'service ntp restart' - run_cmd(cmd) - - def ntp_server_update(self, key, data, isLoad): - syslog.syslog(syslog.LOG_INFO, 'ntp server update key {} data {}'.format(key, data)) - - # during load, restart ntp-config regardless if ntp server is configured or not - if isLoad == True: - if data != {}: - self.has_ntp_servers = True - else: - # for runtime ntp server change, to determine if there is ntp server configured, need to - # get from configDB, as delete triggers 2 event handling - ntp_servers_tbl = self.config_db.get_table('NTP_SERVER') - if ntp_servers_tbl != {}: - self.has_ntp_servers = True - else: - self.has_ntp_servers = False + elif new_vrf != orig_vrf: + syslog.syslog(syslog.LOG_INFO, "ntp global update for vrf old {} new {}, restarting ntp service" + .format(orig_vrf, new_vrf)) + cmd = 'service ntp restart' + run_cmd(cmd) - cmd = 'systemctl restart ntp-config' - syslog.syslog(syslog.LOG_INFO, 'ntp server update, restarting ntp-config, ntp server exists {}'.format(self.has_ntp_servers)) + def ntp_server_update(self, key, op): + syslog.syslog(syslog.LOG_INFO, 'ntp server update key {}'.format(key)) - run_cmd(cmd) + restart_config = False + if op == "SET" and key not in self.ntp_servers: + restart_config = True + self.ntp_servers.add(key) + elif op == "DEL" and key in self.ntp_servers: + restart_config = True + self.ntp_servers.remove(key) + + if restart_config: + cmd = 'systemctl restart ntp-config' + syslog.syslog(syslog.LOG_INFO, 'ntp server update, restarting ntp-config, ntp servers configured {}'.format(self.ntp_servers)) + run_cmd(cmd) class HostConfigDaemon: def __init__(self): + # Just a sanity check to verify if the CONFIG_DB has been initialized + # before moving forward self.config_db = ConfigDBConnector() self.config_db.connect(wait_for_init=True, retry_on=True) + self.dbconn = DBConnector(CFG_DB, 0) + self.selector = Select() syslog.syslog(syslog.LOG_INFO, 'ConfigDB connect success') + self.select = Select() + self.callbacks = dict() + self.subscriber_map = dict() + # Load DEVICE metadata configurations self.device_config = {} self.device_config['DEVICE_METADATA'] = self.config_db.get_table('DEVICE_METADATA') self.aaacfg = AaaCfg() self.iptables = Iptables() - self.ntpcfg = NtpCfg(self.config_db) + self.ntpcfg = NtpCfg() # Cache the values of 'state' field in 'FEATURE' table of each container self.cached_feature_states = {} @@ -410,34 +415,16 @@ class HostConfigDaemon: self.kdumpCfg = KdumpCfg(self.config_db) self.kdumpCfg.load(self.config_db.get_table('KDUMP')) - def load(self): aaa = self.config_db.get_table('AAA') tacacs_global = self.config_db.get_table('TACPLUS') tacacs_server = self.config_db.get_table('TACPLUS_SERVER') self.aaacfg.load(aaa, tacacs_global, tacacs_server) - lpbk_table = self.config_db.get_table('LOOPBACK_INTERFACE') - self.iptables.load(lpbk_table) - - # Load NTP configurations - ntp_server = self.config_db.get_table('NTP_SERVER') - ntp_global = self.config_db.get_table('NTP') - self.ntpcfg.load(ntp_global, ntp_server) - - def get_target_state(self, feature_name, state): - template = jinja2.Template(state) - target_state = template.render(self.device_config) - entry = self.config_db.get_entry('FEATURE', feature_name) - entry["state"] = target_state - self.config_db.set_entry("FEATURE", feature_name, entry) - - return target_state - - def get_feature_attribute(self, feature_name, feature_table): - has_timer = ast.literal_eval(feature_table[feature_name].get('has_timer', 'False')) - has_global_scope = ast.literal_eval(feature_table[feature_name].get('has_global_scope', 'True')) - has_per_asic_scope = ast.literal_eval(feature_table[feature_name].get('has_per_asic_scope', 'False')) + def get_feature_attribute(self, feature_name, feature_cfg): + has_timer = safe_eval(feature_cfg.get('has_timer', 'False')) + has_global_scope = safe_eval(feature_cfg.get('has_global_scope', 'True')) + has_per_asic_scope = safe_eval(feature_cfg.get('has_per_asic_scope', 'False')) # Create feature name suffix depending feature is running in host or namespace or in both feature_names = ( @@ -514,119 +501,135 @@ class HostConfigDaemon: syslog.syslog(syslog.LOG_ERR, "Feature '{}' failed to be stopped and disabled".format(feature_name)) return - def is_invariant_feature(self, feature_name, state, feature_table): - invariant_feature = self.cached_feature_states[feature_name] == "always_enabled" or \ - self.cached_feature_states[feature_name] == "always_disabled" - if invariant_feature: - invariant_state = self.cached_feature_states[feature_name] - if state != invariant_state: - syslog.syslog(syslog.LOG_INFO, "Feature '{}' service is '{}'" - .format(feature_name, invariant_state)) - entry = self.config_db.get_entry('FEATURE', feature_name) - entry['state'] = invariant_state - self.config_db.set_entry('FEATURE', feature_name, entry) - - if state == "always_disabled": - feature_names, feature_suffixes = self.get_feature_attribute(feature_name, feature_table) - self.disable_feature(feature_names, feature_suffixes) - syslog.syslog(syslog.LOG_INFO, "Feature '{}' is stopped and disabled".format(feature_name)) - - return invariant_feature - - def update_feature_state(self, feature_name, state, feature_table): - if not self.is_invariant_feature(feature_name, state, feature_table): - self.cached_feature_states[feature_name] = state - - feature_names, feature_suffixes = self.get_feature_attribute(feature_name, feature_table) - if state == "enabled": - self.enable_feature(feature_names, feature_suffixes) - syslog.syslog(syslog.LOG_INFO, "Feature '{}.{}' is enabled and started" - .format(feature_name, feature_suffixes[-1])) - elif state == "disabled": - self.disable_feature(feature_names, feature_suffixes) - syslog.syslog(syslog.LOG_INFO, "Feature '{}' is stopped and disabled".format(feature_name)) - else: - syslog.syslog(syslog.LOG_ERR, "Unexpected state value '{}' for feature '{}'" - .format(state, feature_name)) + def update_feature_state(self, feature_name, state, feature_cfg): + cached_feature_state = self.cached_feature_states.get(feature_name, None) + enable = False + disable = False + + # Allowed transitions: + # None -> always_enabled + # -> always_disabled + # -> enabled + # -> disabled + # always_enabled -> always_disabled + # enabled -> disabled + # disabled -> enabled + if cached_feature_state is None: + enable = state in ("always_enabled", "enabled") + disable = state in ("always_disabled", "disabled") + elif cached_feature_state in ("always_enabled", "always_disabled"): + disable = state == "always_disabled" + enable = state == "always_enabled" + elif cached_feature_state in ("enabled", "disabled"): + enable = state == "enabled" + disable = state == "disabled" + else: + syslog.syslog(syslog.LOG_INFO, "Feature {} service is {}".format(feature_name, cached_feature_state)) + return False - def update_all_feature_states(self): + if not enable and not disable: + syslog.syslog(syslog.LOG_ERR, "Unexpected state value '{}' for feature {}" + .format(state, feature_name)) + return False + + feature_names, feature_suffixes = self.get_feature_attribute(feature_name, feature_cfg) + + if enable: + self.enable_feature(feature_names, feature_suffixes) + syslog.syslog(syslog.LOG_INFO, "Feature {} is enabled and started".format(feature_name)) + + if disable: + self.disable_feature(feature_names, feature_suffixes) + syslog.syslog(syslog.LOG_INFO, "Feature {} is stopped and disabled".format(feature_name)) + + return True + + def render_all_feature_states(self): + """ + Render the Template (if any) for the state field of the FEATURE Table. + Update the rendered state in the config db + """ feature_table = self.config_db.get_table('FEATURE') for feature_name in feature_table: if not feature_name: syslog.syslog(syslog.LOG_WARNING, "Feature is None") continue - state = feature_table[feature_name]['state'] + state = feature_table.get(feature_name, {}).get('state', '') if not state: syslog.syslog(syslog.LOG_WARNING, "Enable state of feature '{}' is None".format(feature_name)) continue - target_state = self.get_target_state(feature_name, state) - # Store the initial value of 'state' field in 'FEATURE' table of a specific container - self.cached_feature_states[feature_name] = target_state + self.set_target_state(feature_name, state) + + def set_target_state(self, feature_name, state): + template = jinja2.Template(state) + target_state = template.render(self.device_config) + entry = self.config_db.get_entry('FEATURE', feature_name) + entry["state"] = target_state + self.config_db.set_entry("FEATURE", feature_name, entry) - self.update_feature_state(feature_name, target_state, feature_table) + def __get_intf_name(self, key): + if isinstance(key, tuple) and key: + intf = key[0] + else: + intf = key + return intf - def aaa_handler(self, key, data): + def aaa_handler(self, key, op, data): self.aaacfg.aaa_update(key, data) + syslog.syslog(syslog.LOG_INFO, 'AAA Update: key: {}, op: {}, data: {}'.format(key, op, data)) - def tacacs_server_handler(self, key, data): + def tacacs_server_handler(self, key, op, data): self.aaacfg.tacacs_server_update(key, data) log_data = copy.deepcopy(data) if 'passkey' in log_data: log_data['passkey'] = obfuscate(log_data['passkey']) - syslog.syslog(syslog.LOG_INFO, 'value of {} changed to {}'.format(key, log_data)) + syslog.syslog(syslog.LOG_INFO, 'TACPLUS_SERVER update: key: {}, op: {}, data: {}'.format(key, op, log_data)) - def tacacs_global_handler(self, key, data): + def tacacs_global_handler(self, key, op, data): self.aaacfg.tacacs_global_update(key, data) log_data = copy.deepcopy(data) if 'passkey' in log_data: log_data['passkey'] = obfuscate(log_data['passkey']) - syslog.syslog(syslog.LOG_INFO, 'value of {} changed to {}'.format(key, log_data)) + syslog.syslog(syslog.LOG_INFO, 'TACPLUS Global update: key: {}, op: {}, data: {}'.format(key, op, log_data)) - def lpbk_handler(self, key, data): + def lpbk_handler(self, key, op, data): key = ConfigDBConnector.deserialize_key(key) - # Check if delete operation by fetch existing keys - keys = self.config_db.get_keys('LOOPBACK_INTERFACE') - if key in keys: - add = True - else: + if op == "DEL": add = False - + else: + add = True + self.iptables.iptables_handler(key, data, add) - self.ntpcfg.handle_ntp_source_intf_chg(key) - - def feature_state_handler(self, key, data): - feature_name = key - feature_table = self.config_db.get_table('FEATURE') - if feature_name not in feature_table: - syslog.syslog(syslog.LOG_WARNING, "Feature '{}' not in FEATURE table".format(feature_name)) - return - - state = feature_table[feature_name]['state'] + lpbk_name = self.__get_intf_name(key) + self.ntpcfg.handle_ntp_source_intf_chg(lpbk_name) + + def feature_state_handler(self, feature_name, op, feature_cfg): + if not feature_cfg: + syslog.syslog(syslog.LOG_WARNING, "Deregistering feature {}".format(feature_name)) + self.cached_feature_states.pop(feature_name) + + state = feature_cfg.get("state", "") if not state: syslog.syslog(syslog.LOG_WARNING, "Enable state of feature '{}' is None".format(feature_name)) return - self.cached_feature_states.setdefault(feature_name, 'disabled') - # Enable/disable the container service if the feature state was changed from its previous state. - if self.cached_feature_states[feature_name] != state: - self.update_feature_state(feature_name, state, feature_table) + if self.cached_feature_states.get(feature_name, "") != state: + self.update_feature_state(feature_name, state, feature_cfg) + + self.cached_feature_states[feature_name] = state - def ntp_server_handler (self, key, data): - syslog.syslog(syslog.LOG_INFO, 'NTP server handler...') - ntp_server_db = self.config_db.get_table('NTP_SERVER') - data = ntp_server_db - self.ntpcfg.ntp_server_update(key, data, False) + def ntp_server_handler (self, key, op, data): + self.ntpcfg.ntp_server_update(key, op) - def ntp_global_handler (self, key, data): - syslog.syslog(syslog.LOG_INFO, 'NTP global handler...') - self.ntpcfg.ntp_global_update(key, data, False) + def ntp_global_handler (self, key, op, data): + self.ntpcfg.ntp_global_update(key, data) - def kdump_handler (self, key, data): + def kdump_handler (self, key, op, data): syslog.syslog(syslog.LOG_INFO, 'Kdump handler...') - self.kdumpCfg.kdump_update(key, data, False) + self.kdumpCfg.kdump_update(key, data) def wait_till_system_init_done(self): @@ -635,36 +638,70 @@ class HostConfigDaemon: systemctl_cmd = "sudo systemctl is-system-running --wait --quiet" subprocess.call(systemctl_cmd, shell=True) - def start(self): - - self.config_db.subscribe('AAA', lambda table, key, data: self.aaa_handler(key, data)) - self.config_db.subscribe('TACPLUS_SERVER', lambda table, key, data: self.tacacs_server_handler(key, data)) - self.config_db.subscribe('TACPLUS', lambda table, key, data: self.tacacs_global_handler(key, data)) - self.config_db.subscribe('LOOPBACK_INTERFACE', lambda table, key, data: self.lpbk_handler(key, data)) - self.config_db.subscribe('FEATURE', lambda table, key, data: self.feature_state_handler(key, data)) - self.config_db.subscribe('NTP_SERVER', lambda table, key, data: self.ntp_server_handler(key, data)) - self.config_db.subscribe('NTP', lambda table, key, data: self.ntp_global_handler(key, data)) - self.config_db.subscribe('KDUMP', lambda table, key, data: self.kdump_handler(key, data)) - + def subscribe(self, table, callback, pri): + try: + if table not in self.callbacks: + self.callbacks[table] = [] + subscriber = SubscriberStateTable(self.dbconn, table, TableConsumable.DEFAULT_POP_BATCH_SIZE, pri) + self.selector.addSelectable(subscriber) # Add to the Selector + self.subscriber_map[subscriber.getFd()] = (subscriber, table) # Maintain a mapping b/w subscriber & fd + + self.callbacks[table].append(callback) + except Exception as err: + syslog.syslog(syslog.LOG_ERR, "Subscribe to table {} failed with error {}".format(table, err)) + + def register_callbacks(self): + self.subscribe('KDUMP', lambda table, key, op, data: self.kdump_handler(key, op, data), HOSTCFGD_MAX_PRI) + # Handle FEATURE updates before other tables + self.subscribe('FEATURE', lambda table, key, op, data: self.feature_state_handler(key, op, data), HOSTCFGD_MAX_PRI-1) + # Handle AAA and TACACS related tables + self.subscribe('AAA', lambda table, key, op, data: self.aaa_handler(key, op, data), HOSTCFGD_MAX_PRI-2) + self.subscribe('TACPLUS_SERVER', lambda table, key, op, data: self.tacacs_server_handler(key, op, data), HOSTCFGD_MAX_PRI-2) + self.subscribe('TACPLUS', lambda table, key, op, data: self.tacacs_global_handler(key, op, data), HOSTCFGD_MAX_PRI-2) + self.subscribe('LOOPBACK_INTERFACE', lambda table, key, op, data: self.lpbk_handler(key, op, data), HOSTCFGD_MAX_PRI-2) + # Handle NTP & NTP_SERVER updates + self.subscribe('NTP_SERVER', lambda table, key, op, data: self.ntp_server_handler(key, op, data), HOSTCFGD_MAX_PRI-3) + self.subscribe('NTP', lambda table, key, op, data: self.ntp_global_handler(key, op, data), HOSTCFGD_MAX_PRI-3) + syslog.syslog(syslog.LOG_INFO, "Waiting for systemctl to finish initialization") self.wait_till_system_init_done() syslog.syslog(syslog.LOG_INFO, "systemctl has finished initialization -- proceeding ...") - # Update all feature states once upon starting - self.update_all_feature_states() - - # Defer load until subscribe - self.load() + def start(self): + while True: + state, selectable_ = self.selector.select(DEFAULT_SELECT_TIMEOUT) + if state == self.selector.TIMEOUT: + continue + elif state == self.selector.ERROR: + syslog.syslog(syslog.LOG_ERR, + "error returned by select") + continue - self.config_db.listen() + fd = selectable_.getFd() + # Get the Corresponding subscriber & table + subscriber, table = self.subscriber_map.get(fd, (None, "")) + if not subscriber: + syslog.syslog(syslog.LOG_ERR, + "No Subscriber object found for fd: {}, subscriber map: {}".format(fd, subscriber_map)) + continue + key, op, fvs = subscriber.pop() + # Get the registered callback + cbs = self.callbacks.get(table, None) + for callback in cbs: + callback(table, key, op, dict(fvs)) def main(): + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGHUP, signal_handler) daemon = HostConfigDaemon() + daemon.render_all_feature_states() + daemon.register_callbacks() + daemon.load() daemon.start() - if __name__ == "__main__": main() diff --git a/src/sonic-host-services/setup.py b/src/sonic-host-services/setup.py index 6d3add214fb8..0db779e8ddde 100644 --- a/src/sonic-host-services/setup.py +++ b/src/sonic-host-services/setup.py @@ -29,7 +29,8 @@ 'parameterized', 'pytest', 'pyfakefs', - 'sonic-py-common' + 'sonic-py-common', + 'deepdiff' ], classifiers = [ 'Development Status :: 3 - Alpha', diff --git a/src/sonic-host-services/tests/common/mock_configdb.py b/src/sonic-host-services/tests/common/mock_configdb.py index 0866c2f5abfe..ce955ea86d05 100644 --- a/src/sonic-host-services/tests/common/mock_configdb.py +++ b/src/sonic-host-services/tests/common/mock_configdb.py @@ -12,6 +12,14 @@ def __init__(self, **kwargs): def set_config_db(test_config_db): MockConfigDb.CONFIG_DB = test_config_db + @staticmethod + def deserialize_key(key, separator="|"): + tokens = key.split(separator) + if len(tokens) > 1: + return tuple(tokens) + else: + return key + @staticmethod def get_config_db(): return MockConfigDb.CONFIG_DB @@ -25,8 +33,85 @@ def get(self, db_id, key, field): def get_entry(self, key, field): return MockConfigDb.CONFIG_DB[key][field] + def mod_entry(self, key, field, data): + existing_data = self.get_entry(key, field) + existing_data.update(data) + self.set_entry(key, field, existing_data) + def set_entry(self, key, field, data): MockConfigDb.CONFIG_DB[key][field] = data def get_table(self, table_name): return MockConfigDb.CONFIG_DB[table_name] + +class MockSelect(): + + event_queue = [] + + @staticmethod + def set_event_queue(Q): + MockSelect.event_queue = Q + + @staticmethod + def get_event_queue(): + return MockSelect.event_queue + + @staticmethod + def reset_event_queue(): + MockSelect.event_queue = [] + + def __init__(self): + self.sub_map = {} + self.TIMEOUT = "TIMEOUT" + self.ERROR = "ERROR" + + def addSelectable(self, subscriber): + self.sub_map[subscriber.table] = subscriber + + def select(self, TIMEOUT): + if not MockSelect.get_event_queue(): + raise TimeoutError + table, key = MockSelect.get_event_queue().pop(0) + self.sub_map[table].nextKey(key) + return "OBJECT", self.sub_map[table] + + +class MockSubscriberStateTable(): + + FD_INIT = 0 + + @staticmethod + def generate_fd(): + curr = MockSubscriberStateTable.FD_INIT + MockSubscriberStateTable.FD_INIT = curr + 1 + return curr + + @staticmethod + def reset_fd(): + MockSubscriberStateTable.FD_INIT = 0 + + def __init__(self, conn, table, pop, pri): + self.fd = MockSubscriberStateTable.generate_fd() + self.next_key = '' + self.table = table + + def getFd(self): + return self.fd + + def nextKey(self, key): + self.next_key = key + + def pop(self): + table = MockConfigDb.CONFIG_DB.get(self.table, {}) + if self.next_key not in table: + op = "DEL" + fvs = {} + else: + op = "SET" + fvs = table.get(self.next_key, {}) + return self.next_key, op, fvs + + +class MockDBConnector(): + def __init__(self, db, val): + pass \ No newline at end of file diff --git a/src/sonic-host-services/tests/hostcfgd/hostcfgd_test.py b/src/sonic-host-services/tests/hostcfgd/hostcfgd_test.py index 52737df91c18..1608f79e5960 100644 --- a/src/sonic-host-services/tests/hostcfgd/hostcfgd_test.py +++ b/src/sonic-host-services/tests/hostcfgd/hostcfgd_test.py @@ -1,16 +1,20 @@ import os import sys -import swsscommon +import swsscommon as swsscommon_package +from swsscommon import swsscommon from parameterized import parameterized from sonic_py_common.general import load_module_from_source from unittest import TestCase, mock -from .test_vectors import HOSTCFGD_TEST_VECTOR -from tests.common.mock_configdb import MockConfigDb +from .test_vectors import HOSTCFGD_TEST_VECTOR, HOSTCFG_DAEMON_CFG_DB +from tests.common.mock_configdb import MockConfigDb, MockSubscriberStateTable +from tests.common.mock_configdb import MockSelect, MockDBConnector +from pyfakefs.fake_filesystem_unittest import patchfs +from deepdiff import DeepDiff +from unittest.mock import call -swsscommon.swsscommon.ConfigDBConnector = MockConfigDb test_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) modules_path = os.path.dirname(test_path) scripts_path = os.path.join(modules_path, "scripts") @@ -19,7 +23,10 @@ # Load the file under test hostcfgd_path = os.path.join(scripts_path, 'hostcfgd') hostcfgd = load_module_from_source('hostcfgd', hostcfgd_path) - +hostcfgd.ConfigDBConnector = MockConfigDb +hostcfgd.SubscriberStateTable = MockSubscriberStateTable +hostcfgd.Select = MockSelect +hostcfgd.DBConnector = MockDBConnector class TestHostcfgd(TestCase): """ @@ -38,21 +45,13 @@ def __verify_table(self, table, expected_table): Returns: None """ - is_equal = len(table) == len(expected_table) - if is_equal: - for key, fields in expected_table.items(): - is_equal = is_equal and key in table and len(fields) == len(table[key]) - if is_equal: - for field, value in fields.items(): - is_equal = is_equal and value == table[key][field] - if not is_equal: - break; - else: - break - return is_equal + ddiff = DeepDiff(table, expected_table, ignore_order=True) + print('DIFF:', ddiff) + return True if not ddiff else False @parameterized.expand(HOSTCFGD_TEST_VECTOR) - def test_hostcfgd(self, test_name, test_data): + @patchfs + def test_hostcfgd(self, test_name, test_data, fs): """ Test hostcfd daemon initialization @@ -63,6 +62,8 @@ def test_hostcfgd(self, test_name, test_data): Returns: None """ + print(swsscommon_package.__path__) + fs.add_real_paths(swsscommon_package.__path__) # add real path of swsscommon for database_config.json MockConfigDb.set_config_db(test_data["config_db"]) with mock.patch("hostcfgd.subprocess") as mocked_subprocess: popen_mock = mock.Mock() @@ -71,9 +72,157 @@ def test_hostcfgd(self, test_name, test_data): mocked_subprocess.Popen.return_value = popen_mock host_config_daemon = hostcfgd.HostConfigDaemon() - host_config_daemon.update_all_feature_states() + host_config_daemon.render_all_feature_states() + + features = MockConfigDb.CONFIG_DB['FEATURE'] + for key, fvs in features.items(): + host_config_daemon.feature_state_handler(key, 'SET', fvs) + assert self.__verify_table( MockConfigDb.get_config_db()["FEATURE"], test_data["expected_config_db"]["FEATURE"] ), "Test failed for test data: {0}".format(test_data) mocked_subprocess.check_call.assert_has_calls(test_data["expected_subprocess_calls"], any_order=True) + +class TesNtpCfgd(TestCase): + """ + Test hostcfd daemon - NtpCfgd + """ + def setUp(self): + MockConfigDb.CONFIG_DB['NTP'] = {'global': {'vrf': 'mgmt', 'src_intf': 'eth0'}} + MockConfigDb.CONFIG_DB['NTP_SERVER'] = {'0.debian.pool.ntp.org': {}} + + def tearDown(self): + MockConfigDb.CONFIG_DB = {} + + def test_ntp_global_update_with_no_servers(self): + with mock.patch('hostcfgd.subprocess') as mocked_subprocess: + popen_mock = mock.Mock() + attrs = {'communicate.return_value': ('output', 'error')} + popen_mock.configure_mock(**attrs) + mocked_subprocess.Popen.return_value = popen_mock + + ntpcfgd = hostcfgd.NtpCfg() + ntpcfgd.ntp_global_update('global', MockConfigDb.CONFIG_DB['NTP']['global']) + + mocked_subprocess.check_call.assert_not_called() + + def test_ntp_global_update_ntp_servers(self): + with mock.patch('hostcfgd.subprocess') as mocked_subprocess: + popen_mock = mock.Mock() + attrs = {'communicate.return_value': ('output', 'error')} + popen_mock.configure_mock(**attrs) + mocked_subprocess.Popen.return_value = popen_mock + + ntpcfgd = hostcfgd.NtpCfg() + ntpcfgd.ntp_global_update('global', MockConfigDb.CONFIG_DB['NTP']['global']) + ntpcfgd.ntp_server_update('0.debian.pool.ntp.org', 'SET') + mocked_subprocess.check_call.assert_has_calls([call('systemctl restart ntp-config', shell=True)]) + + def test_loopback_update(self): + with mock.patch('hostcfgd.subprocess') as mocked_subprocess: + popen_mock = mock.Mock() + attrs = {'communicate.return_value': ('output', 'error')} + popen_mock.configure_mock(**attrs) + mocked_subprocess.Popen.return_value = popen_mock + + ntpcfgd = hostcfgd.NtpCfg() + ntpcfgd.ntp_global = MockConfigDb.CONFIG_DB['NTP']['global'] + ntpcfgd.ntp_servers.add('0.debian.pool.ntp.org') + + ntpcfgd.handle_ntp_source_intf_chg('eth0') + mocked_subprocess.check_call.assert_has_calls([call('systemctl restart ntp-config', shell=True)]) + + +class TestHostcfgdDaemon(TestCase): + + def setUp(self): + MockConfigDb.set_config_db(HOSTCFG_DAEMON_CFG_DB) + + def tearDown(self): + MockConfigDb.CONFIG_DB = {} + + @patchfs + def test_feature_events(self, fs): + MockSelect.event_queue = [('FEATURE', 'dhcp_relay'), + ('FEATURE', 'mux'), + ('FEATURE', 'telemetry')] + daemon = hostcfgd.HostConfigDaemon() + daemon.render_all_feature_states() + daemon.register_callbacks() + with mock.patch('hostcfgd.subprocess') as mocked_subprocess: + popen_mock = mock.Mock() + attrs = {'communicate.return_value': ('output', 'error')} + popen_mock.configure_mock(**attrs) + mocked_subprocess.Popen.return_value = popen_mock + try: + daemon.start() + except TimeoutError: + pass + expected = [call('sudo systemctl unmask dhcp_relay.service', shell=True), + call('sudo systemctl enable dhcp_relay.service', shell=True), + call('sudo systemctl start dhcp_relay.service', shell=True), + call('sudo systemctl unmask mux.service', shell=True), + call('sudo systemctl enable mux.service', shell=True), + call('sudo systemctl start mux.service', shell=True), + call('sudo systemctl unmask telemetry.service', shell=True), + call('sudo systemctl unmask telemetry.timer', shell=True), + call('sudo systemctl enable telemetry.timer', shell=True), + call('sudo systemctl start telemetry.timer', shell=True)] + mocked_subprocess.check_call.assert_has_calls(expected) + + # Change the state to disabled + MockConfigDb.CONFIG_DB['FEATURE']['telemetry']['state'] = 'disabled' + MockSelect.event_queue = [('FEATURE', 'telemetry')] + try: + daemon.start() + except TimeoutError: + pass + expected = [call('sudo systemctl stop telemetry.timer', shell=True), + call('sudo systemctl disable telemetry.timer', shell=True), + call('sudo systemctl mask telemetry.timer', shell=True), + call('sudo systemctl stop telemetry.service', shell=True), + call('sudo systemctl disable telemetry.service', shell=True), + call('sudo systemctl mask telemetry.service', shell=True)] + mocked_subprocess.check_call.assert_has_calls(expected) + + def test_loopback_events(self): + MockConfigDb.set_config_db(HOSTCFG_DAEMON_CFG_DB) + MockSelect.event_queue = [('NTP', 'global'), + ('NTP_SERVER', '0.debian.pool.ntp.org'), + ('LOOPBACK_INTERFACE', 'Loopback0|10.184.8.233/32')] + daemon = hostcfgd.HostConfigDaemon() + daemon.register_callbacks() + with mock.patch('hostcfgd.subprocess') as mocked_subprocess: + popen_mock = mock.Mock() + attrs = {'communicate.return_value': ('output', 'error')} + popen_mock.configure_mock(**attrs) + mocked_subprocess.Popen.return_value = popen_mock + try: + daemon.start() + except TimeoutError: + pass + expected = [call('systemctl restart ntp-config', shell=True), + call('iptables -t mangle --append PREROUTING -p tcp --tcp-flags SYN SYN -d 10.184.8.233 -j TCPMSS --set-mss 1460', shell=True), + call('iptables -t mangle --append POSTROUTING -p tcp --tcp-flags SYN SYN -s 10.184.8.233 -j TCPMSS --set-mss 1460', shell=True)] + mocked_subprocess.check_call.assert_has_calls(expected, any_order=True) + + def test_kdump_event(self): + MockConfigDb.set_config_db(HOSTCFG_DAEMON_CFG_DB) + daemon = hostcfgd.HostConfigDaemon() + daemon.register_callbacks() + assert MockConfigDb.CONFIG_DB['KDUMP']['config'] + MockSelect.event_queue = [('KDUMP', 'config')] + with mock.patch('hostcfgd.subprocess') as mocked_subprocess: + popen_mock = mock.Mock() + attrs = {'communicate.return_value': ('output', 'error')} + popen_mock.configure_mock(**attrs) + mocked_subprocess.Popen.return_value = popen_mock + try: + daemon.start() + except TimeoutError: + pass + expected = [call('sonic-kdump-config --disable', shell=True), + call('sonic-kdump-config --num_dumps 3', shell=True), + call('sonic-kdump-config --memory 0M-2G:256M,2G-4G:320M,4G-8G:384M,8G-:448M', shell=True)] + mocked_subprocess.check_call.assert_has_calls(expected, any_order=True) \ No newline at end of file diff --git a/src/sonic-host-services/tests/hostcfgd/test_vectors.py b/src/sonic-host-services/tests/hostcfgd/test_vectors.py index 123a5512ad9d..439c3844cc3b 100644 --- a/src/sonic-host-services/tests/hostcfgd/test_vectors.py +++ b/src/sonic-host-services/tests/hostcfgd/test_vectors.py @@ -381,3 +381,62 @@ } ] ] + +HOSTCFG_DAEMON_CFG_DB = { + "FEATURE": { + "dhcp_relay": { + "auto_restart": "enabled", + "has_global_scope": "True", + "has_per_asic_scope": "False", + "has_timer": "False", + "high_mem_alert": "disabled", + "set_owner": "kube", + "state": "{% if not (DEVICE_METADATA is defined and DEVICE_METADATA['localhost'] is defined and DEVICE_METADATA['localhost']['type'] is defined and DEVICE_METADATA['localhost']['type'] != 'ToRRouter') %}enabled{% else %}disabled{% endif %}" + }, + "mux": { + "auto_restart": "enabled", + "has_global_scope": "True", + "has_per_asic_scope": "False", + "has_timer": "False", + "high_mem_alert": "disabled", + "set_owner": "local", + "state": "{% if 'subtype' in DEVICE_METADATA['localhost'] and DEVICE_METADATA['localhost']['subtype'] == 'DualToR' %}enabled{% else %}always_disabled{% endif %}" + }, + "telemetry": { + "auto_restart": "enabled", + "has_global_scope": "True", + "has_per_asic_scope": "False", + "has_timer": "True", + "high_mem_alert": "disabled", + "set_owner": "kube", + "state": "enabled", + "status": "enabled" + }, + }, + "KDUMP": { + "config": { + + } + }, + "NTP": { + "global": { + "vrf": "default", + "src_intf": "eth0;Loopback0" + } + }, + "NTP_SERVER": { + "0.debian.pool.ntp.org": {} + }, + "LOOPBACK_INTERFACE": { + "Loopback0|10.184.8.233/32": { + "scope": "global", + "family": "IPv4" + } + }, + "DEVICE_METADATA": { + "localhost": { + "subtype": "DualToR", + "type": "ToRRouter", + } + } +} \ No newline at end of file