diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0c2e3f5b89b..d644d4e0882 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -47,6 +47,7 @@ https://github.com/elastic/beats/compare/1035569addc4a3b29ffa14f8a08c27c1ace16ef - Support IPv6 addresses with zone id in IIS ingest pipeline. {issue}9836[9836] error log: {pull}9869[9869] access log: {pull}10030[10030] - Support haproxy log lines without captured headers. {issue}9463[9463] {pull}9958[9958] +- Fix bad bytes count in `docker` input when filtering by stream. {pull}10211[10211] *Heartbeat* diff --git a/filebeat/reader/docker_json/docker_json.go b/filebeat/reader/docker_json/docker_json.go index 00cf3dc2fea..0f08630c26a 100644 --- a/filebeat/reader/docker_json/docker_json.go +++ b/filebeat/reader/docker_json/docker_json.go @@ -170,8 +170,14 @@ func (p *Reader) parseLine(message *reader.Message, msg *logLine) error { // Next returns the next line. func (p *Reader) Next() (reader.Message, error) { + var bytes int for { message, err := p.reader.Next() + + // keep the right bytes count even if we return an error + bytes += message.Bytes + message.Bytes = bytes + if err != nil { return message, err } @@ -185,6 +191,11 @@ func (p *Reader) Next() (reader.Message, error) { // Handle multiline messages, join partial lines for p.partial && logLine.Partial { next, err := p.reader.Next() + + // keep the right bytes count even if we return an error + bytes += next.Bytes + message.Bytes = bytes + if err != nil { return message, err } @@ -193,7 +204,6 @@ func (p *Reader) Next() (reader.Message, error) { return message, err } message.Content = append(message.Content, next.Content...) - message.Bytes += next.Bytes } if p.stream != "all" && p.stream != logLine.Stream { diff --git a/filebeat/reader/docker_json/docker_json_test.go b/filebeat/reader/docker_json/docker_json_test.go index 6854d093b23..12d85a619ea 100644 --- a/filebeat/reader/docker_json/docker_json_test.go +++ b/filebeat/reader/docker_json/docker_json_test.go @@ -54,24 +54,36 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`this is not JSON`)}, stream: "all", expectedError: true, + expectedMessage: reader.Message{ + Bytes: 16, + }, }, { name: "Wrong CRI", input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)}, stream: "all", expectedError: true, + expectedMessage: reader.Message{ + Bytes: 37, + }, }, { name: "Wrong CRI", input: [][]byte{[]byte(`{this is not JSON nor CRI`)}, stream: "all", expectedError: true, + expectedMessage: reader.Message{ + Bytes: 25, + }, }, { name: "Missing time", input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", expectedError: true, + expectedMessage: reader.Message{ + Bytes: 82, + }, }, { name: "CRI log no tags", @@ -98,7 +110,7 @@ func TestDockerJSON(t *testing.T) { criflags: true, }, { - name: "Filtering stream", + name: "Filtering stream, bytes count accounts for all (filtered and not)", input: [][]byte{ []byte(`{"log":"filtered\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), []byte(`{"log":"unfiltered\n","stream":"stderr","time":"2017-11-09T13:27:36.277747246Z"}`), @@ -109,11 +121,11 @@ func TestDockerJSON(t *testing.T) { Content: []byte("unfiltered\n"), Fields: common.MapStr{"stream": "stderr"}, Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), - Bytes: 80, + Bytes: 158, }, }, { - name: "Filtering stream", + name: "Filtering CRI stream, bytes count accounts for all (filtered and not)", input: [][]byte{ []byte(`2017-10-12T13:32:21.232861448Z stdout F 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`), []byte(`2017-11-12T23:32:21.212771448Z stderr F 2017-11-12 23:32:21.212 [ERROR][77] table.go 111: error`), @@ -124,7 +136,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("2017-11-12 23:32:21.212 [ERROR][77] table.go 111: error"), Fields: common.MapStr{"stream": "stderr"}, Ts: time.Date(2017, 11, 12, 23, 32, 21, 212771448, time.UTC), - Bytes: 95, + Bytes: 212, }, criflags: true, }, @@ -196,6 +208,9 @@ func TestDockerJSON(t *testing.T) { stream: "all", forceCRI: true, expectedError: true, + expectedMessage: reader.Message{ + Bytes: 82, + }, }, { name: "Force CRI log no tags", @@ -257,6 +272,19 @@ func TestDockerJSON(t *testing.T) { forceCRI: true, criflags: true, }, + { + name: "Error parsing still keeps good bytes count", + input: [][]byte{ + []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested ","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + []byte(`{"log":"shutdown...\n","stream`), + }, + stream: "stdout", + expectedError: true, + expectedMessage: reader.Message{ + Bytes: 139, + }, + partial: true, + }, } for _, test := range tests { @@ -273,6 +301,8 @@ func TestDockerJSON(t *testing.T) { if err == nil { assert.EqualValues(t, test.expectedMessage, message) + } else { + assert.Equal(t, test.expectedMessage.Bytes, message.Bytes) } }) }