From 6362dae7a8f309499d807931fd3c87c5ac100ef8 Mon Sep 17 00:00:00 2001 From: William Easton Date: Fri, 24 May 2024 08:20:10 -0500 Subject: [PATCH] Sample implementation of SQS Endpoint support on main --- x-pack/filebeat/input/awss3/input.go | 20 ++++++++++++---- x-pack/filebeat/input/awss3/input_test.go | 13 +++++++++++ x-pack/filebeat/input/awss3/sqs.go | 28 ++++++++++++++++++++++- 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index f0fa3137974..484a35b1a76 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -6,6 +6,8 @@ package awss3 import ( "fmt" + "net/url" + "strings" awssdk "github.com/aws/aws-sdk-go-v2/aws" @@ -48,13 +50,21 @@ func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) { return nil, fmt.Errorf("initializing AWS config: %w", err) } - if config.AWSConfig.Endpoint != "" { - // Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint + endpointUri, err := url.Parse(config.AWSConfig.Endpoint) + // A custom endpoint has been specified! + if err == nil && config.AWSConfig.Endpoint != "" && !strings.HasPrefix(endpointUri.Hostname(), "s3") { + + // For backwards compat: + // If the endpoint does not start with S3, we will use the endpoint resolver to make all SDK requests use the specified endpoint + // If the endpoint does start with S3, we will use the default resolver uses the endpoint field but can replace s3 with the desired service name like sqs + awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { return awssdk.Endpoint{ - PartitionID: "aws", - URL: config.AWSConfig.Endpoint, - SigningRegion: awsConfig.Region, + PartitionID: "aws", + Source: awssdk.EndpointSourceCustom, + URL: config.AWSConfig.Endpoint, + SigningRegion: awsConfig.Region, + HostnameImmutable: true, }, nil }) } diff --git a/x-pack/filebeat/input/awss3/input_test.go b/x-pack/filebeat/input/awss3/input_test.go index 432bd360bfc..c9f9b91a5a8 100644 --- a/x-pack/filebeat/input/awss3/input_test.go +++ b/x-pack/filebeat/input/awss3/input_test.go @@ -75,6 +75,19 @@ func TestRegionSelection(t *testing.T) { regionName: "us-east-2", want: "us-east-2", }, + { + name: "abc.xyz_and_domain_with_matching_s3_endpoint", + queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.abc.xyz", + want: "us-east-1", + }, + { + name: "abc.xyz_and_domain_with_matching_s3_endpoint_region_override", + queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.abc.xyz", + regionName: "us-west-3", + want: "us-west-3", + }, { name: "abc.xyz_and_domain_with_matching_endpoint", queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 36985f73720..4663623307f 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -33,6 +33,7 @@ var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ func getRegionFromQueueURL(queueURL, endpoint string) string { // get region from queueURL + // Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs // Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs // Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue u, err := url.Parse(queueURL) @@ -40,8 +41,33 @@ func getRegionFromQueueURL(queueURL, endpoint string) string { return "" } - // check for sqs queue url + e, err := url.Parse(endpoint) + if err != nil { + return "" + } + + // Parse a user-provided custom endpoint to see if we can get the region from it + // requires the endpoint to be in the format of https://s3.{REGION_ENDPOINT}.{ENDPOINT} + // requires the queue url to be in the format of https://sqs.{REGION_ENDPOINT}.{ENDPOINT} + // If the endpoint value matches, return the region_endpoint host := strings.SplitN(u.Host, ".", 3) + custom_endpoint := strings.SplitN(e.Host, ".", 3) + + if endpoint != "" && custom_endpoint[0] == "s3" && len(host) == 3 && len(custom_endpoint) == 3 { + // Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint + endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 3)[2] == strings.SplitN(e.Hostname(), ".", 3)[2] + + // We cannot infer the region by matching the endpoint and queue url + if !endpointMatchesQueueUrl { + return "" + } + + region := host[1] + return region + } + + // check for sqs queue url + host = strings.SplitN(u.Host, ".", 3) if len(host) == 3 && host[0] == "sqs" { if host[2] == endpoint || (endpoint == "" && strings.HasPrefix(host[2], "amazonaws.")) { return host[1]