From e2451263a0b7b647e0bb1458c96f7c62c89427da Mon Sep 17 00:00:00 2001 From: maddin2016 Date: Sat, 10 Dec 2016 22:45:21 +0100 Subject: [PATCH 01/11] Add decoded output to json field --- libbeat/processors/actions/decode_json_fields.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 392c66db6c4..96b98e75969 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -75,7 +75,7 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { continue } - _, err = event.Put(field, output) + _, err = event.Put("json", output) if err != nil { debug("Error trying to Put value %v for field : %s", output, field) errs = append(errs, err.Error()) From 1fd75b0120f7cc4fec6ab2727c4020db4f2de203 Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Mon, 12 Dec 2016 08:56:20 +0100 Subject: [PATCH 02/11] fix test --- libbeat/processors/actions/decode_json_fields_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index 70b0bdd5df6..619379654d9 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -70,11 +70,12 @@ func TestValidJSONDepthOne(t *testing.T) { actual := getActualValue(t, testConfig, input) expected := common.MapStr{ - "msg": map[string]interface{}{ + "json": map[string]interface{}{ "log": "{\"level\":\"info\"}", "stream": "stderr", "count": 3, }, + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", "pipeline": "us1", } @@ -97,13 +98,14 @@ func TestValidJSONDepthTwo(t *testing.T) { actual := getActualValue(t, testConfig, input) expected := common.MapStr{ - "msg": map[string]interface{}{ + "json": map[string]interface{}{ "log": map[string]interface{}{ "level": "info", }, "stream": "stderr", "count": 3, }, + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", "pipeline": "us1", } From 6cf262dd2031114ec2f5c8aa6dff19b155fc2f4f Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Tue, 13 Dec 2016 09:44:21 +0100 Subject: [PATCH 03/11] Add target option --- .../processors/actions/decode_json_fields.go | 14 ++++++-- .../actions/decode_json_fields_test.go | 36 ++++++++++++++++--- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 96b98e75969..8d4e5e0898e 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -17,18 +17,21 @@ type decodeJSONFields struct { fields []string maxDepth int processArray bool + target string } type config struct { Fields []string `config:"fields"` MaxDepth int `config:"maxDepth" validate:"min=1"` ProcessArray bool `config:"processArray"` + Target string `config:"target"` } var ( defaultConfig = config{ MaxDepth: 1, ProcessArray: false, + Target: "", } ) @@ -38,7 +41,7 @@ func init() { processors.RegisterPlugin("decode_json_fields", configChecked(newDecodeJSONFields, requireFields("fields"), - allowedFields("fields", "maxDepth", "processArray"))) + allowedFields("fields", "maxDepth", "processArray", "target"))) } func newDecodeJSONFields(c common.Config) (processors.Processor, error) { @@ -51,7 +54,7 @@ func newDecodeJSONFields(c common.Config) (processors.Processor, error) { return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err) } - f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, processArray: config.ProcessArray} + f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, processArray: config.ProcessArray, target: config.Target} return f, nil } @@ -75,7 +78,12 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { continue } - _, err = event.Put("json", output) + if len(f.target) > 0 { + _, err = event.Put(f.target, output) + } else { + _, err = event.Put(field, output) + } + if err != nil { debug("Error trying to Put value %v for field : %s", output, field) errs = append(errs, err.Error()) diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index 619379654d9..05b2175dfcd 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -70,12 +70,11 @@ func TestValidJSONDepthOne(t *testing.T) { actual := getActualValue(t, testConfig, input) expected := common.MapStr{ - "json": map[string]interface{}{ + "msg": map[string]interface{}{ "log": "{\"level\":\"info\"}", "stream": "stderr", "count": 3, }, - "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", "pipeline": "us1", } @@ -98,14 +97,13 @@ func TestValidJSONDepthTwo(t *testing.T) { actual := getActualValue(t, testConfig, input) expected := common.MapStr{ - "json": map[string]interface{}{ + "msg": map[string]interface{}{ "log": map[string]interface{}{ "level": "info", }, "stream": "stderr", "count": 3, }, - "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", "pipeline": "us1", } @@ -113,6 +111,36 @@ func TestValidJSONDepthTwo(t *testing.T) { } +func TestTargetOption(t *testing.T) { + input := common.MapStr{ + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", + "pipeline": "us1", + } + + testConfig, _ = common.NewConfigFrom(map[string]interface{}{ + "fields": fields, + "processArray": false, + "maxDepth": 2, + "target": "doc", + }) + + actual := getActualValue(t, testConfig, input) + + expected := common.MapStr{ + "doc": map[string]interface{}{ + "log": map[string]interface{}{ + "level": "info", + }, + "stream": "stderr", + "count": 3, + }, + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", + "pipeline": "us1", + } + + assert.Equal(t, expected.String(), actual.String()) +} + func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr { if testing.Verbose() { logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) From 00ae54f3c50c3297985d9d41cefc4bbebf7136b4 Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Wed, 14 Dec 2016 09:25:50 +0100 Subject: [PATCH 04/11] Make target default nil --- libbeat/processors/actions/decode_json_fields.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 8d4e5e0898e..bc29b779717 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -17,21 +17,20 @@ type decodeJSONFields struct { fields []string maxDepth int processArray bool - target string + target *string } type config struct { Fields []string `config:"fields"` MaxDepth int `config:"maxDepth" validate:"min=1"` ProcessArray bool `config:"processArray"` - Target string `config:"target"` + Target *string `config:"target"` } var ( defaultConfig = config{ MaxDepth: 1, ProcessArray: false, - Target: "", } ) @@ -78,8 +77,8 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { continue } - if len(f.target) > 0 { - _, err = event.Put(f.target, output) + if f.target != nil { + _, err = event.Put(*f.target, output) } else { _, err = event.Put(field, output) } From 93ad30ff48763eeb8a82eeed02f3dd9b5b98c4f4 Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Thu, 15 Dec 2016 13:17:10 +0100 Subject: [PATCH 05/11] Add filter to check if target belongs to root --- libbeat/processors/actions/decode_json_fields.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index bc29b779717..d9e9615239a 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -78,7 +78,20 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { } if f.target != nil { - _, err = event.Put(*f.target, output) + if len(*f.target) > 0 { + _, err = event.Put(*f.target, output) + } else { + switch t := output.(type) { + default: + errs = append(errs, errors.New("Error trying to add target to root.").Error()) + case map[string]interface{}: + for k, v := range t { + if _, exists := event[k]; !exists { + event[k] = v + } + } + } + } } else { _, err = event.Put(field, output) } From e5b481338fbe5b70ffbdeff8a94e96cea826d3f5 Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Thu, 15 Dec 2016 13:30:55 +0100 Subject: [PATCH 06/11] Add test for target root --- .../actions/decode_json_fields_test.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index 05b2175dfcd..1fd3bfaf58e 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -141,6 +141,34 @@ func TestTargetOption(t *testing.T) { assert.Equal(t, expected.String(), actual.String()) } +func TestTargetRootOption(t *testing.T) { + input := common.MapStr{ + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", + "pipeline": "us1", + } + + testConfig, _ = common.NewConfigFrom(map[string]interface{}{ + "fields": fields, + "processArray": false, + "maxDepth": 2, + "target": "", + }) + + actual := getActualValue(t, testConfig, input) + + expected := common.MapStr{ + "log": map[string]interface{}{ + "level": "info", + }, + "stream": "stderr", + "count": 3, + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", + "pipeline": "us1", + } + + assert.Equal(t, expected.String(), actual.String()) +} + func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr { if testing.Verbose() { logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) From 962e18e95fce070f9a2d00f125a2924b3cd817f7 Mon Sep 17 00:00:00 2001 From: lrz-hal Date: Thu, 22 Dec 2016 18:35:35 +0100 Subject: [PATCH 07/11] Add option overwrite_keys for decode_json --- .../processors/actions/decode_json_fields.go | 57 +++++++++++++++---- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 643c0316e98..96cf94b5606 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -5,7 +5,9 @@ import ( "encoding/json" "fmt" "strings" + "time" + "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/jsontransform" "github.com/elastic/beats/libbeat/logp" @@ -14,17 +16,19 @@ import ( ) type decodeJSONFields struct { - fields []string - maxDepth int - processArray bool - target *string + fields []string + maxDepth int + overwriteKeys bool + processArray bool + target *string } type config struct { - Fields []string `config:"fields"` - MaxDepth int `config:"max_depth" validate:"min=1"` - ProcessArray bool `config:"process_array"` - Target *string `config:"target"` + Fields []string `config:"fields"` + MaxDepth int `config:"max_depth" validate:"min=1"` + OverwriteKeys bool `config:"overwrite_keys"` + ProcessArray bool `config:"process_array"` + Target *string `config:"target"` } var ( @@ -40,7 +44,7 @@ func init() { processors.RegisterPlugin("decode_json_fields", configChecked(newDecodeJSONFields, requireFields("fields"), - allowedFields("fields", "max_depth", "process_array", "target"))) + allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target"))) } func newDecodeJSONFields(c common.Config) (processors.Processor, error) { @@ -86,7 +90,40 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { errs = append(errs, errors.New("Error trying to add target to root.").Error()) case map[string]interface{}: for k, v := range t { - if _, exists := event[k]; !exists { + if f.overwriteKeys { + if k == "@timestamp" { + vstr, ok := v.(string) + if !ok { + logp.Err("JSON: Won't overwrite @timestamp because value is not string") + event[reader.JsonErrorKey] = "@timestamp not overwritten (not string)" + continue + } + + // @timestamp must be of format RFC3339 + ts, err := time.Parse(time.RFC3339, vstr) + if err != nil { + logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err) + event[reader.JsonErrorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) + continue + } + event[k] = common.Time(ts) + } else if k == "type" { + vstr, ok := v.(string) + if !ok { + logp.Err("JSON: Won't overwrite type because value is not string") + event[reader.JsonErrorKey] = "type not overwritten (not string)" + continue + } + if len(vstr) == 0 || vstr[0] == '_' { + logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore") + event[reader.JsonErrorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) + continue + } + event[k] = vstr + } else { + event[k] = v + } + } else if _, exists := event[k]; !exists { event[k] = v } } From d7f9ef04052792680a6c609b25e7e72a9edfdc40 Mon Sep 17 00:00:00 2001 From: lrz-hal Date: Thu, 22 Dec 2016 18:48:49 +0100 Subject: [PATCH 08/11] Hardcode json_error key --- libbeat/processors/actions/decode_json_fields.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 96cf94b5606..0295615a543 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -7,7 +7,6 @@ import ( "strings" "time" - "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/jsontransform" "github.com/elastic/beats/libbeat/logp" @@ -95,7 +94,7 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { vstr, ok := v.(string) if !ok { logp.Err("JSON: Won't overwrite @timestamp because value is not string") - event[reader.JsonErrorKey] = "@timestamp not overwritten (not string)" + event["json_error"] = "@timestamp not overwritten (not string)" continue } @@ -103,7 +102,7 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { ts, err := time.Parse(time.RFC3339, vstr) if err != nil { logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err) - event[reader.JsonErrorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) + event["json_error"] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) continue } event[k] = common.Time(ts) @@ -111,12 +110,12 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { vstr, ok := v.(string) if !ok { logp.Err("JSON: Won't overwrite type because value is not string") - event[reader.JsonErrorKey] = "type not overwritten (not string)" + event["json_error"] = "type not overwritten (not string)" continue } if len(vstr) == 0 || vstr[0] == '_' { logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore") - event[reader.JsonErrorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) + event["json_error"] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) continue } event[k] = vstr From de156a28e5db42678d972b15677de7ee57c2cedb Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Fri, 23 Dec 2016 08:53:48 +0100 Subject: [PATCH 09/11] Set config --- libbeat/processors/actions/decode_json_fields.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 0295615a543..5570acd4b09 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -56,7 +56,7 @@ func newDecodeJSONFields(c common.Config) (processors.Processor, error) { return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err) } - f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, processArray: config.ProcessArray, target: config.Target} + f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, overwriteKeys: config.OverwriteKeys, processArray: config.ProcessArray, target: config.Target} return f, nil } From 877868a59a7c70ae975fc0ea3c57f6e8c50a8897 Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Fri, 23 Dec 2016 08:59:44 +0100 Subject: [PATCH 10/11] Fix test --- .../actions/decode_json_fields_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index 1882fb03764..053f52797ba 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -118,10 +118,10 @@ func TestTargetOption(t *testing.T) { } testConfig, _ = common.NewConfigFrom(map[string]interface{}{ - "fields": fields, - "processArray": false, - "maxDepth": 2, - "target": "doc", + "fields": fields, + "process_array": false, + "max_depth": 2, + "target": "doc", }) actual := getActualValue(t, testConfig, input) @@ -148,10 +148,10 @@ func TestTargetRootOption(t *testing.T) { } testConfig, _ = common.NewConfigFrom(map[string]interface{}{ - "fields": fields, - "processArray": false, - "maxDepth": 2, - "target": "", + "fields": fields, + "process_array": false, + "max_depth": 2, + "target": "", }) actual := getActualValue(t, testConfig, input) From 2967c0013e860c7f70d22be5266295959ae95b07 Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Fri, 23 Dec 2016 09:40:51 +0100 Subject: [PATCH 11/11] Add changelog entry --- CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 33a88b52756..a0c488118bc 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -63,6 +63,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] *Filebeat* - Add enabled config option to prospectors. {pull}3157[3157] +- Add target option for decoded_json_field. {pull}3169[3169] *Winlogbeat*