diff --git a/src/cfnlint/conditions/conditions.py b/src/cfnlint/conditions/conditions.py index da42ee8918..915932361c 100644 --- a/src/cfnlint/conditions/conditions.py +++ b/src/cfnlint/conditions/conditions.py @@ -240,6 +240,22 @@ def build_scenarios( # formatting or just the wrong condition name return + def _build_cfn_implies(self, scenarios) -> And: + conditions = [] + for condition_name, opt in scenarios.items(): + if opt: + conditions.append( + self._conditions[condition_name].build_true_cnf(self._solver_params) + ) + else: + conditions.append( + self._conditions[condition_name].build_false_cnf( + self._solver_params + ) + ) + + return And(*conditions) + def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool: """Based on a bunch of scenario conditions and their Truth/False value determine if implies condition is True any time the scenarios are satisfied @@ -260,36 +276,18 @@ def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool: if not scenarios.get(implies, True): return False - conditions = [] - for condition_name, opt in scenarios.items(): - if opt: - conditions.append( - self._conditions[condition_name].build_true_cnf( - self._solver_params - ) - ) - else: - conditions.append( - self._conditions[condition_name].build_false_cnf( - self._solver_params - ) - ) - + and_condition = self._build_cfn_implies(scenarios) + cnf.add_prop(and_condition) implies_condition = self._conditions[implies].build_true_cnf( self._solver_params ) + cnf.add_prop(Not(Implies(and_condition, implies_condition))) - and_condition = And(*conditions) - cnf.add_prop(and_condition) - - # if the implies condition has to be true already then we don't - # need to imply it - if not scenarios.get(implies): - cnf.add_prop(Not(Implies(and_condition, implies_condition))) - if satisfiable(cnf): - return True + results = satisfiable(cnf) + if results: + return False - return False + return True except KeyError: # KeyError is because the listed condition doesn't exist because of bad # formatting or just the wrong condition name @@ -354,7 +352,8 @@ def satisfiable( determine if the conditions are satisfied Args: - condition_names (list[str]): A list of condition names + condition_names (dict[str, bool]): A list of condition names with if + they are True or False Returns: bool: True if the conditions are satisfied diff --git a/src/cfnlint/context/context.py b/src/cfnlint/context/context.py index 7c35c57fb4..e70cb364ce 100644 --- a/src/cfnlint/context/context.py +++ b/src/cfnlint/context/context.py @@ -357,6 +357,7 @@ class Resource(_Ref): """ type: str = field(init=False) + condition: str | None = field(init=False, default=None) resource: InitVar[Any] def __post_init__(self, resource) -> None: @@ -369,6 +370,11 @@ def __post_init__(self, resource) -> None: if self.type.startswith("Custom::"): self.type = "AWS::CloudFormation::CustomResource" + c = resource.get("Condition") + if not isinstance(t, str): + raise ValueError("Condition must be a string") + self.condition = c + @property def get_atts(self, region: str = "us-east-1") -> AttributeDict: return PROVIDER_SCHEMA_MANAGER.get_type_getatts(self.type, region) diff --git a/src/cfnlint/rules/__init__.py b/src/cfnlint/rules/__init__.py index 54c32fcbc5..976c6058c8 100644 --- a/src/cfnlint/rules/__init__.py +++ b/src/cfnlint/rules/__init__.py @@ -5,9 +5,7 @@ from cfnlint.rules._rule import CloudFormationLintRule, Match, RuleMatch from cfnlint.rules._rules import Rules, RulesCollection -from cfnlint.rules.jsonschema import ( - CfnLintJsonSchema, - CfnLintJsonSchemaRegional, - CfnLintKeyword, - SchemaDetails, -) +from cfnlint.rules.jsonschema import CfnLintJsonSchema # type: ignore +from cfnlint.rules.jsonschema import CfnLintJsonSchemaRegional # type: ignore +from cfnlint.rules.jsonschema import CfnLintKeyword # type: ignore +from cfnlint.rules.jsonschema import SchemaDetails diff --git a/src/cfnlint/rules/helpers/__init__.py b/src/cfnlint/rules/helpers/__init__.py new file mode 100644 index 0000000000..44d8de325f --- /dev/null +++ b/src/cfnlint/rules/helpers/__init__.py @@ -0,0 +1,12 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from cfnlint.rules.helpers.get_resource_by_name import get_resource_by_name +from cfnlint.rules.helpers.get_value_from_path import get_value_from_path + +__all__ = [ + "get_value_from_path", + "get_resource_by_name", +] diff --git a/src/cfnlint/rules/helpers/get_resource_by_name.py b/src/cfnlint/rules/helpers/get_resource_by_name.py new file mode 100644 index 0000000000..c8f9327900 --- /dev/null +++ b/src/cfnlint/rules/helpers/get_resource_by_name.py @@ -0,0 +1,50 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Sequence + +from cfnlint.context import Path +from cfnlint.context.conditions import Unsatisfiable +from cfnlint.jsonschema import Validator + + +def get_resource_by_name( + validator: Validator, name: str, types: Sequence[str] | None = None +) -> tuple[Any, Validator]: + + resource = validator.context.resources.get(name) + if not resource: + return None, validator + + if types and resource.type not in types: + return None, validator + + if resource.condition: + try: + validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + { + resource.condition: True, + } + ), + ) + ) + except Unsatisfiable: + return None, validator + + validator = validator.evolve( + context=validator.context.evolve( + path=Path( + path=deque(["Resources", name]), + cfn_path=deque(["Resources", resource.type]), + ) + ) + ) + + return validator.cfn.template.get("Resources", {}).get(name), validator diff --git a/src/cfnlint/rules/helpers/get_value_from_path.py b/src/cfnlint/rules/helpers/get_value_from_path.py new file mode 100644 index 0000000000..b5bcb61fad --- /dev/null +++ b/src/cfnlint/rules/helpers/get_value_from_path.py @@ -0,0 +1,119 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator + +from cfnlint.context.conditions import Unsatisfiable +from cfnlint.helpers import is_function +from cfnlint.jsonschema import Validator + + +def _get_relationship_fn_if( + validator: Validator, key: Any, value: Any, path: deque[str | int] +) -> Iterator[tuple[Any, Validator]]: + if not isinstance(value, list) or len(value) != 3: + return + condition = value[0] + + for i in [1, 2]: + try: + if_validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status={ + condition: True if i == 1 else False, + }, + ), + path=validator.context.path.descend(path=key).descend(path=i), + ) + ) + for r, v in get_value_from_path( + if_validator, + value[i], + path.copy(), + ): + yield r, v + except Unsatisfiable: + pass + + +def _get_value_from_path_list( + validator: Validator, instance: Any, path: deque[str | int] +) -> Iterator[tuple[Any, Validator]]: + for i, v in enumerate(instance): + for r, v in get_value_from_path( + validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=i) + ), + ), + v, + path.copy(), + ): + yield r, v + + +def get_value_from_path( + validator: Validator, instance: Any, path: deque[str | int] +) -> Iterator[tuple[Any, Validator]]: + """ + Retrieve a value from a nested dictionary or list using a path. + + Args: + validator (Validator): The validator instance + data (Any): The dictionary or list to search. + path (deque[str | int]): The path to the value. + + Returns: + The value at the specified path, or None if the key doesn't exist. + + Examples: + >>> data = {'a': {'b': {'c': 3}}} + >>> get_value_from_path(data, ['a', 'b', 'c']) + 3 + """ + + fn_k, fn_v = is_function(instance) + if fn_k is not None: + if fn_k == "Fn::If": + yield from _get_relationship_fn_if(validator, fn_k, fn_v, path) + elif fn_k == "Ref" and fn_v == "AWS::NoValue": + yield None, validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=fn_k) + ) + ) + elif not path: + yield instance, validator + return + + if not path: + yield instance, validator + return + + key = path.popleft() + if isinstance(instance, list) and key == "*": + yield from _get_value_from_path_list(validator, instance, path) + return + + if not isinstance(instance, dict): + yield None, validator + return + + for r, v in get_value_from_path( + validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=key) + ) + ), + instance.get(key), + path.copy(), + ): + yield r, v + + return diff --git a/src/cfnlint/rules/jsonschema/__init__.py b/src/cfnlint/rules/jsonschema/__init__.py index e1d5a72bee..60f460db6f 100644 --- a/src/cfnlint/rules/jsonschema/__init__.py +++ b/src/cfnlint/rules/jsonschema/__init__.py @@ -12,10 +12,10 @@ __all__ = [ "BaseJsonSchema", - "CfnLintKeyword", "CfnLintJsonSchema", - "SchemaDetails", "CfnLintJsonSchemaRegional", + "CfnLintKeyword", "MaxProperties", "PropertyNames", + "SchemaDetails", ] diff --git a/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py b/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py new file mode 100644 index 0000000000..00079aad94 --- /dev/null +++ b/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py @@ -0,0 +1,215 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator + +from cfnlint.helpers import ensure_list, is_function +from cfnlint.jsonschema import ValidationError, ValidationResult +from cfnlint.jsonschema.protocols import Validator +from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path +from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword + + +class ServiceDynamicPorts(CfnLintKeyword): + id = "E3049" + shortdesc = ( + "Validate ECS tasks with dynamic host port have traffic-port ELB target groups" + ) + description = ( + "When using an ECS task definition of host port 0 and " + "associating that container to an ELB the target group " + "has to have a 'HealthCheckPort' of 'traffic-port'" + ) + tags = ["resources"] + + def __init__(self) -> None: + super().__init__( + keywords=["Resources/AWS::ECS::Service/Properties"], + ) + + def _filter_resource_name(self, instance: Any) -> str | None: + fn_k, fn_v = is_function(instance) + if fn_k is None: + return None + if fn_k == "Ref": + if isinstance(fn_v, str): + return fn_v + elif fn_k == "Fn::GetAtt": + name = ensure_list(fn_v)[0].split(".")[0] + if isinstance(name, str): + return name + return None + + def _get_service_lb_containers( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[str, str, str, Validator]]: + for load_balancer, load_balancer_validator in get_value_from_path( + validator, + instance, + path=deque(["LoadBalancers", "*"]), + ): + for name, name_validator in get_value_from_path( + load_balancer_validator, + load_balancer, + path=deque(["ContainerName"]), + ): + if name is None or not isinstance(name, str): + continue + for port, port_validator in get_value_from_path( + name_validator, + load_balancer, + path=deque(["ContainerPort"]), + ): + if port is None or not isinstance(port, (str, int)): + continue + for tg, tg_validator in get_value_from_path( + port_validator, + load_balancer, + path=deque(["TargetGroupArn"]), + ): + tg = self._filter_resource_name(tg) + if tg is None: + continue + yield name, port, tg, tg_validator + + def _get_service_properties( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[str, str, str, str, Validator]]: + for task_definition_id, task_definition_validator in get_value_from_path( + validator, instance, deque(["TaskDefinition"]) + ): + task_definition_resource_name = self._filter_resource_name( + task_definition_id + ) + if task_definition_resource_name is None: + continue + for ( + container_name, + container_port, + target_group, + lb_validator, + ) in self._get_service_lb_containers(task_definition_validator, instance): + yield ( + task_definition_resource_name, + container_name, + container_port, + target_group, + lb_validator, + ) + + def _get_target_group_properties( + self, validator: Validator, resource_name: Any + ) -> Iterator[tuple[str | int | None, Validator]]: + target_group, target_group_validator = get_resource_by_name( + validator, resource_name, ["AWS::ElasticLoadBalancingV2::TargetGroup"] + ) + if not target_group: + return + + for port, port_validator in get_value_from_path( + target_group_validator, + target_group, + path=deque(["Properties", "HealthCheckPort"]), + ): + if port is None: + yield port, port_validator + continue + + if not isinstance(port, (str, int)): + continue + yield port, port_validator + + def _get_task_containers( + self, + validator: Validator, + resource_name: str, + container_name: str, + container_port: str | int, + ) -> Iterator[Validator]: + task_def, task_def_validator = get_resource_by_name( + validator, resource_name, ["AWS::ECS::TaskDefinition"] + ) + + if not task_def: + return + + for container, container_validator in get_value_from_path( + task_def_validator, + task_def, + path=deque(["Properties", "ContainerDefinitions", "*"]), + ): + for name, name_validator in get_value_from_path( + container_validator, + container, + path=deque(["Name"]), + ): + if not isinstance(name, str): + continue + if name == container_name: + for host_port_validator in self._filter_port_mappings( + name_validator, container, container_port + ): + yield host_port_validator + + def _filter_port_mappings( + self, validator: Validator, instance: Any, port: Any + ) -> Iterator[Validator]: + for port_mapping, port_mapping_validator in get_value_from_path( + validator, + instance, + path=deque(["PortMappings", "*"]), + ): + for container_port, container_port_validator in get_value_from_path( + port_mapping_validator, + port_mapping, + path=deque(["ContainerPort"]), + ): + if not isinstance(container_port, (str, int)): + continue + if str(port) == str(container_port): + for _, host_part_validator in get_value_from_path( + container_port_validator, + port_mapping, + path=deque(["HostPort"]), + ): + yield host_part_validator + + def validate( + self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any] + ) -> ValidationResult: + + for ( + task_definition_resource_name, + lb_container_name, + lb_container_port, + lb_target_group, + lb_service_validator, + ) in self._get_service_properties(validator, instance): + for container_validator in self._get_task_containers( + lb_service_validator, + task_definition_resource_name, + lb_container_name, + lb_container_port, + ): + for ( + health_check_port, + health_check_port_validator, + ) in self._get_target_group_properties( + container_validator, lb_target_group + ): + if health_check_port != "traffic-port": + err_validator = "const" + if health_check_port is None: + err_validator = "required" + yield ValidationError( + "When using an ECS task definition of host port 0 and " + "associating that container to an ELB the target group " + "has to have a 'HealthCheckPort' of 'traffic-port'", + path_override=health_check_port_validator.context.path.path, + validator=err_validator, + ) diff --git a/src/cfnlint/rules/resources/ectwo/InstanceImageId.py b/src/cfnlint/rules/resources/ectwo/InstanceImageId.py new file mode 100644 index 0000000000..7219d3c10b --- /dev/null +++ b/src/cfnlint/rules/resources/ectwo/InstanceImageId.py @@ -0,0 +1,127 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator + +from cfnlint.helpers import ensure_list, is_function +from cfnlint.jsonschema import ValidationError, ValidationResult, Validator +from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path +from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword + + +class InstanceImageId(CfnLintKeyword): + id = "E3673" + shortdesc = "Validate if an ImageId is required" + description = ( + "Validate if an ImageID is required. It can be " + "required if the associated LaunchTemplate doesn't specify " + "an ImageID" + ) + tags = ["resources", "ec2"] + + def __init__(self) -> None: + super().__init__( + keywords=[ + "Resources/AWS::EC2::Instance/Properties", + ], + ) + + def _get_related_launch_template( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[Any, Validator]]: + + for launch_template, launch_template_validator in get_value_from_path( + validator, instance, deque(["LaunchTemplate"]) + ): + if not launch_template: + continue + for id, id_validator in get_value_from_path( + launch_template_validator, launch_template, deque(["LaunchTemplateId"]) + ): + if id is None: + for name, name_validator in get_value_from_path( + id_validator, + launch_template, + deque(["LaunchTemplateName"]), + ): + yield name, name_validator + yield id, id_validator + + def validate( + self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] + ) -> ValidationResult: + + for ( + instance_image_id, + instance_image_id_validator, + ) in get_value_from_path( + validator, + instance, + path=deque(["ImageId"]), + ): + if instance_image_id: + continue + + launch_templates = list( + self._get_related_launch_template(instance_image_id_validator, instance) + ) + + if not launch_templates: + yield ValidationError( + "'ImageId' is a required property", + path_override=instance_image_id_validator.context.path.path, + ) + continue + + for ( + instance_launch_template, + instance_launch_template_validator, + ) in launch_templates: + fn_k, fn_v = is_function(instance_launch_template) + + # if its not a function we can't tell from a string + # if the image ID is there or not + if fn_k is None: + continue + + launch_template, launch_template_validator = None, None + if fn_k == "Ref": + if fn_v in instance_launch_template_validator.context.parameters: + continue + elif fn_v in instance_launch_template_validator.context.resources: + launch_template, launch_template_validator = ( + get_resource_by_name( + instance_launch_template_validator, + fn_v, + ["AWS::EC2::LaunchTemplate"], + ) + ) + else: + continue + elif fn_k == "Fn::GetAtt": + launch_template, launch_template_validator = get_resource_by_name( + instance_launch_template_validator, + ensure_list(fn_v)[0].split(".")[0], + ["AWS::EC2::LaunchTemplate"], + ) + else: + continue + + for ( + launch_template_image_id, + _, + ) in get_value_from_path( + launch_template_validator, + launch_template or {}, + path=deque(["Properties", "LaunchTemplateData", "ImageId"]), + ): + if launch_template_image_id is None and instance_image_id is None: + yield ValidationError( + "'ImageId' is a required property", + path_override=instance_image_id_validator.context.path.path, + ) diff --git a/src/cfnlint/template/template.py b/src/cfnlint/template/template.py index 65767f350e..d14b562910 100644 --- a/src/cfnlint/template/template.py +++ b/src/cfnlint/template/template.py @@ -824,7 +824,7 @@ def is_resource_available(self, path: Path, resource: str) -> list[dict[str, boo return results scenario[condition_name] = list(condition_bool)[0] - if self.conditions.check_implies(scenario, resource_condition): + if not self.conditions.check_implies(scenario, resource_condition): return [{**{resource_condition: False}, **scenario}] # if resource condition isn't available then the resource is available diff --git a/test/unit/module/cfn_yaml/test_yaml.py b/test/unit/module/cfn_yaml/test_yaml.py index b4668a19b8..4e5b5afc80 100644 --- a/test/unit/module/cfn_yaml/test_yaml.py +++ b/test/unit/module/cfn_yaml/test_yaml.py @@ -27,7 +27,7 @@ def setUp(self): }, "generic_bad": { "filename": "test/fixtures/templates/bad/generic.yaml", - "failures": 34, + "failures": 35, }, } diff --git a/test/unit/module/conditions/test_conditions.py b/test/unit/module/conditions/test_conditions.py index 4012a69fa2..d3d248652c 100644 --- a/test/unit/module/conditions/test_conditions.py +++ b/test/unit/module/conditions/test_conditions.py @@ -30,7 +30,14 @@ def test_bad_condition_definition(self): len(cfn.conditions._conditions), 1 ) # would be 2 but IsProd fails # test coverage for KeyErrors in the following functions - self.assertTrue(cfn.conditions.check_implies({"Test": True}, "IsUsEast1")) + self.assertTrue( + cfn.conditions.check_implies( + { + "Test": True, + }, + "IsUsEast1", + ) + ) self.assertEqual( list(cfn.conditions.build_scenarios({"IsProd": None, "IsUsEast1": None})), [], @@ -105,7 +112,7 @@ def test_check_always_true_or_false(self): cfn = Template("", template) self.assertEqual(len(cfn.conditions._conditions), 2) # test coverage for KeyErrors in the following functions - self.assertFalse(cfn.conditions.check_implies({"IsTrue": True}, "IsFalse")) + self.assertTrue(cfn.conditions.check_implies({"IsTrue": True}, "IsFalse")) def test_check_never_false(self): """With allowed values two conditions can not both be false""" diff --git a/test/unit/module/test_rules_collections.py b/test/unit/module/test_rules_collections.py index 3c2cd97917..edde4715a7 100644 --- a/test/unit/module/test_rules_collections.py +++ b/test/unit/module/test_rules_collections.py @@ -69,7 +69,7 @@ def test_fail_run(self): filename = "test/fixtures/templates/bad/generic.yaml" template = cfnlint.decode.cfn_yaml.load(filename) cfn = Template(filename, template, ["us-east-1"]) - expected_err_count = 37 + expected_err_count = 38 matches = [] matches.extend(self.rules.run(filename, cfn)) assert ( diff --git a/test/unit/rules/helpers/__init__.py b/test/unit/rules/helpers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/unit/rules/helpers/test_get_resource_by_name.py b/test/unit/rules/helpers/test_get_resource_by_name.py new file mode 100644 index 0000000000..408b902efd --- /dev/null +++ b/test/unit/rules/helpers/test_get_resource_by_name.py @@ -0,0 +1,169 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.rules.helpers import get_resource_by_name + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": { + "ImageId": { + "Type": "String", + } + }, + "Conditions": { + "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, + "IsNotUsEast1": {"Fn::Not": [{"Condition": "IsUsEast1"}]}, + "IsImageId": {"Fn::Not": {"Fn::Equals": [{"Ref": "ImageId"}, ""]}}, + }, + "Resources": { + "Foo": {"Type": "AWS::S3::Bucket"}, + "Bar": {"Type": "AWS::S3::Bucket", "Condition": "IsUsEast1"}, + "FooBar": {"Type": "AWS::S3::Bucket", "Condition": "IsNotUsEast1"}, + "BadShape": [], + "NoType": {}, + }, + } + + +@pytest.mark.parametrize( + "name,resource_name,types,status,expected", + [ + ( + "Standard get", + "Foo", + None, + {}, + ({"Type": "AWS::S3::Bucket"}, {}, deque(["Resources", "Foo"])), + ), + ( + "Doesn't exist", + "Foo2", + None, + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "The destination isn't a dict", + "BadShape", + None, + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "The destination doesn't have type", + "NoType", + None, + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "Get a valid resource with a filter", + "Foo", + ["AWS::S3::Bucket"], + {}, + ( + { + "Type": "AWS::S3::Bucket", + }, + {}, + deque(["Resources", "Foo"]), + ), + ), + ( + "No result when type filters it out", + "Foo", + ["AWS::EC2::Instance"], + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "Get a resource with a condition", + "Bar", + None, + {}, + ( + {"Type": "AWS::S3::Bucket", "Condition": "IsUsEast1"}, + {"IsUsEast1": True}, + deque(["Resources", "Bar"]), + ), + ), + ( + "No resource when conditions don't align 1", + "Bar", + None, + {"IsUsEast1": False}, + ( + None, + {"IsUsEast1": False}, + deque([]), + ), + ), + ( + "No resource when conditions don't align 2", + "FooBar", + None, + {"IsUsEast1": True}, + ( + None, + {"IsUsEast1": True}, + deque([]), + ), + ), + ( + "Unrelated conditions return full results", + "Bar", + None, + {"IsImageId": False}, + ( + {"Type": "AWS::S3::Bucket", "Condition": "IsUsEast1"}, + {"IsUsEast1": True, "IsImageId": False}, + deque(["Resources", "Bar"]), + ), + ), + ], +) +def test_get_resource_by_name(name, resource_name, types, status, expected, validator): + validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=status, + ), + ), + ) + + result = get_resource_by_name(validator, resource_name, types) + + assert result[0] == expected[0], f"Test {name!r} got {result[0]!r}" + assert ( + result[1].context.conditions.status == expected[1] + ), f"Test {name!r} got {result[1].context.conditions.status!r}" + assert ( + result[1].context.path.path == expected[2] + ), f"Test {name!r} got {result[1].context.path.path!r}" diff --git a/test/unit/rules/helpers/test_get_value_from_path.py b/test/unit/rules/helpers/test_get_value_from_path.py new file mode 100644 index 0000000000..bdcf72672c --- /dev/null +++ b/test/unit/rules/helpers/test_get_value_from_path.py @@ -0,0 +1,194 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.rules.helpers import get_value_from_path + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": { + "ImageId": { + "Type": "AWS::SSM::Parameter::Value", + "Default": "/aws/service/ami-amazon-linux-latest/amzn-ami-hvm-x86_64-gp2", # noqa: E501 + } + }, + "Conditions": { + "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, + "IsNotUsEast1": {"Fn::Not": [{"Condition": "IsUsEast1"}]}, + "IsImageIdSpecified": { + "Fn::Not": [{"Fn::Equals": [{"Ref": "ImageId"}, ""]}] + }, + }, + "Resources": {}, + } + + +@pytest.mark.parametrize( + "name,instance,v_path,status,expected", + [ + ( + "No path", + {"Foo": "Bar"}, + deque([]), + {}, + [ + ({"Foo": "Bar"}, {}, deque([])), + ], + ), + ( + "Small path with out it being found", + {"Foo": "Bar"}, + deque(["Bar"]), + {}, + [ + (None, {}, deque(["Bar"])), + ], + ), + ( + "Bad path returns an empty value with list", + {"Foo": [{"Bar": "One"}]}, + deque(["Foo", "Bar"]), + {}, + [ + ( + None, + {}, + deque(["Foo"]), + ) + ], + ), + ( + "Bad path returns an empty value with string", + {"Foo": "Misvalued"}, + deque(["Foo", "Bar"]), + {}, + [ + ( + None, + {}, + deque(["Foo"]), + ) + ], + ), + ( + "With a path and no conditions", + {"Foo": {"Bar": True}}, + deque(["Foo", "Bar"]), + {}, + [ + (True, {}, deque(["Foo", "Bar"])), + ], + ), + ( + "With a list item", + {"Foo": [{"Bar": "One"}, {"Bar": "Two"}, {"NoValue": "Three"}]}, + deque(["Foo", "*", "Bar"]), + {}, + [ + ("One", {}, deque(["Foo", 0, "Bar"])), + ("Two", {}, deque(["Foo", 1, "Bar"])), + (None, {}, deque(["Foo", 2, "Bar"])), + ], + ), + ( + "With a basic condition", + { + "Foo": { + "Fn::If": [ + "IsUsEast1", + {"Bar": "One"}, + {"Bar": "Two"}, + ] + } + }, + deque(["Foo", "Bar"]), + {}, + [ + ("One", {"IsUsEast1": True}, deque(["Foo", "Fn::If", 1, "Bar"])), + ("Two", {"IsUsEast1": False}, deque(["Foo", "Fn::If", 2, "Bar"])), + ], + ), + ( + "With a basic condition and a current status", + { + "Foo": { + "Fn::If": [ + "IsUsEast1", + {"Bar": "One"}, + {"Bar": "Two"}, + ] + } + }, + deque(["Foo", "Bar"]), + {"IsUsEast1": True}, + [ + ("One", {"IsUsEast1": True}, deque(["Foo", "Fn::If", 1, "Bar"])), + ], + ), + ( + "With a basic condition and a current status on a related condition", + { + "Foo": { + "Fn::If": [ + "IsUsEast1", + {"Bar": "One"}, + {"Bar": "Two"}, + ] + } + }, + deque(["Foo", "Bar"]), + {"IsNotUsEast1": True}, + [ + ( + "Two", + {"IsUsEast1": False, "IsNotUsEast1": True}, + deque(["Foo", "Fn::If", 2, "Bar"]), + ), + ], + ), + ( + "With a random function in the way", + {"Foo": {"Fn::FindInMap": ["A", "B", "C"]}}, + deque(["Foo", "*", "Bar"]), + {}, + [], + ), + ( + "With a ref at the desination", + {"Foo": {"Ref": "Bar"}}, + deque(["Foo"]), + {}, + [({"Ref": "Bar"}, {}, deque(["Foo"]))], + ), + ], +) +def test_get_value_from_path(name, instance, v_path, status, expected, validator): + validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=status, + ), + ), + ) + + results = list(get_value_from_path(validator, instance, v_path)) + + assert len(results) == len(expected), f"Test {name!r} got {len(results)!r}" + for result, exp in zip(results, expected): + assert result[0] == exp[0], f"Test {name!r} got {result[0]!r}" + assert ( + result[1].context.conditions.status == exp[1] + ), f"Test {name!r} got {result[1].context.conditions.status!r}" + assert ( + result[1].context.path.path == exp[2] + ), f"Test {name!r} got {result[1].context.path.path!r}" diff --git a/test/unit/rules/resources/ec2/test_instance_image_id.py b/test/unit/rules/resources/ec2/test_instance_image_id.py new file mode 100644 index 0000000000..845d5876ae --- /dev/null +++ b/test/unit/rules/resources/ec2/test_instance_image_id.py @@ -0,0 +1,293 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from collections import deque + +import pytest + +from cfnlint.jsonschema import ValidationError +from cfnlint.rules.resources.ectwo.InstanceImageId import InstanceImageId + + +@pytest.fixture(scope="module") +def rule(): + rule = InstanceImageId() + yield rule + + +@pytest.mark.parametrize( + "name,template,instance,path,expected", + [ + ( + "Valid with no ImageId in LaunchTemplate", + { + "Resources": { + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + }, + }, + }, + } + }, + { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + "ImageId": "ami-12345678", + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with no ImageId and a string launch template", + { + "Resources": {}, + }, + { + "LaunchTemplate": {"LaunchTemplateId": "foo"}, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with no ImageId and a parameter", + { + "Parameters": { + "LaunchTemplateId": { + "Type": "String", + } + }, + "Resources": {}, + }, + { + "LaunchTemplate": {"LaunchTemplateId": {"Ref": "LaunchTemplateId"}}, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with no ImageId a random ref", + { + "Resources": {}, + }, + { + "LaunchTemplate": {"LaunchTemplateName": {"Ref": "AWS::StackName"}}, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with no ImageId and another function", + { + "Parameters": { + "LaunchTemplateId": { + "Type": "String", + } + }, + "Resources": {}, + }, + { + "LaunchTemplate": { + "LaunchTemplateId": {"Fn::FindInMap": ["One", "Two", "Three"]} + }, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with ImageId in LaunchTemplate", + { + "Resources": { + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + "ImageId": "ami-12345678", + }, + }, + }, + } + }, + { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with ImageId in LaunchTemplate using Name", + { + "Resources": { + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + "ImageId": "ami-12345678", + }, + }, + }, + } + }, + { + "LaunchTemplate": { + "LaunchTemplateName": { + "Ref": "LaunchTemplate", + } + }, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Invalid with no relationship", + {}, + {}, + { + "path": ["Resources", "Instance", "Properties"], + }, + [ + ValidationError( + "'ImageId' is a required property", + path_override=deque( + ["Resources", "Instance", "Properties", "ImageId"] + ), + ) + ], + ), + ( + "Invalid with no ImageId in LaunchTemplate", + { + "Resources": { + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + }, + }, + }, + } + }, + { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [ + ValidationError( + "'ImageId' is a required property", + path_override=deque( + ["Resources", "Instance", "Properties", "ImageId"] + ), + ) + ], + ), + ( + "Invalid with no ImageId in LaunchTemplate with condition", + { + "Conditions": { + "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]} + }, + "Resources": { + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + }, + }, + }, + }, + }, + { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + "ImageId": { + "Fn::If": [ + "IsUsEast1", + "ami-12345678", + {"Ref": "AWS::NoValue"}, + ] + }, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [ + ValidationError( + "'ImageId' is a required property", + path_override=deque( + [ + "Resources", + "Instance", + "Properties", + "ImageId", + "Fn::If", + 2, + "Ref", + ] + ), + ) + ], + ), + ], + indirect=["template", "path"], +) +def test_validate(name, instance, expected, rule, validator): + errs = list(rule.validate(validator, "", instance, {})) + + assert ( + errs == expected + ), f"Expected test {name!r} to have {expected!r} but got {errs!r}" diff --git a/test/unit/rules/resources/ecs/test_service_dynamic_ports.py b/test/unit/rules/resources/ecs/test_service_dynamic_ports.py new file mode 100644 index 0000000000..86ea9f71e9 --- /dev/null +++ b/test/unit/rules/resources/ecs/test_service_dynamic_ports.py @@ -0,0 +1,563 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import jsonpatch +import pytest + +from cfnlint.jsonschema import ValidationError +from cfnlint.rules.helpers import get_value_from_path +from cfnlint.rules.resources.ecs.ServiceDynamicPorts import ServiceDynamicPorts + + +@pytest.fixture +def rule(): + rule = ServiceDynamicPorts() + yield rule + + +_task_definition = { + "Type": "AWS::ECS::TaskDefinition", + "Properties": { + "ContainerDefinitions": [ + { + "Name": "my-container", + "Image": "my-image", + "PortMappings": [ + { + "ContainerPort": 8080, + "HostPort": 0, + } + ], + } + ], + }, +} + +_target_group = { + "Type": "AWS::ElasticLoadBalancingV2::TargetGroup", + "Properties": { + "HealthCheckPort": "traffic-port", + "Port": 8080, + "Protocol": "HTTP", + }, +} + +_service = { + "Type": "AWS::ECS::Service", + "Properties": { + "TaskDefinition": {"Ref": "TaskDefinition"}, + "LoadBalancers": [ + { + "TargetGroupArn": {"Ref": "TargetGroup"}, + "ContainerName": "my-container", + "ContainerPort": 8080, + } + ], + }, +} + + +@pytest.mark.parametrize( + "template,start_path,expected", + [ + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": dict(_target_group), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyHealthCheckPort": { + "Type": "String", + }, + }, + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": {"Ref": "MyHealthCheckPort"}, + }, + ], + ), + "Service": dict(_service), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyContainerPort": { + "Type": "String", + }, + }, + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": ( + "/Properties/ContainerDefinitions" + "/0/PortMappings/0/ContainerPort" + ), + "value": {"Ref": "MyContainerPort"}, + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": dict(_service), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyTaskDefinition": { + "Type": "String", + }, + }, + "Resources": { + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Ref": "MyTaskDefinition"}, + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Fn::FindInMap": ["A", "B", "C"]}, + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Fn::GetAtt": ["TaskDefinition", "Arn"]}, + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ( + "When using an ECS task definition of host " + "port 0 and associating that container to " + "an ELB the target group has to have a " + "'HealthCheckPort' of 'traffic-port'" + ), + validator="const", + path_override=deque( + ["Resources", "TargetGroup", "Properties", "HealthCheckPort"] + ), + ) + ], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Ref": []}, + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ( + "When using an ECS task definition of host " + "port 0 and associating that container to " + "an ELB the target group has to have a " + "'HealthCheckPort' of 'traffic-port'" + ), + validator="const", + path_override=deque( + ["Resources", "TargetGroup", "Properties", "HealthCheckPort"] + ), + ) + ], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ( + "When using an ECS task definition of host " + "port 0 and associating that container to " + "an ELB the target group has to have a " + "'HealthCheckPort' of 'traffic-port'" + ), + validator="required", + path_override=deque( + ["Resources", "TargetGroup", "Properties", "HealthCheckPort"] + ), + ) + ], + ), + ( + { + "Parameters": { + "MyContainerName": { + "Type": "String", + } + }, + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/ContainerDefinitions/0/Name", + "value": {"Ref": "MyContainerName"}, + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyContainerName": { + "Type": "String", + } + }, + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/LoadBalancers/0/ContainerName", + "value": {"Ref": "MyContainerName"}, + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyContainerName": { + "Type": "String", + } + }, + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/LoadBalancers/0/ContainerName", + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/LoadBalancers/0/ContainerPort", + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/LoadBalancers/0/TargetGroupArn", + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/ContainerDefinitions/0/Name", + "value": "Changed", + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": ( + "/Properties/ContainerDefinitions/" + "0/PortMappings/0/ContainerPort" + ), + "value": "30", + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ], + indirect=["template"], +) +def test_validate(template, start_path, expected, rule, validator): + for instance, instance_validator in get_value_from_path( + validator, template, start_path + ): + errs = list(rule.validate(instance_validator, "", instance, {})) + assert errs == expected, f"Expected {expected} got {errs}"