Skip to content

Commit

Permalink
ECR: Add get registry scanning configuration operation (getmoto#7402)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfangl authored Feb 29, 2024
1 parent 12460e5 commit ce447bf
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 18 deletions.
2 changes: 1 addition & 1 deletion IMPLEMENTATION_COVERAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2684,7 +2684,7 @@
- [X] get_lifecycle_policy
- [ ] get_lifecycle_policy_preview
- [X] get_registry_policy
- [ ] get_registry_scanning_configuration
- [X] get_registry_scanning_configuration
- [X] get_repository_policy
- [ ] initiate_layer_upload
- [X] list_images
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/services/ecr.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ecr
- [X] get_lifecycle_policy
- [ ] get_lifecycle_policy_preview
- [X] get_registry_policy
- [ ] get_registry_scanning_configuration
- [X] get_registry_scanning_configuration
- [X] get_repository_policy
- [ ] initiate_layer_upload
- [X] list_images
Expand Down
64 changes: 54 additions & 10 deletions moto/ecr/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import hashlib
import json
import re
import threading
from collections import namedtuple
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, Tuple
Expand Down Expand Up @@ -390,6 +391,11 @@ def __init__(self, region_name: str, account_id: str):
self.registry_policy: Optional[str] = None
self.replication_config: Dict[str, Any] = {"rules": []}
self.repositories: Dict[str, Repository] = {}
self.registry_scanning_configuration: Dict[str, Any] = {
"scanType": "BASIC",
"rules": [],
}
self.registry_scanning_configuration_update_lock = threading.RLock()
self.tagger = TaggingService(tag_name="tags")

@staticmethod
Expand Down Expand Up @@ -495,6 +501,19 @@ def create_repository(
self.repositories[repository_name] = repository
self.tagger.tag_resource(repository.arn, tags)

# check if any of the registry scanning policies applies to the repository
with self.registry_scanning_configuration_update_lock:
for rule in self.registry_scanning_configuration["rules"]:
for repo_filter in rule["repositoryFilters"]:
if self._match_repository_filter(
repo_filter["filter"], repository_name
):
repository.scanning_config["scanFrequency"] = rule[
"scanFrequency"
]
# AWS testing seems to indicate that this is always overwritten
repository.scanning_config["appliedScanFilters"] = [repo_filter]

return repository

def delete_repository(
Expand Down Expand Up @@ -1117,16 +1136,41 @@ def put_replication_configuration(

return {"replicationConfiguration": replication_config}

def put_registry_scanning_configuration(self, rules: List[Dict[str, Any]]) -> None:
for rule in rules:
for repo_filter in rule["repositoryFilters"]:
for repo in self.repositories.values():
if repo_filter["filter"] == repo.name or re.match(
repo_filter["filter"], repo.name
):
repo.scanning_config["scanFrequency"] = rule["scanFrequency"]
# AWS testing seems to indicate that this is always overwritten
repo.scanning_config["appliedScanFilters"] = [repo_filter]
def _match_repository_filter(self, filter: str, repository_name: str) -> bool:
filter_regex = filter.replace("*", ".*")
return filter in repository_name or bool(
re.match(filter_regex, repository_name)
)

def get_registry_scanning_configuration(self) -> Dict[str, Any]:
return self.registry_scanning_configuration

def put_registry_scanning_configuration(
self, scan_type: str, rules: List[Dict[str, Any]]
) -> None:
# locking here to avoid simultaneous updates which leads to inconsistent state
with self.registry_scanning_configuration_update_lock:
self.registry_scanning_configuration = {
"scanType": scan_type,
"rules": rules,
}

# reset all rules first
for repo in self.repositories.values():
repo.scanning_config["scanFrequency"] = "MANUAL"
repo.scanning_config["appliedScanFilters"] = []

for rule in rules:
for repo_filter in rule["repositoryFilters"]:
for repo in self.repositories.values():
if self._match_repository_filter(
repo_filter["filter"], repo.name
):
repo.scanning_config["scanFrequency"] = rule[
"scanFrequency"
]
# AWS testing seems to indicate that this is always overwritten
repo.scanning_config["appliedScanFilters"] = [repo_filter]

def describe_registry(self) -> Dict[str, Any]:
return {
Expand Down
17 changes: 15 additions & 2 deletions moto/ecr/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,24 @@ def put_replication_configuration(self) -> str:
)
)

def get_registry_scanning_configuration(self) -> str:
registry_scanning_config = (
self.ecr_backend.get_registry_scanning_configuration()
)
return json.dumps(
{
"registryId": self.current_account,
"scanningConfiguration": registry_scanning_config,
}
)

def put_registry_scanning_configuration(self) -> str:
scan_type = self._get_param("scanType")
rules = self._get_param("rules")
self.ecr_backend.put_registry_scanning_configuration(rules)
return json.dumps({"scanType": scan_type, "rules": rules})
self.ecr_backend.put_registry_scanning_configuration(scan_type, rules)
return json.dumps(
{"registryScanningConfiguration": {"scanType": scan_type, "rules": rules}}
)

def describe_registry(self) -> str:
return json.dumps(self.ecr_backend.describe_registry())
123 changes: 119 additions & 4 deletions tests/test_ecr/test_ecr_scanning_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import boto3

from moto import mock_aws
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID

ECR_REGION = "us-east-1"
ECR_REPO = "test-repo"


@mock_aws
Expand Down Expand Up @@ -133,9 +137,120 @@ def test_put_registry_scanning_configuration():
"repositoryArn": repo_arn,
"repositoryName": repo_name,
"scanOnPush": False,
"scanFrequency": "SCAN_ON_PUSH",
"appliedScanFilters": [
{"filter": f"{repo_name[:4]}*", "filterType": "WILDCARD"}
],
"scanFrequency": "MANUAL",
"appliedScanFilters": [],
}
]


@mock_aws
def test_registry_scanning_configuration_lifecycle():
client = boto3.client("ecr", region_name=ECR_REGION)
client.create_repository(repositoryName=ECR_REPO)

get_scanning_config_response = client.get_registry_scanning_configuration()
assert get_scanning_config_response["registryId"] == ACCOUNT_ID
assert get_scanning_config_response["scanningConfiguration"] == {
"rules": [],
"scanType": "BASIC",
}

put_scanning_config_response = client.put_registry_scanning_configuration(
scanType="BASIC",
rules=[
{
"repositoryFilters": [
{
"filter": "test-*",
"filterType": "WILDCARD",
}
],
"scanFrequency": "SCAN_ON_PUSH",
}
],
)

assert put_scanning_config_response["registryScanningConfiguration"] == {
"rules": [
{
"repositoryFilters": [{"filter": "test-*", "filterType": "WILDCARD"}],
"scanFrequency": "SCAN_ON_PUSH",
}
],
"scanType": "BASIC",
}

# check if scanning config is returned in get operation
get_scanning_config_response = client.get_registry_scanning_configuration()
assert get_scanning_config_response["registryId"] == ACCOUNT_ID
assert get_scanning_config_response["scanningConfiguration"] == {
"rules": [
{
"repositoryFilters": [{"filter": "test-*", "filterType": "WILDCARD"}],
"scanFrequency": "SCAN_ON_PUSH",
}
],
"scanType": "BASIC",
}

# check if the scanning config is returned in batch_get_repository_scanning_configuration
repo_scanning_config_result = client.batch_get_repository_scanning_configuration(
repositoryNames=[ECR_REPO]
)
assert repo_scanning_config_result["scanningConfigurations"][0] == {
"appliedScanFilters": [{"filter": "test-*", "filterType": "WILDCARD"}],
"repositoryArn": f"arn:aws:ecr:{ECR_REGION}:{ACCOUNT_ID}:repository/{ECR_REPO}",
"repositoryName": ECR_REPO,
"scanFrequency": "SCAN_ON_PUSH",
"scanOnPush": False,
}

# create new repository and check if scanning config is applied
client.create_repository(repositoryName="test-repo-2")
repo_scanning_config_result = client.batch_get_repository_scanning_configuration(
repositoryNames=["test-repo-2"]
)
assert repo_scanning_config_result["scanningConfigurations"][0] == {
"appliedScanFilters": [{"filter": "test-*", "filterType": "WILDCARD"}],
"repositoryArn": f"arn:aws:ecr:{ECR_REGION}:{ACCOUNT_ID}:repository/test-repo-2",
"repositoryName": "test-repo-2",
"scanFrequency": "SCAN_ON_PUSH",
"scanOnPush": False,
}

# revert scanning config and see if it is properly applied to all repositories
put_scanning_config_response = client.put_registry_scanning_configuration(
scanType="BASIC",
rules=[],
)
assert put_scanning_config_response["registryScanningConfiguration"] == {
"rules": [],
"scanType": "BASIC",
}

get_scanning_config_response = client.get_registry_scanning_configuration()
assert get_scanning_config_response["registryId"] == ACCOUNT_ID
assert get_scanning_config_response["scanningConfiguration"] == {
"rules": [],
"scanType": "BASIC",
}

repo_scanning_config_result = client.batch_get_repository_scanning_configuration(
repositoryNames=[ECR_REPO, "test-repo-2"]
)
assert repo_scanning_config_result["scanningConfigurations"] == [
{
"appliedScanFilters": [],
"repositoryArn": f"arn:aws:ecr:{ECR_REGION}:{ACCOUNT_ID}:repository/{ECR_REPO}",
"repositoryName": ECR_REPO,
"scanFrequency": "MANUAL",
"scanOnPush": False,
},
{
"appliedScanFilters": [],
"repositoryArn": f"arn:aws:ecr:{ECR_REGION}:{ACCOUNT_ID}:repository/test-repo-2",
"repositoryName": "test-repo-2",
"scanFrequency": "MANUAL",
"scanOnPush": False,
},
]

0 comments on commit ce447bf

Please sign in to comment.