From d1fe6ef5cc68b811eddb9e6e2a6dfb2512873737 Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Fri, 6 Jan 2017 19:31:07 +0100 Subject: [PATCH] Add WriteJSONKeys to jsontransform package (#3280) WriteJSONKeys reduces code duplication between the Filebeat json parser and the decode_json_fields processor. Fixed bug with decode_json_fields where the `when` condition was not allowed. --- filebeat/input/event.go | 42 +-------------- libbeat/common/jsontransform/jsonhelper.go | 51 +++++++++++++++++++ .../processors/actions/decode_json_fields.go | 42 +-------------- 3 files changed, 55 insertions(+), 80 deletions(-) create mode 100644 libbeat/common/jsontransform/jsonhelper.go diff --git a/filebeat/input/event.go b/filebeat/input/event.go index 198e2c6c381..415de417842 100644 --- a/filebeat/input/event.go +++ b/filebeat/input/event.go @@ -1,13 +1,12 @@ package input import ( - "fmt" "time" "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/common/jsontransform" ) // Event is sent to the output and must contain all relevant information @@ -80,43 +79,6 @@ func mergeJSONFields(e *Event, event common.MapStr, jsonFields common.MapStr) { // Delete existing json key delete(event, "json") - for k, v := range jsonFields { - if e.JSONConfig.OverwriteKeys { - if k == "@timestamp" { - vstr, ok := v.(string) - if !ok { - logp.Err("JSON: Won't overwrite @timestamp because value is not string") - event[reader.JsonErrorKey] = "@timestamp not overwritten (not string)" - continue - } - - // @timestamp must be of format RFC3339 - ts, err := time.Parse(time.RFC3339, vstr) - if err != nil { - logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err) - event[reader.JsonErrorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) - continue - } - event[k] = common.Time(ts) - } else if k == "type" { - vstr, ok := v.(string) - if !ok { - logp.Err("JSON: Won't overwrite type because value is not string") - event[reader.JsonErrorKey] = "type not overwritten (not string)" - continue - } - if len(vstr) == 0 || vstr[0] == '_' { - logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore") - event[reader.JsonErrorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) - continue - } - event[k] = vstr - } else { - event[k] = v - } - } else if _, exists := event[k]; !exists { - event[k] = v - } - } + jsontransform.WriteJSONKeys(event, jsonFields, e.JSONConfig.OverwriteKeys, reader.JsonErrorKey) } } diff --git a/libbeat/common/jsontransform/jsonhelper.go b/libbeat/common/jsontransform/jsonhelper.go new file mode 100644 index 00000000000..37bdb7e93e8 --- /dev/null +++ b/libbeat/common/jsontransform/jsonhelper.go @@ -0,0 +1,51 @@ +package jsontransform + +import ( + "fmt" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +func WriteJSONKeys(event common.MapStr, keys map[string]interface{}, overwriteKeys bool, errorKey string) { + for k, v := range keys { + if overwriteKeys { + if k == "@timestamp" { + vstr, ok := v.(string) + if !ok { + logp.Err("JSON: Won't overwrite @timestamp because value is not string") + event[errorKey] = "@timestamp not overwritten (not string)" + continue + } + + // @timestamp must be of format RFC3339 + ts, err := time.Parse(time.RFC3339, vstr) + if err != nil { + logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err) + event[errorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) + continue + } + event[k] = common.Time(ts) + } else if k == "type" { + vstr, ok := v.(string) + if !ok { + logp.Err("JSON: Won't overwrite type because value is not string") + event[errorKey] = "type not overwritten (not string)" + continue + } + if len(vstr) == 0 || vstr[0] == '_' { + logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore") + event[errorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) + continue + } + event[k] = vstr + } else { + event[k] = v + } + } else if _, exists := event[k]; !exists { + event[k] = v + } + + } +} diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 5570acd4b09..772c481c472 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "strings" - "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/jsontransform" @@ -43,7 +42,7 @@ func init() { processors.RegisterPlugin("decode_json_fields", configChecked(newDecodeJSONFields, requireFields("fields"), - allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target"))) + allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target", "when"))) } func newDecodeJSONFields(c common.Config) (processors.Processor, error) { @@ -88,44 +87,7 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { default: errs = append(errs, errors.New("Error trying to add target to root.").Error()) case map[string]interface{}: - for k, v := range t { - if f.overwriteKeys { - if k == "@timestamp" { - vstr, ok := v.(string) - if !ok { - logp.Err("JSON: Won't overwrite @timestamp because value is not string") - event["json_error"] = "@timestamp not overwritten (not string)" - continue - } - - // @timestamp must be of format RFC3339 - ts, err := time.Parse(time.RFC3339, vstr) - if err != nil { - logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err) - event["json_error"] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) - continue - } - event[k] = common.Time(ts) - } else if k == "type" { - vstr, ok := v.(string) - if !ok { - logp.Err("JSON: Won't overwrite type because value is not string") - event["json_error"] = "type not overwritten (not string)" - continue - } - if len(vstr) == 0 || vstr[0] == '_' { - logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore") - event["json_error"] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) - continue - } - event[k] = vstr - } else { - event[k] = v - } - } else if _, exists := event[k]; !exists { - event[k] = v - } - } + jsontransform.WriteJSONKeys(event, t, f.overwriteKeys, "json_error") } } } else {