Skip to content

Commit

Permalink
Add target for decoded_json_field (#3169)
Browse files Browse the repository at this point in the history
* Add decoded output to json field
* Add target option
* Add test for target root
* Add option overwrite_keys for decode_json
* Hardcode json_error key
* Set config
* Add changelog entry
  • Loading branch information
martinscholz83 authored and andrewkroh committed Jan 3, 2017
1 parent 1f2be4b commit 3180831
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]

*Filebeat*
- Add enabled config option to prospectors. {pull}3157[3157]
- Add target option for decoded_json_field. {pull}3169[3169]

*Winlogbeat*

Expand Down
74 changes: 65 additions & 9 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
Expand All @@ -14,15 +15,19 @@ import (
)

type decodeJSONFields struct {
fields []string
maxDepth int
processArray bool
fields []string
maxDepth int
overwriteKeys bool
processArray bool
target *string
}

type config struct {
Fields []string `config:"fields"`
MaxDepth int `config:"max_depth" validate:"min=1"`
ProcessArray bool `config:"process_array"`
Fields []string `config:"fields"`
MaxDepth int `config:"max_depth" validate:"min=1"`
OverwriteKeys bool `config:"overwrite_keys"`
ProcessArray bool `config:"process_array"`
Target *string `config:"target"`
}

var (
Expand All @@ -38,7 +43,7 @@ func init() {
processors.RegisterPlugin("decode_json_fields",
configChecked(newDecodeJSONFields,
requireFields("fields"),
allowedFields("fields", "max_depth", "process_array")))
allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target")))
}

func newDecodeJSONFields(c common.Config) (processors.Processor, error) {
Expand All @@ -51,7 +56,7 @@ func newDecodeJSONFields(c common.Config) (processors.Processor, error) {
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
}

f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, processArray: config.ProcessArray}
f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, overwriteKeys: config.OverwriteKeys, processArray: config.ProcessArray, target: config.Target}
return f, nil
}

Expand All @@ -75,7 +80,58 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) {
continue
}

_, err = event.Put(field, output)
if f.target != nil {
if len(*f.target) > 0 {
_, err = event.Put(*f.target, output)
} else {
switch t := output.(type) {
default:
errs = append(errs, errors.New("Error trying to add target to root.").Error())
case map[string]interface{}:
for k, v := range t {
if f.overwriteKeys {
if k == "@timestamp" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite @timestamp because value is not string")
event["json_error"] = "@timestamp not overwritten (not string)"
continue
}

// @timestamp must be of format RFC3339
ts, err := time.Parse(time.RFC3339, vstr)
if err != nil {
logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event["json_error"] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)
continue
}
event[k] = common.Time(ts)
} else if k == "type" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite type because value is not string")
event["json_error"] = "type not overwritten (not string)"
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore")
event["json_error"] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)
continue
}
event[k] = vstr
} else {
event[k] = v
}
} else if _, exists := event[k]; !exists {
event[k] = v
}
}
}
}
} else {
_, err = event.Put(field, output)
}

if err != nil {
debug("Error trying to Put value %v for field : %s", output, field)
errs = append(errs, err.Error())
Expand Down
58 changes: 58 additions & 0 deletions libbeat/processors/actions/decode_json_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,64 @@ func TestValidJSONDepthTwo(t *testing.T) {

}

func TestTargetOption(t *testing.T) {
input := common.MapStr{
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
"pipeline": "us1",
}

testConfig, _ = common.NewConfigFrom(map[string]interface{}{
"fields": fields,
"process_array": false,
"max_depth": 2,
"target": "doc",
})

actual := getActualValue(t, testConfig, input)

expected := common.MapStr{
"doc": map[string]interface{}{
"log": map[string]interface{}{
"level": "info",
},
"stream": "stderr",
"count": 3,
},
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
"pipeline": "us1",
}

assert.Equal(t, expected.String(), actual.String())
}

func TestTargetRootOption(t *testing.T) {
input := common.MapStr{
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
"pipeline": "us1",
}

testConfig, _ = common.NewConfigFrom(map[string]interface{}{
"fields": fields,
"process_array": false,
"max_depth": 2,
"target": "",
})

actual := getActualValue(t, testConfig, input)

expected := common.MapStr{
"log": map[string]interface{}{
"level": "info",
},
"stream": "stderr",
"count": 3,
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
"pipeline": "us1",
}

assert.Equal(t, expected.String(), actual.String())
}

func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr {
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})
Expand Down

0 comments on commit 3180831

Please sign in to comment.