From f4b7bdefe49fa134846b29cdf68cec26c39fea3c Mon Sep 17 00:00:00 2001 From: Alex Resnick Date: Tue, 27 Jul 2021 09:08:36 -0500 Subject: [PATCH] Add Proxy settings to AWS Common (#26832) (cherry picked from commit 94af9dfcede531c0e5b1f55375973d2cd7c4f900) # Conflicts: # x-pack/filebeat/input/awss3/input.go # x-pack/filebeat/input/awss3/s3_integration_test.go # x-pack/libbeat/docs/aws-credentials-config.asciidoc --- CHANGELOG.next.asciidoc | 28 ++ x-pack/filebeat/input/awss3/input.go | 13 + .../input/awss3/s3_integration_test.go | 275 ++++++++++++++++++ .../docs/aws-credentials-config.asciidoc | 4 + 4 files changed, 320 insertions(+) create mode 100644 x-pack/filebeat/input/awss3/s3_integration_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b432637aa1d..6ff6fb2654e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -318,6 +318,34 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add cgroups V2 support {pull}27242[27242] - update ECS field definitions to ECS 1.11.0. {pull}27107[27107] - The disk queue is now GA. {pull}27515[27515] +- Add istiod metricset. {pull}21519[21519] +- Release `add_cloudfoundry_metadata` as GA. {pull}21525[21525] +- Add support for OpenStack SSL metadata APIs in `add_cloud_metadata`. {pull}21590[21590] +- Add cloud.account.id for GCP into add_cloud_metadata processor. {pull}21776[21776] +- Add proxy metricset for istio module. {pull}21751[21751] +- Add kubernetes.node.hostname metadata of Kubernetes node. {pull}22189[22189] +- Enable always add_resource_metadata for Pods and Services of kubernetes autodiscovery. {pull}22189[22189] +- Add add_resource_metadata option setting (always enabled) for add_kubernetes_metadata setting. {pull}22189[22189] +- Added Kafka version 2.2 to the list of supported versions. {pull}22328[22328] +- Add support for ephemeral containers in kubernetes autodiscover and `add_kubernetes_metadata`. {pull}22389[22389] {pull}22439[22439] +- Added support for wildcard fields and keyword fallback in beats setup commands. {pull}22521[22521] +- Fix polling node when it is not ready and monitor by hostname {pull}22666[22666] +- Add `expand_keys` option to `decode_json_fields` processor and `json` input, to recusively de-dot and expand json keys into hierarchical object structures {pull}22849[22849] +- Update k8s client and release k8s leader lock gracefully {pull}22919[22919] +- Improve equals check. {pull}22778[22778] +- Added "detect_mime_type" processor for detecting mime types {pull}22940[22940] +- Improve event normalization performance {pull}22974[22974] +- Add tini as init system in docker images {pull}22137[22137] +- Added "add_network_direction" processor for determining perimeter-based network direction. {pull}23076[23076] +- Added new `rate_limit` processor for enforcing rate limits on event throughput. {pull}22883[22883] +- Allow node/namespace metadata to be disabled on kubernetes metagen and ensure add_kubernetes_metadata honors host {pull}23012[23012] +- Add support for defining explicitly named dynamic templates without path/type match criteria {pull}25422[25422] +- Improve ES output error insights. {pull}25825[25825] +- Add orchestrator.cluster.name/url fields as k8s metadata {pull}26056[26056] +- Libbeat: report beat version to monitoring. {pull}26214[26214] +- Ensure common proxy settings support in HTTP clients: proxy_disabled, proxy_url, proxy_headers and typical environment variables HTTP_PROXY, HTTPS_PROXY, NOPROXY. {pull}25219[25219] +- `add_process_metadata` processor enrich process information with owner name and id. {issue}21068[21068] {pull}21111[21111] +- Add proxy support for AWS functions. {pull}26832[26832] *Auditbeat* diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 545c24db322..d206a15abac 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -77,6 +77,13 @@ func newInput(config config, store beater.StateStore) (*s3Input, error) { func (in *s3Input) Name() string { return inputName } func (in *s3Input) Test(ctx v2.TestContext) error { +<<<<<<< HEAD +======= + _, err := awscommon.InitializeAWSConfig(in.config.AWSConfig) + if err != nil { + return fmt.Errorf("InitializeAWSConfig failed: %w", err) + } +>>>>>>> 94af9dfced (Add Proxy settings to AWS Common (#26832)) return nil } @@ -138,6 +145,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { } } +<<<<<<< HEAD if in.config.BucketARN != "" { // Create S3 receiver and S3 notification processor. poller, err := in.createS3Lister(inputContext, ctx, client, persistentStore, states) @@ -166,6 +174,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe apiTimeout: in.config.APITimeout, visibilityTimeout: in.config.VisibilityTimeout, longPollWaitTime: in.config.SQSWaitTime, +======= + awsConfig, err := awscommon.InitializeAWSConfig(in.config.AWSConfig) + if err != nil { + return nil, fmt.Errorf("InitializeAWSConfig failed: %w", err) +>>>>>>> 94af9dfced (Add Proxy settings to AWS Common (#26832)) } s3API := &awsS3API{ diff --git a/x-pack/filebeat/input/awss3/s3_integration_test.go b/x-pack/filebeat/input/awss3/s3_integration_test.go new file mode 100644 index 00000000000..59c3f1e1948 --- /dev/null +++ b/x-pack/filebeat/input/awss3/s3_integration_test.go @@ -0,0 +1,275 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build integration +// +build aws + +package awss3 + +import ( + "context" + "net/http" + "os" + "strings" + "sync" + "testing" + "time" + + awssdk "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/sqsiface" + "github.com/stretchr/testify/assert" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/elastic/beats/v7/libbeat/tests/resources" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" + "github.com/elastic/go-concert/unison" +) + +const ( + fileName1 = "sample1.txt" + fileName2 = "sample2.txt" + visibilityTimeout = 300 * time.Second +) + +// GetConfigForTest function gets aws credentials for integration tests. +func getConfigForTest(t *testing.T) config { + t.Helper() + + awsConfig := awscommon.ConfigAWS{} + queueURL := os.Getenv("QUEUE_URL") + profileName := os.Getenv("AWS_PROFILE_NAME") + accessKeyID := os.Getenv("AWS_ACCESS_KEY_ID") + secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY") + sessionToken := os.Getenv("AWS_SESSION_TOKEN") + + config := config{ + VisibilityTimeout: visibilityTimeout, + } + switch { + case queueURL == "": + t.Fatal("$QUEUE_URL is not set in environment") + case profileName == "" && accessKeyID == "": + t.Fatal("$AWS_ACCESS_KEY_ID or $AWS_PROFILE_NAME not set or set to empty") + case profileName != "": + awsConfig.ProfileName = profileName + config.QueueURL = queueURL + config.AWSConfig = awsConfig + return config + case secretAccessKey == "": + t.Fatal("$AWS_SECRET_ACCESS_KEY not set or set to empty") + } + + awsConfig.AccessKeyID = accessKeyID + awsConfig.SecretAccessKey = secretAccessKey + if sessionToken != "" { + awsConfig.SessionToken = sessionToken + } + config.AWSConfig = awsConfig + return config +} + +func defaultTestConfig() *common.Config { + return common.MustNewConfigFrom(common.MapStr{ + "queue_url": os.Getenv("QUEUE_URL"), + "file_selectors": []common.MapStr{ + { + "regex": strings.Replace(fileName1, ".", "\\.", -1), + "max_bytes": 4096, + }, + { + "regex": strings.Replace(fileName2, ".", "\\.", -1), + "max_bytes": 4096, + "parsers": []common.MapStr{ + { + "multiline": common.MapStr{ + "pattern": "^") + assert.Contains(t, message, "") + default: + t.Fatalf("object key %s is unknown", objectKey) + } + } + }) +} + +// MockSQSClient struct is used for unit tests. +type MockSQSClient struct { + sqsiface.ClientAPI +} + +var ( + sqsMessageTest = "{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\"," + + "\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:Put\"," + + "\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\"," + + "\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54\"}}}]}" +) + +func (m *MockSQSClient) ReceiveMessageRequest(input *sqs.ReceiveMessageInput) sqs.ReceiveMessageRequest { + httpReq, _ := http.NewRequest("", "", nil) + return sqs.ReceiveMessageRequest{ + Request: &awssdk.Request{ + Data: &sqs.ReceiveMessageOutput{ + Messages: []sqs.Message{ + {Body: awssdk.String(sqsMessageTest)}, + }, + }, + HTTPRequest: httpReq, + }, + } +} + +func (m *MockSQSClient) DeleteMessageRequest(input *sqs.DeleteMessageInput) sqs.DeleteMessageRequest { + httpReq, _ := http.NewRequest("", "", nil) + return sqs.DeleteMessageRequest{ + Request: &awssdk.Request{ + Data: &sqs.DeleteMessageOutput{}, + HTTPRequest: httpReq, + }, + } +} + +func (m *MockSQSClient) ChangeMessageVisibilityRequest(input *sqs.ChangeMessageVisibilityInput) sqs.ChangeMessageVisibilityRequest { + httpReq, _ := http.NewRequest("", "", nil) + return sqs.ChangeMessageVisibilityRequest{ + Request: &awssdk.Request{ + Data: &sqs.ChangeMessageVisibilityOutput{}, + HTTPRequest: httpReq, + }, + } +} + +func TestMockS3Input(t *testing.T) { + defer resources.NewGoroutinesChecker().Check(t) + cfg := common.MustNewConfigFrom(map[string]interface{}{ + "queue_url": "https://sqs.ap-southeast-1.amazonaws.com/123456/test", + }) + + runTest(t, cfg, true, func(t *testing.T, collector *s3Collector, receiver chan beat.Event) { + defer collector.cancellation.Done() + defer collector.publisher.Close() + + output, err := collector.receiveMessage(collector.sqs, collector.visibilityTimeout) + assert.NoError(t, err) + + var grp unison.MultiErrGroup + errC := make(chan error) + defer close(errC) + grp.Go(func() (err error) { + return collector.processMessage(collector.s3, output.Messages[0], errC) + }) + + event := <-receiver + bucketName, err := event.GetValue("aws.s3.bucket.name") + assert.NoError(t, err) + assert.Equal(t, "test-s3-ks-2", bucketName) + }) +} diff --git a/x-pack/libbeat/docs/aws-credentials-config.asciidoc b/x-pack/libbeat/docs/aws-credentials-config.asciidoc index a1cefd40bf7..f14e872d910 100644 --- a/x-pack/libbeat/docs/aws-credentials-config.asciidoc +++ b/x-pack/libbeat/docs/aws-credentials-config.asciidoc @@ -17,7 +17,11 @@ Some services, such as IAM, do not support regions. The endpoints for these services do not include a region. In `aws` module, `endpoint` config is to set the `endpoint-code` part, such as `amazonaws.com`, `amazonaws.com.cn`, `c2s.ic.gov`, `sc2s.sgov.gov`. +<<<<<<< HEAD * *proxy_url*: URL of the proxy to use to connect to AWS web services. The syntax is `http(s)://:` +======= +* *proxy_url*: URL of the proxy to use to connect to AWS web services. The syntax is http(s)://: +>>>>>>> 94af9dfced (Add Proxy settings to AWS Common (#26832)) [float] ==== Supported Formats