Skip to content

Commit

Permalink
fix post diff test
Browse files Browse the repository at this point in the history
  • Loading branch information
KashunCheng committed Jul 19, 2023
1 parent 41dc8da commit 5affdea
Show file tree
Hide file tree
Showing 15 changed files with 635 additions and 826 deletions.
10 changes: 5 additions & 5 deletions acto/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@
logger.info('Start post processing steps')

# Post processing
# post_diff_test_dir = os.path.join(args.workdir_path, 'post_diff_test')
# p = PostDiffTest(testrun_dir=args.workdir_path, config=config)
# if not args.checkonly:
# p.post_process(post_diff_test_dir, num_workers=args.num_workers)
# p.check(post_diff_test_dir, num_workers=args.num_workers)
post_diff_test_dir = os.path.join(args.workdir_path, 'post_diff_test')
p = PostDiffTest(testrun_dir=args.workdir_path, config=config)
if not args.checkonly:
p.post_process(post_diff_test_dir, num_workers=args.num_workers)
p.check(post_diff_test_dir, num_workers=args.num_workers)

end_time = datetime.now()
logger.info('Acto end to end finished in %s', end_time - start_time)
22 changes: 22 additions & 0 deletions acto/checker/impl/constant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from dataclasses import dataclass
from typing import Union

from acto.checker.checker import Checker, OracleResult, OracleControlFlow


@dataclass
class ConstantResult(OracleResult):
expected: OracleControlFlow = OracleControlFlow.ok

def means(self, control_flow: Union[OracleControlFlow, str])->bool:
return self.expected == control_flow

class ConstantChecker(Checker):
name = 'constant'

def __init__(self, expected: OracleControlFlow = OracleControlFlow.ok, **kwargs):
super().__init__(**kwargs)
self.expected = expected

def _check(self, _, __) -> OracleResult:
return ConstantResult(self.expected)
216 changes: 115 additions & 101 deletions acto/checker/impl/recovery.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import logging
from copy import deepcopy
from dataclasses import dataclass, field
from typing import List

from deepdiff import DeepDiff

