From 2c80987ff7007f2b87f994e9d8dc6dd4ad396d07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Taveira=20Ara=C3=BAjo?= Date: Thu, 9 Nov 2023 15:53:03 -0800 Subject: [PATCH 1/2] feat(subscriber): break out request types Flesh out the two types of requests our handler will support. I'm purposely keeping implementation light so as to make reviews more tractable. --- docs/subscriber.md | 6 ++ handler/subscriber/handler.go | 75 +++++++--------- handler/subscriber/handler_test.go | 126 +++++++++++++++++++++++++-- handler/subscriber/request.go | 96 ++++++++++++++++++++ handler/subscriber/request_test.go | 56 ++++++++++++ handler/subscriber/response.go | 16 ++++ integration/scripts/check_subscriber | 2 +- 7 files changed, 327 insertions(+), 50 deletions(-) create mode 100644 docs/subscriber.md create mode 100644 handler/subscriber/request.go create mode 100644 handler/subscriber/request_test.go create mode 100644 handler/subscriber/response.go diff --git a/docs/subscriber.md b/docs/subscriber.md new file mode 100644 index 00000000..bf9a3c56 --- /dev/null +++ b/docs/subscriber.md @@ -0,0 +1,6 @@ +# Observe Subscriber + +The subscriber stack subscribes CloudWatch Log Groups to a supported destination ARN (either Kinesis Firehose or Lambda). It supports two request types: + +- subscription requests contain a list of log groups which we wish to subscribe to our destination. +- discovery requests contain a list of filters which are used to generate subscription requests. diff --git a/handler/subscriber/handler.go b/handler/subscriber/handler.go index 106262fc..75097da7 100644 --- a/handler/subscriber/handler.go +++ b/handler/subscriber/handler.go @@ -2,14 +2,16 @@ package subscriber import ( "context" + "errors" "fmt" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" - "github.com/go-logr/logr" "github.com/observeinc/aws-sam-testing/handler" ) +var ErrNotImplemented = errors.New("not implemented") + type CloudWatchLogsClient interface { DescribeLogGroups(context.Context, *cloudwatchlogs.DescribeLogGroupsInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.DescribeLogGroupsOutput, error) } @@ -25,54 +27,43 @@ type Handler struct { Client CloudWatchLogsClient } -type SyncRequest struct { - *SyncConfig `json:"sync"` -} - -type SyncConfig struct { - Subscription *cloudwatchlogs.PutSubscriptionFilterInput `json:"subscription,omitempty"` - Limit *int32 `json:"limit,omitempty"` -} - -type task struct { - PutSubscriptionFilterInput *cloudwatchlogs.PutSubscriptionFilterInput `json:"subscription,omitempty"` - DescribeLogGroupsOutput *cloudwatchlogs.DescribeLogGroupsOutput `json:"logGroups"` -} - -type SyncResponse struct { - LogGroupCount int `json:"logGroupCount"` - PageCount int `json:"pageCount"` -} +func (h *Handler) HandleDiscoveryRequest(ctx context.Context, discoveryReq *DiscoveryRequest) (*Response, error) { + var discoveryResp DiscoveryResponse -func (h *Handler) HandleSync(ctx context.Context, request SyncRequest) (*SyncResponse, error) { - logger := logr.FromContextOrDiscard(ctx) + for _, input := range discoveryReq.ToDescribeLogInputs() { + paginator := cloudwatchlogs.NewDescribeLogGroupsPaginator(h.Client, input) - paginator := cloudwatchlogs.NewDescribeLogGroupsPaginator(h.Client, &cloudwatchlogs.DescribeLogGroupsInput{ - Limit: request.Limit, - }) - - var response SyncResponse - - for paginator.HasMorePages() { - output, err := paginator.NextPage(ctx) - if err != nil { - return nil, fmt.Errorf("failed to describe log groups: %w", err) + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("failed to describe log groups: %w", err) + } + discoveryResp.RequestCount++ + discoveryResp.LogGroupCount += len(page.LogGroups) } + } - response.PageCount++ - response.LogGroupCount += len(output.LogGroups) + return &Response{DiscoveryResponse: &discoveryResp}, nil +} - logger.V(6).Info("queueing page") +func (h *Handler) HandleSubscriptionRequest(_ context.Context, _ *SubscriptionRequest) (*Response, error) { + // to be implemented + return nil, nil +} - if err := h.Queue.Put(ctx, &task{ - PutSubscriptionFilterInput: request.Subscription, - DescribeLogGroupsOutput: output, - }); err != nil { - return nil, fmt.Errorf("failed to queue log groups: %w", err) - } +func (h *Handler) HandleRequest(ctx context.Context, req *Request) (*Response, error) { + if err := req.Validate(); err != nil { + return nil, fmt.Errorf("failed to validate request: %w", err) } - return &response, nil + switch { + case req.DiscoveryRequest != nil: + return h.HandleDiscoveryRequest(ctx, req.DiscoveryRequest) + case req.SubscriptionRequest != nil: + return h.HandleSubscriptionRequest(ctx, req.SubscriptionRequest) + default: + return nil, ErrNotImplemented + } } func New(cfg *Config) (*Handler, error) { @@ -89,7 +80,7 @@ func New(cfg *Config) (*Handler, error) { h.Logger = *cfg.Logger } - if err := h.Mux.Register(h.HandleSync); err != nil { + if err := h.Mux.Register(h.HandleRequest); err != nil { return nil, fmt.Errorf("failed to register handler: %w", err) } diff --git a/handler/subscriber/handler_test.go b/handler/subscriber/handler_test.go index ef8bb9bd..a646fadf 100644 --- a/handler/subscriber/handler_test.go +++ b/handler/subscriber/handler_test.go @@ -2,9 +2,14 @@ package subscriber_test import ( "context" + "fmt" "sync" "testing" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" + "github.com/google/go-cmp/cmp" + "github.com/observeinc/aws-sam-testing/handler/handlertest" "github.com/observeinc/aws-sam-testing/handler/subscriber" ) @@ -21,12 +26,119 @@ func (m *MockQueue) Put(_ context.Context, vs ...any) error { return nil } -func TestHandler(t *testing.T) { - _, err := subscriber.New(&subscriber.Config{ - CloudWatchLogsClient: &handlertest.CloudWatchLogsClient{}, - Queue: &MockQueue{}, - }) - if err != nil { - t.Fatal(err) +func TestHandleDiscovery(t *testing.T) { + t.Parallel() + + client := &handlertest.CloudWatchLogsClient{ + LogGroups: []types.LogGroup{ + {LogGroupName: aws.String("/aws/hello")}, + {LogGroupName: aws.String("/aws/ello")}, + {LogGroupName: aws.String("/aws/hola")}, + }, + SubscriptionFilters: []types.SubscriptionFilter{ + {LogGroupName: aws.String("/aws/hello")}, + }, + } + + testcases := []struct { + DiscoveryRequest *subscriber.DiscoveryRequest + ExpectResponse *subscriber.Response + }{ + { + DiscoveryRequest: &subscriber.DiscoveryRequest{}, + /* matches: + - /aws/hello + - /aws/ello + - /aws/hola + */ + ExpectResponse: &subscriber.Response{ + DiscoveryResponse: &subscriber.DiscoveryResponse{ + RequestCount: 1, + LogGroupCount: 3, + }, + }, + }, + { + DiscoveryRequest: &subscriber.DiscoveryRequest{ + LogGroupNamePrefixes: []*string{ + aws.String("/aws/he"), + aws.String("/aws/ho"), + }, + }, + /* matches: + - /aws/hello + - /aws/hola + */ + ExpectResponse: &subscriber.Response{ + DiscoveryResponse: &subscriber.DiscoveryResponse{ + RequestCount: 2, + LogGroupCount: 2, + }, + }, + }, + { + DiscoveryRequest: &subscriber.DiscoveryRequest{ + LogGroupNamePatterns: []*string{ + aws.String("ello"), + aws.String("foo"), + aws.String("bar"), + }, + }, + /* matches: + - /aws/hello + - /aws/ello + */ + ExpectResponse: &subscriber.Response{ + DiscoveryResponse: &subscriber.DiscoveryResponse{ + RequestCount: 3, + LogGroupCount: 2, + }, + }, + }, + { + DiscoveryRequest: &subscriber.DiscoveryRequest{ + LogGroupNamePatterns: []*string{ + aws.String("ello"), + }, + LogGroupNamePrefixes: []*string{ + aws.String("/aws/he"), + }, + }, + /* matches: + - /aws/hello + - /aws/ello + - /aws/hello + */ + ExpectResponse: &subscriber.Response{ + DiscoveryResponse: &subscriber.DiscoveryResponse{ + RequestCount: 2, + LogGroupCount: 3, + }, + }, + }, + } + + for i, tt := range testcases { + tt := tt + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + t.Parallel() + + s, err := subscriber.New(&subscriber.Config{ + CloudWatchLogsClient: client, + Queue: &MockQueue{}, + }) + if err != nil { + t.Fatal(err) + } + + resp, err := s.HandleDiscoveryRequest(context.Background(), tt.DiscoveryRequest) + if err != nil { + t.Fatal(err) + } + + if diff := cmp.Diff(tt.ExpectResponse, resp); diff != "" { + t.Error(diff) + } + }) } } diff --git a/handler/subscriber/request.go b/handler/subscriber/request.go new file mode 100644 index 00000000..2b8a0c12 --- /dev/null +++ b/handler/subscriber/request.go @@ -0,0 +1,96 @@ +package subscriber + +import ( + "errors" + "fmt" + + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" +) + +var ErrMalformedRequest = errors.New("malformed request") + +// Request for our handler. +type Request struct { + *SubscriptionRequest `json:"subscribe"` + *DiscoveryRequest `json:"discover"` +} + +// Validate verifies request is a union. +func (r *Request) Validate() error { + if r == nil { + return fmt.Errorf("%w: nil request", ErrMalformedRequest) + } + + var count int + if r.SubscriptionRequest != nil { + count++ + } + if r.DiscoveryRequest != nil { + count++ + } + + if count == 0 { + return fmt.Errorf("%w: empty request", ErrMalformedRequest) + } + + if count > 1 { + return fmt.Errorf("%w: conflicting requests", ErrMalformedRequest) + } + + return nil +} + +// SubscriptionRequest contains a list of log groups to subscribe. +type SubscriptionRequest struct { + // if provided, we can subscribe this set of log group names + LogGroups []*LogGroup `json:"logGroups,omitempty"` +} + +// DiscoveryRequest generates a list of log groups to subscribe. +type DiscoveryRequest struct { + // optional filters + LogGroupNamePatterns []*string `json:"logGroupNamePatterns,omitempty"` + LogGroupNamePrefixes []*string `json:"logGroupNamePrefixes,omitempty"` + + // Limit when pagination list endpoint + Limit *int32 `json:"limit,omitempty"` +} + +// LogGroup represents the minimal viable info we need to be able to subscribe +// to a log group. +// Once we need to support linked accounts we'll likely need more than just a +// name. +type LogGroup struct { + LogGroupName string `json:"logGroupName"` +} + +// ToDescribeLogInputs computes the necessary describe-log-groups commands in order to unpack discovery request +// No attempt is made to dedupe log group names, since subscription is assumed to be idempotent. +func (d *DiscoveryRequest) ToDescribeLogInputs() (inputs []*cloudwatchlogs.DescribeLogGroupsInput) { + if d == nil { + return nil + } + + for _, pattern := range d.LogGroupNamePatterns { + inputs = append(inputs, &cloudwatchlogs.DescribeLogGroupsInput{ + LogGroupNamePattern: pattern, + Limit: d.Limit, + }) + } + + for _, prefix := range d.LogGroupNamePrefixes { + inputs = append(inputs, &cloudwatchlogs.DescribeLogGroupsInput{ + LogGroupNamePrefix: prefix, + Limit: d.Limit, + }) + } + + if len(inputs) == 0 { + // We should list all since we were provided with no log groups + // or filters. + inputs = append(inputs, &cloudwatchlogs.DescribeLogGroupsInput{ + Limit: d.Limit, + }) + } + return inputs +} diff --git a/handler/subscriber/request_test.go b/handler/subscriber/request_test.go new file mode 100644 index 00000000..ade206d9 --- /dev/null +++ b/handler/subscriber/request_test.go @@ -0,0 +1,56 @@ +package subscriber_test + +import ( + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/observeinc/aws-sam-testing/handler/subscriber" +) + +func TestRequestMalformed(t *testing.T) { + t.Parallel() + + testcases := []struct { + *subscriber.Request + ExpectError error + }{ + { + Request: &subscriber.Request{}, + ExpectError: subscriber.ErrMalformedRequest, + }, + { + Request: &subscriber.Request{ + SubscriptionRequest: &subscriber.SubscriptionRequest{}, + DiscoveryRequest: &subscriber.DiscoveryRequest{}, + }, + ExpectError: subscriber.ErrMalformedRequest, + }, + { + Request: &subscriber.Request{ + SubscriptionRequest: &subscriber.SubscriptionRequest{}, + }, + ExpectError: nil, + }, + { + Request: &subscriber.Request{ + DiscoveryRequest: &subscriber.DiscoveryRequest{}, + }, + ExpectError: nil, + }, + } + + for i, tt := range testcases { + tt := tt + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + t.Parallel() + + err := tt.Request.Validate() + if diff := cmp.Diff(tt.ExpectError, err, cmpopts.EquateErrors()); diff != "" { + t.Error(diff) + } + }) + } +} diff --git a/handler/subscriber/response.go b/handler/subscriber/response.go new file mode 100644 index 00000000..f9d3c629 --- /dev/null +++ b/handler/subscriber/response.go @@ -0,0 +1,16 @@ +package subscriber + +// Response from our handler. +type Response struct { + *SubscriptionResponse `json:"subscription"` + *DiscoveryResponse `json:"discovery"` +} + +type DiscoveryResponse struct { + // RequestCount tracks number of API requests + RequestCount int + // LogGroupCount tracks count of log groups retrieved from API + LogGroupCount int +} + +type SubscriptionResponse struct{} diff --git a/integration/scripts/check_subscriber b/integration/scripts/check_subscriber index a7181833..9af2cb0c 100755 --- a/integration/scripts/check_subscriber +++ b/integration/scripts/check_subscriber @@ -25,7 +25,7 @@ check_result() { fi } -echo '{"sync": {}}' > ${TMPFILE} +echo '{"subscribe": {}}' > ${TMPFILE} RESULT=$(aws lambda invoke \ --function-name ${FUNCTION_NAME} \ --payload fileb://${TMPFILE} ${TMPFILE} \ From 98c88c376339bb83823755c345430cac86266ba3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Taveira=20Ara=C3=BAjo?= Date: Fri, 10 Nov 2023 09:14:45 -0800 Subject: [PATCH 2/2] feat(subscriber): add configuration variables --- apps/subscriber/template.yaml | 25 +++++++++++++++++++ cmd/subscriber/main.go | 13 +++++++--- handler/subscriber/config.go | 41 +++++++++++++++++++++++++++++++ handler/subscriber/config_test.go | 34 +++++++++++++++++++++++++ 4 files changed, 110 insertions(+), 3 deletions(-) diff --git a/apps/subscriber/template.yaml b/apps/subscriber/template.yaml index 55ca0fdb..9fd4a9bc 100644 --- a/apps/subscriber/template.yaml +++ b/apps/subscriber/template.yaml @@ -19,6 +19,27 @@ Globals: MemorySize: 128 Parameters: + FilterName: + Type: String + Description: >- + Subscription filter name. + Default: 'observe-logs-subscription' + FilterPattern: + Type: String + Description: >- + Subscription filter pattern. + Default: '' + DestinationArn: + Type: String + Description: >- + Destination ARN for subscription filter. If not set, all subscription + filters matching configured filter name will be removed. + Default: '' + RoleArn: + Type: String + Description: >- + Role ARN for subscription filter. + Default: '' NameOverride: Type: String Description: >- @@ -144,6 +165,10 @@ Resources: Enabled: true Environment: Variables: + FILTER_NAME: !Ref FilterName + FILTER_PATTERN: !Ref FilterPattern + DESTINATION_ARN: !Ref DestinationArn + ROLE_ARN: !Ref RoleArn QUEUE_URL: !Ref Queue VERBOSITY: 9 Outputs: diff --git a/cmd/subscriber/main.go b/cmd/subscriber/main.go index b2b6c69d..6a4041e5 100644 --- a/cmd/subscriber/main.go +++ b/cmd/subscriber/main.go @@ -16,9 +16,12 @@ import ( ) var env struct { - QueueURL string `env:"QUEUE_URL,required"` - Verbosity int `env:"VERBOSITY,default=1"` - OtelServiceName string `env:"OTEL_SERVICE_NAME"` + FilterName string `env:"FILTER_NAME"` + FilterPattern string `env:"FILTER_PATTERN"` + DestinationARN string `env:"DESTINATION_ARN"` + RoleARN string `env:"ROLE_ARN"` + QueueURL string `env:"QUEUE_URL,required"` + Verbosity int `env:"VERBOSITY,default=1"` } var ( @@ -57,6 +60,10 @@ func realInit() error { } handler, err = subscriber.New(&subscriber.Config{ + FilterName: env.FilterName, + FilterPattern: env.FilterPattern, + DestinationARN: env.DestinationARN, + RoleARN: env.RoleARN, Logger: &logger, CloudWatchLogsClient: cloudwatchlogs.NewFromConfig(awsCfg), Queue: queue, diff --git a/handler/subscriber/config.go b/handler/subscriber/config.go index bef1d7f9..e3149996 100644 --- a/handler/subscriber/config.go +++ b/handler/subscriber/config.go @@ -2,19 +2,39 @@ package subscriber import ( "errors" + "fmt" + "strings" + "github.com/aws/aws-sdk-go-v2/aws/arn" "github.com/go-logr/logr" ) var ( ErrMissingCloudWatchLogsClient = errors.New("missing CloudWatch Logs client") ErrMissingQueue = errors.New("missing queue") + ErrMissingFilterName = errors.New("filter name must be provided if destination ARN is set") + ErrMissingDestinationARN = errors.New("destination ARN must be provided if role ARN is set") + ErrInvalidARN = errors.New("invalid ARN") ) type Config struct { CloudWatchLogsClient Queue + // FilterName for subscription filters managed by this handler + // Our handler will assume it manages all filters that have this name as a + // prefix. + FilterName string + + // FilterPattern for subscription filters + FilterPattern string + + // DestinationARN to subscribe log groups to. + // If empty, delete any subscription filters we manage. + DestinationARN string + // RoleARN for subscription filter + RoleARN string + Logger *logr.Logger } @@ -29,5 +49,26 @@ func (c *Config) Validate() error { errs = append(errs, ErrMissingQueue) } + if c.FilterName == "" && c.DestinationARN != "" { + errs = append(errs, ErrMissingFilterName) + } + + if c.DestinationARN != "" { + if _, err := arn.Parse(c.DestinationARN); err != nil { + errs = append(errs, fmt.Errorf("failed to parse destination: %w: %s", ErrInvalidARN, err)) + } + } + + if c.RoleARN != "" { + if c.DestinationARN == "" { + errs = append(errs, ErrMissingDestinationARN) + } + + roleARN, err := arn.Parse(c.RoleARN) + if err != nil || roleARN.Service != "iam" || strings.HasPrefix(roleARN.Resource, "role/") { + errs = append(errs, fmt.Errorf("failed to parse role: %w", ErrInvalidARN)) + } + } + return errors.Join(errs...) } diff --git a/handler/subscriber/config_test.go b/handler/subscriber/config_test.go index 68f5009e..a0b42175 100644 --- a/handler/subscriber/config_test.go +++ b/handler/subscriber/config_test.go @@ -25,6 +25,40 @@ func TestConfig(t *testing.T) { }, ExpectError: subscriber.ErrMissingQueue, }, + { + Config: subscriber.Config{ + CloudWatchLogsClient: &handlertest.CloudWatchLogsClient{}, + Queue: &MockQueue{}, + DestinationARN: "hello", + }, + ExpectError: subscriber.ErrMissingFilterName, + }, + { + Config: subscriber.Config{ + CloudWatchLogsClient: &handlertest.CloudWatchLogsClient{}, + Queue: &MockQueue{}, + FilterName: "observe-logs-subscription", + DestinationARN: "hello", + }, + ExpectError: subscriber.ErrInvalidARN, + }, + { + Config: subscriber.Config{ + CloudWatchLogsClient: &handlertest.CloudWatchLogsClient{}, + Queue: &MockQueue{}, + FilterName: "observe-logs-subscription", + DestinationARN: "arn:aws:lambda:us-east-2:123456789012:function:my-function", + }, + }, + { + Config: subscriber.Config{ + CloudWatchLogsClient: &handlertest.CloudWatchLogsClient{}, + Queue: &MockQueue{}, + FilterName: "observe-logs-subscription", + RoleARN: "arn:aws:lambda:us-east-2:123456789012:function:my-function", + }, + ExpectError: subscriber.ErrMissingDestinationARN, + }, { Config: subscriber.Config{ CloudWatchLogsClient: &handlertest.CloudWatchLogsClient{},