Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.15](backport #26832) Add Proxy settings to AWS Common #27923

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,34 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add cgroups V2 support {pull}27242[27242]
- update ECS field definitions to ECS 1.11.0. {pull}27107[27107]
- The disk queue is now GA. {pull}27515[27515]
- Add istiod metricset. {pull}21519[21519]
- Release `add_cloudfoundry_metadata` as GA. {pull}21525[21525]
- Add support for OpenStack SSL metadata APIs in `add_cloud_metadata`. {pull}21590[21590]
- Add cloud.account.id for GCP into add_cloud_metadata processor. {pull}21776[21776]
- Add proxy metricset for istio module. {pull}21751[21751]
- Add kubernetes.node.hostname metadata of Kubernetes node. {pull}22189[22189]
- Enable always add_resource_metadata for Pods and Services of kubernetes autodiscovery. {pull}22189[22189]
- Add add_resource_metadata option setting (always enabled) for add_kubernetes_metadata setting. {pull}22189[22189]
- Added Kafka version 2.2 to the list of supported versions. {pull}22328[22328]
- Add support for ephemeral containers in kubernetes autodiscover and `add_kubernetes_metadata`. {pull}22389[22389] {pull}22439[22439]
- Added support for wildcard fields and keyword fallback in beats setup commands. {pull}22521[22521]
- Fix polling node when it is not ready and monitor by hostname {pull}22666[22666]
- Add `expand_keys` option to `decode_json_fields` processor and `json` input, to recusively de-dot and expand json keys into hierarchical object structures {pull}22849[22849]
- Update k8s client and release k8s leader lock gracefully {pull}22919[22919]
- Improve equals check. {pull}22778[22778]
- Added "detect_mime_type" processor for detecting mime types {pull}22940[22940]
- Improve event normalization performance {pull}22974[22974]
- Add tini as init system in docker images {pull}22137[22137]
- Added "add_network_direction" processor for determining perimeter-based network direction. {pull}23076[23076]
- Added new `rate_limit` processor for enforcing rate limits on event throughput. {pull}22883[22883]
- Allow node/namespace metadata to be disabled on kubernetes metagen and ensure add_kubernetes_metadata honors host {pull}23012[23012]
- Add support for defining explicitly named dynamic templates without path/type match criteria {pull}25422[25422]
- Improve ES output error insights. {pull}25825[25825]
- Add orchestrator.cluster.name/url fields as k8s metadata {pull}26056[26056]
- Libbeat: report beat version to monitoring. {pull}26214[26214]
- Ensure common proxy settings support in HTTP clients: proxy_disabled, proxy_url, proxy_headers and typical environment variables HTTP_PROXY, HTTPS_PROXY, NOPROXY. {pull}25219[25219]
- `add_process_metadata` processor enrich process information with owner name and id. {issue}21068[21068] {pull}21111[21111]
- Add proxy support for AWS functions. {pull}26832[26832]

*Auditbeat*

Expand Down
13 changes: 13 additions & 0 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ func newInput(config config, store beater.StateStore) (*s3Input, error) {
func (in *s3Input) Name() string { return inputName }

func (in *s3Input) Test(ctx v2.TestContext) error {
<<<<<<< HEAD
=======
_, err := awscommon.InitializeAWSConfig(in.config.AWSConfig)
if err != nil {
return fmt.Errorf("InitializeAWSConfig failed: %w", err)
}
>>>>>>> 94af9dfced (Add Proxy settings to AWS Common (#26832))
return nil
}

Expand Down Expand Up @@ -138,6 +145,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}
}

<<<<<<< HEAD
if in.config.BucketARN != "" {
// Create S3 receiver and S3 notification processor.
poller, err := in.createS3Lister(inputContext, ctx, client, persistentStore, states)
Expand Down Expand Up @@ -166,6 +174,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
apiTimeout: in.config.APITimeout,
visibilityTimeout: in.config.VisibilityTimeout,
longPollWaitTime: in.config.SQSWaitTime,
=======
awsConfig, err := awscommon.InitializeAWSConfig(in.config.AWSConfig)
if err != nil {
return nil, fmt.Errorf("InitializeAWSConfig failed: %w", err)
>>>>>>> 94af9dfced (Add Proxy settings to AWS Common (#26832))
}

s3API := &awsS3API{
Expand Down
275 changes: 275 additions & 0 deletions x-pack/filebeat/input/awss3/s3_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build integration
// +build aws

package awss3

import (
"context"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/sqsiface"
"github.com/stretchr/testify/assert"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
"github.com/elastic/beats/v7/libbeat/tests/resources"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/go-concert/unison"
)

const (
fileName1 = "sample1.txt"
fileName2 = "sample2.txt"
visibilityTimeout = 300 * time.Second
)

// GetConfigForTest function gets aws credentials for integration tests.
func getConfigForTest(t *testing.T) config {
t.Helper()

awsConfig := awscommon.ConfigAWS{}
queueURL := os.Getenv("QUEUE_URL")
profileName := os.Getenv("AWS_PROFILE_NAME")
accessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")
secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
sessionToken := os.Getenv("AWS_SESSION_TOKEN")

config := config{
VisibilityTimeout: visibilityTimeout,
}
switch {
case queueURL == "":
t.Fatal("$QUEUE_URL is not set in environment")
case profileName == "" && accessKeyID == "":
t.Fatal("$AWS_ACCESS_KEY_ID or $AWS_PROFILE_NAME not set or set to empty")
case profileName != "":
awsConfig.ProfileName = profileName
config.QueueURL = queueURL
config.AWSConfig = awsConfig
return config
case secretAccessKey == "":
t.Fatal("$AWS_SECRET_ACCESS_KEY not set or set to empty")
}

awsConfig.AccessKeyID = accessKeyID
awsConfig.SecretAccessKey = secretAccessKey
if sessionToken != "" {
awsConfig.SessionToken = sessionToken
}
config.AWSConfig = awsConfig
return config
}

func defaultTestConfig() *common.Config {
return common.MustNewConfigFrom(common.MapStr{
"queue_url": os.Getenv("QUEUE_URL"),
"file_selectors": []common.MapStr{
{
"regex": strings.Replace(fileName1, ".", "\\.", -1),
"max_bytes": 4096,
},
{
"regex": strings.Replace(fileName2, ".", "\\.", -1),
"max_bytes": 4096,
"parsers": []common.MapStr{
{
"multiline": common.MapStr{
"pattern": "^<Event",
"negate": true,
"match": "after",
},
},
},
},
},
})
}

func newV2Context() (v2.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
return v2.Context{
Logger: logp.NewLogger("s3_test"),
ID: "test_id",
Cancelation: ctx,
}, cancel
}

func setupInput(t *testing.T, cfg *common.Config) (*s3Collector, chan beat.Event) {
inp, err := Plugin().Manager.Create(cfg)
if err != nil {
t.Fatal(err)
}

ctx, cancel := newV2Context()
t.Cleanup(cancel)

client := pubtest.NewChanClient(0)
pipeline := pubtest.ConstClient(client)
collector, err := inp.(*s3Input).createCollector(ctx, pipeline)
if err != nil {
t.Fatal(err)
}
return collector, client.Channel
}

func setupCollector(t *testing.T, cfg *common.Config, mock bool) (*s3Collector, chan beat.Event) {
collector, receiver := setupInput(t, cfg)
if mock {
svcS3 := &MockS3Client{}
svcSQS := &MockSQSClient{}
collector.sqs = svcSQS
collector.s3 = svcS3
return collector, receiver
}

config := getConfigForTest(t)
awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig)
if err != nil {
t.Fatal("failed InitializeAWSConfig with AWS Config: ", err)
}

s3BucketRegion := os.Getenv("S3_BUCKET_REGION")
if s3BucketRegion == "" {
t.Log("S3_BUCKET_REGION is not set, default to us-west-1")
s3BucketRegion = "us-west-1"
}
awsConfig.Region = s3BucketRegion
awsConfig = awsConfig.Copy()
collector.sqs = sqs.New(awsConfig)
collector.s3 = s3.New(awsConfig)
return collector, receiver
}

func runTest(t *testing.T, cfg *common.Config, mock bool, run func(t *testing.T, collector *s3Collector, receiver chan beat.Event)) {
collector, receiver := setupCollector(t, cfg, mock)
run(t, collector, receiver)
}

func TestS3Input(t *testing.T) {
runTest(t, defaultTestConfig(), false, func(t *testing.T, collector *s3Collector, receiver chan beat.Event) {
// upload a sample log file for testing
s3BucketNameEnv := os.Getenv("S3_BUCKET_NAME")
if s3BucketNameEnv == "" {
t.Fatal("failed to get S3_BUCKET_NAME")
}

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
collector.run()
}()

for i := 0; i < 4; i++ {
event := <-receiver
bucketName, err := event.GetValue("aws.s3.bucket.name")
assert.NoError(t, err)
assert.Equal(t, s3BucketNameEnv, bucketName)

objectKey, err := event.GetValue("aws.s3.object.key")
assert.NoError(t, err)

switch objectKey {
case fileName1:
message, err := event.GetValue("message")
assert.NoError(t, err)
assert.Contains(t, message, "logline")
case fileName2:
message, err := event.GetValue("message")
assert.NoError(t, err)
assert.Contains(t, message, "<Event>")
assert.Contains(t, message, "</Event>")
default:
t.Fatalf("object key %s is unknown", objectKey)
}
}
})
}

// MockSQSClient struct is used for unit tests.
type MockSQSClient struct {
sqsiface.ClientAPI
}

var (
sqsMessageTest = "{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\"," +
"\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:Put\"," +
"\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\"," +
"\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54\"}}}]}"
)

func (m *MockSQSClient) ReceiveMessageRequest(input *sqs.ReceiveMessageInput) sqs.ReceiveMessageRequest {
httpReq, _ := http.NewRequest("", "", nil)
return sqs.ReceiveMessageRequest{
Request: &awssdk.Request{
Data: &sqs.ReceiveMessageOutput{
Messages: []sqs.Message{
{Body: awssdk.String(sqsMessageTest)},
},
},
HTTPRequest: httpReq,
},
}
}

func (m *MockSQSClient) DeleteMessageRequest(input *sqs.DeleteMessageInput) sqs.DeleteMessageRequest {
httpReq, _ := http.NewRequest("", "", nil)
return sqs.DeleteMessageRequest{
Request: &awssdk.Request{
Data: &sqs.DeleteMessageOutput{},
HTTPRequest: httpReq,
},
}
}

func (m *MockSQSClient) ChangeMessageVisibilityRequest(input *sqs.ChangeMessageVisibilityInput) sqs.ChangeMessageVisibilityRequest {
httpReq, _ := http.NewRequest("", "", nil)
return sqs.ChangeMessageVisibilityRequest{
Request: &awssdk.Request{
Data: &sqs.ChangeMessageVisibilityOutput{},
HTTPRequest: httpReq,
},
}
}

func TestMockS3Input(t *testing.T) {
defer resources.NewGoroutinesChecker().Check(t)
cfg := common.MustNewConfigFrom(map[string]interface{}{
"queue_url": "https://sqs.ap-southeast-1.amazonaws.com/123456/test",
})

runTest(t, cfg, true, func(t *testing.T, collector *s3Collector, receiver chan beat.Event) {
defer collector.cancellation.Done()
defer collector.publisher.Close()

output, err := collector.receiveMessage(collector.sqs, collector.visibilityTimeout)
assert.NoError(t, err)

var grp unison.MultiErrGroup
errC := make(chan error)
defer close(errC)
grp.Go(func() (err error) {
return collector.processMessage(collector.s3, output.Messages[0], errC)
})

event := <-receiver
bucketName, err := event.GetValue("aws.s3.bucket.name")
assert.NoError(t, err)
assert.Equal(t, "test-s3-ks-2", bucketName)
})
}
4 changes: 4 additions & 0 deletions x-pack/libbeat/docs/aws-credentials-config.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ Some services, such as IAM, do not support regions. The endpoints for these
services do not include a region. In `aws` module, `endpoint` config is to set
the `endpoint-code` part, such as `amazonaws.com`, `amazonaws.com.cn`, `c2s.ic.gov`,
`sc2s.sgov.gov`.
<<<<<<< HEAD
* *proxy_url*: URL of the proxy to use to connect to AWS web services. The syntax is `http(s)://<IP/Hostname>:<port>`
=======
* *proxy_url*: URL of the proxy to use to connect to AWS web services. The syntax is http(s)://<IP/Hostname>:<port>
>>>>>>> 94af9dfced (Add Proxy settings to AWS Common (#26832))

[float]
==== Supported Formats
Expand Down