From 14373b162a76521eb4580df7a0fe709090ff966f Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Thu, 25 Jul 2024 16:40:26 -0700 Subject: [PATCH] Clean up the ECS check rule E3049 --- src/cfnlint/conditions/conditions.py | 39 +- .../resources/ecs/ServiceDynamicPorts.py | 146 +++-- src/cfnlint/template/template.py | 2 +- .../ecs/test_service_dynamic_ports.py | 563 ++++++++++++++++++ 4 files changed, 671 insertions(+), 79 deletions(-) create mode 100644 test/unit/rules/resources/ecs/test_service_dynamic_ports.py diff --git a/src/cfnlint/conditions/conditions.py b/src/cfnlint/conditions/conditions.py index cbba5bfed2..915932361c 100644 --- a/src/cfnlint/conditions/conditions.py +++ b/src/cfnlint/conditions/conditions.py @@ -256,44 +256,7 @@ def _build_cfn_implies(self, scenarios) -> And: return And(*conditions) - def can_imply(self, scenarios: dict[str, bool], implies: str) -> bool: - """Based on a bunch of scenario conditions and their Truth/False value - determine if implies condition is True any time the scenarios are satisfied - solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies]) - - Args: - scenarios (dict[str, bool]): A list of condition names - and if they are True or False implies: the condition name that - we are implying will also be True - - Returns: - bool: if the implied condition will be True if the scenario is True - """ - try: - cnf = self._cnf.copy() - # if the implies condition has to be false in the scenarios we - # know it can never be true - if not scenarios.get(implies, True): - return False - - and_condition = self._build_cfn_implies(scenarios) - cnf.add_prop(and_condition) - implies_condition = self._conditions[implies].build_true_cnf( - self._solver_params - ) - cnf.add_prop(Implies(and_condition, implies_condition)) - - results = satisfiable(cnf) - if results: - return True - - return False - except KeyError: - # KeyError is because the listed condition doesn't exist because of bad - # formatting or just the wrong condition name - return True - - def has_to_imply(self, scenarios: dict[str, bool], implies: str) -> bool: + def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool: """Based on a bunch of scenario conditions and their Truth/False value determine if implies condition is True any time the scenarios are satisfied solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies]) diff --git a/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py b/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py index 39961e0a5a..00079aad94 100644 --- a/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py +++ b/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py @@ -32,10 +32,22 @@ def __init__(self) -> None: keywords=["Resources/AWS::ECS::Service/Properties"], ) + def _filter_resource_name(self, instance: Any) -> str | None: + fn_k, fn_v = is_function(instance) + if fn_k is None: + return None + if fn_k == "Ref": + if isinstance(fn_v, str): + return fn_v + elif fn_k == "Fn::GetAtt": + name = ensure_list(fn_v)[0].split(".")[0] + if isinstance(name, str): + return name + return None + def _get_service_lb_containers( self, validator: Validator, instance: Any - ) -> Iterator[tuple[str, str, Validator]]: - + ) -> Iterator[tuple[str, str, str, Validator]]: for load_balancer, load_balancer_validator in get_value_from_path( validator, instance, @@ -46,35 +58,79 @@ def _get_service_lb_containers( load_balancer, path=deque(["ContainerName"]), ): - if name is None: + if name is None or not isinstance(name, str): continue for port, port_validator in get_value_from_path( name_validator, load_balancer, path=deque(["ContainerPort"]), ): - if port is None: + if port is None or not isinstance(port, (str, int)): continue - yield name, port, port_validator + for tg, tg_validator in get_value_from_path( + port_validator, + load_balancer, + path=deque(["TargetGroupArn"]), + ): + tg = self._filter_resource_name(tg) + if tg is None: + continue + yield name, port, tg, tg_validator - def _get_task_definition_resource_name( + def _get_service_properties( self, validator: Validator, instance: Any - ) -> Iterator[tuple[str, Validator]]: + ) -> Iterator[tuple[str, str, str, str, Validator]]: for task_definition_id, task_definition_validator in get_value_from_path( validator, instance, deque(["TaskDefinition"]) ): - fn_k, fn_v = is_function(task_definition_id) - if fn_k == "Ref": - if validator.is_type(fn_v, "string"): - if fn_v in validator.context.resources: - yield fn_v, task_definition_validator - elif fn_k != "Fn::GetAtt": + task_definition_resource_name = self._filter_resource_name( + task_definition_id + ) + if task_definition_resource_name is None: continue - yield ensure_list(fn_v)[0].split(".")[0], task_definition_validator + for ( + container_name, + container_port, + target_group, + lb_validator, + ) in self._get_service_lb_containers(task_definition_validator, instance): + yield ( + task_definition_resource_name, + container_name, + container_port, + target_group, + lb_validator, + ) + + def _get_target_group_properties( + self, validator: Validator, resource_name: Any + ) -> Iterator[tuple[str | int | None, Validator]]: + target_group, target_group_validator = get_resource_by_name( + validator, resource_name, ["AWS::ElasticLoadBalancingV2::TargetGroup"] + ) + if not target_group: + return + + for port, port_validator in get_value_from_path( + target_group_validator, + target_group, + path=deque(["Properties", "HealthCheckPort"]), + ): + if port is None: + yield port, port_validator + continue + + if not isinstance(port, (str, int)): + continue + yield port, port_validator def _get_task_containers( - self, validator: Validator, resource_name: str, container_name: str - ) -> Iterator[tuple[Any, Validator]]: + self, + validator: Validator, + resource_name: str, + container_name: str, + container_port: str | int, + ) -> Iterator[Validator]: task_def, task_def_validator = get_resource_by_name( validator, resource_name, ["AWS::ECS::TaskDefinition"] ) @@ -95,11 +151,14 @@ def _get_task_containers( if not isinstance(name, str): continue if name == container_name: - yield container, name_validator + for host_port_validator in self._filter_port_mappings( + name_validator, container, container_port + ): + yield host_port_validator def _filter_port_mappings( self, validator: Validator, instance: Any, port: Any - ) -> Iterator[tuple[Any, Validator]]: + ) -> Iterator[Validator]: for port_mapping, port_mapping_validator in get_value_from_path( validator, instance, @@ -113,37 +172,44 @@ def _filter_port_mappings( if not isinstance(container_port, (str, int)): continue if str(port) == str(container_port): - for host_port, host_part_validator in get_value_from_path( + for _, host_part_validator in get_value_from_path( container_port_validator, port_mapping, path=deque(["HostPort"]), ): - yield host_port, host_part_validator + yield host_part_validator def validate( - self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] + self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any] ) -> ValidationResult: for ( task_definition_resource_name, - task_definition_resource_name_validator, - ) in self._get_task_definition_resource_name(validator, instance): - - for name, port, lb_validator in self._get_service_lb_containers( - task_definition_resource_name_validator, instance + lb_container_name, + lb_container_port, + lb_target_group, + lb_service_validator, + ) in self._get_service_properties(validator, instance): + for container_validator in self._get_task_containers( + lb_service_validator, + task_definition_resource_name, + lb_container_name, + lb_container_port, ): - for container, container_validator in self._get_task_containers( - lb_validator, task_definition_resource_name, name + for ( + health_check_port, + health_check_port_validator, + ) in self._get_target_group_properties( + container_validator, lb_target_group ): - for host_port, host_port_validator in self._filter_port_mappings( - container_validator, container, port - ): - if host_port == 0: - yield ValidationError( - "When using an ECS task definition of host port 0 and " - "associating that container to an ELB the target group " - "has to have a 'HealthCheckPort' of 'traffic-port'", - ) - - return - yield + if health_check_port != "traffic-port": + err_validator = "const" + if health_check_port is None: + err_validator = "required" + yield ValidationError( + "When using an ECS task definition of host port 0 and " + "associating that container to an ELB the target group " + "has to have a 'HealthCheckPort' of 'traffic-port'", + path_override=health_check_port_validator.context.path.path, + validator=err_validator, + ) diff --git a/src/cfnlint/template/template.py b/src/cfnlint/template/template.py index 7c39673951..d14b562910 100644 --- a/src/cfnlint/template/template.py +++ b/src/cfnlint/template/template.py @@ -824,7 +824,7 @@ def is_resource_available(self, path: Path, resource: str) -> list[dict[str, boo return results scenario[condition_name] = list(condition_bool)[0] - if not self.conditions.has_to_imply(scenario, resource_condition): + if not self.conditions.check_implies(scenario, resource_condition): return [{**{resource_condition: False}, **scenario}] # if resource condition isn't available then the resource is available diff --git a/test/unit/rules/resources/ecs/test_service_dynamic_ports.py b/test/unit/rules/resources/ecs/test_service_dynamic_ports.py new file mode 100644 index 0000000000..86ea9f71e9 --- /dev/null +++ b/test/unit/rules/resources/ecs/test_service_dynamic_ports.py @@ -0,0 +1,563 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import jsonpatch +import pytest + +from cfnlint.jsonschema import ValidationError +from cfnlint.rules.helpers import get_value_from_path +from cfnlint.rules.resources.ecs.ServiceDynamicPorts import ServiceDynamicPorts + + +@pytest.fixture +def rule(): + rule = ServiceDynamicPorts() + yield rule + + +_task_definition = { + "Type": "AWS::ECS::TaskDefinition", + "Properties": { + "ContainerDefinitions": [ + { + "Name": "my-container", + "Image": "my-image", + "PortMappings": [ + { + "ContainerPort": 8080, + "HostPort": 0, + } + ], + } + ], + }, +} + +_target_group = { + "Type": "AWS::ElasticLoadBalancingV2::TargetGroup", + "Properties": { + "HealthCheckPort": "traffic-port", + "Port": 8080, + "Protocol": "HTTP", + }, +} + +_service = { + "Type": "AWS::ECS::Service", + "Properties": { + "TaskDefinition": {"Ref": "TaskDefinition"}, + "LoadBalancers": [ + { + "TargetGroupArn": {"Ref": "TargetGroup"}, + "ContainerName": "my-container", + "ContainerPort": 8080, + } + ], + }, +} + + +@pytest.mark.parametrize( + "template,start_path,expected", + [ + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": dict(_target_group), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyHealthCheckPort": { + "Type": "String", + }, + }, + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": {"Ref": "MyHealthCheckPort"}, + }, + ], + ), + "Service": dict(_service), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyContainerPort": { + "Type": "String", + }, + }, + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": ( + "/Properties/ContainerDefinitions" + "/0/PortMappings/0/ContainerPort" + ), + "value": {"Ref": "MyContainerPort"}, + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": dict(_service), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyTaskDefinition": { + "Type": "String", + }, + }, + "Resources": { + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Ref": "MyTaskDefinition"}, + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Fn::FindInMap": ["A", "B", "C"]}, + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Fn::GetAtt": ["TaskDefinition", "Arn"]}, + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ( + "When using an ECS task definition of host " + "port 0 and associating that container to " + "an ELB the target group has to have a " + "'HealthCheckPort' of 'traffic-port'" + ), + validator="const", + path_override=deque( + ["Resources", "TargetGroup", "Properties", "HealthCheckPort"] + ), + ) + ], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/TaskDefinition", + "value": {"Ref": []}, + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + { + "op": "replace", + "path": "/Properties/HealthCheckPort", + "value": "3000", + }, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ( + "When using an ECS task definition of host " + "port 0 and associating that container to " + "an ELB the target group has to have a " + "'HealthCheckPort' of 'traffic-port'" + ), + validator="const", + path_override=deque( + ["Resources", "TargetGroup", "Properties", "HealthCheckPort"] + ), + ) + ], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ( + "When using an ECS task definition of host " + "port 0 and associating that container to " + "an ELB the target group has to have a " + "'HealthCheckPort' of 'traffic-port'" + ), + validator="required", + path_override=deque( + ["Resources", "TargetGroup", "Properties", "HealthCheckPort"] + ), + ) + ], + ), + ( + { + "Parameters": { + "MyContainerName": { + "Type": "String", + } + }, + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/ContainerDefinitions/0/Name", + "value": {"Ref": "MyContainerName"}, + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyContainerName": { + "Type": "String", + } + }, + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/LoadBalancers/0/ContainerName", + "value": {"Ref": "MyContainerName"}, + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyContainerName": { + "Type": "String", + } + }, + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/LoadBalancers/0/ContainerName", + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/LoadBalancers/0/ContainerPort", + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/LoadBalancers/0/TargetGroupArn", + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/ContainerDefinitions/0/Name", + "value": "Changed", + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": ( + "/Properties/ContainerDefinitions/" + "0/PortMappings/0/ContainerPort" + ), + "value": "30", + }, + ], + ), + "TargetGroup": jsonpatch.apply_patch( + dict(_target_group), + [ + {"op": "remove", "path": "/Properties/HealthCheckPort"}, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ], + indirect=["template"], +) +def test_validate(template, start_path, expected, rule, validator): + for instance, instance_validator in get_value_from_path( + validator, template, start_path + ): + errs = list(rule.validate(instance_validator, "", instance, {})) + assert errs == expected, f"Expected {expected} got {errs}"