From 5605b998fdeaee51e8bcd0e9b2f0a3ac5b8eb67e Mon Sep 17 00:00:00 2001 From: premendrasingh Date: Tue, 28 Apr 2020 05:51:15 -0700 Subject: [PATCH 1/2] LIBBEAT: Enhancement replace_string processor for replacing strings values of fields. (#17342) This PR is to add a replace processor. This processor takes in a field name, search string and replacement string. Searches field value for pattern and replaces it with replacement string. (cherry picked from commit 09fd4df169324d4e02b2f3bf3c074d7501f21968) --- CHANGELOG.next.asciidoc | 2 + .../processors/actions/docs/replace.asciidoc | 49 ++++ libbeat/processors/actions/replace.go | 118 +++++++++ libbeat/processors/actions/replace_test.go | 248 ++++++++++++++++++ 4 files changed, 417 insertions(+) create mode 100644 libbeat/processors/actions/docs/replace.asciidoc create mode 100644 libbeat/processors/actions/replace.go create mode 100644 libbeat/processors/actions/replace_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 04018aeb271..1a4cb159ee3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -259,6 +259,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add Kerberos support to Kafka input and output. {pull}16781[16781] - Update supported versions of `redis` output. {pull}17198[17198] - Update documentation for system.process.memory fields to include clarification on Windows os's. {pull}17268[17268] +- Add `replace` processor for replacing string values of fields. {pull}17342[17342] +- Add optional regex based cid extractor to `add_kubernetes_metadata` processor. {pull}17360[17360] - Add `urldecode` processor to for decoding URL-encoded fields. {pull}17505[17505] - Add support for AWS IAM `role_arn` in credentials config. {pull}17658[17658] {issue}12464[12464] diff --git a/libbeat/processors/actions/docs/replace.asciidoc b/libbeat/processors/actions/docs/replace.asciidoc new file mode 100644 index 00000000000..3faf3e0bcce --- /dev/null +++ b/libbeat/processors/actions/docs/replace.asciidoc @@ -0,0 +1,49 @@ +[[replace-fields]] +=== Replace fields from events + +++++ +replace +++++ + +The `replace` processor takes a list of fields to replace the field value +matching a pattern with replacement string. Under the `fields` key, each entry +contains a `field: field-name`, `pattern: regex-pattern` and +`replacement: replacement-string`, where: + +* `field` is the original field name +* `pattern` is regex pattern to match field's value +* `replacement` is the replacement string to use for updating the field's value + +The `replace` processor cannot be used to replace value with a completely new value. + +TIP: You can replace field value to truncate part of field value or replace +it with a new string. It can also be used for masking PII information. + +Following example will change path from /usr/bin to /usr/local/bin + +[source,yaml] +------- +processors: +- replace: + fields: + - field: "file.path" + pattern: "/usr/" + replacement: "/usr/local/" + ignore_missing: false + fail_on_error: true +------- + +The `replace` processor has following configuration settings: + +`ignore_missing`:: (Optional) If set to true, no error is logged in case a specifiedfield +is missing. Default is `false`. + +`fail_on_error`:: (Optional) If set to true, in case of an error the replacement of +field values is stopped and the original event is returned. If set to false, replacement +continues even if an error occurs during replacement. Default is `true`. + +See <> for a list of supported conditions. + +You can specify multiple `ignore_missing` processors under the `processors` +section. + diff --git a/libbeat/processors/actions/replace.go b/libbeat/processors/actions/replace.go new file mode 100644 index 00000000000..37245817050 --- /dev/null +++ b/libbeat/processors/actions/replace.go @@ -0,0 +1,118 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package actions + +import ( + "fmt" + "regexp" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/processors/checks" + jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" +) + +type replaceString struct { + config replaceStringConfig +} + +type replaceStringConfig struct { + Fields []replaceConfig `config:"fields"` + IgnoreMissing bool `config:"ignore_missing"` + FailOnError bool `config:"fail_on_error"` +} + +type replaceConfig struct { + Field string `config:"field"` + Pattern *regexp.Regexp `config:"pattern"` + Replacement string `config:"replacement"` +} + +func init() { + processors.RegisterPlugin("replace", + checks.ConfigChecked(NewReplaceString, + checks.RequireFields("fields"))) + + jsprocessor.RegisterPlugin("Replace", NewReplaceString) +} + +// NewReplaceString returns a new replace processor. +func NewReplaceString(c *common.Config) (processors.Processor, error) { + config := replaceStringConfig{ + IgnoreMissing: false, + FailOnError: true, + } + err := c.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("failed to unpack the replace configuration: %s", err) + } + + f := &replaceString{ + config: config, + } + return f, nil +} + +func (f *replaceString) Run(event *beat.Event) (*beat.Event, error) { + var backup common.MapStr + // Creates a copy of the event to revert in case of failure + if f.config.FailOnError { + backup = event.Fields.Clone() + } + + for _, field := range f.config.Fields { + err := f.replaceField(field.Field, field.Pattern, field.Replacement, event.Fields) + if err != nil { + errMsg := fmt.Errorf("Failed to replace fields in processor: %s", err) + logp.Debug("replace", errMsg.Error()) + if f.config.FailOnError { + event.Fields = backup + event.PutValue("error.message", errMsg.Error()) + return event, err + } + } + } + + return event, nil +} + +func (f *replaceString) replaceField(field string, pattern *regexp.Regexp, replacement string, fields common.MapStr) error { + currentValue, err := fields.GetValue(field) + if err != nil { + // Ignore ErrKeyNotFound errors + if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { + return nil + } + return fmt.Errorf("could not fetch value for key: %s, Error: %s", field, err) + } + + updatedString := pattern.ReplaceAllString(currentValue.(string), replacement) + _, err = fields.Put(field, updatedString) + if err != nil { + return fmt.Errorf("could not put value: %s: %v, %v", replacement, currentValue, err) + } + return nil +} + +func (f *replaceString) String() string { + return "replace=" + fmt.Sprintf("%+v", f.config.Fields) +} diff --git a/libbeat/processors/actions/replace_test.go b/libbeat/processors/actions/replace_test.go new file mode 100644 index 00000000000..e54d16c5012 --- /dev/null +++ b/libbeat/processors/actions/replace_test.go @@ -0,0 +1,248 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package actions + +import ( + "reflect" + "regexp" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestReplaceRun(t *testing.T) { + var tests = []struct { + description string + Fields []replaceConfig + IgnoreMissing bool + FailOnError bool + Input common.MapStr + Output common.MapStr + error bool + }{ + { + description: "simple field replacing", + Fields: []replaceConfig{ + { + Field: "f", + Pattern: regexp.MustCompile(`a`), + Replacement: "b", + }, + }, + Input: common.MapStr{ + "f": "abc", + }, + Output: common.MapStr{ + "f": "bbc", + }, + error: false, + IgnoreMissing: false, + FailOnError: true, + }, + { + description: "Add one more hierarchy to event", + Fields: []replaceConfig{ + { + Field: "f.b", + Pattern: regexp.MustCompile(`a`), + Replacement: "b", + }, + }, + Input: common.MapStr{ + "f": common.MapStr{ + "b": "abc", + }, + }, + Output: common.MapStr{ + "f": common.MapStr{ + "b": "bbc", + }, + }, + error: false, + IgnoreMissing: false, + FailOnError: true, + }, + { + description: "replace two fields at the same time.", + Fields: []replaceConfig{ + { + Field: "f", + Pattern: regexp.MustCompile(`a.*c`), + Replacement: "cab", + }, + { + Field: "g", + Pattern: regexp.MustCompile(`ef`), + Replacement: "oor", + }, + }, + Input: common.MapStr{ + "f": "abbbc", + "g": "def", + }, + Output: common.MapStr{ + "f": "cab", + "g": "door", + }, + error: false, + IgnoreMissing: false, + FailOnError: true, + }, + { + description: "test missing fields", + Fields: []replaceConfig{ + { + Field: "f", + Pattern: regexp.MustCompile(`abc`), + Replacement: "xyz", + }, + { + Field: "g", + Pattern: regexp.MustCompile(`def`), + Replacement: "", + }, + }, + Input: common.MapStr{ + "m": "abc", + "n": "def", + }, + Output: common.MapStr{ + "m": "abc", + "n": "def", + "error": common.MapStr{ + "message": "Failed to replace fields in processor: could not fetch value for key: f, Error: key not found", + }, + }, + error: true, + IgnoreMissing: false, + FailOnError: true, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + f := &replaceString{ + config: replaceStringConfig{ + Fields: test.Fields, + IgnoreMissing: test.IgnoreMissing, + FailOnError: test.FailOnError, + }, + } + event := &beat.Event{ + Fields: test.Input, + } + + newEvent, err := f.Run(event) + if !test.error { + assert.Nil(t, err) + } else { + assert.NotNil(t, err) + } + + assert.True(t, reflect.DeepEqual(newEvent.Fields, test.Output)) + }) + } +} + +func TestReplaceField(t *testing.T) { + var tests = []struct { + Field string + Pattern *regexp.Regexp + Replacement string + ignoreMissing bool + failOnError bool + Input common.MapStr + Output common.MapStr + error bool + description string + }{ + { + description: "replace part of field value with another string", + Field: "f", + Pattern: regexp.MustCompile(`a`), + Replacement: "b", + Input: common.MapStr{ + "f": "abc", + }, + Output: common.MapStr{ + "f": "bbc", + }, + error: false, + failOnError: true, + ignoreMissing: false, + }, + { + description: "Add hierarchy to event and replace", + Field: "f.b", + Pattern: regexp.MustCompile(`a`), + Replacement: "b", + Input: common.MapStr{ + "f": common.MapStr{ + "b": "abc", + }, + }, + Output: common.MapStr{ + "f": common.MapStr{ + "b": "bbc", + }, + }, + error: false, + ignoreMissing: false, + failOnError: true, + }, + { + description: "try replacing value of missing fields in event", + Field: "f", + Pattern: regexp.MustCompile(`abc`), + Replacement: "xyz", + Input: common.MapStr{ + "m": "abc", + "n": "def", + }, + Output: common.MapStr{ + "m": "abc", + "n": "def", + }, + error: true, + ignoreMissing: false, + failOnError: true, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + + f := &replaceString{ + config: replaceStringConfig{ + IgnoreMissing: test.ignoreMissing, + FailOnError: test.failOnError, + }, + } + + err := f.replaceField(test.Field, test.Pattern, test.Replacement, test.Input) + if err != nil { + assert.Equal(t, test.error, true) + } + + assert.True(t, reflect.DeepEqual(test.Input, test.Output)) + }) + } +} From 0a4972325253ce7592f2fecfd853eae3bc495ddc Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 28 Apr 2020 14:54:00 +0200 Subject: [PATCH 2/2] fix changelog --- CHANGELOG.next.asciidoc | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1a4cb159ee3..2feb3728f9a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -260,7 +260,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update supported versions of `redis` output. {pull}17198[17198] - Update documentation for system.process.memory fields to include clarification on Windows os's. {pull}17268[17268] - Add `replace` processor for replacing string values of fields. {pull}17342[17342] -- Add optional regex based cid extractor to `add_kubernetes_metadata` processor. {pull}17360[17360] - Add `urldecode` processor to for decoding URL-encoded fields. {pull}17505[17505] - Add support for AWS IAM `role_arn` in credentials config. {pull}17658[17658] {issue}12464[12464]