From a30a6e5059632f5f659543479d06db6de0750455 Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Tue, 25 Jun 2024 11:14:39 +0200 Subject: [PATCH 01/11] Adding code to OOTB support AWS Eventbridge generated events for S3 changes, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html --- metricbeat/include/list_init.go | 3 +- x-pack/filebeat/input/awss3/sqs_s3_event.go | 71 ++++++++++++++++++- .../filebeat/input/awss3/sqs_s3_event_test.go | 10 +++ x-pack/filebeat/input/awss3/sqs_test.go | 24 +++++++ 4 files changed, 105 insertions(+), 3 deletions(-) diff --git a/metricbeat/include/list_init.go b/metricbeat/include/list_init.go index 037e00b2891..fbfa58a65c5 100644 --- a/metricbeat/include/list_init.go +++ b/metricbeat/include/list_init.go @@ -21,10 +21,9 @@ package include import ( // Import packages to perform 'func InitializeModule()' when in-use. - m0 "github.com/elastic/beats/v7/metricbeat/autodiscover/builder/hints" m1 "github.com/elastic/beats/v7/metricbeat/autodiscover/appender/kubernetes/token" + m0 "github.com/elastic/beats/v7/metricbeat/autodiscover/builder/hints" m2 "github.com/elastic/beats/v7/metricbeat/processor/add_kubernetes_metadata" - // Import packages that perform 'func init()'. ) diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index db893e443ac..b3c334810e2 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -44,7 +44,7 @@ func (e *nonRetryableError) Error() string { } func (e *nonRetryableError) Is(err error) bool { - _, ok := err.(*nonRetryableError) //nolint:errorlint // This is not used directly to detected wrapped errors (errors.Is handles unwrapping). + _, ok := err.(*nonRetryableError) // This is not used directly to detected wrapped errors (errors.Is handles unwrapping). return ok } @@ -83,6 +83,43 @@ type s3EventV2 struct { } `json:"s3"` } +// eventBridgeEvents is 'Object Created' payload generated by AWS EventBridge +// At the moment it doesn't seem to have a version +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html +type eventBridgeEvents struct { + Messages []eventBridgeEvent `json:"messages"` +} + +// Object created event. +type eventBridgeEvent struct { + Version string `json:"version"` + Id string `json:"id"` + DetailType string `json:"detail-type"` + Source string `json:"source"` + Account string `json:"account"` + Time string `json:"time"` + Region string `json:"region"` + Resources []string `json:"resources"` + Detail struct { + Version string `json:"version"` + Bucket struct { + Name string `json:"name"` + } + Object struct { + Key string `json:"key"` + Size int `json:"size"` + Etag string `json:"etag"` + VersionId string `json:"version-id"` + Sequencer string `json:"sequencer"` + } + RequestId string `json:"request-id"` + Requester string `json:"requester"` + SourceIpAddress string `json:"source-ip-address"` + Reason string `json:"reason"` + } +} + type sqsS3EventProcessor struct { s3ObjectHandler s3ObjectHandlerFactory sqsVisibilityTimeout time.Duration @@ -252,6 +289,17 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro } } + // Check if the notification is from S3 -> EventBridge -> SQS + var eventBridgeEvents eventBridgeEvents + if events.Records == nil { + dec := json.NewDecoder(strings.NewReader(events.Message)) + if err := dec.Decode(&eventBridgeEvents); err != nil { + p.log.Debugw("Could not parse message as EventBridge payload", "sqs_message_body", body) + } else { + convertEventBridge(&eventBridgeEvents, &events) + } + } + if events.Records == nil { p.log.Debugw("Invalid SQS message body: missing Records field", "sqs_message_body", body) return nil, errors.New("the message is an invalid S3 notification: missing Records field") @@ -260,6 +308,27 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro return p.getS3Info(events) } +func convertEventBridge(eventBridgeEvents *eventBridgeEvents, s3Events *s3EventsV2) { + for i := range eventBridgeEvents.Messages { + s3Events.Records = append(s3Events.Records, convertEventBridgeEvent(&eventBridgeEvents.Messages[i])) + } +} + +func convertEventBridgeEvent(message *eventBridgeEvent) s3EventV2 { + var event = s3EventV2{} + if message.DetailType == "Object Created" { + event.SetEventName("ObjectCreated:Put") + } + event.SetS3BucketARN(message.Resources[0]) + event.SetAWSRegion(message.Region) + if message.Source == "aws.s3" { + event.SetEventSource("aws:s3") + } + event.SetS3BucketName(message.Detail.Bucket.Name) + event.SetS3ObjectKey(message.Detail.Object.Key) + return event +} + func (p *sqsS3EventProcessor) getS3Info(events s3EventsV2) ([]s3EventV2, error) { out := make([]s3EventV2, 0, len(events.Records)) for _, record := range events.Records { diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index 65552525136..e39b23bb48a 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -270,6 +270,16 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) { assert.Equal(t, "vpc-flow-logs-ks", events[0].S3.Bucket.Name) }) + t.Run("EventBridge-sqs notification", func(t *testing.T) { + msg := newEventBridgeSQSMessage() + events, err := p.getS3Notifications(*msg.Body) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, "test-object-key", events[0].S3.Object.Key) + assert.Equal(t, "arn:aws:s3:::vpc-flow-logs-ks", events[0].S3.Bucket.ARN) + assert.Equal(t, "vpc-flow-logs-ks", events[0].S3.Bucket.Name) + }) + t.Run("missing Records fail", func(t *testing.T) { msg := `{"message":"missing records"}` _, err := p.getS3Notifications(msg) diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index cf82f03c6de..c8dbc02c091 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -9,9 +9,12 @@ import ( "crypto/sha256" "encoding/json" "errors" + "github.com/aws/aws-sdk-go/service/sqs" "testing" "time" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/gofrs/uuid" @@ -221,6 +224,27 @@ func newSNSSQSMessage() types.Message { } } +func newEventBridgeSQSMessage() sqs.Message { + body, err := json.Marshal(s3EventsV2{ + Message: "{\"Messages\":[{ \"version\": \"0\", \"id\": \"f17994c0-7cb9-ee01-79fd-ae46df89e3a4\", \"detail-type\": \"Object Created\", \"source\": \"aws.s3\", \"account\": \"952856826737\", \"time\": \"2024-06-24T08:31:26Z\", \"region\": \"eu-west-1\", \"resources\": [\"arn:aws:s3:::vpc-flow-logs-ks\" ], \"detail\": {\"version\": \"0\",\"bucket\": { \"name\": \"vpc-flow-logs-ks\"},\"object\": { \"key\": \"test-object-key\", \"size\": 744, \"etag\": \"2ba6b152f13c75a9155b87b5b072963c\", \"version-id\": \"uoW5awQhqux4rhi4Nuh6il967FzQlsHJ\", \"sequencer\": \"0066792EC46EC0B037\"},\"request-id\": \"Q93BVWXD5G6FKWC2\",\"requester\": \"516635408544\",\"source-ip-address\": \"10.0.27.95\",\"reason\": \"PutObject\" }}]}", + }) + if err != nil { + panic(err) + } + + hash := sha256.Sum256(body) + id, _ := uuid.FromBytes(hash[:16]) + messageID := id.String() + receipt := "receipt-" + messageID + bodyStr := string(body) + + return sqs.Message{ + Body: &bodyStr, + MessageId: &messageID, + ReceiptHandle: &receipt, + } +} + func newS3Event(key string) s3EventV2 { record := s3EventV2{ AWSRegion: "us-east-1", From 5c22b133b4da31e0bb19470379cbaf41ef3bf5de Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Tue, 25 Jun 2024 11:14:39 +0200 Subject: [PATCH 02/11] running make update and make fmt --- NOTICE.txt | 424 ++++++++++----------- filebeat/tests/system/test_autodiscover.py | 20 +- go.mod | 2 +- x-pack/filebeat/input/awss3/sqs_test.go | 1 - 4 files changed, 215 insertions(+), 232 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 3461b42f866..8cf1cf6042c 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -5042,6 +5042,218 @@ Contents of probable licence file $GOMODCACHE/github.com/aws/aws-lambda-go@v1.44 +-------------------------------------------------------------------------------- +Dependency : github.com/aws/aws-sdk-go +Version: v1.38.60 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go@v1.38.60/LICENSE.txt: + + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/aws/aws-sdk-go-v2 Version: v1.26.1 @@ -33474,218 +33686,6 @@ IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. --------------------------------------------------------------------------------- -Dependency : github.com/aws/aws-sdk-go -Version: v1.38.60 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go@v1.38.60/LICENSE.txt: - - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- Dependency : github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream Version: v1.6.2 diff --git a/filebeat/tests/system/test_autodiscover.py b/filebeat/tests/system/test_autodiscover.py index 8f89f21b374..ba05dc601cb 100644 --- a/filebeat/tests/system/test_autodiscover.py +++ b/filebeat/tests/system/test_autodiscover.py @@ -24,24 +24,8 @@ def test_docker(self): autodiscover={ 'docker': { 'cleanup_timeout': '0s', - 'templates': f''' - - condition: - equals.docker.container.name: {container.name} - config: - - type: log - paths: - - %s/${{data.docker.container.name}}.log - ''' % self.working_dir, - }, - }, - ) - - proc = self.start_beat() - self._test(container) - - self.wait_until(lambda: self.log_contains('Stopping runner: input')) - proc.check_kill_and_wait() - + 'templates': f''' - condition: + equals.docker.container.name: {container.name} config: - type: log paths: - %s/${{data.docker.container.name}}.log ''' % self.working_dir, }, }, ) proc = self.start_beat() self._test(container) self.wait_until(lambda: self.log_contains('Stopping runner: input')) proc.check_kill_and_wait() @unittest.skipIf(not INTEGRATION_TESTS or os.getenv("TESTING_ENVIRONMENT") == "2x", "integration test not available on 2.x") diff --git a/go.mod b/go.mod index 84203986bf0..3852dd1c4ca 100644 --- a/go.mod +++ b/go.mod @@ -191,6 +191,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 github.com/Azure/go-autorest/autorest/adal v0.9.21 github.com/apache/arrow/go/v14 v14.0.2 + github.com/aws/aws-sdk-go v1.38.60 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15 github.com/aws/aws-sdk-go-v2/service/cloudformation v1.50.0 @@ -256,7 +257,6 @@ require ( github.com/apache/arrow/go/v12 v12.0.0 // indirect github.com/apache/thrift v0.19.0 // indirect github.com/armon/go-radix v1.0.0 // indirect - github.com/aws/aws-sdk-go v1.38.60 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index c8dbc02c091..9612b0a9195 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -9,7 +9,6 @@ import ( "crypto/sha256" "encoding/json" "errors" - "github.com/aws/aws-sdk-go/service/sqs" "testing" "time" From 74fea52e945168a7e676ac0c06282996cee88243 Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Wed, 26 Jun 2024 09:47:44 +0200 Subject: [PATCH 03/11] updating test code to return error and call testing.T NoError + fixing warnings: unused vars, name collision --- .../input/awss3/input_benchmark_test.go | 47 ++++++++++--------- .../filebeat/input/awss3/sqs_s3_event_test.go | 36 ++++++++------ x-pack/filebeat/input/awss3/sqs_test.go | 29 +++++++----- 3 files changed, 65 insertions(+), 47 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 09b7c8bd9d2..ecc8fd55e3b 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -46,27 +46,29 @@ type constantSQS struct { var _ sqsAPI = (*constantSQS)(nil) -func newConstantSQS() *constantSQS { - return &constantSQS{ - msgs: []sqsTypes.Message{ - newSQSMessage(newS3Event(filepath.Base(cloudtrailTestFile))), - }, +func newConstantSQS() (*constantSQS, error) { + event, err := newSQSMessage(newS3Event(filepath.Base(cloudtrailTestFile))) + if err != nil { + return nil, err } + return &constantSQS{ + msgs: []sqsTypes.Message{event}, + }, nil } -func (c *constantSQS) ReceiveMessage(ctx context.Context, maxMessages int) ([]sqsTypes.Message, error) { +func (c *constantSQS) ReceiveMessage(context.Context, int) ([]sqsTypes.Message, error) { return c.msgs, nil } -func (*constantSQS) DeleteMessage(ctx context.Context, msg *sqsTypes.Message) error { +func (*constantSQS) DeleteMessage(context.Context, *sqsTypes.Message) error { return nil } -func (*constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqsTypes.Message, timeout time.Duration) error { +func (*constantSQS) ChangeMessageVisibility(context.Context, *sqsTypes.Message, time.Duration) error { return nil } -func (c *constantSQS) GetQueueAttributes(ctx context.Context, attr []sqsTypes.QueueAttributeName) (map[string]string, error) { +func (c *constantSQS) GetQueueAttributes(context.Context, []sqsTypes.QueueAttributeName) (map[string]string, error) { return map[string]string{}, nil } @@ -84,7 +86,7 @@ func (c *s3PagerConstant) HasMorePages() bool { return c.currentIndex < len(c.objects) } -func (c *s3PagerConstant) NextPage(ctx context.Context, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { +func (c *s3PagerConstant) NextPage(context.Context, ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { if !c.HasMorePages() { return nil, errors.New("no more pages") } @@ -143,19 +145,19 @@ func newConstantS3(t testing.TB) *constantS3 { } } -func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) { +func (c constantS3) GetObject(context.Context, string, string) (*s3.GetObjectOutput, error) { return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil } -func (c constantS3) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { +func (c constantS3) CopyObject(context.Context, string, string, string, string) (*s3.CopyObjectOutput, error) { return nil, nil } -func (c constantS3) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { +func (c constantS3) DeleteObject(context.Context, string, string) (*s3.DeleteObjectOutput, error) { return nil, nil } -func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager { +func (c constantS3) ListObjectsPaginator(string, string) s3Pager { return c.pagerConstant } @@ -164,7 +166,7 @@ var _ beat.Pipeline = (*fakePipeline)(nil) // fakePipeline returns new ackClients. type fakePipeline struct{} -func (c *fakePipeline) ConnectWith(clientConfig beat.ClientConfig) (beat.Client, error) { +func (c *fakePipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) { return &ackClient{}, nil } @@ -211,12 +213,13 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR var err error pipeline := &fakePipeline{} - conf := makeBenchmarkConfig(t) - conf.MaxNumberOfMessages = maxMessagesInflight - sqsReader := newSQSReaderInput(conf, aws.Config{}) + config := makeBenchmarkConfig(t) + config.MaxNumberOfMessages = maxMessagesInflight + sqsReader := newSQSReaderInput(config, aws.Config{}) sqsReader.log = log.Named("sqs") sqsReader.metrics = newInputMetrics("test_id", monitoring.NewRegistry(), maxMessagesInflight) - sqsReader.sqs = newConstantSQS() + sqsReader.sqs, err = newConstantSQS() + require.NoError(t, err) sqsReader.s3 = newConstantS3(t) sqsReader.msgHandler, err = sqsReader.createEventProcessor(pipeline) require.NoError(t, err, "createEventProcessor must succeed") @@ -252,7 +255,8 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR } func TestBenchmarkInputSQS(t *testing.T) { - logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) + err := logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) + require.NoError(t, err) results := []testing.BenchmarkResult{ benchmarkInputSQS(t, 1), @@ -388,7 +392,8 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult } func TestBenchmarkInputS3(t *testing.T) { - logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) + err := logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) + require.NoError(t, err) results := []testing.BenchmarkResult{ benchmarkInputS3(t, 1), diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index e39b23bb48a..92401fe45ee 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -28,9 +28,10 @@ import ( ) func TestSQSS3EventProcessor(t *testing.T) { - logp.TestingSetup() - - msg := newSQSMessage(newS3Event("log.json")) + err := logp.TestingSetup() + require.NoError(t, err) + msg, err := newSQSMessage(newS3Event("log.json")) + require.NoError(t, err) t.Run("s3 events are processed and sqs msg is deleted", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) @@ -64,7 +65,9 @@ func TestSQSS3EventProcessor(t *testing.T) { mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) mockBeatPipeline := NewMockBeatPipeline(ctrl) - invalidBodyMsg := newSQSMessage(newS3Event("log.json")) + invalidBodyMsg, err := newSQSMessage(newS3Event("log.json")) + require.NoError(t, err) + body := *invalidBodyMsg.Body body = body[10:] invalidBodyMsg.Body = &body @@ -74,7 +77,7 @@ func TestSQSS3EventProcessor(t *testing.T) { ) p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) - err := p.ProcessSQS(ctx, &invalidBodyMsg) + err = p.ProcessSQS(ctx, &invalidBodyMsg) require.Error(t, err) t.Log(err) }) @@ -89,7 +92,8 @@ func TestSQSS3EventProcessor(t *testing.T) { mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) mockBeatPipeline := NewMockBeatPipeline(ctrl) - emptyRecordsMsg := newSQSMessage([]s3EventV2{}...) + emptyRecordsMsg, err := newSQSMessage([]s3EventV2{}...) + require.NoError(t, err) gomock.InOrder( mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&emptyRecordsMsg)).Return(nil), @@ -189,7 +193,8 @@ func TestSQSS3EventProcessor(t *testing.T) { } func TestSqsProcessor_keepalive(t *testing.T) { - msg := newSQSMessage(newS3Event("log.json")) + msg, err := newSQSMessage(newS3Event("log.json")) + require.NoError(t, err) // Ensure both ReceiptHandleIsInvalid and InvalidParameterValue error codes trigger stops. // See https://github.com/elastic/beats/issues/30675. @@ -237,13 +242,14 @@ func TestSqsProcessor_keepalive(t *testing.T) { } func TestSqsProcessor_getS3Notifications(t *testing.T) { - logp.TestingSetup() + err := logp.TestingSetup() + require.NoError(t, err) p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, nil, time.Minute, 5, nil, nil) t.Run("s3 key is url unescaped", func(t *testing.T) { - msg := newSQSMessage(newS3Event("Happy+Face.jpg")) - + msg, err := newSQSMessage(newS3Event("Happy+Face.jpg")) + require.NoError(t, err) events, err := p.getS3Notifications(*msg.Body) require.NoError(t, err) assert.Len(t, events, 1) @@ -253,15 +259,16 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) { t.Run("non-ObjectCreated event types are ignored", func(t *testing.T) { event := newS3Event("HappyFace.jpg") event.EventName = "ObjectRemoved:Delete" - msg := newSQSMessage(event) - + msg, err := newSQSMessage(event) + require.NoError(t, err) events, err := p.getS3Notifications(*msg.Body) require.NoError(t, err) assert.Len(t, events, 0) }) t.Run("sns-sqs notification", func(t *testing.T) { - msg := newSNSSQSMessage() + msg, err := newSNSSQSMessage() + require.NoError(t, err) events, err := p.getS3Notifications(*msg.Body) require.NoError(t, err) assert.Len(t, events, 1) @@ -271,7 +278,8 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) { }) t.Run("EventBridge-sqs notification", func(t *testing.T) { - msg := newEventBridgeSQSMessage() + msg, err := newEventBridgeSQSMessage() + require.NoError(t, err) events, err := p.getS3Notifications(*msg.Body) require.NoError(t, err) assert.Len(t, events, 1) diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index 9612b0a9195..7db6b860bc1 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go-v2/aws" @@ -31,7 +33,8 @@ var ( ) func TestSQSReceiver(t *testing.T) { - logp.TestingSetup() + err := logp.TestingSetup() + require.NoError(t, err) const maxMessages = 5 @@ -43,7 +46,8 @@ func TestSQSReceiver(t *testing.T) { defer ctrl.Finish() mockSQS := NewMockSQSAPI(ctrl) mockMsgHandler := NewMockSQSProcessor(ctrl) - msg := newSQSMessage(newS3Event("log.json")) + msg, err := newSQSMessage(newS3Event("log.json")) + require.NoError(t, err) // Initial ReceiveMessage for maxMessages. mockSQS.EXPECT(). @@ -129,7 +133,8 @@ func TestSQSReceiver(t *testing.T) { } func TestGetApproximateMessageCount(t *testing.T) { - logp.TestingSetup() + err := logp.TestingSetup() + require.NoError(t, err) const count = 500 attrName := []types.QueueAttributeName{sqsApproximateNumberOfMessages} @@ -182,10 +187,10 @@ func TestGetApproximateMessageCount(t *testing.T) { }) } -func newSQSMessage(events ...s3EventV2) types.Message { +func newSQSMessage(events ...s3EventV2) (types.Message, error) { body, err := json.Marshal(s3EventsV2{Records: events}) if err != nil { - panic(err) + return types.Message{}, err } hash := sha256.Sum256(body) @@ -198,16 +203,16 @@ func newSQSMessage(events ...s3EventV2) types.Message { Body: &bodyStr, MessageId: &messageID, ReceiptHandle: &receipt, - } + }, nil } -func newSNSSQSMessage() types.Message { +func newSNSSQSMessage() (types.Message, error) { body, err := json.Marshal(s3EventsV2{ TopicArn: "arn:aws:sns:us-east-1:1234:sns-topic", Message: "{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"configurationId\":\"sns-notification-vpc-flow-logs\",\"bucket\":{\"name\":\"vpc-flow-logs-ks\",\"arn\":\"arn:aws:s3:::vpc-flow-logs-ks\"},\"object\":{\"key\":\"test-object-key\"}}}]}", }) if err != nil { - panic(err) + return types.Message{}, err } hash := sha256.Sum256(body) @@ -220,15 +225,15 @@ func newSNSSQSMessage() types.Message { Body: &bodyStr, MessageId: &messageID, ReceiptHandle: &receipt, - } + }, nil } -func newEventBridgeSQSMessage() sqs.Message { +func newEventBridgeSQSMessage() (sqs.Message, error) { body, err := json.Marshal(s3EventsV2{ Message: "{\"Messages\":[{ \"version\": \"0\", \"id\": \"f17994c0-7cb9-ee01-79fd-ae46df89e3a4\", \"detail-type\": \"Object Created\", \"source\": \"aws.s3\", \"account\": \"952856826737\", \"time\": \"2024-06-24T08:31:26Z\", \"region\": \"eu-west-1\", \"resources\": [\"arn:aws:s3:::vpc-flow-logs-ks\" ], \"detail\": {\"version\": \"0\",\"bucket\": { \"name\": \"vpc-flow-logs-ks\"},\"object\": { \"key\": \"test-object-key\", \"size\": 744, \"etag\": \"2ba6b152f13c75a9155b87b5b072963c\", \"version-id\": \"uoW5awQhqux4rhi4Nuh6il967FzQlsHJ\", \"sequencer\": \"0066792EC46EC0B037\"},\"request-id\": \"Q93BVWXD5G6FKWC2\",\"requester\": \"516635408544\",\"source-ip-address\": \"10.0.27.95\",\"reason\": \"PutObject\" }}]}", }) if err != nil { - panic(err) + return sqs.Message{}, err } hash := sha256.Sum256(body) @@ -241,7 +246,7 @@ func newEventBridgeSQSMessage() sqs.Message { Body: &bodyStr, MessageId: &messageID, ReceiptHandle: &receipt, - } + }, err } func newS3Event(key string) s3EventV2 { From fbda183036e6812009c0f5fc7ff29e56589b8628 Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Wed, 26 Jun 2024 12:33:14 +0200 Subject: [PATCH 04/11] Extending input documentation to cover changes --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index c55c80952a4..9c206e0faeb 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -580,6 +580,13 @@ Please see https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-not for more details. SQS queue will be configured as a https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html[subscriber to the SNS topic]. +[float] +=== S3 -> EventBridge -> SQS setup +Amazon S3 can alternatively https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html[send events to EventBridge], +which can then be used to route these events to SQS. While the S3 input will +filter for 'Object Created' events it's more efficient to configure EventBridge +to only forward the 'Object Created' events. + [float] === Parallel Processing From d38d96eb97f33f39d45bc4bcec1dd9fceaee19c5 Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Wed, 26 Jun 2024 12:40:30 +0200 Subject: [PATCH 05/11] updating CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 69d6a176304..fbfb9dcc9ee 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -286,6 +286,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929] - Add ability to remove request trace logs from CEL input. {pull}39969[39969] - Add ability to remove request trace logs from HTTPJSON input. {pull}40003[40003] +- Added out of the box support for Amazon EventBridge notifications over SQS to S3 input {pull}40006[40006] *Auditbeat* From 7487de32037343c4c9cc7e0fbaf11e352d1c46fe Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Thu, 27 Jun 2024 08:29:36 +0200 Subject: [PATCH 06/11] revert change to test_autodiscover.py --- filebeat/tests/system/test_autodiscover.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/filebeat/tests/system/test_autodiscover.py b/filebeat/tests/system/test_autodiscover.py index ba05dc601cb..8f89f21b374 100644 --- a/filebeat/tests/system/test_autodiscover.py +++ b/filebeat/tests/system/test_autodiscover.py @@ -24,8 +24,24 @@ def test_docker(self): autodiscover={ 'docker': { 'cleanup_timeout': '0s', - 'templates': f''' - condition: - equals.docker.container.name: {container.name} config: - type: log paths: - %s/${{data.docker.container.name}}.log ''' % self.working_dir, }, }, ) proc = self.start_beat() self._test(container) self.wait_until(lambda: self.log_contains('Stopping runner: input')) proc.check_kill_and_wait() + 'templates': f''' + - condition: + equals.docker.container.name: {container.name} + config: + - type: log + paths: + - %s/${{data.docker.container.name}}.log + ''' % self.working_dir, + }, + }, + ) + + proc = self.start_beat() + self._test(container) + + self.wait_until(lambda: self.log_contains('Stopping runner: input')) + proc.check_kill_and_wait() + @unittest.skipIf(not INTEGRATION_TESTS or os.getenv("TESTING_ENVIRONMENT") == "2x", "integration test not available on 2.x") From e5a633625669835cdc2a12574b48ffe8b06db9d4 Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Thu, 27 Jun 2024 09:16:47 +0200 Subject: [PATCH 07/11] revert list_init.go --- metricbeat/include/list_init.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metricbeat/include/list_init.go b/metricbeat/include/list_init.go index fbfa58a65c5..037e00b2891 100644 --- a/metricbeat/include/list_init.go +++ b/metricbeat/include/list_init.go @@ -21,9 +21,10 @@ package include import ( // Import packages to perform 'func InitializeModule()' when in-use. - m1 "github.com/elastic/beats/v7/metricbeat/autodiscover/appender/kubernetes/token" m0 "github.com/elastic/beats/v7/metricbeat/autodiscover/builder/hints" + m1 "github.com/elastic/beats/v7/metricbeat/autodiscover/appender/kubernetes/token" m2 "github.com/elastic/beats/v7/metricbeat/processor/add_kubernetes_metadata" + // Import packages that perform 'func init()'. ) From 15f3c7457ce5348a4c2c9a949447acb7c432d200 Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Sat, 20 Jul 2024 00:31:29 +0200 Subject: [PATCH 08/11] Fixing up / extending tests - organized imports - end to end integration test - removed message array wrapping as it's not the ootb setup, expand_event_list_from_field: Messages unsure what setup was used to get the originally reported format regardless 'expand_event_list_from_field: Messages' would fix this --- .../awss3/_meta/terraform/.terraform.lock.hcl | 3 + .../input/awss3/_meta/terraform/main.tf | 71 ++++++++++ .../input/awss3/_meta/terraform/outputs.tf | 20 +-- .../input/awss3/input_integration_test.go | 121 +++++++++++++----- x-pack/filebeat/input/awss3/sqs_s3_event.go | 22 ++-- x-pack/filebeat/input/awss3/sqs_test.go | 16 +-- 6 files changed, 184 insertions(+), 69 deletions(-) diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl b/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl index 9814a954d21..0f717b2d3e4 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl +++ b/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl @@ -5,6 +5,7 @@ provider "registry.terraform.io/hashicorp/aws" { version = "4.46.0" constraints = "4.46.0" hashes = [ + "h1:EZB4OgvytV38JpWyye9zoMQ0bfT9yB9xSXM5NY3Lrws=", "h1:m7RCtncaQbSD9VhNTX2xbuZY3TlYnUrluvmYZeYHb1s=", "zh:1678e6a4bdb3d81a6713adc62ca0fdb8250c584e10c10d1daca72316e9db8df2", "zh:329903acf86ef6072502736dff4c43c2b50f762a958f76aa924e2d74c7fca1e3", @@ -28,6 +29,7 @@ provider "registry.terraform.io/hashicorp/local" { version = "2.2.3" hashes = [ "h1:FvRIEgCmAezgZUqb2F+PZ9WnSSnR5zbEM2ZI+GLmbMk=", + "h1:aWp5iSUxBGgPv1UnV5yag9Pb0N+U1I0sZb38AXBFO8A=", "zh:04f0978bb3e052707b8e82e46780c371ac1c66b689b4a23bbc2f58865ab7d5c0", "zh:6484f1b3e9e3771eb7cc8e8bab8b35f939a55d550b3f4fb2ab141a24269ee6aa", "zh:78a56d59a013cb0f7eb1c92815d6eb5cf07f8b5f0ae20b96d049e73db915b238", @@ -47,6 +49,7 @@ provider "registry.terraform.io/hashicorp/random" { version = "3.4.3" hashes = [ "h1:saZR+mhthL0OZl4SyHXZraxyaBNVMxiZzks78nWcZ2o=", + "h1:xZGZf18JjMS06pFa4NErzANI98qi59SEcBsOcS2P2yQ=", "zh:41c53ba47085d8261590990f8633c8906696fa0a3c4b384ff6a7ecbf84339752", "zh:59d98081c4475f2ad77d881c4412c5129c56214892f490adf11c7e7a5a47de9b", "zh:686ad1ee40b812b9e016317e7f34c0d63ef837e084dea4a1f578f64a6314ad53", diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf index 164b14f93ca..2b825274990 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf +++ b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf @@ -147,3 +147,74 @@ resource "aws_sns_topic_subscription" "filebeat-integtest-sns" { protocol = "sqs" endpoint = aws_sqs_queue.filebeat-integtest-sns.arn } + +resource "aws_s3_bucket" "filebeat-integtest-eventbridge" { + bucket = "filebeat-s3-integtest-eventbridge-${random_string.random.result}" + force_destroy = true +} + +resource "aws_sqs_queue" "filebeat-integtest-eventbridge" { + name = "filebeat-s3-integtest-eventbridge-${random_string.random.result}" +} + +data "aws_iam_policy_document" "sqs_queue_policy" { + statement { + effect = "Allow" + actions = ["sqs:SendMessage"] + + principals { + type = "Service" + identifiers = ["events.amazonaws.com"] + } + + resources = [aws_sqs_queue.filebeat-integtest-eventbridge.arn] + } +} + +resource "aws_sqs_queue_policy" "filebeat-integtest-eventbridge" { + queue_url = aws_sqs_queue.filebeat-integtest-eventbridge.id + policy = data.aws_iam_policy_document.sqs_queue_policy.json +} + +resource "aws_cloudwatch_event_rule" "sqs" { + name = "capture-s3-notification" + description = "Capture s3 changes" + + event_pattern = jsonencode({ + source = [ + "aws.s3" + ], + detail-type = [ + "Object Created" + ] + detail = { + bucket = { + name = [ aws_s3_bucket.filebeat-integtest-eventbridge.id ] + } + } + }) + + depends_on = [ + aws_s3_bucket.filebeat-integtest-eventbridge + ] +} + +resource "aws_cloudwatch_event_target" "sqs" { + rule = aws_cloudwatch_event_rule.sqs.name + target_id = "SendToSQS" + arn = aws_sqs_queue.filebeat-integtest-eventbridge.arn + + depends_on = [ + aws_cloudwatch_event_rule.sqs + ] +} + +resource "aws_s3_bucket_notification" "bucket_notification-eventbridge" { + bucket = aws_s3_bucket.filebeat-integtest-eventbridge.id + eventbridge = true + + depends_on = [ + aws_cloudwatch_event_target.sqs + ] +} + diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf b/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf index e95983a7237..eb7d55c6df6 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf +++ b/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf @@ -5,17 +5,19 @@ resource "local_file" "secrets" { "bucket_name" : aws_s3_bucket.filebeat-integtest.id "bucket_name_for_sns" : aws_s3_bucket.filebeat-integtest-sns.id "queue_url_for_sns" : aws_sqs_queue.filebeat-integtest-sns.url + "bucket_name_for_eventbridge" : aws_s3_bucket.filebeat-integtest-eventbridge.id + "queue_url_for_eventbridge" : aws_sqs_queue.filebeat-integtest-eventbridge.url }) filename = "${path.module}/outputs.yml" file_permission = "0644" } -resource "local_file" "secrets-localstack" { - content = yamlencode({ - "queue_url" : aws_sqs_queue.filebeat-integtest-localstack.url - "aws_region" : aws_s3_bucket.filebeat-integtest-localstack.region - "bucket_name" : aws_s3_bucket.filebeat-integtest-localstack.id - }) - filename = "${path.module}/outputs-localstack.yml" - file_permission = "0644" -} +# resource "local_file" "secrets-localstack" { +# content = yamlencode({ +# "queue_url" : aws_sqs_queue.filebeat-integtest-localstack.url +# "aws_region" : aws_s3_bucket.filebeat-integtest-localstack.region +# "bucket_name" : aws_s3_bucket.filebeat-integtest-localstack.id +# }) +# filename = "${path.module}/outputs-localstack.yml" +# file_permission = "0644" +# } diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index af488505d0e..88d81a9f0c8 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -50,6 +50,8 @@ type terraformOutputData struct { QueueURL string `yaml:"queue_url"` BucketNameForSNS string `yaml:"bucket_name_for_sns"` QueueURLForSNS string `yaml:"queue_url_for_sns"` + BucketNameForEB string `yaml:"bucket_name_for_eventbridge"` + QueueURLForEB string `yaml:"queue_url_for_eventbridge"` } func getTerraformOutputs(t *testing.T, isLocalStack bool) terraformOutputData { @@ -255,16 +257,16 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { t.Fatal(err) } - assert.EqualValues(t, s3Input.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. - assert.EqualValues(t, s3Input.metrics.sqsMessagesInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. - assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 8) - assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), uint64(0x13)) - assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end + assert.EqualValues(t, 8, s3Input.metrics.sqsMessagesReceivedTotal.Get()) // S3 could batch notifications. + assert.EqualValues(t, 0, s3Input.metrics.sqsMessagesInflight.Get()) + assert.EqualValues(t, 7, s3Input.metrics.sqsMessagesDeletedTotal.Get()) + assert.EqualValues(t, 1, s3Input.metrics.sqsMessagesReturnedTotal.Get()) // Invalid JSON is returned so that it can eventually be DLQed. + assert.EqualValues(t, 0, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get()) + assert.EqualValues(t, 0, s3Input.metrics.s3ObjectsInflight.Get()) + assert.EqualValues(t, 8, s3Input.metrics.s3ObjectsRequestedTotal.Get()) + assert.EqualValues(t, uint64(0x13), s3Input.metrics.s3EventsCreatedTotal.Get()) + assert.Greater(t, 0.0, s3Input.metrics.sqsLagTime.Mean()) + assert.EqualValues(t, 0.0, s3Input.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end } func TestInputRunSQS(t *testing.T) { @@ -306,16 +308,16 @@ func TestInputRunSQS(t *testing.T) { t.Fatal(err) } - assert.EqualValues(t, sqsInput.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. - assert.EqualValues(t, sqsInput.metrics.sqsMessagesInflight.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.sqsMessagesDeletedTotal.Get(), 7) - assert.EqualValues(t, sqsInput.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. - assert.EqualValues(t, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.s3ObjectsRequestedTotal.Get(), 7) - assert.EqualValues(t, sqsInput.metrics.s3EventsCreatedTotal.Get(), 12) + assert.EqualValues(t, 8, sqsInput.metrics.sqsMessagesReceivedTotal.Get()) // S3 could batch notifications. + assert.EqualValues(t, 0, sqsInput.metrics.sqsMessagesInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.sqsMessagesDeletedTotal.Get()) + assert.EqualValues(t, 1, sqsInput.metrics.sqsMessagesReturnedTotal.Get()) // Invalid JSON is returned so that it can eventually be DLQed. + assert.EqualValues(t, 0, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get()) + assert.EqualValues(t, 0, sqsInput.metrics.s3ObjectsInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.s3ObjectsRequestedTotal.Get()) + assert.EqualValues(t, 12, sqsInput.metrics.s3EventsCreatedTotal.Get()) assert.Greater(t, sqsInput.metrics.sqsLagTime.Mean(), 0.0) - assert.EqualValues(t, sqsInput.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end + assert.EqualValues(t, 0.0, sqsInput.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end } func TestInputRunS3(t *testing.T) { @@ -354,12 +356,12 @@ func TestInputRunS3(t *testing.T) { t.Fatal(err) } - assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3ObjectsListedTotal.Get(), 8) - assert.EqualValues(t, s3Input.metrics.s3ObjectsProcessedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) + assert.EqualValues(t, 0, s3Input.metrics.s3ObjectsInflight.Get()) + assert.EqualValues(t, 7, s3Input.metrics.s3ObjectsRequestedTotal.Get()) + assert.EqualValues(t, 8, s3Input.metrics.s3ObjectsListedTotal.Get()) + assert.EqualValues(t, 7, s3Input.metrics.s3ObjectsProcessedTotal.Get()) + assert.EqualValues(t, 7, s3Input.metrics.s3ObjectsAckedTotal.Get()) + assert.EqualValues(t, 12, s3Input.metrics.s3EventsCreatedTotal.Get()) } func uploadS3TestFiles(t *testing.T, region, bucket string, s3Client *s3.Client, filenames ...string) { @@ -548,14 +550,65 @@ func TestInputRunSNS(t *testing.T) { t.Fatal(err) } - assert.EqualValues(t, sqsInput.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. - assert.EqualValues(t, sqsInput.metrics.sqsMessagesInflight.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.sqsMessagesDeletedTotal.Get(), 7) - assert.EqualValues(t, sqsInput.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. - assert.EqualValues(t, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.s3ObjectsRequestedTotal.Get(), 7) - assert.EqualValues(t, sqsInput.metrics.s3EventsCreatedTotal.Get(), 12) + assert.EqualValues(t, 8, sqsInput.metrics.sqsMessagesReceivedTotal.Get()) // S3 could batch notifications. + assert.EqualValues(t, 0, sqsInput.metrics.sqsMessagesInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.sqsMessagesDeletedTotal.Get()) + assert.EqualValues(t, 1, sqsInput.metrics.sqsMessagesReturnedTotal.Get()) // Invalid JSON is returned so that it can eventually be DLQed. + assert.EqualValues(t, 0, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get()) + assert.EqualValues(t, 0, sqsInput.metrics.s3ObjectsInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.s3ObjectsRequestedTotal.Get()) + assert.EqualValues(t, 12, sqsInput.metrics.s3EventsCreatedTotal.Get()) assert.Greater(t, sqsInput.metrics.sqsLagTime.Mean(), 0.0) - assert.EqualValues(t, sqsInput.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end + assert.EqualValues(t, 0.0, sqsInput.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end +} + +func TestInputRunEventbridgeSQS(t *testing.T) { + logp.TestingSetup() + + // Terraform is used to set up S3 and SQS and must be executed manually. + tfConfig := getTerraformOutputs(t, false) + awsCfg := makeAWSConfig(t, tfConfig.AWSRegion) + + // Ensure SQS is empty before testing. + drainSQS(t, tfConfig.AWSRegion, tfConfig.BucketNameForEB, awsCfg) + + s3Client := s3.NewFromConfig(awsCfg) + uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketNameForEB, s3Client, + "testdata/events-array.json", + "testdata/invalid.json", + "testdata/log.json", + "testdata/log.ndjson", + "testdata/multiline.json", + "testdata/multiline.json.gz", + "testdata/multiline.txt", + "testdata/log.txt", // Skipped (no match). + ) + + sqsInput := createSQSInput(t, makeTestConfigSQS(tfConfig.QueueURLForEB)) + + inputCtx, cancel := newV2Context() + t.Cleanup(cancel) + time.AfterFunc(15*time.Second, func() { + cancel() + }) + + var errGroup errgroup.Group + errGroup.Go(func() error { + return sqsInput.Run(inputCtx, &fakePipeline{}) + }) + + if err := errGroup.Wait(); err != nil { + t.Fatal(err) + } + + assert.EqualValues(t, 8, sqsInput.metrics.sqsMessagesReceivedTotal.Get()) // S3 could batch notifications. + assert.EqualValues(t, 0, sqsInput.metrics.sqsMessagesInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.sqsMessagesDeletedTotal.Get()) + assert.EqualValues(t, 1, sqsInput.metrics.sqsMessagesReturnedTotal.Get()) // Invalid JSON is returned so that it can eventually be DLQed. + assert.EqualValues(t, 0, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get()) + assert.EqualValues(t, 0, sqsInput.metrics.s3ObjectsInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.s3ObjectsRequestedTotal.Get()) + assert.EqualValues(t, 12, sqsInput.metrics.s3EventsCreatedTotal.Get()) + assert.Greater(t, sqsInput.metrics.sqsLagTime.Mean(), 0.0) + assert.EqualValues(t, 0.0, sqsInput.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end } diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index b3c334810e2..c56df5b2ac8 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -87,10 +87,6 @@ type s3EventV2 struct { // At the moment it doesn't seem to have a version // https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html // https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html -type eventBridgeEvents struct { - Messages []eventBridgeEvent `json:"messages"` -} - // Object created event. type eventBridgeEvent struct { Version string `json:"version"` @@ -117,7 +113,7 @@ type eventBridgeEvent struct { Requester string `json:"requester"` SourceIpAddress string `json:"source-ip-address"` Reason string `json:"reason"` - } + } `json:"detail"` } type sqsS3EventProcessor struct { @@ -290,13 +286,13 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro } // Check if the notification is from S3 -> EventBridge -> SQS - var eventBridgeEvents eventBridgeEvents if events.Records == nil { - dec := json.NewDecoder(strings.NewReader(events.Message)) - if err := dec.Decode(&eventBridgeEvents); err != nil { - p.log.Debugw("Could not parse message as EventBridge payload", "sqs_message_body", body) + var eventBridgeEvent eventBridgeEvent + dec := json.NewDecoder(strings.NewReader(body)) + if err := dec.Decode(&eventBridgeEvent); err != nil { + p.log.Debugw("Could not parse message as EventBridge payload", "sqs_message_body", body, "error", err) } else { - convertEventBridge(&eventBridgeEvents, &events) + convertEventBridge(&eventBridgeEvent, &events) } } @@ -308,10 +304,8 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro return p.getS3Info(events) } -func convertEventBridge(eventBridgeEvents *eventBridgeEvents, s3Events *s3EventsV2) { - for i := range eventBridgeEvents.Messages { - s3Events.Records = append(s3Events.Records, convertEventBridgeEvent(&eventBridgeEvents.Messages[i])) - } +func convertEventBridge(eventBridgeEvent *eventBridgeEvent, s3Events *s3EventsV2) { + s3Events.Records = append(s3Events.Records, convertEventBridgeEvent(eventBridgeEvent)) } func convertEventBridgeEvent(message *eventBridgeEvent) s3EventV2 { diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index 7db6b860bc1..0ef8cfc3000 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -12,15 +12,13 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/aws/aws-sdk-go/service/sqs" "github.com/gofrs/uuid" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/logp" ) @@ -229,13 +227,7 @@ func newSNSSQSMessage() (types.Message, error) { } func newEventBridgeSQSMessage() (sqs.Message, error) { - body, err := json.Marshal(s3EventsV2{ - Message: "{\"Messages\":[{ \"version\": \"0\", \"id\": \"f17994c0-7cb9-ee01-79fd-ae46df89e3a4\", \"detail-type\": \"Object Created\", \"source\": \"aws.s3\", \"account\": \"952856826737\", \"time\": \"2024-06-24T08:31:26Z\", \"region\": \"eu-west-1\", \"resources\": [\"arn:aws:s3:::vpc-flow-logs-ks\" ], \"detail\": {\"version\": \"0\",\"bucket\": { \"name\": \"vpc-flow-logs-ks\"},\"object\": { \"key\": \"test-object-key\", \"size\": 744, \"etag\": \"2ba6b152f13c75a9155b87b5b072963c\", \"version-id\": \"uoW5awQhqux4rhi4Nuh6il967FzQlsHJ\", \"sequencer\": \"0066792EC46EC0B037\"},\"request-id\": \"Q93BVWXD5G6FKWC2\",\"requester\": \"516635408544\",\"source-ip-address\": \"10.0.27.95\",\"reason\": \"PutObject\" }}]}", - }) - if err != nil { - return sqs.Message{}, err - } - + body := []byte("{ \"version\": \"0\", \"id\": \"f17994c0-7cb9-ee01-79fd-ae46df89e3a4\", \"detail-type\": \"Object Created\", \"source\": \"aws.s3\", \"account\": \"952856826737\", \"time\": \"2024-06-24T08:31:26Z\", \"region\": \"eu-west-1\", \"resources\": [\"arn:aws:s3:::vpc-flow-logs-ks\" ], \"detail\": {\"version\": \"0\",\"bucket\": { \"name\": \"vpc-flow-logs-ks\"},\"object\": { \"key\": \"test-object-key\", \"size\": 744, \"etag\": \"2ba6b152f13c75a9155b87b5b072963c\", \"version-id\": \"uoW5awQhqux4rhi4Nuh6il967FzQlsHJ\", \"sequencer\": \"0066792EC46EC0B037\"},\"request-id\": \"Q93BVWXD5G6FKWC2\",\"requester\": \"516635408544\",\"source-ip-address\": \"10.0.27.95\",\"reason\": \"PutObject\" }}") hash := sha256.Sum256(body) id, _ := uuid.FromBytes(hash[:16]) messageID := id.String() @@ -246,7 +238,7 @@ func newEventBridgeSQSMessage() (sqs.Message, error) { Body: &bodyStr, MessageId: &messageID, ReceiptHandle: &receipt, - }, err + }, nil } func newS3Event(key string) s3EventV2 { From 7cb424d6b711e1a1006f2c728cfa646f0ebc12df Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Sat, 20 Jul 2024 00:40:08 +0200 Subject: [PATCH 09/11] reverting commenting out localstack variables, tbh localstack part should be separate terraform dir/setup but is out of scope of this PR. --- .../input/awss3/_meta/terraform/outputs.tf | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf b/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf index eb7d55c6df6..f197d69b66b 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf +++ b/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf @@ -12,12 +12,12 @@ resource "local_file" "secrets" { file_permission = "0644" } -# resource "local_file" "secrets-localstack" { -# content = yamlencode({ -# "queue_url" : aws_sqs_queue.filebeat-integtest-localstack.url -# "aws_region" : aws_s3_bucket.filebeat-integtest-localstack.region -# "bucket_name" : aws_s3_bucket.filebeat-integtest-localstack.id -# }) -# filename = "${path.module}/outputs-localstack.yml" -# file_permission = "0644" -# } +resource "local_file" "secrets-localstack" { + content = yamlencode({ + "queue_url" : aws_sqs_queue.filebeat-integtest-localstack.url + "aws_region" : aws_s3_bucket.filebeat-integtest-localstack.region + "bucket_name" : aws_s3_bucket.filebeat-integtest-localstack.id + }) + filename = "${path.module}/outputs-localstack.yml" + file_permission = "0644" +} From e52d931fb04ca0c973ffa83bd322ba76eb7d1820 Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Thu, 25 Jul 2024 14:36:46 +0200 Subject: [PATCH 10/11] Fixing possible array out of bounds. Thank you tests/CI --- x-pack/filebeat/input/awss3/sqs_s3_event.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index c56df5b2ac8..a489f6a7f72 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -305,15 +305,17 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro } func convertEventBridge(eventBridgeEvent *eventBridgeEvent, s3Events *s3EventsV2) { - s3Events.Records = append(s3Events.Records, convertEventBridgeEvent(eventBridgeEvent)) + for _, resource := range eventBridgeEvent.Resources { + s3Events.Records = append(s3Events.Records, convertEventBridgeEvent(resource, eventBridgeEvent)) + } } -func convertEventBridgeEvent(message *eventBridgeEvent) s3EventV2 { +func convertEventBridgeEvent(resource string, message *eventBridgeEvent) s3EventV2 { var event = s3EventV2{} if message.DetailType == "Object Created" { event.SetEventName("ObjectCreated:Put") } - event.SetS3BucketARN(message.Resources[0]) + event.SetS3BucketARN(resource) event.SetAWSRegion(message.Region) if message.Source == "aws.s3" { event.SetEventSource("aws:s3") From b4ba7cc8ef47d3bd55cc1c63ff5151b35ea5326a Mon Sep 17 00:00:00 2001 From: mjmbischoff Date: Fri, 13 Sep 2024 14:41:42 +0200 Subject: [PATCH 11/11] running make update --- NOTICE.txt | 424 ++++++++++++++++++++++++++--------------------------- 1 file changed, 212 insertions(+), 212 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 5fec0eff31d..b0a0438eb29 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -5073,218 +5073,6 @@ Contents of probable licence file $GOMODCACHE/github.com/aws/aws-lambda-go@v1.44 --------------------------------------------------------------------------------- -Dependency : github.com/aws/aws-sdk-go -Version: v1.38.60 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go@v1.38.60/LICENSE.txt: - - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- Dependency : github.com/aws/aws-sdk-go-v2 Version: v1.30.4 @@ -34017,6 +33805,218 @@ IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +Dependency : github.com/aws/aws-sdk-go +Version: v1.38.60 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go@v1.38.60/LICENSE.txt: + + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream Version: v1.6.4