Skip to content

Commit

Permalink
Backport fixes and tests from AE branch
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Gu <jiaweig3@illinois.edu>
  • Loading branch information
tylergu committed Aug 13, 2023
1 parent 8a92c01 commit f5dee97
Show file tree
Hide file tree
Showing 18 changed files with 313 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unittest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ jobs:
make
- name: Run unittest
run: |
pytest
pytest -m "not local"
3 changes: 2 additions & 1 deletion acto/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@
is_reproduce=is_reproduce,
input_model=input_model,
apply_testcase_f=apply_testcase_f,
delta_from=args.delta_from)
delta_from=args.delta_from,
focus_fields=config.focus_fields,)
generation_time = datetime.now()
logger.info('Acto initialization finished in %s', generation_time - start_time)
if args.additional_semantic:
Expand Down
2 changes: 0 additions & 2 deletions acto/checker/impl/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ def check(self, generation: int, snapshot: Snapshot, prev_snapshot: Snapshot) ->
continue
logger.error('Found no matching fields for input delta')
logger.error('Input delta [%s]' % delta.path)
print_event(f'Found bug: {delta.path} changed from [{delta.prev}] to [{delta.curr}]')
return StateResult(Oracle.SYSTEM_STATE,
'Found no matching fields for input', delta)
else:
Expand All @@ -334,7 +333,6 @@ def check(self, generation: int, snapshot: Snapshot, prev_snapshot: Snapshot) ->
logger.error(f'Matched delta {match_delta.path}')
logger.error(f'Matched delta prev {match_delta.prev}, curr {match_delta.curr}')
logger.error('Failed to match input delta with matched system state delta')
print_event(f'Found bug: {delta.path} changed from [{delta.prev}] to [{delta.curr}]')
return StateResult(Oracle.SYSTEM_STATE,
'Found no matching fields for input', delta)
elif not should_compare:
Expand Down
2 changes: 1 addition & 1 deletion acto/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ def invalid_input_message(log_msg: str, input_delta: dict) -> Tuple[bool, list]:

# if delta.curr is an int, we do exact match to avoid matching a short
# int (e.g. 1) to a log line and consider the int as invalid input
elif isinstance(delta.curr, int):
elif isinstance(delta.curr, int) and len(str(delta.curr)) > 1:
for item in log_msg.split(' '):
if item == str(delta.curr):
logger.info(
Expand Down
51 changes: 34 additions & 17 deletions acto/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
set_thread_logger_prefix)
from ssa.analysis import analyze

def save_result(trial_dir: str, trial_result: RunResult, num_tests: int, trial_elapsed):
def save_result(trial_dir: str, trial_result: RunResult, num_tests: int, trial_elapsed, time_breakdown):
logger = get_thread_logger(with_prefix=False)

