Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[main] Fix handling of custom Endpoint when using S3 + SQS #39722

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import (
"fmt"
"net/url"
"strings"

awssdk "github.com/aws/aws-sdk-go-v2/aws"

Expand Down Expand Up @@ -48,13 +50,21 @@
return nil, fmt.Errorf("initializing AWS config: %w", err)
}

if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
endpointUri, err := url.Parse(config.AWSConfig.Endpoint)
// A custom endpoint has been specified!
if err == nil && config.AWSConfig.Endpoint != "" && !strings.HasPrefix(endpointUri.Hostname(), "s3") {

// For backwards compat:
// If the endpoint does not start with S3, we will use the endpoint resolver to make all SDK requests use the specified endpoint
// If the endpoint does start with S3, we will use the default resolver uses the endpoint field but can replace s3 with the desired service name like sqs

awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {

Check failure on line 61 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (windows)

SA1019: awsConfig.EndpointResolverWithOptions is deprecated: with the release of endpoint resolution v2 in API clients, EndpointResolver and EndpointResolverWithOptions are deprecated. Providing a value for this field will likely prevent you from using newer endpoint-related service features. See API client options EndpointResolverV2 and BaseEndpoint. (staticcheck)

Check failure on line 61 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: awsConfig.EndpointResolverWithOptions is deprecated: with the release of endpoint resolution v2 in API clients, EndpointResolver and EndpointResolverWithOptions are deprecated. Providing a value for this field will likely prevent you from using newer endpoint-related service features. See API client options EndpointResolverV2 and BaseEndpoint. (staticcheck)
return awssdk.Endpoint{
PartitionID: "aws",
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
PartitionID: "aws",
Source: awssdk.EndpointSourceCustom,
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
HostnameImmutable: true,
}, nil
})
}
Expand Down
13 changes: 13 additions & 0 deletions x-pack/filebeat/input/awss3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ func TestRegionSelection(t *testing.T) {
regionName: "us-east-2",
want: "us-east-2",
},
{
name: "abc.xyz_and_domain_with_matching_s3_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_s3_endpoint_region_override",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.abc.xyz",
regionName: "us-west-3",
want: "us-west-3",
},
{
name: "abc.xyz_and_domain_with_matching_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
Expand Down
28 changes: 27 additions & 1 deletion x-pack/filebeat/input/awss3/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,41 @@ var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_

func getRegionFromQueueURL(queueURL, endpoint string) string {
// get region from queueURL
// Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs
// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
u, err := url.Parse(queueURL)
if err != nil {
return ""
}

// check for sqs queue url
e, err := url.Parse(endpoint)
if err != nil {
return ""
}

// Parse a user-provided custom endpoint to see if we can get the region from it
// requires the endpoint to be in the format of https://s3.{REGION_ENDPOINT}.{ENDPOINT}
// requires the queue url to be in the format of https://sqs.{REGION_ENDPOINT}.{ENDPOINT}
// If the endpoint value matches, return the region_endpoint
host := strings.SplitN(u.Host, ".", 3)
custom_endpoint := strings.SplitN(e.Host, ".", 3)

if endpoint != "" && custom_endpoint[0] == "s3" && len(host) == 3 && len(custom_endpoint) == 3 {
// Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint
endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 3)[2] == strings.SplitN(e.Hostname(), ".", 3)[2]

// We cannot infer the region by matching the endpoint and queue url
if !endpointMatchesQueueUrl {
return ""
}

region := host[1]
return region
}

// check for sqs queue url
host = strings.SplitN(u.Host, ".", 3)
if len(host) == 3 && host[0] == "sqs" {
if host[2] == endpoint || (endpoint == "" && strings.HasPrefix(host[2], "amazonaws.")) {
return host[1]
Expand Down
Loading