diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7fa7be11f93..2ad46a0803e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -549,6 +549,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Improve event normalization performance {pull}22974[22974] - Add tini as init system in docker images {pull}22137[22137] - Added "add_network_direction" processor for determining perimeter-based network direction. {pull}23076[23076] +- Added new `rate_limit` processor for enforcing rate limits on event throughput. {pull}22883[22883] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index 2d574adf53c..2fa71582363 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -11032,6 +11032,217 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +Dependency : github.com/jonboulle/clockwork +Version: v0.2.2 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/jonboulle/clockwork@v0.2.2/LICENSE: + +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/josephspurrier/goversioninfo Version: v0.0.0-20190209210621-63e6d1acd3dd diff --git a/go.mod b/go.mod index afd34ca9cf3..61ab57020b6 100644 --- a/go.mod +++ b/go.mod @@ -104,6 +104,7 @@ require ( github.com/insomniacslk/dhcp v0.0.0-20180716145214-633285ba52b2 github.com/jmoiron/sqlx v1.2.1-0.20190826204134-d7d95172beb5 github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 + github.com/jonboulle/clockwork v0.2.2 github.com/josephspurrier/goversioninfo v0.0.0-20190209210621-63e6d1acd3dd github.com/jpillora/backoff v1.0.0 // indirect github.com/jstemmer/go-junit-report v0.9.1 diff --git a/go.sum b/go.sum index 8662993d812..58fd46efade 100644 --- a/go.sum +++ b/go.sum @@ -465,6 +465,8 @@ github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8 github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= +github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= +github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/josephspurrier/goversioninfo v0.0.0-20190209210621-63e6d1acd3dd h1:KikNiFwUO3QLyeKyN4k9yBH9Pcu/gU/yficWi61cJIw= github.com/josephspurrier/goversioninfo v0.0.0-20190209210621-63e6d1acd3dd/go.mod h1:eJTEwMjXb7kZ633hO3Ln9mBUCOjX2+FlTljvpl9SYdE= github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= diff --git a/libbeat/cmd/instance/imports_common.go b/libbeat/cmd/instance/imports_common.go index a2b2569d61c..e47dbf93799 100644 --- a/libbeat/cmd/instance/imports_common.go +++ b/libbeat/cmd/instance/imports_common.go @@ -34,6 +34,7 @@ import ( _ "github.com/elastic/beats/v7/libbeat/processors/dns" _ "github.com/elastic/beats/v7/libbeat/processors/extract_array" _ "github.com/elastic/beats/v7/libbeat/processors/fingerprint" + _ "github.com/elastic/beats/v7/libbeat/processors/ratelimit" _ "github.com/elastic/beats/v7/libbeat/processors/registered_domain" _ "github.com/elastic/beats/v7/libbeat/processors/translate_sid" _ "github.com/elastic/beats/v7/libbeat/processors/urldecode" diff --git a/libbeat/processors/ratelimit/algorithm.go b/libbeat/processors/ratelimit/algorithm.go new file mode 100644 index 00000000000..2d937d26931 --- /dev/null +++ b/libbeat/processors/ratelimit/algorithm.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ratelimit + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" +) + +var registry = make(map[string]constructor, 0) + +// algoConfig for rate limit algorithm. +type algoConfig struct { + // limit is the rate limit to be enforced by the algorithm. + limit rate + + // config is any algorithm-specific additional configuration. + config common.Config +} + +// algorithm is the interface that all rate limiting algorithms must +// conform to. +type algorithm interface { + // IsAllowed accepts a key and returns whether that key is allowed + // (true) or not (false). If a key is allowed, it means it is NOT + // rate limited. If a key is not allowed, it means it is being rate + // limited. + IsAllowed(uint64) bool +} + +type constructor func(algoConfig) (algorithm, error) + +func register(id string, ctor constructor) { + registry[id] = ctor +} + +// factory returns the requested rate limiting algorithm, if one is found. If not found, +// an error is returned. +func factory(id string, config algoConfig) (algorithm, error) { + var ctor constructor + var found bool + if ctor, found = registry[id]; !found { + return nil, fmt.Errorf("rate limiting algorithm '%v' not implemented", id) + } + + algorithm, err := ctor(config) + if err != nil { + return nil, errors.Wrap(err, "could not construct algorithm") + } + + return algorithm, nil +} diff --git a/libbeat/processors/ratelimit/config.go b/libbeat/processors/ratelimit/config.go new file mode 100644 index 00000000000..79fb5a49932 --- /dev/null +++ b/libbeat/processors/ratelimit/config.go @@ -0,0 +1,47 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ratelimit + +import ( + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// config for rate limit processor. +type config struct { + Limit rate `config:"limit" validate:"required"` + Fields []string `config:"fields"` + Algorithm common.ConfigNamespace `config:"algorithm"` +} + +func (c *config) setDefaults() error { + if c.Algorithm.Name() == "" { + cfg, err := common.NewConfigFrom(map[string]interface{}{ + "token_bucket": map[string]interface{}{}, + }) + + if err != nil { + return errors.Wrap(err, "could not parse default configuration") + } + + c.Algorithm.Unpack(cfg) + } + + return nil +} diff --git a/libbeat/processors/ratelimit/docs/rate_limit.asciidoc b/libbeat/processors/ratelimit/docs/rate_limit.asciidoc new file mode 100644 index 00000000000..cdc33d14ab6 --- /dev/null +++ b/libbeat/processors/ratelimit/docs/rate_limit.asciidoc @@ -0,0 +1,40 @@ +[[rate_limit]] +=== Rate limit the flow of events +beta[] + +++++ +rate_limit +++++ + +The `rate_limit` processor limits the throughput of events based on +the specified configuration. + +[source,yaml] +----------------------------------------------------- +processors: +- rate_limit: + limit: "10000/m" +----------------------------------------------------- + +[source,yaml] +----------------------------------------------------- +processors: +- rate_limit: + fields: + - "cloudfoundry.org.name" + limit: "400/s" +----------------------------------------------------- + +[source,yaml] +----------------------------------------------------- +processors: +- if.equals.cloudfoundry.org.name: "acme" + then: + - rate_limit: + limit: "500/s" +----------------------------------------------------- + +The following settings are supported: + +`limit`:: The rate limit. Supported time units for the rate are `s` (per second), `m` (per minute), and `h` (per hour). +`fields`:: (Optional) List of fields. The rate limit will be applied to each distinct value derived by combining the values of these fields. diff --git a/libbeat/processors/ratelimit/limit.go b/libbeat/processors/ratelimit/limit.go new file mode 100644 index 00000000000..8b1f1e0517c --- /dev/null +++ b/libbeat/processors/ratelimit/limit.go @@ -0,0 +1,92 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ratelimit + +import ( + "fmt" + "strconv" + "strings" +) + +type unit string + +const ( + unitPerSecond unit = "s" + unitPerMinute unit = "m" + unitPerHour unit = "h" +) + +type rate struct { + value float64 + unit unit +} + +// Unpack creates a rate from the given string +func (l *rate) Unpack(str string) error { + parts := strings.Split(str, "/") + if len(parts) != 2 { + return fmt.Errorf(`rate in invalid format: %v. Must be specified as "number/unit"`, str) + } + + valueStr := strings.TrimSpace(parts[0]) + unitStr := strings.TrimSpace(parts[1]) + + v, err := strconv.ParseFloat(valueStr, 8) + if err != nil { + return fmt.Errorf(`rate's value component is not numeric: %v`, valueStr) + } + + if allowed := []unit{unitPerSecond, unitPerMinute, unitPerHour}; !contains(allowed, unitStr) { + allowedStrs := make([]string, len(allowed)) + for _, a := range allowed { + allowedStrs = append(allowedStrs, "/"+string(a)) + } + + return fmt.Errorf(`rate's unit component must be specified as one of: %v`, strings.Join(allowedStrs, ",")) + } + + u := unit(unitStr) + + l.value = v + l.unit = u + + return nil +} + +func (l *rate) valuePerSecond() float64 { + switch l.unit { + case unitPerSecond: + return l.value + case unitPerMinute: + return l.value / 60 + case unitPerHour: + return l.value / (60 * 60) + } + + return 0 +} + +func contains(allowed []unit, candidate string) bool { + for _, a := range allowed { + if candidate == string(a) { + return true + } + } + + return false +} diff --git a/libbeat/processors/ratelimit/rate_limit.go b/libbeat/processors/ratelimit/rate_limit.go new file mode 100644 index 00000000000..210ac5a5912 --- /dev/null +++ b/libbeat/processors/ratelimit/rate_limit.go @@ -0,0 +1,129 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ratelimit + +import ( + "fmt" + "sort" + + "github.com/jonboulle/clockwork" + "github.com/mitchellh/hashstructure" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/processors" +) + +func init() { + processors.RegisterPlugin(processorName, new) +} + +const processorName = "rate_limit" + +type rateLimit struct { + config config + algorithm algorithm + logger *logp.Logger +} + +// new constructs a new rate limit processor. +func new(cfg *common.Config) (processors.Processor, error) { + var config config + if err := cfg.Unpack(&config); err != nil { + return nil, errors.Wrap(err, "could not unpack processor configuration") + } + + if err := config.setDefaults(); err != nil { + return nil, errors.Wrap(err, "could not set default configuration") + } + + algoConfig := algoConfig{ + limit: config.Limit, + config: *config.Algorithm.Config(), + } + algo, err := factory(config.Algorithm.Name(), algoConfig) + if err != nil { + return nil, errors.Wrap(err, "could not construct rate limiting algorithm") + } + + p := &rateLimit{ + config: config, + algorithm: algo, + logger: logp.NewLogger("rate_limit"), + } + + p.setClock(clockwork.NewRealClock()) + + return p, nil +} + +// Run applies the configured rate limit to the given event. If the event is within the +// configured rate limit, it is returned as-is. If not, nil is returned. +func (p *rateLimit) Run(event *beat.Event) (*beat.Event, error) { + key, err := p.makeKey(event) + if err != nil { + return nil, errors.Wrap(err, "could not make key") + } + + if p.algorithm.IsAllowed(key) { + return event, nil + } + + p.logger.Debugf("event [%v] dropped by rate_limit processor", event) + return nil, nil +} + +func (p *rateLimit) String() string { + return fmt.Sprintf( + "%v=[limit=[%v],fields=[%v],algorithm=[%v]]", + processorName, p.config.Limit, p.config.Fields, p.config.Algorithm.Name(), + ) +} + +func (p *rateLimit) makeKey(event *beat.Event) (uint64, error) { + if len(p.config.Fields) == 0 { + return 0, nil + } + + sort.Strings(p.config.Fields) + values := make([]string, len(p.config.Fields)) + for _, field := range p.config.Fields { + value, err := event.GetValue(field) + if err != nil { + if err != common.ErrKeyNotFound { + return 0, errors.Wrapf(err, "error getting value of field: %v", field) + } + + value = "" + } + + values = append(values, fmt.Sprintf("%v", value)) + } + + return hashstructure.Hash(values, nil) +} + +// setClock allows test code to inject a fake clock +// TODO: remove this method and move tests that use it to algorithm level. +func (p *rateLimit) setClock(c clockwork.Clock) { + if a, ok := p.algorithm.(interface{ setClock(clock clockwork.Clock) }); ok { + a.setClock(c) + } +} diff --git a/libbeat/processors/ratelimit/rate_limit_test.go b/libbeat/processors/ratelimit/rate_limit_test.go new file mode 100644 index 00000000000..99941ebfd0f --- /dev/null +++ b/libbeat/processors/ratelimit/rate_limit_test.go @@ -0,0 +1,174 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ratelimit + +import ( + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestNew(t *testing.T) { + cases := map[string]struct { + config common.MapStr + err string + }{ + "default": { + common.MapStr{}, + "", + }, + "unknown_algo": { + common.MapStr{ + "algorithm": common.MapStr{ + "foobar": common.MapStr{}, + }, + }, + "rate limiting algorithm 'foobar' not implemented", + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + config := common.MustNewConfigFrom(test.config) + _, err := new(config) + if test.err == "" { + require.NoError(t, err) + } else { + require.Error(t, err, test.err) + } + }) + } +} + +func TestRateLimit(t *testing.T) { + inEvents := []beat.Event{ + { + Timestamp: time.Now(), + Fields: common.MapStr{ + "foo": "bar", + }, + }, + { + Timestamp: time.Now(), + Fields: common.MapStr{ + "foo": "bar", + "baz": "mosquito", + }, + }, + { + Timestamp: time.Now(), + Fields: common.MapStr{ + "baz": "qux", + }, + }, + { + Timestamp: time.Now(), + Fields: common.MapStr{ + "foo": "seger", + }, + }, + } + + cases := map[string]struct { + config common.MapStr + inEvents []beat.Event + delay time.Duration + outEvents []beat.Event + }{ + "rate_0": { + config: common.MapStr{}, + inEvents: inEvents, + outEvents: []beat.Event{}, + }, + "rate_1_per_min": { + config: common.MapStr{ + "limit": "1/m", + }, + inEvents: inEvents, + outEvents: inEvents[0:1], + }, + "rate_2_per_min": { + config: common.MapStr{ + "limit": "2/m", + }, + inEvents: inEvents, + outEvents: inEvents[0:2], + }, + "rate_5_per_min": { + config: common.MapStr{ + "limit": "5/m", + }, + inEvents: inEvents, + outEvents: inEvents, + }, + "rate_2_per_sec": { + config: common.MapStr{ + "limit": "2/s", + }, + delay: 200 * time.Millisecond, + inEvents: inEvents, + outEvents: []beat.Event{inEvents[0], inEvents[1], inEvents[3]}, + }, + "with_fields": { + config: common.MapStr{ + "limit": "1/s", + "fields": []string{"foo"}, + }, + delay: 400 * time.Millisecond, + inEvents: inEvents, + outEvents: []beat.Event{inEvents[0], inEvents[2], inEvents[3]}, + }, + "with_burst": { + config: common.MapStr{ + "limit": "2/s", + "burst_multiplier": 2, + }, + delay: 400 * time.Millisecond, + inEvents: inEvents, + outEvents: inEvents, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + p, err := new(common.MustNewConfigFrom(test.config)) + require.NoError(t, err) + + fakeClock := clockwork.NewFakeClock() + + p.(*rateLimit).setClock(fakeClock) + + out := make([]beat.Event, 0) + for _, in := range test.inEvents { + o, err := p.Run(&in) + require.NoError(t, err) + if o != nil { + out = append(out, *o) + } + fakeClock.Advance(test.delay) + } + + require.Equal(t, test.outEvents, out) + }) + } +} diff --git a/libbeat/processors/ratelimit/token_bucket.go b/libbeat/processors/ratelimit/token_bucket.go new file mode 100644 index 00000000000..d7460a37b06 --- /dev/null +++ b/libbeat/processors/ratelimit/token_bucket.go @@ -0,0 +1,201 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ratelimit + +import ( + "sync" + "time" + + "github.com/jonboulle/clockwork" + "github.com/pkg/errors" + + "github.com/elastic/go-concert/unison" + + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func init() { + register("token_bucket", newTokenBucket) +} + +type bucket struct { + tokens float64 + lastReplenish time.Time +} + +type tokenBucket struct { + mu unison.Mutex + + limit rate + depth float64 + buckets sync.Map + + // GC thresholds and metrics + gc struct { + thresholds tokenBucketGCConfig + metrics struct { + numCalls atomic.Uint + } + } + + clock clockwork.Clock + logger *logp.Logger +} + +type tokenBucketGCConfig struct { + // NumCalls is the number of calls made to IsAllowed. When more than + // the specified number of calls are made, GC is performed. + NumCalls uint `config:"num_calls"` +} + +type tokenBucketConfig struct { + BurstMultiplier float64 `config:"burst_multiplier"` + + // GC governs when completely filled token buckets must be deleted + // to free up memory. GC is performed when _any_ of the GC conditions + // below are met. After each GC, counters corresponding to _each_ of + // the GC conditions below are reset. + GC tokenBucketGCConfig `config:"gc"` +} + +func newTokenBucket(config algoConfig) (algorithm, error) { + cfg := tokenBucketConfig{ + BurstMultiplier: 1.0, + GC: tokenBucketGCConfig{ + NumCalls: 10000, + }, + } + + if err := config.config.Unpack(&cfg); err != nil { + return nil, errors.Wrap(err, "could not unpack token_bucket algorithm configuration") + } + + return &tokenBucket{ + limit: config.limit, + depth: config.limit.value * cfg.BurstMultiplier, + buckets: sync.Map{}, + gc: struct { + thresholds tokenBucketGCConfig + metrics struct { + numCalls atomic.Uint + } + }{ + thresholds: tokenBucketGCConfig{ + NumCalls: cfg.GC.NumCalls, + }, + }, + clock: clockwork.NewRealClock(), + logger: logp.NewLogger("token_bucket"), + mu: unison.MakeMutex(), + }, nil +} + +func (t *tokenBucket) IsAllowed(key uint64) bool { + t.runGC() + + b := t.getBucket(key) + allowed := b.withdraw() + + t.gc.metrics.numCalls.Inc() + return allowed +} + +// setClock allows test code to inject a fake clock +func (t *tokenBucket) setClock(c clockwork.Clock) { + t.clock = c +} + +func (t *tokenBucket) getBucket(key uint64) *bucket { + v, exists := t.buckets.LoadOrStore(key, &bucket{ + tokens: t.depth, + lastReplenish: t.clock.Now(), + }) + b := v.(*bucket) + + if exists { + b.replenish(t.limit, t.clock) + return b + } + + return b +} + +func (b *bucket) withdraw() bool { + if b.tokens < 1 { + return false + } + b.tokens-- + return true +} + +func (b *bucket) replenish(rate rate, clock clockwork.Clock) { + secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds() + tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond() + + b.tokens += tokensToReplenish + b.lastReplenish = clock.Now() +} + +func (t *tokenBucket) runGC() { + // Don't run GC if thresholds haven't been crossed. + if t.gc.metrics.numCalls.Load() < t.gc.thresholds.NumCalls { + return + } + + if !t.mu.TryLock() { + return + } + + go func() { + defer t.mu.Unlock() + gcStartTime := time.Now() + + // Add tokens to all buckets according to the rate limit + // and flag full buckets for deletion. + toDelete := make([]uint64, 0) + numBucketsBefore := 0 + t.buckets.Range(func(k, v interface{}) bool { + key := k.(uint64) + b := v.(*bucket) + + b.replenish(t.limit, t.clock) + + if b.tokens >= t.depth { + toDelete = append(toDelete, key) + } + + numBucketsBefore++ + return true + }) + + // Cleanup full buckets to free up memory + for _, key := range toDelete { + t.buckets.Delete(key) + } + + // Reset GC metrics + t.gc.metrics.numCalls = atomic.MakeUint(0) + + gcDuration := time.Now().Sub(gcStartTime) + numBucketsDeleted := len(toDelete) + numBucketsAfter := numBucketsBefore - numBucketsDeleted + t.logger.Debugf("gc duration: %v, buckets: (before: %v, deleted: %v, after: %v)", + gcDuration, numBucketsBefore, numBucketsDeleted, numBucketsAfter) + }() +}