Add new parser to filestream input: container (elastic#26115)
## What does this PR do?

This PR adds support for a new parser named `container`. This is the reader that powers the `container` input behind the scenes. Now it is exposed as a parser.

Example configuration for reading container logs with the `filestream` input:

```yaml
type: filestream
paths:
  - /path/to/containers/*/*.log
parsers:
  - container: ~
```
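
The parser also accepts the `stream` and `format` options documented below. A minimal sketch combining both, assuming the default Kubernetes log path:

```yaml
type: filestream
paths:
  - /var/log/containers/*.log
parsers:
  - container:
      stream: stdout
      format: cri
```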

### Limitations

The PR does not provide feature parity with the `container` input yet, because it lacks support for separating the states of the stdout and stderr streams of a container. That support is coming in a follow-up PR.

## Why is it important?

It is a step toward supporting reading container logs from every input that supports the `parsers` option.

(cherry picked from commit e2449af)
kvch committed Jun 9, 2021
1 parent 910f67f commit a57f4a5
Showing 6 changed files with 290 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -576,6 +576,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]
- Enhance GCP module to populate orchestrator.* fields for GKE / K8S logs {pull}25368[25368]
- Make `filestream` input GA. {pull}26127[26127]
- http_endpoint: Support multiple documents in a single request by POSTing an array or NDJSON format. {pull}25764[25764]
- Add new `parser` to `filestream` input: `container`. {pull}26115[26115]

*Heartbeat*

26 changes: 26 additions & 0 deletions filebeat/docs/inputs/input-filestream-reader-options.asciidoc
@@ -151,6 +151,7 @@ Available parsers:

* `multiline`
* `ndjson`
* `container`

In this example, {beatname_uc} is reading multiline messages that consist of 3 lines
and are encapsulated in single-line JSON objects.
@@ -232,3 +233,28 @@ JSON document and stored in `@metadata._id`
*`ignore_decoding_error`*:: An optional configuration setting that specifies if
JSON decoding errors should be logged or not. If set to true, errors will not
be logged. The default is false.

[float]
===== `container`

Use the `container` parser to extract information from container log files.
It parses container log lines into a common message format and also extracts
the timestamp of each line.

*`stream`*:: Reads from the specified streams only: `all`, `stdout` or `stderr`. The default
is `all`.

*`format`*:: Use the given format when parsing logs: `auto`, `docker` or `cri`. The
default is `auto`, which detects the format automatically. To disable
autodetection, set one of the other options.

The following snippet configures {beatname_uc} to read the `stdout` stream from
all containers under the default Kubernetes logs path:

[source,yaml]
----
paths:
  - "/var/log/containers/*.log"
parsers:
  - container:
      stream: stdout
----
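
Similarly, to disable autodetection and parse only CRI-formatted logs (a minimal
sketch, using the same default Kubernetes log path):

[source,yaml]
----
paths:
  - "/var/log/containers/*.log"
parsers:
  - container:
      format: cri
----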
15 changes: 15 additions & 0 deletions filebeat/input/filestream/parser.go
@@ -70,6 +70,14 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace)
				return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err)
			}
			p = readjson.NewJSONParser(p, &config)
		case "container":
			config := readjson.DefaultContainerConfig()
			cfg := ns.Config()
			err := cfg.Unpack(&config)
			if err != nil {
				return nil, fmt.Errorf("error while parsing container parser config: %+v", err)
			}
			p = readjson.NewContainerParser(p, &config)
		default:
			return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name)
		}
@@ -96,6 +104,13 @@ func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error {
			if err != nil {
				return fmt.Errorf("error while parsing ndjson parser config: %+v", err)
			}
		case "container":
			config := readjson.DefaultContainerConfig()
			cfg := ns.Config()
			err := cfg.Unpack(&config)
			if err != nil {
				return fmt.Errorf("error while parsing container parser config: %+v", err)
			}
		default:
			return fmt.Errorf("%s: %s", ErrNoSuchParser, name)
		}
120 changes: 120 additions & 0 deletions filebeat/input/filestream/parser_test.go
@@ -258,6 +258,126 @@ func TestJSONParsersWithFields(t *testing.T) {

}

func TestContainerParser(t *testing.T) {
	tests := map[string]struct {
		lines            string
		parsers          map[string]interface{}
		expectedMessages []reader.Message
	}{
		"simple docker lines": {
			lines: `{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"}
{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"}
{"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"}
`,
			parsers: map[string]interface{}{
				"paths": []string{"dummy_path"},
				"parsers": []map[string]interface{}{
					map[string]interface{}{
						"container": map[string]interface{}{},
					},
				},
			},
			expectedMessages: []reader.Message{
				reader.Message{
					Content: []byte("Fetching main repository github.com/elastic/beats...\n"),
					Fields: common.MapStr{
						"stream": "stdout",
					},
				},
				reader.Message{
					Content: []byte("Fetching dependencies...\n"),
					Fields: common.MapStr{
						"stream": "stdout",
					},
				},
				reader.Message{
					Content: []byte("Execute /scripts/packetbeat_before_build.sh\n"),
					Fields: common.MapStr{
						"stream": "stdout",
					},
				},
				reader.Message{
					Content: []byte("patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n"),
					Fields: common.MapStr{
						"stream": "stdout",
					},
				},
			},
		},
		"CRI docker lines": {
			lines: `2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache
`,
			parsers: map[string]interface{}{
				"paths": []string{"dummy_path"},
				"parsers": []map[string]interface{}{
					map[string]interface{}{
						"container": map[string]interface{}{
							"format": "cri",
						},
					},
				},
			},
			expectedMessages: []reader.Message{
				reader.Message{
					Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache\n"),
					Fields: common.MapStr{
						"stream": "stdout",
					},
				},
			},
		},
		"corrupt docker lines are skipped": {
			lines: `{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"}
{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"}
`,
			parsers: map[string]interface{}{
				"paths": []string{"dummy_path"},
				"parsers": []map[string]interface{}{
					map[string]interface{}{
						"container": map[string]interface{}{},
					},
				},
			},
			expectedMessages: []reader.Message{
				reader.Message{
					Content: []byte("Fetching main repository github.com/elastic/beats...\n"),
					Fields: common.MapStr{
						"stream": "stdout",
					},
				},
				reader.Message{
					Content: []byte("Execute /scripts/packetbeat_before_build.sh\n"),
					Fields: common.MapStr{
						"stream": "stdout",
					},
				},
			},
		},
	}

	for name, test := range tests {
		test := test
		t.Run(name, func(t *testing.T) {
			cfg := defaultConfig()
			parsersConfig := common.MustNewConfigFrom(test.parsers)
			err := parsersConfig.Unpack(&cfg)
			require.NoError(t, err)

			p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 1024}, cfg.Reader.Parsers)
			require.NoError(t, err)

			i := 0
			msg, err := p.Next()
			for err == nil {
				require.Equal(t, test.expectedMessages[i].Content, msg.Content)
				require.Equal(t, test.expectedMessages[i].Fields, msg.Fields)
				i++
				msg, err = p.Next()
			}
		})
	}
}
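
// The table-driven cases above can be run in isolation with, for example:
//
//	go test ./filebeat/input/filestream -run TestContainerParser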

func testReader(lines string) reader.Reader {
	encF, _ := encoding.FindEncoding("")
	reader := strings.NewReader(lines)
27 changes: 27 additions & 0 deletions libbeat/reader/readjson/docker_json.go
@@ -87,6 +87,33 @@ func New(r reader.Reader, stream string, partial bool, format string, CRIFlags bool) *DockerJSONReader {
	return &reader
}

// NewContainerParser creates a parser for container logs. It always reads
// partial lines, honors CRI flags, and selects the line parser based on the
// configured format (auto-detection by default).
func NewContainerParser(r reader.Reader, config *ContainerJSONConfig) *DockerJSONReader {
	reader := DockerJSONReader{
		stream:   config.Stream.String(),
		partial:  true,
		reader:   r,
		criflags: true,
		logger:   logp.NewLogger("parser_container"),
	}

	switch config.Format {
	case Docker, JSONFile:
		reader.parseLine = reader.parseDockerJSONLog
	case CRI:
		reader.parseLine = reader.parseCRILog
	default:
		reader.parseLine = reader.parseAuto
	}

	if runtime.GOOS == "windows" {
		reader.stripNewLine = stripNewLineWin
	} else {
		reader.stripNewLine = stripNewLine
	}

	return &reader
}

// parseCRILog parses logs in CRI log format.
// CRI log format example :
// 2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache
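
For reference, the new constructor can also be wired up directly, outside the `filestream` parser plumbing shown above. A minimal sketch, where the package name, the `buildContainerParser` helper, and its `in` argument are hypothetical, and the standard beats v7 import paths are assumed:

```go
package sketch

import (
	"github.com/elastic/beats/v7/libbeat/reader"
	"github.com/elastic/beats/v7/libbeat/reader/readjson"
)

// buildContainerParser wraps an existing reader with the container parser.
// Assumes in yields raw container log lines (e.g. from a line reader).
func buildContainerParser(in reader.Reader) reader.Reader {
	config := readjson.DefaultContainerConfig() // Format: Auto, Stream: All
	config.Format = readjson.CRI                // force CRI parsing, skipping autodetection
	return readjson.NewContainerParser(in, &config)
}
```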
101 changes: 101 additions & 0 deletions libbeat/reader/readjson/docker_json_config.go
@@ -0,0 +1,101 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package readjson

import "fmt"

type ContainerFormat uint8

type Stream uint8

const (
	Auto ContainerFormat = iota + 1
	CRI
	Docker
	JSONFile

	All Stream = iota + 1
	Stdout
	Stderr
)

var (
	containerFormats = map[string]ContainerFormat{
		"auto":      Auto,
		"cri":       CRI,
		"docker":    Docker,
		"json-file": JSONFile,
	}

	containerStreams = map[string]Stream{
		"all":    All,
		"stdout": Stdout,
		"stderr": Stderr,
	}
)

type ContainerJSONConfig struct {
	Stream Stream          `config:"stream"`
	Format ContainerFormat `config:"format"`
}

func DefaultContainerConfig() ContainerJSONConfig {
	return ContainerJSONConfig{
		Format: Auto,
		Stream: All,
	}
}

func (f *ContainerFormat) Unpack(v string) error {
	val, ok := containerFormats[v]
	if !ok {
		keys := make([]string, len(containerFormats))
		i := 0
		for k := range containerFormats {
			keys[i] = k
			i++
		}
		return fmt.Errorf("unknown container log format: %s, supported values: %+v", v, keys)
	}
	*f = val
	return nil
}

func (s *Stream) Unpack(v string) error {
	val, ok := containerStreams[v]
	if !ok {
		keys := make([]string, len(containerStreams))
		i := 0
		for k := range containerStreams {
			keys[i] = k
			i++
		}
		return fmt.Errorf("unknown stream: %s, supported values: %+v", v, keys)
	}
	*s = val
	return nil
}

func (s *Stream) String() string {
	for k, v := range containerStreams {
		if v == *s {
			return k
		}
	}
	return ""
}
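
Because `Unpack` is defined on pointer receivers, both enums plug straight into string-based config unpacking. A minimal sketch of the round-trip and the error path, assuming the standard beats v7 import path:

```go
package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/reader/readjson"
)

func main() {
	var s readjson.Stream
	if err := s.Unpack("stdout"); err != nil { // maps the string onto the Stream value
		panic(err)
	}
	fmt.Println(s.String()) // prints "stdout"

	var f readjson.ContainerFormat
	if err := f.Unpack("journald"); err != nil {
		// unknown values report the supported keys, in unspecified map order
		fmt.Println(err)
	}
}
```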
