diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 485f4d4152fb..e640b5a0334d 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -58,6 +58,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Removed the `common.Float` type. {issue}28279[28279] {pull}28280[28280] {pull}28376[28376] - Removed Beat generators. {pull}28816[28816] - libbeat.logp package forces ECS compliant logs. Logs are JSON formatted. Options to enable ECS/JSON have been removed. {issue}15544[15544] {pull}28573[28573] +- Removed deprecated disk spool from Beats. Use disk queue instead. {pull}28869[28869] ==== Bugfixes diff --git a/NOTICE.txt b/NOTICE.txt index d5599add95ee..e4d0e19c26ca 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -7641,217 +7641,6 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-sysinfo@v1.7 limitations under the License. --------------------------------------------------------------------------------- -Dependency : github.com/elastic/go-txfile -Version: v0.0.8 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-txfile@v0.0.8/LICENSE: - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-ucfg Version: v0.8.3 @@ -33854,639 +33643,6 @@ Contents of probable licence file $GOMODCACHE/github.com/urso/diag@v0.0.0-202002 limitations under the License. --------------------------------------------------------------------------------- -Dependency : github.com/urso/go-bin -Version: v0.0.0-20180220135811-781c575c9f0e -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/urso/go-bin@v0.0.0-20180220135811-781c575c9f0e/LICENSE: - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - --------------------------------------------------------------------------------- -Dependency : github.com/urso/magetools -Version: v0.0.0-20200125210132-c2e338f92f3a -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/urso/magetools@v0.0.0-20200125210132-c2e338f92f3a/LICENSE: - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - --------------------------------------------------------------------------------- -Dependency : github.com/urso/qcgen -Version: v0.0.0-20180131103024-0b059e7db4f4 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/urso/qcgen@v0.0.0-20180131103024-0b059e7db4f4/LICENSE: - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- Dependency : github.com/xdg/stringprep Version: v1.0.3 diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 1068dbd80829..24bfac144149 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -197,66 +197,6 @@ auditbeat.modules: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 0ae039ad589a..45d679cc1944 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1111,66 +1111,6 @@ filebeat.inputs: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/go.mod b/go.mod index a06dd9e5d490..c47fcd15bc8b 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,6 @@ require ( github.com/elastic/go-seccomp-bpf v1.2.0 github.com/elastic/go-structform v0.0.9 github.com/elastic/go-sysinfo v1.7.1 - github.com/elastic/go-txfile v0.0.8 github.com/elastic/go-ucfg v0.8.3 github.com/elastic/go-windows v1.0.1 github.com/elastic/gosigar v0.14.2 @@ -151,7 +150,6 @@ require ( github.com/tsg/go-daemon v0.0.0-20200207173439-e704b93fd89b github.com/tsg/gopacket v0.0.0-20200626092518-2ab8e397a786 github.com/ugorji/go/codec v1.1.8 - github.com/urso/magetools v0.0.0-20200125210132-c2e338f92f3a // indirect github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71 github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41 github.com/xdg/scram v1.0.3 @@ -268,7 +266,6 @@ require ( github.com/sirupsen/logrus v1.8.1 // indirect github.com/stretchr/objx v0.2.0 // indirect github.com/urso/diag v0.0.0-20200210123136-21b3cc8eb797 // indirect - github.com/urso/go-bin v0.0.0-20180220135811-781c575c9f0e // indirect github.com/xdg/stringprep v1.0.3 // indirect go.elastic.co/fastjson v1.1.0 // indirect go.opencensus.io v0.23.0 // indirect diff --git a/go.sum b/go.sum index 4ae1569e4dac..3ba3d94b54e4 100644 --- a/go.sum +++ b/go.sum @@ -529,8 +529,6 @@ github.com/elastic/go-structform v0.0.9/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZN github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0= github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= -github.com/elastic/go-txfile v0.0.8 h1:hqFMmLM+UCDMJeSyuCWe8YwS+HtoX7F+cz5fhPYRTn4= -github.com/elastic/go-txfile v0.0.8/go.mod h1:H0nCoFae0a4ga57apgxFsgmRjevNCsEaT6g56JoeKAE= github.com/elastic/go-ucfg v0.7.0/go.mod h1:iaiY0NBIYeasNgycLyTvhJftQlQEUO2hpF+FX0JKxzo= github.com/elastic/go-ucfg v0.8.3 h1:leywnFjzr2QneZZWhE6uWd+QN/UpP0sdJRHYyuFvkeo= github.com/elastic/go-ucfg v0.8.3/go.mod h1:iaiY0NBIYeasNgycLyTvhJftQlQEUO2hpF+FX0JKxzo= @@ -745,7 +743,6 @@ github.com/godbus/dbus/v5 v5.0.5 h1:9Eg0XUhQxtkV8ykTMKtMMYY72g4NgxtRq4jgh4Ih5YM= github.com/godbus/dbus/v5 v5.0.5/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godror/godror v0.10.4 h1:44FcfzDPp/PJZzen5Hm59SZQBhgrbR6E1KwCjg6gnJo= github.com/godror/godror v0.10.4/go.mod h1:9MVLtu25FBJBMHkPs0m3Ngf/VmwGcLpM2HS8PlNGw9U= -github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= @@ -1520,13 +1517,6 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urso/diag v0.0.0-20200210123136-21b3cc8eb797 h1:OHNw/6pXODJAB32NujjdQO/KIYQ3KAbHQfCzH81XdCs= github.com/urso/diag v0.0.0-20200210123136-21b3cc8eb797/go.mod h1:pNWFTeQ+V1OYT/TzWpnWb6eQBdoXpdx+H+lrH97/Oyo= -github.com/urso/go-bin v0.0.0-20180220135811-781c575c9f0e h1:NiofbjIUI5gR+ybDsGSVH1fWyjSeDYiYVJHT1+kcsak= -github.com/urso/go-bin v0.0.0-20180220135811-781c575c9f0e/go.mod h1:6GfHrdWBQYjFRIznu7XuQH4lYB2w8nO4bnImVKkzPOM= -github.com/urso/magetools v0.0.0-20190919040553-290c89e0c230/go.mod h1:DFxTNgS/ExCGmmjVjSOgS2WjtfjKXgCyDzAFgbtovSA= -github.com/urso/magetools v0.0.0-20200125210132-c2e338f92f3a h1:jWAaRFnay3H2e6S0GGCl5nKrkgQNlarCE/kvcutzBmw= -github.com/urso/magetools v0.0.0-20200125210132-c2e338f92f3a/go.mod h1:DbaJnRzkGaWrMWm5Hz6QVnUj//x9/zjrfx8bF3J+GJY= -github.com/urso/qcgen v0.0.0-20180131103024-0b059e7db4f4 h1:hhA8EBThzz9PztawVTycKvfETVuBqxAQ5keFlAVtbAw= -github.com/urso/qcgen v0.0.0-20180131103024-0b059e7db4f4/go.mod h1:RspW+E2Yb7Fs7HclB2tiDaiu6Rp41BiIG4Wo1YaoXGc= github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71 h1:CehQeKbysHV8J2V7AD0w8NL2x1h04kmmo/Ft5su4lU0= github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71/go.mod h1:Wp40HwmjM59FkDIVFfcCb9LzBbnc0XAMp8++hJuWvSU= github.com/vbatts/tar-split v0.11.1/go.mod h1:LEuURwDEiWjRjwu46yU3KVGuUdVv/dcnpcEPSzR8z6g= @@ -1867,7 +1857,6 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191210023423-ac6580df4449/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200102141924-c96a22e43c9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200107162124-548cf772de50/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 6bac78d08c20..8f0f019626bd 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -343,66 +343,6 @@ heartbeat.jobs: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index 2170f2fcec4e..19692cfec94b 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -140,66 +140,6 @@ setup.template.settings: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/libbeat/_meta/config/general.reference.yml.tmpl b/libbeat/_meta/config/general.reference.yml.tmpl index 58a39af4b34f..27118c979c9e 100644 --- a/libbeat/_meta/config/general.reference.yml.tmpl +++ b/libbeat/_meta/config/general.reference.yml.tmpl @@ -77,66 +77,6 @@ # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/libbeat/docs/queueconfig.asciidoc b/libbeat/docs/queueconfig.asciidoc index 054379e1b20a..fb930831dac3 100644 --- a/libbeat/docs/queueconfig.asciidoc +++ b/libbeat/docs/queueconfig.asciidoc @@ -196,166 +196,3 @@ too many errors or overloading the host system if the target disk becomes unavailable for an extended time. The default value is `30s` (thirty seconds). - - -[float] -[[configuration-internal-queue-spool]] -=== Configure the file spool queue - -beta[] - -NOTE: The file spool queue is a deprecated feature offered as-is for backwards compatibility. The supported way to queue events in persistent storage is the disk queue. - -The file spool queue stores all events in an on disk ring buffer. The spool -has a write buffer, which new events are written to. Events written to the -spool are forwarded to the outputs, only after the write buffer has been -flushed successfully. - -The spool waits for the output to acknowledge or drop events. If the spool is -full, no new events can be inserted. The spool will block. Space is freed only -after a signal from the output has been received. - -On disk, the spool divides a file into pages. The `file.page_size` setting -configures the file's page size at file creation time. The optimal page size depends -on the effective block size, used by the underlying file system. - -This sample configuration enables the spool with all default settings (See -<> for defaults) and the -default file path: - -[source,yaml] ------------------------------------------------------------------------------- -queue.spool: ~ ------------------------------------------------------------------------------- - -This sample configuration creates a spool of 512MiB, with 16KiB pages. The -write buffer is flushed if 10MiB of contents, or 1024 events have been -written. If the oldest available event has been waiting for 5s in the write -buffer, the buffer will be flushed as well: - -[source,yaml] ------------------------------------------------------------------------------- -queue.spool: - file: - path: "${path.data}/spool.dat" - size: 512MiB - page_size: 16KiB - write: - buffer_size: 10MiB - flush.timeout: 5s - flush.events: 1024 ------------------------------------------------------------------------------- - -[float] -[[configuration-internal-queue-spool-reference]] -==== Configuration options - -You can specify the following options in the `queue.spool` section of the -+{beatname_lc}.yml+ config file: - -[float] -===== `file.path` - -The spool file path. The file is created on startup, if it does not exist. - -The default value is "${path.data}/spool.dat". - -[float] -===== `file.permissions` - -The file permissions. The permissions are applied when the file is -created. In case the file already exists, the file permissions are compared -with `file.permissions`. The spool file is not opened if the actual file -permissions are more permissive then configured. - -The default value is 0600. - - -[float] -===== `file.size` - -Spool file size. - -The default value is 100 MiB. - -NOTE: The size should be much larger then the expected event sizes -and write buffer size. Otherwise the queue will block, because it has not -enough space. - -NOTE: The file size cannot be changed once the file has been generated. This -limitation will be removed in the future. - -[float] -===== `file.page_size` - -The file's page size. - -The spool file is split into pages of `page_size`. All I/O -operations operate on complete pages. - -The default value is 4096 (4KiB). - -NOTE: This setting should match the file system's minimum block size. If the -`page_size` is not a multiple of the file system's block size, the file system -might create additional read operations on writes. - -NOTE: The page size is only set at file creation time. It cannot be changed -afterwards. - -[float] -===== `file.prealloc` - -If `prealloc` is set to `true`, truncate is used to reserve the space up to -`file.size`. This setting is only used when the file is created. - -The file will dynamically grow, if `prealloc` is set to false. The spool -blocks, if `prealloc` is `false` and the system is out of disk space. - -The default value is `true`. - -[float] -===== `write.buffer_size` - -The write buffer size. The write buffer is flushed, once the buffer size is exceeded. - -Very big events are allowed to be bigger then the configured buffer size. But -the write buffer will be flushed right after the event has been serialized. - -The default value is 1MiB. - -[float] -===== `write.codec` - -The event encoding used for serialized events. Valid values are `json` and `cbor`. - -The default value is `cbor`. - -[float] -===== `write.flush.timeout` - -Maximum wait time of the oldest event in the write buffer. If set to 0, the -write buffer will only be flushed once `write.flush.events` or `write.buffer_size` is fulfilled. - -The default value is 1s. - -[float] -===== `write.flush.events` - -Number of buffered events. The write buffer is flushed once the limit is reached. - -The default value is 16384. - -[float] -===== `read.flush.timeout` - -The spool reader tries to read up to the output's `bulk_max_size` events at once. - -If `read.flush.timeout` is set to 0s, all available events are forwarded -immediately to the output. - -If `read.flush.timeout` is set to a value bigger then 0s, the spool will wait -for more events to be flushed. Events are forwarded to the output if -`bulk_max_size` events have been read or the oldest read event has been waiting -for the configured duration. - -The default value is 0s. diff --git a/libbeat/publisher/includes/includes.go b/libbeat/publisher/includes/includes.go index a14dd16d3ba9..befc0e93d43e 100644 --- a/libbeat/publisher/includes/includes.go +++ b/libbeat/publisher/includes/includes.go @@ -29,5 +29,4 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/redis" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" - _ "github.com/elastic/beats/v7/libbeat/publisher/queue/spool" ) diff --git a/libbeat/publisher/pipeline/stress/configs/pipeline/small_spool.yml b/libbeat/publisher/pipeline/stress/configs/pipeline/small_spool.yml deleted file mode 100644 index d5f999440e30..000000000000 --- a/libbeat/publisher/pipeline/stress/configs/pipeline/small_spool.yml +++ /dev/null @@ -1,11 +0,0 @@ -pipeline.queue.spool: - file: - path: ${test.tmpdir}/${test.name}-spool.dat - size: 1MiB - page_size: 4KiB - prealloc: true - write: - buffer_size: 16KiB - flush_timeout: 100ms - read: - flush_timeout: 0 diff --git a/libbeat/publisher/pipeline/stress/stress_test.go b/libbeat/publisher/pipeline/stress/stress_test.go index b12af68681f8..c0afcc3a0218 100644 --- a/libbeat/publisher/pipeline/stress/stress_test.go +++ b/libbeat/publisher/pipeline/stress/stress_test.go @@ -34,7 +34,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/publisher/pipeline/stress" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" - _ "github.com/elastic/beats/v7/libbeat/publisher/queue/spool" ) // additional flags diff --git a/libbeat/publisher/queue/spool/codec.go b/libbeat/publisher/queue/spool/codec.go deleted file mode 100644 index 69f693a4817e..000000000000 --- a/libbeat/publisher/queue/spool/codec.go +++ /dev/null @@ -1,203 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "bytes" - "fmt" - "time" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/outputs/codec" - "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/go-structform" - "github.com/elastic/go-structform/cborl" - "github.com/elastic/go-structform/gotype" - "github.com/elastic/go-structform/json" - "github.com/elastic/go-structform/ubjson" -) - -type encoder struct { - buf bytes.Buffer - folder *gotype.Iterator - codec codecID -} - -type decoder struct { - buf []byte - - json *json.Parser - cborl *cborl.Parser - ubjson *ubjson.Parser - unfolder *gotype.Unfolder -} - -type codecID uint8 - -type entry struct { - Timestamp int64 - Flags uint8 - Meta common.MapStr - Fields common.MapStr -} - -const ( - // Note: Never change order. Codec IDs must be not change in the future. Only - // adding new IDs is allowed. - codecUnknown codecID = iota - codecJSON - codecUBJSON - codecCBORL - - flagGuaranteed uint8 = 1 << 0 -) - -func newEncoder(codec codecID) (*encoder, error) { - switch codec { - case codecJSON, codecCBORL, codecUBJSON: - break - default: - return nil, fmt.Errorf("unknown codec type '%v'", codec) - } - - e := &encoder{codec: codec} - e.reset() - return e, nil -} - -func (e *encoder) reset() { - e.folder = nil - - var visitor structform.Visitor - switch e.codec { - case codecJSON: - visitor = json.NewVisitor(&e.buf) - case codecCBORL: - visitor = cborl.NewVisitor(&e.buf) - case codecUBJSON: - visitor = ubjson.NewVisitor(&e.buf) - default: - panic("no codec configured") - } - - folder, err := gotype.NewIterator(visitor, - gotype.Folders( - codec.MakeTimestampEncoder(), - codec.MakeBCTimestampEncoder(), - ), - ) - if err != nil { - panic(err) - } - - e.folder = folder -} - -func (e *encoder) encode(event *publisher.Event) ([]byte, error) { - e.buf.Reset() - e.buf.WriteByte(byte(e.codec)) - - var flags uint8 - if (event.Flags & publisher.GuaranteedSend) == publisher.GuaranteedSend { - flags = flagGuaranteed - } - - err := e.folder.Fold(entry{ - Timestamp: event.Content.Timestamp.UTC().UnixNano(), - Flags: flags, - Meta: event.Content.Meta, - Fields: event.Content.Fields, - }) - if err != nil { - e.reset() - return nil, err - } - - return e.buf.Bytes(), nil -} - -func newDecoder() *decoder { - d := &decoder{} - d.reset() - return d -} - -func (d *decoder) reset() { - unfolder, err := gotype.NewUnfolder(nil) - if err != nil { - panic(err) // can not happen - } - - d.unfolder = unfolder - d.json = json.NewParser(unfolder) - d.cborl = cborl.NewParser(unfolder) - d.ubjson = ubjson.NewParser(unfolder) -} - -// Buffer prepares the read buffer to hold the next event of n bytes. -func (d *decoder) Buffer(n int) []byte { - if cap(d.buf) > n { - d.buf = d.buf[:n] - } else { - d.buf = make([]byte, n) - } - return d.buf -} - -func (d *decoder) Decode() (publisher.Event, error) { - var ( - to entry - err error - codec = codecID(d.buf[0]) - contents = d.buf[1:] - ) - - d.unfolder.SetTarget(&to) - defer d.unfolder.Reset() - - switch codec { - case codecJSON: - err = d.json.Parse(contents) - case codecUBJSON: - err = d.ubjson.Parse(contents) - case codecCBORL: - err = d.cborl.Parse(contents) - default: - return publisher.Event{}, fmt.Errorf("unknown codec type '%v'", codec) - } - - if err != nil { - d.reset() // reset parser just in case - return publisher.Event{}, err - } - - var flags publisher.EventFlags - if (to.Flags & flagGuaranteed) != 0 { - flags |= publisher.GuaranteedSend - } - - return publisher.Event{ - Flags: flags, - Content: beat.Event{ - Timestamp: time.Unix(0, to.Timestamp), - Fields: to.Fields, - Meta: to.Meta, - }, - }, nil -} diff --git a/libbeat/publisher/queue/spool/codec_test.go b/libbeat/publisher/queue/spool/codec_test.go deleted file mode 100644 index 6460985f6c30..000000000000 --- a/libbeat/publisher/queue/spool/codec_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/publisher" -) - -func TestEncodeDecode(t *testing.T) { - tests := map[string]codecID{ - "json": codecJSON, - "ubjson": codecUBJSON, - "cborl": codecCBORL, - } - - fieldTimeStr := "2020-01-14T20:33:23.779Z" - fieldTime, _ := time.Parse(time.RFC3339Nano, fieldTimeStr) - event := publisher.Event{ - Content: beat.Event{ - Timestamp: time.Now().Round(0), - Fields: common.MapStr{ - "time": fieldTime, - "commontime": common.Time(fieldTime), - }, - }, - } - expected := publisher.Event{ - Content: beat.Event{ - Timestamp: event.Content.Timestamp, - Fields: common.MapStr{ - "time": fieldTime.Format(time.RFC3339Nano), - "commontime": common.Time(fieldTime).String(), - }, - }, - } - - for name, codec := range tests { - t.Run(name, func(t *testing.T) { - encoder, err := newEncoder(codec) - assert.NoError(t, err) - - encoded, err := encoder.encode(&event) - assert.NoError(t, err) - - decoder := newDecoder() - decoder.buf = encoded - - observed, err := decoder.Decode() - assert.NoError(t, err) - - assert.Equal(t, expected, observed) - }) - } -} diff --git a/libbeat/publisher/queue/spool/config.go b/libbeat/publisher/queue/spool/config.go deleted file mode 100644 index 1d9d9a3299d4..000000000000 --- a/libbeat/publisher/queue/spool/config.go +++ /dev/null @@ -1,129 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "errors" - "fmt" - "os" - "strings" - "time" - - "github.com/dustin/go-humanize" - "github.com/joeshaw/multierror" - - "github.com/elastic/beats/v7/libbeat/common/cfgtype" -) - -type config struct { - File pathConfig `config:"file"` - Write writeConfig `config:"write"` - Read readConfig `config:"read"` -} - -type pathConfig struct { - Path string `config:"path"` - Permissions os.FileMode `config:"permissions"` - MaxSize cfgtype.ByteSize `config:"size"` - PageSize cfgtype.ByteSize `config:"page_size"` - Prealloc bool `config:"prealloc"` -} - -type writeConfig struct { - BufferSize cfgtype.ByteSize `config:"buffer_size"` - FlushEvents int `config:"flush.events"` - FlushTimeout time.Duration `config:"flush.timeout"` - Codec codecID `config:"codec"` -} - -type readConfig struct { - FlushTimeout time.Duration `config:"flush.timeout"` -} - -func defaultConfig() config { - return config{ - File: pathConfig{ - Path: "", - Permissions: 0600, - MaxSize: 100 * humanize.MiByte, - PageSize: 4 * humanize.KiByte, - Prealloc: true, - }, - Write: writeConfig{ - BufferSize: 1 * humanize.MiByte, - FlushTimeout: 1 * time.Second, - FlushEvents: 16 * 1024, - Codec: codecCBORL, - }, - Read: readConfig{ - FlushTimeout: 0, - }, - } -} - -func (c *pathConfig) Validate() error { - var errs multierror.Errors - - if c.MaxSize < humanize.MiByte { - errs = append(errs, errors.New("max size must be larger 1MiB")) - } - - if !c.Permissions.IsRegular() { - errs = append(errs, fmt.Errorf("permissions %v are not regular file permissions", c.Permissions.String())) - } else { - m := c.Permissions.Perm() - if (m & 0400) == 0 { - errs = append(errs, errors.New("file must be readable by current user")) - } - if (m & 0200) == 0 { - errs = append(errs, errors.New("file must be writable by current user")) - } - } - - // TODO: good 'limit' on pageSize? - - if c.PageSize >= c.MaxSize { - errs = append(errs, fmt.Errorf("page_size (%v) must be less then size (%v)", c.PageSize, c.MaxSize)) - } - - return errs.Err() -} - -func (c *writeConfig) Validate() error { - return nil -} - -func (c *readConfig) Validate() error { - return nil -} - -func (c *codecID) Unpack(value string) error { - ids := map[string]codecID{ - "json": codecJSON, - "ubjson": codecUBJSON, - "cbor": codecCBORL, - } - - id, exists := ids[strings.ToLower(value)] - if !exists { - return fmt.Errorf("codec '%v' not available", value) - } - - *c = id - return nil -} diff --git a/libbeat/publisher/queue/spool/consume.go b/libbeat/publisher/queue/spool/consume.go deleted file mode 100644 index 74f3058f739f..000000000000 --- a/libbeat/publisher/queue/spool/consume.go +++ /dev/null @@ -1,139 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "errors" - "io" - - "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/beats/v7/libbeat/publisher/queue" -) - -type consumer struct { - ctx *spoolCtx - closed atomic.Bool - done chan struct{} - - resp chan getResponse - requ chan getRequest -} - -type batch struct { - events []publisher.Event - state ackState - ack chan batchAckMsg -} - -type ackState uint8 - -const ( - batchActive ackState = iota - batchACK -) - -func newConsumer(ctx *spoolCtx, requ chan getRequest) *consumer { - return &consumer{ - ctx: ctx, - closed: atomic.MakeBool(false), - done: make(chan struct{}), - - // internal API - resp: make(chan getResponse), - requ: requ, - } -} - -func (c *consumer) Close() error { - if c.closed.Swap(true) { - return errors.New("already closed") - } - - close(c.done) - return nil -} - -func (c *consumer) Closed() bool { - return c.closed.Load() || c.ctx.Closed() -} - -func (c *consumer) Get(sz int) (queue.Batch, error) { - log := c.ctx.logger - - if c.Closed() { - return nil, io.EOF - } - - var resp getResponse - for { - select { - case <-c.ctx.Done(): - return nil, io.EOF - - case <-c.done: - return nil, io.EOF - - case c.requ <- getRequest{sz: sz, resp: c.resp}: - } - - resp = <-c.resp - err := resp.err - if err == nil { - break - } - - if err != errRetry { - log.Debug("consumer: error response:", err) - return nil, err - } - } - - log.Debug("consumer: received batch:", len(resp.buf)) - return &batch{ - events: resp.buf, - state: batchActive, - ack: resp.ack, - }, nil -} - -func (b *batch) Events() []publisher.Event { - if b.state != batchActive { - panic("Get Events from inactive batch") - } - return b.events -} - -func (b *batch) ACK() { - if b.state != batchActive { - switch b.state { - case batchACK: - panic("Can not acknowledge already acknowledged batch") - default: - panic("inactive batch") - } - } - - b.report() -} - -func (b *batch) report() { - if b.ack != nil { - b.ack <- batchAckMsg{} - } -} diff --git a/libbeat/publisher/queue/spool/inbroker.go b/libbeat/publisher/queue/spool/inbroker.go deleted file mode 100644 index b165f2a152b9..000000000000 --- a/libbeat/publisher/queue/spool/inbroker.go +++ /dev/null @@ -1,550 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "fmt" - "math" - "time" - - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/go-txfile/pq" -) - -type inBroker struct { - ctx *spoolCtx - ackListener queue.ACKListener - - // active state handler - state func(*inBroker) bool - - // api channels - events chan pushRequest - pubCancel chan producerCancelRequest - - // queue signaling - sigACK chan struct{} - sigFlush chan uint - ackDone chan struct{} - - // queue state - queue *pq.Queue - writer *pq.Writer - clientStates clientStates - - // Event contents, that still needs to be send to the queue. An event is - // pending if it has been serialized, but not added to the write buffer in - // full, as some I/O operation on the write buffer failed. - // => - // - keep pointer to yet unwritten event contents - // - do not accept any events if pending is not nil - // - wait for signal from reader/queue-gc to retry writing the pending - // events contents - pending []byte - - bufferedEvents uint // number of buffered events - - // flush settings - timer *timer - flushEvents uint - - enc *encoder -} - -const ( - inSigChannelSize = 3 - inEventChannelSize = 20 -) - -func newInBroker( - ctx *spoolCtx, - ackListener queue.ACKListener, - qu *pq.Queue, - codec codecID, - flushTimeout time.Duration, - flushEvents uint, -) (*inBroker, error) { - enc, err := newEncoder(codec) - if err != nil { - return nil, err - } - - writer, err := qu.Writer() - if err != nil { - return nil, err - } - - b := &inBroker{ - ctx: ctx, - ackListener: ackListener, - state: (*inBroker).stateEmpty, - - // API - events: make(chan pushRequest, inEventChannelSize), - pubCancel: make(chan producerCancelRequest), - sigACK: make(chan struct{}, inSigChannelSize), - sigFlush: make(chan uint, inSigChannelSize), - ackDone: make(chan struct{}), - - // queue state - queue: qu, - writer: writer, - clientStates: clientStates{}, - pending: nil, - bufferedEvents: 0, - - // internal - timer: newTimer(flushTimeout), - flushEvents: flushEvents, - enc: enc, - } - - ctx.Go(b.eventLoop) - ctx.Go(b.ackLoop) - return b, nil -} - -func (b *inBroker) Producer(cfg queue.ProducerConfig) queue.Producer { - return newProducer(b.ctx, b.pubCancel, b.events, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel) -} - -// onFlush is run whenever the queue flushes it's write buffer. The callback is -// run in the same go-routine as the Flush was executed from. -// Only the (*inBroker).eventLoop triggers a flush. -func (b *inBroker) onFlush(n uint) { - log := b.ctx.logger - log.Debug("inbroker: onFlush ", n) - - if n == 0 { - return - } - - if b.ackListener != nil { - b.ackListener.OnACK(int(n)) - } - b.ctx.logger.Debug("inbroker: flushed events:", n) - b.bufferedEvents -= n - b.sigFlush <- n -} - -// onACK is run whenever the queue releases ACKed events. The number of acked -// events and freed pages will is reported. -// Flush events are forward to the brokers eventloop, so to give the broker a -// chance to retry writing in case it has been blocked on a full queue. -func (b *inBroker) onACK(events, pages uint) { - if pages > 0 { - b.sigACK <- struct{}{} - } -} - -func (b *inBroker) ackLoop() { - log := b.ctx.logger - - log.Debug("start flush ack loop") - defer log.Debug("stop flush ack loop") - - for { - var n uint - select { - case <-b.ackDone: - return - - case n = <-b.sigFlush: - log.Debug("inbroker: receive flush", n) - states := b.clientStates.Pop(int(n)) - b.sendACKs(states) - } - } -} - -// sendACKs returns the range of ACKed/Flushed events to the individual -// producers ACK handlers. -func (b *inBroker) sendACKs(states []clientState) { - log := b.ctx.logger - - // reverse iteration on client states, so to report ranges of ACKed events - // only once. - N := len(states) - total := 0 - for i := N - 1; i != -1; i-- { - st := &states[i] - if st.state == nil { - continue - } - - count := (st.seq - st.state.lastACK) - if count == 0 || count > math.MaxUint32/2 { - // seq number comparison did underflow. This happens only if st.seq has - // already been acknowledged - // log.Debug("seq number already acked: ", st.seq) - - st.state = nil - continue - } - - log.Debugf("broker ACK events: count=%v, start-seq=%v, end-seq=%v\n", - count, - st.state.lastACK+1, - st.seq, - ) - - total += int(count) - if total > N { - panic(fmt.Sprintf("Too many events acked (expected=%v, total=%v)", - N, total, - )) - } - - // report range of ACKed events - st.state.ackCB(int(count)) - st.state.lastACK = st.seq - st.state = nil - } -} - -func (b *inBroker) eventLoop() { - log := b.ctx.logger - log.Info("spool input eventloop start") - defer log.Info("spool input eventloop stop") - - // notify ackLoop to stop only after eventLoop has finished (after last flush) - defer close(b.ackDone) - defer b.eventloopShutdown() - - for { - ok := b.state(b) - if !ok { - break - } - } -} - -func (b *inBroker) eventloopShutdown() { - // try to flush events/buffers on shutdown. - if b.bufferedEvents == 0 { - return - } - - // Try to flush pending events. - w := b.writer - for len(b.pending) > 0 { - n, err := w.Write(b.pending) - b.pending = b.pending[n:] - if err != nil { - return - } - } - w.Flush() -} - -// stateEmpty is the brokers active state if the write buffer is empty and the -// queue did not block on write or flush operations. -// ACKs from the output are ignored, as events can still be added to the write -// buffer. -// -// stateEmpty transitions: -// -> stateEmpty if serializing the event failed -// -> stateWithTimer if event is written to buffer without flush -// => start timer -// -> stateBlocked if queue did return an error on write (Flush failed) -func (b *inBroker) stateEmpty() bool { - log := b.ctx.logger - - select { - case <-b.ctx.Done(): - return false - - case req := <-b.events: - log.Debug("inbroker (stateEmpty): new event") - - buf, st, err := b.encodeEvent(&req) - if err != nil { - log.Debug(" inbroker (stateEmpty): encode failed") - b.respondDrop(&req) - break - } - - // write/flush failed -> block until space in file becomes available - err = b.addEvent(buf, st) - if err != nil { - log.Debug(" inbroker: append failed, blocking") - b.state = (*inBroker).stateBlocked - break - } - - // start flush timer - if b.flushEvents > 0 && b.bufferedEvents == b.flushEvents { - log.Debug(" inbroker (stateEmpty): flush events") - err := b.flushBuffer() - if err != nil { - log.Debug(" inbroker (stateEmpty): flush failed, blocking") - b.state = (*inBroker).stateBlocked - } - break - - } else if b.bufferedEvents > 0 { - log.Debug(" inbroker (stateEmpty): start flush timer") - b.timer.Start() - b.state = (*inBroker).stateWithTimer - } - - case req := <-b.pubCancel: - b.handleCancel(&req) - - case <-b.sigACK: - // ignore ACKs as long as we can write without blocking - } - - return true -} - -// stateWithTimer is the brokers active state, if the write buffer is not empty. -// The flush timer is enabled as long as the broker is in this state. -// ACKs from the output are ignored, as events can still be added to the write -// buffer. -// -// stateWithTimer transitions: -// -> stateWithTimer -// - if serializing failed -// - if event is added to buffer, without flush -// - flush, but more events are available in the buffer (might reset timer) -// -> stateEmpty if all events have been flushed -// -> stateBlocked if queue did return an error on write/flush (Flush failed) -func (b *inBroker) stateWithTimer() bool { - log := b.ctx.logger - - select { - case <-b.ctx.Done(): - return false - - case req := <-b.events: - log.Debug("inbroker (stateWithTimer): new event") - - buf, st, err := b.encodeEvent(&req) - if err != nil { - log.Debug(" inbroker (stateWithTimer): encode failed") - b.respondDrop(&req) - break - } - - count := b.bufferedEvents - err = b.addEvent(buf, st) - if err != nil { - log.Debug(" inbroker (stateWithTimer): append failed, blocking") - b.state = (*inBroker).stateBlocked - break - } - - flushed := b.bufferedEvents < count - if !flushed && b.flushEvents > 0 && b.bufferedEvents == b.flushEvents { - err := b.flushBuffer() - if err != nil { - log.Debug(" inbroker (stateWithTimer): flush failed, blocking") - b.state = (*inBroker).stateBlocked - break - } - - flushed = true - } - - if !flushed { - break - } - - // write buffer has been flushed, reset timer and broker state - log.Debug(" inbroker (stateWithTimer): buffer flushed") - if b.bufferedEvents == 0 { - b.timer.Stop(false) - b.state = (*inBroker).stateEmpty - } else { - // restart timer, as new event is most likely the only event buffered - // -> reduce IO - log.Debug(" inbroker (stateWithTimer): start flush timer") - b.timer.Restart() - } - - case req := <-b.pubCancel: - b.handleCancel(&req) - - case <-b.timer.C: - log.Debug("inbroker (stateWithTimer): flush timeout", b.bufferedEvents) - - b.timer.Stop(true) - - err := b.flushBuffer() - if err != nil { - log.Debug(" inbroker (stateWithTimer): flush failed, blocking") - b.state = (*inBroker).stateBlocked - break - } - - log.Debug(" inbroker (stateWithTimer): flush succeeded") - - if b.bufferedEvents > 0 { - // flush did not push all events? Restart timer. - log.Debug(" inbroker (stateWithTimer): start flush timer", b.bufferedEvents) - b.timer.Start() - break - } - - b.state = (*inBroker).stateEmpty - - case <-b.sigACK: - // ignore ACKs as long as we can write without blocking - } - - return true -} - -// stateBlocked is the brokers active state if the write buffer can not accept -// any new events. -// The broker will wait for an ACK signal from the outputs and retry flushing, -// in the hope of enough memory being available to flush the buffers. -// If flush did succeed, we try to add the pending event. -// For the time the broker is in this state, no events from any producers will -// be accepted. Thusly all producers will block. Closing a producer, unblocks -// the producer. The producers event (after close) might be processed or -// ignored in the future. -// -// stateBlocked transitions: -// -> stateEmpty if flush was successful and write buffer is empty -// -> stateWithTimer if flush was successful, but we still have some pending events -// -> stateBlocked if flush failed (still not enough space) -func (b *inBroker) stateBlocked() bool { - log := b.ctx.logger - - select { - case <-b.ctx.Done(): - return false - - case req := <-b.pubCancel: - b.handleCancel(&req) - - case <-b.sigACK: - // TODO: - // Have write buffer report number of unallocated pages and take number - // of freed pages into account before retrying. This way no transaction - // must be created if it's already clear the flush will not succeed. - - log.Debug("inbroker (stateBlocked): ACK event from queue -> try to unblock") - - err := b.flushBuffer() - if err != nil { - log.Debug(" inbroker (stateBlocked): flush failed, blocking") - break - } - - if len(b.pending) > 0 { - tmp := b.pending - b.pending = nil - err := b.writeEvent(tmp) - if err != nil || len(b.pending) > 0 { - log.Debugf("writing pending event failed: %+v", err) - break - } - } - - if b.bufferedEvents == 0 { - b.state = (*inBroker).stateEmpty - break - } - - b.timer.Start() - log.Debug(" inbroker (stateBlocked): start flush timer") - b.state = (*inBroker).stateWithTimer - } - - return true -} - -func (b *inBroker) handleCancel(req *producerCancelRequest) { - // mark state as cancelled, so to not accept any new events - // from the state object. - if st := req.state; st != nil { - st.cancelled = true - } - - if req.resp != nil { - req.resp <- producerCancelResponse{removed: 0} - } -} - -func (b *inBroker) encodeEvent(req *pushRequest) ([]byte, clientState, error) { - buf, err := b.enc.encode(&req.event) - if err != nil { - return nil, clientState{}, err - } - - if req.state == nil { - return buf, clientState{}, nil - } - - return buf, clientState{seq: req.seq, state: req.state}, nil -} - -func (b *inBroker) respondDrop(req *pushRequest) { - if req.state != nil { - if cb := req.state.dropCB; cb != nil { - cb(req.event.Content) - } - } -} - -func (b *inBroker) addEvent(buf []byte, st clientState) error { - log := b.ctx.logger - - b.bufferedEvents++ - log.Debug(" inbroker: add event of size", len(buf), b.bufferedEvents) - - count := b.clientStates.Add(st) - log.Debug(" add event -> active:", count) - - err := b.writeEvent(buf) - log.Debugf(" inbroker write -> events=%v, err=%+v ", b.bufferedEvents, err) - - return err -} - -func (b *inBroker) writeEvent(buf []byte) error { - log := b.ctx.logger - - // append event to queue - w := b.writer - n, err := w.Write(buf) - buf = buf[n:] - if len(buf) > 0 { - b.pending = buf - } else if err == nil { - log.Debug("writer: finalize event in buffer") - err = w.Next() - } - - if err != nil { - log.Debugf("Appending event content to write buffer failed with %+v", err) - } - return err -} - -func (b *inBroker) flushBuffer() error { - err := b.writer.Flush() - if err != nil { - log := b.ctx.logger - log.Errorf("Spool flush failed with: %+v", err) - } - return err -} diff --git a/libbeat/publisher/queue/spool/internal_api.go b/libbeat/publisher/queue/spool/internal_api.go deleted file mode 100644 index a6fd97102d4b..000000000000 --- a/libbeat/publisher/queue/spool/internal_api.go +++ /dev/null @@ -1,61 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "github.com/elastic/beats/v7/libbeat/publisher" -) - -// producer -> broker API -type ( - pushRequest struct { - event publisher.Event - seq uint32 - state *produceState - } - - producerCancelRequest struct { - state *produceState - resp chan producerCancelResponse - } - - producerCancelResponse struct { - removed int - } -) - -// consumer -> broker API - -type ( - getRequest struct { - sz int // request sz events from the broker - resp chan getResponse // channel to send response to - } - - getResponse struct { - ack chan batchAckMsg - err error - buf []publisher.Event - } - - batchAckMsg struct{} - - batchCancelRequest struct { - // ack *ackChan - } -) diff --git a/libbeat/publisher/queue/spool/log.go b/libbeat/publisher/queue/spool/log.go deleted file mode 100644 index 64150366b531..000000000000 --- a/libbeat/publisher/queue/spool/log.go +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "fmt" - "sync" - - "github.com/elastic/beats/v7/libbeat/logp" -) - -type logger interface { - Debug(...interface{}) - Debugf(string, ...interface{}) - - Info(...interface{}) - Infof(string, ...interface{}) - - Error(...interface{}) - Errorf(string, ...interface{}) -} - -var _defaultLogger struct { - singleton logger - init sync.Once -} - -func defaultLogger() logger { - _defaultLogger.init.Do(func() { - _defaultLogger.singleton = logp.NewLogger("spool") - }) - return _defaultLogger.singleton -} - -// func defaultLogger() logger { return (*outLogger)(nil) } - -type outLogger struct{} - -func (l *outLogger) Debug(vs ...interface{}) { l.report("Debug", vs) } -func (l *outLogger) Debugf(fmt string, vs ...interface{}) { l.reportf("Debug: ", fmt, vs) } - -func (l *outLogger) Info(vs ...interface{}) { l.report("Info", vs) } -func (l *outLogger) Infof(fmt string, vs ...interface{}) { l.reportf("Info", fmt, vs) } - -func (l *outLogger) Error(vs ...interface{}) { l.report("Error", vs) } -func (l *outLogger) Errorf(fmt string, vs ...interface{}) { l.reportf("Error", fmt, vs) } - -func (l *outLogger) report(level string, vs []interface{}) { - args := append([]interface{}{level, ":"}, vs...) - fmt.Println(args...) -} - -func (*outLogger) reportf(level string, str string, vs []interface{}) { - str = level + ": " + str - fmt.Printf(str, vs...) -} diff --git a/libbeat/publisher/queue/spool/module.go b/libbeat/publisher/queue/spool/module.go deleted file mode 100644 index acf22145c4b2..000000000000 --- a/libbeat/publisher/queue/spool/module.go +++ /dev/null @@ -1,80 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/cfgwarn" - "github.com/elastic/beats/v7/libbeat/feature" - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/paths" - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/go-txfile" -) - -func init() { - queue.RegisterQueueType( - "spool", - create, - feature.MakeDetails( - "Disk spool", - "Buffer events in disk spool before sending to the output.", - feature.Beta)) -} - -func create( - ackListener queue.ACKListener, logp *logp.Logger, cfg *common.Config, inQueueSize int, -) (queue.Queue, error) { - cfgwarn.Beta("Spooling to disk is beta") - - config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { - return nil, err - } - - path := config.File.Path - if path == "" { - path = paths.Resolve(paths.Data, "spool.dat") - } - - flushEvents := uint(0) - if count := config.Write.FlushEvents; count > 0 { - flushEvents = uint(count) - } - - var log logger = logp - if logp == nil { - log = defaultLogger() - } - - return newDiskSpool(log, path, settings{ - ACKListener: ackListener, - Mode: config.File.Permissions, - WriteBuffer: uint(config.Write.BufferSize), - WriteFlushTimeout: config.Write.FlushTimeout, - WriteFlushEvents: flushEvents, - ReadFlushTimeout: config.Read.FlushTimeout, - Codec: config.Write.Codec, - File: txfile.Options{ - MaxSize: uint64(config.File.MaxSize), - PageSize: uint32(config.File.PageSize), - Prealloc: config.File.Prealloc, - Readonly: false, - }, - }) -} diff --git a/libbeat/publisher/queue/spool/outbroker.go b/libbeat/publisher/queue/spool/outbroker.go deleted file mode 100644 index 409b2cde3885..000000000000 --- a/libbeat/publisher/queue/spool/outbroker.go +++ /dev/null @@ -1,536 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "errors" - "sync" - "time" - - "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/go-txfile/pq" -) - -type outBroker struct { - ctx *spoolCtx - state func(*outBroker) bool - - // internal API - sigFlushed chan uint - get chan getRequest - - // ack signaling - pendingACKs chanList // list of pending batches to be forwarded to the ackLoop - scheduledACKs chan chanList // shared channel for forwarding batches to ackLoop - schedACKs chan chanList // active ack forwarding channel, as used by broker (nil if pendingACKs is empty) - - // queue state - queue *pq.Queue - reader *pq.Reader - available uint // number of available events. getRequests are only accepted if available > 0 - events []publisher.Event - required int - total int - active getRequest - - // internal - timer *timer - dec *decoder -} - -type chanList struct { - head *ackChan - tail *ackChan -} - -type ackChan struct { - next *ackChan - ch chan batchAckMsg - total int // total number of events to ACK with this batch -} - -const ( - // maximum number of events if getRequest size is <0 - maxEvents = 2048 - - outSigChannelSize = 3 -) - -var ackChanPool = sync.Pool{ - New: func() interface{} { - return &ackChan{ - ch: make(chan batchAckMsg, 1), - } - }, -} - -var errRetry = errors.New("retry") - -func newOutBroker(ctx *spoolCtx, qu *pq.Queue, flushTimeout time.Duration) (*outBroker, error) { - reader := qu.Reader() - - var ( - avail uint - err error - ) - func() { - if err = reader.Begin(); err != nil { - return - } - defer reader.Done() - avail, err = reader.Available() - }() - if err != nil { - return nil, err - } - - b := &outBroker{ - ctx: ctx, - state: nil, - - // API - sigFlushed: make(chan uint, outSigChannelSize), - get: make(chan getRequest), - - // ack signaling - pendingACKs: chanList{}, - scheduledACKs: make(chan chanList), - schedACKs: nil, - - // queue state - queue: qu, - reader: reader, - available: avail, - events: nil, - required: 0, - total: 0, - active: getRequest{}, - - // internal - timer: newTimer(flushTimeout), - dec: newDecoder(), - } - - b.initState() - ctx.Go(b.eventLoop) - ctx.Go(b.ackLoop) - return b, nil -} - -func (b *outBroker) Consumer() *consumer { - return newConsumer(b.ctx, b.get) -} - -// onFlush is run whenever the queue flushes it's write buffer. The callback is -// run in the same go-routine as the Flush was executed from. -func (b *outBroker) onFlush(n uint) { - if n > 0 { - select { - case <-b.ctx.Done(): // ignore flush messages on shutdown - - case b.sigFlushed <- n: - - } - } -} - -// onACK is run whenever the queue releases ACKed events. The number of acked -// events and freed pages will is reported. -func (b *outBroker) onACK(events, pages uint) { -} - -func (b *outBroker) ackLoop() { - log := b.ctx.logger - - log.Debug("start output ack loop") - defer log.Debug("stop output ack loop") - - var ackList chanList // list of pending acks - for { - select { - case <-b.ctx.Done(): - return - - case lst := <-b.scheduledACKs: - ackList.concat(&lst) - - case <-ackList.channel(): - ackCh := ackList.pop() - - for { - log.Debugf("receive ACK of %v events\n", ackCh.total) - err := b.queue.ACK(uint(ackCh.total)) - if err != nil { - log.Debugf("ack failed with: %+v", err) - time.Sleep(1 * time.Second) - continue - } - - log.Debug("ACK succeeded") - break - } - - releaseACKChan(ackCh) - } - } -} - -func (b *outBroker) eventLoop() { - for { - ok := b.state(b) - if !ok { - break - } - } -} - -// initState resets the brokers state to the initial state and clears -// buffers/points from last state updates. -func (b *outBroker) initState() { - b.events = nil - b.required = 0 - b.total = 0 - b.active = getRequest{} - if b.available == 0 { - b.state = (*outBroker).stateWaitEvents - } else { - b.state = (*outBroker).stateActive - } -} - -// stateWaitEvents is the brokers state if the queue is empty. -// The broker waits for new events and does not accept and consumer requests. -// -// stateWaitEvents transitions: -// -> stateActive: if a queue flush signal has been received -func (b *outBroker) stateWaitEvents() bool { - log := b.ctx.logger - log.Debug("outbroker (stateWaitEvents): waiting for new events") - - select { - case <-b.ctx.Done(): - return false - - case n := <-b.sigFlushed: - log.Debug("outbroker (stateWaitEvents): flush event", n) - b.available += n - b.state = (*outBroker).stateActive - - case b.schedACKs <- b.pendingACKs: - b.handleACKsScheduled() - } - - return true -} - -// stateActive is the brokers initial state, waiting for consumer to request -// new events. -// Flush signals from the input are ignored. -// -// stateActive transitions: -// -> stateActive: if consumer event get request has been fulfilled (N events -// copied or 0 timeout) -// -> stateWaitEvents: if queue is empty after read -// -> stateWithTimer: if only small number of events are available and flush -// timeout is configured. -func (b *outBroker) stateActive() bool { - log := b.ctx.logger - - select { - case <-b.ctx.Done(): - return false - - case n := <-b.sigFlushed: - b.available += n - - case b.schedACKs <- b.pendingACKs: - b.handleACKsScheduled() - - case req := <-b.get: - var events []publisher.Event - required := maxEvents - if req.sz > 0 { - events = make([]publisher.Event, 0, req.sz) - required = req.sz - } - - log.Debug("outbroker (stateActive): get request", required) - - var err error - var total int - events, total, err = b.collectEvents(events, required) - required -= len(events) - b.available -= uint(total) - - log.Debug(" outbroker (stateActive): events collected", len(events), total, err) - - // forward error to consumer and continue with current state - if err != nil { - log.Debug(" outbroker (stateActive): return error") - b.returnError(req, events, total, err) - b.initState() - break - } - - // enough events? Return - if required == 0 || (len(events) > 0 && b.timer.Zero()) { - log.Debug(" outbroker (stateActive): return events") - b.returnEvents(req, events, total) - b.initState() // prepare for next request - break - } - - // If no events have been decoded, signal an error to the consumer to retry. - // Meanwhile reinitialize state, waiting for more events. - if len(events) == 0 { - b.returnError(req, nil, total, errRetry) - b.initState() - break - } - - // not enough events -> start timer and try to collect more - b.events = events - b.required = required - b.active = req - b.total = total - b.timer.Start() - log.Debug(" outbroker (stateActive): switch to stateWithTimer") - b.state = (*outBroker).stateWithTimer - } - - return true -} - -// stateWithTimer is the brokers active state, if the events read is less then -// the minimal number of requested events. -// Once the timer triggers or more events have been consumed, the get response -// will be send to the consumer. -// -// stateWithTimer transitions: -// -> stateWithTimer: if some, but not enough events have been read from the -// queue -// -> stateActive: if the timer triggers or enough events have been returned -// to the consumer -func (b *outBroker) stateWithTimer() bool { - log := b.ctx.logger - - select { - case <-b.ctx.Done(): - return false - - case b.schedACKs <- b.pendingACKs: - b.handleACKsScheduled() - - case <-b.timer.C: - b.timer.Stop(true) - log.Debug("outbroker (stateWithTimer): flush timer") - b.returnEvents(b.active, b.events, b.total) - - log.Debug("outbroker (stateWithTimer): switch to stateActive") - b.initState() - - case n := <-b.sigFlushed: - // yay, more events \o/ - - b.available += n - - L := len(b.events) - required := b.required - events, total, err := b.collectEvents(b.events, required) - b.available -= uint(total) - collected := len(events) - L - required -= collected - total += b.total - - log.Debug(" outbroker (stateWithTimer): events collected", len(events), total, err) - - // continue with stateWithTimer? - if err == nil && required > 0 { - b.events = events - b.total = total - b.required = required - log.Debug(" outbroker (stateWithTimer): switch to stateWithTimer") - break - } - - // done serving consumer request - b.timer.Stop(false) - if err != nil { - log.Debug(" outbroker (stateWithTimer): return error") - b.returnError(b.active, events, total, err) - } else { - log.Debug(" outbroker (stateWithTimer): return events") - b.returnEvents(b.active, events, total) - } - - log.Debug("outbroker (stateWithTimer): switch to stateActive") - b.initState() - } - - return true -} - -func (b *outBroker) handleACKsScheduled() { - b.schedACKs = nil - b.pendingACKs = chanList{} -} - -func (b *outBroker) newACKChan(total int) *ackChan { - ackCh := newACKChan(total) - b.pendingACKs.append(ackCh) - b.schedACKs = b.scheduledACKs - return ackCh -} - -// signalDrop forwards an ACK of total events to the ackloop. -// The batch is marked as ACKed by the output. -// signalDrop is used to free space in the queue, in case -// a continuous set of events has been dropped due to decoding errors. -func (b *outBroker) signalDrop(total int) { - ackCh := b.newACKChan(total) - ackCh.ch <- batchAckMsg{} -} - -func (b *outBroker) returnEvents(req getRequest, events []publisher.Event, total int) { - ackCh := b.newACKChan(total) - req.resp <- getResponse{ - ack: ackCh.ch, - err: nil, - buf: events, - } -} - -func (b *outBroker) returnError( - req getRequest, - events []publisher.Event, - total int, - err error, -) { - var ch chan batchAckMsg - - if len(events) == 0 && total > 0 { - b.signalDrop(total) - } - if len(events) > 0 { - ackCh := b.newACKChan(total) - ch = ackCh.ch - } - - req.resp <- getResponse{ - ack: ch, - err: err, - buf: events, - } -} - -func (b *outBroker) collectEvents( - events []publisher.Event, - N int, -) ([]publisher.Event, int, error) { - log := b.ctx.logger - reader := b.reader - - // ensure all read operations happen within same transaction - err := reader.Begin() - if err != nil { - return nil, 0, err - } - defer reader.Done() - - count := 0 - for N > 0 { - sz, err := reader.Next() - if sz <= 0 || err != nil { - return events, count, err - } - - count++ - - buf := b.dec.Buffer(sz) - _, err = reader.Read(buf) - if err != nil { - return events, count, err - } - - event, err := b.dec.Decode() - if err != nil { - log.Debug("Failed to decode event from spool: %v", err) - continue - } - - events = append(events, event) - N-- - } - - return events, count, nil -} - -func newACKChan(total int) *ackChan { - c := ackChanPool.Get().(*ackChan) - c.next = nil - c.total = total - return c -} - -func releaseACKChan(c *ackChan) { - c.next = nil - ackChanPool.Put(c) -} - -func (l *chanList) append(ch *ackChan) { - if l.head == nil { - l.head = ch - } else { - l.tail.next = ch - } - l.tail = ch -} - -func (l *chanList) concat(other *chanList) { - if other.head == nil { - return - } - - if l.head == nil { - *l = *other - return - } - - l.tail.next = other.head - l.tail = other.tail -} - -func (l *chanList) channel() chan batchAckMsg { - if l.head == nil { - return nil - } - return l.head.ch -} - -func (l *chanList) pop() *ackChan { - ch := l.head - if ch != nil { - l.head = ch.next - if l.head == nil { - l.tail = nil - } - } - - ch.next = nil - return ch -} diff --git a/libbeat/publisher/queue/spool/produce.go b/libbeat/publisher/queue/spool/produce.go deleted file mode 100644 index 6a74d93b1c61..000000000000 --- a/libbeat/publisher/queue/spool/produce.go +++ /dev/null @@ -1,203 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "sync" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/beats/v7/libbeat/publisher/queue" -) - -// forgetfulProducer forwards event to the inBroker. The forgetfulProducer -// provides no event ACK handling and no callbacks. -type forgetfulProducer struct { - openState openState -} - -// ackProducer forwards events to the inBroker. The ackBroker provides -// functionality for ACK/Drop callbacks. -type ackProducer struct { - dropOnCancel bool - seq uint32 - state produceState - openState openState - pubCancel chan producerCancelRequest -} - -// openState tracks the producer->inBroker connection state. -type openState struct { - ctx *spoolCtx - done chan struct{} - events chan pushRequest -} - -// produceState holds the ackProducer internal callback and event ACK state -// shared between ackProducer instances and inBroker instances. -// The state is used to compute the number of per producer ACKed events and -// executing locally configured callbacks. -type produceState struct { - ackCB ackHandler - dropCB func(beat.Event) - cancelled bool - lastACK uint32 -} - -type ackHandler func(count int) - -type clientStates struct { - mux sync.Mutex - clients []clientState -} - -type clientState struct { - seq uint32 // event sequence number - state *produceState // the producer it's state used to compute and signal the ACK count -} - -func newProducer( - ctx *spoolCtx, - pubCancel chan producerCancelRequest, - events chan pushRequest, - ackCB ackHandler, - dropCB func(beat.Event), - dropOnCancel bool, -) queue.Producer { - openState := openState{ - ctx: ctx, - done: make(chan struct{}), - events: events, - } - - if ackCB == nil { - return &forgetfulProducer{openState: openState} - } - - p := &ackProducer{ - seq: 1, - dropOnCancel: dropOnCancel, - openState: openState, - pubCancel: pubCancel, - } - p.state.ackCB = ackCB - p.state.dropCB = dropCB - return p -} - -func (p *forgetfulProducer) Publish(event publisher.Event) bool { - return p.openState.publish(p.makeRequest(event)) -} - -func (p *forgetfulProducer) TryPublish(event publisher.Event) bool { - return p.openState.tryPublish(p.makeRequest(event)) -} - -func (p *forgetfulProducer) makeRequest(event publisher.Event) pushRequest { - return pushRequest{event: event} -} - -func (p *forgetfulProducer) Cancel() int { - p.openState.Close() - return 0 -} - -func (p *ackProducer) Publish(event publisher.Event) bool { - return p.updSeq(p.openState.publish(p.makeRequest(event))) -} - -func (p *ackProducer) TryPublish(event publisher.Event) bool { - return p.updSeq(p.openState.tryPublish(p.makeRequest(event))) -} - -func (p *ackProducer) Cancel() int { - p.openState.Close() - - if p.dropOnCancel { - ch := make(chan producerCancelResponse) - p.pubCancel <- producerCancelRequest{ - state: &p.state, - resp: ch, - } - - // wait for cancel to being processed - resp := <-ch - return resp.removed - } - return 0 -} - -func (p *ackProducer) updSeq(ok bool) bool { - if ok { - p.seq++ - } - return ok -} - -func (p *ackProducer) makeRequest(event publisher.Event) pushRequest { - return pushRequest{event: event, seq: p.seq, state: &p.state} -} - -func (st *openState) Close() { - close(st.done) -} - -func (st *openState) publish(req pushRequest) bool { - select { - case st.events <- req: - return true - case <-st.done: - st.events = nil - return false - } -} - -func (st *openState) tryPublish(req pushRequest) bool { - select { - case st.events <- req: - return true - case <-st.done: - st.events = nil - return false - default: - log := st.ctx.logger - log.Debugf("Dropping event, queue is blocked (seq=%v) ", req.seq) - return false - } -} - -func (s *clientStates) Add(st clientState) int { - s.mux.Lock() - s.clients = append(s.clients, st) - l := len(s.clients) - s.mux.Unlock() - return l -} - -func (s *clientStates) RemoveLast() { - s.mux.Lock() - s.clients = s.clients[:len(s.clients)-1] - s.mux.Unlock() -} - -func (s *clientStates) Pop(n int) (states []clientState) { - s.mux.Lock() - states, s.clients = s.clients[:n], s.clients[n:] - s.mux.Unlock() - return states -} diff --git a/libbeat/publisher/queue/spool/spool.go b/libbeat/publisher/queue/spool/spool.go deleted file mode 100644 index c796170fdc75..000000000000 --- a/libbeat/publisher/queue/spool/spool.go +++ /dev/null @@ -1,250 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "fmt" - "os" - "runtime" - "sync" - "time" - - "github.com/pkg/errors" - - "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/go-txfile" - "github.com/elastic/go-txfile/pq" -) - -// diskSpool implements an on-disk queue.Queue. -type diskSpool struct { - // producer/input support - inCtx *spoolCtx - inBroker *inBroker - - // consumer/output support - outCtx *spoolCtx - outBroker *outBroker - - queue *pq.Queue - file *txfile.File -} - -type spoolCtx struct { - logger logger - wg sync.WaitGroup - active atomic.Bool - done chan struct{} -} - -// settings configure a new spool to be created. -type settings struct { - Mode os.FileMode - - File txfile.Options - - // Queue write buffer size. If a single event is bigger then the - // write-buffer, the write-buffer will grow. In this case will the write - // buffer be flushed and reset to its original size. - WriteBuffer uint - - ACKListener queue.ACKListener - - WriteFlushTimeout time.Duration - WriteFlushEvents uint - ReadFlushTimeout time.Duration - - Codec codecID -} - -const minInFlushTimeout = 100 * time.Millisecond -const minOutFlushTimeout = 0 * time.Millisecond - -// newDiskSpool creates and initializes a new file based queue. -func newDiskSpool(logger logger, path string, settings settings) (*diskSpool, error) { - mode := settings.Mode - if mode == 0 { - mode = os.ModePerm - } - - ok := false - inCtx := newSpoolCtx(logger) - outCtx := newSpoolCtx(logger) - defer ifNotOK(&ok, inCtx.Close) - defer ifNotOK(&ok, outCtx.Close) - - if info, err := os.Lstat(path); err != nil { - if !os.IsNotExist(err) { - return nil, err - } - } else if runtime.GOOS != "windows" { - perm := info.Mode().Perm() - cfgPerm := settings.Mode.Perm() - - // check if file has permissions set, that must not be set via config - if (perm | cfgPerm) != cfgPerm { - return nil, fmt.Errorf("file permissions for '%v' must be more strict (required permissions: %v, actual permissions: %v)", - path, cfgPerm, perm) - } - } - - f, err := txfile.Open(path, mode, settings.File) - if err != nil { - return nil, errors.Wrapf(err, "spool queue: failed to open file at path '%s'", path) - } - defer ifNotOK(&ok, ignoreErr(f.Close)) - - queueDelegate, err := pq.NewStandaloneDelegate(f) - if err != nil { - return nil, err - } - - spool := &diskSpool{ - inCtx: inCtx, - outCtx: outCtx, - } - - queue, err := pq.New(queueDelegate, pq.Settings{ - WriteBuffer: settings.WriteBuffer, - Flushed: spool.onFlush, - ACKed: spool.onACK, - }) - if err != nil { - return nil, err - } - defer ifNotOK(&ok, ignoreErr(queue.Close)) - - inFlushTimeout := settings.WriteFlushTimeout - if inFlushTimeout < minInFlushTimeout { - inFlushTimeout = minInFlushTimeout - } - inBroker, err := newInBroker( - inCtx, settings.ACKListener, queue, settings.Codec, - inFlushTimeout, settings.WriteFlushEvents) - if err != nil { - return nil, err - } - - outFlushTimeout := settings.ReadFlushTimeout - if outFlushTimeout < minOutFlushTimeout { - outFlushTimeout = minOutFlushTimeout - } - outBroker, err := newOutBroker(outCtx, queue, outFlushTimeout) - if err != nil { - return nil, err - } - - ok = true - spool.queue = queue - spool.inBroker = inBroker - spool.outBroker = outBroker - spool.file = f - return spool, nil -} - -// Close shuts down the queue and closes the used file. -func (s *diskSpool) Close() error { - // stop all workers (waits for all workers to be finished) - s.outCtx.Close() - s.inCtx.Close() - - // close queue (potentially flushing write buffer) - err := s.queue.Close() - - // finally unmap and close file - s.file.Close() - - return err -} - -// BufferConfig returns the queue initial buffer settings. -func (s *diskSpool) BufferConfig() queue.BufferConfig { - return queue.BufferConfig{MaxEvents: -1} -} - -// Producer creates a new queue producer for publishing events. -func (s *diskSpool) Producer(cfg queue.ProducerConfig) queue.Producer { - return s.inBroker.Producer(cfg) -} - -// Consumer creates a new queue consumer for consuming and acking events. -func (s *diskSpool) Consumer() queue.Consumer { - return s.outBroker.Consumer() -} - -// onFlush is run whenever the queue signals it's write buffer being flushed. -// Flush events are forwarded to all workers. -// The onFlush callback is directly called by the queue writer (same go-routine) -// on Write or Flush operations. -func (s *diskSpool) onFlush(n uint) { - s.inBroker.onFlush(n) - s.outBroker.onFlush(n) -} - -// onACK is run whenever the queue signals events being acked and removed from -// the queue. -// ACK events are forwarded to all workers. -func (s *diskSpool) onACK(events, pages uint) { - s.inBroker.onACK(events, pages) -} - -func newSpoolCtx(logger logger) *spoolCtx { - return &spoolCtx{ - logger: logger, - active: atomic.MakeBool(true), - done: make(chan struct{}), - } -} - -func (ctx *spoolCtx) Close() { - if ctx.active.CAS(true, false) { - close(ctx.done) - ctx.wg.Wait() - } -} - -func (ctx *spoolCtx) Done() <-chan struct{} { - return ctx.done -} - -func (ctx *spoolCtx) Open() bool { - return ctx.active.Load() -} - -func (ctx *spoolCtx) Closed() bool { - return !ctx.Open() -} - -func (ctx *spoolCtx) Go(fn func()) { - ctx.wg.Add(1) - go func() { - defer ctx.wg.Done() - fn() - }() -} - -func ifNotOK(b *bool, fn func()) { - if !(*b) { - fn() - } -} - -func ignoreErr(fn func() error) func() { - return func() { fn() } -} diff --git a/libbeat/publisher/queue/spool/spool_test.go b/libbeat/publisher/queue/spool/spool_test.go deleted file mode 100644 index b5947152d9a9..000000000000 --- a/libbeat/publisher/queue/spool/spool_test.go +++ /dev/null @@ -1,159 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "flag" - "fmt" - "math/rand" - "testing" - "time" - - humanize "github.com/dustin/go-humanize" - - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest" - "github.com/elastic/go-txfile" - "github.com/elastic/go-txfile/txfiletest" -) - -var seed int64 -var debug bool - -type testQueue struct { - *diskSpool - teardown func() -} - -type testLogger struct { - t *testing.T -} - -type silentLogger struct{} - -func init() { - flag.Int64Var(&seed, "seed", time.Now().UnixNano(), "test random seed") - flag.BoolVar(&debug, "noisy", false, "print test logs to console") -} - -func TestProduceConsumer(t *testing.T) { - maxEvents := 4096 - minEvents := 32 - - rand.Seed(seed) - events := rand.Intn(maxEvents-minEvents) + minEvents - batchSize := rand.Intn(events-8) + 4 - - t.Log("seed: ", seed) - t.Log("events: ", events) - t.Log("batchSize: ", batchSize) - - testWith := func(factory queuetest.QueueFactory) func(t *testing.T) { - return func(test *testing.T) { - t.Run("single", func(t *testing.T) { - queuetest.TestSingleProducerConsumer(t, events, batchSize, factory) - }) - t.Run("multi", func(t *testing.T) { - queuetest.TestMultiProducerConsumer(t, events, batchSize, factory) - }) - } - } - - testWith(makeTestQueue( - 128*humanize.KiByte, 4*humanize.KiByte, 16*humanize.KiByte, - 100*time.Millisecond, - ))(t) -} - -func makeTestQueue( - maxSize, pageSize, writeBuffer uint, - flushTimeout time.Duration, -) func(*testing.T) queue.Queue { - return func(t *testing.T) queue.Queue { - if debug { - fmt.Println("Test:", t.Name()) - } - - ok := false - path, cleanPath := txfiletest.SetupPath(t, "") - defer func() { - if !ok { - cleanPath() - } - }() - - var logger logger - if debug { - logger = &testLogger{t} - } else { - logger = new(silentLogger) - } - - spool, err := newDiskSpool(logger, path, settings{ - WriteBuffer: writeBuffer, - WriteFlushTimeout: flushTimeout, - Codec: codecCBORL, - File: txfile.Options{ - MaxSize: uint64(maxSize), - PageSize: uint32(pageSize), - Prealloc: true, - Readonly: false, - }, - }) - if err != nil { - t.Fatal(err) - } - - tq := &testQueue{diskSpool: spool, teardown: cleanPath} - return tq - } -} - -func (t *testQueue) Close() error { - err := t.diskSpool.Close() - t.teardown() - return err -} - -func (l *testLogger) Debug(vs ...interface{}) { l.report("Debug", vs) } -func (l *testLogger) Debugf(fmt string, vs ...interface{}) { l.reportf("Debug: ", fmt, vs) } - -func (l *testLogger) Info(vs ...interface{}) { l.report("Info", vs) } -func (l *testLogger) Infof(fmt string, vs ...interface{}) { l.reportf("Info", fmt, vs) } - -func (l *testLogger) Error(vs ...interface{}) { l.report("Error", vs) } -func (l *testLogger) Errorf(fmt string, vs ...interface{}) { l.reportf("Error", fmt, vs) } - -func (l *testLogger) report(level string, vs []interface{}) { - args := append([]interface{}{level, ":"}, vs...) - l.t.Log(args...) - fmt.Println(args...) -} - -func (l *testLogger) reportf(level string, str string, vs []interface{}) { - str = level + ": " + str - l.t.Logf(str, vs...) - fmt.Printf(str, vs...) -} - -func (*silentLogger) Debug(vs ...interface{}) {} -func (*silentLogger) Debugf(fmt string, vs ...interface{}) {} -func (*silentLogger) Info(vs ...interface{}) {} -func (*silentLogger) Infof(fmt string, vs ...interface{}) {} -func (*silentLogger) Error(vs ...interface{}) {} -func (*silentLogger) Errorf(fmt string, vs ...interface{}) {} diff --git a/libbeat/publisher/queue/spool/timer.go b/libbeat/publisher/queue/spool/timer.go deleted file mode 100644 index 9f08dcfc69aa..000000000000 --- a/libbeat/publisher/queue/spool/timer.go +++ /dev/null @@ -1,72 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package spool - -import ( - "time" -) - -type timer struct { - // flush timer - timer *time.Timer - C <-chan time.Time - duration time.Duration -} - -func newTimer(duration time.Duration) *timer { - stdtimer := time.NewTimer(duration) - if !stdtimer.Stop() { - <-stdtimer.C - } - - return &timer{ - timer: stdtimer, - C: nil, - duration: duration, - } -} - -func (t *timer) Zero() bool { - return t.duration == 0 -} - -func (t *timer) Restart() { - t.Stop(false) - t.Start() -} - -func (t *timer) Start() { - if t.C != nil { - return - } - - t.timer.Reset(t.duration) - t.C = t.timer.C -} - -func (t *timer) Stop(triggered bool) { - if t.C == nil { - return - } - - if !triggered && !t.timer.Stop() { - <-t.C - } - - t.C = nil -} diff --git a/libbeat/scripts/cmd/stress_pipeline/main.go b/libbeat/scripts/cmd/stress_pipeline/main.go index 49ba19c686c0..2b32ad596a97 100644 --- a/libbeat/scripts/cmd/stress_pipeline/main.go +++ b/libbeat/scripts/cmd/stress_pipeline/main.go @@ -35,7 +35,6 @@ import ( "github.com/elastic/beats/v7/libbeat/paths" "github.com/elastic/beats/v7/libbeat/publisher/pipeline/stress" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" - _ "github.com/elastic/beats/v7/libbeat/publisher/queue/spool" "github.com/elastic/beats/v7/libbeat/service" ) diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 178a5cb2411e..7673d7b01b50 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -1020,66 +1020,6 @@ metricbeat.modules: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index a34efacdc331..5f931ddde706 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -692,66 +692,6 @@ packetbeat.ignore_outgoing: false # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 1705648185a4..4cca1af761e6 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -120,66 +120,6 @@ winlogbeat.event_logs: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index c5f30eecbe0c..f52cc6feab15 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -253,66 +253,6 @@ auditbeat.modules: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/x-pack/dockerlogbeat/main.go b/x-pack/dockerlogbeat/main.go index e363aefb6670..36472e2ec9e8 100644 --- a/x-pack/dockerlogbeat/main.go +++ b/x-pack/dockerlogbeat/main.go @@ -20,7 +20,6 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/logstash" _ "github.com/elastic/beats/v7/libbeat/outputs/redis" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" - _ "github.com/elastic/beats/v7/libbeat/publisher/queue/spool" "github.com/elastic/beats/v7/libbeat/service" "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemanager" ) diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index c25163576d10..e96b8e7f97dc 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3264,66 +3264,6 @@ filebeat.inputs: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index 397214f8cd50..5ea23a3b0dbf 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -375,66 +375,6 @@ functionbeat.provider.aws.functions: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index 6bac78d08c20..8f0f019626bd 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -343,66 +343,6 @@ heartbeat.jobs: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 575581600bd6..be4adb144c8a 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1541,66 +1541,6 @@ metricbeat.modules: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/x-pack/osquerybeat/osquerybeat.reference.yml b/x-pack/osquerybeat/osquerybeat.reference.yml index c22416ddb193..edb3adcfb1d4 100644 --- a/x-pack/osquerybeat/osquerybeat.reference.yml +++ b/x-pack/osquerybeat/osquerybeat.reference.yml @@ -94,66 +94,6 @@ seccomp.enabled: false # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index a34efacdc331..5f931ddde706 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -692,66 +692,6 @@ packetbeat.ignore_outgoing: false # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: diff --git a/x-pack/winlogbeat/winlogbeat.reference.yml b/x-pack/winlogbeat/winlogbeat.reference.yml index 982b46591ec9..5d06be1566a1 100644 --- a/x-pack/winlogbeat/winlogbeat.reference.yml +++ b/x-pack/winlogbeat/winlogbeat.reference.yml @@ -163,66 +163,6 @@ winlogbeat.event_logs: # length of its retry interval each time, up to this maximum. #max_retry_interval: 30s - # The spool queue will store events in a local spool file, before - # forwarding the events to the outputs. - # Note: the spool queue is deprecated and will be removed in the future. - # Use the disk queue instead. - # - # The spool file is a circular buffer, which blocks once the file/buffer is full. - # Events are put into a write buffer and flushed once the write buffer - # is full or the flush_timeout is triggered. - # Once ACKed by the output, events are removed immediately from the queue, - # making space for new events to be persisted. - #spool: - # The file namespace configures the file path and the file creation settings. - # Once the file exists, the `size`, `page_size` and `prealloc` settings - # will have no more effect. - #file: - # Location of spool file. The default value is ${path.data}/spool.dat. - #path: "${path.data}/spool.dat" - - # Configure file permissions if file is created. The default value is 0600. - #permissions: 0600 - - # File size hint. The spool blocks, once this limit is reached. The default value is 100 MiB. - #size: 100MiB - - # The files page size. A file is split into multiple pages of the same size. The default value is 4KiB. - #page_size: 4KiB - - # If prealloc is set, the required space for the file is reserved using - # truncate. The default value is true. - #prealloc: true - - # Spool writer settings - # Events are serialized into a write buffer. The write buffer is flushed if: - # - The buffer limit has been reached. - # - The configured limit of buffered events is reached. - # - The flush timeout is triggered. - #write: - # Sets the write buffer size. - #buffer_size: 1MiB - - # Maximum duration after which events are flushed if the write buffer - # is not full yet. The default value is 1s. - #flush.timeout: 1s - - # Number of maximum buffered events. The write buffer is flushed once the - # limit is reached. - #flush.events: 16384 - - # Configure the on-disk event encoding. The encoding can be changed - # between restarts. - # Valid encodings are: json, ubjson, and cbor. - #codec: cbor - #read: - # Reader flush timeout, waiting for more events to become available, so - # to fill a complete batch as required by the outputs. - # If flush_timeout is 0, all available events are forwarded to the - # outputs immediately. - # The default value is 0s. - #flush.timeout: 0s - # Sets the maximum number of CPUs that can be executing simultaneously. The # default is the number of logical CPUs available in the system. #max_procs: