diff --git a/.github/workflows/unittest.yaml b/.github/workflows/unittest.yaml
index 708af3145b..4b44c2f6ac 100644
--- a/.github/workflows/unittest.yaml
+++ b/.github/workflows/unittest.yaml
@@ -19,4 +19,4 @@ jobs:
         make
     - name: Run unittest
       run: |
-        pytest
\ No newline at end of file
+        pytest -m "not local"
\ No newline at end of file
diff --git a/acto/__main__.py b/acto/__main__.py
index a6c249e71a..52228cd27e 100644
--- a/acto/__main__.py
+++ b/acto/__main__.py
@@ -151,7 +151,8 @@
                 is_reproduce=is_reproduce,
                 input_model=input_model,
                 apply_testcase_f=apply_testcase_f,
-                delta_from=args.delta_from)
+                delta_from=args.delta_from,
+                focus_fields=config.focus_fields,)
     generation_time = datetime.now()
     logger.info('Acto initialization finished in %s', generation_time - start_time)
     if args.additional_semantic:
diff --git a/acto/checker/impl/state.py b/acto/checker/impl/state.py
index 0a57e8886f..2e9653c0f1 100644
--- a/acto/checker/impl/state.py
+++ b/acto/checker/impl/state.py
@@ -323,7 +323,6 @@ def check(self, generation: int, snapshot: Snapshot, prev_snapshot: Snapshot) ->
                     continue
                 logger.error('Found no matching fields for input delta')
                 logger.error('Input delta [%s]' % delta.path)
-                print_event(f'Found bug: {delta.path} changed from [{delta.prev}] to [{delta.curr}]')
                 return StateResult(Oracle.SYSTEM_STATE, 'Found no matching fields for input',
                                    delta)
             else:
@@ -334,7 +333,6 @@ def check(self, generation: int, snapshot: Snapshot, prev_snapshot: Snapshot) ->
                     logger.error(f'Matched delta {match_delta.path}')
                     logger.error(f'Matched delta prev {match_delta.prev}, curr {match_delta.curr}')
                     logger.error('Failed to match input delta with matched system state delta')
-                    print_event(f'Found bug: {delta.path} changed from [{delta.prev}] to [{delta.curr}]')
                     return StateResult(Oracle.SYSTEM_STATE, 'Found no matching fields for input',
                                        delta)
         elif not should_compare:
diff --git a/acto/common.py b/acto/common.py
index e89a3ff564..ba9ec011f1 100644
--- a/acto/common.py
+++ b/acto/common.py
@@ -494,7 +494,7 @@ def invalid_input_message(log_msg: str, input_delta: dict) -> Tuple[bool, list]:
     # if delta.curr is an int, we do exact match to avoid matching a short
     # int (e.g. 1) to a log line and consider the int as invalid input
-    elif isinstance(delta.curr, int):
+    elif isinstance(delta.curr, int) and len(str(delta.curr)) > 1:
        for item in log_msg.split(' '):
            if item == str(delta.curr):
                logger.info(
diff --git a/acto/engine.py b/acto/engine.py
index 10263818ba..ba83556426 100644
--- a/acto/engine.py
+++ b/acto/engine.py
@@ -37,7 +37,7 @@
                          set_thread_logger_prefix)
 from ssa.analysis import analyze
 
-def save_result(trial_dir: str, trial_result: RunResult, num_tests: int, trial_elapsed):
+def save_result(trial_dir: str, trial_result: RunResult, num_tests: int, trial_elapsed, time_breakdown):
     logger = get_thread_logger(with_prefix=False)
 
     result_dict = {}
@@ -47,6 +47,7 @@
     except:
         result_dict['trial_num'] = trial_dir
     result_dict['duration'] = trial_elapsed
+    result_dict['time_breakdown'] = time_breakdown
     result_dict['num_tests'] = num_tests
     if trial_result == None:
         logger.info('Trial %s completed without error', trial_dir)
@@ -198,7 +199,7 @@ def __init__(self, context: dict, input_model: InputModel, deploy: Deploy, runner
                  checker_t: type, wait_time: int, custom_on_init: List[callable],
                  custom_oracle: List[callable], workdir: str, cluster: base.KubernetesEngine,
                  worker_id: int, sequence_base: int, dryrun: bool, is_reproduce: bool,
-                 apply_testcase_f: FunctionType) -> None:
+                 apply_testcase_f: FunctionType, acto_namespace: int) -> None:
         self.context = context
         self.workdir = workdir
         self.base_workdir = workdir
@@ -206,9 +207,9 @@ def __init__(self, context: dict, input_model: InputModel, deploy: Deploy, runner
         self.images_archive = os.path.join(workdir, 'images.tar')
         self.worker_id = worker_id
         self.sequence_base = sequence_base # trial number to start with
-        self.context_name = cluster.get_context_name(f"acto-cluster-{worker_id}")
+        self.context_name = cluster.get_context_name(f"acto-{acto_namespace}-cluster-{worker_id}")
         self.kubeconfig = os.path.join(os.path.expanduser('~'), '.kube', self.context_name)
-        self.cluster_name = f"acto-cluster-{worker_id}"
+        self.cluster_name = f"acto-{acto_namespace}-cluster-{worker_id}"
         self.input_model = input_model
         self.deploy = deploy
         self.runner_t = runner_t
@@ -226,7 +227,7 @@
 
         self.curr_trial = 0
 
-    def run(self, mode: str = InputModel.NORMAL):
+    def run(self, errors: List[RunResult], mode: str = InputModel.NORMAL):
         logger = get_thread_logger(with_prefix=True)
 
         self.input_model.set_worker_id(self.worker_id)
@@ -246,12 +247,13 @@
             self.cluster.restart_cluster(self.cluster_name, self.kubeconfig, CONST.K8S_VERSION)
             apiclient = kubernetes_client(self.kubeconfig, self.context_name)
             self.cluster.load_images(self.images_archive, self.cluster_name)
+            trial_k8s_bootstrap_time = time.time()
             deployed = self.deploy.deploy_with_retry(self.context, self.kubeconfig, self.context_name)
             if not deployed:
                 logger.info('Not deployed. Try again!')
                 continue
-
+            operator_deploy_time = time.time()
             trial_dir = os.path.join(
                 self.workdir,
                 'trial-%02d-%04d' % (self.worker_id + self.sequence_base, self.curr_trial))
@@ -260,13 +262,22 @@
             trial_err, num_tests = self.run_trial(trial_dir=trial_dir, curr_trial=self.curr_trial)
             self.snapshots = []
 
-            trial_elapsed = time.strftime("%H:%M:%S", time.gmtime(time.time() - trial_start_time))
+            trial_finished_time = time.time()
+            trial_elapsed = time.strftime("%H:%M:%S", time.gmtime(trial_finished_time - trial_start_time))
             logger.info('Trial %d finished, completed in %s' % (self.curr_trial, trial_elapsed))
+            logger.info(f'Kubernetes bootstrap: {trial_k8s_bootstrap_time - trial_start_time}')
+            logger.info(f'Operator deploy: {operator_deploy_time - trial_k8s_bootstrap_time}')
+            logger.info(f'Trial run: {trial_finished_time - operator_deploy_time}')
             logger.info('---------------------------------------\n')
 
             delete_operator_pod(apiclient, self.context['namespace'])
-            save_result(trial_dir, trial_err, num_tests, trial_elapsed)
+            save_result(trial_dir, trial_err, num_tests, trial_elapsed, {
+                'k8s_bootstrap': trial_k8s_bootstrap_time - trial_start_time,
+                'operator_deploy': operator_deploy_time - trial_k8s_bootstrap_time,
+                'trial_run': trial_finished_time - operator_deploy_time
+            })
             self.curr_trial = self.curr_trial + 1
+            errors.append(trial_err)
 
             if self.input_model.is_empty():
                 logger.info('Test finished')
@@ -602,7 +613,8 @@ def __init__(self,
                  reproduce_dir: str = None,
                  delta_from: str = None,
                  mount: list = None,
-                 focus_fields: list = None) -> None:
+                 focus_fields: list = None,
+                 acto_namespace: int = 0) -> None:
         logger = get_thread_logger(with_prefix=False)
 
         try:
@@ -625,13 +637,13 @@
             raise UnknownDeployMethodError()
 
         if cluster_runtime == "KIND":
-            cluster = kind.Kind()
+            cluster = kind.Kind(acto_namespace=acto_namespace)
         elif cluster_runtime == "K3D":
             cluster = k3d.K3D()
         else:
             logger.warning(
                 f"Cluster Runtime {cluster_runtime} is not supported, defaulted to use kind")
-            cluster = kind.Kind()
+            cluster = kind.Kind(acto_namespace=acto_namespace)
         self.cluster = cluster
 
         self.deploy = deploy
@@ -644,6 +656,7 @@
         self.is_reproduce = is_reproduce
         self.apply_testcase_f = apply_testcase_f
         self.reproduce_dir = reproduce_dir
+        self.acto_namespace = acto_namespace
 
         self.runner_type = Runner
         self.checker_type = CheckerSet
@@ -674,6 +687,7 @@
         if operator_config.k8s_fields is not None:
             module = importlib.import_module(operator_config.k8s_fields)
             if hasattr(module,'BLACKBOX') and actoConfig.mode == 'blackbox':
+                applied_custom_k8s_fields = True
                 for k8s_field in module.BLACKBOX:
                     self.input_model.apply_k8s_schema(k8s_field)
             elif hasattr(module,'WHITEBOX') and actoConfig.mode == 'whitebox':
@@ -814,7 +828,8 @@ def __learn(self, context_file, helper_crd, analysis_only=False):
         with open(context_file, 'w') as context_fout:
             json.dump(self.context, context_fout, cls=ContextEncoder, indent=4, sort_keys=True)
 
-    def run(self, modes: list = ['normal', 'overspecified', 'copiedover']):
+    def run(self, modes: list = ['normal', 'overspecified', 'copiedover']) -> List[RunResult]:
+        # TODO: return the alarms here
         logger = get_thread_logger(with_prefix=True)
 
         # Build an archive to be preloaded
@@ -829,19 +844,20 @@
 
         start_time = time.time()
 
+        errors: List[RunResult] = []
         runners: List[TrialRunner] = []
         for i in range(self.num_workers):
             runner = TrialRunner(self.context, self.input_model, self.deploy, self.runner_type,
                                  self.checker_type, self.operator_config.wait_time,
                                  self.custom_on_init, self.custom_oracle, self.workdir_path,
                                  self.cluster, i, self.sequence_base, self.dryrun,
-                                 self.is_reproduce, self.apply_testcase_f)
+                                 self.is_reproduce, self.apply_testcase_f, self.acto_namespace)
             runners.append(runner)
 
         if 'normal' in modes:
             threads = []
             for runner in runners:
-                t = threading.Thread(target=runner.run, args=([]))
+                t = threading.Thread(target=runner.run, args=([errors]))
                 t.start()
                 threads.append(t)
@@ -853,7 +869,7 @@
         if 'overspecified' in modes:
             threads = []
             for runner in runners:
-                t = threading.Thread(target=runner.run, args=([InputModel.OVERSPECIFIED]))
+                t = threading.Thread(target=runner.run, args=([errors, InputModel.OVERSPECIFIED]))
                 t.start()
                 threads.append(t)
@@ -865,7 +881,7 @@
         if 'copiedover' in modes:
             threads = []
             for runner in runners:
-                t = threading.Thread(target=runner.run, args=([InputModel.COPIED_OVER]))
+                t = threading.Thread(target=runner.run, args=([errors, InputModel.COPIED_OVER]))
                 t.start()
                 threads.append(t)
@@ -877,7 +893,7 @@
         if InputModel.ADDITIONAL_SEMANTIC in modes:
             threads = []
             for runner in runners:
-                t = threading.Thread(target=runner.run, args=([InputModel.ADDITIONAL_SEMANTIC]))
+                t = threading.Thread(target=runner.run, args=([errors, InputModel.ADDITIONAL_SEMANTIC]))
                 t.start()
                 threads.append(t)
@@ -904,3 +920,4 @@
             json.dump(testrun_info, info_file, cls=ActoEncoder, indent=4)
 
         logger.info('All tests finished')
+        return errors
diff --git a/acto/input/input.py b/acto/input/input.py
index 1092a06827..66143455f4 100644
--- a/acto/input/input.py
+++ b/acto/input/input.py
@@ -11,7 +11,7 @@
 import yaml
 from deepdiff import DeepDiff
 
-from acto.common import random_string
+from acto.common import is_subfield, random_string
 from acto.input import known_schemas
 from acto.input.get_matched_schemas import find_matched_schema
 from acto.input.valuegenerator import extract_schema_with_value_generator
@@ -693,7 +693,7 @@ def generate_test_plan(self, delta_from: str = None, focus_fields: list = None):
                 focused = False
                 for focus_field in focus_fields:
                     logger.info(f'Comparing {schema.path} with {focus_field}')
-                    if focus_field == schema.path:
+                    if is_subfield(schema.path, focus_field):
                         focused = True
                         break
                 if not focused:
@@ -732,6 +732,18 @@ def generate_test_plan(self, delta_from: str = None, focus_fields: list = None):
             # num_additional_semantic_testcases += len(k8s_str_tests)
 
         for schema in pruned_by_overspecified:
+            # Skip if the schema is not in the focus fields
+            if focus_fields is not None:
+                logger.info(f'focusing on {focus_fields}')
+                focused = False
+                for focus_field in focus_fields:
+                    logger.info(f'Comparing {schema.path} with {focus_field}')
+                    if is_subfield(schema.path, focus_field):
+                        focused = True
+                        break
+                if not focused:
+                    continue
+
             testcases, semantic_testcases_ = schema.test_cases()
             path = json.dumps(schema.path).replace('\"ITEM\"', '0').replace('additional_properties',
@@ -754,6 +766,18 @@ def generate_test_plan(self, delta_from: str = None, focus_fields: list = None):
                     num_total_semantic_tests += 1
 
         for schema in pruned_by_copied:
+            # Skip if the schema is not in the focus fields
+            if focus_fields is not None:
+                logger.info(f'focusing on {focus_fields}')
+                focused = False
+                for focus_field in focus_fields:
+                    logger.info(f'Comparing {schema.path} with {focus_field}')
+                    if is_subfield(schema.path, focus_field):
+                        focused = True
+                        break
+                if not focused:
+                    continue
+
             testcases, semantic_testcases_ = schema.test_cases()
             path = json.dumps(schema.path).replace('\"ITEM\"', '0').replace('additional_properties',
diff --git a/acto/kubernetes_engine/kind.py b/acto/kubernetes_engine/kind.py
index 4ad96501bf..956c390a44 100644
--- a/acto/kubernetes_engine/kind.py
+++ b/acto/kubernetes_engine/kind.py
@@ -14,8 +14,8 @@
 
 class Kind(base.KubernetesEngine):
 
-    def __init__(self):
-        self.config_path = os.path.join(CONST.CLUSTER_CONFIG_FOLDER, 'KIND.yaml')
+    def __init__(self, acto_namespace: int):
+        self.config_path = os.path.join(CONST.CLUSTER_CONFIG_FOLDER, f'KIND-{acto_namespace}.yaml')
 
     def configure_cluster(self, num_nodes: int, version: str):
         '''Create config file for kind'''
@@ -63,7 +63,7 @@ def create_cluster(self, name: str, kubeconfig: str, version: str):
             config: path of the config file for cluster
             version: k8s version
         '''
-        print_event('Creating kind cluster...')
+        print_event('Creating a Kind cluster...')
 
         cmd = ['kind', 'create', 'cluster']
 
         if name:
diff --git a/acto/post_process/post_diff_test.py b/acto/post_process/post_diff_test.py
index f7f08e9faa..04a3bb708e 100644
--- a/acto/post_process/post_diff_test.py
+++ b/acto/post_process/post_diff_test.py
@@ -88,6 +88,12 @@ def compare_system_equality(curr_system_state: Dict,
                 obj['data'][key] = json.loads(data)
             except:
                 pass
+
+    if len(curr_system_state['secret']) != len(prev_system_state['secret']):
+        logger.debug(f"failed attempt recovering to seed state - secret count mismatch")
+        return RecoveryResult(
+            delta=DeepDiff(len(curr_system_state['secret']), len(prev_system_state['secret'])),
+            from_=prev_system_state, to_=curr_system_state)
 
     # remove custom resource from both states
     curr_system_state.pop('custom_resource_spec', None)
@@ -256,15 +262,15 @@ def get_nondeterministic_fields(s1, s2, additional_exclude_paths):
 
 class AdditionalRunner:
 
     def __init__(self, context: Dict, deploy: Deploy, workdir: str, cluster: base.KubernetesEngine,
-                 worker_id):
+                 worker_id, acto_namespace: int):
         self._context = context
         self._deploy = deploy
         self._workdir = workdir
         self._cluster = cluster
         self._worker_id = worker_id
-        self._cluster_name = f"acto-cluster-{worker_id}"
-        self._context_name = cluster.get_context_name(f"acto-cluster-{worker_id}")
+        self._cluster_name = f"acto-{acto_namespace}-cluster-{worker_id}"
+        self._context_name = cluster.get_context_name(f"acto-{acto_namespace}-cluster-{worker_id}")
         self._kubeconfig = os.path.join(os.path.expanduser('~'), '.kube', self._context_name)
         self._generation = 0
@@ -297,15 +303,15 @@ def run_cr(self, cr, trial, gen):
 
 class DeployRunner:
 
     def __init__(self, workqueue: multiprocessing.Queue, context: Dict, deploy: Deploy,
-                 workdir: str, cluster: base.KubernetesEngine, worker_id):
+                 workdir: str, cluster: base.KubernetesEngine, worker_id, acto_namespace: int):
         self._workqueue = workqueue
         self._context = context
         self._deploy = deploy
         self._workdir = workdir
         self._cluster = cluster
         self._worker_id = worker_id
-        self._cluster_name = f"acto-cluster-{worker_id}"
-        self._context_name = cluster.get_context_name(f"acto-cluster-{worker_id}")
+        self._cluster_name = f"acto-{acto_namespace}-cluster-{worker_id}"
+        self._context_name = cluster.get_context_name(f"acto-{acto_namespace}-cluster-{worker_id}")
         self._kubeconfig = os.path.join(os.path.expanduser('~'), '.kube', self._context_name)
         self._images_archive = os.path.join(workdir, 'images.tar')
@@ -315,18 +321,21 @@ def run(self):
         trial_dir = os.path.join(self._workdir, 'trial-%02d' % self._worker_id)
         os.makedirs(trial_dir, exist_ok=True)
 
+        before_k8s_bootstrap_time = time.time()
         # Start the cluster and deploy the operator
         self._cluster.restart_cluster(self._cluster_name, self._kubeconfig, CONST.K8S_VERSION)
         self._cluster.load_images(self._images_archive, self._cluster_name)
         apiclient = kubernetes_client(self._kubeconfig, self._context_name)
+        after_k8s_bootstrap_time = time.time()
         deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig,
                                                   self._context_name)
-        add_acto_label(apiclient, self._context)
+        after_operator_deploy_time = time.time()
 
         trial_dir = os.path.join(self._workdir, 'trial-%02d' % self._worker_id)
         os.makedirs(trial_dir, exist_ok=True)
         runner = Runner(self._context, trial_dir, self._kubeconfig, self._context_name)
         while True:
+            after_k8s_bootstrap_time = time.time()
             try:
                 group = self._workqueue.get(block=False)
             except queue.Empty:
                 break
@@ -335,26 +344,34 @@ def run(self):
             cr = group.iloc[0]['input']
 
             snapshot, err = runner.run(cr, generation=generation)
+            after_run_time = time.time()
             err = True
             difftest_result = {
                 'input_digest': group.iloc[0]['input_digest'],
                 'snapshot': snapshot.to_dict(),
                 'originals': group[['trial', 'gen']].to_dict('records'),
+                'time': {
+                    'k8s_bootstrap': after_k8s_bootstrap_time - before_k8s_bootstrap_time,
+                    'operator_deploy': after_operator_deploy_time - after_k8s_bootstrap_time,
+                    'run': after_run_time - after_operator_deploy_time,
+                },
             }
             difftest_result_path = os.path.join(trial_dir, 'difftest-%03d.json' % generation)
             with open(difftest_result_path, 'w') as f:
                 json.dump(difftest_result, f, cls=ActoEncoder, indent=6)
 
             if err:
+                before_k8s_bootstrap_time = time.time()
                 logger.error(f'Restart cluster due to error: {err}')
                 # Start the cluster and deploy the operator
                 self._cluster.restart_cluster(self._cluster_name, self._kubeconfig, CONST.K8S_VERSION)
                 self._cluster.load_images(self._images_archive, self._cluster_name)
                 apiclient = kubernetes_client(self._kubeconfig, self._context_name)
+                after_k8s_bootstrap_time = time.time()
                 deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig,
                                                           self._context_name)
-                add_acto_label(apiclient, self._context)
+                after_operator_deploy_time = time.time()
                 runner = Runner(self._context, trial_dir, self._kubeconfig, self._context_name)
 
             generation += 1
@@ -362,7 +379,8 @@
 
 class PostDiffTest(PostProcessor):
 
-    def __init__(self, testrun_dir: str, config: OperatorConfig):
+    def __init__(self, testrun_dir: str, config: OperatorConfig, ignore_invalid: bool = False, acto_namespace: int = 0):
+        self.acto_namespace = acto_namespace
         super().__init__(testrun_dir, config)
         logger = get_thread_logger(with_prefix=True)
 
@@ -370,7 +388,7 @@ def __init__(self, testrun_dir: str, config: OperatorConfig):
         for trial, steps in self.trial_to_steps.items():
             for step in steps.values():
                 invalid, _ = step.runtime_result.is_invalid()
-                if invalid:
+                if invalid and not ignore_invalid:
                     continue
                 self.all_inputs.append({
                     'trial': trial,
@@ -402,7 +420,7 @@ def post_process(self, workdir: str, num_workers: int = 1):
         if not os.path.exists(workdir):
             os.mkdir(workdir)
-        cluster = kind.Kind()
+        cluster = kind.Kind(acto_namespace=self.acto_namespace)
         cluster.configure_cluster(self.config.num_nodes, CONST.K8S_VERSION)
         deploy = Deploy(DeployMethod.YAML, self.config.deploy.file, self.config.deploy.init).new()
 
         # Build an archive to be preloaded
@@ -420,7 +438,7 @@ def post_process(self, workdir: str, num_workers: int = 1):
         runners: List[DeployRunner] = []
         for i in range(num_workers):
-            runner = DeployRunner(workqueue, self.context, deploy, workdir, cluster, i)
+            runner = DeployRunner(workqueue, self.context, deploy, workdir, cluster, i, self.acto_namespace)
             runners.append(runner)
 
         processes = []
@@ -458,7 +476,7 @@ def check_diff_test_result(self, workqueue: multiprocessing.Queue, workdir: str,
         generation = 0 # for additional runner
 
         additional_runner_dir = os.path.join(workdir, f'additional-runner-{worker_id}')
-        cluster = kind.Kind()
+        cluster = kind.Kind(acto_namespace=self.acto_namespace)
         cluster.configure_cluster(self.config.num_nodes, CONST.K8S_VERSION)
         deploy = Deploy(DeployMethod.YAML, self.config.deploy.file, self.config.deploy.init).new()
 
@@ -467,7 +485,8 @@ def check_diff_test_result(self, workqueue: multiprocessing.Queue, workdir: str,
                                            deploy=deploy,
                                            workdir=additional_runner_dir,
                                            cluster=cluster,
-                                           worker_id=worker_id)
+                                           worker_id=worker_id,
+                                           acto_namespace=self.acto_namespace)
 
         while True:
             try:
@@ -513,8 +532,7 @@ def check_diff_test_step(diff_test_result: Dict,
         trial_dir = original_result.trial_dir
         gen = original_result.gen
 
-        invalid, _ = original_result.runtime_result.is_invalid()
-        if isinstance(original_result.runtime_result.health_result, ErrorResult) or invalid:
+        if isinstance(original_result.runtime_result.health_result, ErrorResult):
             return
 
         original_operator_log = original_result.operator_log
diff --git a/acto/reproduce.py b/acto/reproduce.py
index f23253cd2e..3f60c3f472 100644
--- a/acto/reproduce.py
+++ b/acto/reproduce.py
@@ -4,16 +4,17 @@
 import json
 import sys
 import logging
+from typing import List
 
 import jsonpatch
 import yaml
 from glob import glob
 import os
 
+from acto.common import RunResult
 from acto.engine import Acto
+from acto.input.testplan import TestGroup
 from acto.input.valuegenerator import extract_schema_with_value_generator
-
-from acto.schema import BaseSchema, OpaqueSchema
-from acto.input.testplan import TestGroup, TreeNode
+from acto.post_process.post_diff_test import PostDiffTest
 from acto.input.value_with_schema import ValueWithSchema
 from acto.input import TestCase
@@ -123,9 +124,6 @@ def next_test(self) -> list:
 
     def apply_k8s_schema(self, k8s_field):
         pass
 
-    def apply_custom_field(self, custom_field: CustomField):
-        pass
-
 def repro_precondition(v):
     return True
@@ -139,8 +137,17 @@ def repro_setup(v):
     return None
 
-def reproduce(workdir_path: str, reproduce_dir: str, operator_config: OperatorConfig, **kwargs):
-
+def reproduce(workdir_path: str, reproduce_dir: str, operator_config: OperatorConfig, acto_namespace: int = 0, **kwargs) -> List[RunResult]:
+    os.makedirs(workdir_path, exist_ok=True)
+    # Setting up log infra
+    logging.basicConfig(
+        filename=os.path.join(workdir_path, 'test.log'),
+        level=logging.DEBUG,
+        filemode='w',
+        format='%(asctime)s %(levelname)-7s, %(name)s, %(filename)-9s:%(lineno)d, %(message)s')
+    logging.getLogger("kubernetes").setLevel(logging.ERROR)
+    logging.getLogger("sh").setLevel(logging.ERROR)
+
     with open(operator_config, 'r') as config_file:
         config = OperatorConfig(**json.load(config_file))
     context_cache = os.path.join(os.path.dirname(config.seed_custom_resource), 'context.json')
@@ -158,15 +165,28 @@ def reproduce(workdir_path: str, reproduce_dir: str, operator_config: OperatorConfig,
                 num_cases=1,
                 dryrun=False,
                 analysis_only=False,
-                is_reproduce=is_reproduce,
+                is_reproduce=True,
                 input_model=input_model,
                 apply_testcase_f=apply_testcase_f,
-                reproduce_dir=reproduce_dir)
+                reproduce_dir=reproduce_dir,
+                acto_namespace=acto_namespace)
 
-    acto.run(modes=['normal'])
+    errors = acto.run(modes=['normal'])
+    return [error for error in errors if error is not None]
 
+def reproduce_postdiff(workdir_path: str, operator_config: OperatorConfig, acto_namespace: int, **kwargs) -> bool:
+    with open(operator_config, 'r') as config_file:
+        config = OperatorConfig(**json.load(config_file))
+    post_diff_test_dir = os.path.join(workdir_path, 'post_diff_test')
+    logs = glob(workdir_path + '/*/operator-*.log')
+    for log in logs:
+        open(log, 'w').close()
+    p = PostDiffTest(testrun_dir=workdir_path, config=config, ignore_invalid=True, acto_namespace=acto_namespace)
+    p.post_process(post_diff_test_dir, num_workers=1)
+    p.check(post_diff_test_dir, num_workers=1)
+
+    return len(glob(os.path.join(post_diff_test_dir, 'compare-results-*.json'))) > 0
 
-# TODO add main function
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(
         description='Automatic, Continuous Testing for k8s/openshift Operators')
@@ -188,17 +208,6 @@
     args = parser.parse_args()
 
     workdir_path = 'testrun-%s' % datetime.now().strftime('%Y-%m-%d-%H-%M')
-    os.makedirs(workdir_path, exist_ok=True)
-    # Setting up log infra
-    logging.basicConfig(
-        filename=os.path.join(workdir_path, 'test.log'),
-        level=logging.DEBUG,
-        filemode='w',
-        format='%(asctime)s %(levelname)-7s, %(name)s, %(filename)-9s:%(lineno)d, %(message)s')
-    logging.getLogger("kubernetes").setLevel(logging.ERROR)
-    logging.getLogger("sh").setLevel(logging.ERROR)
-
-    logger = get_thread_logger(with_prefix=False)
 
     is_reproduce = True
     start_time = datetime.now()
@@ -206,5 +215,4 @@
               reproduce_dir=args.reproduce_dir,
               operator_config=args.config,
               cluster_runtime=args.cluster_runtime)
-    end_time = datetime.now()
-    logger.info('Acto finished in %s', end_time - start_time)
\ No newline at end of file
+    end_time = datetime.now()
\ No newline at end of file
diff --git a/data/cass-operator/config.json b/data/cass-operator/config.json
index 5838e7465c..4d4274a2c1 100644
--- a/data/cass-operator/config.json
+++ b/data/cass-operator/config.json
@@ -30,6 +30,7 @@
         "\\['service_account'\\]\\['cass\\-operator\\-controller\\-manager'\\]\\['secrets'\\]\\[.*\\]\\['name'\\]",
         "\\['service_account'\\]\\['default'\\]\\['secrets'\\]\\[.*\\]\\['name'\\]",
         "\\['stateful_set'\\]\\['cluster1\\-test\\-cluster\\-default\\-sts'\\]\\['spec'\\]\\['update_strategy'\\]\\['rolling_update'\\]\\['max_unavailable'\\]",
-        "\\['stateful_set'\\]\\['cluster1\\-test\\-cluster\\-default\\-sts'\\]\\['spec'\\]\\['persistent_volume_claim_retention_policy'\\]"
+        "\\['stateful_set'\\]\\['cluster1\\-test\\-cluster\\-default\\-sts'\\]\\['spec'\\]\\['persistent_volume_claim_retention_policy'\\]",
+        "\\['cassandra\\.datastax\\.com/resource\\-hash'\\]"
     ]
 }
\ No newline at end of file
diff --git a/data/mongodb-community-operator/config.json b/data/mongodb-community-operator/config.json
index 91aa27ace9..c6b6874325 100644
--- a/data/mongodb-community-operator/config.json
+++ b/data/mongodb-community-operator/config.json
@@ -16,5 +16,8 @@
         "entrypoint": "cmd/manager",
         "type": "MongoDBCommunity",
         "package": "github.com/mongodb/mongodb-kubernetes-operator/api/v1"
-    }
+    },
+    "diff_ignore_fields": [
+        "\\['metadata'\\]\\['annotations'\\]\\['agent\\.mongodb\\.com\\\/version'\\]"
"\\['metadata'\\]\\['annotations'\\]\\['agent\\.mongodb\\.com\\\/version'\\]" + ] } \ No newline at end of file diff --git a/data/percona-server-mongodb-operator/config.json b/data/percona-server-mongodb-operator/config.json index d3efb01a5c..eb2f20e0a5 100644 --- a/data/percona-server-mongodb-operator/config.json +++ b/data/percona-server-mongodb-operator/config.json @@ -24,6 +24,7 @@ "\\['secret'\\]\\['percona\\-server\\-mongodb\\-operator\\-token-.*'\\]", "\\['secret'\\]\\[.*\\]\\['data'\\]", "\\['metadata'\\]\\['annotations'\\]\\['percona\\.com\\\/ssl\\-hash'\\]", - "\\['metadata'\\]\\['annotations'\\]\\['percona\\.com\\\/ssl\\-internal\\-hash'\\]" + "\\['metadata'\\]\\['annotations'\\]\\['percona\\.com\\\/ssl\\-internal\\-hash'\\]", + "\\['metadata'\\]\\['annotations'\\]\\['percona\\.com\\\/last\\-config\\-hash'\\]" ] } \ No newline at end of file diff --git a/data/tidb-operator/config.json b/data/tidb-operator/config.json index ef7fb1cca3..753c13d6c2 100644 --- a/data/tidb-operator/config.json +++ b/data/tidb-operator/config.json @@ -21,5 +21,9 @@ "\\['tidb\\.pingcap\\.com\\\/cluster\\-id'\\]", "\\['tidb\\.pingcap\\.com\\\/member\\-id'\\]", "\\['tidb\\.pingcap\\.com\\\/store\\-id'\\]" + ], + "focus_fields": [ + ["spec", "tidb"], + ["spec", "tiflash", "storage", "storageClaims"] ] } diff --git a/data/zookeeper-operator/config.json b/data/zookeeper-operator/config.json index fc78968658..0c90a8d3ff 100644 --- a/data/zookeeper-operator/config.json +++ b/data/zookeeper-operator/config.json @@ -18,6 +18,7 @@ "type": "ZookeeperCluster", "package": "github.com/pravega/zookeeper-operator/pkg/apis/zookeeper/v1beta1" }, + "wait_time": 45, "diff_ignore_fields": [ "\\['config_map'\\]\\['zookeeper\\-operator\\-lock'\\]\\['metadata'\\]\\['owner_references'\\]\\[.*\\]\\['name'\\]", "\\['service'\\]\\['test\\-cluster\\-admin\\-server'\\]\\['spec'\\]\\['ports'\\]\\[.*\\]\\['node_port'\\]" diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000000..13bc2a8e86 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +markers = + local: mark a test to run on a local machine \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index b67feefcc4..49577c7536 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,5 @@ PyYAML~=6.0 requests~=2.31.0 pytest~=7.4.0 pydantic~=1.10.9 -pytest-cov~=4.1.0 \ No newline at end of file +pytest-cov~=4.1.0 +tabulate~=0.9.0 \ No newline at end of file diff --git a/test/test_bug_reproduction.py b/test/test_bug_reproduction.py new file mode 100644 index 0000000000..bc74fd1663 --- /dev/null +++ b/test/test_bug_reproduction.py @@ -0,0 +1,136 @@ +import multiprocessing +import os +import pathlib +import queue +from typing import Dict, List, Tuple +import unittest + +import pytest +from acto.common import PassResult +from acto.reproduce import reproduce, reproduce_postdiff + +from test.utils import BugConfig, all_bugs, check_postdiff_runtime_error + +test_dir = pathlib.Path(__file__).parent.resolve() +test_data_dir = os.path.join(test_dir, 'test_data') + +def run_bug_config(operator_name: str, bug_id: str, bug_config: BugConfig, acto_namespace: int) -> bool: + '''This function tries to reproduce a bug according to the bug config + + Returns: + if the reproduction is successful + ''' + repro_dir = os.path.join(test_data_dir, bug_config.dir) + work_dir = f'testrun-{bug_id}' + operator_config = f'data/{operator_name}/config.json' + + reproduced: bool = False + normal_run_result = reproduce(work_dir, + repro_dir, + operator_config, + 
cluster_runtime='KIND', + acto_namespace=acto_namespace) + if bug_config.difftest: + if bug_config.diffdir != None: + diff_repro_dir = bug_config.diffdir + work_dir = f'testrun-{bug_id}-diff' + reproduce(work_dir, + diff_repro_dir, + operator_config, + cluster_runtime='KIND', + acto_namespace=acto_namespace) + if reproduce_postdiff(work_dir, + operator_config, + cluster_runtime='KIND', + acto_namespace=acto_namespace): + reproduced = True + else: + print(f"Bug {bug_id} not reproduced!") + return False + + if bug_config.declaration: + if len(normal_run_result) != 0: + last_error = normal_run_result[-1] + if last_error.state_result != None and not isinstance( + last_error.state_result, PassResult): + reproduced = True + elif last_error.recovery_result != None and not isinstance( + last_error.recovery_result, PassResult): + reproduced = True + else: + print(f"Bug {bug_id} not reproduced!") + return False + + if bug_config.recovery: + if len(normal_run_result) != 0: + last_error = normal_run_result[-1] + if last_error.recovery_result != None and not isinstance( + last_error.recovery_result, PassResult): + reproduced = True + elif last_error.state_result != None and not isinstance( + last_error.state_result, PassResult): + reproduced = True + else: + print(f"Bug {bug_id} not reproduced!") + return False + + if bug_config.runtime_error: + if bug_config.difftest and check_postdiff_runtime_error(work_dir): + reproduced = True + elif len(normal_run_result) != 0: + last_error = normal_run_result[-1] + if last_error.health_result != None and not isinstance( + last_error.health_result, PassResult): + reproduced = True + else: + print(f"Bug {bug_id} not reproduced!") + return False + + if reproduced: + return True + else: + return False + +def run_worker(workqueue: multiprocessing.Queue, acto_namespace: int, reproduction_results: Dict[str, bool]): + while True: + try: + bug_tuple: Tuple[str, str, BugConfig] = workqueue.get(block=True, timeout=5) + except queue.Empty: + break + + operator_name, bug_id, bug_config = bug_tuple + reproduction_results[bug_id] = run_bug_config(operator_name=operator_name, + bug_id=bug_id, + bug_config=bug_config, + acto_namespace=acto_namespace) + +@pytest.mark.local +class TestBugReproduction(unittest.TestCase): + + def __init__(self, methodName: str = "runTest") -> None: + super().__init__(methodName) + + self._num_workers = 4 # Number of workers to run the test + + def test_all_bugs(self): + manager = multiprocessing.Manager() + workqueue = multiprocessing.Queue() + + for operator, bugs in all_bugs.items(): + for bug_id, bug_config in bugs.items(): + workqueue.put((operator, bug_id, bug_config)) + + reproduction_results: Dict[str, bool] = manager.dict() # workers write reproduction results + # to this dict. 
+        processes: List[multiprocessing.Process] = []
+        for i in range(self._num_workers):
+            p = multiprocessing.Process(target=run_worker, args=(workqueue, i, reproduction_results))
+            p.start()
+            processes.append(p)
+
+        for p in processes:
+            p.join()
+
+
+if __name__ == '__main__':
+    unittest.main()
\ No newline at end of file
diff --git a/test/utils.py b/test/utils.py
index 26507e9694..09a6ffc066 100644
--- a/test/utils.py
+++ b/test/utils.py
@@ -1,10 +1,14 @@
 from enum import Enum
+import glob
 import json
+import os
 from typing import Dict, List
 
 import yaml
+from acto.checker.impl.health import HealthChecker
+from acto.common import PassResult
 
-from acto.snapshot import Snapshot
+from acto.snapshot import EmptySnapshot, Snapshot
 
 
 def construct_snapshot(trial_dir: str, generation: int):
@@ -576,3 +580,23 @@ class OperatorPrettyName(str, Enum):
         ),
     }
 }
+
+
+def check_postdiff_runtime_error(workdir_path: str) -> bool:
+    '''Checks if there is runtime error manifested in the postdiff run'''
+    post_diff_test_dir = os.path.join(workdir_path, 'post_diff_test')
+    compare_results = glob.glob(os.path.join(post_diff_test_dir, 'compare-results-*.json'))
+    if len(compare_results) == 0:
+        return False
+    else:
+        for compare_result in compare_results:
+            with open(compare_result) as f:
+                result = json.load(f)[0]
+                to_state = result['to']
+                snapshot = EmptySnapshot({})
+                snapshot.system_state = to_state
+                health_result = HealthChecker().check(0, snapshot, {})
+                if not isinstance(health_result, PassResult):
+                    return True
+
+        return False
\ No newline at end of file
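
Note on the focus-field matching introduced in acto/input/input.py: `generate_test_plan` now calls `is_subfield(schema.path, focus_field)` imported from `acto.common`, but the helper's body is not part of this diff. Below is a minimal sketch of the prefix semantics the call sites assume (a schema stays in the test plan when its path equals a focus field or nests under it); the implementation and the example paths are illustrative, not the actual code:

```python
# Hypothetical sketch of acto.common.is_subfield, inferred from its call
# sites in generate_test_plan(); not the actual implementation.
from typing import List, Union

Path = List[Union[str, int]]

def is_subfield(subpath: Path, path: Path) -> bool:
    """Return True if subpath equals path or is a field nested under it."""
    if len(path) > len(subpath):
        return False
    # path must be a prefix of subpath, element by element
    return subpath[:len(path)] == path

# With the tidb-operator focus_fields added in this diff, a schema at
# ["spec", "tidb", "replicas"] is kept, while ["spec", "pd", "replicas"]
# is skipped:
assert is_subfield(["spec", "tidb", "replicas"], ["spec", "tidb"])
assert not is_subfield(["spec", "pd", "replicas"], ["spec", "tidb"])
```

Under these assumptions, the `["spec", "tidb"]` entry keeps every test case under `spec.tidb` rather than only the exact path, which is why the previous `focus_field == schema.path` equality check was replaced.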