Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jsonhelper #3280

Merged
merged 4 commits into from
Jan 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 2 additions & 40 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package input

import (
"fmt"
"time"

"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/common/jsontransform"
)

// Event is sent to the output and must contain all relevant information
Expand Down Expand Up @@ -80,43 +79,6 @@ func mergeJSONFields(e *Event, event common.MapStr, jsonFields common.MapStr) {
// Delete existing json key
delete(event, "json")

for k, v := range jsonFields {
if e.JSONConfig.OverwriteKeys {
if k == "@timestamp" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite @timestamp because value is not string")
event[reader.JsonErrorKey] = "@timestamp not overwritten (not string)"
continue
}

// @timestamp must be of format RFC3339
ts, err := time.Parse(time.RFC3339, vstr)
if err != nil {
logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event[reader.JsonErrorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)
continue
}
event[k] = common.Time(ts)
} else if k == "type" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite type because value is not string")
event[reader.JsonErrorKey] = "type not overwritten (not string)"
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore")
event[reader.JsonErrorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)
continue
}
event[k] = vstr
} else {
event[k] = v
}
} else if _, exists := event[k]; !exists {
event[k] = v
}
}
jsontransform.WriteJSONKeys(event, jsonFields, e.JSONConfig.OverwriteKeys, reader.JsonErrorKey)
}
}
51 changes: 51 additions & 0 deletions libbeat/common/jsontransform/jsonhelper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package jsontransform

import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

func WriteJSONKeys(event common.MapStr, keys map[string]interface{}, overwriteKeys bool, errorKey string) {
for k, v := range keys {
if overwriteKeys {
if k == "@timestamp" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite @timestamp because value is not string")
event[errorKey] = "@timestamp not overwritten (not string)"
continue
}

// @timestamp must be of format RFC3339
ts, err := time.Parse(time.RFC3339, vstr)
if err != nil {
logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event[errorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)
continue
}
event[k] = common.Time(ts)
} else if k == "type" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite type because value is not string")
event[errorKey] = "type not overwritten (not string)"
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore")
event[errorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)
continue
}
event[k] = vstr
} else {
event[k] = v
}
} else if _, exists := event[k]; !exists {
event[k] = v
}

}
}
42 changes: 2 additions & 40 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
Expand Down Expand Up @@ -43,7 +42,7 @@ func init() {
processors.RegisterPlugin("decode_json_fields",
configChecked(newDecodeJSONFields,
requireFields("fields"),
allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target")))
allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target", "when")))
}

func newDecodeJSONFields(c common.Config) (processors.Processor, error) {
Expand Down Expand Up @@ -88,44 +87,7 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) {
default:
errs = append(errs, errors.New("Error trying to add target to root.").Error())
case map[string]interface{}:
for k, v := range t {
if f.overwriteKeys {
if k == "@timestamp" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite @timestamp because value is not string")
event["json_error"] = "@timestamp not overwritten (not string)"
continue
}

// @timestamp must be of format RFC3339
ts, err := time.Parse(time.RFC3339, vstr)
if err != nil {
logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event["json_error"] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)
continue
}
event[k] = common.Time(ts)
} else if k == "type" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite type because value is not string")
event["json_error"] = "type not overwritten (not string)"
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore")
event["json_error"] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)
continue
}
event[k] = vstr
} else {
event[k] = v
}
} else if _, exists := event[k]; !exists {
event[k] = v
}
}
jsontransform.WriteJSONKeys(event, t, f.overwriteKeys, "json_error")
}
}
} else {
Expand Down