Skip to content

Commit

Permalink
feat(subscriber): add eventbridge (#103)
Browse files Browse the repository at this point in the history
Wire in eventbridge rules to automatically trigger subscription lambda.
Test verifies that our discovery rule triggers the lambda on stack
install.
  • Loading branch information
jta committed Nov 29, 2023
1 parent 788dcfc commit ac9ccf1
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 11 deletions.
79 changes: 78 additions & 1 deletion apps/subscriber/template.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
---
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Transform:
- AWS::Serverless-2016-10-31
- AWS::LanguageExtensions
Description: 'Subscribe logs to Observe.'
Metadata:
AWS::ServerlessRepo::Application:
Expand Down Expand Up @@ -60,6 +62,12 @@ Parameters:
Description: Maximum number of concurrent workers when processing log groups.
Default: ''
AllowedPattern: '^[0-9]*$'
DiscoveryRate:
Type: String
Description: EventBridge rate expression for periodically triggering
discovery. If not set, no eventbridge rules are configured.
Default: ''
AllowedPattern: '^([1-9]\d* (minute|hour|day)s?)?$'
NameOverride:
Type: String
Description: >-
Expand All @@ -73,6 +81,22 @@ Conditions:
UseStackName: !Equals
- !Ref NameOverride
- ''
HasDiscoveryRate: !Not
- !Equals
- !Ref DiscoveryRate
- ''
HasLogGroupNamePatterns: !Not
- !Equals
- !Join
- ','
- !Ref LogGroupNamePatterns
- ''
HasLogGroupNamePrefixes: !Not
- !Equals
- !Join
- ','
- !Ref LogGroupNamePrefixes
- ''

Resources:
DeadLetter:
Expand Down Expand Up @@ -208,6 +232,59 @@ Resources:
QUEUE_URL: !Ref Queue
VERBOSITY: 9
NUM_WORKERS: !Ref NumWorkers
SubscriptionEvents:
Type: AWS::Events::Rule
Condition: HasDiscoveryRate
DependsOn: QueuePolicy
Properties:
Description: "Subscribe new log groups"
State: ENABLED
EventPattern:
source:
- "aws.logs"
detail-type:
- "AWS API Call via CloudTrail"
detail:
eventSource:
- "logs.amazonaws.com"
eventName:
- "CreateLogGroup"
Targets:
- Arn: !GetAtt Queue.Arn
Id: SubscriptionEvent
InputTransformer:
InputPathsMap:
logGroupName: "$.detail.requestParameters.logGroupName"
InputTemplate: >-
{"subscribe": {"logGroups": [{ "logGroupName": "<logGroupName>"}]}}
DiscoveryEvents:
Type: 'AWS::Events::Rule'
Condition: HasDiscoveryRate
# We must have the appropriate permissions before attempting to write to
# queue. Removing this dependency will cause first trigger to be silently
# dropped.
DependsOn: QueuePolicy
Properties:
Description: Trigger log group discovery
ScheduleExpression: !Sub 'rate(${DiscoveryRate})'
State: ENABLED
Targets:
- Arn: !GetAtt Queue.Arn
Id: DiscoveryEvent
RetryPolicy:
MaximumEventAgeInSeconds: 60
MaximumRetryAttempts: 3
Input: !ToJsonString
discover:
logGroupNamePatterns: !If
- HasLogGroupNamePatterns
- !Ref LogGroupNamePatterns
- []
logGroupNamePrefixes: !If
- HasLogGroupNamePrefixes
- !Ref LogGroupNamePrefixes
- []

Outputs:
Function:
Description: "Lambda Function ARN"
Expand Down
33 changes: 25 additions & 8 deletions docs/subscriber.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

The Observe Subscriber application is an AWS SAM application that subscribes CloudWatch Log Groups to a supported destination ARN, such as Kinesis Firehose or Lambda. It operates with two types of requests: subscription requests and discovery requests.

Additionally, the stack provides a method for automatically triggering subscription through Eventbridge rules.

## Configuration

The subscriber Lambda function manages subscription filters for log groups and uses the following environment variables for configuration:
Expand All @@ -11,7 +13,7 @@ The subscriber Lambda function manages subscription filters for log groups and u
| `FILTER_NAME` | (Required) Name for the subscription filter. Any existing filters with this prefix will be removed. |
| `FILTER_PATTERN` | Pattern for the subscription filter. See [AWS documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html) for details. |
| `DESTINATION_ARN` | Destination ARN for the subscription filter. If empty, any filters with `FILTER_NAME` will be removed. |
| `ROLE_ARN` | Role ARN, required if `DESTINATION_ARN` is set. |
| `ROLE_ARN` | Role ARN. Can only be set if `DESTINATION_ARN` is also set. |

The scope of log groups the Lambda function applies to is determined by:

Expand Down Expand Up @@ -104,13 +106,28 @@ To perform subscriptions in the same invocation as a discovery request, include

The successful invocation response will include subscription stats embedded within the discovery stats.

---
```json
{
"discovery": {
"logGroupCount": 3,
"requestCount": 2,
"subscription": {
"deleted": 0,
"updated": 0,
"skipped": 0,
"processed": 3
}
}
}
```

## Automatic subscription through Eventbridge rules

The stack optionally installs eventbridge rules which automatically subscribe log groups the the configured destination. To enable this feature, you must set the `DiscoveryRate` parameter to a valid [AWS EventBridge rate expression](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rate-expressions.html) (e.g. `1 hour`).

## Additional Notes
If this parameter is set, two EventBridge rules are installed:

- **Inline Subscriptions**: The `inline` option can be useful for immediate subscription after discovery but may increase the invocation duration.
- **SQS Queue Usage**: By default, if an SQS queue is provided, the Lambda function will fan out subscription requests for better scalability and management.
- **IAM Role**: The role specified in `ROLE_ARN` should have the necessary permissions to manage CloudWatch Logs and the destination resource.
- **Deployment and Updates**: For deployment instructions, refer to the main `README.md` and `DEVELOPER.md` documents. When updating the application, remember to adjust the `SemanticVersion` in `template.yaml` to reflect the changes.
- a discovery request that will be fire at the desired rate,
- a subscription request will be fired on log group creation. This rule will only fire if CloudTrail is configured within the account and region our subscriber is running in.

Please refer to the provided `template.yaml` for the complete definition of the SAM application and to customize the deployment to fit your requirements.
Both rules will send requests to the SQS queue, which in turn are consumed by the subscriber lambda.
4 changes: 4 additions & 0 deletions handler/subscriber/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/go-logr/logr"
)

