From 382997fe5d2319532c90a37d0a3e31cb08d686df Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Mon, 25 Jul 2016 22:24:28 +0200 Subject: [PATCH] Unmarshal JSON numbers as ints where possible The change modifies the unmarshaling logic to try parsing the numbers in an int64 first, and only on error parse to a float64. In practice, this means that e.g. 1 will become an int64 and 1.0 will become a float64. Fixes #2038. Includes a system test to verify that ticket. --- CHANGELOG.asciidoc | 3 +- filebeat/harvester/reader/json.go | 60 +++++++++++- filebeat/harvester/reader/json_test.go | 128 ++++++++++++------------- filebeat/tests/files/logs/json_int.log | 2 + filebeat/tests/system/test_json.py | 31 ++++++ 5 files changed, 152 insertions(+), 72 deletions(-) create mode 100644 filebeat/tests/files/logs/json_int.log diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 0118fae4f24..d8ad45b8521 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -48,9 +48,10 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha4...master[Check the HEAD d *Topbeat* *Filebeat* -- Fix potential data loss between filebeat restarts, reporting unpublished lines as published. {issue}2041[2041] +- Fix potential data loss between filebeat restarts, reporting unpublished lines as published. {issue}2041[2041] - Fix open file handler issue {issue}2028[2028] {pull}2020[2020] +- Fix filtering of JSON events when using integers in conditions {issue}2038[2038] *Winlogbeat* - Fix potential data loss between winlogbeat restarts, reporting unpublished lines as published. {issue}2041[2041] diff --git a/filebeat/harvester/reader/json.go b/filebeat/harvester/reader/json.go index 697dd61fca2..8e5456bae7f 100644 --- a/filebeat/harvester/reader/json.go +++ b/filebeat/harvester/reader/json.go @@ -1,6 +1,7 @@ package reader import ( + "bytes" "encoding/json" "fmt" @@ -25,8 +26,9 @@ func NewJSON(r Reader, cfg *JSONConfig) *JSON { // decodeJSON unmarshals the text parameter into a MapStr and // returns the new text column if one was requested. func (r *JSON) decodeJSON(text []byte) ([]byte, common.MapStr) { - var jsonFields common.MapStr - err := json.Unmarshal(text, &jsonFields) + var jsonFields map[string]interface{} + + err := unmarshal(text, &jsonFields) if err != nil { logp.Err("Error decoding JSON: %v", err) if r.cfg.AddErrorKey { @@ -58,6 +60,60 @@ func (r *JSON) decodeJSON(text []byte) ([]byte, common.MapStr) { return []byte(textString), jsonFields } +// unmarshal is equivalent with json.Unmarshal but it converts numbers +// to int64 where possible, instead of using always float64. +func unmarshal(text []byte, fields *map[string]interface{}) error { + dec := json.NewDecoder(bytes.NewReader(text)) + dec.UseNumber() + err := dec.Decode(fields) + if err != nil { + return err + } + transformNumbersDict(*fields) + return nil +} + +// transformNumbersDict walks a json decoded tree an replaces json.Number +// with int64, float64, or string, in this order of preference (i.e. if it +// parses as an int, use int. if it parses as a float, use float. etc). +func transformNumbersDict(dict common.MapStr) { + for k, v := range dict { + switch vv := v.(type) { + case json.Number: + dict[k] = transformNumber(vv) + case map[string]interface{}: + transformNumbersDict(vv) + case []interface{}: + transformNumbersArray(vv) + } + } +} + +func transformNumber(value json.Number) interface{} { + i64, err := value.Int64() + if err == nil { + return i64 + } + f64, err := value.Float64() + if err == nil { + return f64 + } + return value.String() +} + +func transformNumbersArray(arr []interface{}) { + for i, v := range arr { + switch vv := v.(type) { + case json.Number: + arr[i] = transformNumber(vv) + case map[string]interface{}: + transformNumbersDict(vv) + case []interface{}: + transformNumbersArray(vv) + } + } +} + // Next decodes JSON and returns the filled Line object. func (r *JSON) Next() (Message, error) { reader, err := r.reader.Next() diff --git a/filebeat/harvester/reader/json_test.go b/filebeat/harvester/reader/json_test.go index 05c597a2142..dce752db611 100644 --- a/filebeat/harvester/reader/json_test.go +++ b/filebeat/harvester/reader/json_test.go @@ -1,90 +1,80 @@ -// +build !integration - package reader import ( "testing" - "github.com/elastic/beats/libbeat/common" "github.com/stretchr/testify/assert" ) -func TestDecodeJSON(t *testing.T) { +func TestUnmarshal(t *testing.T) { type io struct { - Text string - Config JSONConfig - ExpectedText string - ExpectedMap common.MapStr + Name string + Input string + Output map[string]interface{} } - var tests = []io{ - { - Text: `{"message": "test", "value": 1}`, - Config: JSONConfig{MessageKey: "message"}, - ExpectedText: "test", - ExpectedMap: common.MapStr{"message": "test", "value": float64(1)}, - }, - { - Text: `{"message": "test", "value": 1}`, - Config: JSONConfig{MessageKey: "message1"}, - ExpectedText: "", - ExpectedMap: common.MapStr{"message": "test", "value": float64(1)}, - }, - { - Text: `{"message": "test", "value": 1}`, - Config: JSONConfig{MessageKey: "value"}, - ExpectedText: "", - ExpectedMap: common.MapStr{"message": "test", "value": float64(1)}, + tests := []io{ + io{ + Name: "Top level int, float, string, bool", + Input: `{"a": 3, "b": 2.0, "c": "hello", "d": true}`, + Output: map[string]interface{}{ + "a": int64(3), + "b": float64(2), + "c": "hello", + "d": true, + }, }, - { - Text: `{"message": "test", "value": "1"}`, - Config: JSONConfig{MessageKey: "value"}, - ExpectedText: "1", - ExpectedMap: common.MapStr{"message": "test", "value": "1"}, + io{ + Name: "Nested objects with ints", + Input: `{"a": 3, "b": {"c": {"d": 5}}}`, + Output: map[string]interface{}{ + "a": int64(3), + "b": map[string]interface{}{ + "c": map[string]interface{}{ + "d": int64(5), + }, + }, + }, }, - { - // in case of JSON decoding errors, the text is passed as is - Text: `{"message": "test", "value": "`, - Config: JSONConfig{MessageKey: "value"}, - ExpectedText: `{"message": "test", "value": "`, - ExpectedMap: nil, + io{ + Name: "Array of floats", + Input: `{"a": 3, "b": {"c": [4.0, 4.1, 4.2]}}`, + Output: map[string]interface{}{ + "a": int64(3), + "b": map[string]interface{}{ + "c": []interface{}{ + float64(4.0), float64(4.1), float64(4.2), + }, + }, + }, }, - { - // Add key error helps debugging this - Text: `{"message": "test", "value": "`, - Config: JSONConfig{MessageKey: "value", AddErrorKey: true}, - ExpectedText: `{"message": "test", "value": "`, - ExpectedMap: common.MapStr{"json_error": "Error decoding JSON: unexpected end of JSON input"}, + io{ + Name: "Array of mixed ints and floats", + Input: `{"a": 3, "b": {"c": [4, 4.1, 4.2]}}`, + Output: map[string]interface{}{ + "a": int64(3), + "b": map[string]interface{}{ + "c": []interface{}{ + int64(4), float64(4.1), float64(4.2), + }, + }, + }, }, - { - // If the text key is not found, put an error - Text: `{"message": "test", "value": "1"}`, - Config: JSONConfig{MessageKey: "hello", AddErrorKey: true}, - ExpectedText: ``, - ExpectedMap: common.MapStr{"message": "test", "value": "1", "json_error": "Key 'hello' not found"}, - }, - { - // If the text key is found, but not a string, put an error - Text: `{"message": "test", "value": 1}`, - Config: JSONConfig{MessageKey: "value", AddErrorKey: true}, - ExpectedText: ``, - ExpectedMap: common.MapStr{"message": "test", "value": float64(1), "json_error": "Value of key 'value' is not a string"}, - }, - { - // Without a text key, simple return the json and an empty text - Text: `{"message": "test", "value": 1}`, - Config: JSONConfig{AddErrorKey: true}, - ExpectedText: ``, - ExpectedMap: common.MapStr{"message": "test", "value": float64(1)}, + io{ + Name: "Negative values", + Input: `{"a": -3, "b": -1.0}`, + Output: map[string]interface{}{ + "a": int64(-3), + "b": float64(-1), + }, }, } for _, test := range tests { - - var p JSON - p.cfg = &test.Config - text, map_ := p.decodeJSON([]byte(test.Text)) - assert.Equal(t, test.ExpectedText, string(text)) - assert.Equal(t, test.ExpectedMap, map_) + t.Logf("Running test %s", test.Name) + var output map[string]interface{} + err := unmarshal([]byte(test.Input), &output) + assert.NoError(t, err) + assert.Equal(t, test.Output, output) } } diff --git a/filebeat/tests/files/logs/json_int.log b/filebeat/tests/files/logs/json_int.log new file mode 100644 index 00000000000..d8bf3d6bb42 --- /dev/null +++ b/filebeat/tests/files/logs/json_int.log @@ -0,0 +1,2 @@ +{"http_user_agent": "ELB-HealthChecker/1.0", "status": 200} +{"http_user_agent": "ELB-HealthChecker/1.0", "status": 404} diff --git a/filebeat/tests/system/test_json.py b/filebeat/tests/system/test_json.py index d7d78f98cd2..878797d554c 100644 --- a/filebeat/tests/system/test_json.py +++ b/filebeat/tests/system/test_json.py @@ -334,3 +334,34 @@ def test_with_generic_filtering_remove_headers(self): assert "res" not in o assert o["method"] == "GET" assert o["message"] == "Sent response: " + + def test_integer_condition(self): + """ + It should work to drop JSON event based on an integer + value by using a simple `equal` condition. See #2038. + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + json=dict( + keys_under_root=True, + ), + processors=[{ + "drop_event": { + "when": "equals.status: 200", + }, + }] + ) + os.mkdir(self.working_dir + "/log/") + self.copy_files(["logs/json_int.log"], + source_dir="../files", + target_dir="log") + + proc = self.start_beat() + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + proc.check_kill_and_wait() + + output = self.read_output() + assert len(output) == 1 + assert output[0]["status"] == 404