Skip to content

Commit

Permalink
Clean up the ECS check rule E3049
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong committed Jul 26, 2024
1 parent 58ae2f7 commit 14373b1
Show file tree
Hide file tree
Showing 4 changed files with 671 additions and 79 deletions.
39 changes: 1 addition & 38 deletions src/cfnlint/conditions/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,44 +256,7 @@ def _build_cfn_implies(self, scenarios) -> And:

return And(*conditions)

def can_imply(self, scenarios: dict[str, bool], implies: str) -> bool:
"""Based on a bunch of scenario conditions and their Truth/False value
determine if implies condition is True any time the scenarios are satisfied
solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies])
Args:
scenarios (dict[str, bool]): A list of condition names
and if they are True or False implies: the condition name that
we are implying will also be True
Returns:
bool: if the implied condition will be True if the scenario is True
"""
try:
cnf = self._cnf.copy()
# if the implies condition has to be false in the scenarios we
# know it can never be true
if not scenarios.get(implies, True):
return False

and_condition = self._build_cfn_implies(scenarios)
cnf.add_prop(and_condition)
implies_condition = self._conditions[implies].build_true_cnf(
self._solver_params
)
cnf.add_prop(Implies(and_condition, implies_condition))

results = satisfiable(cnf)
if results:
return True

return False
except KeyError:
# KeyError is because the listed condition doesn't exist because of bad
# formatting or just the wrong condition name
return True