var ErrNoQueue = errors.New("no queue defined")
Expand All @@ -15,6 +16,9 @@ func (h *Handler) HandleDiscoveryRequest(ctx context.Context, discoveryReq *Disc
Discovery: new(DiscoveryStats),
}

logger := logr.FromContextOrDiscard(ctx)
logger.V(3).Info("handling discovery request", "request", discoveryReq)

var inline bool
if discoveryReq.Inline == nil {
inline = h.Queue == nil
Expand Down
12 changes: 12 additions & 0 deletions integration/scripts/check_subscriber
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ TMPFILE=$(mktemp)
FUNCTION_NAME=$(echo "$FUNCTION_ARN" | cut -d: -f7)
AWS_REGION=$(echo "$FUNCTION_ARN" | cut -d: -f4)

LOG_EVENTS=$(aws logs filter-log-events \
--region ${AWS_REGION} \
--log-group-name /aws/lambda/${FUNCTION_NAME} | jq -r '.events[] | .message | fromjson?' )

[[ ! -z "${LOG_EVENTS}" ]] || DIE "subscriber lambda not invoked by eventbridge rule"

[[ -z $(jq -r 'select(.level == "ERROR")' <<< ${LOG_EVENTS}) ]] || DIE "errors detected in lambda logs"

# this requires verbosity to be set to at least 3
[[ ! -z $(jq -r 'select(.msg == "handling discovery request")' <<< ${LOG_EVENTS}) ]] || DIE "no discovery request detected"

check_result() {
ERR=$(jq '.StatusCode != 200 or has("FunctionError")' <<<"$1")
if [[ "$ERR" == true ]]; then
Expand All @@ -27,6 +38,7 @@ check_result() {
}

echo '{"subscribe": {"logGroups": [{"logGroupName": "does_not_exist"}]}}' > ${TMPFILE}

RESULT=$(aws lambda invoke \
--function-name ${FUNCTION_NAME} \
--payload fileb://${TMPFILE} ${TMPFILE} \
Expand Down
6 changes: 4 additions & 2 deletions integration/tests/subscriber.tftest.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ run "install" {
name = run.setup.id
app = "subscriber"
parameters = {
LogGroupNamePatterns = "*"
DiscoveryRate = "5 minutes"
}
capabilities = [
"CAPABILITY_IAM",
Expand All @@ -17,7 +19,7 @@ run "install" {
}
}

run "check_invoke" {
run "check_eventbridge_invoked" {
module {
source = "./modules/exec"
}
Expand All @@ -31,6 +33,6 @@ run "check_invoke" {

assert {
condition = output.error == ""
error_message = "Failed to invoke lambda function"
error_message = "Failed to verify subscriber invocation"
}
}

0 comments on commit ac9ccf1

Please sign in to comment.