diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 504c59862c6..dfcd8132ab0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -576,6 +576,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Enhance GCP module to populate orchestrator.* fields for GKE / K8S logs {pull}25368[25368] - Make `filestream` input GA. {pull}26127[26127] - http_endpoint: Support multiple documents in a single request by POSTing an array or NDJSON format. {pull}25764[25764] +- Add new `parser` to `filestream` input: `container`. {pull}26115[26115] *Heartbeat* diff --git a/filebeat/docs/inputs/input-filestream-reader-options.asciidoc b/filebeat/docs/inputs/input-filestream-reader-options.asciidoc index 1a55c85277c..b2c0fa2fb70 100644 --- a/filebeat/docs/inputs/input-filestream-reader-options.asciidoc +++ b/filebeat/docs/inputs/input-filestream-reader-options.asciidoc @@ -151,6 +151,7 @@ Available parsers: * `multiline` * `ndjson` +* `container` In this example, {beatname_uc} is reading multiline messages that consist of 3 lines and are encapsulated in single-line JSON objects. @@ -232,3 +233,28 @@ JSON document and stored in `@metadata._id` *`ignore_decoding_error`*:: An optional configuration setting that specifies if JSON decoding errors should be logged or not. If set to true, errors will not be logged. The default is false. + +[float] +===== `container` + +Use the `container` parser to extract information from container log files. +It parses lines into common message lines and extracts timestamps as well. + +*`stream`*:: Reads from the specified streams only: `all`, `stdout` or `stderr`. The default +is `all`. + +*`format`*:: Use the given format when parsing logs: `auto`, `docker` or `cri`. The +default is `auto`; it will automatically detect the format. To disable +autodetection, set any of the other options.
+ +The following snippet configures {beatname_uc} to read the `stdout` stream from +all containers under the default Kubernetes logs path: + +[source,yaml] +---- + paths: + - "/var/log/containers/*.log" + parsers: + - container: + stream: stdout +---- diff --git a/filebeat/input/filestream/parser.go b/filebeat/input/filestream/parser.go index c64e2c19ee7..c64b8981ae4 100644 --- a/filebeat/input/filestream/parser.go +++ b/filebeat/input/filestream/parser.go @@ -70,6 +70,14 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err) } p = readjson.NewJSONParser(p, &config) + case "container": + config := readjson.DefaultContainerConfig() + cfg := ns.Config() + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("error while parsing container parser config: %+v", err) + } + p = readjson.NewContainerParser(p, &config) default: return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name) } @@ -96,6 +104,13 @@ func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error { if err != nil { return fmt.Errorf("error while parsing ndjson parser config: %+v", err) } + case "container": + config := readjson.DefaultContainerConfig() + cfg := ns.Config() + err := cfg.Unpack(&config) + if err != nil { + return fmt.Errorf("error while parsing container parser config: %+v", err) + } default: return fmt.Errorf("%s: %s", ErrNoSuchParser, name) } diff --git a/filebeat/input/filestream/parser_test.go b/filebeat/input/filestream/parser_test.go index f363425eb12..696729d1e31 100644 --- a/filebeat/input/filestream/parser_test.go +++ b/filebeat/input/filestream/parser_test.go @@ -258,6 +258,126 @@ func TestJSONParsersWithFields(t *testing.T) { } +func TestContainerParser(t *testing.T) { + tests := map[string]struct { + lines string + parsers map[string]interface{} + expectedMessages []reader.Message + }{ + "simple docker lines": { + lines: `{"log":"Fetching 
main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"} +{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"} +{"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"} +`, + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{}, + }, + }, + }, + expectedMessages: []reader.Message{ + reader.Message{ + Content: []byte("Fetching main repository github.com/elastic/beats...\n"), + Fields: common.MapStr{ + "stream": "stdout", + }, + }, + reader.Message{ + Content: []byte("Fetching dependencies...\n"), + Fields: common.MapStr{ + "stream": "stdout", + }, + }, + reader.Message{ + Content: []byte("Execute /scripts/packetbeat_before_build.sh\n"), + Fields: common.MapStr{ + "stream": "stdout", + }, + }, + reader.Message{ + Content: []byte("patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n"), + Fields: common.MapStr{ + "stream": "stdout", + }, + }, + }, + }, + "CRI docker lines": { + lines: `2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache +`, + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "cri", + }, + }, + }, + }, + expectedMessages: []reader.Message{ + reader.Message{ + Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache\n"), + Fields: common.MapStr{ + "stream": "stdout", + }, + }, + }, + }, + "corrupt docker lines are skipped": { + lines: `{"log":"Fetching main repository 
github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"} +{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"} +`, + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{}, + }, + }, + }, + expectedMessages: []reader.Message{ + reader.Message{ + Content: []byte("Fetching main repository github.com/elastic/beats...\n"), + Fields: common.MapStr{ + "stream": "stdout", + }, + }, + reader.Message{ + Content: []byte("Execute /scripts/packetbeat_before_build.sh\n"), + Fields: common.MapStr{ + "stream": "stdout", + }, + }, + }, + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + cfg := defaultConfig() + parsersConfig := common.MustNewConfigFrom(test.parsers) + err := parsersConfig.Unpack(&cfg) + require.NoError(t, err) + + p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 1024}, cfg.Reader.Parsers) + + i := 0 + msg, err := p.Next() + for err == nil { + require.Equal(t, test.expectedMessages[i].Content, msg.Content) + require.Equal(t, test.expectedMessages[i].Fields, msg.Fields) + i++ + msg, err = p.Next() + } + }) + } +} func testReader(lines string) reader.Reader { encF, _ := encoding.FindEncoding("") reader := strings.NewReader(lines) diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index d57c61c6a26..d182ed7a187 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -87,6 +87,33 @@ func New(r reader.Reader, stream string, partial bool, format string, CRIFlags b return &reader } +func NewContainerParser(r reader.Reader, config *ContainerJSONConfig) *DockerJSONReader { + reader 
:= DockerJSONReader{ + stream: config.Stream.String(), + partial: true, + reader: r, + criflags: true, + logger: logp.NewLogger("parser_container"), + } + + switch config.Format { + case Docker, JSONFile: + reader.parseLine = reader.parseDockerJSONLog + case CRI: + reader.parseLine = reader.parseCRILog + default: + reader.parseLine = reader.parseAuto + } + + if runtime.GOOS == "windows" { + reader.stripNewLine = stripNewLineWin + } else { + reader.stripNewLine = stripNewLine + } + + return &reader +} + // parseCRILog parses logs in CRI log format. // CRI log format example : // 2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache diff --git a/libbeat/reader/readjson/docker_json_config.go b/libbeat/reader/readjson/docker_json_config.go new file mode 100644 index 00000000000..53ec01d25f1 --- /dev/null +++ b/libbeat/reader/readjson/docker_json_config.go @@ -0,0 +1,101 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package readjson + +import "fmt" + +type ContainerFormat uint8 + +type Stream uint8 + +const ( + Auto ContainerFormat = iota + 1 + CRI + Docker + JSONFile + + All Stream = iota + 1 + Stdout + Stderr +) + +var ( + containerFormats = map[string]ContainerFormat{ + "auto": Auto, + "cri": CRI, + "docker": Docker, + "json-file": JSONFile, + } + + containerStreams = map[string]Stream{ + "all": All, + "stdout": Stdout, + "stderr": Stderr, + } +) + +type ContainerJSONConfig struct { + Stream Stream `config:"stream"` + Format ContainerFormat `config:"format"` +} + +func DefaultContainerConfig() ContainerJSONConfig { + return ContainerJSONConfig{ + Format: Auto, + Stream: All, + } +} + +func (f *ContainerFormat) Unpack(v string) error { + val, ok := containerFormats[v] + if !ok { + keys := make([]string, len(containerFormats)) + i := 0 + for k := range containerFormats { + keys[i] = k + i++ + } + return fmt.Errorf("unknown container log format: %s, supported values: %+v", v, keys) + } + *f = val + return nil +} + +func (s *Stream) Unpack(v string) error { + val, ok := containerStreams[v] + if !ok { + keys := make([]string, len(containerStreams)) + i := 0 + for k := range containerStreams { + keys[i] = k + i++ + } + return fmt.Errorf("unknown streams: %s, supported values: %+v", v, keys) + } + *s = val + return nil +} + +func (s *Stream) String() string { + for k, v := range containerStreams { + if v == *s { + return k + } + } + return "" +}