From a4dfc2ed91f006d5fdefea4e8b14a91f7140277a Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Wed, 4 Jan 2017 09:51:18 +0100 Subject: [PATCH 1/4] add json helper --- filebeat/input/event.go | 42 +-------------- libbeat/helpers/jsonhelper.go | 51 +++++++++++++++++++ .../processors/actions/decode_json_fields.go | 42 ++------------- 3 files changed, 56 insertions(+), 79 deletions(-) create mode 100644 libbeat/helpers/jsonhelper.go diff --git a/filebeat/input/event.go b/filebeat/input/event.go index 198e2c6c381d..a605a60004b6 100644 --- a/filebeat/input/event.go +++ b/filebeat/input/event.go @@ -1,13 +1,12 @@ package input import ( - "fmt" "time" "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/helpers" ) // Event is sent to the output and must contain all relevant information @@ -80,43 +79,6 @@ func mergeJSONFields(e *Event, event common.MapStr, jsonFields common.MapStr) { // Delete existing json key delete(event, "json") - for k, v := range jsonFields { - if e.JSONConfig.OverwriteKeys { - if k == "@timestamp" { - vstr, ok := v.(string) - if !ok { - logp.Err("JSON: Won't overwrite @timestamp because value is not string") - event[reader.JsonErrorKey] = "@timestamp not overwritten (not string)" - continue - } - - // @timestamp must be of format RFC3339 - ts, err := time.Parse(time.RFC3339, vstr) - if err != nil { - logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err) - event[reader.JsonErrorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) - continue - } - event[k] = common.Time(ts) - } else if k == "type" { - vstr, ok := v.(string) - if !ok { - logp.Err("JSON: Won't overwrite type because value is not string") - event[reader.JsonErrorKey] = "type not overwritten (not string)" - continue - } - if len(vstr) == 0 || vstr[0] == '_' { - logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore") - event[reader.JsonErrorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) - continue - } - event[k] = vstr - } else { - event[k] = v - } - } else if _, exists := event[k]; !exists { - event[k] = v - } - } + helpers.WriteJSONKeys(event, jsonFields, e.JSONConfig.OverwriteKeys, reader.JsonErrorKey) } } diff --git a/libbeat/helpers/jsonhelper.go b/libbeat/helpers/jsonhelper.go new file mode 100644 index 000000000000..8d3d310c114a --- /dev/null +++ b/libbeat/helpers/jsonhelper.go @@ -0,0 +1,51 @@ +package helpers + +import ( + "fmt" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +func WriteJSONKeys(event common.MapStr, keys map[string]interface{}, overwriteKeys bool, errorKey string) { + for k, v := range keys { + if overwriteKeys { + if k == "@timestamp" { + vstr, ok := v.(string) + if !ok { + logp.Err("JSON: Won't overwrite @timestamp because value is not string") + event[errorKey] = "@timestamp not overwritten (not string)" + continue + } + + // @timestamp must be of format RFC3339 + ts, err := time.Parse(time.RFC3339, vstr) + if err != nil { + logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err) + event[errorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) + continue + } + event[k] = common.Time(ts) + } else if k == "type" { + vstr, ok := v.(string) + if !ok { + logp.Err("JSON: Won't overwrite type because value is not string") + event[errorKey] = "type not overwritten (not string)" + continue + } + if len(vstr) == 0 || vstr[0] == '_' { + logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore") + event[errorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) + continue + } + event[k] = vstr + } else { + event[k] = v + } + } else if _, exists := event[k]; !exists { + event[k] = v + } + + } +} diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 5570acd4b093..5c1164f1da51 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -5,12 +5,13 @@ import ( "encoding/json" "fmt" "strings" - "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/jsontransform" + "github.com/elastic/beats/libbeat/helpers" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/libbeat/helpers" "github.com/pkg/errors" ) @@ -88,44 +89,7 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { default: errs = append(errs, errors.New("Error trying to add target to root.").Error()) case map[string]interface{}: - for k, v := range t { - if f.overwriteKeys { - if k == "@timestamp" { - vstr, ok := v.(string) - if !ok { - logp.Err("JSON: Won't overwrite @timestamp because value is not string") - event["json_error"] = "@timestamp not overwritten (not string)" - continue - } - - // @timestamp must be of format RFC3339 - ts, err := time.Parse(time.RFC3339, vstr) - if err != nil { - logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err) - event["json_error"] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) - continue - } - event[k] = common.Time(ts) - } else if k == "type" { - vstr, ok := v.(string) - if !ok { - logp.Err("JSON: Won't overwrite type because value is not string") - event["json_error"] = "type not overwritten (not string)" - continue - } - if len(vstr) == 0 || vstr[0] == '_' { - logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore") - event["json_error"] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) - continue - } - event[k] = vstr - } else { - event[k] = v - } - } else if _, exists := event[k]; !exists { - event[k] = v - } - } + helpers.WriteJSONKeys(event, t, *f.overwriteKeys, "json_error") } } } else { From fa9c1fc7165708db09147f3fbd9a8978d6ed3082 Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Wed, 4 Jan 2017 10:51:32 +0100 Subject: [PATCH 2/4] fix package name --- filebeat/input/event.go | 4 ++-- libbeat/helpers/{ => jsonhelper}/jsonhelper.go | 2 +- libbeat/processors/actions/decode_json_fields.go | 5 ++--- 3 files changed, 5 insertions(+), 6 deletions(-) rename libbeat/helpers/{ => jsonhelper}/jsonhelper.go (98%) diff --git a/filebeat/input/event.go b/filebeat/input/event.go index a605a60004b6..d6bbeb9b5faf 100644 --- a/filebeat/input/event.go +++ b/filebeat/input/event.go @@ -6,7 +6,7 @@ import ( "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/helpers" + "github.com/elastic/beats/libbeat/helpers/jsonhelper" ) // Event is sent to the output and must contain all relevant information @@ -79,6 +79,6 @@ func mergeJSONFields(e *Event, event common.MapStr, jsonFields common.MapStr) { // Delete existing json key delete(event, "json") - helpers.WriteJSONKeys(event, jsonFields, e.JSONConfig.OverwriteKeys, reader.JsonErrorKey) + jsonhelper.WriteJSONKeys(event, jsonFields, e.JSONConfig.OverwriteKeys, reader.JsonErrorKey) } } diff --git a/libbeat/helpers/jsonhelper.go b/libbeat/helpers/jsonhelper/jsonhelper.go similarity index 98% rename from libbeat/helpers/jsonhelper.go rename to libbeat/helpers/jsonhelper/jsonhelper.go index 8d3d310c114a..bad752d15f70 100644 --- a/libbeat/helpers/jsonhelper.go +++ b/libbeat/helpers/jsonhelper/jsonhelper.go @@ -1,4 +1,4 @@ -package helpers +package jsonhelper import ( "fmt" diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 5c1164f1da51..bce421ec2df3 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -8,10 +8,9 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/jsontransform" - "github.com/elastic/beats/libbeat/helpers" + "github.com/elastic/beats/libbeat/helpers/jsonhelper" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/libbeat/helpers" "github.com/pkg/errors" ) @@ -89,7 +88,7 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { default: errs = append(errs, errors.New("Error trying to add target to root.").Error()) case map[string]interface{}: - helpers.WriteJSONKeys(event, t, *f.overwriteKeys, "json_error") + jsonhelper.WriteJSONKeys(event, t, f.overwriteKeys, "json_error") } } } else { From d423f529663fc7f36a5446a6fb08190daac20ac8 Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Wed, 4 Jan 2017 13:39:33 +0100 Subject: [PATCH 3/4] Add to jsontransform --- filebeat/input/event.go | 4 ++-- .../jsonhelper => common/jsontransform}/jsonhelper.go | 2 +- libbeat/processors/actions/decode_json_fields.go | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) rename libbeat/{helpers/jsonhelper => common/jsontransform}/jsonhelper.go (98%) diff --git a/filebeat/input/event.go b/filebeat/input/event.go index d6bbeb9b5faf..415de417842c 100644 --- a/filebeat/input/event.go +++ b/filebeat/input/event.go @@ -6,7 +6,7 @@ import ( "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/helpers/jsonhelper" + "github.com/elastic/beats/libbeat/common/jsontransform" ) // Event is sent to the output and must contain all relevant information @@ -79,6 +79,6 @@ func mergeJSONFields(e *Event, event common.MapStr, jsonFields common.MapStr) { // Delete existing json key delete(event, "json") - jsonhelper.WriteJSONKeys(event, jsonFields, e.JSONConfig.OverwriteKeys, reader.JsonErrorKey) + jsontransform.WriteJSONKeys(event, jsonFields, e.JSONConfig.OverwriteKeys, reader.JsonErrorKey) } } diff --git a/libbeat/helpers/jsonhelper/jsonhelper.go b/libbeat/common/jsontransform/jsonhelper.go similarity index 98% rename from libbeat/helpers/jsonhelper/jsonhelper.go rename to libbeat/common/jsontransform/jsonhelper.go index bad752d15f70..37bdb7e93e8d 100644 --- a/libbeat/helpers/jsonhelper/jsonhelper.go +++ b/libbeat/common/jsontransform/jsonhelper.go @@ -1,4 +1,4 @@ -package jsonhelper +package jsontransform import ( "fmt" diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index bce421ec2df3..c5919f11e3dc 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -8,7 +8,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/jsontransform" - "github.com/elastic/beats/libbeat/helpers/jsonhelper" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" "github.com/pkg/errors" @@ -88,7 +87,7 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { default: errs = append(errs, errors.New("Error trying to add target to root.").Error()) case map[string]interface{}: - jsonhelper.WriteJSONKeys(event, t, f.overwriteKeys, "json_error") + jsontransform.WriteJSONKeys(event, t, f.overwriteKeys, "json_error") } } } else { From 30192ce16777667b3f321f98ff431bd6bf9a3b65 Mon Sep 17 00:00:00 2001 From: Martin Scholz Date: Thu, 5 Jan 2017 23:16:42 +0100 Subject: [PATCH 4/4] Add when to allowedFields --- libbeat/processors/actions/decode_json_fields.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index c5919f11e3dc..772c481c472a 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -42,7 +42,7 @@ func init() { processors.RegisterPlugin("decode_json_fields", configChecked(newDecodeJSONFields, requireFields("fields"), - allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target"))) + allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target", "when"))) } func newDecodeJSONFields(c common.Config) (processors.Processor, error) {