Expand Down Expand Up @@ -29,109 +31,121 @@ def _check(self, snapshot: Snapshot, prev_snapshot: Snapshot) -> OracleResult:
- a dict of diff results, empty if no diff found
"""

logger = get_thread_logger(with_prefix=True)
if snapshot.trial_state != 'recovery':
return OracleResult()
prev_snapshot = prev_snapshot.snapshot_before_applied_input
assert prev_snapshot is not None

curr_system_state = deepcopy(snapshot.system_state)
prev_system_state = deepcopy(prev_snapshot.system_state)

if len(curr_system_state) == 0 or len(prev_system_state) == 0:
return OracleResult()

del curr_system_state['endpoints']
del prev_system_state['endpoints']
del curr_system_state['job']
del prev_system_state['job']

# remove pods that belong to jobs from both states to avoid observability problem
curr_pods = curr_system_state['pod']
prev_pods = prev_system_state['pod']
curr_system_state['pod'] = {
k: v
for k, v in curr_pods.items()
if v['metadata']['owner_references'][0]['kind'] != 'Job'
}
prev_system_state['pod'] = {
k: v
for k, v in prev_pods.items()
if v['metadata']['owner_references'][0]['kind'] != 'Job'
}

for name, obj in prev_system_state['secret'].items():
if 'data' in obj and obj['data'] is not None:
for key, data in obj['data'].items():
try:
obj['data'][key] = json.loads(data)
except Exception:
pass

for name, obj in curr_system_state['secret'].items():
if 'data' in obj and obj['data'] is not None:
for key, data in obj['data'].items():
try:
obj['data'][key] = json.loads(data)
except:
pass

# remove custom resource from both states
curr_system_state.pop('custom_resource_spec', None)
prev_system_state.pop('custom_resource_spec', None)
curr_system_state.pop('custom_resource_status', None)
prev_system_state.pop('custom_resource_status', None)
curr_system_state.pop('pvc', None)
prev_system_state.pop('pvc', None)

# remove fields that are not deterministic
exclude_paths = [
r".*\['metadata'\]\['managed_fields'\]",
r".*\['metadata'\]\['cluster_name'\]",
r".*\['metadata'\]\['creation_timestamp'\]",
r".*\['metadata'\]\['resource_version'\]",
r".*\['metadata'\].*\['uid'\]",
r".*\['metadata'\]\['generation'\]$",
r".*\['metadata'\]\['annotations'\]",
r".*\['metadata'\]\['annotations'\]\['.*last-applied.*'\]",
r".*\['metadata'\]\['annotations'\]\['.*\.kubernetes\.io.*'\]",
r".*\['metadata'\]\['labels'\]\['.*revision.*'\]",
r".*\['metadata'\]\['labels'\]\['owner-rv'\]",
r".*\['status'\]",
r"\['metadata'\]\['deletion_grace_period_seconds'\]",
r"\['metadata'\]\['deletion_timestamp'\]",
r".*\['spec'\]\['init_containers'\]\[.*\]\['volume_mounts'\]\[.*\]\['name'\]$",
r".*\['spec'\]\['containers'\]\[.*\]\['volume_mounts'\]\[.*\]\['name'\]$",
r".*\['spec'\]\['volumes'\]\[.*\]\['name'\]$",
r".*\[.*\]\['node_name'\]$",
r".*\[\'spec\'\]\[\'host_users\'\]$",
r".*\[\'spec\'\]\[\'os\'\]$",
r".*\[\'grpc\'\]$",
r".*\[\'spec\'\]\[\'volume_name\'\]$",
r".*\['version'\]$",
r".*\['endpoints'\]\[.*\]\['addresses'\]\[.*\]\['target_ref'\]\['uid'\]$",
r".*\['endpoints'\]\[.*\]\['addresses'\]\[.*\]\['target_ref'\]\['resource_version'\]$",
r".*\['endpoints'\]\[.*\]\['addresses'\]\[.*\]\['ip'\]",
r".*\['cluster_ip'\]$",
r".*\['cluster_i_ps'\].*$",
r".*\['deployment_pods'\].*\['metadata'\]\['name'\]$",
r"\[\'config_map\'\]\[\'kube\-root\-ca\.crt\'\]\[\'data\'\]\[\'ca\.crt\'\]$",
r".*\['secret'\].*$",
r"\['secrets'\]\[.*\]\['name'\]",
r".*\['node_port'\]",
r".*\['metadata'\]\['generate_name'\]",
r".*\['metadata'\]\['labels'\]\['pod\-template\-hash'\]",
r"\['deployment_pods'\].*\['metadata'\]\['owner_references'\]\[.*\]\['name'\]",
]

diff = DeepDiff(prev_system_state,
curr_system_state,
exclude_regex_paths=exclude_paths,
view='tree')

if diff:
message = f"failed attempt recovering to seed state - system state diff: {diff}"
logger.debug(message)
return RecoveryResult(message=message, diff=diff)

return compare_system_equality(snapshot.system_state, prev_snapshot.system_state)


def compare_system_equality(curr_system_state: dict,
prev_system_state: dict,
diff_operators: list = None,
additional_exclude_paths: List[str] = None,
iterable_compare_func = None) -> OracleResult:
if diff_operators is None:
diff_operators = []
if additional_exclude_paths is None:
additional_exclude_paths = []
curr_system_state = deepcopy(curr_system_state)
prev_system_state = deepcopy(prev_system_state)

if len(curr_system_state) == 0 or len(prev_system_state) == 0:
return OracleResult()

del curr_system_state['endpoints']
del prev_system_state['endpoints']
del curr_system_state['job']
del prev_system_state['job']

# remove pods that belong to jobs from both states to avoid observability problem
curr_pods = curr_system_state['pod']
prev_pods = prev_system_state['pod']
curr_system_state['pod'] = {
k: v
for k, v in curr_pods.items()
if v['metadata']['owner_references'][0]['kind'] != 'Job'
}
prev_system_state['pod'] = {
k: v
for k, v in prev_pods.items()
if v['metadata']['owner_references'][0]['kind'] != 'Job'
}

for name, obj in prev_system_state['secret'].items():
if 'data' in obj and obj['data'] is not None:
for key, data in obj['data'].items():
try:
obj['data'][key] = json.loads(data)
except:
pass

for name, obj in curr_system_state['secret'].items():
if 'data' in obj and obj['data'] is not None:
for key, data in obj['data'].items():
try:
obj['data'][key] = json.loads(data)
except:
pass

# remove custom resource from both states
curr_system_state.pop('custom_resource_spec', None)
prev_system_state.pop('custom_resource_spec', None)
curr_system_state.pop('custom_resource_status', None)
prev_system_state.pop('custom_resource_status', None)
curr_system_state.pop('pvc', None)
prev_system_state.pop('pvc', None)

# remove fields that are not deterministic
exclude_paths = [
r".*\['metadata'\]\['managed_fields'\]",
r".*\['metadata'\]\['cluster_name'\]",
r".*\['metadata'\]\['creation_timestamp'\]",
r".*\['metadata'\]\['resource_version'\]",
r".*\['metadata'\].*\['uid'\]",
r".*\['metadata'\]\['generation'\]$",
r".*\['metadata'\]\['annotations'\]",
r".*\['metadata'\]\['annotations'\]\['.*last-applied.*'\]",
r".*\['metadata'\]\['annotations'\]\['.*\.kubernetes\.io.*'\]",
r".*\['metadata'\]\['labels'\]\['.*revision.*'\]",
r".*\['metadata'\]\['labels'\]\['owner-rv'\]",
r".*\['status'\]",
r"\['metadata'\]\['deletion_grace_period_seconds'\]",
r"\['metadata'\]\['deletion_timestamp'\]",
r".*\['spec'\]\['init_containers'\]\[.*\]\['volume_mounts'\]\[.*\]\['name'\]$",
r".*\['spec'\]\['containers'\]\[.*\]\['volume_mounts'\]\[.*\]\['name'\]$",
r".*\['spec'\]\['volumes'\]\[.*\]\['name'\]$",
r".*\[.*\]\['node_name'\]$",
r".*\[\'spec\'\]\[\'host_users\'\]$",
r".*\[\'spec\'\]\[\'os\'\]$",
r".*\[\'grpc\'\]$",
r".*\[\'spec\'\]\[\'volume_name\'\]$",
r".*\['version'\]$",
r".*\['endpoints'\]\[.*\]\['addresses'\]\[.*\]\['target_ref'\]\['uid'\]$",
r".*\['endpoints'\]\[.*\]\['addresses'\]\[.*\]\['target_ref'\]\['resource_version'\]$",
r".*\['endpoints'\]\[.*\]\['addresses'\]\[.*\]\['ip'\]",
r".*\['cluster_ip'\]$",
r".*\['cluster_i_ps'\].*$",
r".*\['deployment_pods'\].*\['metadata'\]\['name'\]$",
r"\[\'config_map\'\]\[\'kube\-root\-ca\.crt\'\]\[\'data\'\]\[\'ca\.crt\'\]$",
r".*\['secret'\].*$",
r"\['secrets'\]\[.*\]\['name'\]",
r".*\['node_port'\]",
r".*\['metadata'\]\['generate_name'\]",
r".*\['metadata'\]\['labels'\]\['pod\-template\-hash'\]",
r"\['deployment_pods'\].*\['metadata'\]\['owner_references'\]\[.*\]\['name'\]",
] + additional_exclude_paths

diff = DeepDiff(prev_system_state,
curr_system_state,
exclude_regex_paths=exclude_paths,
custom_operators=diff_operators,
iterable_compare_func=iterable_compare_func,
view='tree')

if diff:
message = f"failed attempt recovering to seed state - system state diff: {diff}"
logging.debug(message)
return RecoveryResult(message=message, diff=diff)

return OracleResult()
2 changes: 1 addition & 1 deletion acto/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def wrapper(runner: Runner, trial: Trial, context: dict) -> Snapshot:
# return True


class YamDeploy(Deploy):
class YamlDeploy(Deploy):

def deploy(self, runner: Runner) -> str:
# TODO: We cannot specify namespace ACTO_NAMESPACE here.
Expand Down
12 changes: 7 additions & 5 deletions acto/engine_new.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from acto.common import print_event
from acto.config import actoConfig
from acto.constant import CONST
from acto.deploy import DeployMethod, YamDeploy
from acto.deploy import DeployMethod, YamlDeploy
from acto.snapshot import Snapshot
from acto.input import InputModel, TestCase, DeterministicInputModel
from acto.input.input import OverSpecifiedField
Expand Down Expand Up @@ -71,7 +71,7 @@ def __init__(self,
logger.error('Only YAML deploy method is supported, aborting')
quit()

self.deploy = YamDeploy(operator_config.deploy.file, operator_config.deploy.init)
self.deploy = YamlDeploy(operator_config.deploy.file, operator_config.deploy.init)

if cluster_runtime != "KIND":
logger.error('Only KIND cluster runtime is supported, aborting')
Expand Down Expand Up @@ -239,13 +239,15 @@ def task(runner: Runner, test_cases: List[Tuple[List[str], TestCase]]) -> Trial:
trial: Trial = self.runners.get_next_unordered()
active_runner_count -= 1

for test_case in trial.next_input.next_testcase:
for test_case in trial.recycle_testcases():
test_case_list.append(test_case)
trial_save_dir = os.path.join(self.workdir_path, f'trial-{trial_id:05}')
os.makedirs(trial_save_dir, exist_ok=True)
# TODO: improve saving snapshots
for snapshot in trial.snapshots:
snapshot.save(trial_save_dir)
for (_, exception_or_snapshot_plus_oracle_result) in trial.history_iterator():
if not isinstance(exception_or_snapshot_plus_oracle_result, Exception):
(snapshot, _) = exception_or_snapshot_plus_oracle_result
snapshot.save(trial_save_dir)
pickle.dump(trial, open(os.path.join(trial_save_dir, 'trial.pkl'), 'wb'))
trial_id += 1

Expand Down
8 changes: 8 additions & 0 deletions acto/lib/fp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,11 @@ def wrapper(*args, **kwargs):
return fn(*args[1:], **kwargs)

return wrapper

def create_constant_function(x):
def constant_function(*_, **__):
return x
return constant_function

def unreachable(*_, **__):
assert False, 'Unreachable'
Loading

0 comments on commit 5affdea

Please sign in to comment.