diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 375b86ac6e8..59875c86d0a 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -290,7 +290,7 @@ https://github.com/elastic/beats/compare/v7.12.1...v7.13.0[View commits] - Improve Cisco ASA/FTD parsing of messages {pull}23766[23766] - Better support for identity FW messages. - Change network.bytes, source.bytes, and destination.bytes to long from integer since value can exceed integer capacity. - - Add descriptions for various processors for easier pipeline editing in Kibana UI. + - Add descriptions for various processors for easier pipeline editing in Kibana UI. - Fix usage of unallowed ECS event.outcome values in Cisco ASA/FTD pipeline. {pull}24744[24744]. - Fix IPtables Pipeline and Ubiquiti dashboard. {issue}24878[24878] {pull}24928[24928] - Strip Azure Eventhub connection string in debug logs. {pull}25066[25066] @@ -1103,6 +1103,7 @@ https://github.com/elastic/beats/compare/v7.9.3\...v7.10.0[View commits] - Adding support for Microsoft 365 Defender (Microsoft Threat Protection) {pull}21446[21446] - Adding support for FIPS in s3 input {pull}21446[21446] - Update Okta documentation for new stateful restarts. {pull}22091[22091] +- Use workers in `aws-s3` input to process SQS messages. {pull}27199[27199] *Heartbeat* diff --git a/NOTICE.txt b/NOTICE.txt index 185b995e19a..4320b9b2d05 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -9898,6 +9898,218 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +-------------------------------------------------------------------------------- +Dependency : github.com/golang/mock +Version: v1.6.0 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/golang/mock@v1.6.0/LICENSE: + + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below).
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/golang/protobuf Version: v1.4.3 @@ -13626,6 +13838,35 @@ Contents of probable licence file $GOMODCACHE/github.com/oklog/ulid@v1.3.1/LICEN limitations under the License. 
+-------------------------------------------------------------------------------- +Dependency : github.com/olekukonko/tablewriter +Version: v0.0.5 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/olekukonko/tablewriter@v0.0.5/LICENSE.md: + +Copyright (C) 2014 by Oleku Konko + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + + -------------------------------------------------------------------------------- Dependency : github.com/otiai10/copy Version: v1.2.0 @@ -29888,218 +30129,6 @@ third-party archives. limitations under the License. --------------------------------------------------------------------------------- -Dependency : github.com/golang/mock -Version: v1.3.1 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/golang/mock@v1.3.1/LICENSE: - - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. 
- - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. 
You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. 
In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- Dependency : github.com/google/btree Version: v1.0.0 @@ -35050,6 +35079,37 @@ The above copyright notice and this permission notice shall be included in all c THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+-------------------------------------------------------------------------------- +Dependency : github.com/mattn/go-runewidth +Version: v0.0.9 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/mattn/go-runewidth@v0.0.9/LICENSE: + +The MIT License (MIT) + +Copyright (c) 2016 Yasuhiro Matsumoto + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + + -------------------------------------------------------------------------------- Dependency : github.com/mattn/go-sqlite3 Version: v1.9.0 @@ -40747,11 +40807,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : golang.org/x/mod -Version: v0.3.0 +Version: v0.4.2 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/golang.org/x/mod@v0.3.0/LICENSE: +Contents of probable licence file $GOMODCACHE/golang.org/x/mod@v0.4.2/LICENSE: Copyright (c) 2009 The Go Authors. All rights reserved. 
diff --git a/go.mod b/go.mod index bd7ea03215b..2f669c70474 100644 --- a/go.mod +++ b/go.mod @@ -89,6 +89,7 @@ require ( github.com/gofrs/flock v0.7.2-0.20190320160742-5135e617513b github.com/gofrs/uuid v3.3.0+incompatible github.com/gogo/protobuf v1.3.1 + github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.4.3 github.com/golang/snappy v0.0.1 github.com/gomodule/redigo v1.8.3 @@ -127,6 +128,7 @@ require ( github.com/mitchellh/mapstructure v1.3.3 github.com/morikuni/aec v1.0.0 // indirect github.com/oklog/ulid v1.3.1 + github.com/olekukonko/tablewriter v0.0.5 github.com/opencontainers/go-digest v1.0.0-rc1.0.20190228220655-ac19fd6e7483 // indirect github.com/opencontainers/image-spec v1.0.2-0.20190823105129-775207bd45b6 // indirect github.com/otiai10/copy v1.2.0 @@ -173,7 +175,7 @@ require ( golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c golang.org/x/text v0.3.5 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 - golang.org/x/tools v0.0.0-20200731060945-b5fad4ed8dd6 + golang.org/x/tools v0.1.1 google.golang.org/api v0.15.0 google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb google.golang.org/grpc v1.29.1 diff --git a/go.sum b/go.sum index 64da5538d85..d92c2185673 100644 --- a/go.sum +++ b/go.sum @@ -355,6 +355,8 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -531,6 +533,8 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -566,6 +570,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= 
github.com/onsi/ginkgo v1.5.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -800,8 +806,9 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index 13cb37c0f9c..4b68ef4bb0e 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -67,16 +67,29 @@ #session_token: '${AWS_SESSION_TOKEN:""}' #credential_profile_name: test-aws-s3-input - # Queue url (required) to receive queue messages from + # SQS queue URL to receive messages from (required). #queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-aws-s3-logs-queue" - # The duration (in seconds) that the received messages are hidden from subsequent - # retrieve requests after being retrieved by a ReceiveMessage request. - #visibility_timeout: 300 + # Maximum number of SQS messages that can be inflight at any time. + #max_number_of_messages: 5 + + # Maximum duration of an AWS API call (excluding S3 GetObject calls). + #api_timeout: 120s + + # Duration that received SQS messages are hidden from subsequent + # requests after being retrieved by a ReceiveMessage request. + #visibility_timeout: 300s # List of S3 object metadata keys to include in events. #include_s3_metadata: [] + # The max number of times an SQS message should be received (retried) before deleting it. + #sqs.max_receive_count: 5 + + # Maximum duration for which the SQS ReceiveMessage call waits for a message + # to arrive in the queue before returning. + #sqs.wait_time: 20s + #------------------------------ AWS CloudWatch input -------------------------------- # Beta: Config options for AWS CloudWatch input #- type: aws-cloudwatch diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 9de827de26e..a302d0366b4 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -11,11 +11,15 @@ AWS S3 ++++ -Use the `aws-s3` input to retrieve logs from S3 objects that are pointed by -messages from specific SQS queues. This input can, for example, be used to -receive S3 server access logs to monitor detailed records for the requests that +Use the `aws-s3` input to retrieve logs from S3 objects that are pointed to by +S3 notification events read from an SQS queue.
This input can, for example, be +used to receive S3 access logs to monitor detailed records for the requests that are made to a bucket. +This input depends on S3 notifications delivered to an SQS queue for +`s3:ObjectCreated:*` events. You must create an SQS queue and configure S3 +to publish events to the queue. + When processing an S3 object referenced by an SQS message, if half of the configured visibility timeout has passed and the processing is still ongoing, then the visibility timeout of that SQS message will be reset to make sure the message @@ -39,8 +43,9 @@ The `aws-s3` input supports the following configuration options plus the ==== `api_timeout` The maximum duration of the AWS API call. If it exceeds the timeout, the AWS API -call will be interrupted. The default AWS API call timeout for a message is 120 -seconds. The maximum is half of the visibility timeout value. +call will be interrupted. The default AWS API timeout is `120s`. + +The API timeout must be longer than the `sqs.wait_time` value. [id="input-{type}-buffer_size"] [float] @@ -162,9 +167,8 @@ The default is `10 MiB`. [float] ==== `max_number_of_messages` -The maximum number of messages to return. Amazon SQS never returns more messages -than this value (however, fewer messages might be returned). Valid values: 1 to -10. Default: 5. +The maximum number of SQS messages that can be inflight at any time. Defaults +to 5. [id="input-{type}-parsers"] [float] @@ -212,11 +216,31 @@ URL of the AWS SQS queue that messages will be received from. Required. [float] ==== `visibility_timeout` -The duration that the received messages are hidden from subsequent retrieve -requests after being retrieved by a ReceiveMessage request. This value needs to -be a lot bigger than {beatname_uc} collection frequency so if it took too long -to read the S3 log, this SQS message will not be reprocessed. The default -visibility timeout for a message is 300 seconds. The maximum is 12 hours. +The duration that the received SQS messages are hidden from subsequent retrieve +requests after being retrieved by a `ReceiveMessage` request. The default +visibility timeout is `300s`. The maximum is `12h`. {beatname_uc} will +automatically reset the visibility timeout of a message after 1/2 of the +duration passes to prevent a message that is still being processed from +returning to the queue. + +[float] +==== `sqs.max_receive_count` + +The maximum number of times an SQS message should be received (retried) before +deleting it. This feature prevents poison-pill messages (messages that can be +received but can't be processed) from consuming resources. The number of times +a message has been received is tracked using the `ApproximateReceiveCount` SQS +attribute. The default value is 5. + +If you have configured a dead letter queue, then you can set this value to +`-1` to disable deletion on failure. + +[float] +==== `sqs.wait_time` + +The maximum duration that an SQS `ReceiveMessage` call should wait for a message +to arrive in the queue before returning. The default value is `20s`. The maximum +value is `20s`.
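+
+For reference, a minimal `aws-s3` configuration that combines the options
+documented above might look like the following sketch. The queue URL is a
+placeholder, and every option is shown with its documented default value:
+
+[source,yaml]
+----
+filebeat.inputs:
+- type: aws-s3
+  queue_url: https://sqs.us-east-1.amazonaws.com/1234/test-aws-s3-logs-queue
+  max_number_of_messages: 5
+  api_timeout: 120s
+  visibility_timeout: 300s
+  sqs.max_receive_count: 5
+  sqs.wait_time: 20s
+----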
[float] ==== `aws credentials` diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index d042f5c903d..6bd5d93a68a 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3117,16 +3117,29 @@ filebeat.inputs: #session_token: '${AWS_SESSION_TOKEN:""}' #credential_profile_name: test-aws-s3-input - # Queue url (required) to receive queue messages from + # SQS queue URL to receive messages from (required). #queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-aws-s3-logs-queue" - # The duration (in seconds) that the received messages are hidden from subsequent - # retrieve requests after being retrieved by a ReceiveMessage request. - #visibility_timeout: 300 + # Maximum number of SQS messages that can be inflight at any time. + #max_number_of_messages: 5 + + # Maximum duration of an AWS API call (excluding S3 GetObject calls). + #api_timeout: 120s + + # Duration that received SQS messages are hidden from subsequent + # requests after being retrieved by a ReceiveMessage request. + #visibility_timeout: 300s # List of S3 object metadata keys to include in events. #include_s3_metadata: [] + # The max number of times an SQS message should be received (retried) before deleting it. + #sqs.max_receive_count: 5 + + # Maximum duration for which the SQS ReceiveMessage call waits for a message + # to arrive in the queue before returning. + #sqs.wait_time: 20s + #------------------------------ AWS CloudWatch input -------------------------------- # Beta: Config options for AWS CloudWatch input #- type: aws-cloudwatch diff --git a/x-pack/filebeat/input/awss3/_meta/s3-input.asciidoc b/x-pack/filebeat/input/awss3/_meta/s3-input.asciidoc deleted file mode 100644 index 703f31e5fd0..00000000000 --- a/x-pack/filebeat/input/awss3/_meta/s3-input.asciidoc +++ /dev/null @@ -1,62 +0,0 @@ -=== S3 and SQS Setup -Enable bucket notification: any new object creation in S3 bucket will also -create a notification through SQS. Please see -https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html#step1-create-sqs-queue-for-notification[create-sqs-queue-for-notification] -for more details. -1. In SQS, edit policy document to create a new policy. -2. In S3 bucket, enable and configure event notification. -3. In order to make sure the S3-SQS setup is ready, upload a file into the S3 -bucket and check if SQS gets a message showing that a new object is created with -its name. - -[float] -=== Manual Testing -1. Upload fake log files into the S3 bucket that has SQS notification enabled. -2. Check from SQS if there are N messages received. -3. Start filebeat with `./filebeat -e` and check Kibana if there are events reported -with messages from the example logs. Depends on the number of log lines in each -fake log file, check if the number of events match the number of log lines total -from all log files. -4. Check SQS if messages are deleted successfully. -5. Interrupt the s3 input process by killing filebeat during processing new S3 logs, -check if messages in SQS are in flight instead of deleted. - -[float] -=== Run s3_test.go -Instead of manual testing, `s3_test.go` includes some integration tests that can -be used for validating s3 input. In order to run `s3_test.go`, an AWS environment -with S3-SQS setup is needed. Please see `S3 and SQS Setup` for more details on -how to set up the environment. In the test, it does a cleaning first to remove -all old messages from SQS queue.
Then upload a sample log file, which stores in -`./testdata/sample1.txt`, into S3 bucket. Test function calls `input.Run()` -function to read the notification message from SQS and find the log file in S3 -target bucket and get the log message. After validating the events, another round -of cleaning will be done for SQS to remove the message. - -Some environment variables are needed for testing: - -|=== -| Environment Variable | Sample Value -| QUEUE_URL | https://sqs.us-west-1.amazonaws.com/1234567/test-s3-notification -| AWS_PROFILE_NAME | test-mb -| S3_BUCKET_NAME | test-s3 -| S3_BUCKET_REGION | us-west-1 -|=== - -[float] -=== Parallel Processing Test -A basic test was done with three Filebeats running in parallel pointing to the same -SQS queue in AWS. There were 1000 messages available in the queue and each message -notifies a new S3 log has been generated. These S3 logs are simple .txt files and -each contains 10 log lines. With three Filebeats, the messages were processed -evenly without duplicating or missing messages. Test result looks like: - -|======= -| Filebeat # | Total # of Events | Total # of log files -| 1 | 3350 | 335 -| 2| 3350 | 335 -| 3| 3300 | 330 -|======= - -Please see more details in https://github.com/elastic/beats/issues/13457 regarding -to the test. diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/.gitignore b/x-pack/filebeat/input/awss3/_meta/terraform/.gitignore new file mode 100644 index 00000000000..0825744a776 --- /dev/null +++ b/x-pack/filebeat/input/awss3/_meta/terraform/.gitignore @@ -0,0 +1,3 @@ +terraform/ +outputs.yml +*.tfstate* diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl b/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl new file mode 100644 index 00000000000..ec22975d48e --- /dev/null +++ b/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl @@ -0,0 +1,57 @@ +# This file is maintained automatically by "terraform init". +# Manual edits may be lost in future updates. 
+ +provider "registry.terraform.io/hashicorp/aws" { + version = "3.52.0" + constraints = "~> 3.52" + hashes = [ + "h1:Fy/potyWfS8NVumHqWi6STgaQUX66diUmgZDfFNBeXU=", + "zh:04a4f8a1b34292fd6a72c1efe03f6f10186ecbdc318df36d462d0be1c21ce72d", + "zh:0601006f14f437489902555720dd8fb4e67450356438bab64b61cf6d0e1af681", + "zh:14214e996b8db0a2038b74a2ddbea7356b3e53f73003cde2c9069294d9a6c421", + "zh:17d1ecc280d776271b0fc0fd6a4033933be8e67eb6a39b7bfb3c242cd218645f", + "zh:247ae4bc3b52fba96ed1593e7b23d62da0d2c99498fc0d968fcf28020df3c3aa", + "zh:2e0432fabeb5e44d756a5566168768f1b6dea3cc0e5650fac966820e90d18367", + "zh:34f6f95b88c5d8c105d9a3b7d2712e7df1181948bfbef33bb6a87d7a77c20c0d", + "zh:3de6bf02b9499bf8dc13843da72a03db5ae8188b8157f0e7b3d5bf1d7cd1ac8b", + "zh:43198a223ea6d6dfb82deac62b29181c3be18dc77b9ef9f8d44c32b08e44ea5c", + "zh:a7de44c9445c100a2823c371df03fcaa9ecb1642750ccdc02294fa6cd1095859", + "zh:c3c44bd07e5b6cdb776ff674e39feb708ba3ee3d0dff2c88d1d5db323094d942", + ] +} + +provider "registry.terraform.io/hashicorp/local" { + version = "2.1.0" + hashes = [ + "h1:KfieWtVyGWwplSoLIB5usKAUnrIkDQBkWaR5TI+4WYg=", + "zh:0f1ec65101fa35050978d483d6e8916664b7556800348456ff3d09454ac1eae2", + "zh:36e42ac19f5d68467aacf07e6adcf83c7486f2e5b5f4339e9671f68525fc87ab", + "zh:6db9db2a1819e77b1642ec3b5e95042b202aee8151a0256d289f2e141bf3ceb3", + "zh:719dfd97bb9ddce99f7d741260b8ece2682b363735c764cac83303f02386075a", + "zh:7598bb86e0378fd97eaa04638c1a4c75f960f62f69d3662e6d80ffa5a89847fe", + "zh:ad0a188b52517fec9eca393f1e2c9daea362b33ae2eb38a857b6b09949a727c1", + "zh:c46846c8df66a13fee6eff7dc5d528a7f868ae0dcf92d79deaac73cc297ed20c", + "zh:dc1a20a2eec12095d04bf6da5321f535351a594a636912361db20eb2a707ccc4", + "zh:e57ab4771a9d999401f6badd8b018558357d3cbdf3d33cc0c4f83e818ca8e94b", + "zh:ebdcde208072b4b0f8d305ebf2bfdc62c926e0717599dcf8ec2fd8c5845031c3", + "zh:ef34c52b68933bedd0868a13ccfd59ff1c820f299760b3c02e008dc95e2ece91", + ] +} + +provider "registry.terraform.io/hashicorp/random" { + version = "3.1.0" + hashes = [ + "h1:rKYu5ZUbXwrLG1w81k7H3nce/Ys6yAxXhWcbtk36HjY=", + "zh:2bbb3339f0643b5daa07480ef4397bd23a79963cc364cdfbb4e86354cb7725bc", + "zh:3cd456047805bf639fbf2c761b1848880ea703a054f76db51852008b11008626", + "zh:4f251b0eda5bb5e3dc26ea4400dba200018213654b69b4a5f96abee815b4f5ff", + "zh:7011332745ea061e517fe1319bd6c75054a314155cb2c1199a5b01fe1889a7e2", + "zh:738ed82858317ccc246691c8b85995bc125ac3b4143043219bd0437adc56c992", + "zh:7dbe52fac7bb21227acd7529b487511c91f4107db9cc4414f50d04ffc3cab427", + "zh:a3a9251fb15f93e4cfc1789800fc2d7414bbc18944ad4c5c98f466e6477c42bc", + "zh:a543ec1a3a8c20635cf374110bd2f87c07374cf2c50617eee2c669b3ceeeaa9f", + "zh:d9ab41d556a48bd7059f0810cf020500635bfc696c9fc3adab5ea8915c1d886b", + "zh:d9e13427a7d011dbd654e591b0337e6074eef8c3b9bb11b2e39eaaf257044fd7", + "zh:f7605bd1437752114baf601bdf6931debe6dc6bfe3006eb7e9bb9080931dca8a", + ] +} diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/README.md b/x-pack/filebeat/input/awss3/_meta/terraform/README.md new file mode 100644 index 00000000000..cdb209e7099 --- /dev/null +++ b/x-pack/filebeat/input/awss3/_meta/terraform/README.md @@ -0,0 +1,43 @@ +# Terraform setup for AWS S3 Input Integration Tests + +This directory contains a Terraform module that creates the AWS resources needed +for executing the integration tests for the `aws-s3` Filebeat input. It creates +an S3 bucket and SQS queue and configures S3 `ObjectCreated:*` notifications to +be delivered to SQS. + +It outputs configuration information that is consumed by the tests to +`outputs.yml`.
The AWS resources are randomly named to prevent name collisions +between multiple users. + +### Usage + +You must have the appropriate AWS environment variables for authentication set +before running Terraform or the integration tests. The AWS key must be +authorized to create and destroy S3 buckets and SQS queues. + +1. Execute terraform in this directory to create the resources. This will also +write the `outputs.yml`. + + `terraform apply` + +2. (Optional) View the output configuration. + + ```yaml + "aws_region": "us-east-1" + "bucket_name": "filebeat-s3-integtest-8iok1h" + "queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-8iok1h" + ``` + +3. Execute the integration test. + + ``` + cd x-pack/filebeat/input/awss3 + go test -tags aws,integration -run TestInputRun -v . + ``` + +4. Clean up AWS resources. Execute terraform to remove the SQS queue and delete +the S3 bucket and its contents. + + `terraform destroy` + + diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf new file mode 100644 index 00000000000..1b22b8bbfdb --- /dev/null +++ b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf @@ -0,0 +1,62 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 3.52" + } + } +} + +provider "aws" { + region = var.aws_region +} + +resource "random_string" "random" { + length = 6 + special = false + upper = false +} + +resource "aws_s3_bucket" "filebeat-integtest" { + bucket = "filebeat-s3-integtest-${random_string.random.result}" + force_destroy = true +} + +resource "aws_sqs_queue" "filebeat-integtest" { + name = "filebeat-s3-integtest-${random_string.random.result}" + policy = < 2 && (strings.Join(queueHostSplit[2:], ".") == endpoint || (endpoint == "" && queueHostSplit[2] == "amazonaws")) { - return queueHostSplit[1], nil - } - } - return "", fmt.Errorf("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}") -} - -// handle message -func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) { - var msg sqsMessage - err := json.Unmarshal([]byte(*m.Body), &msg) - if err != nil { - c.logger.Debug("sqs message body = ", *m.Body) - if jsonError, ok := err.(*json.SyntaxError); ok { - return nil, fmt.Errorf("json unmarshal sqs message body failed at offset %d with syntax error: %w", jsonError.Offset, err) - } else { - return nil, fmt.Errorf("json unmarshal sqs message body failed: %w", err) - } - } - - var s3Infos []s3Info - for _, record := range msg.Records { - if record.EventSource != "aws:s3" || !strings.HasPrefix(record.EventName, "ObjectCreated:") { - return nil, fmt.Errorf("this SQS queue should be dedicated to s3 ObjectCreated event notifications") - } - // Unescape substrings from s3 log name.
For example, convert "%3D" back to "=" - filename, err := url.QueryUnescape(record.S3.object.Key) - if err != nil { - return nil, fmt.Errorf("url.QueryUnescape failed for '%s': %w", record.S3.object.Key, err) - } - - if len(c.config.FileSelectors) == 0 { - s3Infos = append(s3Infos, s3Info{ - region: record.AwsRegion, - name: record.S3.bucket.Name, - key: filename, - arn: record.S3.bucket.Arn, - readerConfig: c.config.ReaderConfig, - }) - continue - } - - for _, fs := range c.config.FileSelectors { - if fs.Regex != nil && fs.Regex.MatchString(filename) { - info := s3Info{ - region: record.AwsRegion, - name: record.S3.bucket.Name, - key: filename, - arn: record.S3.bucket.Arn, - readerConfig: fs.ReaderConfig, - } - s3Infos = append(s3Infos, info) - - break - } - } - } - return s3Infos, nil -} - -func (c *s3Collector) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC chan<- error) error { - s3Ctx := &s3Context{ - refs: 1, - errC: errC, - } - defer s3Ctx.done() - - for _, info := range s3Infos { - c.logger.Debugf("Processing file from s3 bucket \"%s\" with name \"%s\"", info.name, info.key) - err := c.createEventsFromS3Info(svc, info, s3Ctx) - if err != nil { - c.logger.Error(fmt.Errorf("createEventsFromS3Info failed processing file from s3 bucket \"%s\" with name \"%s\": %w", info.name, info.key, err)) - s3Ctx.setError(err) - } - } - return nil -} - -func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3Ctx *s3Context) error { - objectHash := s3ObjectHash(info) - - // Download the S3 object using GetObjectRequest. - s3GetObjectInput := &s3.GetObjectInput{ - Bucket: awssdk.String(info.name), - Key: awssdk.String(info.key), - } - req := svc.GetObjectRequest(s3GetObjectInput) - - // The Context will interrupt the request if the timeout expires. - ctx, cancelFn := context.WithTimeout(c.cancellation, c.config.APITimeout) - defer cancelFn() - - { - c.metrics.s3ObjectsRequestedTotal.Inc() - c.metrics.s3ObjectsInflight.Inc() - start := time.Now() - defer func() { - c.metrics.s3ObjectsInflight.Dec() - c.metrics.s3ObjectProcessingTime.Update(time.Since(start).Nanoseconds()) - }() - } - - resp, err := req.Send(ctx) - if err != nil { - var aerr *awssdk.RequestCanceledError - if errors.As(err, &aerr) { - c.logger.Error(fmt.Errorf("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w", info.key, info.name, err)) - return err - } - - if awsErr, ok := err.(awserr.Error); ok { - if awsErr.Code() == "NoSuchKey" { - c.logger.Warnf("Cannot find s3 file '%s' from S3 bucket '%s'", info.key, info.name) - return nil - } - } - return fmt.Errorf("s3 GetObjectRequest failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err) - } - - defer resp.Body.Close() - - bodyReader := bufio.NewReader(newMonitoredReader(resp.Body, c.metrics.s3BytesProcessedTotal)) - isS3ObjGzipped, err := isStreamGzipped(bodyReader) - if err != nil { - c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err)) - return err - } - - if isS3ObjGzipped { - gzipReader, err := gzip.NewReader(bodyReader) - if err != nil { - c.logger.Error(fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)) - return err - } - defer gzipReader.Close() - bodyReader = bufio.NewReader(gzipReader) - } - - if info.readerConfig.ContentType != "" { - *resp.ContentType = info.readerConfig.ContentType - } - - info.meta = s3Metadata(resp, info.IncludeS3Metadata...) 
- - // Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config - if resp.ContentType != nil && *resp.ContentType == "application/json" || info.ExpandEventListFromField != "" { - decoder := json.NewDecoder(bodyReader) - err := c.decodeJSON(decoder, objectHash, info, s3Ctx) - if err != nil { - return fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err) - } - return nil - } - - // handle s3 objects that are not json content-type - encodingFactory, ok := encoding.FindEncoding(info.Encoding) - if !ok || encodingFactory == nil { - return fmt.Errorf("unable to find '%v' encoding", info.Encoding) - } - enc, err := encodingFactory(bodyReader) - if err != nil { - return fmt.Errorf("failed to initialize encoding: %v", err) - } - var r reader.Reader - r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{ - Codec: enc, - BufferSize: int(info.BufferSize), - Terminator: info.LineTerminator, - MaxBytes: int(info.MaxBytes) * 4, - }) - if err != nil { - return fmt.Errorf("failed to create encode reader: %w", err) - } - r = readfile.NewStripNewline(r, info.LineTerminator) - - r = info.Parsers.Create(r) - - r = readfile.NewLimitReader(r, int(info.MaxBytes)) - - var offset int64 - for { - message, err := r.Next() - if err == io.EOF { - // No more lines - break - } - if err != nil { - return fmt.Errorf("error reading message: %w", err) - } - event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx) - event.Fields.DeepUpdate(message.Fields) - offset += int64(message.Bytes) - if err = c.forwardEvent(event); err != nil { - return fmt.Errorf("forwardEvent failed: %w", err) - } - } - return nil -} - -func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { - offset := 0 - for { - var jsonFields interface{} - err := decoder.Decode(&jsonFields) - if jsonFields == nil { - return nil - } - - if err == io.EOF { - offsetNew, err := c.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx) - if err != nil { - return err - } - offset = offsetNew - } else if err != nil { - // decode json failed, skip this log file - err = fmt.Errorf("decode json failed for '%s' from S3 bucket '%s', skipping this file: %w", s3Info.key, s3Info.name, err) - c.logger.Warn(err) - return nil - } - - offset, err = c.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx) - if err != nil { - return err - } - } -} - -func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) { - switch f := jsonFields.(type) { - case map[string][]interface{}: - if s3Info.ExpandEventListFromField != "" { - textValues, ok := f[s3Info.ExpandEventListFromField] - if !ok { - err := fmt.Errorf("key '%s' not found", s3Info.ExpandEventListFromField) - c.logger.Error(err) - return offset, err - } - for _, v := range textValues { - offset, err := c.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx) - if err != nil { - err = fmt.Errorf("convertJSONToEvent failed for '%s' from S3 bucket '%s': %w", s3Info.key, s3Info.name, err) - c.logger.Error(err) - return offset, err - } - } - return offset, nil - } - case map[string]interface{}: - if s3Info.ExpandEventListFromField != "" { - textValues, ok := f[s3Info.ExpandEventListFromField] - if !ok { - err := fmt.Errorf("key '%s' not found", s3Info.ExpandEventListFromField) - c.logger.Error(err) - return offset, err - } - - 
valuesConverted := textValues.([]interface{}) - for _, textValue := range valuesConverted { - offsetNew, err := c.convertJSONToEvent(textValue, offset, objectHash, s3Info, s3Ctx) - if err != nil { - err = fmt.Errorf("convertJSONToEvent failed for '%s' from S3 bucket '%s': %w", s3Info.key, s3Info.name, err) - c.logger.Error(err) - return offset, err - } - offset = offsetNew - } - return offset, nil - } - - offset, err := c.convertJSONToEvent(f, offset, objectHash, s3Info, s3Ctx) - if err != nil { - err = fmt.Errorf("convertJSONToEvent failed for '%s' from S3 bucket '%s': %w", s3Info.key, s3Info.name, err) - c.logger.Error(err) - return offset, err - } - return offset, nil - } - return offset, nil -} - -func (c *s3Collector) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) { - vJSON, _ := json.Marshal(jsonFields) - logOriginal := string(vJSON) - log := trimLogDelimiter(logOriginal) - offset += len([]byte(log)) - event := createEvent(log, int64(offset), s3Info, objectHash, s3Ctx) - - err := c.forwardEvent(event) - if err != nil { - err = fmt.Errorf("forwardEvent failed: %w", err) - c.logger.Error(err) - return offset, err - } - return offset, nil -} - -func (c *s3Collector) forwardEvent(event beat.Event) error { - c.publisher.Publish(event) - c.metrics.s3EventsCreatedTotal.Inc() - return c.cancellation.Err() -} - -func (c *s3Collector) deleteMessage(queueURL string, messagesReceiptHandle string, svcSQS sqsiface.ClientAPI) error { - deleteMessageInput := &sqs.DeleteMessageInput{ - QueueUrl: awssdk.String(queueURL), - ReceiptHandle: awssdk.String(messagesReceiptHandle), - } - - req := svcSQS.DeleteMessageRequest(deleteMessageInput) - - // The Context will interrupt the request if the timeout expires. - ctx, cancelFn := context.WithTimeout(c.cancellation, c.config.APITimeout) - defer cancelFn() - - _, err := req.Send(ctx) - if err != nil { - var aerr *awssdk.RequestCanceledError - if errors.As(err, &aerr) { - return nil - } - return fmt.Errorf("SQS DeleteMessageRequest failed: %w", err) - } - return nil -} - -func trimLogDelimiter(log string) string { - return strings.TrimSuffix(log, "\n") -} - -func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event { - s3Ctx.Inc() - - event := beat.Event{ - Timestamp: time.Now().UTC(), - Fields: common.MapStr{ - "message": log, - "log": common.MapStr{ - "offset": int64(offset), - "file": common.MapStr{ - "path": constructObjectURL(info), - }, - }, - "aws": common.MapStr{ - "s3": common.MapStr{ - "bucket": common.MapStr{ - "name": info.name, - "arn": info.arn}, - "object": common.MapStr{ - "key": info.key, - }, - }, - }, - "cloud": common.MapStr{ - "provider": "aws", - "region": info.region, - }, - }, - Private: s3Ctx, - } - event.SetID(objectHash + "-" + fmt.Sprintf("%012d", offset)) - - if len(info.meta) > 0 { - event.Fields.Put("aws.s3.metadata", info.meta) - } - - return event -} - -func constructObjectURL(info s3Info) string { - return "https://" + info.name + ".s3-" + info.region + ".amazonaws.com/" + info.key -} - -// s3ObjectHash returns a short sha256 hash of the bucket arn + object key name. -func s3ObjectHash(s3Info s3Info) string { - h := sha256.New() - h.Write([]byte(s3Info.arn + s3Info.key)) - prefix := hex.EncodeToString(h.Sum(nil)) - return prefix[:10] -} - -// s3Metadata returns a map containing the selected S3 object metadata keys. 
-func s3Metadata(resp *s3.GetObjectResponse, keys ...string) common.MapStr { - if len(keys) == 0 { - return nil - } - - // When you upload objects using the REST API, the optional user-defined - // metadata names must begin with "x-amz-meta-" to distinguish them from - // other HTTP headers. - const userMetaPrefix = "x-amz-meta-" - - allMeta := map[string]interface{}{} - - // Get headers using AWS SDK struct tags. - fields := reflect.TypeOf(resp.GetObjectOutput).Elem() - values := reflect.ValueOf(resp.GetObjectOutput).Elem() - for i := 0; i < fields.NumField(); i++ { - f := fields.Field(i) - - if loc, _ := f.Tag.Lookup("location"); loc != "header" { - continue - } - - name, found := f.Tag.Lookup("locationName") - if !found { - continue - } - name = strings.ToLower(name) - - if name == userMetaPrefix { - continue - } - - v := values.Field(i) - switch v.Kind() { - case reflect.Ptr: - if v.IsNil() { - continue - } - v = v.Elem() - default: - if v.IsZero() { - continue - } - } - - allMeta[name] = v.Interface() - } - - // Add in the user defined headers. - for k, v := range resp.Metadata { - k = strings.ToLower(k) - allMeta[userMetaPrefix+k] = v - } - - // Select the matching headers from the config. - metadata := common.MapStr{} - for _, key := range keys { - key = strings.ToLower(key) - - v, found := allMeta[key] - if !found { - continue - } - - metadata[key] = v - } - - return metadata -} - -func (c *s3Context) setError(err error) { - // only care about the last error for now - // TODO: add "Typed" error to error for context - c.mux.Lock() - defer c.mux.Unlock() - c.err = err -} - -func (c *s3Context) done() { - c.mux.Lock() - defer c.mux.Unlock() - c.refs-- - if c.refs == 0 { - c.errC <- c.err - close(c.errC) - } -} - -func (c *s3Context) Inc() { - c.mux.Lock() - defer c.mux.Unlock() - c.refs++ -} - -// isStreamGzipped determines whether the given stream of bytes (encapsulated in a buffered reader) -// represents gzipped content or not. A buffered reader is used so the function can peek into the byte -// stream without consuming it. This makes it convenient for code executed after this function call -// to consume the stream if it wants. -func isStreamGzipped(r *bufio.Reader) (bool, error) { - // Why 512? See https://godoc.org/net/http#DetectContentType - buf, err := r.Peek(512) - if err != nil && err != io.EOF { - return false, err - } - - switch http.DetectContentType(buf) { - case "application/x-gzip", "application/zip": - return true, nil - default: - return false, nil - } -} diff --git a/x-pack/filebeat/input/awss3/collector_test.go b/x-pack/filebeat/input/awss3/collector_test.go deleted file mode 100644 index 5e127839f40..00000000000 --- a/x-pack/filebeat/input/awss3/collector_test.go +++ /dev/null @@ -1,507 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -package awss3 - -import ( - "bufio" - "bytes" - "compress/gzip" - "context" - "fmt" - "io" - "io/ioutil" - "net/http" - "testing" - "time" - - "github.com/stretchr/testify/require" - - awssdk "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/s3iface" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/reader" - "github.com/elastic/beats/v7/libbeat/reader/readfile" - "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" -) - -// MockS3Client struct is used for unit tests. -type MockS3Client struct { - s3iface.ClientAPI -} - -var ( - s3LogString1 = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png\n" - s3LogString1Trimmed = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png" - s3LogString2 = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png\n" - s3LogString2Trimmed = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png" - mockSvc = &MockS3Client{} - info = s3Info{ - name: "test-s3-ks", - key: "log2019-06-21-16-16-54", - region: "us-west-1", - } -) - -func (m *MockS3Client) GetObjectRequest(input *s3.GetObjectInput) s3.GetObjectRequest { - logBody := ioutil.NopCloser(bytes.NewReader([]byte(s3LogString1 + s3LogString2))) - httpReq, _ := http.NewRequest("", "", nil) - return s3.GetObjectRequest{ - Request: &awssdk.Request{ - Data: &s3.GetObjectOutput{ - Body: logBody, - }, - HTTPRequest: httpReq, - Retryer: awssdk.NoOpRetryer{}, - }, - } -} - -func TestGetRegionFromQueueURL(t *testing.T) { - casesPositive := []struct { - title string - queueURL string - endpoint string - expectedRegion string - }{ - { - "QueueURL using amazonaws.com domain with blank Endpoint", - "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", - "", - "us-east-1", - }, - { - "QueueURL using abc.xyz and domain with matching Endpoint", - "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", - "abc.xyz", - "us-east-1", - }, - } - - for _, c := range casesPositive { - t.Run(c.title, func(t *testing.T) { - regionName, err := getRegionFromQueueURL(c.queueURL, c.endpoint) - assert.NoError(t, err) - assert.Equal(t, c.expectedRegion, regionName) - }) - } - - casesNegative := []struct { - title string - queueURL string - endpoint string - expectedRegion string - }{ - { - "QueueURL using abc.xyz and domain with blank Endpoint", - "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", - "", - "", - }, - { - "QueueURL using abc.xyz and domain with different Endpoint", - "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", - "googlecloud.com", - "", - }, - { - "QueueURL is an invalid URL", - ":foo", - "", - "", - }, - } - - for _, c := range casesNegative { - t.Run(c.title, func(t *testing.T) { - regionName, err := getRegionFromQueueURL(c.queueURL, c.endpoint) - assert.Error(t, err) - assert.Empty(t, regionName) - }) - } - -} - -func TestHandleMessage(t *testing.T) { - casesPositive := []struct { - title string - message sqs.Message - expectedS3Infos []s3Info - }{ - { - "sqs message with event source aws:s3 and event name ObjectCreated:Put", - sqs.Message{ - Body: 
awssdk.String("{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\",\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\",\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA\"}}}]}"), - }, - []s3Info{ - { - name: "test-s3-ks-2", - key: "server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA", - }, - }, - }, - { - "sqs message with event source aws:s3 and event name ObjectCreated:CompleteMultipartUpload", - sqs.Message{ - Body: awssdk.String("{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\",\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:CompleteMultipartUpload\",\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\",\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA\"}}}]}"), - }, - []s3Info{ - { - name: "test-s3-ks-2", - key: "server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA", - }, - }, - }, - { - "sqs message with event source aws:s3, event name ObjectCreated:Put and encoded filename", - sqs.Message{ - Body: awssdk.String("{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\",\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\",\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"year%3D2020/month%3D05/test1.txt\"}}}]}"), - }, - []s3Info{ - { - name: "test-s3-ks-2", - key: "year=2020/month=05/test1.txt", - }, - }, - }, - { - "sqs message with event source aws:s3, event name ObjectCreated:Put and gzip filename", - sqs.Message{ - Body: awssdk.String("{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\",\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\",\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"428152502467_CloudTrail_us-east-2_20191219T1655Z_WXCas1PVnOaTpABD.json.gz\"}}}]}"), - }, - []s3Info{ - { - name: "test-s3-ks-2", - key: "428152502467_CloudTrail_us-east-2_20191219T1655Z_WXCas1PVnOaTpABD.json.gz", - }, - }, - }, - } - - p := &s3Collector{config: &config{}} - for _, c := range casesPositive { - t.Run(c.title, func(t *testing.T) { - s3Info, err := p.handleSQSMessage(c.message) - assert.NoError(t, err) - assert.Equal(t, len(c.expectedS3Infos), len(s3Info)) - if len(s3Info) > 0 { - assert.Equal(t, c.expectedS3Infos[0].key, s3Info[0].key) - assert.Equal(t, c.expectedS3Infos[0].name, s3Info[0].name) - } - }) - } - - casesNegative := []struct { - title string - message sqs.Message - expectedS3Infos []s3Info - }{ - { - "sqs message with event source aws:s3 and event name ObjectRemoved:Delete", - sqs.Message{ - Body: awssdk.String("{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\",\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectRemoved:Delete\",\"s3\":{\"configurationId\":\"object-removed-event\",\"bucket\":{\"name\":\"test-s3-ks-2\",\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA\"}}}]}"), - }, - []s3Info{}, - }, - { - "sqs message with event source aws:ec2 and event name ObjectCreated:Put", - sqs.Message{ - Body: 
awssdk.String("{\"Records\":[{\"eventSource\":\"aws:ec2\",\"awsRegion\":\"ap-southeast-1\",\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\",\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA\"}}}]}"), - }, - []s3Info{}, - }, - } - - for _, c := range casesNegative { - t.Run(c.title, func(t *testing.T) { - s3Info, err := p.handleSQSMessage(c.message) - assert.Error(t, err) - assert.Nil(t, s3Info) - }) - } - -} - -func TestNewS3BucketReader(t *testing.T) { - config := defaultConfig() - p := &s3Collector{cancellation: context.TODO(), config: &config} - s3GetObjectInput := &s3.GetObjectInput{ - Bucket: awssdk.String(info.name), - Key: awssdk.String(info.key), - } - req := mockSvc.GetObjectRequest(s3GetObjectInput) - - // The Context will interrupt the request if the timeout expires. - var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.cancellation, p.config.APITimeout) - defer cancelFn() - - resp, err := req.Send(ctx) - assert.NoError(t, err) - bodyReader := bufio.NewReader(resp.Body) - defer resp.Body.Close() - - encFactory, ok := encoding.FindEncoding("plain") - if !ok { - t.Fatalf("unable to find 'plain' encoding") - } - - enc, err := encFactory(bodyReader) - if err != nil { - t.Fatalf("failed to initialize encoding: %v", err) - } - - var r reader.Reader - r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{ - Codec: enc, - BufferSize: 4096, - Terminator: readfile.LineFeed, - }) - if err != nil { - t.Fatalf("Failed to initialize line reader: %v", err) - } - - r = readfile.NewStripNewline(r, readfile.LineFeed) - - for i := 0; i < 3; i++ { - msg, err := r.Next() - switch i { - case 0: - assert.NoError(t, err) - assert.Equal(t, s3LogString1Trimmed, string(msg.Content)) - case 1: - assert.NoError(t, err) - assert.Equal(t, s3LogString2Trimmed, string(msg.Content)) - case 2: - assert.Error(t, io.EOF, err) - assert.Equal(t, "", string(msg.Content)) - } - } -} - -func TestCreateEvent(t *testing.T) { - config := defaultConfig() - p := &s3Collector{cancellation: context.TODO(), config: &config} - errC := make(chan error) - s3Context := &s3Context{ - refs: 1, - errC: errC, - } - - mockSvc := &MockS3Client{} - s3Info := s3Info{ - name: "test-s3-ks", - key: "log2019-06-21-16-16-54", - region: "us-west-1", - arn: "arn:aws:s3:::test-s3-ks", - } - s3ObjectHash := s3ObjectHash(s3Info) - - s3GetObjectInput := &s3.GetObjectInput{ - Bucket: awssdk.String(info.name), - Key: awssdk.String(info.key), - } - req := mockSvc.GetObjectRequest(s3GetObjectInput) - - // The Context will interrupt the request if the timeout expires. 
- var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.cancellation, p.config.APITimeout) - defer cancelFn() - - resp, err := req.Send(ctx) - assert.NoError(t, err) - reader := bufio.NewReader(resp.Body) - defer resp.Body.Close() - - var events []beat.Event - for { - log, err := reader.ReadString('\n') - if log == "" { - break - } - if err == io.EOF { - event := createEvent(log, int64(len([]byte(log))), s3Info, s3ObjectHash, s3Context) - events = append(events, event) - break - } - - event := createEvent(log, int64(len([]byte(log))), s3Info, s3ObjectHash, s3Context) - events = append(events, event) - } - - assert.Equal(t, 2, len(events)) - - bucketName, err := events[0].Fields.GetValue("aws.s3.bucket.name") - assert.NoError(t, err) - assert.Equal(t, "test-s3-ks", bucketName.(string)) - - objectKey, err := events[0].Fields.GetValue("aws.s3.object.key") - assert.NoError(t, err) - assert.Equal(t, "log2019-06-21-16-16-54", objectKey.(string)) - - cloudProvider, err := events[0].Fields.GetValue("cloud.provider") - assert.NoError(t, err) - assert.Equal(t, "aws", cloudProvider) - - region, err := events[0].Fields.GetValue("cloud.region") - assert.NoError(t, err) - assert.Equal(t, "us-west-1", region) - - message1, err := events[0].Fields.GetValue("message") - assert.NoError(t, err) - assert.Equal(t, s3LogString1, message1.(string)) - - message2, err := events[1].Fields.GetValue("message") - assert.NoError(t, err) - assert.Equal(t, s3LogString2, message2.(string)) - - s3Context.done() -} - -func TestConstructObjectURL(t *testing.T) { - cases := []struct { - title string - s3Info s3Info - expectedObjectURL string - }{ - {"construct with object in s3", - s3Info{ - name: "test-1", - key: "log2019-06-21-16-16-54", - region: "us-west-1", - }, - "https://test-1.s3-us-west-1.amazonaws.com/log2019-06-21-16-16-54", - }, - {"construct with object in a folder of s3", - s3Info{ - name: "test-2", - key: "test-folder-1/test-log-1.txt", - region: "us-east-1", - }, - "https://test-2.s3-us-east-1.amazonaws.com/test-folder-1/test-log-1.txt", - }, - } - for _, c := range cases { - t.Run(c.title, func(t *testing.T) { - objectURL := constructObjectURL(c.s3Info) - assert.Equal(t, c.expectedObjectURL, objectURL) - }) - } -} - -func TestConvertOffsetToString(t *testing.T) { - cases := []struct { - offset int - expectedString string - }{ - { - 123, - "000000000123", - }, - { - 123456, - "000000123456", - }, - { - 123456789123, - "123456789123", - }, - } - for _, c := range cases { - output := fmt.Sprintf("%012d", c.offset) - assert.Equal(t, c.expectedString, output) - } - -} - -func TestIsStreamGzipped(t *testing.T) { - logBytes := []byte(`May 28 03:00:52 Shaunaks-MacBook-Pro-Work syslogd[119]: ASL Sender Statistics -May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED USE in libdispatch client: Changing the target of a source after it has been activated; set a breakpoint on _dispatch_bug_deprecated to debug -May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED USE in libdispatch client: Changing target queue hierarchy after xpc connection was activated; set a breakpoint on _dispatch_bug_deprecated to debug -`) - - var b bytes.Buffer - gz := gzip.NewWriter(&b) - _, err := gz.Write(logBytes) - require.NoError(t, err) - - err = gz.Close() - require.NoError(t, err) - - tests := map[string]struct { - contents []byte - expected bool - }{ - "not_gzipped": { - logBytes, - false, - }, - "gzipped": { - b.Bytes(), - true, - }, - "empty": { - []byte{}, - false, - }, - } 
- - for name, test := range tests { - t.Run(name, func(t *testing.T) { - r := bufio.NewReader(bytes.NewReader(test.contents)) - actual, err := isStreamGzipped(r) - - require.NoError(t, err) - require.Equal(t, test.expected, actual) - }) - } -} - -func TestTrimLogDelimiter(t *testing.T) { - cases := []struct { - title string - logOriginal string - expectedLog string - }{ - {"string with delimiter", - `test -`, - "test", - }, - {"string without delimiter", - "test", - "test", - }, - {"string just with delimiter", - ` -`, - "", - }, - } - for _, c := range cases { - t.Run(c.title, func(t *testing.T) { - log := trimLogDelimiter(c.logOriginal) - assert.Equal(t, c.expectedLog, log) - }) - } -} - -func TestS3Metadata(t *testing.T) { - resp := &s3.GetObjectResponse{ - GetObjectOutput: &s3.GetObjectOutput{ - ContentEncoding: awssdk.String("gzip"), - Metadata: map[string]string{ - "Owner": "foo", - }, - LastModified: awssdk.Time(time.Now()), - }, - } - - meta := s3Metadata(resp, "Content-Encoding", "x-amz-meta-owner", "last-modified") - assert.Len(t, meta, 3) -} diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index cc850ef2aab..d780f8eec89 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -14,12 +14,15 @@ import ( "github.com/elastic/beats/v7/libbeat/common/match" "github.com/elastic/beats/v7/libbeat/reader/parser" "github.com/elastic/beats/v7/libbeat/reader/readfile" + "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) type config struct { APITimeout time.Duration `config:"api_timeout"` VisibilityTimeout time.Duration `config:"visibility_timeout"` + SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning. + SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it. 
FIPSEnabled bool `config:"fips_enabled"` MaxNumberOfMessages int `config:"max_number_of_messages"` QueueURL string `config:"queue_url" validate:"required"` @@ -32,6 +35,8 @@ func defaultConfig() config { c := config{ APITimeout: 120 * time.Second, VisibilityTimeout: 300 * time.Second, + SQSWaitTime: 20 * time.Second, + SQSMaxReceiveCount: 5, FIPSEnabled: false, MaxNumberOfMessages: 5, } @@ -45,15 +50,21 @@ func (c *config) Validate() error { "less than or equal to 12h", c.VisibilityTimeout) } - if c.APITimeout <= 0 || c.APITimeout > c.VisibilityTimeout/2 { - return fmt.Errorf("api_timeout <%v> must be greater than 0 and less "+ - "than 1/2 of the visibility_timeout (%v)", c.APITimeout, c.VisibilityTimeout/2) + if c.SQSWaitTime <= 0 || c.SQSWaitTime.Seconds() > 20 { + return fmt.Errorf("wait_time <%v> must be greater than 0 and "+ + "less than or equal to 20s", c.SQSWaitTime) } - if c.MaxNumberOfMessages <= 0 || c.MaxNumberOfMessages > 10 { - return fmt.Errorf("max_number_of_messages <%v> must be greater than "+ - "0 and less than or equal to 10", c.MaxNumberOfMessages) + if c.MaxNumberOfMessages <= 0 { + return fmt.Errorf("max_number_of_messages <%v> must be greater than 0", + c.MaxNumberOfMessages) } + + if c.APITimeout < c.SQSWaitTime { + return fmt.Errorf("api_timeout <%v> must be greater than the sqs.wait_time <%v>", + c.APITimeout, c.SQSWaitTime) + } + return nil } @@ -76,23 +87,29 @@ type readerConfig struct { Parsers parser.Config `config:",inline"` } -func (f *readerConfig) Validate() error { - if f.BufferSize <= 0 { - return fmt.Errorf("buffer_size <%v> must be greater than 0", f.BufferSize) +func (rc *readerConfig) Validate() error { + if rc.BufferSize <= 0 { + return fmt.Errorf("buffer_size <%v> must be greater than 0", rc.BufferSize) } - if f.MaxBytes <= 0 { - return fmt.Errorf("max_bytes <%v> must be greater than 0", f.MaxBytes) + if rc.MaxBytes <= 0 { + return fmt.Errorf("max_bytes <%v> must be greater than 0", rc.MaxBytes) } - if f.ExpandEventListFromField != "" && f.ContentType != "" && f.ContentType != "application/json" { + + if rc.ExpandEventListFromField != "" && rc.ContentType != "" && rc.ContentType != "application/json" { return fmt.Errorf("content_type must be `application/json` when expand_event_list_from_field is used") } + _, found := encoding.FindEncoding(rc.Encoding) + if !found { + return fmt.Errorf("encoding type <%v> not found", rc.Encoding) + } + return nil } -func (f *readerConfig) InitDefaults() { - f.BufferSize = 16 * humanize.KiByte - f.MaxBytes = 10 * humanize.MiByte - f.LineTerminator = readfile.AutoLineTerminator +func (rc *readerConfig) InitDefaults() { + rc.BufferSize = 16 * humanize.KiByte + rc.MaxBytes = 10 * humanize.MiByte + rc.LineTerminator = readfile.AutoLineTerminator } diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index 7328467fc14..77e35bcb0f3 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -10,6 +10,7 @@ import ( "github.com/dustin/go-humanize" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/match" @@ -22,21 +23,21 @@ func TestConfig(t *testing.T) { makeConfig := func() config { // Have a separate copy of defaults in the test to make it clear when // anyone changes the defaults.
- cfg := common.MustNewConfigFrom("") - c := parser.Config{} - err := c.Unpack(cfg) - assert.Nil(t, err) + parserConf := parser.Config{} + require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom(""))) return config{ QueueURL: queueURL, APITimeout: 120 * time.Second, VisibilityTimeout: 300 * time.Second, + SQSMaxReceiveCount: 5, + SQSWaitTime: 20 * time.Second, FIPSEnabled: false, MaxNumberOfMessages: 5, ReaderConfig: readerConfig{ BufferSize: 16 * humanize.KiByte, MaxBytes: 10 * humanize.MiByte, LineTerminator: readfile.AutoLineTerminator, - Parsers: c, + Parsers: parserConf, }, } } @@ -84,16 +85,7 @@ func TestConfig(t *testing.T) { "queue_url": queueURL, "api_timeout": "0", }, - "api_timeout <0s> must be greater than 0 and less than 1/2 of the visibility_timeout (2m30s)", - nil, - }, - { - "error on api_timeout less than visibility_timeout/2", - common.MapStr{ - "queue_url": queueURL, - "api_timeout": "3m", - }, - "api_timeout <3m0s> must be greater than 0 and less than 1/2 of the visibility_timeout (2m30s)", + "api_timeout <0s> must be greater than the sqs.wait_time", nil, }, { @@ -120,16 +112,7 @@ func TestConfig(t *testing.T) { "queue_url": queueURL, "max_number_of_messages": "0", }, - "max_number_of_messages <0> must be greater than 0 and less than or equal to 10", - nil, - }, - { - "error on max_number_of_messages > 10", - common.MapStr{ - "queue_url": queueURL, - "max_number_of_messages": "11", - }, - "max_number_of_messages <11> must be greater than 0 and less than or equal to 10", + "max_number_of_messages <0> must be greater than 0", nil, }, { @@ -141,15 +124,6 @@ func TestConfig(t *testing.T) { "buffer_size <0> must be greater than 0", nil, }, - { - "error on max_bytes == 0 ", - common.MapStr{ - "queue_url": queueURL, - "max_bytes": "0", - }, - "max_bytes <0> must be greater than 0", - nil, - }, { "error on expand_event_list_from_field and content_type != application/json ", common.MapStr{ diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 3d09a1527cc..82c01778c13 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -7,18 +7,19 @@ package awss3 import ( "context" "fmt" + "net/url" + "strings" + awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sqs" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/monitoring" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" - "github.com/elastic/go-concert/ctxtool" ) const inputName = "aws-s3" @@ -42,101 +43,124 @@ func configure(cfg *common.Config) (v2.Input, error) { return newInput(config) } -// s3Input is a input for s3 +// s3Input is an input for reading logs from S3 when triggered by an SQS message.
type s3Input struct { - config config + config config + awsConfig awssdk.Config } func newInput(config config) (*s3Input, error) { - return &s3Input{config: config}, nil -} - -func (in *s3Input) Name() string { return inputName } - -func (in *s3Input) Test(ctx v2.TestContext) error { - _, err := awscommon.InitializeAWSConfig(in.config.AWSConfig) + awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) if err != nil { - return fmt.Errorf("InitializeAWSConfig failed: %w", err) + return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) } - return nil -} -func (in *s3Input) Run(ctx v2.Context, pipeline beat.Pipeline) error { - collector, err := in.createCollector(ctx, pipeline) + regionName, err := getRegionFromQueueURL(config.QueueURL, config.AWSConfig.Endpoint) if err != nil { - return err + return nil, fmt.Errorf("failed to get AWS region from queue_url: %w", err) } + awsConfig.Region = regionName - defer collector.metrics.Close() - defer collector.publisher.Close() - collector.run() - - if ctx.Cancelation.Err() == context.Canceled { - return nil - } else { - return ctx.Cancelation.Err() - } + return &s3Input{ + config: config, + awsConfig: awsConfig, + }, nil } -func (in *s3Input) createCollector(ctx v2.Context, pipeline beat.Pipeline) (*s3Collector, error) { - log := ctx.Logger.With("queue_url", in.config.QueueURL) +func (in *s3Input) Name() string { return inputName } +func (in *s3Input) Test(ctx v2.TestContext) error { + return nil +} + +func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { + // Wrap the input Context's cancellation Done channel in a context.Context. This + // goroutine stops when the parent closes the Done channel. + ctx, cancelInputCtx := context.WithCancel(context.Background()) + go func() { + defer cancelInputCtx() + select { + case <-inputContext.Cancelation.Done(): + case <-ctx.Done(): + } + }() + defer cancelInputCtx() + + // Create a client for publishing events and receiving notifications of their ACKs. client, err := pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: ctx.Cancelation, - ACKHandler: newACKHandler(), + CloseRef: inputContext.Cancelation, + ACKHandler: newEventACKHandler(), }) if err != nil { - return nil, err + return fmt.Errorf("failed to create pipeline client: %w", err) } + defer client.Close() - regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint) + // Create SQS receiver and S3 notification processor.
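+ // The receiver chains together the components introduced by this change: an sqsReader that long-polls the queue, an sqsS3EventProcessor that decodes each S3 event notification and controls the message's visibility timeout and eventual deletion, and an s3ObjectProcessorFactory that downloads each referenced object and publishes its contents as events.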
+ receiver, err := in.createSQSReceiver(inputContext, client) if err != nil { - err := fmt.Errorf("getRegionFromQueueURL failed: %w", err) - log.Error(err) - return nil, err - } else { - log = log.With("region", regionName) + return fmt.Errorf("failed to initialize sqs receiver: %w", err) } + defer receiver.metrics.Close() - awsConfig, err := awscommon.InitializeAWSConfig(in.config.AWSConfig) - if err != nil { - return nil, fmt.Errorf("InitializeAWSConfig failed: %w", err) + if err := receiver.Receive(ctx); err != nil { + return err } - awsConfig.Region = regionName - visibilityTimeout := int64(in.config.VisibilityTimeout.Seconds()) - log.Infof("visibility timeout is set to %v seconds", visibilityTimeout) - log.Infof("aws api timeout is set to %v", in.config.APITimeout) + return nil +} - s3Servicename := "s3" +func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsReader, error) { + s3ServiceName := "s3" if in.config.FIPSEnabled { - s3Servicename = "s3-fips" + s3ServiceName = "s3-fips" + } + + sqsAPI := &awsSQSAPI{ + client: sqs.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, "sqs", in.awsConfig.Region, in.awsConfig)), + queueURL: in.config.QueueURL, + apiTimeout: in.config.APITimeout, + visibilityTimeout: in.config.VisibilityTimeout, + longPollWaitTime: in.config.SQSWaitTime, + } + + s3API := &awsS3API{ + client: s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)), } - log.Debug("s3 service name = ", s3Servicename) - log.Debug("s3 input config max_number_of_messages = ", in.config.MaxNumberOfMessages) - log.Debug("s3 input config endpoint = ", in.config.AWSConfig.Endpoint) + log := ctx.Logger.With("queue_url", in.config.QueueURL) + log.Infof("AWS api_timeout is set to %v.", in.config.APITimeout) + log.Infof("AWS region is set to %v.", in.awsConfig.Region) + log.Infof("AWS SQS visibility_timeout is set to %v.", in.config.VisibilityTimeout) + log.Infof("AWS SQS max_number_of_messages is set to %v.", in.config.MaxNumberOfMessages) + log.Debugf("AWS S3 service name is %v.", s3ServiceName) + metricRegistry := monitoring.GetNamespace("dataset").GetRegistry() - return &s3Collector{ - cancellation: ctxtool.FromCanceller(ctx.Cancelation), - logger: log, - config: &in.config, - publisher: client, - visibilityTimeout: visibilityTimeout, - sqs: sqs.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, "sqs", regionName, awsConfig)), - s3: s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3Servicename, regionName, awsConfig)), - metrics: newInputMetrics(metricRegistry, ctx.ID), - }, nil + metrics := newInputMetrics(metricRegistry, ctx.ID) + + fileSelectors := in.config.FileSelectors + if len(in.config.FileSelectors) == 0 { + fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}} + } + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors) + sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory) + sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler) + + return sqsReader, nil } -func newACKHandler() beat.ACKer { - return acker.ConnectionOnly( - acker.EventPrivateReporter(func(_ int, privates []interface{}) { - for _, private := range privates { - if s3Context, ok := private.(*s3Context); ok { - 
s3Context.done() - } - } - }), - ) +func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) { + // Get the region from the queueURL. + // Example: https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs + url, err := url.Parse(queueURL) + if err != nil { + return "", fmt.Errorf("%s is not a valid URL", queueURL) + } + if url.Scheme == "https" && url.Host != "" { + queueHostSplit := strings.Split(url.Host, ".") + if len(queueHostSplit) > 2 && (strings.Join(queueHostSplit[2:], ".") == endpoint || (endpoint == "" && queueHostSplit[2] == "amazonaws")) { + return queueHostSplit[1], nil + } + } + return "", fmt.Errorf("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}") } diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go new file mode 100644 index 00000000000..50223b8fc75 --- /dev/null +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -0,0 +1,193 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/dustin/go-humanize" + "github.com/olekukonko/tablewriter" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" +) + +const cloudtrailTestFile = "testdata/aws-cloudtrail.json.gz" + +type constantSQS struct { + msgs []sqs.Message +} + +var _ sqsAPI = (*constantSQS)(nil) + +func newConstantSQS() *constantSQS { + return &constantSQS{ + msgs: []sqs.Message{ + newSQSMessage(newS3Event(filepath.Base(cloudtrailTestFile))), + }, + } +} + +func (c *constantSQS) ReceiveMessage(ctx context.Context, maxMessages int) ([]sqs.Message, error) { + return c.msgs, nil +} + +func (_ *constantSQS) DeleteMessage(ctx context.Context, msg *sqs.Message) error { + return nil +} + +func (_ *constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqs.Message, timeout time.Duration) error { + return nil +} + +type constantS3 struct { + filename string + data []byte + contentType string +} + +var _ s3Getter = (*constantS3)(nil) + +func newConstantS3(t testing.TB) *constantS3 { + data, err := ioutil.ReadFile(cloudtrailTestFile) + if err != nil { + t.Fatal(err) + } + + return &constantS3{ + filename: filepath.Base(cloudtrailTestFile), + data: data, + contentType: contentTypeJSON, + } +} + +func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectResponse, error) { + return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil +} + +func makeBenchmarkConfig(t testing.TB) config { + cfg := common.MustNewConfigFrom(`--- +queue_url: foo +file_selectors: +- + regex: '.json.gz$' + expand_event_list_from_field: Records +`) + + inputConfig := defaultConfig() + if err := cfg.Unpack(&inputConfig); err != nil { + t.Fatal(err) + } + return inputConfig +} + +func benchmarkInput(t *testing.T, maxMessagesInflight int) testing.BenchmarkResult { + return testing.Benchmark(func(b *testing.B) { + log := logp.NewLogger(inputName) + metricRegistry := monitoring.NewRegistry() + 
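+ // The constantSQS and constantS3 stubs above stand in for AWS: every receive returns the same message and every get returns the same CloudTrail object, so the benchmark measures only the input's internal processing pipeline rather than network round trips.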
metrics := newInputMetrics(metricRegistry, "test_id") + sqsAPI := newConstantSQS() + s3API := newConstantS3(t) + client := pubtest.NewChanClient(100) + defer close(client.Channel) + conf := makeBenchmarkConfig(t) + + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors) + sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, time.Minute, 5, s3EventHandlerFactory) + sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler) + + go func() { + for event := range client.Channel { + // Fake the ACK handling that's not implemented in pubtest. + event.Private.(*eventACKTracker).ACK() + } + }() + + ctx, cancel := context.WithCancel(context.Background()) + b.Cleanup(cancel) + + go func() { + for metrics.sqsMessagesReceivedTotal.Get() < uint64(b.N) { + time.Sleep(5 * time.Millisecond) + } + cancel() + }() + + b.ResetTimer() + start := time.Now() + if err := sqsReader.Receive(ctx); err != nil { + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatal(err) + } + } + b.StopTimer() + elapsed := time.Since(start) + + b.ReportMetric(float64(maxMessagesInflight), "max_messages_inflight") + b.ReportMetric(elapsed.Seconds(), "sec") + + b.ReportMetric(float64(metrics.s3EventsCreatedTotal.Get()), "events") + b.ReportMetric(float64(metrics.s3EventsCreatedTotal.Get())/elapsed.Seconds(), "events_per_sec") + + b.ReportMetric(float64(metrics.s3BytesProcessedTotal.Get()), "s3_bytes") + b.ReportMetric(float64(metrics.s3BytesProcessedTotal.Get())/elapsed.Seconds(), "s3_bytes_per_sec") + + b.ReportMetric(float64(metrics.sqsMessagesDeletedTotal.Get()), "sqs_messages") + b.ReportMetric(float64(metrics.sqsMessagesDeletedTotal.Get())/elapsed.Seconds(), "sqs_messages_per_sec") + }) +} + +func TestBenchmarkInput(t *testing.T) { + logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) + + results := []testing.BenchmarkResult{ + benchmarkInput(t, 1), + benchmarkInput(t, 2), + benchmarkInput(t, 4), + benchmarkInput(t, 8), + benchmarkInput(t, 16), + benchmarkInput(t, 32), + benchmarkInput(t, 64), + benchmarkInput(t, 128), + benchmarkInput(t, 256), + benchmarkInput(t, 512), + benchmarkInput(t, 1024), + } + + headers := []string{ + "Max Msgs Inflight", + "Events per sec", + "S3 Bytes per sec", + "Time (sec)", + "CPUs", + } + var data [][]string + for _, r := range results { + data = append(data, []string{ + fmt.Sprintf("%v", r.Extra["max_messages_inflight"]), + fmt.Sprintf("%v", r.Extra["events_per_sec"]), + fmt.Sprintf("%v", humanize.Bytes(uint64(r.Extra["s3_bytes_per_sec"]))), + fmt.Sprintf("%v", r.Extra["sec"]), + fmt.Sprintf("%v", runtime.GOMAXPROCS(0)), + }) + } + + table := tablewriter.NewWriter(os.Stdout) + table.SetHeader(headers) + table.AppendBulk(data) + table.Render() +} diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go new file mode 100644 index 00000000000..0580b6f067b --- /dev/null +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -0,0 +1,251 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// See _meta/terraform/README.md for integration test usage instructions. 
+ +// +build integration +// +build aws + +package awss3 + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/external" + "github.com/aws/aws-sdk-go-v2/service/s3/s3manager" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" + "gopkg.in/yaml.v2" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" +) + +const ( + inputID = "test_id" +) + +const ( + terraformOutputYML = "_meta/terraform/outputs.yml" +) + +type terraformOutputData struct { + AWSRegion string `yaml:"aws_region"` + BucketName string `yaml:"bucket_name"` + QueueURL string `yaml:"queue_url"` +} + +func getTerraformOutputs(t *testing.T) terraformOutputData { + t.Helper() + + ymlData, err := ioutil.ReadFile(terraformOutputYML) + if os.IsNotExist(err) { + t.Skipf("Run 'terraform apply' in %v to setup S3 and SQS for the test.", filepath.Dir(terraformOutputYML)) + } + if err != nil { + t.Fatalf("failed reading terraform output data: %v", err) + } + + var rtn terraformOutputData + dec := yaml.NewDecoder(bytes.NewReader(ymlData)) + dec.SetStrict(true) + if err = dec.Decode(&rtn); err != nil { + t.Fatal(err) + } + + return rtn +} + +func makeTestConfig(queueURL string) *common.Config { + return common.MustNewConfigFrom(fmt.Sprintf(`--- +queue_url: %s +max_number_of_messages: 1 +visibility_timeout: 30s +file_selectors: +- + regex: 'events-array.json$' + expand_event_list_from_field: Events + content_type: application/json + include_s3_metadata: + - last-modified + - x-amz-version-id + - x-amz-storage-class + - Content-Length + - Content-Type +- + regex: '\.(?:nd)?json(\.gz)?$' + content_type: application/json +- + regex: 'multiline.txt$' + parsers: + - multiline: + pattern: "^ is not in event", key) + } + + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.UseNumber() + + tok, err := dec.Token() + if err != nil { + return err + } + delim, ok := tok.(json.Delim) + if !ok || delim != '[' { + return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key) + } + + for dec.More() { + arrayOffset := dec.InputOffset() + + var item json.RawMessage + if err := dec.Decode(&item); err != nil { + return fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err) + } + + data, _ := item.MarshalJSON() + evt := createEvent(string(data), offset+arrayOffset, p.s3Obj, objHash, p.s3Metadata) + p.publish(p.acker, &evt) + } + + return nil +} + +func (p *s3ObjectProcessor) readFile(r io.Reader) error { + encodingFactory, ok := encoding.FindEncoding(p.readerConfig.Encoding) + if !ok || encodingFactory == nil { + return fmt.Errorf("failed to find '%v' encoding", p.readerConfig.Encoding) + } + + enc, err := encodingFactory(r) + if err != nil { + return fmt.Errorf("failed to initialize encoding: %w", err) + } + + var reader reader.Reader + reader, err = readfile.NewEncodeReader(ioutil.NopCloser(r), readfile.Config{ + Codec: enc, + BufferSize: int(p.readerConfig.BufferSize), + Terminator: p.readerConfig.LineTerminator, + MaxBytes: int(p.readerConfig.MaxBytes) * 4, + }) + if err != nil { + return fmt.Errorf("failed to create encode reader: %w", err) + } + + reader = readfile.NewStripNewline(reader, p.readerConfig.LineTerminator) + reader 
= p.readerConfig.Parsers.Create(reader) + reader = readfile.NewLimitReader(reader, int(p.readerConfig.MaxBytes)) + + var offset int64 + for { + message, err := reader.Next() + if err == io.EOF { + // No more lines + break + } + if err != nil { + return fmt.Errorf("error reading message: %w", err) + } + + event := createEvent(string(message.Content), offset, p.s3Obj, p.s3ObjHash, p.s3Metadata) + event.Fields.DeepUpdate(message.Fields) + offset += int64(message.Bytes) + p.publish(p.acker, &event) + } + + return nil +} + +func (p *s3ObjectProcessor) publish(ack *eventACKTracker, event *beat.Event) { + ack.Add(1) + event.Private = ack + p.metrics.s3EventsCreatedTotal.Inc() + p.publisher.Publish(*event) +} + +func createEvent(message string, offset int64, obj s3EventV2, objectHash string, meta map[string]interface{}) beat.Event { + event := beat.Event{ + Timestamp: time.Now().UTC(), + Fields: common.MapStr{ + "message": message, + "log": common.MapStr{ + "offset": offset, + "file": common.MapStr{ + "path": constructObjectURL(obj), + }, + }, + "aws": common.MapStr{ + "s3": common.MapStr{ + "bucket": common.MapStr{ + "name": obj.S3.Bucket.Name, + "arn": obj.S3.Bucket.ARN}, + "object": common.MapStr{ + "key": obj.S3.Object.Key, + }, + }, + }, + "cloud": common.MapStr{ + "provider": "aws", + "region": obj.AWSRegion, + }, + }, + } + event.SetID(objectID(objectHash, offset)) + + if len(meta) > 0 { + event.Fields.Put("aws.s3.metadata", meta) + } + + return event +} + +func objectID(objectHash string, offset int64) string { + return fmt.Sprintf("%s-%012d", objectHash, offset) +} + +func constructObjectURL(obj s3EventV2) string { + return "https://" + obj.S3.Bucket.Name + ".s3." + obj.AWSRegion + ".amazonaws.com/" + obj.S3.Object.Key +} + +// s3ObjectHash returns a short sha256 hash of the bucket arn + object key name. +func s3ObjectHash(obj s3EventV2) string { + h := sha256.New() + h.Write([]byte(obj.S3.Bucket.ARN)) + h.Write([]byte(obj.S3.Object.Key)) + prefix := hex.EncodeToString(h.Sum(nil)) + return prefix[:10] +} + +// isStreamGzipped determines whether the given stream of bytes (encapsulated in a buffered reader) +// represents gzipped content or not. A buffered reader is used so the function can peek into the byte +// stream without consuming it. This makes it convenient for code executed after this function call +// to consume the stream if it wants. +func isStreamGzipped(r *bufio.Reader) (bool, error) { + // Why 512? See https://godoc.org/net/http#DetectContentType + buf, err := r.Peek(512) + if err != nil && err != io.EOF { + return false, err + } + + switch http.DetectContentType(buf) { + case "application/x-gzip", "application/zip": + return true, nil + default: + return false, nil + } +} + +// s3Metadata returns a map containing the selected S3 object metadata keys. +func s3Metadata(resp *s3.GetObjectResponse, keys ...string) common.MapStr { + if len(keys) == 0 { + return nil + } + + // When you upload objects using the REST API, the optional user-defined + // metadata names must begin with "x-amz-meta-" to distinguish them from + // other HTTP headers. + const userMetaPrefix = "x-amz-meta-" + + allMeta := map[string]interface{}{} + + // Get headers using AWS SDK struct tags. 
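+ // Each exported field of s3.GetObjectOutput that carries a location:"header" struct tag corresponds to an S3 response header, so reflecting over the struct tags collects all headers generically and lets any of them be selected by name below.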
+ fields := reflect.TypeOf(resp.GetObjectOutput).Elem() + values := reflect.ValueOf(resp.GetObjectOutput).Elem() + for i := 0; i < fields.NumField(); i++ { + f := fields.Field(i) + + if loc, _ := f.Tag.Lookup("location"); loc != "header" { + continue + } + + name, found := f.Tag.Lookup("locationName") + if !found { + continue + } + name = strings.ToLower(name) + + if name == userMetaPrefix { + continue + } + + v := values.Field(i) + switch v.Kind() { + case reflect.Ptr: + if v.IsNil() { + continue + } + v = v.Elem() + default: + if v.IsZero() { + continue + } + } + + allMeta[name] = v.Interface() + } + + // Add in the user defined headers. + for k, v := range resp.Metadata { + k = strings.ToLower(k) + allMeta[userMetaPrefix+k] = v + } + + // Select the matching headers from the config. + metadata := common.MapStr{} + for _, key := range keys { + key = strings.ToLower(key) + + v, found := allMeta[key] + if !found { + continue + } + + metadata[key] = v + } + + return metadata +} diff --git a/x-pack/filebeat/input/awss3/s3_integration_test.go b/x-pack/filebeat/input/awss3/s3_integration_test.go deleted file mode 100644 index 59c3f1e1948..00000000000 --- a/x-pack/filebeat/input/awss3/s3_integration_test.go +++ /dev/null @@ -1,275 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -// +build integration -// +build aws - -package awss3 - -import ( - "context" - "net/http" - "os" - "strings" - "sync" - "testing" - "time" - - awssdk "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/sqsiface" - "github.com/stretchr/testify/assert" - - v2 "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" - pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" - "github.com/elastic/beats/v7/libbeat/tests/resources" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" - "github.com/elastic/go-concert/unison" -) - -const ( - fileName1 = "sample1.txt" - fileName2 = "sample2.txt" - visibilityTimeout = 300 * time.Second -) - -// GetConfigForTest function gets aws credentials for integration tests. 
-func getConfigForTest(t *testing.T) config { - t.Helper() - - awsConfig := awscommon.ConfigAWS{} - queueURL := os.Getenv("QUEUE_URL") - profileName := os.Getenv("AWS_PROFILE_NAME") - accessKeyID := os.Getenv("AWS_ACCESS_KEY_ID") - secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY") - sessionToken := os.Getenv("AWS_SESSION_TOKEN") - - config := config{ - VisibilityTimeout: visibilityTimeout, - } - switch { - case queueURL == "": - t.Fatal("$QUEUE_URL is not set in environment") - case profileName == "" && accessKeyID == "": - t.Fatal("$AWS_ACCESS_KEY_ID or $AWS_PROFILE_NAME not set or set to empty") - case profileName != "": - awsConfig.ProfileName = profileName - config.QueueURL = queueURL - config.AWSConfig = awsConfig - return config - case secretAccessKey == "": - t.Fatal("$AWS_SECRET_ACCESS_KEY not set or set to empty") - } - - awsConfig.AccessKeyID = accessKeyID - awsConfig.SecretAccessKey = secretAccessKey - if sessionToken != "" { - awsConfig.SessionToken = sessionToken - } - config.AWSConfig = awsConfig - return config -} - -func defaultTestConfig() *common.Config { - return common.MustNewConfigFrom(common.MapStr{ - "queue_url": os.Getenv("QUEUE_URL"), - "file_selectors": []common.MapStr{ - { - "regex": strings.Replace(fileName1, ".", "\\.", -1), - "max_bytes": 4096, - }, - { - "regex": strings.Replace(fileName2, ".", "\\.", -1), - "max_bytes": 4096, - "parsers": []common.MapStr{ - { - "multiline": common.MapStr{ - "pattern": "^") - assert.Contains(t, message, "") - default: - t.Fatalf("object key %s is unknown", objectKey) - } - } - }) -} - -// MockSQSClient struct is used for unit tests. -type MockSQSClient struct { - sqsiface.ClientAPI -} - -var ( - sqsMessageTest = "{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\"," + - "\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:Put\"," + - "\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\"," + - "\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54\"}}}]}" -) - -func (m *MockSQSClient) ReceiveMessageRequest(input *sqs.ReceiveMessageInput) sqs.ReceiveMessageRequest { - httpReq, _ := http.NewRequest("", "", nil) - return sqs.ReceiveMessageRequest{ - Request: &awssdk.Request{ - Data: &sqs.ReceiveMessageOutput{ - Messages: []sqs.Message{ - {Body: awssdk.String(sqsMessageTest)}, - }, - }, - HTTPRequest: httpReq, - }, - } -} - -func (m *MockSQSClient) DeleteMessageRequest(input *sqs.DeleteMessageInput) sqs.DeleteMessageRequest { - httpReq, _ := http.NewRequest("", "", nil) - return sqs.DeleteMessageRequest{ - Request: &awssdk.Request{ - Data: &sqs.DeleteMessageOutput{}, - HTTPRequest: httpReq, - }, - } -} - -func (m *MockSQSClient) ChangeMessageVisibilityRequest(input *sqs.ChangeMessageVisibilityInput) sqs.ChangeMessageVisibilityRequest { - httpReq, _ := http.NewRequest("", "", nil) - return sqs.ChangeMessageVisibilityRequest{ - Request: &awssdk.Request{ - Data: &sqs.ChangeMessageVisibilityOutput{}, - HTTPRequest: httpReq, - }, - } -} - -func TestMockS3Input(t *testing.T) { - defer resources.NewGoroutinesChecker().Check(t) - cfg := common.MustNewConfigFrom(map[string]interface{}{ - "queue_url": "https://sqs.ap-southeast-1.amazonaws.com/123456/test", - }) - - runTest(t, cfg, true, func(t *testing.T, collector *s3Collector, receiver chan beat.Event) { - defer collector.cancellation.Done() - defer collector.publisher.Close() - - output, err := collector.receiveMessage(collector.sqs, 
collector.visibilityTimeout) - assert.NoError(t, err) - - var grp unison.MultiErrGroup - errC := make(chan error) - defer close(errC) - grp.Go(func() (err error) { - return collector.processMessage(collector.s3, output.Messages[0], errC) - }) - - event := <-receiver - bucketName, err := event.GetValue("aws.s3.bucket.name") - assert.NoError(t, err) - assert.Equal(t, "test-s3-ks-2", bucketName) - }) -} diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go new file mode 100644 index 00000000000..6cf1ea1fa5a --- /dev/null +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -0,0 +1,261 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "bytes" + "context" + "errors" + "io/ioutil" + "path/filepath" + "strings" + "testing" + + awssdk "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func newS3Object(t testing.TB, filename, contentType string) (s3EventV2, *s3.GetObjectResponse) { + data, err := ioutil.ReadFile(filename) + if err != nil { + t.Fatal(err) + } + + return newS3Event(filename), newS3GetObjectResponse(filename, data, contentType) +} + +func newS3GetObjectResponse(filename string, data []byte, contentType string) *s3.GetObjectResponse { + r := bytes.NewReader(data) + contentLen := int64(r.Len()) + resp := &s3.GetObjectResponse{ + GetObjectOutput: &s3.GetObjectOutput{ + Body: ioutil.NopCloser(r), + ContentLength: &contentLen, + ContentType: &contentType, + }, + } + switch strings.ToLower(filepath.Ext(filename)) { + case ".gz": + gzipEncoding := "gzip" + resp.ContentEncoding = &gzipEncoding + } + return resp +} + +func TestS3ObjectProcessor(t *testing.T) { + logp.TestingSetup() + + t.Run("download text/plain file", func(t *testing.T) { + testProcessS3Object(t, "testdata/log.txt", "text/plain", 2) + }) + + t.Run("multiline content", func(t *testing.T) { + sel := fileSelectorConfig{ReaderConfig: readerConfig{}} + sel.ReaderConfig.InitDefaults() + + // Unfortunately the config structs for the parser package are not + // exported to use config parsing. + cfg := common.MustNewConfigFrom(map[string]interface{}{ + "parsers": []map[string]interface{}{ + { + "multiline": map[string]interface{}{ + "pattern": "^ len(s3Objects) { + endIdx = len(s3Objects) + } + return &s3.ListObjectsOutput{ + Contents: s3Objects[startIdx:endIdx], + } + }) + mockS3Pager.EXPECT().Err().Return(nil) + + return mockS3Pager +} diff --git a/x-pack/filebeat/input/awss3/semaphore.go b/x-pack/filebeat/input/awss3/semaphore.go new file mode 100644 index 00000000000..2a695f4c621 --- /dev/null +++ b/x-pack/filebeat/input/awss3/semaphore.go @@ -0,0 +1,83 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+
+package awss3
+
+import (
+	"context"
+	"sync"
+)
+
+type sem struct {
+	mutex     *sync.Mutex
+	cond      sync.Cond
+	available int
+}
+
+func newSem(n int) *sem {
+	var m sync.Mutex
+	return &sem{
+		available: n,
+		mutex:     &m,
+		cond: sync.Cond{
+			L: &m,
+		},
+	}
+}
+
+func (s *sem) AcquireContext(n int, ctx context.Context) (int, error) {
+	acquireC := make(chan int, 1)
+	go func() {
+		defer close(acquireC)
+		acquireC <- s.Acquire(n)
+	}()
+
+	select {
+	case <-ctx.Done():
+		return 0, ctx.Err()
+	case n := <-acquireC:
+		return n, nil
+	}
+}
+
+func (s *sem) Acquire(n int) int {
+	if n <= 0 {
+		return 0
+	}
+
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	for s.available == 0 {
+		s.cond.Wait()
+	}
+
+	if n >= s.available {
+		rtn := s.available
+		s.available = 0
+		return rtn
+	}
+
+	s.available -= n
+	return n
+}
+
+func (s *sem) Release(n int) {
+	if n <= 0 {
+		return
+	}
+
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	s.available += n
+	s.cond.Signal()
+}
+
+func (s *sem) Available() int {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	return s.available
+}
diff --git a/x-pack/filebeat/input/awss3/semaphore_test.go b/x-pack/filebeat/input/awss3/semaphore_test.go
new file mode 100644
index 00000000000..d71252ffc78
--- /dev/null
+++ b/x-pack/filebeat/input/awss3/semaphore_test.go
@@ -0,0 +1,33 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package awss3
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestSemaphore(t *testing.T) {
+	s := newSem(5)
+
+	assert.Equal(t, 5, s.Acquire(5))
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// Asks for 2, and blocks because 0 are available.
+		// It unblocks and returns 1 when Release(1) is called.
+		assert.Equal(t, 1, s.Acquire(2))
+	}()
+
+	// None are available until Release().
+	assert.Equal(t, 0, s.Available())
+
+	s.Release(1)
+	wg.Wait()
+}
diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go
new file mode 100644
index 00000000000..56f35e473ce
--- /dev/null
+++ b/x-pack/filebeat/input/awss3/sqs.go
@@ -0,0 +1,105 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
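+
+// Illustrative sketch (not prescriptive) of how this file's reader is used;
+// msgHandler is any sqsProcessor implementation:
+//
+//	r := newSQSReader(log, metrics, sqsAPI, maxMessagesInflight, msgHandler)
+//	err := r.Receive(ctx) // returns nil when ctx is canceled (normal shutdown)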
+ +package awss3 + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/go-concert/timed" +) + +const ( + sqsRetryDelay = 10 * time.Second +) + +type sqsReader struct { + maxMessagesInflight int + workerSem *sem + sqs sqsAPI + msgHandler sqsProcessor + log *logp.Logger + metrics *inputMetrics +} + +func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessagesInflight int, msgHandler sqsProcessor) *sqsReader { + if metrics == nil { + metrics = newInputMetrics(monitoring.NewRegistry(), "") + } + return &sqsReader{ + maxMessagesInflight: maxMessagesInflight, + workerSem: newSem(maxMessagesInflight), + sqs: sqs, + msgHandler: msgHandler, + log: log, + metrics: metrics, + } +} + +func (r *sqsReader) Receive(ctx context.Context) error { + // This loop tries to keep the workers busy as much as possible while + // honoring the max message cap as opposed to a simpler loop that receives + // N messages, waits for them all to finish, then requests N more messages. + var workerWg sync.WaitGroup + for ctx.Err() == nil { + // Determine how many SQS workers are available. + workers, err := r.workerSem.AcquireContext(r.maxMessagesInflight, ctx) + if err != nil { + break + } + + // Receive (at most) as many SQS messages as there are workers. + msgs, err := r.sqs.ReceiveMessage(ctx, workers) + if err != nil { + r.workerSem.Release(workers) + + if ctx.Err() == nil { + r.log.Warnw("SQS ReceiveMessage returned an error. Will retry after a short delay.", "error", err) + + // Throttle retries. + timed.Wait(ctx, sqsRetryDelay) + } + continue + } + + // Release unused workers. + r.workerSem.Release(workers - len(msgs)) + + // Process each SQS message asynchronously with a goroutine. + r.log.Debugf("Received %v SQS messages.", len(msgs)) + r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs))) + r.metrics.sqsMessagesInflight.Add(uint64(len(msgs))) + workerWg.Add(len(msgs)) + for _, msg := range msgs { + go func(msg sqs.Message, start time.Time) { + defer func() { + r.metrics.sqsMessagesInflight.Dec() + r.metrics.sqsMessageProcessingTime.Update(time.Since(start).Nanoseconds()) + workerWg.Done() + r.workerSem.Release(1) + }() + + if err := r.msgHandler.ProcessSQS(ctx, &msg); err != nil { + r.log.Warnw("Failed processing SQS message.", "error", err, "message_id", *msg.MessageId) + } + }(msg, time.Now()) + } + } + + // Wait for all workers to finish. + workerWg.Wait() + + if errors.Is(ctx.Err(), context.Canceled) { + // A canceled context is a normal shutdown. + return nil + } + return ctx.Err() +} diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go new file mode 100644 index 00000000000..e42b78e28e8 --- /dev/null +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -0,0 +1,253 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
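+
+// For reference, a minimal S3 event notification body of the kind parsed in
+// this file looks like the following (fields not mapped below are ignored):
+//
+//	{"Records":[{"eventSource":"aws:s3","awsRegion":"us-east-1",
+//	  "eventName":"ObjectCreated:Put",
+//	  "s3":{"bucket":{"name":"foo","arn":"arn:aws:s3:::foo"},
+//	    "object":{"key":"log.json"}}}]}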
+
+package awss3
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/url"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/service/sqs"
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/elastic/beats/v7/libbeat/logp"
+	"github.com/elastic/beats/v7/libbeat/monitoring"
+)
+
+const (
+	sqsApproximateReceiveCountAttribute = "ApproximateReceiveCount"
+)
+
+type nonRetryableError struct {
+	Err error
+}
+
+func (e *nonRetryableError) Unwrap() error {
+	return e.Err
+}
+
+func (e *nonRetryableError) Error() string {
+	return "non-retryable error: " + e.Err.Error()
+}
+
+func (e *nonRetryableError) Is(err error) bool {
+	_, ok := err.(*nonRetryableError)
+	return ok
+}
+
+func nonRetryableErrorWrap(err error) error {
+	if errors.Is(err, &nonRetryableError{}) {
+		return err
+	}
+	return &nonRetryableError{Err: err}
+}
+
+// s3EventsV2 is the notification message that Amazon S3 sends to report bucket changes.
+// This was derived from the version 2.2 schema.
+// https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
+type s3EventsV2 struct {
+	Records []s3EventV2 `json:"Records"`
+}
+
+// s3EventV2 is an S3 change notification event.
+type s3EventV2 struct {
+	AWSRegion   string `json:"awsRegion"`
+	EventName   string `json:"eventName"`
+	EventSource string `json:"eventSource"`
+	S3          struct {
+		Bucket struct {
+			Name string `json:"name"`
+			ARN  string `json:"arn"`
+		} `json:"bucket"`
+		Object struct {
+			Key string `json:"key"`
+		} `json:"object"`
+	} `json:"s3"`
+}
+
+type sqsS3EventProcessor struct {
+	s3ObjectHandler      s3ObjectHandlerFactory
+	sqsVisibilityTimeout time.Duration
+	maxReceiveCount      int
+	sqs                  sqsAPI
+	log                  *logp.Logger
+	warnOnce             sync.Once
+	metrics              *inputMetrics
+}
+
+func newSQSS3EventProcessor(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, sqsVisibilityTimeout time.Duration, maxReceiveCount int, s3 s3ObjectHandlerFactory) *sqsS3EventProcessor {
+	if metrics == nil {
+		metrics = newInputMetrics(monitoring.NewRegistry(), "")
+	}
+	return &sqsS3EventProcessor{
+		s3ObjectHandler:      s3,
+		sqsVisibilityTimeout: sqsVisibilityTimeout,
+		maxReceiveCount:      maxReceiveCount,
+		sqs:                  sqs,
+		log:                  log,
+		metrics:              metrics,
+	}
+}
+
+func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *sqs.Message) error {
+	log := p.log.With("message_id", *msg.MessageId)
+
+	keepaliveCtx, keepaliveCancel := context.WithCancel(ctx)
+	defer keepaliveCancel()
+
+	// Start the SQS keepalive worker.
+	var keepaliveWg sync.WaitGroup
+	keepaliveWg.Add(1)
+	go p.keepalive(keepaliveCtx, log, &keepaliveWg, msg)
+
+	processingErr := p.processS3Events(ctx, log, *msg.Body)
+
+	// Stop the keepalive routine before changing visibility.
+	keepaliveCancel()
+	keepaliveWg.Wait()
+
+	// No error. Delete the SQS message.
+	if processingErr == nil {
+		msgDelErr := p.sqs.DeleteMessage(context.Background(), msg)
+		if msgDelErr == nil {
+			p.metrics.sqsMessagesDeletedTotal.Inc()
+		}
+		return errors.Wrap(msgDelErr, "failed deleting message from SQS queue (it may be reprocessed)")
+	}
+
+	if p.maxReceiveCount > 0 && !errors.Is(processingErr, &nonRetryableError{}) {
+		// Prevent poison pill messages from consuming all workers. Check how
+		// many times this message has been received before making a disposition.
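+		// ApproximateReceiveCount is a standard SQS message attribute that
+		// reports how many times the message has been received but not
+		// deleted; once it meets maxReceiveCount the failure is wrapped as
+		// non-retryable below so the message is deleted rather than
+		// redelivered indefinitely.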
+		if v, found := msg.Attributes[sqsApproximateReceiveCountAttribute]; found {
+			if receiveCount, err := strconv.Atoi(v); err == nil && receiveCount >= p.maxReceiveCount {
+				processingErr = nonRetryableErrorWrap(fmt.Errorf(
+					"sqs ApproximateReceiveCount <%v> exceeds threshold %v: %w",
+					receiveCount, p.maxReceiveCount, processingErr))
+			}
+		}
+	}
+
+	// An error that reprocessing cannot correct. Delete the SQS message.
+	if errors.Is(processingErr, &nonRetryableError{}) {
+		msgDelErr := p.sqs.DeleteMessage(context.Background(), msg)
+		if msgDelErr == nil {
+			p.metrics.sqsMessagesDeletedTotal.Inc()
+		}
+		return multierr.Combine(
+			errors.Wrap(processingErr, "failed processing SQS message (message will be deleted)"),
+			errors.Wrap(msgDelErr, "failed deleting message from SQS queue (it may be reprocessed)"),
+		)
+	}
+
+	// An error that may be resolved by letting the visibility timeout
+	// expire, thereby putting the message back on SQS. If a dead letter
+	// queue is enabled, the message will eventually be placed on the DLQ
+	// after the maximum receive count is reached.
+	p.metrics.sqsMessagesReturnedTotal.Inc()
+	return errors.Wrap(processingErr, "failed processing SQS message (it will return to queue after visibility timeout)")
+}
+
+func (p *sqsS3EventProcessor) keepalive(ctx context.Context, log *logp.Logger, wg *sync.WaitGroup, msg *sqs.Message) {
+	defer wg.Done()
+
+	t := time.NewTicker(p.sqsVisibilityTimeout / 2)
+	defer t.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			log.Debugw("Extending SQS message visibility timeout.",
+				"visibility_timeout", p.sqsVisibilityTimeout,
+				"expires_at", time.Now().UTC().Add(p.sqsVisibilityTimeout))
+			p.metrics.sqsVisibilityTimeoutExtensionsTotal.Inc()
+
+			// Renew visibility.
+			if err := p.sqs.ChangeMessageVisibility(ctx, msg, p.sqsVisibilityTimeout); err != nil {
+				log.Warnw("Failed to extend message visibility timeout.", "error", err)
+			}
+		}
+	}
+}
+
+func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, error) {
+	// NOTE: If AWS introduces a V3 schema, this will need to be updated to handle it.
+	var events s3EventsV2
+	dec := json.NewDecoder(strings.NewReader(body))
+	if err := dec.Decode(&events); err != nil {
+		p.log.Debugw("Invalid SQS message body.", "sqs_message_body", body)
+		return nil, fmt.Errorf("failed to decode SQS message body as an S3 notification: %w", err)
+	}
+
+	var out []s3EventV2
+	for _, record := range events.Records {
+		if !p.isObjectCreatedEvents(record) {
+			p.warnOnce.Do(func() {
+				p.log.Warnf("Received S3 notification for %q event type, but "+
+					"only 'ObjectCreated:*' types are handled. It is recommended "+
+					"that you update the S3 Event Notification configuration to "+
+					"only include ObjectCreated event types to save resources.",
+					record.EventName)
+			})
+			continue
+		}
+
+		// Unescape the S3 object key. For example, convert "%3D" back to "=".
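+		// (S3 URL-encodes object keys in event notifications, so a space in
+		// the original key arrives as "+" and QueryUnescape restores it.)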
+ key, err := url.QueryUnescape(record.S3.Object.Key) + if err != nil { + return nil, fmt.Errorf("url unescape failed for '%v': %w", record.S3.Object.Key, err) + } + record.S3.Object.Key = key + + out = append(out, record) + } + + return out, nil +} + +func (_ *sqsS3EventProcessor) isObjectCreatedEvents(event s3EventV2) bool { + return event.EventSource == "aws:s3" && strings.HasPrefix(event.EventName, "ObjectCreated:") +} + +func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Logger, body string) error { + s3Events, err := p.getS3Notifications(body) + if err != nil { + if errors.Is(err, context.Canceled) { + // Messages that are in-flight at shutdown should be returned to SQS. + return err + } + return &nonRetryableError{err} + } + log.Debugf("SQS message contained %d S3 event notifications.", len(s3Events)) + defer log.Debug("End processing SQS S3 event notifications.") + + // Wait for all events to be ACKed before proceeding. + acker := newEventACKTracker(ctx) + defer acker.Wait() + + var errs []error + for _, event := range s3Events { + s3Processor := p.s3ObjectHandler.Create(ctx, log, acker, event) + if s3Processor == nil { + continue + } + + // Process S3 object (download, parse, create events). + if err := s3Processor.ProcessS3Object(); err != nil { + errs = append(errs, errors.Wrapf(err, + "failed processing S3 event for object key %q in bucket %q", + event.S3.Object.Key, event.S3.Bucket.Name)) + } + } + + return multierr.Combine(errs...) +} diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go new file mode 100644 index 00000000000..8865c5d30cd --- /dev/null +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -0,0 +1,195 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package awss3 + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-concert/timed" +) + +func TestSQSS3EventProcessor(t *testing.T) { + logp.TestingSetup() + + msg := newSQSMessage(newS3Event("log.json")) + + t.Run("s3 events are processed and sqs msg is deleted", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) + + gomock.InOrder( + mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil), + mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), + ) + + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, time.Minute, 5, mockS3HandlerFactory) + require.NoError(t, p.ProcessSQS(ctx, &msg)) + }) + + t.Run("invalid SQS JSON body does not retry", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) + + invalidBodyMsg := newSQSMessage(newS3Event("log.json")) + body := *invalidBodyMsg.Body + body = body[10:] + invalidBodyMsg.Body = &body + + gomock.InOrder( + mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&invalidBodyMsg)).Return(nil), + ) + + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, time.Minute, 5, mockS3HandlerFactory) + err := p.ProcessSQS(ctx, &invalidBodyMsg) + require.Error(t, err) + t.Log(err) + }) + + t.Run("zero S3 events in body", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) + + emptyRecordsMsg := newSQSMessage() + + gomock.InOrder( + mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&emptyRecordsMsg)).Return(nil), + ) + + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, time.Minute, 5, mockS3HandlerFactory) + require.NoError(t, p.ProcessSQS(ctx, &emptyRecordsMsg)) + }) + + t.Run("visibility is extended after half expires", func(t *testing.T) { + const visibilityTimeout = time.Second + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) + mockS3Handler := NewMockS3ObjectHandler(ctrl) + + mockAPI.EXPECT().ChangeMessageVisibility(gomock.Any(), gomock.Eq(&msg), gomock.Eq(visibilityTimeout)).AnyTimes().Return(nil) + + gomock.InOrder( + mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). 
+ Do(func(ctx context.Context, _ *logp.Logger, _ *eventACKTracker, _ s3EventV2) { + timed.Wait(ctx, 5*visibilityTimeout) + }).Return(mockS3Handler), + mockS3Handler.EXPECT().ProcessS3Object().Return(nil), + mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), + ) + + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, visibilityTimeout, 5, mockS3HandlerFactory) + require.NoError(t, p.ProcessSQS(ctx, &msg)) + }) + + t.Run("message returns to queue on error", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) + mockS3Handler := NewMockS3ObjectHandler(ctrl) + + gomock.InOrder( + mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(mockS3Handler), + mockS3Handler.EXPECT().ProcessS3Object().Return(errors.New("fake connectivity problem")), + ) + + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, time.Minute, 5, mockS3HandlerFactory) + err := p.ProcessSQS(ctx, &msg) + t.Log(err) + require.Error(t, err) + }) + + t.Run("message is deleted after multiple receives", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) + mockS3Handler := NewMockS3ObjectHandler(ctrl) + + msg := msg + msg.Attributes = map[string]string{ + sqsApproximateReceiveCountAttribute: "10", + } + + gomock.InOrder( + mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(mockS3Handler), + mockS3Handler.EXPECT().ProcessS3Object().Return(errors.New("fake connectivity problem")), + mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), + ) + + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, time.Minute, 5, mockS3HandlerFactory) + err := p.ProcessSQS(ctx, &msg) + t.Log(err) + require.Error(t, err) + }) +} + +func TestSqsProcessor_getS3Notifications(t *testing.T) { + logp.TestingSetup() + + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, time.Minute, 5, nil) + + t.Run("s3 key is url unescaped", func(t *testing.T) { + msg := newSQSMessage(newS3Event("Happy+Face.jpg")) + + events, err := p.getS3Notifications(*msg.Body) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, "Happy Face.jpg", events[0].S3.Object.Key) + }) + + t.Run("non-ObjectCreated event types are ignored", func(t *testing.T) { + event := newS3Event("HappyFace.jpg") + event.EventName = "ObjectRemoved:Delete" + msg := newSQSMessage(event) + + events, err := p.getS3Notifications(*msg.Body) + require.NoError(t, err) + assert.Len(t, events, 0) + }) +} + +func TestNonRecoverableError(t *testing.T) { + e := nonRetryableErrorWrap(fmt.Errorf("failed")) + assert.True(t, errors.Is(e, &nonRetryableError{})) + + var e2 *nonRetryableError + assert.True(t, errors.As(e, &e2)) +} diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go new file mode 100644 index 00000000000..036ab3c13dc --- /dev/null +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -0,0 +1,139 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "context" + "crypto/sha256" + "encoding/json" + "errors" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/gofrs/uuid" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +const testTimeout = 10 * time.Second + +var errFakeConnectivityFailure = errors.New("fake connectivity failure") + +func TestSQSReceiver(t *testing.T) { + logp.TestingSetup() + const maxMessages = 5 + + t.Run("ReceiveMessage success", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockMsgHandler := NewMockSQSProcessor(ctrl) + msg := newSQSMessage(newS3Event("log.json")) + + gomock.InOrder( + // Initial ReceiveMessage for maxMessages. + mockAPI.EXPECT(). + ReceiveMessage(gomock.Any(), gomock.Eq(maxMessages)). + Times(1). + DoAndReturn(func(_ context.Context, _ int) ([]sqs.Message, error) { + // Return single message. + return []sqs.Message{msg}, nil + }), + + // Follow up ReceiveMessages for either maxMessages-1 or maxMessages + // depending on how long processing of previous message takes. + mockAPI.EXPECT(). + ReceiveMessage(gomock.Any(), gomock.Any()). + Times(1). + DoAndReturn(func(_ context.Context, _ int) ([]sqs.Message, error) { + // Stop the test. + cancel() + return nil, nil + }), + ) + + // Expect the one message returned to have been processed. + mockMsgHandler.EXPECT(). + ProcessSQS(gomock.Any(), gomock.Eq(&msg)). + Times(1). + Return(nil) + + // Execute sqsReader and verify calls/state. + receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) + require.NoError(t, receiver.Receive(ctx)) + assert.Equal(t, maxMessages, receiver.workerSem.available) + }) + + t.Run("retry after ReceiveMessage error", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), sqsRetryDelay+testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockMsgHandler := NewMockSQSProcessor(ctrl) + + gomock.InOrder( + // Initial ReceiveMessage gets an error. + mockAPI.EXPECT(). + ReceiveMessage(gomock.Any(), gomock.Eq(maxMessages)). + Times(1). + DoAndReturn(func(_ context.Context, _ int) ([]sqs.Message, error) { + return nil, errFakeConnectivityFailure + }), + // After waiting for sqsRetryDelay, it retries. + mockAPI.EXPECT(). + ReceiveMessage(gomock.Any(), gomock.Eq(maxMessages)). + Times(1). + DoAndReturn(func(_ context.Context, _ int) ([]sqs.Message, error) { + cancel() + return nil, nil + }), + ) + + // Execute SQSReceiver and verify calls/state. 
+ receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) + require.NoError(t, receiver.Receive(ctx)) + assert.Equal(t, maxMessages, receiver.workerSem.available) + }) +} + +func newSQSMessage(events ...s3EventV2) sqs.Message { + body, err := json.Marshal(s3EventsV2{Records: events}) + if err != nil { + panic(err) + } + + hash := sha256.Sum256(body) + id, _ := uuid.FromBytes(hash[:16]) + messageID := id.String() + receipt := "receipt-" + messageID + bodyStr := string(body) + + return sqs.Message{ + Body: &bodyStr, + MessageId: &messageID, + ReceiptHandle: &receipt, + } +} + +func newS3Event(key string) s3EventV2 { + record := s3EventV2{ + AWSRegion: "us-east-1", + EventSource: "aws:s3", + EventName: "ObjectCreated:Put", + } + record.S3.Bucket.Name = "foo" + record.S3.Bucket.ARN = "arn:aws:s3:::foo" + record.S3.Object.Key = key + return record +} diff --git a/x-pack/filebeat/input/awss3/testdata/aws-cloudtrail.json.gz b/x-pack/filebeat/input/awss3/testdata/aws-cloudtrail.json.gz new file mode 100644 index 00000000000..23ea3bd7acb Binary files /dev/null and b/x-pack/filebeat/input/awss3/testdata/aws-cloudtrail.json.gz differ diff --git a/x-pack/filebeat/input/awss3/testdata/events-array.json b/x-pack/filebeat/input/awss3/testdata/events-array.json new file mode 100644 index 00000000000..34a3b999707 --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/events-array.json @@ -0,0 +1,12 @@ +{ + "Events": [ + { + "time": "2021-05-25 18:20:58 UTC", + "msg": "hello" + }, + { + "time": "2021-05-26 22:21:40 UTC", + "msg": "world" + } + ] +} diff --git a/x-pack/filebeat/input/awss3/testdata/invalid.json b/x-pack/filebeat/input/awss3/testdata/invalid.json new file mode 100644 index 00000000000..41bf8e5d10c --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/invalid.json @@ -0,0 +1 @@ +{"bad": json}{"good":"json"} diff --git a/x-pack/filebeat/input/awss3/testdata/log.json b/x-pack/filebeat/input/awss3/testdata/log.json new file mode 100644 index 00000000000..f6aaf5ec64d --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/log.json @@ -0,0 +1 @@ +{"@timestamp":"2021-05-25T17:25:42.806Z","log.level":"error","message":"error making http request"}{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available disk space 44.3gb"} \ No newline at end of file diff --git a/x-pack/filebeat/input/awss3/testdata/log.ndjson b/x-pack/filebeat/input/awss3/testdata/log.ndjson new file mode 100644 index 00000000000..d1530555c60 --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/log.ndjson @@ -0,0 +1,2 @@ +{"@timestamp":"2021-05-25T17:25:42.806Z","log.level":"error","message":"error making http request"} +{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available disk space 44.3gb"} diff --git a/x-pack/filebeat/input/awss3/testdata/sample1.txt b/x-pack/filebeat/input/awss3/testdata/log.txt similarity index 100% rename from x-pack/filebeat/input/awss3/testdata/sample1.txt rename to x-pack/filebeat/input/awss3/testdata/log.txt diff --git a/x-pack/filebeat/input/awss3/testdata/multiline.json b/x-pack/filebeat/input/awss3/testdata/multiline.json new file mode 100644 index 00000000000..c2d1fa1bb65 --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/multiline.json @@ -0,0 +1,10 @@ +{ + "@timestamp": "2021-05-25T17:25:42.806Z", + "log.level": "error", + "message": "error making http request" +} +{ + "@timestamp": "2021-05-25T17:25:51.391Z", + "log.level": "info", + "message": "available disk space 44.3gb" +} diff --git 
a/x-pack/filebeat/input/awss3/testdata/multiline.json.gz b/x-pack/filebeat/input/awss3/testdata/multiline.json.gz new file mode 100644 index 00000000000..17c09620e38 Binary files /dev/null and b/x-pack/filebeat/input/awss3/testdata/multiline.json.gz differ diff --git a/x-pack/filebeat/input/awss3/testdata/sample2.txt b/x-pack/filebeat/input/awss3/testdata/multiline.txt similarity index 100% rename from x-pack/filebeat/input/awss3/testdata/sample2.txt rename to x-pack/filebeat/input/awss3/testdata/multiline.txt