Skip to content

Commit

Permalink
json_decode_fields processor (#2605)
Browse files Browse the repository at this point in the history
* add decode_json_fields processor

* Refactored to pull json_tranform into its own module
  • Loading branch information
suraj-soni authored and Steffen Siering committed Nov 21, 2016
1 parent d7a299e commit 50db534
Show file tree
Hide file tree
Showing 8 changed files with 346 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff]
==== Bugfixes

*Affecting all Beats*
- Added decode_json_fields processor for decoding fields containing JSON strings. {pull}2605[2605]

*Metricbeat*

Expand Down
44 changes: 2 additions & 42 deletions filebeat/harvester/reader/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
"github.com/elastic/beats/libbeat/logp"
)

Expand Down Expand Up @@ -69,51 +70,10 @@ func unmarshal(text []byte, fields *map[string]interface{}) error {
if err != nil {
return err
}
transformNumbersDict(*fields)
jsontransform.TransformNumbers(*fields)
return nil
}

// transformNumbersDict walks a json decoded tree an replaces json.Number
// with int64, float64, or string, in this order of preference (i.e. if it
// parses as an int, use int. if it parses as a float, use float. etc).
func transformNumbersDict(dict common.MapStr) {
for k, v := range dict {
switch vv := v.(type) {
case json.Number:
dict[k] = transformNumber(vv)
case map[string]interface{}:
transformNumbersDict(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}

func transformNumber(value json.Number) interface{} {
i64, err := value.Int64()
if err == nil {
return i64
}
f64, err := value.Float64()
if err == nil {
return f64
}
return value.String()
}

func transformNumbersArray(arr []interface{}) {
for i, v := range arr {
switch vv := v.(type) {
case json.Number:
arr[i] = transformNumber(vv)
case map[string]interface{}:
transformNumbersDict(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}

// Next decodes JSON and returns the filled Line object.
func (r *JSON) Next() (Message, error) {
message, err := r.reader.Next()
Expand Down
4 changes: 2 additions & 2 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def test_with_generic_filtering(self):
message_key="message",
keys_under_root=True,
overwrite_keys=True,
add_error_key=True,
add_error_key=True
),
processors=[{
"drop_fields": {
Expand Down Expand Up @@ -305,7 +305,7 @@ def test_with_generic_filtering_remove_headers(self):
message_key="message",
keys_under_root=True,
overwrite_keys=True,
add_error_key=True,
add_error_key=True
),
processors=[{
"drop_fields": {
Expand Down
8 changes: 8 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,10 @@ def test_clean_inactive(self):
lambda: self.log_contains_count("Registry file updated") > 1,
max_timeout=15)

if os.name == "nt":
# On windows registry recreation can take a bit longer
time.sleep(1)

data = self.get_registry()
assert len(data) == 2

Expand Down Expand Up @@ -834,6 +838,10 @@ def test_clean_removed(self):
lambda: self.log_contains_count("Registry file updated") > 1,
max_timeout=15)

if os.name == "nt":
# On windows registry recration can take a bit longer
time.sleep(1)

data = self.get_registry()
assert len(data) == 2

Expand Down
48 changes: 48 additions & 0 deletions libbeat/common/jsontransform/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package jsontransform

import (
"encoding/json"

"github.com/elastic/beats/libbeat/common"
)

// TransformNumbers walks a json decoded tree an replaces json.Number
// with int64, float64, or string, in this order of preference (i.e. if it
// parses as an int, use int. if it parses as a float, use float. etc).
func TransformNumbers(dict common.MapStr) {
for k, v := range dict {
switch vv := v.(type) {
case json.Number:
dict[k] = transformNumber(vv)
case map[string]interface{}:
TransformNumbers(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}

func transformNumber(value json.Number) interface{} {
i64, err := value.Int64()
if err == nil {
return i64
}
f64, err := value.Float64()
if err == nil {
return f64
}
return value.String()
}

func transformNumbersArray(arr []interface{}) {
for i, v := range arr {
switch vv := v.(type) {
case json.Number:
arr[i] = transformNumber(vv)
case map[string]interface{}:
TransformNumbers(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}
156 changes: 156 additions & 0 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package actions

import (
"bytes"
"encoding/json"
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/pkg/errors"
)

type decodeJSONFields struct {
fields []string
maxDepth int
processArray bool
}

type config struct {
Fields []string `config:"fields"`
MaxDepth int `config:"maxDepth" validate:"min=1"`
ProcessArray bool `config:"processArray"`
}

var (
defaultConfig = config{
MaxDepth: 1,
ProcessArray: false,
}
)

var debug = logp.MakeDebug("filters")

func init() {
processors.RegisterPlugin("decode_json_fields",
configChecked(newDecodeJSONFields,
requireFields("fields"),
allowedFields("fields", "maxDepth", "processArray")))
}

func newDecodeJSONFields(c common.Config) (processors.Processor, error) {
config := defaultConfig

err := c.Unpack(&config)

if err != nil {
logp.Warn("Error unpacking config for decode_json_fields")
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
}

f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, processArray: config.ProcessArray}
return f, nil
}

func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) {
var errs []string

for _, field := range f.fields {
data, err := event.GetValue(field)
if err != nil && errors.Cause(err) != common.ErrKeyNotFound {
debug("Error trying to GetValue for field : %s in event : %v", field, event)
errs = append(errs, err.Error())
continue
}
text, ok := data.(string)
if ok {
var output interface{}
err := unmarshal(f.maxDepth, []byte(text), &output, f.processArray)
if err != nil {
debug("Error trying to unmarshal %s", event[field])
errs = append(errs, err.Error())
continue
}

_, err = event.Put(field, output)
if err != nil {
debug("Error trying to Put value %v for field : %s", output, field)
errs = append(errs, err.Error())
continue
}
}
}

return event, fmt.Errorf(strings.Join(errs, ", "))
}

func unmarshal(maxDepth int, text []byte, fields *interface{}, processArray bool) error {
if err := DecodeJSON(text, fields); err != nil {
return err
}

maxDepth--
if maxDepth == 0 {
return nil
}

tryUnmarshal := func(v interface{}) (interface{}, bool) {
str, isString := v.(string)
if !isString {
return v, false
}

var tmp interface{}
err := unmarshal(maxDepth, []byte(str), &tmp, processArray)
if err != nil {
return v, false
}

return tmp, true
}

// try to deep unmarshal fields
switch O := interface{}(*fields).(type) {
case map[string]interface{}:
for k, v := range O {
if decoded, ok := tryUnmarshal(v); ok {
O[k] = decoded
}
}
// We want to process arrays here
case []interface{}:
if !processArray {
break
}

for i, v := range O {
if decoded, ok := tryUnmarshal(v); ok {
O[i] = decoded
}
}
}
return nil
}

func DecodeJSON(text []byte, to *interface{}) error {
dec := json.NewDecoder(bytes.NewReader(text))
dec.UseNumber()
err := dec.Decode(to)

if err != nil {
return err
}

switch O := interface{}(*to).(type) {
case map[string]interface{}:
jsontransform.TransformNumbers(O)
}
return nil
}

func (f decodeJSONFields) String() string {
return "decode_json_fields=" + strings.Join(f.fields, ", ")
}
Loading

0 comments on commit 50db534

Please sign in to comment.