result_dict = {}
Expand All @@ -47,6 +47,7 @@ def save_result(trial_dir: str, trial_result: RunResult, num_tests: int, trial_e
except:
result_dict['trial_num'] = trial_dir
result_dict['duration'] = trial_elapsed
result_dict['time_breakdown'] = time_breakdown
result_dict['num_tests'] = num_tests
if trial_result == None:
logger.info('Trial %s completed without error', trial_dir)
Expand Down Expand Up @@ -198,17 +199,17 @@ def __init__(self, context: dict, input_model: InputModel, deploy: Deploy, runne
checker_t: type, wait_time: int, custom_on_init: List[callable],
custom_oracle: List[callable], workdir: str, cluster: base.KubernetesEngine,
worker_id: int, sequence_base: int, dryrun: bool, is_reproduce: bool,
apply_testcase_f: FunctionType) -> None:
apply_testcase_f: FunctionType, acto_namespace: int) -> None:
self.context = context
self.workdir = workdir
self.base_workdir = workdir
self.cluster = cluster
self.images_archive = os.path.join(workdir, 'images.tar')
self.worker_id = worker_id
self.sequence_base = sequence_base # trial number to start with
self.context_name = cluster.get_context_name(f"acto-cluster-{worker_id}")
self.context_name = cluster.get_context_name(f"acto-{acto_namespace}-cluster-{worker_id}")
self.kubeconfig = os.path.join(os.path.expanduser('~'), '.kube', self.context_name)
self.cluster_name = f"acto-cluster-{worker_id}"
self.cluster_name = f"acto-{acto_namespace}-cluster-{worker_id}"
self.input_model = input_model
self.deploy = deploy
self.runner_t = runner_t
Expand All @@ -226,7 +227,7 @@ def __init__(self, context: dict, input_model: InputModel, deploy: Deploy, runne

self.curr_trial = 0

def run(self, mode: str = InputModel.NORMAL):
def run(self, errors: List[RunResult], mode: str = InputModel.NORMAL):
logger = get_thread_logger(with_prefix=True)

self.input_model.set_worker_id(self.worker_id)
Expand All @@ -246,12 +247,13 @@ def run(self, mode: str = InputModel.NORMAL):
self.cluster.restart_cluster(self.cluster_name, self.kubeconfig, CONST.K8S_VERSION)
apiclient = kubernetes_client(self.kubeconfig, self.context_name)
self.cluster.load_images(self.images_archive, self.cluster_name)
trial_k8s_bootstrap_time = time.time()
deployed = self.deploy.deploy_with_retry(self.context, self.kubeconfig,
self.context_name)
if not deployed:
logger.info('Not deployed. Try again!')
continue

operator_deploy_time = time.time()
trial_dir = os.path.join(
self.workdir,
'trial-%02d-%04d' % (self.worker_id + self.sequence_base, self.curr_trial))
Expand All @@ -260,13 +262,22 @@ def run(self, mode: str = InputModel.NORMAL):
trial_err, num_tests = self.run_trial(trial_dir=trial_dir, curr_trial=self.curr_trial)
self.snapshots = []

trial_elapsed = time.strftime("%H:%M:%S", time.gmtime(time.time() - trial_start_time))
trial_finished_time = time.time()
trial_elapsed = time.strftime("%H:%M:%S", time.gmtime(trial_finished_time - trial_start_time))
logger.info('Trial %d finished, completed in %s' % (self.curr_trial, trial_elapsed))
logger.info(f'Kubernetes bootstrap: {trial_k8s_bootstrap_time - trial_start_time}')
logger.info(f'Operator deploy: {operator_deploy_time - trial_k8s_bootstrap_time}')
logger.info(f'Trial run: {trial_finished_time - operator_deploy_time}')
logger.info('---------------------------------------\n')

delete_operator_pod(apiclient, self.context['namespace'])
save_result(trial_dir, trial_err, num_tests, trial_elapsed)
save_result(trial_dir, trial_err, num_tests, trial_elapsed, {
'k8s_bootstrap': trial_k8s_bootstrap_time - trial_start_time,
'operator_deploy': operator_deploy_time - trial_k8s_bootstrap_time,
'trial_run': trial_finished_time - operator_deploy_time
})
self.curr_trial = self.curr_trial + 1
errors.append(trial_err)

if self.input_model.is_empty():
logger.info('Test finished')
Expand Down Expand Up @@ -602,7 +613,8 @@ def __init__(self,
reproduce_dir: str = None,
delta_from: str = None,
mount: list = None,
focus_fields: list = None) -> None:
focus_fields: list = None,
acto_namespace: int = 0) -> None:
logger = get_thread_logger(with_prefix=False)

