Add decode base64 field processor (#11914)
* add decode base64 field processor
mmatur authored and ph committed Jun 18, 2019
1 parent 4d95550 commit 4d9b1d0
Showing 5 changed files with 373 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]
- Add `proxy_disable` output flag to explicitly ignore proxy environment variables. {issue}11713[11713] {pull}12243[12243]
- Processor `add_cloud_metadata` adds fields `cloud.account.id` and `cloud.image.id` for AWS EC2. {pull}12307[12307]
- Add configurable bulk_flush_frequency in kafka output. {pull}12254[12254]
- Add `decode_base64_field` processor for decoding base64-encoded fields. {pull}11914[11914]

*Auditbeat*

49 changes: 42 additions & 7 deletions libbeat/docs/processors-using.asciidoc
@@ -108,7 +108,7 @@ For example:
------
+
Similarly, for {beatname_uc} modules, you can define processors under the
`input` section of the module definition.
endif::[]
ifeval::["{beatname_lc}"=="metricbeat"]
[source,yaml]
@@ -119,7 +119,7 @@ ifeval::["{beatname_lc}"=="metricbeat"]
- <processor_name>:
when:
<condition>
<parameters>
----
endif::[]
ifeval::["{beatname_lc}"=="auditbeat"]
@@ -133,7 +133,7 @@ auditbeat.modules:
- <processor_name>:
when:
<condition>
<parameters>
----
endif::[]
ifeval::["{beatname_lc}"=="packetbeat"]
@@ -142,7 +142,7 @@ For example:
[source,yaml]
----
packetbeat.protocols:
- type: <protocol_type>
processors:
- <processor_name>:
when:
@@ -214,6 +214,7 @@ ifdef::has_decode_csv_fields_processor[]
* <<decode-csv-fields,`decode_csv_fields`>>
endif::[]
* <<decode-json-fields,`decode_json_fields`>>
* <<decode-base64-field,`decode_base64_field`>>
* <<dissect, `dissect`>>
* <<extract-array,`extract_array`>>
* <<processor-dns, `dns`>>
@@ -651,7 +652,7 @@ scalar values, arrays, dictionaries, or any nested combination of these. By
default the fields that you specify will be grouped under the `fields`
sub-dictionary in the event. To group the fields under a different
sub-dictionary, use the `target` setting. To store the fields as
top-level fields, set `target: ''`.

`target`:: (Optional) Sub-dictionary to put all fields into. Defaults to `fields`.
`fields`:: Fields to be added.
@@ -864,6 +865,40 @@ is treated as if the field was not set at all.
exist in the event are overwritten by keys from the decoded JSON object. The
default value is false.

[[decode-base64-field]]
=== Decode Base64 fields

The `decode_base64_field` processor decodes a base64-encoded field. The
`field` key contains a `from: old-key` and a `to: new-key` pair, where `from` is
the source field and `to` is the target field for the decoded value.

To overwrite fields, either rename the target field first, or use the `drop_fields`
processor to drop the field and then rename the field.

[source,yaml]
-------
processors:
- decode_base64_field:
    field:
      from: "field1"
      to: "field2"
    ignore_missing: false
    fail_on_error: true
-------

In the example above, the base64 content of `field1` is decoded and written to
`field2`. For instance, if `field1` contains `aGVsbG8=`, then `field2` is set to `hello`.

The `decode_base64_field` processor has the following configuration settings:

`ignore_missing`:: (Optional) If set to `true`, no error is logged when a key
that should be base64-decoded is missing. Default is `false`.

`fail_on_error`:: (Optional) If set to `true`, base64 decoding of fields stops
when an error occurs and the original event is returned. If set to `false`, decoding
continues even if an error occurred. Default is `true`.

See <<conditions>> for a list of supported conditions.

[[community-id]]
=== Community ID Network Flow Hash

@@ -1092,7 +1127,7 @@ construct a lookup key with the value of the field `metricset.host`.
Each Beat can define its own default indexers and matchers which are enabled by
default. For example, FileBeat enables the `container` indexer, which indexes
pod metadata based on all container IDs, and a `logs_path` matcher, which takes
the `log.file.path` field, extracts the container ID, and uses it to retrieve
metadata.

The configuration below enables the processor when {beatname_lc} is run as a pod in
@@ -1175,7 +1210,7 @@ You can do this by mounting the socket inside the container. For example:
To avoid privilege issues, you may also need to add `--user=root` to the
`docker run` flags. Because the user must be part of the docker group in order
to access `/var/run/docker.sock`, root access is required if {beatname_uc} is
running as non-root inside the container.
=====

[source,yaml]
139 changes: 139 additions & 0 deletions libbeat/processors/actions/decode_base64_field.go
@@ -0,0 +1,139 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"encoding/base64"
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

const (
processorName = "decode_base64_field"
)

type decodeBase64Field struct {
log *logp.Logger

config base64Config
}

type base64Config struct {
fromTo `config:"field"`
IgnoreMissing bool `config:"ignore_missing"`
FailOnError bool `config:"fail_on_error"`
}

var (
defaultBase64Config = base64Config{
IgnoreMissing: false,
FailOnError: true,
}
)

func init() {
processors.RegisterPlugin(processorName,
checks.ConfigChecked(NewDecodeBase64Field,
checks.RequireFields("field"),
checks.AllowedFields("field", "when")))
}

// NewDecodeBase64Field constructs a new decode_base64_field processor.
func NewDecodeBase64Field(c *common.Config) (processors.Processor, error) {
config := defaultBase64Config

log := logp.NewLogger(processorName)

err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the %s configuration: %s", processorName, err)
}

return &decodeBase64Field{
log: log,
config: config,
}, nil
}

func (f *decodeBase64Field) Run(event *beat.Event) (*beat.Event, error) {
var backup common.MapStr
// Creates a copy of the event to revert in case of failure
if f.config.FailOnError {
backup = event.Fields.Clone()
}

err := f.decodeField(f.config.From, f.config.To, event.Fields)
if err != nil && f.config.FailOnError {
errMsg := fmt.Errorf("failed to decode base64 fields in processor: %v", err)
f.log.Debug("decode base64", errMsg.Error())
event.Fields = backup
_, _ = event.PutValue("error.message", errMsg.Error())
return event, err
}

return event, nil
}

func (f decodeBase64Field) String() string {
return fmt.Sprintf("%s=%+v", processorName, f.config.fromTo)
}

func (f *decodeBase64Field) decodeField(from string, to string, fields common.MapStr) error {
value, err := fields.GetValue(from)
if err != nil {
// Ignore ErrKeyNotFound errors
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
return nil
}
return fmt.Errorf("could not fetch value for key: %s, Error: %s", from, err)
}

text, ok := value.(string)
if !ok {
return fmt.Errorf("invalid type for `from`, expecting a string received %T", value)
}

decodedData, err := base64.StdEncoding.DecodeString(text)
if err != nil {
return fmt.Errorf("error trying to unmarshal %s: %v", text, err)
}

field := to
// If `to` is empty or equal to `from`, decode the field in place.
if to == "" || from == to {
// Deletion must happen first to support cases where a becomes a.b
if err = fields.Delete(from); err != nil {
return fmt.Errorf("could not delete key: %s, %+v", from, err)
}

field = from
}

if _, err = fields.Put(field, string(decodedData)); err != nil {
return fmt.Errorf("could not put value: %s: %v, %v", decodedData, field, err)
}

return nil
}
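
To show how the pieces above fit together, here is a minimal usage sketch (not part of the commit) that builds the processor from a config map and runs it on an event. It relies only on APIs visible in this diff plus the `common.NewConfigFrom` helper from libbeat; the `field1`/`field2` names simply mirror the documentation example.

[source,go]
----
package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/processors/actions"
)

func main() {
	// Build a config equivalent to the YAML example in the docs hunk above.
	cfg, err := common.NewConfigFrom(map[string]interface{}{
		"field": map[string]interface{}{
			"from": "field1",
			"to":   "field2",
		},
		"ignore_missing": false,
		"fail_on_error":  true,
	})
	if err != nil {
		panic(err)
	}

	// Construct the processor added by this commit.
	p, err := actions.NewDecodeBase64Field(cfg)
	if err != nil {
		panic(err)
	}

	// "aGVsbG8=" is base64 for "hello".
	event := &beat.Event{Fields: common.MapStr{"field1": "aGVsbG8="}}
	out, err := p.Run(event)
	if err != nil {
		panic(err)
	}

	decoded, _ := out.Fields.GetValue("field2")
	fmt.Println(decoded) // prints: hello
}
----

The `fail_on_error` and `ignore_missing` paths can be exercised the same way, by feeding a non-base64 value in `field1` or by omitting `field1` from the event.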