Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add target for decode_json_fields #3169

Merged
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]

*Filebeat*
- Add enabled config option to prospectors. {pull}3157[3157]
- Add target option for decode_json_fields. {pull}3169[3169]

*Winlogbeat*

Expand Down
74 changes: 65 additions & 9 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
Expand All @@ -14,15 +15,19 @@ import (
)

type decodeJSONFields struct {
fields []string
maxDepth int
processArray bool
fields []string
maxDepth int
overwriteKeys bool
processArray bool
target *string
}

type config struct {
Fields []string `config:"fields"`
MaxDepth int `config:"max_depth" validate:"min=1"`
ProcessArray bool `config:"process_array"`
Fields []string `config:"fields"`
MaxDepth int `config:"max_depth" validate:"min=1"`
OverwriteKeys bool `config:"overwrite_keys"`
ProcessArray bool `config:"process_array"`
Target *string `config:"target"`
}

var (
Expand All @@ -38,7 +43,7 @@ func init() {
processors.RegisterPlugin("decode_json_fields",
configChecked(newDecodeJSONFields,
requireFields("fields"),
allowedFields("fields", "max_depth", "process_array")))
allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target")))
}

func newDecodeJSONFields(c common.Config) (processors.Processor, error) {
Expand All @@ -51,7 +56,7 @@ func newDecodeJSONFields(c common.Config) (processors.Processor, error) {
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
}

f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, processArray: config.ProcessArray}
f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, overwriteKeys: config.OverwriteKeys, processArray: config.ProcessArray, target: config.Target}
return f, nil
}

Expand All @@ -75,7 +80,58 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) {
continue
}

_, err = event.Put(field, output)
if f.target != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be three cases to handle here. When *f.target == "" you can't use event.Put; instead, you need to add the fields from output to event. Looking at the JSON decoding in Filebeat, the default behavior is to not overwrite keys unless overwrite_keys is set to true. So I would follow this model to be consistent.

if len(*f.target) > 0 {
_, err = event.Put(*f.target, output)
} else {
switch t := output.(type) {
Copy link
Member

@andrewkroh andrewkroh Dec 23, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic looks good. But now the complexity of the Run method is a bit high with all of the nested if/for/switch statements. WDYT about breaking this into smaller functions?

Copy link
Contributor Author

@martinscholz83 martinscholz83 Dec 29, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of adding a global helper function for that switch part, since we now duplicate code? For example, helpers.jsonhelper.OverwriteJsonKeys.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense if there is duplication. 👍

default:
errs = append(errs, errors.New("Error trying to add target to root.").Error())
case map[string]interface{}:
for k, v := range t {
if f.overwriteKeys {
if k == "@timestamp" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite @timestamp because value is not string")
event["json_error"] = "@timestamp not overwritten (not string)"
continue
}

// @timestamp must be of format RFC3339
ts, err := time.Parse(time.RFC3339, vstr)
if err != nil {
logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event["json_error"] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)
continue
}
event[k] = common.Time(ts)
} else if k == "type" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite type because value is not string")
event["json_error"] = "type not overwritten (not string)"
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore")
event["json_error"] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)
continue
}
event[k] = vstr
} else {
event[k] = v
}
} else if _, exists := event[k]; !exists {
event[k] = v
}
}
}
}
} else {
_, err = event.Put(field, output)
}

if err != nil {
debug("Error trying to Put value %v for field : %s", output, field)
errs = append(errs, err.Error())
Expand Down
58 changes: 58 additions & 0 deletions libbeat/processors/actions/decode_json_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,64 @@ func TestValidJSONDepthTwo(t *testing.T) {

}

func TestTargetOption(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding a test case. 👍

input := common.MapStr{
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
"pipeline": "us1",
}

testConfig, _ = common.NewConfigFrom(map[string]interface{}{
"fields": fields,
"process_array": false,
"max_depth": 2,
"target": "doc",
})

actual := getActualValue(t, testConfig, input)

expected := common.MapStr{
"doc": map[string]interface{}{
"log": map[string]interface{}{
"level": "info",
},
"stream": "stderr",
"count": 3,
},
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
"pipeline": "us1",
}

assert.Equal(t, expected.String(), actual.String())
}

// TestTargetRootOption verifies that configuring an empty "target" places the
// decoded JSON keys at the root of the event, next to the fields that were
// already present, rather than nesting them under a named field.
func TestTargetRootOption(t *testing.T) {
	// The "log" value is itself a JSON-encoded string; the expected event
	// below has it decoded as a nested object with max_depth set to 2.
	const rawMsg = "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}"

	event := common.MapStr{
		"pipeline": "us1",
		"msg":      rawMsg,
	}

	// NOTE(review): `fields` and `testConfig` are declared elsewhere in this
	// test file; `fields` presumably names the "msg" field — confirm.
	testConfig, _ = common.NewConfigFrom(map[string]interface{}{
		"target":        "",
		"max_depth":     2,
		"process_array": false,
		"fields":        fields,
	})

	// The decoded keys are expected at the top level, and the original
	// "msg" and "pipeline" entries are expected to remain untouched.
	want := common.MapStr{
		"pipeline": "us1",
		"msg":      rawMsg,
		"count":    3,
		"stream":   "stderr",
		"log": map[string]interface{}{
			"level": "info",
		},
	}

	got := getActualValue(t, testConfig, event)

	assert.Equal(t, want.String(), got.String())
}

func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr {
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})
Expand Down