Skip to content

Commit

Permalink
Add WriteJSONKeys to jsontransform package (elastic#3280)
Browse files Browse the repository at this point in the history
WriteJSONKeys reduces code duplication between the Filebeat json parser and the decode_json_fields processor.

Fixed bug with decode_json_fields where the `when` condition was not allowed.
  • Loading branch information
martinscholz83 authored and andrewkroh committed Jan 6, 2017
1 parent 77cdb8f commit d1fe6ef
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 80 deletions.
42 changes: 2 additions & 40 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package input

import (
"fmt"
"time"

"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/common/jsontransform"
)

// Event is sent to the output and must contain all relevant information
Expand Down Expand Up @@ -80,43 +79,6 @@ func mergeJSONFields(e *Event, event common.MapStr, jsonFields common.MapStr) {
// Delete existing json key
delete(event, "json")

for k, v := range jsonFields {
if e.JSONConfig.OverwriteKeys {
if k == "@timestamp" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite @timestamp because value is not string")
event[reader.JsonErrorKey] = "@timestamp not overwritten (not string)"
continue
}

// @timestamp must be of format RFC3339
ts, err := time.Parse(time.RFC3339, vstr)
if err != nil {
logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event[reader.JsonErrorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)
continue
}
event[k] = common.Time(ts)
} else if k == "type" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite type because value is not string")
event[reader.JsonErrorKey] = "type not overwritten (not string)"
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore")
event[reader.JsonErrorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)
continue
}
event[k] = vstr
} else {
event[k] = v
}
} else if _, exists := event[k]; !exists {
event[k] = v
}
}
jsontransform.WriteJSONKeys(event, jsonFields, e.JSONConfig.OverwriteKeys, reader.JsonErrorKey)
}
}
51 changes: 51 additions & 0 deletions libbeat/common/jsontransform/jsonhelper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package jsontransform

import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

func WriteJSONKeys(event common.MapStr, keys map[string]interface{}, overwriteKeys bool, errorKey string) {
for k, v := range keys {
if overwriteKeys {
if k == "@timestamp" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite @timestamp because value is not string")
event[errorKey] = "@timestamp not overwritten (not string)"
continue
}

// @timestamp must be of format RFC3339
ts, err := time.Parse(time.RFC3339, vstr)
if err != nil {
logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event[errorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)
continue
}
event[k] = common.Time(ts)
} else if k == "type" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite type because value is not string")
event[errorKey] = "type not overwritten (not string)"
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore")
event[errorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)
continue
}
event[k] = vstr
} else {
event[k] = v
}
} else if _, exists := event[k]; !exists {
event[k] = v
}

}
}
42 changes: 2 additions & 40 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
Expand Down Expand Up @@ -43,7 +42,7 @@ func init() {
processors.RegisterPlugin("decode_json_fields",
configChecked(newDecodeJSONFields,
requireFields("fields"),
allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target")))
allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target", "when")))
}

func newDecodeJSONFields(c common.Config) (processors.Processor, error) {
Expand Down Expand Up @@ -88,44 +87,7 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) {
default:
errs = append(errs, errors.New("Error trying to add target to root.").Error())
case map[string]interface{}:
for k, v := range t {
if f.overwriteKeys {
if k == "@timestamp" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite @timestamp because value is not string")
event["json_error"] = "@timestamp not overwritten (not string)"
continue
}

// @timestamp must be of format RFC3339
ts, err := time.Parse(time.RFC3339, vstr)
if err != nil {
logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event["json_error"] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)
continue
}
event[k] = common.Time(ts)
} else if k == "type" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite type because value is not string")
event["json_error"] = "type not overwritten (not string)"
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore")
event["json_error"] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)
continue
}
event[k] = vstr
} else {
event[k] = v
}
} else if _, exists := event[k]; !exists {
event[k] = v
}
}
jsontransform.WriteJSONKeys(event, t, f.overwriteKeys, "json_error")
}
}
} else {
Expand Down

0 comments on commit d1fe6ef

Please sign in to comment.