-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
LIBBEAT: Enhancement replace_string processor for replacing strings v…
…alues of fields. (#17342) This PR is to add a replace processor. This processor takes in a field name, search string and replacement string. Searches field value for pattern and replaces it with replacement string.
- Loading branch information
1 parent
164beb0
commit 09fd4df
Showing
4 changed files
with
416 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
[[replace-fields]] | ||
=== Replace fields from events | ||
|
||
++++ | ||
<titleabbrev>replace</titleabbrev> | ||
++++ | ||
|
||
The `replace` processor takes a list of fields to replace the field value | ||
matching a pattern with replacement string. Under the `fields` key, each entry | ||
contains a `field: field-name`, `pattern: regex-pattern` and | ||
`replacement: replacement-string`, where: | ||
|
||
* `field` is the original field name | ||
* `pattern` is regex pattern to match field's value | ||
* `replacement` is the replacement string to use for updating the field's value | ||
|
||
The `replace` processor cannot be used to replace value with a completely new value. | ||
|
||
TIP: You can replace field value to truncate part of field value or replace | ||
it with a new string. It can also be used for masking PII information. | ||
|
||
Following example will change path from /usr/bin to /usr/local/bin | ||
|
||
[source,yaml] | ||
------- | ||
processors: | ||
- replace: | ||
fields: | ||
- field: "file.path" | ||
pattern: "/usr/" | ||
replacement: "/usr/local/" | ||
ignore_missing: false | ||
fail_on_error: true | ||
------- | ||
|
||
The `replace` processor has following configuration settings: | ||
|
||
`ignore_missing`:: (Optional) If set to true, no error is logged in case a specifiedfield | ||
is missing. Default is `false`. | ||
|
||
`fail_on_error`:: (Optional) If set to true, in case of an error the replacement of | ||
field values is stopped and the original event is returned. If set to false, replacement | ||
continues even if an error occurs during replacement. Default is `true`. | ||
|
||
See <<conditions>> for a list of supported conditions. | ||
|
||
You can specify multiple `ignore_missing` processors under the `processors` | ||
section. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package actions | ||
|
||
import ( | ||
"fmt" | ||
"regexp" | ||
|
||
"github.com/pkg/errors" | ||
|
||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/logp" | ||
"github.com/elastic/beats/v7/libbeat/processors" | ||
"github.com/elastic/beats/v7/libbeat/processors/checks" | ||
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" | ||
) | ||
|
||
type replaceString struct { | ||
config replaceStringConfig | ||
} | ||
|
||
type replaceStringConfig struct { | ||
Fields []replaceConfig `config:"fields"` | ||
IgnoreMissing bool `config:"ignore_missing"` | ||
FailOnError bool `config:"fail_on_error"` | ||
} | ||
|
||
type replaceConfig struct { | ||
Field string `config:"field"` | ||
Pattern *regexp.Regexp `config:"pattern"` | ||
Replacement string `config:"replacement"` | ||
} | ||
|
||
func init() { | ||
processors.RegisterPlugin("replace", | ||
checks.ConfigChecked(NewReplaceString, | ||
checks.RequireFields("fields"))) | ||
|
||
jsprocessor.RegisterPlugin("Replace", NewReplaceString) | ||
} | ||
|
||
// NewReplaceString returns a new replace processor. | ||
func NewReplaceString(c *common.Config) (processors.Processor, error) { | ||
config := replaceStringConfig{ | ||
IgnoreMissing: false, | ||
FailOnError: true, | ||
} | ||
err := c.Unpack(&config) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to unpack the replace configuration: %s", err) | ||
} | ||
|
||
f := &replaceString{ | ||
config: config, | ||
} | ||
return f, nil | ||
} | ||
|
||
func (f *replaceString) Run(event *beat.Event) (*beat.Event, error) { | ||
var backup common.MapStr | ||
// Creates a copy of the event to revert in case of failure | ||
if f.config.FailOnError { | ||
backup = event.Fields.Clone() | ||
} | ||
|
||
for _, field := range f.config.Fields { | ||
err := f.replaceField(field.Field, field.Pattern, field.Replacement, event.Fields) | ||
if err != nil { | ||
errMsg := fmt.Errorf("Failed to replace fields in processor: %s", err) | ||
logp.Debug("replace", errMsg.Error()) | ||
if f.config.FailOnError { | ||
event.Fields = backup | ||
event.PutValue("error.message", errMsg.Error()) | ||
return event, err | ||
} | ||
} | ||
} | ||
|
||
return event, nil | ||
} | ||
|
||
func (f *replaceString) replaceField(field string, pattern *regexp.Regexp, replacement string, fields common.MapStr) error { | ||
currentValue, err := fields.GetValue(field) | ||
if err != nil { | ||
// Ignore ErrKeyNotFound errors | ||
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { | ||
return nil | ||
} | ||
return fmt.Errorf("could not fetch value for key: %s, Error: %s", field, err) | ||
} | ||
|
||
updatedString := pattern.ReplaceAllString(currentValue.(string), replacement) | ||
_, err = fields.Put(field, updatedString) | ||
if err != nil { | ||
return fmt.Errorf("could not put value: %s: %v, %v", replacement, currentValue, err) | ||
} | ||
return nil | ||
} | ||
|
||
func (f *replaceString) String() string { | ||
return "replace=" + fmt.Sprintf("%+v", f.config.Fields) | ||
} |
Oops, something went wrong.