From 4f46395fde042a64dd4f412d2e3ac758a1e69482 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Thu, 18 Jul 2024 09:01:05 -0700 Subject: [PATCH] Update conditions logic --- src/cfnlint/conditions/conditions.py | 13 ++-- .../rules/jsonschema/CfnLintRelationship.py | 60 +++++++------- .../rules/resources/PrimaryIdentifiers.py | 2 +- src/cfnlint/template/template.py | 2 +- test/fixtures/templates/good/generic.yaml.dot | 0 .../unit/module/conditions/test_conditions.py | 21 ++--- .../jsonschema/test_cfn_lint_relationship.py | 56 +++++++++++-- .../test_cfn_lint_relationship_bad_path.py | 37 +++++++++ .../test_cfn_lint_relationship_list.py | 78 +++++++++++++++++++ 9 files changed, 214 insertions(+), 55 deletions(-) create mode 100644 test/fixtures/templates/good/generic.yaml.dot create mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py create mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py diff --git a/src/cfnlint/conditions/conditions.py b/src/cfnlint/conditions/conditions.py index 37bf7a1118..6afef36441 100644 --- a/src/cfnlint/conditions/conditions.py +++ b/src/cfnlint/conditions/conditions.py @@ -240,7 +240,7 @@ def build_scenarios( # formatting or just the wrong condition name return - def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool: + def implies(self, scenarios: dict[str, bool], implies: str) -> bool: """Based on a bunch of scenario conditions and their Truth/False value determine if implies condition is True any time the scenarios are satisfied solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies]) @@ -275,17 +275,18 @@ def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool: ) ) - implies_condition = self._conditions[implies].build_true_cnf( - self._solver_params - ) - and_condition = And(*conditions) - cnf.add_prop(and_condition) # if the implies condition has to be true already then we don't # need to imply it if not scenarios.get(implies): + implies_condition = self._conditions[implies].build_true_cnf( + self._solver_params + ) cnf.add_prop(Not(Implies(and_condition, implies_condition))) + else: + cnf.add_prop(and_condition) + if satisfiable(cnf): return True diff --git a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py index 0713c81658..7b393687f7 100644 --- a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py +++ b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py @@ -27,43 +27,37 @@ def _get_relationship( ) -> Iterator[tuple[Any, Validator]]: fn_k, fn_v = is_function(instance) if fn_k == "Fn::If": - if not isinstance(fn_v, list) and len(fn_v) != 3: + if not isinstance(fn_v, list) or len(fn_v) != 3: return condition = fn_v[0] # True path - for i in [1, 2]: - try: - if_validator = validator.evolve( - context=validator.context.evolve( - conditions=validator.context.conditions.evolve( - status={ - condition: i == 1, - } - ) + status = { + k: set([v]) for k, v in validator.context.conditions.status.items() + } + if condition not in status: + status[condition] = set([True, False]) + for scenario in validator.cfn.conditions.build_scenarios(status): + if_validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=scenario, ) ) - status = { - k: set([v]) - for k, v in if_validator.context.conditions.status.items() - } - if next( - validator.cfn.conditions.build_scenarios(status), - if_validator.context.regions[0], - ): - for r, v in self._get_relationship( - validator.evolve( - context=if_validator.context.evolve( - path=if_validator.context.path.descend( - path=fn_k - ).descend(path=i) - ) - ), - fn_v[i], - path.copy(), - ): - yield r, v - except ValueError: - return + ) + + i = 1 if scenario[condition] else 2 + for r, v in self._get_relationship( + validator.evolve( + context=if_validator.context.evolve( + path=if_validator.context.path.descend(path=fn_k).descend( + path=i + ) + ) + ), + fn_v[i], + path.copy(), + ): + yield r, v return if fn_k == "Ref" and fn_v == "AWS::NoValue": @@ -131,7 +125,7 @@ def get_relationship(self, validator: Validator) -> Iterator[tuple[Any, Validato condition = other_resource.get("Condition", None) if condition: - if not validator.cfn.conditions.check_implies( + if not validator.cfn.conditions.implies( validator.context.conditions.status, condition ): continue diff --git a/src/cfnlint/rules/resources/PrimaryIdentifiers.py b/src/cfnlint/rules/resources/PrimaryIdentifiers.py index bf3305c447..fa0daef3fa 100644 --- a/src/cfnlint/rules/resources/PrimaryIdentifiers.py +++ b/src/cfnlint/rules/resources/PrimaryIdentifiers.py @@ -63,7 +63,7 @@ def _validate_resource_type_uniqueness(self, cfn, resource_type, ids): # and add it to the scenario as needed if condition: if scenario: - if not cfn.conditions.check_implies(scenario, condition): + if not cfn.conditions.implies(scenario, condition): continue conditions[condition] = set([True]) diff --git a/src/cfnlint/template/template.py b/src/cfnlint/template/template.py index 65767f350e..138a87cddb 100644 --- a/src/cfnlint/template/template.py +++ b/src/cfnlint/template/template.py @@ -824,7 +824,7 @@ def is_resource_available(self, path: Path, resource: str) -> list[dict[str, boo return results scenario[condition_name] = list(condition_bool)[0] - if self.conditions.check_implies(scenario, resource_condition): + if self.conditions.implies(scenario, resource_condition): return [{**{resource_condition: False}, **scenario}] # if resource condition isn't available then the resource is available diff --git a/test/fixtures/templates/good/generic.yaml.dot b/test/fixtures/templates/good/generic.yaml.dot new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/unit/module/conditions/test_conditions.py b/test/unit/module/conditions/test_conditions.py index 4012a69fa2..70918a7fee 100644 --- a/test/unit/module/conditions/test_conditions.py +++ b/test/unit/module/conditions/test_conditions.py @@ -30,7 +30,14 @@ def test_bad_condition_definition(self): len(cfn.conditions._conditions), 1 ) # would be 2 but IsProd fails # test coverage for KeyErrors in the following functions - self.assertTrue(cfn.conditions.check_implies({"Test": True}, "IsUsEast1")) + self.assertTrue( + cfn.conditions.implies( + { + "Test": True, + }, + "IsUsEast1", + ) + ) self.assertEqual( list(cfn.conditions.build_scenarios({"IsProd": None, "IsUsEast1": None})), [], @@ -76,14 +83,10 @@ def test_check_implies(self): condition_names.append(f"{p}Condition") cfn = Template("", template) - self.assertFalse( - cfn.conditions.check_implies({"aCondition": False}, "aCondition") - ) - self.assertTrue( - cfn.conditions.check_implies({"aCondition": True}, "aCondition") - ) + self.assertFalse(cfn.conditions.implies({"aCondition": False}, "aCondition")) + self.assertTrue(cfn.conditions.implies({"aCondition": True}, "aCondition")) self.assertTrue( - cfn.conditions.check_implies( + cfn.conditions.implies( {"aCondition": True, "bCondition": False}, "aCondition" ) ) @@ -105,7 +108,7 @@ def test_check_always_true_or_false(self): cfn = Template("", template) self.assertEqual(len(cfn.conditions._conditions), 2) # test coverage for KeyErrors in the following functions - self.assertFalse(cfn.conditions.check_implies({"IsTrue": True}, "IsFalse")) + self.assertTrue(cfn.conditions.implies({"IsTrue": True}, "IsFalse")) def test_check_never_false(self): """With allowed values two conditions can not both be false""" diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py index 30a94a376f..fcbda4cf6d 100644 --- a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py +++ b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py @@ -100,15 +100,32 @@ def template(): "Condition": "IsImageIdSpecified", "Properties": {"ImageId": {"Fn::GetAtt": ["Four", "ImageId"]}}, }, + "Five": { + "Type": "AWS::EC2::Instance", + "Condition": "IsUsEast1", + "Properties": { + "ImageId": { + "Fn::If": [ + "IsImageIdSpecified", + {"Ref": "ImageId"}, + ] + }, + }, + }, "ParentFive": { "Type": "AWS::EC2::Instance", "Condition": "IsImageIdSpecified", - "Properties": {"ImageId": {"Fn::If": ["IsUsEast1", "ImageId"]}}, + "Properties": {"ImageId": {"Fn::GetAtt": ["Five", "ImageId"]}}, + }, + "Six": { + "Type": "AWS::EC2::Instance", + "Condition": "IsUsEast1", + "Properties": "Foo", }, "ParentSix": { "Type": "AWS::EC2::Instance", "Condition": "IsImageIdSpecified", - "Properties": [], + "Properties": {"ImageId": {"Fn::GetAtt": ["Six", "ImageId"]}}, }, "Seven": { "Type": "AWS::EC2::Instance", @@ -126,7 +143,22 @@ def template(): "ParentSeven": { "Type": "AWS::EC2::Instance", "Condition": "IsUsEast1", - "Properties": {"ImageId": {"Fn::GetAtt": ["Sevent", "ImageId"]}}, + "Properties": {"ImageId": {"Fn::GetAtt": ["Seven", "ImageId"]}}, + }, + "Eight": { + "Properties": { + "ImageId": { + "Fn::If": [ + "IsImageIdSpecified", + {"Ref": "ImageId"}, + {"Ref": "AWS::NoValue"}, + ] + }, + }, + }, + "ParentEight": { + "Type": "AWS::EC2::Instance", + "Properties": {"ImageId": {"Fn::GetAtt": ["Eight", "ImageId"]}}, }, }, } @@ -183,12 +215,26 @@ def template(): ( "Seven", deque(["Resources", "ParentSeven", "Properties", "ImageId"]), + { + "IsUsEast1": True, + }, + [()], + ), + ( + "Short Path", + deque(["Resources"]), {}, [], ), ( - "Short Path", - deque(["Resources", "ParentSix"]), + "Not in resources", + deque(["Outputs", "MyOutput"]), + {}, + [], + ), + ( + "No type on relationshiop", + deque(["Resources", "ParentEight", "Properties", "ImageId"]), {}, [], ), diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py new file mode 100644 index 0000000000..71a2a1a544 --- /dev/null +++ b/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py @@ -0,0 +1,37 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.context import Path +from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship + + +@pytest.fixture +def rule(): + return CfnLintRelationship(keywords=[], relationship="Resources") + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Resources": {}, + } + + +def test_get_relationships(rule, validator): + validator = validator.evolve( + context=validator.context.evolve( + path=Path( + deque(["Resources", "ParentOne", "Properties", "ImageId"]), + ), + ), + ) + assert [] == list(rule.get_relationship(validator)) diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py new file mode 100644 index 0000000000..e5aa828814 --- /dev/null +++ b/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py @@ -0,0 +1,78 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.context import Path +from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship + + +@pytest.fixture +def rule(): + return CfnLintRelationship( + keywords=[], relationship="Resources/AWS::EC2::Instance/Properties/Foo/*/Bar" + ) + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Resources": { + "One": { + "Type": "AWS::EC2::Instance", + "Properties": { + "Foo": [ + { + "Bar": "One", + }, + { + "Bar": "Two", + }, + ] + }, + }, + "ParentOne": { + "Type": "AWS::EC2::Instance", + "Properties": {"ImageId": {"Fn::GetAtt": ["One", "ImageId"]}}, + }, + }, + } + + +@pytest.mark.parametrize( + "name,path,status,expected", + [ + ( + "One", + deque(["Resources", "ParentOne", "Properties", "ImageId"]), + {}, + [ + ("One", {}), + ("Two", {}), + ], + ), + ], +) +def test_get_relationships(name, path, status, expected, rule, validator): + validator = validator.evolve( + context=validator.context.evolve( + path=Path(path), + conditions=validator.context.conditions.evolve( + status=status, + ), + ), + ) + results = list(rule.get_relationship(validator)) + assert len(results) == len(expected), f"Test {name!r} got {len(results)!r}" + for result, exp in zip(results, expected): + assert result[0] == exp[0], f"Test {name!r} got {result[0]!r}" + assert ( + result[1].context.conditions.status == exp[1] + ), f"Test {name!r} got {result[1].context.conditions.status!r}"