try:
Expand All @@ -625,13 +637,13 @@ def __init__(self,
raise UnknownDeployMethodError()

if cluster_runtime == "KIND":
cluster = kind.Kind()
cluster = kind.Kind(acto_namespace=acto_namespace)
elif cluster_runtime == "K3D":
cluster = k3d.K3D()
else:
logger.warning(
f"Cluster Runtime {cluster_runtime} is not supported, defaulted to use kind")
cluster = kind.Kind()
cluster = kind.Kind(acto_namespace=acto_namespace)

self.cluster = cluster
self.deploy = deploy
Expand All @@ -644,6 +656,7 @@ def __init__(self,
self.is_reproduce = is_reproduce
self.apply_testcase_f = apply_testcase_f
self.reproduce_dir = reproduce_dir
self.acto_namespace = acto_namespace

self.runner_type = Runner
self.checker_type = CheckerSet
Expand Down Expand Up @@ -674,6 +687,7 @@ def __init__(self,
if operator_config.k8s_fields is not None:
module = importlib.import_module(operator_config.k8s_fields)
if hasattr(module,'BLACKBOX') and actoConfig.mode == 'blackbox':
applied_custom_k8s_fields = True
for k8s_field in module.BLACKBOX:
self.input_model.apply_k8s_schema(k8s_field)
elif hasattr(module,'WHITEBOX') and actoConfig.mode == 'whitebox':
Expand Down Expand Up @@ -814,7 +828,8 @@ def __learn(self, context_file, helper_crd, analysis_only=False):
with open(context_file, 'w') as context_fout:
json.dump(self.context, context_fout, cls=ContextEncoder, indent=4, sort_keys=True)

def run(self, modes: list = ['normal', 'overspecified', 'copiedover']):
def run(self, modes: list = ['normal', 'overspecified', 'copiedover']) -> List[RunResult]:
# TODO: return the alarms here
logger = get_thread_logger(with_prefix=True)

# Build an archive to be preloaded
Expand All @@ -829,19 +844,20 @@ def run(self, modes: list = ['normal', 'overspecified', 'copiedover']):

start_time = time.time()

errors: List[RunResult] = []
runners: List[TrialRunner] = []
for i in range(self.num_workers):
runner = TrialRunner(self.context, self.input_model, self.deploy, self.runner_type,
self.checker_type, self.operator_config.wait_time,
self.custom_on_init, self.custom_oracle, self.workdir_path,
self.cluster, i, self.sequence_base, self.dryrun,
self.is_reproduce, self.apply_testcase_f)
self.is_reproduce, self.apply_testcase_f, self.acto_namespace)
runners.append(runner)

if 'normal' in modes:
threads = []
for runner in runners:
t = threading.Thread(target=runner.run, args=([]))
t = threading.Thread(target=runner.run, args=([errors]))
t.start()
threads.append(t)

Expand All @@ -853,7 +869,7 @@ def run(self, modes: list = ['normal', 'overspecified', 'copiedover']):
if 'overspecified' in modes:
threads = []
for runner in runners:
t = threading.Thread(target=runner.run, args=([InputModel.OVERSPECIFIED]))
t = threading.Thread(target=runner.run, args=([errors, InputModel.OVERSPECIFIED]))
t.start()
threads.append(t)

Expand All @@ -865,7 +881,7 @@ def run(self, modes: list = ['normal', 'overspecified', 'copiedover']):
if 'copiedover' in modes:
threads = []
for runner in runners:
t = threading.Thread(target=runner.run, args=([InputModel.COPIED_OVER]))
t = threading.Thread(target=runner.run, args=([errors, InputModel.COPIED_OVER]))
t.start()
threads.append(t)

Expand All @@ -877,7 +893,7 @@ def run(self, modes: list = ['normal', 'overspecified', 'copiedover']):
if InputModel.ADDITIONAL_SEMANTIC in modes:
threads = []
for runner in runners:
t = threading.Thread(target=runner.run, args=([InputModel.ADDITIONAL_SEMANTIC]))
t = threading.Thread(target=runner.run, args=([errors, InputModel.ADDITIONAL_SEMANTIC]))
t.start()
threads.append(t)

Expand All @@ -904,3 +920,4 @@ def run(self, modes: list = ['normal', 'overspecified', 'copiedover']):
json.dump(testrun_info, info_file, cls=ActoEncoder, indent=4)

logger.info('All tests finished')
return errors
28 changes: 26 additions & 2 deletions acto/input/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import yaml
from deepdiff import DeepDiff

from acto.common import random_string
from acto.common import is_subfield, random_string
from acto.input import known_schemas
from acto.input.get_matched_schemas import find_matched_schema
from acto.input.valuegenerator import extract_schema_with_value_generator
Expand Down Expand Up @@ -693,7 +693,7 @@ def generate_test_plan(self, delta_from: str = None, focus_fields: list = None)
focused = False
for focus_field in focus_fields:
logger.info(f'Comparing {schema.path} with {focus_field}')
if focus_field == schema.path:
if is_subfield(schema.path, focus_field):
focused = True
break
if not focused:
Expand Down Expand Up @@ -732,6 +732,18 @@ def generate_test_plan(self, delta_from: str = None, focus_fields: list = None)
# num_additional_semantic_testcases += len(k8s_str_tests)

for schema in pruned_by_overspecified:
# Skip if the schema is not in the focus fields
if focus_fields is not None:
logger.info(f'focusing on {focus_fields}')
focused = False
for focus_field in focus_fields:
logger.info(f'Comparing {schema.path} with {focus_field}')
if is_subfield(schema.path, focus_field):
focused = True
break
if not focused:
continue

testcases, semantic_testcases_ = schema.test_cases()
path = json.dumps(schema.path).replace('\"ITEM\"',
'0').replace('additional_properties',
Expand All @@ -754,6 +766,18 @@ def generate_test_plan(self, delta_from: str = None, focus_fields: list = None)
num_total_semantic_tests += 1

for schema in pruned_by_copied:
# Skip if the schema is not in the focus fields
if focus_fields is not None:
logger.info(f'focusing on {focus_fields}')
focused = False
for focus_field in focus_fields:
logger.info(f'Comparing {schema.path} with {focus_field}')
if is_subfield(schema.path, focus_field):
focused = True
break
if not focused:
continue

testcases, semantic_testcases_ = schema.test_cases()
path = json.dumps(schema.path).replace('\"ITEM\"',
'0').replace('additional_properties',
Expand Down
6 changes: 3 additions & 3 deletions acto/kubernetes_engine/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

class Kind(base.KubernetesEngine):

def __init__(self):
self.config_path = os.path.join(CONST.CLUSTER_CONFIG_FOLDER, 'KIND.yaml')
def __init__(self, acto_namespace: int):
self.config_path = os.path.join(CONST.CLUSTER_CONFIG_FOLDER, f'KIND-{acto_namespace}.yaml')

def configure_cluster(self, num_nodes: int, version: str):
'''Create config file for kind'''
Expand Down Expand Up @@ -63,7 +63,7 @@ def create_cluster(self, name: str, kubeconfig: str, version: str):
config: path of the config file for cluster
version: k8s version
'''
print_event('Creating kind cluster...')
print_event('Creating a Kind cluster...')
cmd = ['kind', 'create', 'cluster']

if name:
Expand Down
Loading

0 comments on commit f5dee97

Please sign in to comment.