From 788e1ef9fcf32b404b45722555c0e7d865c04ebc Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Tue, 16 Jul 2024 09:13:03 -0700 Subject: [PATCH 1/7] Create relationship rule and validate ImageId on Instance --- src/cfnlint/conditions/conditions.py | 3 +- src/cfnlint/rules/__init__.py | 3 + .../rules/jsonschema/CfnLintRelationship.py | 162 +++++++++++++ src/cfnlint/rules/jsonschema/__init__.py | 6 +- .../rules/resources/ectwo/InstanceImageId.py | 60 +++++ .../jsonschema/test_cfn_lint_relationship.py | 159 +++++++++++++ .../resources/ec2/test_instance_image_id.py | 219 ++++++++++++++++++ 7 files changed, 609 insertions(+), 3 deletions(-) create mode 100644 src/cfnlint/rules/jsonschema/CfnLintRelationship.py create mode 100644 src/cfnlint/rules/resources/ectwo/InstanceImageId.py create mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship.py create mode 100644 test/unit/rules/resources/ec2/test_instance_image_id.py diff --git a/src/cfnlint/conditions/conditions.py b/src/cfnlint/conditions/conditions.py index da42ee8918..37bf7a1118 100644 --- a/src/cfnlint/conditions/conditions.py +++ b/src/cfnlint/conditions/conditions.py @@ -354,7 +354,8 @@ def satisfiable( determine if the conditions are satisfied Args: - condition_names (list[str]): A list of condition names + condition_names (dict[str, bool]): A list of condition names with if + they are True or False Returns: bool: True if the conditions are satisfied diff --git a/src/cfnlint/rules/__init__.py b/src/cfnlint/rules/__init__.py index 54c32fcbc5..d8cc5f4336 100644 --- a/src/cfnlint/rules/__init__.py +++ b/src/cfnlint/rules/__init__.py @@ -5,9 +5,12 @@ from cfnlint.rules._rule import CloudFormationLintRule, Match, RuleMatch from cfnlint.rules._rules import Rules, RulesCollection + +# type: ignore from cfnlint.rules.jsonschema import ( CfnLintJsonSchema, CfnLintJsonSchemaRegional, CfnLintKeyword, + CfnLintRelationship, SchemaDetails, ) diff --git a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py new file mode 100644 index 0000000000..e2d999f3fb --- /dev/null +++ b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py @@ -0,0 +1,162 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator, Sequence + +from cfnlint.helpers import is_function +from cfnlint.jsonschema import ValidationError, ValidationResult, Validator +from cfnlint.rules import CloudFormationLintRule + + +class CfnLintRelationship(CloudFormationLintRule): + def __init__( + self, keywords: Sequence[str] | None = None, relationship: str | None = None + ) -> None: + super().__init__() + self.keywords = keywords or [] + self.relationship = relationship or "" + self.parent_rules = ["E1101"] + + def message(self, instance: Any, err: ValidationError) -> str: + return self.shortdesc + + def _get_relationship( + self, validator: Validator, instance: Any, path: deque[str | int] + ) -> Iterator[tuple[Any, Validator]]: + fn_k, fn_v = is_function(instance) + if fn_k == "Fn::If": + if not isinstance(fn_v, list) and len(fn_v) != 3: + return + condition = fn_v[0] + # True path + for i in [1, 2]: + try: + if_validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status={ + condition: i == 1, + } + ) + ) + ) + status = { + k: set([v]) + for k, v in if_validator.context.conditions.status.items() + } + if next( + validator.cfn.conditions.build_scenarios(status), + if_validator.context.regions[0], + ): + for r, v in self._get_relationship( + validator.evolve( + context=if_validator.context.evolve( + path=if_validator.context.path.descend( + path=fn_k + ).descend(path=i) + ) + ), + fn_v[i], + path.copy(), + ): + yield r, v + except ValueError: + return + return + + if fn_k == "Ref" and fn_v == "AWS::NoValue": + yield None, validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=fn_k) + ) + ) + return + + if not path: + yield instance, validator + return + + key = path.popleft() + if isinstance(instance, list): + if key == "*": + for i, v in enumerate(instance): + for r, v in self._get_relationship( + validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=i) + ), + ), + v, + path.copy(), + ): + yield r, v + return + + if not isinstance(instance, dict): + return + + for r, v in self._get_relationship( + validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=key) + ) + ), + instance.get(key), + path.copy(), + ): + yield r, v + + def get_relationship(self, validator: Validator) -> Iterator[tuple[Any, Validator]]: + if len(validator.context.path.path) < 2: + return + + if "Resources" != validator.context.path.path[0]: + return + + resource_name = validator.context.path.path[1] + path = self.relationship.split("/") + if len(path) < 2: + return + # get related resources by ref/getatt to the source + # and relationship types + if not validator.cfn.graph: + return + for edge in validator.cfn.graph.graph.out_edges(resource_name): + other_resource = validator.cfn.template.get(path[0], {}).get(edge[1], {}) + + if other_resource.get("Type") != path[1]: + continue + + condition = other_resource.get("Condition", None) + if condition: + if not validator.cfn.conditions.check_implies( + validator.context.conditions.status, condition + ): + continue + validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + { + condition: True, + } + ) + ) + ) + # validate reosurce type + + for r, v in self._get_relationship( + validator, other_resource, deque(path[2:]) + ): + yield r, v.evolve() + + def validate( + self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] + ) -> ValidationResult: + raise NotImplementedError("validate not implemented") + return + yield diff --git a/src/cfnlint/rules/jsonschema/__init__.py b/src/cfnlint/rules/jsonschema/__init__.py index e1d5a72bee..c63aae1159 100644 --- a/src/cfnlint/rules/jsonschema/__init__.py +++ b/src/cfnlint/rules/jsonschema/__init__.py @@ -7,15 +7,17 @@ from cfnlint.rules.jsonschema.CfnLintJsonSchema import CfnLintJsonSchema, SchemaDetails from cfnlint.rules.jsonschema.CfnLintJsonSchemaRegional import CfnLintJsonSchemaRegional from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword +from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship from cfnlint.rules.jsonschema.MaxProperties import MaxProperties from cfnlint.rules.jsonschema.PropertyNames import PropertyNames __all__ = [ "BaseJsonSchema", - "CfnLintKeyword", "CfnLintJsonSchema", - "SchemaDetails", "CfnLintJsonSchemaRegional", + "CfnLintKeyword", + "CfnLintRelationship", "MaxProperties", "PropertyNames", + "SchemaDetails", ] diff --git a/src/cfnlint/rules/resources/ectwo/InstanceImageId.py b/src/cfnlint/rules/resources/ectwo/InstanceImageId.py new file mode 100644 index 0000000000..24a976ab63 --- /dev/null +++ b/src/cfnlint/rules/resources/ectwo/InstanceImageId.py @@ -0,0 +1,60 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any + +from cfnlint.jsonschema import ValidationError, ValidationResult, Validator +from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship + + +class InstanceImageId(CfnLintRelationship): + id = "E3673" + shortdesc = "Validate if an ImageId is required" + description = ( + "Validate if an ImageID is required. It can be " + "required if the associated LaunchTemplate doesn't specify " + "an ImageID" + ) + tags = ["resources", "ec2"] + + def __init__(self) -> None: + super().__init__( + keywords=[ + "Resources/AWS::EC2::Instance/Properties", + ], + relationship="Resources/AWS::EC2::LaunchTemplate/Properties/LaunchTemplateData/ImageId", + ) + + def validate( + self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] + ) -> ValidationResult: + + for source_props, source_validator in self._get_relationship( + validator, instance, path=deque(["ImageId"]) + ): + relationships = list(self.get_relationship(source_validator)) + if not relationships and not source_props: + yield ValidationError( + "'ImageId' is a required property", + path_override=( + source_validator.context.path.path + if len(source_validator.context.path.path) > 4 + else deque([]) + ), + ) + for relationship_props, _ in relationships: + if relationship_props is None: + if not source_props: + yield ValidationError( + "'ImageId' is a required property", + path_override=( + source_validator.context.path.path + if len(source_validator.context.path.path) > 4 + else deque([]) + ), + ) diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py new file mode 100644 index 0000000000..139c1c98d1 --- /dev/null +++ b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py @@ -0,0 +1,159 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.context import Path +from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship + + +@pytest.fixture +def rule(): + return CfnLintRelationship( + keywords=[], relationship="Resources/AWS::EC2::Instance/Properties/ImageId" + ) + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": { + "ImageId": { + "Type": "AWS::SSM::Parameter::Value", + "Default": "/aws/service/ami-amazon-linux-latest/amzn-ami-hvm-x86_64-gp2", # noqa: E501 + } + }, + "Conditions": { + "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, + "IsImageIdSpecified": { + "Fn::Not": [{"Fn::Equals": [{"Ref": "ImageId"}, ""]}] + }, + }, + "Resources": { + "One": { + "Type": "AWS::EC2::Instance", + "Properties": { + "ImageId": { + "Fn::If": [ + "IsUsEast1", + {"Ref": "ImageId"}, + {"Ref": "AWS::NoValue"}, + ] + }, + }, + }, + "ParentOne": { + "Type": "AWS::EC2::Instance", + "Properties": {"ImageId": {"Fn::GetAtt": ["One", "ImageId"]}}, + }, + "Two": { + "Type": "AWS::EC2::Instance", + "Properties": { + "ImageId": {"Ref": "ImageId"}, + }, + }, + "ParentTwo": { + "Type": "AWS::EC2::Instance", + "Properties": {"ImageId": {"Fn::GetAtt": ["Two", "ImageId"]}}, + }, + "Three": { + "Type": "AWS::EC2::Instance", + "Condition": "IsUsEast1", + "Properties": { + "ImageId": { + "Fn::If": [ + "IsImageIdSpecified", + {"Ref": "ImageId"}, + {"Ref": "AWS::NoValue"}, + ], + }, + }, + }, + "ParentThree": { + "Type": "AWS::EC2::Instance", + "Condition": "IsImageIdSpecified", + "Properties": {"ImageId": {"Fn::GetAtt": ["Three", "ImageId"]}}, + }, + "Four": { + "Type": "AWS::EC2::Instance", + "Condition": "IsUsEast1", + "Properties": { + "ImageId": { + "Fn::If": [ + "IsImageIdSpecified", + {"Ref": "ImageId"}, + {"Ref": "AWS::NoValue"}, + ] + }, + }, + }, + "ParentFour": { + "Type": "AWS::EC2::Instance", + "Condition": "IsImageIdSpecified", + "Properties": {"ImageId": {"Fn::GetAtt": ["Four", "ImageId"]}}, + }, + }, + } + + +@pytest.mark.parametrize( + "name,path,status,expected", + [ + ( + "One", + deque(["Resources", "ParentOne", "Properties", "ImageId"]), + {}, + [ + ({"Ref": "ImageId"}, {"IsUsEast1": True}), + (None, {"IsUsEast1": False}), + ], + ), + ( + "Two", + deque(["Resources", "ParentTwo", "Properties", "ImageId"]), + {}, + [({"Ref": "ImageId"}, {})], + ), + ( + "Three", + deque(["Resources", "ParentThree", "Properties", "ImageId"]), + { + "IsImageIdSpecified": True, + }, + [({"Ref": "ImageId"}, {"IsUsEast1": True, "IsImageIdSpecified": True})], + ), + ( + "Four", + deque(["Resources", "ParentFour", "Properties", "ImageId"]), + { + "IsImageIdSpecified": True, + }, + [ + ({"Ref": "ImageId"}, {"IsUsEast1": True, "IsImageIdSpecified": True}), + ], + ), + ], +) +def test_get_relationships(name, path, status, expected, rule, validator): + validator = validator.evolve( + context=validator.context.evolve( + path=Path(path), + conditions=validator.context.conditions.evolve( + status=status, + ), + ), + ) + results = list(rule.get_relationship(validator)) + assert len(results) == len(expected), f"Test {name!r} got {len(results)!r}" + for result, exp in zip(results, expected): + assert result[0] == exp[0], f"Test {name!r} got {result[0]!r}" + assert ( + result[1].context.conditions.status == exp[1] + ), f"Test {name!r} got {result[1].context.conditions.status!r}" diff --git a/test/unit/rules/resources/ec2/test_instance_image_id.py b/test/unit/rules/resources/ec2/test_instance_image_id.py new file mode 100644 index 0000000000..842f74a2ee --- /dev/null +++ b/test/unit/rules/resources/ec2/test_instance_image_id.py @@ -0,0 +1,219 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from collections import deque + +import pytest + +from cfnlint.jsonschema import ValidationError +from cfnlint.rules.resources.ectwo.InstanceImageId import InstanceImageId + + +@pytest.fixture(scope="module") +def rule(): + rule = InstanceImageId() + yield rule + + +@pytest.mark.parametrize( + "name,template,instance,path,expected", + [ + ( + "Valid with no ImageId in LaunchTemplate", + { + "Resources": { + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + }, + }, + }, + "Instance": { + "Type": "AWS::EC2::Instance", + "Properties": { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + "ImageId": "ami-12345678", + }, + }, + } + }, + { + "ImageId": "ami-12345678", + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with ImageId in LaunchTemplate", + { + "Resources": { + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + "ImageId": "ami-12345678", + }, + }, + }, + "Instance": { + "Type": "AWS::EC2::Instance", + "Properties": { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + }, + }, + } + }, + {}, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Invalid with no relationship", + { + "Resources": { + "Instance": { + "Type": "AWS::EC2::Instance", + "Properties": {}, + }, + } + }, + {}, + { + "path": ["Resources", "Instance", "Properties"], + }, + [ValidationError("'ImageId' is a required property")], + ), + ( + "Invalid with no ImageId in LaunchTemplate", + { + "Resources": { + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + }, + }, + }, + "Instance": { + "Type": "AWS::EC2::Instance", + "Properties": { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + }, + }, + } + }, + {}, + { + "path": ["Resources", "Instance", "Properties"], + }, + [ + ValidationError( + "'ImageId' is a required property", + ) + ], + ), + ( + "Invalid with no ImageId in LaunchTemplate with condition", + { + "Conditions": { + "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]} + }, + "Resources": { + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + }, + }, + }, + "Instance": { + "Type": "AWS::EC2::Instance", + "Properties": { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + "ImageId": { + "Fn::If": [ + "IsUsEast1", + "ami-12345678", + {"Ref": "AWS::NoValue"}, + ] + }, + }, + }, + }, + }, + { + "ImageId": { + "Fn::If": ["IsUsEast1", "ami-12345678", {"Ref": "AWS::NoValue"}] + } + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [ + ValidationError( + "'ImageId' is a required property", + path_override=deque( + [ + "Resources", + "Instance", + "Properties", + "ImageId", + "Fn::If", + 2, + "Ref", + ] + ), + ) + ], + ), + ], + indirect=["template", "path"], +) +def test_validate(name, instance, expected, rule, validator): + errs = list(rule.validate(validator, "", instance, {})) + assert ( + errs == expected + ), f"Expected test {name!r} to have {expected!r} but got {errs!r}" From 1a1e7e714a3a2f2c4a1fe0b1972b33a01cb808a7 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Wed, 17 Jul 2024 12:17:06 -0700 Subject: [PATCH 2/7] Update tests --- test/unit/module/cfn_yaml/test_yaml.py | 2 +- test/unit/module/test_rules_collections.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/unit/module/cfn_yaml/test_yaml.py b/test/unit/module/cfn_yaml/test_yaml.py index b4668a19b8..4e5b5afc80 100644 --- a/test/unit/module/cfn_yaml/test_yaml.py +++ b/test/unit/module/cfn_yaml/test_yaml.py @@ -27,7 +27,7 @@ def setUp(self): }, "generic_bad": { "filename": "test/fixtures/templates/bad/generic.yaml", - "failures": 34, + "failures": 35, }, } diff --git a/test/unit/module/test_rules_collections.py b/test/unit/module/test_rules_collections.py index 3c2cd97917..edde4715a7 100644 --- a/test/unit/module/test_rules_collections.py +++ b/test/unit/module/test_rules_collections.py @@ -69,7 +69,7 @@ def test_fail_run(self): filename = "test/fixtures/templates/bad/generic.yaml" template = cfnlint.decode.cfn_yaml.load(filename) cfn = Template(filename, template, ["us-east-1"]) - expected_err_count = 37 + expected_err_count = 38 matches = [] matches.extend(self.rules.run(filename, cfn)) assert ( From fc593adcf6bc2d7f7c4556c2b4e4c822680e69a0 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Wed, 17 Jul 2024 15:55:59 -0700 Subject: [PATCH 3/7] Add more tests for bad condition scenarios --- .../rules/jsonschema/CfnLintRelationship.py | 7 +-- .../jsonschema/test_cfn_lint_relationship.py | 53 +++++++++++++++++++ 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py index e2d999f3fb..0713c81658 100644 --- a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py +++ b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py @@ -9,7 +9,7 @@ from typing import Any, Iterator, Sequence from cfnlint.helpers import is_function -from cfnlint.jsonschema import ValidationError, ValidationResult, Validator +from cfnlint.jsonschema import ValidationResult, Validator from cfnlint.rules import CloudFormationLintRule @@ -22,9 +22,6 @@ def __init__( self.relationship = relationship or "" self.parent_rules = ["E1101"] - def message(self, instance: Any, err: ValidationError) -> str: - return self.shortdesc - def _get_relationship( self, validator: Validator, instance: Any, path: deque[str | int] ) -> Iterator[tuple[Any, Validator]]: @@ -158,5 +155,3 @@ def validate( self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] ) -> ValidationResult: raise NotImplementedError("validate not implemented") - return - yield diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py index 139c1c98d1..30a94a376f 100644 --- a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py +++ b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py @@ -32,6 +32,7 @@ def template(): }, "Conditions": { "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, + "IsNotUsEast1": {"Fn::Not": [{"Condition": "IsUsEast1"}]}, "IsImageIdSpecified": { "Fn::Not": [{"Fn::Equals": [{"Ref": "ImageId"}, ""]}] }, @@ -99,6 +100,34 @@ def template(): "Condition": "IsImageIdSpecified", "Properties": {"ImageId": {"Fn::GetAtt": ["Four", "ImageId"]}}, }, + "ParentFive": { + "Type": "AWS::EC2::Instance", + "Condition": "IsImageIdSpecified", + "Properties": {"ImageId": {"Fn::If": ["IsUsEast1", "ImageId"]}}, + }, + "ParentSix": { + "Type": "AWS::EC2::Instance", + "Condition": "IsImageIdSpecified", + "Properties": [], + }, + "Seven": { + "Type": "AWS::EC2::Instance", + "Condition": "IsNotUsEast1", + "Properties": { + "ImageId": { + "Fn::If": [ + "IsImageIdSpecified", + {"Ref": "ImageId"}, + {"Ref": "AWS::NoValue"}, + ] + }, + }, + }, + "ParentSeven": { + "Type": "AWS::EC2::Instance", + "Condition": "IsUsEast1", + "Properties": {"ImageId": {"Fn::GetAtt": ["Sevent", "ImageId"]}}, + }, }, } @@ -139,6 +168,30 @@ def template(): ({"Ref": "ImageId"}, {"IsUsEast1": True, "IsImageIdSpecified": True}), ], ), + ( + "Five", + deque(["Resources", "ParentFive", "Properties", "ImageId"]), + {}, + [], + ), + ( + "Six", + deque(["Resources", "ParentSix", "Properties", "ImageId"]), + {}, + [], + ), + ( + "Seven", + deque(["Resources", "ParentSeven", "Properties", "ImageId"]), + {}, + [], + ), + ( + "Short Path", + deque(["Resources", "ParentSix"]), + {}, + [], + ), ], ) def test_get_relationships(name, path, status, expected, rule, validator): From 4f46395fde042a64dd4f412d2e3ac758a1e69482 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Thu, 18 Jul 2024 09:01:05 -0700 Subject: [PATCH 4/7] Update conditions logic --- src/cfnlint/conditions/conditions.py | 13 ++-- .../rules/jsonschema/CfnLintRelationship.py | 60 +++++++------- .../rules/resources/PrimaryIdentifiers.py | 2 +- src/cfnlint/template/template.py | 2 +- test/fixtures/templates/good/generic.yaml.dot | 0 .../unit/module/conditions/test_conditions.py | 21 ++--- .../jsonschema/test_cfn_lint_relationship.py | 56 +++++++++++-- .../test_cfn_lint_relationship_bad_path.py | 37 +++++++++ .../test_cfn_lint_relationship_list.py | 78 +++++++++++++++++++ 9 files changed, 214 insertions(+), 55 deletions(-) create mode 100644 test/fixtures/templates/good/generic.yaml.dot create mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py create mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py diff --git a/src/cfnlint/conditions/conditions.py b/src/cfnlint/conditions/conditions.py index 37bf7a1118..6afef36441 100644 --- a/src/cfnlint/conditions/conditions.py +++ b/src/cfnlint/conditions/conditions.py @@ -240,7 +240,7 @@ def build_scenarios( # formatting or just the wrong condition name return - def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool: + def implies(self, scenarios: dict[str, bool], implies: str) -> bool: """Based on a bunch of scenario conditions and their Truth/False value determine if implies condition is True any time the scenarios are satisfied solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies]) @@ -275,17 +275,18 @@ def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool: ) ) - implies_condition = self._conditions[implies].build_true_cnf( - self._solver_params - ) - and_condition = And(*conditions) - cnf.add_prop(and_condition) # if the implies condition has to be true already then we don't # need to imply it if not scenarios.get(implies): + implies_condition = self._conditions[implies].build_true_cnf( + self._solver_params + ) cnf.add_prop(Not(Implies(and_condition, implies_condition))) + else: + cnf.add_prop(and_condition) + if satisfiable(cnf): return True diff --git a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py index 0713c81658..7b393687f7 100644 --- a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py +++ b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py @@ -27,43 +27,37 @@ def _get_relationship( ) -> Iterator[tuple[Any, Validator]]: fn_k, fn_v = is_function(instance) if fn_k == "Fn::If": - if not isinstance(fn_v, list) and len(fn_v) != 3: + if not isinstance(fn_v, list) or len(fn_v) != 3: return condition = fn_v[0] # True path - for i in [1, 2]: - try: - if_validator = validator.evolve( - context=validator.context.evolve( - conditions=validator.context.conditions.evolve( - status={ - condition: i == 1, - } - ) + status = { + k: set([v]) for k, v in validator.context.conditions.status.items() + } + if condition not in status: + status[condition] = set([True, False]) + for scenario in validator.cfn.conditions.build_scenarios(status): + if_validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=scenario, ) ) - status = { - k: set([v]) - for k, v in if_validator.context.conditions.status.items() - } - if next( - validator.cfn.conditions.build_scenarios(status), - if_validator.context.regions[0], - ): - for r, v in self._get_relationship( - validator.evolve( - context=if_validator.context.evolve( - path=if_validator.context.path.descend( - path=fn_k - ).descend(path=i) - ) - ), - fn_v[i], - path.copy(), - ): - yield r, v - except ValueError: - return + ) + + i = 1 if scenario[condition] else 2 + for r, v in self._get_relationship( + validator.evolve( + context=if_validator.context.evolve( + path=if_validator.context.path.descend(path=fn_k).descend( + path=i + ) + ) + ), + fn_v[i], + path.copy(), + ): + yield r, v return if fn_k == "Ref" and fn_v == "AWS::NoValue": @@ -131,7 +125,7 @@ def get_relationship(self, validator: Validator) -> Iterator[tuple[Any, Validato condition = other_resource.get("Condition", None) if condition: - if not validator.cfn.conditions.check_implies( + if not validator.cfn.conditions.implies( validator.context.conditions.status, condition ): continue diff --git a/src/cfnlint/rules/resources/PrimaryIdentifiers.py b/src/cfnlint/rules/resources/PrimaryIdentifiers.py index bf3305c447..fa0daef3fa 100644 --- a/src/cfnlint/rules/resources/PrimaryIdentifiers.py +++ b/src/cfnlint/rules/resources/PrimaryIdentifiers.py @@ -63,7 +63,7 @@ def _validate_resource_type_uniqueness(self, cfn, resource_type, ids): # and add it to the scenario as needed if condition: if scenario: - if not cfn.conditions.check_implies(scenario, condition): + if not cfn.conditions.implies(scenario, condition): continue conditions[condition] = set([True]) diff --git a/src/cfnlint/template/template.py b/src/cfnlint/template/template.py index 65767f350e..138a87cddb 100644 --- a/src/cfnlint/template/template.py +++ b/src/cfnlint/template/template.py @@ -824,7 +824,7 @@ def is_resource_available(self, path: Path, resource: str) -> list[dict[str, boo return results scenario[condition_name] = list(condition_bool)[0] - if self.conditions.check_implies(scenario, resource_condition): + if self.conditions.implies(scenario, resource_condition): return [{**{resource_condition: False}, **scenario}] # if resource condition isn't available then the resource is available diff --git a/test/fixtures/templates/good/generic.yaml.dot b/test/fixtures/templates/good/generic.yaml.dot new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/unit/module/conditions/test_conditions.py b/test/unit/module/conditions/test_conditions.py index 4012a69fa2..70918a7fee 100644 --- a/test/unit/module/conditions/test_conditions.py +++ b/test/unit/module/conditions/test_conditions.py @@ -30,7 +30,14 @@ def test_bad_condition_definition(self): len(cfn.conditions._conditions), 1 ) # would be 2 but IsProd fails # test coverage for KeyErrors in the following functions - self.assertTrue(cfn.conditions.check_implies({"Test": True}, "IsUsEast1")) + self.assertTrue( + cfn.conditions.implies( + { + "Test": True, + }, + "IsUsEast1", + ) + ) self.assertEqual( list(cfn.conditions.build_scenarios({"IsProd": None, "IsUsEast1": None})), [], @@ -76,14 +83,10 @@ def test_check_implies(self): condition_names.append(f"{p}Condition") cfn = Template("", template) - self.assertFalse( - cfn.conditions.check_implies({"aCondition": False}, "aCondition") - ) - self.assertTrue( - cfn.conditions.check_implies({"aCondition": True}, "aCondition") - ) + self.assertFalse(cfn.conditions.implies({"aCondition": False}, "aCondition")) + self.assertTrue(cfn.conditions.implies({"aCondition": True}, "aCondition")) self.assertTrue( - cfn.conditions.check_implies( + cfn.conditions.implies( {"aCondition": True, "bCondition": False}, "aCondition" ) ) @@ -105,7 +108,7 @@ def test_check_always_true_or_false(self): cfn = Template("", template) self.assertEqual(len(cfn.conditions._conditions), 2) # test coverage for KeyErrors in the following functions - self.assertFalse(cfn.conditions.check_implies({"IsTrue": True}, "IsFalse")) + self.assertTrue(cfn.conditions.implies({"IsTrue": True}, "IsFalse")) def test_check_never_false(self): """With allowed values two conditions can not both be false""" diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py index 30a94a376f..fcbda4cf6d 100644 --- a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py +++ b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py @@ -100,15 +100,32 @@ def template(): "Condition": "IsImageIdSpecified", "Properties": {"ImageId": {"Fn::GetAtt": ["Four", "ImageId"]}}, }, + "Five": { + "Type": "AWS::EC2::Instance", + "Condition": "IsUsEast1", + "Properties": { + "ImageId": { + "Fn::If": [ + "IsImageIdSpecified", + {"Ref": "ImageId"}, + ] + }, + }, + }, "ParentFive": { "Type": "AWS::EC2::Instance", "Condition": "IsImageIdSpecified", - "Properties": {"ImageId": {"Fn::If": ["IsUsEast1", "ImageId"]}}, + "Properties": {"ImageId": {"Fn::GetAtt": ["Five", "ImageId"]}}, + }, + "Six": { + "Type": "AWS::EC2::Instance", + "Condition": "IsUsEast1", + "Properties": "Foo", }, "ParentSix": { "Type": "AWS::EC2::Instance", "Condition": "IsImageIdSpecified", - "Properties": [], + "Properties": {"ImageId": {"Fn::GetAtt": ["Six", "ImageId"]}}, }, "Seven": { "Type": "AWS::EC2::Instance", @@ -126,7 +143,22 @@ def template(): "ParentSeven": { "Type": "AWS::EC2::Instance", "Condition": "IsUsEast1", - "Properties": {"ImageId": {"Fn::GetAtt": ["Sevent", "ImageId"]}}, + "Properties": {"ImageId": {"Fn::GetAtt": ["Seven", "ImageId"]}}, + }, + "Eight": { + "Properties": { + "ImageId": { + "Fn::If": [ + "IsImageIdSpecified", + {"Ref": "ImageId"}, + {"Ref": "AWS::NoValue"}, + ] + }, + }, + }, + "ParentEight": { + "Type": "AWS::EC2::Instance", + "Properties": {"ImageId": {"Fn::GetAtt": ["Eight", "ImageId"]}}, }, }, } @@ -183,12 +215,26 @@ def template(): ( "Seven", deque(["Resources", "ParentSeven", "Properties", "ImageId"]), + { + "IsUsEast1": True, + }, + [()], + ), + ( + "Short Path", + deque(["Resources"]), {}, [], ), ( - "Short Path", - deque(["Resources", "ParentSix"]), + "Not in resources", + deque(["Outputs", "MyOutput"]), + {}, + [], + ), + ( + "No type on relationshiop", + deque(["Resources", "ParentEight", "Properties", "ImageId"]), {}, [], ), diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py new file mode 100644 index 0000000000..71a2a1a544 --- /dev/null +++ b/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py @@ -0,0 +1,37 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.context import Path +from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship + + +@pytest.fixture +def rule(): + return CfnLintRelationship(keywords=[], relationship="Resources") + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Resources": {}, + } + + +def test_get_relationships(rule, validator): + validator = validator.evolve( + context=validator.context.evolve( + path=Path( + deque(["Resources", "ParentOne", "Properties", "ImageId"]), + ), + ), + ) + assert [] == list(rule.get_relationship(validator)) diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py new file mode 100644 index 0000000000..e5aa828814 --- /dev/null +++ b/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py @@ -0,0 +1,78 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.context import Path +from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship + + +@pytest.fixture +def rule(): + return CfnLintRelationship( + keywords=[], relationship="Resources/AWS::EC2::Instance/Properties/Foo/*/Bar" + ) + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Resources": { + "One": { + "Type": "AWS::EC2::Instance", + "Properties": { + "Foo": [ + { + "Bar": "One", + }, + { + "Bar": "Two", + }, + ] + }, + }, + "ParentOne": { + "Type": "AWS::EC2::Instance", + "Properties": {"ImageId": {"Fn::GetAtt": ["One", "ImageId"]}}, + }, + }, + } + + +@pytest.mark.parametrize( + "name,path,status,expected", + [ + ( + "One", + deque(["Resources", "ParentOne", "Properties", "ImageId"]), + {}, + [ + ("One", {}), + ("Two", {}), + ], + ), + ], +) +def test_get_relationships(name, path, status, expected, rule, validator): + validator = validator.evolve( + context=validator.context.evolve( + path=Path(path), + conditions=validator.context.conditions.evolve( + status=status, + ), + ), + ) + results = list(rule.get_relationship(validator)) + assert len(results) == len(expected), f"Test {name!r} got {len(results)!r}" + for result, exp in zip(results, expected): + assert result[0] == exp[0], f"Test {name!r} got {result[0]!r}" + assert ( + result[1].context.conditions.status == exp[1] + ), f"Test {name!r} got {result[1].context.conditions.status!r}" From 22594cd70a8fe334dd2f78dcbb0946d25aa336f5 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Thu, 18 Jul 2024 09:48:30 -0700 Subject: [PATCH 5/7] Update condition implies fn --- src/cfnlint/conditions/conditions.py | 20 ++-- src/cfnlint/rules/__init__.py | 14 +-- .../rules/jsonschema/CfnLintRelationship.py | 105 ++++++++++-------- src/cfnlint/template/template.py | 2 +- test/fixtures/templates/good/generic.yaml.dot | 0 .../jsonschema/test_cfn_lint_relationship.py | 33 +++--- 6 files changed, 89 insertions(+), 85 deletions(-) delete mode 100644 test/fixtures/templates/good/generic.yaml.dot diff --git a/src/cfnlint/conditions/conditions.py b/src/cfnlint/conditions/conditions.py index 6afef36441..7dfe25f138 100644 --- a/src/cfnlint/conditions/conditions.py +++ b/src/cfnlint/conditions/conditions.py @@ -277,20 +277,16 @@ def implies(self, scenarios: dict[str, bool], implies: str) -> bool: and_condition = And(*conditions) - # if the implies condition has to be true already then we don't - # need to imply it - if not scenarios.get(implies): - implies_condition = self._conditions[implies].build_true_cnf( - self._solver_params - ) - cnf.add_prop(Not(Implies(and_condition, implies_condition))) - else: - cnf.add_prop(and_condition) + implies_condition = self._conditions[implies].build_true_cnf( + self._solver_params + ) + cnf.add_prop(Not(Implies(and_condition, implies_condition))) - if satisfiable(cnf): - return True + results = satisfiable(cnf) + if results: + return False - return False + return True except KeyError: # KeyError is because the listed condition doesn't exist because of bad # formatting or just the wrong condition name diff --git a/src/cfnlint/rules/__init__.py b/src/cfnlint/rules/__init__.py index d8cc5f4336..5eb87f653c 100644 --- a/src/cfnlint/rules/__init__.py +++ b/src/cfnlint/rules/__init__.py @@ -5,12 +5,8 @@ from cfnlint.rules._rule import CloudFormationLintRule, Match, RuleMatch from cfnlint.rules._rules import Rules, RulesCollection - -# type: ignore -from cfnlint.rules.jsonschema import ( - CfnLintJsonSchema, - CfnLintJsonSchemaRegional, - CfnLintKeyword, - CfnLintRelationship, - SchemaDetails, -) +from cfnlint.rules.jsonschema import CfnLintJsonSchema # type: ignore +from cfnlint.rules.jsonschema import CfnLintJsonSchemaRegional # type: ignore +from cfnlint.rules.jsonschema import CfnLintKeyword # type: ignore +from cfnlint.rules.jsonschema import CfnLintRelationship # type: ignore +from cfnlint.rules.jsonschema import SchemaDetails diff --git a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py index 7b393687f7..ee28ad0887 100644 --- a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py +++ b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py @@ -22,42 +22,59 @@ def __init__( self.relationship = relationship or "" self.parent_rules = ["E1101"] - def _get_relationship( - self, validator: Validator, instance: Any, path: deque[str | int] + def _get_relationship_fn_if( + self, validator: Validator, key: Any, value: Any, path: deque[str | int] ) -> Iterator[tuple[Any, Validator]]: - fn_k, fn_v = is_function(instance) - if fn_k == "Fn::If": - if not isinstance(fn_v, list) or len(fn_v) != 3: - return - condition = fn_v[0] - # True path - status = { - k: set([v]) for k, v in validator.context.conditions.status.items() - } - if condition not in status: - status[condition] = set([True, False]) - for scenario in validator.cfn.conditions.build_scenarios(status): - if_validator = validator.evolve( - context=validator.context.evolve( - conditions=validator.context.conditions.evolve( - status=scenario, - ) + if not isinstance(value, list) or len(value) != 3: + return + condition = value[0] + # True path + status = {k: set([v]) for k, v in validator.context.conditions.status.items()} + if condition not in status: + status[condition] = set([True, False]) + for scenario in validator.cfn.conditions.build_scenarios(status): + if_validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=scenario, ) ) + ) + + i = 1 if scenario[condition] else 2 + for r, v in self._get_relationship( + validator.evolve( + context=if_validator.context.evolve( + path=if_validator.context.path.descend(path=key).descend(path=i) + ) + ), + value[i], + path.copy(), + ): + yield r, v - i = 1 if scenario[condition] else 2 + def _get_relationship_list( + self, validator: Validator, key: Any, instance: Any, path: deque[str | int] + ) -> Iterator[tuple[Any, Validator]]: + if key == "*": + for i, v in enumerate(instance): for r, v in self._get_relationship( validator.evolve( - context=if_validator.context.evolve( - path=if_validator.context.path.descend(path=fn_k).descend( - path=i - ) - ) + context=validator.context.evolve( + path=validator.context.path.descend(path=i) + ), ), - fn_v[i], + v, path.copy(), ): yield r, v + + def _get_relationship( + self, validator: Validator, instance: Any, path: deque[str | int] + ) -> Iterator[tuple[Any, Validator]]: + fn_k, fn_v = is_function(instance) + if fn_k == "Fn::If": + yield from self._get_relationship_fn_if(validator, fn_k, fn_v, path) return if fn_k == "Ref" and fn_v == "AWS::NoValue": @@ -74,18 +91,7 @@ def _get_relationship( key = path.popleft() if isinstance(instance, list): - if key == "*": - for i, v in enumerate(instance): - for r, v in self._get_relationship( - validator.evolve( - context=validator.context.evolve( - path=validator.context.path.descend(path=i) - ), - ), - v, - path.copy(), - ): - yield r, v + yield from self._get_relationship_list(validator, key, instance, path) return if not isinstance(instance, dict): @@ -102,22 +108,31 @@ def _get_relationship( ): yield r, v - def get_relationship(self, validator: Validator) -> Iterator[tuple[Any, Validator]]: + def _validate_get_relationship_parameters(self, validator: Validator) -> bool: if len(validator.context.path.path) < 2: - return + return False if "Resources" != validator.context.path.path[0]: + return False + + if not validator.cfn.graph: + return False + + if len(self.relationship.split("/")) < 2: + return False + + return True + + def get_relationship(self, validator: Validator) -> Iterator[tuple[Any, Validator]]: + if not self._validate_get_relationship_parameters(validator): return resource_name = validator.context.path.path[1] path = self.relationship.split("/") - if len(path) < 2: - return + # get related resources by ref/getatt to the source # and relationship types - if not validator.cfn.graph: - return - for edge in validator.cfn.graph.graph.out_edges(resource_name): + for edge in validator.cfn.graph.graph.out_edges(resource_name): # type: ignore other_resource = validator.cfn.template.get(path[0], {}).get(edge[1], {}) if other_resource.get("Type") != path[1]: diff --git a/src/cfnlint/template/template.py b/src/cfnlint/template/template.py index 138a87cddb..3fa1a73433 100644 --- a/src/cfnlint/template/template.py +++ b/src/cfnlint/template/template.py @@ -824,7 +824,7 @@ def is_resource_available(self, path: Path, resource: str) -> list[dict[str, boo return results scenario[condition_name] = list(condition_bool)[0] - if self.conditions.implies(scenario, resource_condition): + if not self.conditions.implies(scenario, resource_condition): return [{**{resource_condition: False}, **scenario}] # if resource condition isn't available then the resource is available diff --git a/test/fixtures/templates/good/generic.yaml.dot b/test/fixtures/templates/good/generic.yaml.dot deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py index fcbda4cf6d..6606bbf10d 100644 --- a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py +++ b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py @@ -66,11 +66,11 @@ def template(): }, "Three": { "Type": "AWS::EC2::Instance", - "Condition": "IsUsEast1", + "Condition": "IsImageIdSpecified", "Properties": { "ImageId": { "Fn::If": [ - "IsImageIdSpecified", + "IsUsEast1", {"Ref": "ImageId"}, {"Ref": "AWS::NoValue"}, ], @@ -102,7 +102,6 @@ def template(): }, "Five": { "Type": "AWS::EC2::Instance", - "Condition": "IsUsEast1", "Properties": { "ImageId": { "Fn::If": [ @@ -114,17 +113,14 @@ def template(): }, "ParentFive": { "Type": "AWS::EC2::Instance", - "Condition": "IsImageIdSpecified", "Properties": {"ImageId": {"Fn::GetAtt": ["Five", "ImageId"]}}, }, "Six": { "Type": "AWS::EC2::Instance", - "Condition": "IsUsEast1", "Properties": "Foo", }, "ParentSix": { "Type": "AWS::EC2::Instance", - "Condition": "IsImageIdSpecified", "Properties": {"ImageId": {"Fn::GetAtt": ["Six", "ImageId"]}}, }, "Seven": { @@ -168,7 +164,7 @@ def template(): "name,path,status,expected", [ ( - "One", + "Condition for value", deque(["Resources", "ParentOne", "Properties", "ImageId"]), {}, [ @@ -177,48 +173,49 @@ def template(): ], ), ( - "Two", + "Standard", deque(["Resources", "ParentTwo", "Properties", "ImageId"]), {}, [({"Ref": "ImageId"}, {})], ), ( - "Three", + "Lots of conditions along the way", deque(["Resources", "ParentThree", "Properties", "ImageId"]), { "IsImageIdSpecified": True, }, - [({"Ref": "ImageId"}, {"IsUsEast1": True, "IsImageIdSpecified": True})], + [ + ({"Ref": "ImageId"}, {"IsUsEast1": True, "IsImageIdSpecified": True}), + (None, {"IsUsEast1": False, "IsImageIdSpecified": True}), + ], ), ( - "Four", + "Unrelated conditions", deque(["Resources", "ParentFour", "Properties", "ImageId"]), { "IsImageIdSpecified": True, }, - [ - ({"Ref": "ImageId"}, {"IsUsEast1": True, "IsImageIdSpecified": True}), - ], + [], ), ( - "Five", + "Destiniation Fn::If isn't properly formatted", deque(["Resources", "ParentFive", "Properties", "ImageId"]), {}, [], ), ( - "Six", + "Properties isn't an object", deque(["Resources", "ParentSix", "Properties", "ImageId"]), {}, [], ), ( - "Seven", + "Condition's don't align", deque(["Resources", "ParentSeven", "Properties", "ImageId"]), { "IsUsEast1": True, }, - [()], + [], ), ( "Short Path", From 58ae2f779038cb99f979fe20b5c2aa3ddd79dfde Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Mon, 22 Jul 2024 07:27:37 -0700 Subject: [PATCH 6/7] Switch to helper functions --- src/cfnlint/conditions/conditions.py | 70 +++-- src/cfnlint/context/context.py | 6 + src/cfnlint/rules/__init__.py | 1 - src/cfnlint/rules/helpers/__init__.py | 12 + .../rules/helpers/get_resource_by_name.py | 50 ++++ .../rules/helpers/get_value_from_path.py | 119 ++++++++ .../rules/jsonschema/CfnLintRelationship.py | 166 ------------ src/cfnlint/rules/jsonschema/__init__.py | 2 - .../rules/resources/PrimaryIdentifiers.py | 2 +- .../resources/ecs/ServiceDynamicPorts.py | 149 ++++++++++ .../rules/resources/ectwo/InstanceImageId.py | 109 ++++++-- src/cfnlint/template/template.py | 2 +- .../unit/module/conditions/test_conditions.py | 12 +- test/unit/rules/helpers/__init__.py | 0 .../helpers/test_get_resource_by_name.py | 169 ++++++++++++ .../rules/helpers/test_get_value_from_path.py | 194 +++++++++++++ .../jsonschema/test_cfn_lint_relationship.py | 255 ------------------ .../test_cfn_lint_relationship_bad_path.py | 37 --- .../test_cfn_lint_relationship_list.py | 78 ------ .../resources/ec2/test_instance_image_id.py | 212 ++++++++++----- 20 files changed, 993 insertions(+), 652 deletions(-) create mode 100644 src/cfnlint/rules/helpers/__init__.py create mode 100644 src/cfnlint/rules/helpers/get_resource_by_name.py create mode 100644 src/cfnlint/rules/helpers/get_value_from_path.py delete mode 100644 src/cfnlint/rules/jsonschema/CfnLintRelationship.py create mode 100644 src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py create mode 100644 test/unit/rules/helpers/__init__.py create mode 100644 test/unit/rules/helpers/test_get_resource_by_name.py create mode 100644 test/unit/rules/helpers/test_get_value_from_path.py delete mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship.py delete mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py delete mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py diff --git a/src/cfnlint/conditions/conditions.py b/src/cfnlint/conditions/conditions.py index 7dfe25f138..cbba5bfed2 100644 --- a/src/cfnlint/conditions/conditions.py +++ b/src/cfnlint/conditions/conditions.py @@ -240,7 +240,23 @@ def build_scenarios( # formatting or just the wrong condition name return - def implies(self, scenarios: dict[str, bool], implies: str) -> bool: + def _build_cfn_implies(self, scenarios) -> And: + conditions = [] + for condition_name, opt in scenarios.items(): + if opt: + conditions.append( + self._conditions[condition_name].build_true_cnf(self._solver_params) + ) + else: + conditions.append( + self._conditions[condition_name].build_false_cnf( + self._solver_params + ) + ) + + return And(*conditions) + + def can_imply(self, scenarios: dict[str, bool], implies: str) -> bool: """Based on a bunch of scenario conditions and their Truth/False value determine if implies condition is True any time the scenarios are satisfied solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies]) @@ -260,23 +276,45 @@ def implies(self, scenarios: dict[str, bool], implies: str) -> bool: if not scenarios.get(implies, True): return False - conditions = [] - for condition_name, opt in scenarios.items(): - if opt: - conditions.append( - self._conditions[condition_name].build_true_cnf( - self._solver_params - ) - ) - else: - conditions.append( - self._conditions[condition_name].build_false_cnf( - self._solver_params - ) - ) + and_condition = self._build_cfn_implies(scenarios) + cnf.add_prop(and_condition) + implies_condition = self._conditions[implies].build_true_cnf( + self._solver_params + ) + cnf.add_prop(Implies(and_condition, implies_condition)) - and_condition = And(*conditions) + results = satisfiable(cnf) + if results: + return True + + return False + except KeyError: + # KeyError is because the listed condition doesn't exist because of bad + # formatting or just the wrong condition name + return True + + def has_to_imply(self, scenarios: dict[str, bool], implies: str) -> bool: + """Based on a bunch of scenario conditions and their Truth/False value + determine if implies condition is True any time the scenarios are satisfied + solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies]) + + Args: + scenarios (dict[str, bool]): A list of condition names + and if they are True or False implies: the condition name that + we are implying will also be True + + Returns: + bool: if the implied condition will be True if the scenario is True + """ + try: + cnf = self._cnf.copy() + # if the implies condition has to be false in the scenarios we + # know it can never be true + if not scenarios.get(implies, True): + return False + and_condition = self._build_cfn_implies(scenarios) + cnf.add_prop(and_condition) implies_condition = self._conditions[implies].build_true_cnf( self._solver_params ) diff --git a/src/cfnlint/context/context.py b/src/cfnlint/context/context.py index 7c35c57fb4..e70cb364ce 100644 --- a/src/cfnlint/context/context.py +++ b/src/cfnlint/context/context.py @@ -357,6 +357,7 @@ class Resource(_Ref): """ type: str = field(init=False) + condition: str | None = field(init=False, default=None) resource: InitVar[Any] def __post_init__(self, resource) -> None: @@ -369,6 +370,11 @@ def __post_init__(self, resource) -> None: if self.type.startswith("Custom::"): self.type = "AWS::CloudFormation::CustomResource" + c = resource.get("Condition") + if not isinstance(t, str): + raise ValueError("Condition must be a string") + self.condition = c + @property def get_atts(self, region: str = "us-east-1") -> AttributeDict: return PROVIDER_SCHEMA_MANAGER.get_type_getatts(self.type, region) diff --git a/src/cfnlint/rules/__init__.py b/src/cfnlint/rules/__init__.py index 5eb87f653c..976c6058c8 100644 --- a/src/cfnlint/rules/__init__.py +++ b/src/cfnlint/rules/__init__.py @@ -8,5 +8,4 @@ from cfnlint.rules.jsonschema import CfnLintJsonSchema # type: ignore from cfnlint.rules.jsonschema import CfnLintJsonSchemaRegional # type: ignore from cfnlint.rules.jsonschema import CfnLintKeyword # type: ignore -from cfnlint.rules.jsonschema import CfnLintRelationship # type: ignore from cfnlint.rules.jsonschema import SchemaDetails diff --git a/src/cfnlint/rules/helpers/__init__.py b/src/cfnlint/rules/helpers/__init__.py new file mode 100644 index 0000000000..44d8de325f --- /dev/null +++ b/src/cfnlint/rules/helpers/__init__.py @@ -0,0 +1,12 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from cfnlint.rules.helpers.get_resource_by_name import get_resource_by_name +from cfnlint.rules.helpers.get_value_from_path import get_value_from_path + +__all__ = [ + "get_value_from_path", + "get_resource_by_name", +] diff --git a/src/cfnlint/rules/helpers/get_resource_by_name.py b/src/cfnlint/rules/helpers/get_resource_by_name.py new file mode 100644 index 0000000000..c8f9327900 --- /dev/null +++ b/src/cfnlint/rules/helpers/get_resource_by_name.py @@ -0,0 +1,50 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Sequence + +from cfnlint.context import Path +from cfnlint.context.conditions import Unsatisfiable +from cfnlint.jsonschema import Validator + + +def get_resource_by_name( + validator: Validator, name: str, types: Sequence[str] | None = None +) -> tuple[Any, Validator]: + + resource = validator.context.resources.get(name) + if not resource: + return None, validator + + if types and resource.type not in types: + return None, validator + + if resource.condition: + try: + validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + { + resource.condition: True, + } + ), + ) + ) + except Unsatisfiable: + return None, validator + + validator = validator.evolve( + context=validator.context.evolve( + path=Path( + path=deque(["Resources", name]), + cfn_path=deque(["Resources", resource.type]), + ) + ) + ) + + return validator.cfn.template.get("Resources", {}).get(name), validator diff --git a/src/cfnlint/rules/helpers/get_value_from_path.py b/src/cfnlint/rules/helpers/get_value_from_path.py new file mode 100644 index 0000000000..b5bcb61fad --- /dev/null +++ b/src/cfnlint/rules/helpers/get_value_from_path.py @@ -0,0 +1,119 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator + +from cfnlint.context.conditions import Unsatisfiable +from cfnlint.helpers import is_function +from cfnlint.jsonschema import Validator + + +def _get_relationship_fn_if( + validator: Validator, key: Any, value: Any, path: deque[str | int] +) -> Iterator[tuple[Any, Validator]]: + if not isinstance(value, list) or len(value) != 3: + return + condition = value[0] + + for i in [1, 2]: + try: + if_validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status={ + condition: True if i == 1 else False, + }, + ), + path=validator.context.path.descend(path=key).descend(path=i), + ) + ) + for r, v in get_value_from_path( + if_validator, + value[i], + path.copy(), + ): + yield r, v + except Unsatisfiable: + pass + + +def _get_value_from_path_list( + validator: Validator, instance: Any, path: deque[str | int] +) -> Iterator[tuple[Any, Validator]]: + for i, v in enumerate(instance): + for r, v in get_value_from_path( + validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=i) + ), + ), + v, + path.copy(), + ): + yield r, v + + +def get_value_from_path( + validator: Validator, instance: Any, path: deque[str | int] +) -> Iterator[tuple[Any, Validator]]: + """ + Retrieve a value from a nested dictionary or list using a path. + + Args: + validator (Validator): The validator instance + data (Any): The dictionary or list to search. + path (deque[str | int]): The path to the value. + + Returns: + The value at the specified path, or None if the key doesn't exist. + + Examples: + >>> data = {'a': {'b': {'c': 3}}} + >>> get_value_from_path(data, ['a', 'b', 'c']) + 3 + """ + + fn_k, fn_v = is_function(instance) + if fn_k is not None: + if fn_k == "Fn::If": + yield from _get_relationship_fn_if(validator, fn_k, fn_v, path) + elif fn_k == "Ref" and fn_v == "AWS::NoValue": + yield None, validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=fn_k) + ) + ) + elif not path: + yield instance, validator + return + + if not path: + yield instance, validator + return + + key = path.popleft() + if isinstance(instance, list) and key == "*": + yield from _get_value_from_path_list(validator, instance, path) + return + + if not isinstance(instance, dict): + yield None, validator + return + + for r, v in get_value_from_path( + validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=key) + ) + ), + instance.get(key), + path.copy(), + ): + yield r, v + + return diff --git a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py deleted file mode 100644 index ee28ad0887..0000000000 --- a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py +++ /dev/null @@ -1,166 +0,0 @@ -""" -Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -from collections import deque -from typing import Any, Iterator, Sequence - -from cfnlint.helpers import is_function -from cfnlint.jsonschema import ValidationResult, Validator -from cfnlint.rules import CloudFormationLintRule - - -class CfnLintRelationship(CloudFormationLintRule): - def __init__( - self, keywords: Sequence[str] | None = None, relationship: str | None = None - ) -> None: - super().__init__() - self.keywords = keywords or [] - self.relationship = relationship or "" - self.parent_rules = ["E1101"] - - def _get_relationship_fn_if( - self, validator: Validator, key: Any, value: Any, path: deque[str | int] - ) -> Iterator[tuple[Any, Validator]]: - if not isinstance(value, list) or len(value) != 3: - return - condition = value[0] - # True path - status = {k: set([v]) for k, v in validator.context.conditions.status.items()} - if condition not in status: - status[condition] = set([True, False]) - for scenario in validator.cfn.conditions.build_scenarios(status): - if_validator = validator.evolve( - context=validator.context.evolve( - conditions=validator.context.conditions.evolve( - status=scenario, - ) - ) - ) - - i = 1 if scenario[condition] else 2 - for r, v in self._get_relationship( - validator.evolve( - context=if_validator.context.evolve( - path=if_validator.context.path.descend(path=key).descend(path=i) - ) - ), - value[i], - path.copy(), - ): - yield r, v - - def _get_relationship_list( - self, validator: Validator, key: Any, instance: Any, path: deque[str | int] - ) -> Iterator[tuple[Any, Validator]]: - if key == "*": - for i, v in enumerate(instance): - for r, v in self._get_relationship( - validator.evolve( - context=validator.context.evolve( - path=validator.context.path.descend(path=i) - ), - ), - v, - path.copy(), - ): - yield r, v - - def _get_relationship( - self, validator: Validator, instance: Any, path: deque[str | int] - ) -> Iterator[tuple[Any, Validator]]: - fn_k, fn_v = is_function(instance) - if fn_k == "Fn::If": - yield from self._get_relationship_fn_if(validator, fn_k, fn_v, path) - return - - if fn_k == "Ref" and fn_v == "AWS::NoValue": - yield None, validator.evolve( - context=validator.context.evolve( - path=validator.context.path.descend(path=fn_k) - ) - ) - return - - if not path: - yield instance, validator - return - - key = path.popleft() - if isinstance(instance, list): - yield from self._get_relationship_list(validator, key, instance, path) - return - - if not isinstance(instance, dict): - return - - for r, v in self._get_relationship( - validator.evolve( - context=validator.context.evolve( - path=validator.context.path.descend(path=key) - ) - ), - instance.get(key), - path.copy(), - ): - yield r, v - - def _validate_get_relationship_parameters(self, validator: Validator) -> bool: - if len(validator.context.path.path) < 2: - return False - - if "Resources" != validator.context.path.path[0]: - return False - - if not validator.cfn.graph: - return False - - if len(self.relationship.split("/")) < 2: - return False - - return True - - def get_relationship(self, validator: Validator) -> Iterator[tuple[Any, Validator]]: - if not self._validate_get_relationship_parameters(validator): - return - - resource_name = validator.context.path.path[1] - path = self.relationship.split("/") - - # get related resources by ref/getatt to the source - # and relationship types - for edge in validator.cfn.graph.graph.out_edges(resource_name): # type: ignore - other_resource = validator.cfn.template.get(path[0], {}).get(edge[1], {}) - - if other_resource.get("Type") != path[1]: - continue - - condition = other_resource.get("Condition", None) - if condition: - if not validator.cfn.conditions.implies( - validator.context.conditions.status, condition - ): - continue - validator = validator.evolve( - context=validator.context.evolve( - conditions=validator.context.conditions.evolve( - { - condition: True, - } - ) - ) - ) - # validate reosurce type - - for r, v in self._get_relationship( - validator, other_resource, deque(path[2:]) - ): - yield r, v.evolve() - - def validate( - self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] - ) -> ValidationResult: - raise NotImplementedError("validate not implemented") diff --git a/src/cfnlint/rules/jsonschema/__init__.py b/src/cfnlint/rules/jsonschema/__init__.py index c63aae1159..60f460db6f 100644 --- a/src/cfnlint/rules/jsonschema/__init__.py +++ b/src/cfnlint/rules/jsonschema/__init__.py @@ -7,7 +7,6 @@ from cfnlint.rules.jsonschema.CfnLintJsonSchema import CfnLintJsonSchema, SchemaDetails from cfnlint.rules.jsonschema.CfnLintJsonSchemaRegional import CfnLintJsonSchemaRegional from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword -from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship from cfnlint.rules.jsonschema.MaxProperties import MaxProperties from cfnlint.rules.jsonschema.PropertyNames import PropertyNames @@ -16,7 +15,6 @@ "CfnLintJsonSchema", "CfnLintJsonSchemaRegional", "CfnLintKeyword", - "CfnLintRelationship", "MaxProperties", "PropertyNames", "SchemaDetails", diff --git a/src/cfnlint/rules/resources/PrimaryIdentifiers.py b/src/cfnlint/rules/resources/PrimaryIdentifiers.py index fa0daef3fa..493dc09822 100644 --- a/src/cfnlint/rules/resources/PrimaryIdentifiers.py +++ b/src/cfnlint/rules/resources/PrimaryIdentifiers.py @@ -63,7 +63,7 @@ def _validate_resource_type_uniqueness(self, cfn, resource_type, ids): # and add it to the scenario as needed if condition: if scenario: - if not cfn.conditions.implies(scenario, condition): + if not cfn.conditions.has_to_imply(scenario, condition): continue conditions[condition] = set([True]) diff --git a/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py b/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py new file mode 100644 index 0000000000..39961e0a5a --- /dev/null +++ b/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py @@ -0,0 +1,149 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator + +from cfnlint.helpers import ensure_list, is_function +from cfnlint.jsonschema import ValidationError, ValidationResult +from cfnlint.jsonschema.protocols import Validator +from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path +from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword + + +class ServiceDynamicPorts(CfnLintKeyword): + id = "E3049" + shortdesc = ( + "Validate ECS tasks with dynamic host port have traffic-port ELB target groups" + ) + description = ( + "When using an ECS task definition of host port 0 and " + "associating that container to an ELB the target group " + "has to have a 'HealthCheckPort' of 'traffic-port'" + ) + tags = ["resources"] + + def __init__(self) -> None: + super().__init__( + keywords=["Resources/AWS::ECS::Service/Properties"], + ) + + def _get_service_lb_containers( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[str, str, Validator]]: + + for load_balancer, load_balancer_validator in get_value_from_path( + validator, + instance, + path=deque(["LoadBalancers", "*"]), + ): + for name, name_validator in get_value_from_path( + load_balancer_validator, + load_balancer, + path=deque(["ContainerName"]), + ): + if name is None: + continue + for port, port_validator in get_value_from_path( + name_validator, + load_balancer, + path=deque(["ContainerPort"]), + ): + if port is None: + continue + yield name, port, port_validator + + def _get_task_definition_resource_name( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[str, Validator]]: + for task_definition_id, task_definition_validator in get_value_from_path( + validator, instance, deque(["TaskDefinition"]) + ): + fn_k, fn_v = is_function(task_definition_id) + if fn_k == "Ref": + if validator.is_type(fn_v, "string"): + if fn_v in validator.context.resources: + yield fn_v, task_definition_validator + elif fn_k != "Fn::GetAtt": + continue + yield ensure_list(fn_v)[0].split(".")[0], task_definition_validator + + def _get_task_containers( + self, validator: Validator, resource_name: str, container_name: str + ) -> Iterator[tuple[Any, Validator]]: + task_def, task_def_validator = get_resource_by_name( + validator, resource_name, ["AWS::ECS::TaskDefinition"] + ) + + if not task_def: + return + + for container, container_validator in get_value_from_path( + task_def_validator, + task_def, + path=deque(["Properties", "ContainerDefinitions", "*"]), + ): + for name, name_validator in get_value_from_path( + container_validator, + container, + path=deque(["Name"]), + ): + if not isinstance(name, str): + continue + if name == container_name: + yield container, name_validator + + def _filter_port_mappings( + self, validator: Validator, instance: Any, port: Any + ) -> Iterator[tuple[Any, Validator]]: + for port_mapping, port_mapping_validator in get_value_from_path( + validator, + instance, + path=deque(["PortMappings", "*"]), + ): + for container_port, container_port_validator in get_value_from_path( + port_mapping_validator, + port_mapping, + path=deque(["ContainerPort"]), + ): + if not isinstance(container_port, (str, int)): + continue + if str(port) == str(container_port): + for host_port, host_part_validator in get_value_from_path( + container_port_validator, + port_mapping, + path=deque(["HostPort"]), + ): + yield host_port, host_part_validator + + def validate( + self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] + ) -> ValidationResult: + + for ( + task_definition_resource_name, + task_definition_resource_name_validator, + ) in self._get_task_definition_resource_name(validator, instance): + + for name, port, lb_validator in self._get_service_lb_containers( + task_definition_resource_name_validator, instance + ): + for container, container_validator in self._get_task_containers( + lb_validator, task_definition_resource_name, name + ): + for host_port, host_port_validator in self._filter_port_mappings( + container_validator, container, port + ): + if host_port == 0: + yield ValidationError( + "When using an ECS task definition of host port 0 and " + "associating that container to an ELB the target group " + "has to have a 'HealthCheckPort' of 'traffic-port'", + ) + + return + yield diff --git a/src/cfnlint/rules/resources/ectwo/InstanceImageId.py b/src/cfnlint/rules/resources/ectwo/InstanceImageId.py index 24a976ab63..7219d3c10b 100644 --- a/src/cfnlint/rules/resources/ectwo/InstanceImageId.py +++ b/src/cfnlint/rules/resources/ectwo/InstanceImageId.py @@ -6,13 +6,15 @@ from __future__ import annotations from collections import deque -from typing import Any +from typing import Any, Iterator +from cfnlint.helpers import ensure_list, is_function from cfnlint.jsonschema import ValidationError, ValidationResult, Validator -from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship +from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path +from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword -class InstanceImageId(CfnLintRelationship): +class InstanceImageId(CfnLintKeyword): id = "E3673" shortdesc = "Validate if an ImageId is required" description = ( @@ -27,34 +29,99 @@ def __init__(self) -> None: keywords=[ "Resources/AWS::EC2::Instance/Properties", ], - relationship="Resources/AWS::EC2::LaunchTemplate/Properties/LaunchTemplateData/ImageId", ) + def _get_related_launch_template( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[Any, Validator]]: + + for launch_template, launch_template_validator in get_value_from_path( + validator, instance, deque(["LaunchTemplate"]) + ): + if not launch_template: + continue + for id, id_validator in get_value_from_path( + launch_template_validator, launch_template, deque(["LaunchTemplateId"]) + ): + if id is None: + for name, name_validator in get_value_from_path( + id_validator, + launch_template, + deque(["LaunchTemplateName"]), + ): + yield name, name_validator + yield id, id_validator + def validate( self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] ) -> ValidationResult: - for source_props, source_validator in self._get_relationship( - validator, instance, path=deque(["ImageId"]) + for ( + instance_image_id, + instance_image_id_validator, + ) in get_value_from_path( + validator, + instance, + path=deque(["ImageId"]), ): - relationships = list(self.get_relationship(source_validator)) - if not relationships and not source_props: + if instance_image_id: + continue + + launch_templates = list( + self._get_related_launch_template(instance_image_id_validator, instance) + ) + + if not launch_templates: yield ValidationError( "'ImageId' is a required property", - path_override=( - source_validator.context.path.path - if len(source_validator.context.path.path) > 4 - else deque([]) - ), + path_override=instance_image_id_validator.context.path.path, ) - for relationship_props, _ in relationships: - if relationship_props is None: - if not source_props: + continue + + for ( + instance_launch_template, + instance_launch_template_validator, + ) in launch_templates: + fn_k, fn_v = is_function(instance_launch_template) + + # if its not a function we can't tell from a string + # if the image ID is there or not + if fn_k is None: + continue + + launch_template, launch_template_validator = None, None + if fn_k == "Ref": + if fn_v in instance_launch_template_validator.context.parameters: + continue + elif fn_v in instance_launch_template_validator.context.resources: + launch_template, launch_template_validator = ( + get_resource_by_name( + instance_launch_template_validator, + fn_v, + ["AWS::EC2::LaunchTemplate"], + ) + ) + else: + continue + elif fn_k == "Fn::GetAtt": + launch_template, launch_template_validator = get_resource_by_name( + instance_launch_template_validator, + ensure_list(fn_v)[0].split(".")[0], + ["AWS::EC2::LaunchTemplate"], + ) + else: + continue + + for ( + launch_template_image_id, + _, + ) in get_value_from_path( + launch_template_validator, + launch_template or {}, + path=deque(["Properties", "LaunchTemplateData", "ImageId"]), + ): + if launch_template_image_id is None and instance_image_id is None: yield ValidationError( "'ImageId' is a required property", - path_override=( - source_validator.context.path.path - if len(source_validator.context.path.path) > 4 - else deque([]) - ), + path_override=instance_image_id_validator.context.path.path, ) diff --git a/src/cfnlint/template/template.py b/src/cfnlint/template/template.py index 3fa1a73433..7c39673951 100644 --- a/src/cfnlint/template/template.py +++ b/src/cfnlint/template/template.py @@ -824,7 +824,7 @@ def is_resource_available(self, path: Path, resource: str) -> list[dict[str, boo return results scenario[condition_name] = list(condition_bool)[0] - if not self.conditions.implies(scenario, resource_condition): + if not self.conditions.has_to_imply(scenario, resource_condition): return [{**{resource_condition: False}, **scenario}] # if resource condition isn't available then the resource is available diff --git a/test/unit/module/conditions/test_conditions.py b/test/unit/module/conditions/test_conditions.py index 70918a7fee..05cfb9db6e 100644 --- a/test/unit/module/conditions/test_conditions.py +++ b/test/unit/module/conditions/test_conditions.py @@ -31,7 +31,7 @@ def test_bad_condition_definition(self): ) # would be 2 but IsProd fails # test coverage for KeyErrors in the following functions self.assertTrue( - cfn.conditions.implies( + cfn.conditions.has_to_imply( { "Test": True, }, @@ -83,10 +83,12 @@ def test_check_implies(self): condition_names.append(f"{p}Condition") cfn = Template("", template) - self.assertFalse(cfn.conditions.implies({"aCondition": False}, "aCondition")) - self.assertTrue(cfn.conditions.implies({"aCondition": True}, "aCondition")) + self.assertFalse( + cfn.conditions.has_to_imply({"aCondition": False}, "aCondition") + ) + self.assertTrue(cfn.conditions.has_to_imply({"aCondition": True}, "aCondition")) self.assertTrue( - cfn.conditions.implies( + cfn.conditions.has_to_imply( {"aCondition": True, "bCondition": False}, "aCondition" ) ) @@ -108,7 +110,7 @@ def test_check_always_true_or_false(self): cfn = Template("", template) self.assertEqual(len(cfn.conditions._conditions), 2) # test coverage for KeyErrors in the following functions - self.assertTrue(cfn.conditions.implies({"IsTrue": True}, "IsFalse")) + self.assertTrue(cfn.conditions.has_to_imply({"IsTrue": True}, "IsFalse")) def test_check_never_false(self): """With allowed values two conditions can not both be false""" diff --git a/test/unit/rules/helpers/__init__.py b/test/unit/rules/helpers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/unit/rules/helpers/test_get_resource_by_name.py b/test/unit/rules/helpers/test_get_resource_by_name.py new file mode 100644 index 0000000000..408b902efd --- /dev/null +++ b/test/unit/rules/helpers/test_get_resource_by_name.py @@ -0,0 +1,169 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.rules.helpers import get_resource_by_name + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": { + "ImageId": { + "Type": "String", + } + }, + "Conditions": { + "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, + "IsNotUsEast1": {"Fn::Not": [{"Condition": "IsUsEast1"}]}, + "IsImageId": {"Fn::Not": {"Fn::Equals": [{"Ref": "ImageId"}, ""]}}, + }, + "Resources": { + "Foo": {"Type": "AWS::S3::Bucket"}, + "Bar": {"Type": "AWS::S3::Bucket", "Condition": "IsUsEast1"}, + "FooBar": {"Type": "AWS::S3::Bucket", "Condition": "IsNotUsEast1"}, + "BadShape": [], + "NoType": {}, + }, + } + + +@pytest.mark.parametrize( + "name,resource_name,types,status,expected", + [ + ( + "Standard get", + "Foo", + None, + {}, + ({"Type": "AWS::S3::Bucket"}, {}, deque(["Resources", "Foo"])), + ), + ( + "Doesn't exist", + "Foo2", + None, + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "The destination isn't a dict", + "BadShape", + None, + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "The destination doesn't have type", + "NoType", + None, + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "Get a valid resource with a filter", + "Foo", + ["AWS::S3::Bucket"], + {}, + ( + { + "Type": "AWS::S3::Bucket", + }, + {}, + deque(["Resources", "Foo"]), + ), + ), + ( + "No result when type filters it out", + "Foo", + ["AWS::EC2::Instance"], + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "Get a resource with a condition", + "Bar", + None, + {}, + ( + {"Type": "AWS::S3::Bucket", "Condition": "IsUsEast1"}, + {"IsUsEast1": True}, + deque(["Resources", "Bar"]), + ), + ), + ( + "No resource when conditions don't align 1", + "Bar", + None, + {"IsUsEast1": False}, + ( + None, + {"IsUsEast1": False}, + deque([]), + ), + ), + ( + "No resource when conditions don't align 2", + "FooBar", + None, + {"IsUsEast1": True}, + ( + None, + {"IsUsEast1": True}, + deque([]), + ), + ), + ( + "Unrelated conditions return full results", + "Bar", + None, + {"IsImageId": False}, + ( + {"Type": "AWS::S3::Bucket", "Condition": "IsUsEast1"}, + {"IsUsEast1": True, "IsImageId": False}, + deque(["Resources", "Bar"]), + ), + ), + ], +) +def test_get_resource_by_name(name, resource_name, types, status, expected, validator): + validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=status, + ), + ), + ) + + result = get_resource_by_name(validator, resource_name, types) + + assert result[0] == expected[0], f"Test {name!r} got {result[0]!r}" + assert ( + result[1].context.conditions.status == expected[1] + ), f"Test {name!r} got {result[1].context.conditions.status!r}" + assert ( + result[1].context.path.path == expected[2] + ), f"Test {name!r} got {result[1].context.path.path!r}" diff --git a/test/unit/rules/helpers/test_get_value_from_path.py b/test/unit/rules/helpers/test_get_value_from_path.py new file mode 100644 index 0000000000..bdcf72672c --- /dev/null +++ b/test/unit/rules/helpers/test_get_value_from_path.py @@ -0,0 +1,194 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.rules.helpers import get_value_from_path + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": { + "ImageId": { + "Type": "AWS::SSM::Parameter::Value", + "Default": "/aws/service/ami-amazon-linux-latest/amzn-ami-hvm-x86_64-gp2", # noqa: E501 + } + }, + "Conditions": { + "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, + "IsNotUsEast1": {"Fn::Not": [{"Condition": "IsUsEast1"}]}, + "IsImageIdSpecified": { + "Fn::Not": [{"Fn::Equals": [{"Ref": "ImageId"}, ""]}] + }, + }, + "Resources": {}, + } + + +@pytest.mark.parametrize( + "name,instance,v_path,status,expected", + [ + ( + "No path", + {"Foo": "Bar"}, + deque([]), + {}, + [ + ({"Foo": "Bar"}, {}, deque([])), + ], + ), + ( + "Small path with out it being found", + {"Foo": "Bar"}, + deque(["Bar"]), + {}, + [ + (None, {}, deque(["Bar"])), + ], + ), + ( + "Bad path returns an empty value with list", + {"Foo": [{"Bar": "One"}]}, + deque(["Foo", "Bar"]), + {}, + [ + ( + None, + {}, + deque(["Foo"]), + ) + ], + ), + ( + "Bad path returns an empty value with string", + {"Foo": "Misvalued"}, + deque(["Foo", "Bar"]), + {}, + [ + ( + None, + {}, + deque(["Foo"]), + ) + ], + ), + ( + "With a path and no conditions", + {"Foo": {"Bar": True}}, + deque(["Foo", "Bar"]), + {}, + [ + (True, {}, deque(["Foo", "Bar"])), + ], + ), + ( + "With a list item", + {"Foo": [{"Bar": "One"}, {"Bar": "Two"}, {"NoValue": "Three"}]}, + deque(["Foo", "*", "Bar"]), + {}, + [ + ("One", {}, deque(["Foo", 0, "Bar"])), + ("Two", {}, deque(["Foo", 1, "Bar"])), + (None, {}, deque(["Foo", 2, "Bar"])), + ], + ), + ( + "With a basic condition", + { + "Foo": { + "Fn::If": [ + "IsUsEast1", + {"Bar": "One"}, + {"Bar": "Two"}, + ] + } + }, + deque(["Foo", "Bar"]), + {}, + [ + ("One", {"IsUsEast1": True}, deque(["Foo", "Fn::If", 1, "Bar"])), + ("Two", {"IsUsEast1": False}, deque(["Foo", "Fn::If", 2, "Bar"])), + ], + ), + ( + "With a basic condition and a current status", + { + "Foo": { + "Fn::If": [ + "IsUsEast1", + {"Bar": "One"}, + {"Bar": "Two"}, + ] + } + }, + deque(["Foo", "Bar"]), + {"IsUsEast1": True}, + [ + ("One", {"IsUsEast1": True}, deque(["Foo", "Fn::If", 1, "Bar"])), + ], + ), + ( + "With a basic condition and a current status on a related condition", + { + "Foo": { + "Fn::If": [ + "IsUsEast1", + {"Bar": "One"}, + {"Bar": "Two"}, + ] + } + }, + deque(["Foo", "Bar"]), + {"IsNotUsEast1": True}, + [ + ( + "Two", + {"IsUsEast1": False, "IsNotUsEast1": True}, + deque(["Foo", "Fn::If", 2, "Bar"]), + ), + ], + ), + ( + "With a random function in the way", + {"Foo": {"Fn::FindInMap": ["A", "B", "C"]}}, + deque(["Foo", "*", "Bar"]), + {}, + [], + ), + ( + "With a ref at the desination", + {"Foo": {"Ref": "Bar"}}, + deque(["Foo"]), + {}, + [({"Ref": "Bar"}, {}, deque(["Foo"]))], + ), + ], +) +def test_get_value_from_path(name, instance, v_path, status, expected, validator): + validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=status, + ), + ), + ) + + results = list(get_value_from_path(validator, instance, v_path)) + + assert len(results) == len(expected), f"Test {name!r} got {len(results)!r}" + for result, exp in zip(results, expected): + assert result[0] == exp[0], f"Test {name!r} got {result[0]!r}" + assert ( + result[1].context.conditions.status == exp[1] + ), f"Test {name!r} got {result[1].context.conditions.status!r}" + assert ( + result[1].context.path.path == exp[2] + ), f"Test {name!r} got {result[1].context.path.path!r}" diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py deleted file mode 100644 index 6606bbf10d..0000000000 --- a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py +++ /dev/null @@ -1,255 +0,0 @@ -""" -Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -from collections import deque - -import pytest - -from cfnlint.context import Path -from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship - - -@pytest.fixture -def rule(): - return CfnLintRelationship( - keywords=[], relationship="Resources/AWS::EC2::Instance/Properties/ImageId" - ) - - -@pytest.fixture -def template(): - return { - "AWSTemplateFormatVersion": "2010-09-09", - "Parameters": { - "ImageId": { - "Type": "AWS::SSM::Parameter::Value", - "Default": "/aws/service/ami-amazon-linux-latest/amzn-ami-hvm-x86_64-gp2", # noqa: E501 - } - }, - "Conditions": { - "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, - "IsNotUsEast1": {"Fn::Not": [{"Condition": "IsUsEast1"}]}, - "IsImageIdSpecified": { - "Fn::Not": [{"Fn::Equals": [{"Ref": "ImageId"}, ""]}] - }, - }, - "Resources": { - "One": { - "Type": "AWS::EC2::Instance", - "Properties": { - "ImageId": { - "Fn::If": [ - "IsUsEast1", - {"Ref": "ImageId"}, - {"Ref": "AWS::NoValue"}, - ] - }, - }, - }, - "ParentOne": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["One", "ImageId"]}}, - }, - "Two": { - "Type": "AWS::EC2::Instance", - "Properties": { - "ImageId": {"Ref": "ImageId"}, - }, - }, - "ParentTwo": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["Two", "ImageId"]}}, - }, - "Three": { - "Type": "AWS::EC2::Instance", - "Condition": "IsImageIdSpecified", - "Properties": { - "ImageId": { - "Fn::If": [ - "IsUsEast1", - {"Ref": "ImageId"}, - {"Ref": "AWS::NoValue"}, - ], - }, - }, - }, - "ParentThree": { - "Type": "AWS::EC2::Instance", - "Condition": "IsImageIdSpecified", - "Properties": {"ImageId": {"Fn::GetAtt": ["Three", "ImageId"]}}, - }, - "Four": { - "Type": "AWS::EC2::Instance", - "Condition": "IsUsEast1", - "Properties": { - "ImageId": { - "Fn::If": [ - "IsImageIdSpecified", - {"Ref": "ImageId"}, - {"Ref": "AWS::NoValue"}, - ] - }, - }, - }, - "ParentFour": { - "Type": "AWS::EC2::Instance", - "Condition": "IsImageIdSpecified", - "Properties": {"ImageId": {"Fn::GetAtt": ["Four", "ImageId"]}}, - }, - "Five": { - "Type": "AWS::EC2::Instance", - "Properties": { - "ImageId": { - "Fn::If": [ - "IsImageIdSpecified", - {"Ref": "ImageId"}, - ] - }, - }, - }, - "ParentFive": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["Five", "ImageId"]}}, - }, - "Six": { - "Type": "AWS::EC2::Instance", - "Properties": "Foo", - }, - "ParentSix": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["Six", "ImageId"]}}, - }, - "Seven": { - "Type": "AWS::EC2::Instance", - "Condition": "IsNotUsEast1", - "Properties": { - "ImageId": { - "Fn::If": [ - "IsImageIdSpecified", - {"Ref": "ImageId"}, - {"Ref": "AWS::NoValue"}, - ] - }, - }, - }, - "ParentSeven": { - "Type": "AWS::EC2::Instance", - "Condition": "IsUsEast1", - "Properties": {"ImageId": {"Fn::GetAtt": ["Seven", "ImageId"]}}, - }, - "Eight": { - "Properties": { - "ImageId": { - "Fn::If": [ - "IsImageIdSpecified", - {"Ref": "ImageId"}, - {"Ref": "AWS::NoValue"}, - ] - }, - }, - }, - "ParentEight": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["Eight", "ImageId"]}}, - }, - }, - } - - -@pytest.mark.parametrize( - "name,path,status,expected", - [ - ( - "Condition for value", - deque(["Resources", "ParentOne", "Properties", "ImageId"]), - {}, - [ - ({"Ref": "ImageId"}, {"IsUsEast1": True}), - (None, {"IsUsEast1": False}), - ], - ), - ( - "Standard", - deque(["Resources", "ParentTwo", "Properties", "ImageId"]), - {}, - [({"Ref": "ImageId"}, {})], - ), - ( - "Lots of conditions along the way", - deque(["Resources", "ParentThree", "Properties", "ImageId"]), - { - "IsImageIdSpecified": True, - }, - [ - ({"Ref": "ImageId"}, {"IsUsEast1": True, "IsImageIdSpecified": True}), - (None, {"IsUsEast1": False, "IsImageIdSpecified": True}), - ], - ), - ( - "Unrelated conditions", - deque(["Resources", "ParentFour", "Properties", "ImageId"]), - { - "IsImageIdSpecified": True, - }, - [], - ), - ( - "Destiniation Fn::If isn't properly formatted", - deque(["Resources", "ParentFive", "Properties", "ImageId"]), - {}, - [], - ), - ( - "Properties isn't an object", - deque(["Resources", "ParentSix", "Properties", "ImageId"]), - {}, - [], - ), - ( - "Condition's don't align", - deque(["Resources", "ParentSeven", "Properties", "ImageId"]), - { - "IsUsEast1": True, - }, - [], - ), - ( - "Short Path", - deque(["Resources"]), - {}, - [], - ), - ( - "Not in resources", - deque(["Outputs", "MyOutput"]), - {}, - [], - ), - ( - "No type on relationshiop", - deque(["Resources", "ParentEight", "Properties", "ImageId"]), - {}, - [], - ), - ], -) -def test_get_relationships(name, path, status, expected, rule, validator): - validator = validator.evolve( - context=validator.context.evolve( - path=Path(path), - conditions=validator.context.conditions.evolve( - status=status, - ), - ), - ) - results = list(rule.get_relationship(validator)) - assert len(results) == len(expected), f"Test {name!r} got {len(results)!r}" - for result, exp in zip(results, expected): - assert result[0] == exp[0], f"Test {name!r} got {result[0]!r}" - assert ( - result[1].context.conditions.status == exp[1] - ), f"Test {name!r} got {result[1].context.conditions.status!r}" diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py deleted file mode 100644 index 71a2a1a544..0000000000 --- a/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -from collections import deque - -import pytest - -from cfnlint.context import Path -from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship - - -@pytest.fixture -def rule(): - return CfnLintRelationship(keywords=[], relationship="Resources") - - -@pytest.fixture -def template(): - return { - "AWSTemplateFormatVersion": "2010-09-09", - "Resources": {}, - } - - -def test_get_relationships(rule, validator): - validator = validator.evolve( - context=validator.context.evolve( - path=Path( - deque(["Resources", "ParentOne", "Properties", "ImageId"]), - ), - ), - ) - assert [] == list(rule.get_relationship(validator)) diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py deleted file mode 100644 index e5aa828814..0000000000 --- a/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py +++ /dev/null @@ -1,78 +0,0 @@ -""" -Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -from collections import deque - -import pytest - -from cfnlint.context import Path -from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship - - -@pytest.fixture -def rule(): - return CfnLintRelationship( - keywords=[], relationship="Resources/AWS::EC2::Instance/Properties/Foo/*/Bar" - ) - - -@pytest.fixture -def template(): - return { - "AWSTemplateFormatVersion": "2010-09-09", - "Resources": { - "One": { - "Type": "AWS::EC2::Instance", - "Properties": { - "Foo": [ - { - "Bar": "One", - }, - { - "Bar": "Two", - }, - ] - }, - }, - "ParentOne": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["One", "ImageId"]}}, - }, - }, - } - - -@pytest.mark.parametrize( - "name,path,status,expected", - [ - ( - "One", - deque(["Resources", "ParentOne", "Properties", "ImageId"]), - {}, - [ - ("One", {}), - ("Two", {}), - ], - ), - ], -) -def test_get_relationships(name, path, status, expected, rule, validator): - validator = validator.evolve( - context=validator.context.evolve( - path=Path(path), - conditions=validator.context.conditions.evolve( - status=status, - ), - ), - ) - results = list(rule.get_relationship(validator)) - assert len(results) == len(expected), f"Test {name!r} got {len(results)!r}" - for result, exp in zip(results, expected): - assert result[0] == exp[0], f"Test {name!r} got {result[0]!r}" - assert ( - result[1].context.conditions.status == exp[1] - ), f"Test {name!r} got {result[1].context.conditions.status!r}" diff --git a/test/unit/rules/resources/ec2/test_instance_image_id.py b/test/unit/rules/resources/ec2/test_instance_image_id.py index 842f74a2ee..845d5876ae 100644 --- a/test/unit/rules/resources/ec2/test_instance_image_id.py +++ b/test/unit/rules/resources/ec2/test_instance_image_id.py @@ -33,23 +33,17 @@ def rule(): }, }, }, - "Instance": { - "Type": "AWS::EC2::Instance", - "Properties": { - "LaunchTemplate": { - "LaunchTemplateId": { - "Fn::GetAtt": [ - "LaunchTemplate", - "LaunchTemplateId", - ], - } - }, - "ImageId": "ami-12345678", - }, - }, } }, { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, "ImageId": "ami-12345678", }, { @@ -57,6 +51,70 @@ def rule(): }, [], ), + ( + "Valid with no ImageId and a string launch template", + { + "Resources": {}, + }, + { + "LaunchTemplate": {"LaunchTemplateId": "foo"}, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with no ImageId and a parameter", + { + "Parameters": { + "LaunchTemplateId": { + "Type": "String", + } + }, + "Resources": {}, + }, + { + "LaunchTemplate": {"LaunchTemplateId": {"Ref": "LaunchTemplateId"}}, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with no ImageId a random ref", + { + "Resources": {}, + }, + { + "LaunchTemplate": {"LaunchTemplateName": {"Ref": "AWS::StackName"}}, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with no ImageId and another function", + { + "Parameters": { + "LaunchTemplateId": { + "Type": "String", + } + }, + "Resources": {}, + }, + { + "LaunchTemplate": { + "LaunchTemplateId": {"Fn::FindInMap": ["One", "Two", "Three"]} + }, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), ( "Valid with ImageId in LaunchTemplate", { @@ -71,42 +129,66 @@ def rule(): }, }, }, - "Instance": { - "Type": "AWS::EC2::Instance", - "Properties": { - "LaunchTemplate": { - "LaunchTemplateId": { - "Fn::GetAtt": [ - "LaunchTemplate", - "LaunchTemplateId", - ], - } - }, - }, - }, } }, - {}, + { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + }, { "path": ["Resources", "Instance", "Properties"], }, [], ), ( - "Invalid with no relationship", + "Valid with ImageId in LaunchTemplate using Name", { "Resources": { - "Instance": { - "Type": "AWS::EC2::Instance", - "Properties": {}, + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + "ImageId": "ami-12345678", + }, + }, }, } }, + { + "LaunchTemplate": { + "LaunchTemplateName": { + "Ref": "LaunchTemplate", + } + }, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Invalid with no relationship", + {}, {}, { "path": ["Resources", "Instance", "Properties"], }, - [ValidationError("'ImageId' is a required property")], + [ + ValidationError( + "'ImageId' is a required property", + path_override=deque( + ["Resources", "Instance", "Properties", "ImageId"] + ), + ) + ], ), ( "Invalid with no ImageId in LaunchTemplate", @@ -121,28 +203,27 @@ def rule(): }, }, }, - "Instance": { - "Type": "AWS::EC2::Instance", - "Properties": { - "LaunchTemplate": { - "LaunchTemplateId": { - "Fn::GetAtt": [ - "LaunchTemplate", - "LaunchTemplateId", - ], - } - }, - }, - }, } }, - {}, + { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + }, { "path": ["Resources", "Instance", "Properties"], }, [ ValidationError( "'ImageId' is a required property", + path_override=deque( + ["Resources", "Instance", "Properties", "ImageId"] + ), ) ], ), @@ -162,32 +243,24 @@ def rule(): }, }, }, - "Instance": { - "Type": "AWS::EC2::Instance", - "Properties": { - "LaunchTemplate": { - "LaunchTemplateId": { - "Fn::GetAtt": [ - "LaunchTemplate", - "LaunchTemplateId", - ], - } - }, - "ImageId": { - "Fn::If": [ - "IsUsEast1", - "ami-12345678", - {"Ref": "AWS::NoValue"}, - ] - }, - }, - }, }, }, { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, "ImageId": { - "Fn::If": ["IsUsEast1", "ami-12345678", {"Ref": "AWS::NoValue"}] - } + "Fn::If": [ + "IsUsEast1", + "ami-12345678", + {"Ref": "AWS::NoValue"}, + ] + }, }, { "path": ["Resources", "Instance", "Properties"], @@ -214,6 +287,7 @@ def rule(): ) def test_validate(name, instance, expected, rule, validator): errs = list(rule.validate(validator, "", instance, {})) + assert ( errs == expected ), f"Expected test {name!r} to have {expected!r} but got {errs!r}" From 08420cadddc4550e02f7d73d42b056042cf2285b Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Thu, 25 Jul 2024 16:40:26 -0700 Subject: [PATCH 7/7] Clean up the ECS check rule E3049 --- src/cfnlint/conditions/conditions.py | 39 +- .../rules/resources/PrimaryIdentifiers.py | 2 +- .../resources/ecs/ServiceDynamicPorts.py | 146 +++-- src/cfnlint/template/template.py | 2 +- .../unit/module/conditions/test_conditions.py | 12 +- .../ecs/test_service_dynamic_ports.py | 563 ++++++++++++++++++ 6 files changed, 679 insertions(+), 85 deletions(-) create mode 100644 test/unit/rules/resources/ecs/test_service_dynamic_ports.py diff --git a/src/cfnlint/conditions/conditions.py b/src/cfnlint/conditions/conditions.py index cbba5bfed2..915932361c 100644 --- a/src/cfnlint/conditions/conditions.py +++ b/src/cfnlint/conditions/conditions.py @@ -256,44 +256,7 @@ def _build_cfn_implies(self, scenarios) -> And: return And(*conditions) - def can_imply(self, scenarios: dict[str, bool], implies: str) -> bool: - """Based on a bunch of scenario conditions and their Truth/False value - determine if implies condition is True any time the scenarios are satisfied - solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies]) - - Args: - scenarios (dict[str, bool]): A list of condition names - and if they are True or False implies: the condition name that - we are implying will also be True - - Returns: - bool: if the implied condition will be True if the scenario is True - """ - try: - cnf = self._cnf.copy() - # if the implies condition has to be false in the scenarios we - # know it can never be true - if not scenarios.get(implies, True): - return False - - and_condition = self._build_cfn_implies(scenarios) - cnf.add_prop(and_condition) - implies_condition = self._conditions[implies].build_true_cnf( - self._solver_params - ) - cnf.add_prop(Implies(and_condition, implies_condition)) - - results = satisfiable(cnf) - if results: - return True - - return False - except KeyError: - # KeyError is because the listed condition doesn't exist because of bad - # formatting or just the wrong condition name - return True - - def has_to_imply(self, scenarios: dict[str, bool], implies: str) -> bool: + def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool: """Based on a bunch of scenario conditions and their Truth/False value determine if implies condition is True any time the scenarios are satisfied solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies]) diff --git a/src/cfnlint/rules/resources/PrimaryIdentifiers.py b/src/cfnlint/rules/resources/PrimaryIdentifiers.py index 493dc09822..bf3305c447 100644 --- a/src/cfnlint/rules/resources/PrimaryIdentifiers.py +++ b/src/cfnlint/rules/resources/PrimaryIdentifiers.py @@ -63,7 +63,7 @@ def _validate_resource_type_uniqueness(self, cfn, resource_type, ids): # and add it to the scenario as needed if condition: if scenario: - if not cfn.conditions.has_to_imply(scenario, condition): + if not cfn.conditions.check_implies(scenario, condition): continue conditions[condition] = set([True]) diff --git a/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py b/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py index 39961e0a5a..00079aad94 100644 --- a/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py +++ b/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py @@ -32,10 +32,22 @@ def __init__(self) -> None: keywords=["Resources/AWS::ECS::Service/Properties"], ) + def _filter_resource_name(self, instance: Any) -> str | None: + fn_k, fn_v = is_function(instance) + if fn_k is None: + return None + if fn_k == "Ref": + if isinstance(fn_v, str): + return fn_v + elif fn_k == "Fn::GetAtt": + name = ensure_list(fn_v)[0].split(".")[0] + if isinstance(name, str): + return name + return None + def _get_service_lb_containers( self, validator: Validator, instance: Any - ) -> Iterator[tuple[str, str, Validator]]: - + ) -> Iterator[tuple[str, str, str, Validator]]: for load_balancer, load_balancer_validator in get_value_from_path( validator, instance, @@ -46,35 +58,79 @@ def _get_service_lb_containers( load_balancer, path=deque(["ContainerName"]), ): - if name is None: + if name is None or not isinstance(name, str): continue for port, port_validator in get_value_from_path( name_validator, load_balancer, path=deque(["ContainerPort"]), ): - if port is None: + if port is None or not isinstance(port, (str, int)): continue - yield name, port, port_validator + for tg, tg_validator in get_value_from_path( + port_validator, + load_balancer, + path=deque(["TargetGroupArn"]), + ): + tg = self._filter_resource_name(tg) + if tg is None: + continue + yield name, port, tg, tg_validator - def _get_task_definition_resource_name( + def _get_service_properties( self, validator: Validator, instance: Any - ) -> Iterator[tuple[str, Validator]]: + ) -> Iterator[tuple[str, str, str, str, Validator]]: for task_definition_id, task_definition_validator in get_value_from_path( validator, instance, deque(["TaskDefinition"]) ): - fn_k, fn_v = is_function(task_definition_id) - if fn_k == "Ref": - if validator.is_type(fn_v, "string"): - if fn_v in validator.context.resources: - yield fn_v, task_definition_validator - elif fn_k != "Fn::GetAtt": + task_definition_resource_name = self._filter_resource_name( + task_definition_id + ) + if task_definition_resource_name is None: continue - yield ensure_list(fn_v)[0].split(".")[0], task_definition_validator + for ( + container_name, + container_port, + target_group, + lb_validator, + ) in self._get_service_lb_containers(task_definition_validator, instance): + yield ( + task_definition_resource_name, + container_name, + container_port, + target_group, + lb_validator, + ) + + def _get_target_group_properties( + self, validator: Validator, resource_name: Any + ) -> Iterator[tuple[str | int | None, Validator]]: + target_group, target_group_validator = get_resource_by_name( + validator, resource_name, ["AWS::ElasticLoadBalancingV2::TargetGroup"] + ) + if not target_group: + return + + for port, port_validator in get_value_from_path( + target_group_validator, + target_group, + path=deque(["Properties", "HealthCheckPort"]), + ): + if port is None: + yield port, port_validator + continue + + if not isinstance(port, (str, int)): + continue + yield port, port_validator def _get_task_containers( - self, validator: Validator, resource_name: str, container_name: str - ) -> Iterator[tuple[Any, Validator]]: + self, + validator: Validator, + resource_name: str, + container_name: str, + container_port: str | int, + ) -> Iterator[Validator]: task_def, task_def_validator = get_resource_by_name( validator, resource_name, ["AWS::ECS::TaskDefinition"] ) @@ -95,11 +151,14 @@ def _get_task_containers( if not isinstance(name, str): continue if name == container_name: - yield container, name_validator + for host_port_validator in self._filter_port_mappings( + name_validator, container, container_port + ): + yield host_port_validator def _filter_port_mappings( self, validator: Validator, instance: Any, port: Any - ) -> Iterator[tuple[Any, Validator]]: + ) -> Iterator[Validator]: for port_mapping, port_mapping_validator in get_value_from_path( validator, instance, @@ -113,37 +172,44 @@ def _filter_port_mappings( if not isinstance(container_port, (str, int)): continue if str(port) == str(container_port): - for host_port, host_part_validator in get_value_from_path( + for _, host_part_validator in get_value_from_path( container_port_validator, port_mapping, path=deque(["HostPort"]), ): - yield host_port, host_part_validator + yield host_part_validator def validate( - self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] + self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any] ) -> ValidationResult: for ( task_definition_resource_name, - task_definition_resource_name_validator, - ) in self._get_task_definition_resource_name(validator, instance): - - for name, port, lb_validator in self._get_service_lb_containers( - task_definition_resource_name_validator, instance + lb_container_name, + lb_container_port, + lb_target_group, + lb_service_validator, + ) in self._get_service_properties(validator, instance): + for container_validator in self._get_task_containers( + lb_service_validator, + task_definition_resource_name, + lb_container_name, + lb_container_port, ): - for container, container_validator in self._get_task_containers( - lb_validator, task_definition_resource_name, name + for ( + health_check_port, + health_check_port_validator, + ) in self._get_target_group_properties( + container_validator, lb_target_group ): - for host_port, host_port_validator in self._filter_port_mappings( - container_validator, container, port - ): - if host_port == 0: - yield ValidationError( - "When using an ECS task definition of host port 0 and " - "associating that container to an ELB the target group " - "has to have a 'HealthCheckPort' of 'traffic-port'", - ) - - return - yield + if health_check_port != "traffic-port": + err_validator = "const" + if health_check_port is None: + err_validator = "required" + yield ValidationError( + "When using an ECS task definition of host port 0 and " + "associating that container to an ELB the target group " + "has to have a 'HealthCheckPort' of 'traffic-port'", + path_override=health_check_port_validator.context.path.path, + validator=err_validator, + ) diff --git a/src/cfnlint/template/template.py b/src/cfnlint/template/template.py index 7c39673951..d14b562910 100644 --- a/src/cfnlint/template/template.py +++ b/src/cfnlint/template/template.py @@ -824,7 +824,7 @@ def is_resource_available(self, path: Path, resource: str) -> list[dict[str, boo return results scenario[condition_name] = list(condition_bool)[0] - if not self.conditions.has_to_imply(scenario, resource_condition): + if not self.conditions.check_implies(scenario, resource_condition): return [{**{resource_condition: False}, **scenario}] # if resource condition isn't available then the resource is available diff --git a/test/unit/module/conditions/test_conditions.py b/test/unit/module/conditions/test_conditions.py index 05cfb9db6e..d3d248652c 100644 --- a/test/unit/module/conditions/test_conditions.py +++ b/test/unit/module/conditions/test_conditions.py @@ -31,7 +31,7 @@ def test_bad_condition_definition(self): ) # would be 2 but IsProd fails # test coverage for KeyErrors in the following functions self.assertTrue( - cfn.conditions.has_to_imply( + cfn.conditions.check_implies( { "Test": True, }, @@ -84,11 +84,13 @@ def test_check_implies(self): cfn = Template("", template) self.assertFalse( - cfn.conditions.has_to_imply({"aCondition": False}, "aCondition") + cfn.conditions.check_implies({"aCondition": False}, "aCondition") ) - self.assertTrue(cfn.conditions.has_to_imply({"aCondition": True}, "aCondition")) self.assertTrue( - cfn.conditions.has_to_imply( + cfn.conditions.check_implies({"aCondition": True}, "aCondition") + ) + self.assertTrue( + cfn.conditions.check_implies( {"aCondition": True, "bCondition": False}, "aCondition" ) ) @@ -110,7 +112,7 @@ def test_check_always_true_or_false(self): cfn = Template("", template) self.assertEqual(len(cfn.conditions._conditions), 2) # test coverage for KeyErrors in the following functions - self.assertTrue(cfn.conditions.has_to_imply({"IsTrue": True}, "IsFalse")) + self.assertTrue(cfn.conditions.check_implies({"IsTrue": True}, "IsFalse")) def test_check_never_false(self): """With allowed values two conditions can not both be false""" diff --git a/test/unit/rules/resources/ecs/test_service_dynamic_ports.py b/test/unit/rules/resources/ecs/test_service_dynamic_ports.py new file mode 100644 index 0000000000..86ea9f71e9 --- /dev/null +++ b/test/unit/rules/resources/ecs/test_service_dynamic_ports.py @@ -0,0 +1,563 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import jsonpatch +import pytest + +from cfnlint.jsonschema import ValidationError +from cfnlint.rules.helpers import get_value_from_path +from cfnlint.rules.resources.ecs.ServiceDynamicPorts import ServiceDynamicPorts + + +@pytest.fixture +def rule(): + rule = ServiceDynamicPorts() + yield rule + + +_task_definition = { + "Type": "AWS::ECS::TaskDefinition", + "Properties": { + "ContainerDefinitions": [ + { + "Name": "my-container", + "Image": "my-image", + "PortMappings": [ + { + "ContainerPort": 8080, + "HostPort": 0, + } + ], + } + ], + }, +} + +_target_group = { + "Type": "AWS::ElasticLoadBalancingV2::TargetGroup", + "Properties": { + "HealthCheckPort": "traffic-port", + "Port": 8080, + "Protocol": "HTTP", + }, +} + +_service = { + "Type": "AWS::ECS::Service", + "Properties": { + "TaskDefinition": {"Ref": "TaskDefinition"}, + "LoadBalancers": [ + { + "TargetGroupArn": {"Ref": "TargetGroup"}, + "ContainerName": "my-container", + "ContainerPort": 8080, + } + ], + }, +} + + +@pytest.mark.parametrize( + "template,start_path,expected", + [ + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": dict(_target_group), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyHealthCheckPort": { + "Type": "String", + }, + }, + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": {"Ref": "MyHealthCheckPort"}, + }, + ], + ), + "Service": dict(_service), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyContainerPort": { + "Type": "String", + }, + }, + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": ( + "/Properties/ContainerDefinitions" + "/0/PortMappings/0/ContainerPort" + ), + "value": {"Ref": "MyContainerPort"}, + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": dict(_service), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyTaskDefinition": { + "Type": "String", + }, + }, + "Resources": { + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Ref": "MyTaskDefinition"}, + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Fn::FindInMap": ["A", "B", "C"]}, + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Fn::GetAtt": ["TaskDefinition", "Arn"]}, + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ( + "When using an ECS task definition of host " + "port 0 and associating that container to " + "an ELB the target group has to have a " + "'HealthCheckPort' of 'traffic-port'" + ), + validator="const", + path_override=deque( + ["Resources", "TargetGroup", "Properties", "HealthCheckPort"] + ), + ) + ], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Ref": []}, + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ( + "When using an ECS task definition of host " + "port 0 and associating that container to " + "an ELB the target group has to have a " + "'HealthCheckPort' of 'traffic-port'" + ), + validator="const", + path_override=deque( + ["Resources", "TargetGroup", "Properties", "HealthCheckPort"] + ), + ) + ], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ( + "When using an ECS task definition of host " + "port 0 and associating that container to " + "an ELB the target group has to have a " + "'HealthCheckPort' of 'traffic-port'" + ), + validator="required", + path_override=deque( + ["Resources", "TargetGroup", "Properties", "HealthCheckPort"] + ), + ) + ], + ), + ( + { + "Parameters": { + "MyContainerName": { + "Type": "String", + } + }, + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/ContainerDefinitions/0/Name", + "value": {"Ref": "MyContainerName"}, + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyContainerName": { + "Type": "String", + } + }, + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/LoadBalancers/0/ContainerName", + "value": {"Ref": "MyContainerName"}, + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyContainerName": { + "Type": "String", + } + }, + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/LoadBalancers/0/ContainerName", + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/LoadBalancers/0/ContainerPort", + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/LoadBalancers/0/TargetGroupArn", + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/ContainerDefinitions/0/Name", + "value": "Changed", + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": ( + "/Properties/ContainerDefinitions/" + "0/PortMappings/0/ContainerPort" + ), + "value": "30", + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ], + indirect=["template"], +) +def test_validate(template, start_path, expected, rule, validator): + for instance, instance_validator in get_value_from_path( + validator, template, start_path + ): + errs = list(rule.validate(instance_validator, "", instance, {})) + assert errs == expected, f"Expected {expected} got {errs}"