Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3: Adding notification for eventbridge #7252

Merged
merged 14 commits into from
Feb 16, 2024
11 changes: 6 additions & 5 deletions moto/s3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,7 @@ def __init__(
topic: Optional[List[Dict[str, Any]]] = None,
queue: Optional[List[Dict[str, Any]]] = None,
cloud_function: Optional[List[Dict[str, Any]]] = None,
event_bridge: Optional[Dict[str, Any]] = None,
):
self.topic = (
[
Expand Down Expand Up @@ -923,6 +924,7 @@ def __init__(
if cloud_function
else []
)
self.event_bridge = event_bridge

def to_config_dict(self) -> Dict[str, Any]:
data: Dict[str, Any] = {"configurations": {}}
Expand All @@ -945,6 +947,8 @@ def to_config_dict(self) -> Dict[str, Any]:
cf_config["type"] = "LambdaConfiguration"
data["configurations"][cloud_function.id] = cf_config

if self.event_bridge is not None:
data["configurations"]["EventBridgeConfiguration"] = self.event_bridge
return data


Expand Down Expand Up @@ -1325,6 +1329,7 @@ def set_notification_configuration(
topic=notification_config.get("TopicConfiguration"),
queue=notification_config.get("QueueConfiguration"),
cloud_function=notification_config.get("CloudFunctionConfiguration"),
event_bridge=notification_config.get("EventBridgeConfiguration"),
)

# Validate that the region is correct:
Expand Down Expand Up @@ -2315,11 +2320,7 @@ def put_bucket_notification_configuration(
- AWSLambda
- SNS
- SQS

bblommers marked this conversation as resolved.
Show resolved Hide resolved
For the following events:

- 's3:ObjectCreated:Copy'
- 's3:ObjectCreated:Put'
- EventBridge
"""
bucket = self.get_bucket(bucket_name)
bucket.set_notification_configuration(notification_config)
Expand Down
98 changes: 98 additions & 0 deletions moto/s3/notifications.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import copy
import json
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List

from moto.core.utils import unix_time
from moto.events.utils import _BASE_EVENT_MESSAGE

_EVENT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"


Expand Down Expand Up @@ -122,6 +126,9 @@ def send_event(

_send_sns_message(account_id, event_body, topic_arn, region_name)

if bucket.notification_configuration.event_bridge is not None:
_send_event_bridge_message(account_id, bucket, event_name, key)


def _send_sqs_message(
account_id: str, event_body: Any, queue_name: str, region_name: str
Expand Down Expand Up @@ -157,6 +164,97 @@ def _send_sns_message(
pass


def _send_event_bridge_message(
account_id: str,
bucket: Any,
event_name: str,
key: Any,
) -> None:
try:
from moto.events.models import events_backends

event = copy.deepcopy(_BASE_EVENT_MESSAGE)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _BASE_EVENT_MESSAGE-import should also be local

event["detail-type"] = _detail_type(event_name)
event["source"] = "aws.s3"
event["account"] = account_id
event["time"] = unix_time()
event["region"] = bucket.region_name
event["resources"] = [f"arn:aws:s3:::{bucket.name}"]
event["detail"] = {
"version": "0",
"bucket": {"name": bucket.name},
"object": {
"key": key.name,
"size": key.size,
"eTag": key.etag.replace('"', ""),
"version-id": "IYV3p45BT0ac8hjHg1houSdS1a.Mro8e",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use key.version_id here?

"sequencer": "617f08299329d189",
},
"request-id": "N4N7GDK58NMKJ12R",
"requester": "123456789012",
"source-ip-address": "1.2.3.4",
# ex) s3:ObjectCreated:Put -> ObjectCreated
"reason": event_name.split(":")[1],
}

events_backend = events_backends[account_id][bucket.region_name]
for event_bus in events_backend.event_buses.values():
for rule in event_bus.rules.values():
rule.send_to_targets(event)

except: # noqa
# This is an async action in AWS.
# Even if this part fails, the calling function should pass, so catch all errors
# Possible exceptions that could be thrown:
# - EventBridge does not exist
pass


def _detail_type(event_name: str) -> str:
"""Detail type field values for event messages of s3 EventBridge notification

document: https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html
"""
if event_name in [e for e in S3NotificationEvent.events() if "ObjectCreated" in e]:
return "Object Created"
elif event_name in [
e
for e in S3NotificationEvent.events()
if "ObjectRemoved" in e or "LifecycleExpiration" in e
]:
return "Object Deleted"
elif event_name in [
e for e in S3NotificationEvent.events() if "ObjectRestore" in e
]:
if event_name == S3NotificationEvent.OBJECT_RESTORE_POST_EVENT:
return "Object Restore Initiated"
elif event_name == S3NotificationEvent.OBJECT_RESTORE_COMPLETED_EVENT:
return "Object Restore Completed"
else:
# s3:ObjectRestore:Delete event
return "Object Restore Expired"
elif event_name in [
e for e in S3NotificationEvent.events() if "LifecycleTransition" in e
]:
return "Object Storage Class Changed"
elif event_name in [
e for e in S3NotificationEvent.events() if "IntelligentTiering" in e
]:
return "Object Access Tier Changed"
elif event_name in [e for e in S3NotificationEvent.events() if "ObjectAcl" in e]:
return "Object ACL Updated"
elif event_name in [e for e in S3NotificationEvent.events() if "ObjectTagging"]:
if event_name == S3NotificationEvent.OBJECT_TAGGING_PUT_EVENT:
return "Object Tags Added"
else:
# s3:ObjectTagging:Delete event
return "Object Tags Deleted"
else:
raise ValueError(
f"unsupported event `{event_name}` for s3 eventbridge notification (https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html)"
)


def _invoke_awslambda(
account_id: str, event_body: Any, fn_arn: str, region_name: str
) -> None:
Expand Down
7 changes: 7 additions & 0 deletions moto/s3/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -2092,12 +2092,19 @@ def _notification_config_from_body(self) -> Dict[str, Any]:
("Topic", "sns"),
("Queue", "sqs"),
("CloudFunction", "lambda"),
("EventBridge", "events"),
]

found_notifications = (
0 # Tripwire -- if this is not ever set, then there were no notifications
)
for name, arn_string in notification_fields:
# EventBridgeConfiguration is passed as an empty dict.
if name == "EventBridge":
events_field = f"{name}Configuration"
if events_field in parsed_xml["NotificationConfiguration"]:
parsed_xml["NotificationConfiguration"][events_field] = {}
found_notifications += 1
# 1st verify that the proper notification configuration has been passed in (with an ARN that is close
# to being correct -- nothing too complex in the ARN logic):
the_notification = parsed_xml["NotificationConfiguration"].get(
Expand Down
2 changes: 2 additions & 0 deletions tests/test_s3/test_s3_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def test_s3_notification_config_dict():
},
}
],
"EventBridgeConfiguration": {},
}

s3_config_query.backends[DEFAULT_ACCOUNT_ID][
Expand Down Expand Up @@ -389,6 +390,7 @@ def test_s3_notification_config_dict():
"queueARN": "arn:aws:lambda:us-west-2:012345678910:function:mylambda",
"type": "LambdaConfiguration",
},
"EventBridgeConfiguration": {},
}
}

Expand Down
59 changes: 59 additions & 0 deletions tests/test_s3/test_s3_eventbridge_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import json
from uuid import uuid4

import boto3

from moto import mock_aws
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID

REGION_NAME = "us-east-1"


@mock_aws
def test_pub_object_notification():
s3_res = boto3.resource("s3", region_name=REGION_NAME)
s3_client = boto3.client("s3", region_name=REGION_NAME)
events_client = boto3.client("events", region_name=REGION_NAME)
logs_client = boto3.client("logs", region_name=REGION_NAME)

rule_name = "test-rule"
events_client.put_rule(
Name=rule_name, EventPattern=json.dumps({"account": [ACCOUNT_ID]})
)
log_group_name = "/test-group"
logs_client.create_log_group(logGroupName=log_group_name)
events_client.put_targets(
Rule=rule_name,
Targets=[
{
"Id": "test",
"Arn": f"arn:aws:logs:{REGION_NAME}:{ACCOUNT_ID}:log-group:{log_group_name}",
}
],
)

# Create S3 bucket
bucket_name = str(uuid4())
s3_res.create_bucket(Bucket=bucket_name)

# Put Notification
s3_client.put_bucket_notification_configuration(
Bucket=bucket_name,
NotificationConfiguration={"EventBridgeConfiguration": {}},
)

# Put Object
s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject")

events = sorted(
logs_client.filter_log_events(logGroupName=log_group_name)["events"],
key=lambda item: item["eventId"],
)
assert len(events) == 1
event_message = json.loads(events[0]["message"])
assert event_message["detail-type"] == "Object Created"
assert event_message["source"] == "aws.s3"
assert event_message["account"] == ACCOUNT_ID
assert event_message["region"] == REGION_NAME
assert event_message["detail"]["bucket"]["name"] == bucket_name
assert event_message["detail"]["reason"] == "ObjectCreated"
135 changes: 135 additions & 0 deletions tests/test_s3/test_s3_notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import json
from typing import List
from unittest import SkipTest
from uuid import uuid4

import boto3
import pytest

from moto import mock_aws, settings
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from moto.s3.models import FakeBucket, FakeKey
from moto.s3.notifications import (
S3NotificationEvent,
_detail_type,
_send_event_bridge_message,
)

REGION_NAME = "us-east-1"


@pytest.mark.parametrize(
"event_names, expected_event_message",
[
(
[
S3NotificationEvent.OBJECT_CREATED_PUT_EVENT,
S3NotificationEvent.OBJECT_CREATED_POST_EVENT,
S3NotificationEvent.OBJECT_CREATED_COPY_EVENT,
S3NotificationEvent.OBJECT_CREATED_COMPLETE_MULTIPART_UPLOAD_EVENT,
],
"Object Created",
),
(
[
S3NotificationEvent.OBJECT_REMOVED_DELETE_EVENT,
S3NotificationEvent.OBJECT_REMOVED_DELETE_MARKER_CREATED_EVENT,
],
"Object Deleted",
),
([S3NotificationEvent.OBJECT_RESTORE_POST_EVENT], "Object Restore Initiated"),
(
[S3NotificationEvent.OBJECT_RESTORE_COMPLETED_EVENT],
"Object Restore Completed",
),
(
[S3NotificationEvent.OBJECT_RESTORE_DELETE_EVENT],
"Object Restore Expired",
),
(
[S3NotificationEvent.LIFECYCLE_TRANSITION_EVENT],
"Object Storage Class Changed",
),
([S3NotificationEvent.INTELLIGENT_TIERING_EVENT], "Object Access Tier Changed"),
([S3NotificationEvent.OBJECT_ACL_EVENT], "Object ACL Updated"),
([S3NotificationEvent.OBJECT_TAGGING_PUT_EVENT], "Object Tags Added"),
([S3NotificationEvent.OBJECT_TAGGING_DELETE_EVENT], "Object Tags Deleted"),
],
)
def test_detail_type(event_names: List[str], expected_event_message: str):
for event_name in event_names:
assert _detail_type(event_name) == expected_event_message


def test_detail_type_unknown_event():
with pytest.raises(ValueError) as ex:
_detail_type("unknown event")
assert (
str(ex.value)
== "unsupported event `unknown event` for s3 eventbridge notification (https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html)"
)


@mock_aws
def test_send_event_bridge_message():
# setup mocks
events_client = boto3.client("events", region_name=REGION_NAME)
logs_client = boto3.client("logs", region_name=REGION_NAME)
rule_name = "test-rule"
events_client.put_rule(
Name=rule_name, EventPattern=json.dumps({"account": [ACCOUNT_ID]})
)
log_group_name = "/test-group"
logs_client.create_log_group(logGroupName=log_group_name)
mocked_bucket = FakeBucket(str(uuid4()), ACCOUNT_ID, REGION_NAME)
mocked_key = FakeKey(
"test-key", bytes("test content", encoding="utf-8"), ACCOUNT_ID
)

# do nothing if event target does not exists.
_send_event_bridge_message(
ACCOUNT_ID,
mocked_bucket,
S3NotificationEvent.OBJECT_CREATED_PUT_EVENT,
mocked_key,
)
assert (
len(logs_client.filter_log_events(logGroupName=log_group_name)["events"]) == 0
)

# do nothing even if an error is raised while sending events.
events_client.put_targets(
Rule=rule_name,
Targets=[
{
"Id": "test",
"Arn": f"arn:aws:logs:{REGION_NAME}:{ACCOUNT_ID}:log-group:{log_group_name}",
}
],
)

_send_event_bridge_message(ACCOUNT_ID, mocked_bucket, "unknown-event", mocked_key)
assert (
len(logs_client.filter_log_events(logGroupName=log_group_name)["events"]) == 0
)

if not settings.TEST_DECORATOR_MODE:
raise SkipTest(("Doesn't quite work right with the Proxy or Server"))
# an event is correctly sent to the log group.
_send_event_bridge_message(
ACCOUNT_ID,
mocked_bucket,
S3NotificationEvent.OBJECT_CREATED_PUT_EVENT,
mocked_key,
)
events = logs_client.filter_log_events(logGroupName=log_group_name)["events"]
assert len(events) == 1
event_msg = json.loads(events[0]["message"])
assert event_msg["detail-type"] == "Object Created"
assert event_msg["source"] == "aws.s3"
assert event_msg["region"] == REGION_NAME
assert event_msg["resources"] == [f"arn:aws:s3:::{mocked_bucket.name}"]
event_detail = event_msg["detail"]
assert event_detail["bucket"] == {"name": mocked_bucket.name}
assert event_detail["object"]["key"] == mocked_key.name
assert event_detail["reason"] == "ObjectCreated"
Loading