Cherry-pick #8424 to 6.x: Reduce casting between []byte and string in CRI log processing (#8718)

* Reduce casting between []byte and string in CRI log processing (#8424)

* Reduce casting between []byte and string in CRI log processing

* Refactor CRI and json-file detection

* Improve handling of newlines in Linux/Windows CRI logs.

* Move newline removal to OS-specific functions.
* Add option to force CRI log parsing, otherwise autodetect the log type.

* Fix unit tests and newline removal

* Fix newline removal
* Add unit tests to check the forced CRI flag

(cherry picked from commit 3f7d6a6)

* Add CHANGELOG and docs for CRI force flag (#8699)

(cherry picked from commit 06ec3b9)

* Fix conflicting paths

* Fix build for all OS after #8424 (#8717)

* Fix build for all OS after #8424

* Apply PR comments
exekias authored Oct 26, 2018
1 parent 416ab2b commit dc829f9
Showing 8 changed files with 131 additions and 19 deletions.
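
For context, the core of the change is splitting and inspecting the raw []byte log content instead of converting it to string first. A minimal, self-contained sketch of that idea (not the exact Beats code) is:

package main

import (
	"bytes"
	"fmt"
	"strings"
)

func main() {
	line := []byte("2017-09-12T22:32:21.212861448Z stdout F hello world")

	// Before: string(line) copies the whole log line just to split it.
	before := strings.SplitN(string(line), " ", 4)

	// After: splitting the []byte directly avoids that per-line copy;
	// only small fields (timestamp, stream) are converted when needed.
	after := bytes.SplitN(line, []byte{' '}, 4)

	fmt.Println(before[0] == string(after[0])) // true
}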
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Release `docker` input as GA. {pull}8328[8328]
- Keep unparsed user agent information in user_agent.original. {pull}7823[7832]
- Added default and TCP parsing formats to HAproxy module {issue}8311[8311] {pull}8637[8637]
- Allow forcing CRI format parsing for better performance {pull}8424[8424]
- Add Suricata IDS/IDP/NSM module. {issue}8153[8153] {pull}8693[8693]

*Heartbeat*
5 changes: 5 additions & 0 deletions filebeat/docs/inputs/input-docker.asciidoc
@@ -73,6 +73,11 @@ all containers under the default Docker containers path:
- "*"
----

===== `cri.force`

Force CRI format parsing. This disables automatic format detection; use it when you know the format is
CRI to gain some performance. The default is `false`.

include::../inputs/input-common-harvester-options.asciidoc[]

include::../inputs/input-common-file-options.asciidoc[]
3 changes: 3 additions & 0 deletions filebeat/input/docker/config.go
@@ -34,6 +34,9 @@ type config struct {

// Enable CRI flags parsing (to be switched to default in 7.0)
CRIFlags bool `config:"cri.parse_flags"`

// Force CRI format (don't perform autodetection)
CRIForce bool `config:"cri.force"`
}

type containers struct {
4 changes: 4 additions & 0 deletions filebeat/input/docker/input.go
@@ -72,6 +72,10 @@ func NewInput(
return nil, errors.Wrap(err, "update input config")
}

if err := cfg.SetBool("docker-json.force_cri_logs", -1, config.CRIForce); err != nil {
return nil, errors.Wrap(err, "update input config")
}

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
if context.Meta == nil {
1 change: 1 addition & 0 deletions filebeat/input/log/config.go
@@ -106,6 +106,7 @@ type config struct {
DockerJSON *struct {
Stream string `config:"stream"`
Partial bool `config:"partial"`
ForceCRI bool `config:"force_cri_logs"`
CRIFlags bool `config:"cri_flags"`
} `config:"docker-json"`
}
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
@@ -562,7 +562,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {

if h.config.DockerJSON != nil {
// Docker json-file format, add custom parsing to the pipeline
r = docker_json.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.CRIFlags)
r = docker_json.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.ForceCRI, h.config.DockerJSON.CRIFlags)
}

if h.config.JSON != nil {
58 changes: 44 additions & 14 deletions filebeat/reader/docker_json/docker_json.go
@@ -20,7 +20,7 @@ package docker_json
import (
"bytes"
"encoding/json"
"strings"
"runtime"
"time"

"github.com/elastic/beats/filebeat/reader"
@@ -38,8 +38,13 @@ type Reader struct {
// join partial lines
partial bool

// Force log format: json-file | cri
forceCRI bool

// parse CRI flags
criflags bool

stripNewLine func(msg *reader.Message)
}

type logLine struct {
@@ -51,13 +56,22 @@ type logLine struct {
}

// New creates a new reader renaming a field
func New(r reader.Reader, stream string, partial bool, CRIFlags bool) *Reader {
return &Reader{
func New(r reader.Reader, stream string, partial bool, forceCRI bool, CRIFlags bool) *Reader {
reader := Reader{
stream: stream,
partial: partial,
reader: r,
forceCRI: forceCRI,
criflags: CRIFlags,
}

if runtime.GOOS == "windows" {
reader.stripNewLine = stripNewLineWin
} else {
reader.stripNewLine = stripNewLine
}

return &reader
}

// parseCRILog parses logs in CRI log format.
@@ -74,28 +88,28 @@ func (p *Reader) parseCRILog(message *reader.Message, msg *logLine) error {
i := 0

// timestamp
log := strings.SplitN(string(message.Content), " ", split)
log := bytes.SplitN(message.Content, []byte{' '}, split)
if len(log) < split {
return errors.New("invalid CRI log format")
}
ts, err := time.Parse(time.RFC3339, log[i])
ts, err := time.Parse(time.RFC3339, string(log[i]))
if err != nil {
return errors.Wrap(err, "parsing CRI timestamp")
}
message.Ts = ts
i++

// stream
msg.Stream = log[i]
msg.Stream = string(log[i])
i++

// tags
partial := false
if p.criflags {
// currently only P(artial) or F(ull) are available
tags := strings.Split(log[i], ":")
tags := bytes.Split(log[i], []byte{':'})
for _, tag := range tags {
if tag == "P" {
if len(tag) == 1 && tag[0] == 'P' {
partial = true
}
}
@@ -106,12 +120,10 @@ func (p *Reader) parseCRILog(message *reader.Message, msg *logLine) error {
message.AddFields(common.MapStr{
"stream": msg.Stream,
})
// Remove ending \n for partial messages
message.Content = []byte(log[i])
// Remove \n ending for partial messages
message.Content = log[i]
if partial {
message.Content = bytes.TrimRightFunc(message.Content, func(r rune) bool {
return r == '\n' || r == '\r'
})
p.stripNewLine(message)
}

return nil
@@ -144,7 +156,12 @@ func (p *Reader) parseDockerJSONLog(message *reader.Message, msg *logLine) error {
}

func (p *Reader) parseLine(message *reader.Message, msg *logLine) error {
if strings.HasPrefix(string(message.Content), "{") {
if p.forceCRI {
return p.parseCRILog(message, msg)
}

// If forceCRI isn't set, autodetect file type
if len(message.Content) > 0 && message.Content[0] == '{' {
return p.parseDockerJSONLog(message, msg)
}

@@ -186,3 +203,16 @@ func (p *Reader) Next() (reader.Message, error) {
return message, err
}
}

func stripNewLine(msg *reader.Message) {
l := len(msg.Content)
if l > 0 && msg.Content[l-1] == '\n' {
msg.Content = msg.Content[:l-1]
}
}

func stripNewLineWin(msg *reader.Message) {
msg.Content = bytes.TrimRightFunc(msg.Content, func(r rune) bool {
return r == '\n' || r == '\r'
})
}
76 changes: 72 additions & 4 deletions filebeat/reader/docker_json/docker_json_test.go
@@ -33,6 +33,7 @@ func TestDockerJSON(t *testing.T) {
input [][]byte
stream string
partial bool
forceCRI bool
criflags bool
expectedError bool
expectedMessage reader.Message
@@ -143,19 +144,19 @@ func TestDockerJSON(t *testing.T) {
},
},
{
name: "Split lines",
name: "CRI Split lines",
input: [][]byte{
[]byte(`2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`),
[]byte(`2017-11-12T23:32:21.212771448Z stdout F error`),
},
stream: "stdout",
stream: "stdout",
partial: true,
expectedMessage: reader.Message{
Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC),
Bytes: 163,
},
partial: true,
criflags: true,
},
{
@@ -189,12 +190,79 @@ func TestDockerJSON(t *testing.T) {
Bytes: 109,
},
},
{
name: "Force CRI with JSON logs",
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
forceCRI: true,
expectedError: true,
},
{
name: "Force CRI log no tags",
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
stream: "all",
expectedMessage: reader.Message{
Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC),
Bytes: 115,
},
forceCRI: true,
criflags: false,
},
{
name: "Force CRI log with flags",
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
stream: "all",
expectedMessage: reader.Message{
Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC),
Bytes: 117,
},
forceCRI: true,
criflags: true,
},
{
name: "Force CRI split lines",
input: [][]byte{
[]byte(`2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`),
[]byte(`2017-11-12T23:32:21.212771448Z stdout F error`),
},
stream: "stdout",
partial: true,
expectedMessage: reader.Message{
Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC),
Bytes: 163,
},
forceCRI: true,
criflags: true,
},
{
name: "Force CRI split lines and remove \\n",
input: [][]byte{
[]byte("2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache\n"),
[]byte("2017-11-12T23:32:21.212771448Z stdout F error"),
},
stream: "stdout",
expectedMessage: reader.Message{
Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC),
Bytes: 164,
},
partial: true,
forceCRI: true,
criflags: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := &mockReader{messages: test.input}
json := New(r, test.stream, test.partial, test.criflags)
json := New(r, test.stream, test.partial, test.forceCRI, test.criflags)
message, err := json.Next()

if test.expectedError {
