diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a2bad114b156..59f235a4e51e 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -55,6 +55,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.4[Check the HEAD diff] - Fixed a memory leak when harvesters are closed. {pull}7820[7820] - Fixed a docker input error due to the offset update bug in partial log join.{pull}8177[8177] +- Update CRI format to support partial/full tags. {pull}8265[8265] *Heartbeat* diff --git a/filebeat/docs/inputs/input-docker.asciidoc b/filebeat/docs/inputs/input-docker.asciidoc index 84cc062d284d..6685745e078c 100644 --- a/filebeat/docs/inputs/input-docker.asciidoc +++ b/filebeat/docs/inputs/input-docker.asciidoc @@ -55,6 +55,12 @@ Enable partial messages joining. Docker `json-file` driver splits log lines larg end of line (`\n`) is present for common lines in the resulting file, while it's not the for the lines that have been split. `combine_partial` joins them back together when enabled. It is enabled by default. +===== `cri.parse_flags` + +Enable CRI flags parsing from the log file. CRI uses flags to signal a partial line, enabling this will +ensure partial lines are rejoined. It is disabled by default. 
+ + The following input configures {beatname_uc} to read the `stdout` stream from all containers under the default Docker containers path: diff --git a/filebeat/input/docker/config.go b/filebeat/input/docker/config.go index dc945f29bd67..e33bb869ac56 100644 --- a/filebeat/input/docker/config.go +++ b/filebeat/input/docker/config.go @@ -31,12 +31,15 @@ type config struct { // Partial configures the prospector to join partial lines Partial bool `config:"combine_partials"` + + // Enable CRI flags parsing (to be switched to default in 7.0) + CRIFlags bool `config:"cri.parse_flags"` } type containers struct { IDs []string `config:"ids"` Path string `config:"path"` - // Stream can be all,stdout or stderr + // Stream can be all, stdout or stderr Stream string `config:"stream"` } diff --git a/filebeat/input/docker/input.go b/filebeat/input/docker/input.go index ce34276e5479..56ffda2a699f 100644 --- a/filebeat/input/docker/input.go +++ b/filebeat/input/docker/input.go @@ -71,6 +71,10 @@ func NewInput( return nil, errors.Wrap(err, "update input config") } + if err := cfg.SetBool("docker-json.cri_flags", -1, config.CRIFlags); err != nil { + return nil, errors.Wrap(err, "update input config") + } + // Add stream to meta to ensure different state per stream if config.Containers.Stream != "all" { if context.Meta == nil { diff --git a/filebeat/input/log/config.go b/filebeat/input/log/config.go index 0b13992518d5..38c34fd5ea28 100644 --- a/filebeat/input/log/config.go +++ b/filebeat/input/log/config.go @@ -104,10 +104,9 @@ type config struct { // Hidden on purpose, used by the docker input: DockerJSON *struct { - Stream string `config:"stream"` - - // TODO move this to true by default - Partial bool `config:"partial"` + Stream string `config:"stream"` + Partial bool `config:"partial"` + CRIFlags bool `config:"cri_flags"` } `config:"docker-json"` } diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index d1d4f1481851..294ffb1ef983 100644 ---
a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -559,7 +559,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { if h.config.DockerJSON != nil { // Docker json-file format, add custom parsing to the pipeline - r = docker_json.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial) + r = docker_json.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.CRIFlags) } if h.config.JSON != nil { diff --git a/filebeat/reader/docker_json/docker_json.go b/filebeat/reader/docker_json/docker_json.go index f3913da1eb4b..a2849f3e37e7 100644 --- a/filebeat/reader/docker_json/docker_json.go +++ b/filebeat/reader/docker_json/docker_json.go @@ -37,67 +37,100 @@ type Reader struct { // join partial lines partial bool -} -type dockerLog struct { - Timestamp string `json:"time"` - Log string `json:"log"` - Stream string `json:"stream"` + // parse CRI flags + criflags bool } -type crioLog struct { - Timestamp time.Time - Stream string - Log []byte +type logLine struct { + Partial bool `json:"-"` + Timestamp time.Time `json:"-"` + Time string `json:"time"` + Stream string `json:"stream"` + Log string `json:"log"` } // New creates a new reader renaming a field -func New(r reader.Reader, stream string, partial bool) *Reader { +func New(r reader.Reader, stream string, partial bool, CRIFlags bool) *Reader { return &Reader{ - stream: stream, - partial: partial, - reader: r, + stream: stream, + partial: partial, + reader: r, + criflags: CRIFlags, } } // parseCRILog parses logs in CRI log format. 
// CRI log format example : // 2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache -func parseCRILog(message reader.Message, msg *crioLog) (reader.Message, error) { - log := strings.SplitN(string(message.Content), " ", 3) - if len(log) < 3 { - return message, errors.New("invalid CRI log") +func (p *Reader) parseCRILog(message *reader.Message, msg *logLine) error { + split := 3 + // read line tags if split is enabled: + if p.criflags { + split = 4 } - ts, err := time.Parse(time.RFC3339, log[0]) + + // current field + i := 0 + + // timestamp + log := strings.SplitN(string(message.Content), " ", split) + if len(log) < split { + return errors.New("invalid CRI log format") + } + ts, err := time.Parse(time.RFC3339, log[i]) if err != nil { - return message, errors.Wrap(err, "parsing CRI timestamp") + return errors.Wrap(err, "parsing CRI timestamp") + } + message.Ts = ts + i++ + + // stream + msg.Stream = log[i] + i++ + + // tags + partial := false + if p.criflags { + // currently only P(artial) or F(ull) are available + tags := strings.Split(log[i], ":") + for _, tag := range tags { + if tag == "P" { + partial = true + } + } + i++ } - msg.Timestamp = ts - msg.Stream = log[1] - msg.Log = []byte(log[2]) + msg.Partial = partial message.AddFields(common.MapStr{ "stream": msg.Stream, }) - message.Content = msg.Log - message.Ts = ts + // Remove ending \n for partial messages + message.Content = []byte(log[i]) + if partial { + message.Content = bytes.TrimRightFunc(message.Content, func(r rune) bool { + return r == '\n' || r == '\r' + }) + } - return message, nil + return nil } // parseReaderLog parses logs in Docker JSON log format. 
// Docker JSON log format example: // {"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"} -func parseDockerJSONLog(message reader.Message, msg *dockerLog) (reader.Message, error) { +func (p *Reader) parseDockerJSONLog(message *reader.Message, msg *logLine) error { dec := json.NewDecoder(bytes.NewReader(message.Content)) + if err := dec.Decode(&msg); err != nil { - return message, errors.Wrap(err, "decoding docker JSON") + return errors.Wrap(err, "decoding docker JSON") } // Parse timestamp - ts, err := time.Parse(time.RFC3339, msg.Timestamp) + ts, err := time.Parse(time.RFC3339, msg.Time) if err != nil { - return message, errors.Wrap(err, "parsing docker timestamp") + return errors.Wrap(err, "parsing docker timestamp") } message.AddFields(common.MapStr{ @@ -105,8 +138,17 @@ func parseDockerJSONLog(message reader.Message, msg *dockerLog) (reader.Message, }) message.Content = []byte(msg.Log) message.Ts = ts + msg.Partial = message.Content[len(message.Content)-1] != byte('\n') + + return nil +} + +func (p *Reader) parseLine(message *reader.Message, msg *logLine) error { + if strings.HasPrefix(string(message.Content), "{") { + return p.parseDockerJSONLog(message, msg) + } - return message, nil + return p.parseCRILog(message, msg) } // Next returns the next line. 
@@ -117,32 +159,27 @@ func (p *Reader) Next() (reader.Message, error) { return message, err } - var dockerLine dockerLog - var crioLine crioLog + var logLine logLine + err = p.parseLine(&message, &logLine) + if err != nil { + return message, err + } - if strings.HasPrefix(string(message.Content), "{") { - message, err = parseDockerJSONLog(message, &dockerLine) + // Handle multiline messages, join partial lines + for p.partial && logLine.Partial { + next, err := p.reader.Next() if err != nil { return message, err } - // Handle multiline messages, join lines that don't end with \n - for p.partial && message.Content[len(message.Content)-1] != byte('\n') { - next, err := p.reader.Next() - if err != nil { - return message, err - } - next, err = parseDockerJSONLog(next, &dockerLine) - if err != nil { - return message, err - } - message.Content = append(message.Content, next.Content...) - message.Bytes += next.Bytes + err = p.parseLine(&next, &logLine) + if err != nil { + return message, err } - } else { - message, err = parseCRILog(message, &crioLine) + message.Content = append(message.Content, next.Content...) 
+ message.Bytes += next.Bytes } - if p.stream != "all" && p.stream != dockerLine.Stream && p.stream != crioLine.Stream { + if p.stream != "all" && p.stream != logLine.Stream { continue } diff --git a/filebeat/reader/docker_json/docker_json_test.go b/filebeat/reader/docker_json/docker_json_test.go index 4f2cbcb8f7a5..9469d38cbbd4 100644 --- a/filebeat/reader/docker_json/docker_json_test.go +++ b/filebeat/reader/docker_json/docker_json_test.go @@ -29,14 +29,16 @@ import ( func TestDockerJSON(t *testing.T) { tests := []struct { + name string input [][]byte stream string partial bool + criflags bool expectedError bool expectedMessage reader.Message }{ - // Common log message { + name: "Common log message", input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)}, stream: "all", expectedMessage: reader.Message{ @@ -46,32 +48,32 @@ func TestDockerJSON(t *testing.T) { Bytes: 122, }, }, - // Wrong JSON { + name: "Wrong JSON", input: [][]byte{[]byte(`this is not JSON`)}, stream: "all", expectedError: true, }, - // Wrong CRI { + name: "Wrong CRI", input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)}, stream: "all", expectedError: true, }, - // Wrong CRI { + name: "Wrong CRI", input: [][]byte{[]byte(`{this is not JSON nor CRI`)}, stream: "all", expectedError: true, }, - // Missing time { + name: "Missing time", input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", expectedError: true, }, - // CRI log { + name: "CRI log no tags", input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)}, stream: "all", expectedMessage: reader.Message{ @@ -80,9 +82,22 @@ func TestDockerJSON(t *testing.T) { Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC), Bytes: 115, }, + criflags: false, }, - // Filtering stream { + name: 
"CRI log", + input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)}, + stream: "all", + expectedMessage: reader.Message{ + Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC), + Bytes: 117, + }, + criflags: true, + }, + { + name: "Filtering stream", input: [][]byte{ []byte(`{"log":"filtered\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), []byte(`{"log":"unfiltered\n","stream":"stderr","time":"2017-11-09T13:27:36.277747246Z"}`), @@ -96,23 +111,24 @@ func TestDockerJSON(t *testing.T) { Bytes: 80, }, }, - // Filtering stream { + name: "Filtering stream", input: [][]byte{ - []byte(`2017-10-12T13:32:21.232861448Z stdout 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`), - []byte(`2017-11-12T23:32:21.212771448Z stderr 2017-11-12 23:32:21.212 [ERROR][77] table.go 111: error`), - []byte(`2017-12-12T10:32:21.212864448Z stdout 2017-12-12 10:32:21.212 [WARN][88] table.go 222: Warn`), + []byte(`2017-10-12T13:32:21.232861448Z stdout F 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`), + []byte(`2017-11-12T23:32:21.212771448Z stderr F 2017-11-12 23:32:21.212 [ERROR][77] table.go 111: error`), + []byte(`2017-12-12T10:32:21.212864448Z stdout F 2017-12-12 10:32:21.212 [WARN][88] table.go 222: Warn`), }, stream: "stderr", expectedMessage: reader.Message{ Content: []byte("2017-11-12 23:32:21.212 [ERROR][77] table.go 111: error"), Fields: common.MapStr{"stream": "stderr"}, Ts: time.Date(2017, 11, 12, 23, 32, 21, 212771448, time.UTC), - Bytes: 93, + Bytes: 95, }, + criflags: true, }, - // Split lines { + name: "Split lines", input: [][]byte{ []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested ","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), 
[]byte(`{"log":"shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), @@ -126,8 +142,40 @@ func TestDockerJSON(t *testing.T) { Bytes: 190, }, }, - // Split lines with partial disabled { + name: "Split lines", + input: [][]byte{ + []byte(`2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`), + []byte(`2017-11-12T23:32:21.212771448Z stdout F error`), + }, + stream: "stdout", + expectedMessage: reader.Message{ + Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC), + Bytes: 163, + }, + partial: true, + criflags: true, + }, + { + name: "Split lines and remove \\n", + input: [][]byte{ + []byte("2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache\n"), + []byte("2017-11-12T23:32:21.212771448Z stdout F error"), + }, + stream: "stdout", + expectedMessage: reader.Message{ + Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC), + Bytes: 164, + }, + partial: true, + criflags: true, + }, + { + name: "Split lines with partial disabled", input: [][]byte{ []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested ","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), []byte(`{"log":"shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), @@ -144,15 +192,21 @@ func TestDockerJSON(t *testing.T) { } for _, test := range tests { - r := &mockReader{messages: test.input} - json := New(r, test.stream, test.partial) - message, err := json.Next() + t.Run(test.name, func(t *testing.T) { + r := &mockReader{messages: test.input} + json := New(r, test.stream, test.partial, test.criflags) + message, 
err := json.Next() - assert.Equal(t, test.expectedError, err != nil) + if test.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } - if err == nil { - assert.EqualValues(t, test.expectedMessage, message) - } + if err == nil { + assert.EqualValues(t, test.expectedMessage, message) + } + }) } }