def has_to_imply(self, scenarios: dict[str, bool], implies: str) -> bool:
def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool:
"""Based on a bunch of scenario conditions and their Truth/False value
determine if implies condition is True any time the scenarios are satisfied
solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies])
Expand Down
146 changes: 106 additions & 40 deletions src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,22 @@ def __init__(self) -> None:
keywords=["Resources/AWS::ECS::Service/Properties"],
)

def _filter_resource_name(self, instance: Any) -> str | None:
fn_k, fn_v = is_function(instance)
if fn_k is None:
return None
if fn_k == "Ref":
if isinstance(fn_v, str):
return fn_v
elif fn_k == "Fn::GetAtt":
name = ensure_list(fn_v)[0].split(".")[0]
if isinstance(name, str):
return name
return None

def _get_service_lb_containers(
self, validator: Validator, instance: Any
) -> Iterator[tuple[str, str, Validator]]:

) -> Iterator[tuple[str, str, str, Validator]]:
for load_balancer, load_balancer_validator in get_value_from_path(
validator,
instance,
Expand All @@ -46,35 +58,79 @@ def _get_service_lb_containers(
load_balancer,
path=deque(["ContainerName"]),
):
if name is None:
if name is None or not isinstance(name, str):
continue
for port, port_validator in get_value_from_path(
name_validator,
load_balancer,
path=deque(["ContainerPort"]),
):
if port is None:
if port is None or not isinstance(port, (str, int)):
continue
yield name, port, port_validator
for tg, tg_validator in get_value_from_path(
port_validator,
load_balancer,
path=deque(["TargetGroupArn"]),
):
tg = self._filter_resource_name(tg)
if tg is None:
continue
yield name, port, tg, tg_validator

def _get_task_definition_resource_name(
def _get_service_properties(
self, validator: Validator, instance: Any
) -> Iterator[tuple[str, Validator]]:
) -> Iterator[tuple[str, str, str, str, Validator]]:
for task_definition_id, task_definition_validator in get_value_from_path(
validator, instance, deque(["TaskDefinition"])
):
fn_k, fn_v = is_function(task_definition_id)
if fn_k == "Ref":
if validator.is_type(fn_v, "string"):
if fn_v in validator.context.resources:
yield fn_v, task_definition_validator
elif fn_k != "Fn::GetAtt":
task_definition_resource_name = self._filter_resource_name(
task_definition_id
)
if task_definition_resource_name is None:
continue
yield ensure_list(fn_v)[0].split(".")[0], task_definition_validator
for (
container_name,
container_port,
target_group,
lb_validator,
) in self._get_service_lb_containers(task_definition_validator, instance):
yield (
task_definition_resource_name,
container_name,
container_port,
target_group,
lb_validator,
)

def _get_target_group_properties(
self, validator: Validator, resource_name: Any
) -> Iterator[tuple[str | int | None, Validator]]:
target_group, target_group_validator = get_resource_by_name(
validator, resource_name, ["AWS::ElasticLoadBalancingV2::TargetGroup"]
)
if not target_group:
return

for port, port_validator in get_value_from_path(
target_group_validator,
target_group,
path=deque(["Properties", "HealthCheckPort"]),
):
if port is None:
yield port, port_validator
continue

if not isinstance(port, (str, int)):
continue
yield port, port_validator

def _get_task_containers(
self, validator: Validator, resource_name: str, container_name: str
) -> Iterator[tuple[Any, Validator]]:
self,
validator: Validator,
resource_name: str,
container_name: str,
container_port: str | int,
) -> Iterator[Validator]:
task_def, task_def_validator = get_resource_by_name(
validator, resource_name, ["AWS::ECS::TaskDefinition"]
)
Expand All @@ -95,11 +151,14 @@ def _get_task_containers(
if not isinstance(name, str):
continue
if name == container_name:
yield container, name_validator
for host_port_validator in self._filter_port_mappings(
name_validator, container, container_port
):
yield host_port_validator

def _filter_port_mappings(
self, validator: Validator, instance: Any, port: Any
) -> Iterator[tuple[Any, Validator]]:
) -> Iterator[Validator]:
for port_mapping, port_mapping_validator in get_value_from_path(
validator,
instance,
Expand All @@ -113,37 +172,44 @@ def _filter_port_mappings(
if not isinstance(container_port, (str, int)):
continue
if str(port) == str(container_port):
for host_port, host_part_validator in get_value_from_path(
for _, host_part_validator in get_value_from_path(
container_port_validator,
port_mapping,
path=deque(["HostPort"]),
):
yield host_port, host_part_validator
yield host_part_validator

def validate(
self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any]
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any]
) -> ValidationResult:

for (
task_definition_resource_name,
task_definition_resource_name_validator,
) in self._get_task_definition_resource_name(validator, instance):

for name, port, lb_validator in self._get_service_lb_containers(
task_definition_resource_name_validator, instance
lb_container_name,
lb_container_port,
lb_target_group,
lb_service_validator,
) in self._get_service_properties(validator, instance):
for container_validator in self._get_task_containers(
lb_service_validator,
task_definition_resource_name,
lb_container_name,
lb_container_port,
):
for container, container_validator in self._get_task_containers(
lb_validator, task_definition_resource_name, name
for (
health_check_port,
health_check_port_validator,
) in self._get_target_group_properties(
container_validator, lb_target_group
):
for host_port, host_port_validator in self._filter_port_mappings(
container_validator, container, port
):
if host_port == 0:
yield ValidationError(
"When using an ECS task definition of host port 0 and "
"associating that container to an ELB the target group "
"has to have a 'HealthCheckPort' of 'traffic-port'",
)

return
yield
if health_check_port != "traffic-port":
err_validator = "const"
if health_check_port is None:
err_validator = "required"
yield ValidationError(
"When using an ECS task definition of host port 0 and "
"associating that container to an ELB the target group "
"has to have a 'HealthCheckPort' of 'traffic-port'",
path_override=health_check_port_validator.context.path.path,
validator=err_validator,
)
2 changes: 1 addition & 1 deletion src/cfnlint/template/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ def is_resource_available(self, path: Path, resource: str) -> list[dict[str, boo
return results
scenario[condition_name] = list(condition_bool)[0]

if not self.conditions.has_to_imply(scenario, resource_condition):
if not self.conditions.check_implies(scenario, resource_condition):
return [{**{resource_condition: False}, **scenario}]

# if resource condition isn't available then the resource is available
Expand Down
Loading

0 comments on commit 14373b1

Please sign in to comment.