Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for log tags in newest CRI spec #8265

Merged
merged 3 commits into from
Sep 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Fix date format in Mongodb Ingest pipeline. {pull}7974[7974]
- Mark the TCP and UDP input as GA. {pull}8125[8125]
- Fixed a docker input error due to the offset update bug in partial log join.{pull}8177[8177]
- Update CRI format to support partial/full tags. {pull}8265[8265]

*Heartbeat*

Expand Down
6 changes: 6 additions & 0 deletions filebeat/docs/inputs/input-docker.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ Enable partial messages joining. Docker `json-file` driver splits log lines larg
end of line (`\n`) is present for common lines in the resulting file, while it's not the for the lines
that have been split. `combine_partial` joins them back together when enabled. It is enabled by default.

===== `cri.parse_flags`

Enable CRI flags parsing from the log file. CRI uses flags to signal a partial line, enabling this will
ensure partial lines are rejoined. It is disabled by default.


The following input configures {beatname_uc} to read the `stdout` stream from
all containers under the default Docker containers path:

Expand Down
5 changes: 4 additions & 1 deletion filebeat/input/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ type config struct {

// Partial configures the prospector to join partial lines
Partial bool `config:"combine_partials"`

// Enable CRI flags parsing (to be switched to default in 7.0)
CRIFlags bool `config:"cri.parse_flags"`
}

type containers struct {
IDs []string `config:"ids"`
Path string `config:"path"`

// Stream can be all,stdout or stderr
// Stream can be all, stdout or stderr
Stream string `config:"stream"`
}
4 changes: 4 additions & 0 deletions filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func NewInput(
return nil, errors.Wrap(err, "update input config")
}

if err := cfg.SetBool("docker-json.cri_flags", -1, config.Partial); err != nil {
return nil, errors.Wrap(err, "update input config")
}

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
if context.Meta == nil {
Expand Down
7 changes: 3 additions & 4 deletions filebeat/input/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,9 @@ type config struct {

// Hidden on purpose, used by the docker input:
DockerJSON *struct {
Stream string `config:"stream"`

// TODO move this to true by default
Partial bool `config:"partial"`
Stream string `config:"stream"`
Partial bool `config:"partial"`
CRIFlags bool `config:"cri_flags"`
} `config:"docker-json"`
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {

if h.config.DockerJSON != nil {
// Docker json-file format, add custom parsing to the pipeline
r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial)
r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.CRIFlags)
}

if h.config.JSON != nil {
Expand Down
135 changes: 86 additions & 49 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,76 +37,118 @@ type DockerJSONReader struct {

// join partial lines
partial bool
}

type dockerLog struct {
Timestamp string `json:"time"`
Log string `json:"log"`
Stream string `json:"stream"`
// parse CRI flags
criflags bool
}

type crioLog struct {
Timestamp time.Time
Stream string
Log []byte
type logLine struct {
Partial bool `json:"-"`
Timestamp time.Time `json:"-"`
Time string `json:"time"`
Stream string `json:"stream"`
Log string `json:"log"`
}

// New creates a new reader renaming a field
func New(r reader.Reader, stream string, partial bool) *DockerJSONReader {
func New(r reader.Reader, stream string, partial bool, CRIFlags bool) *DockerJSONReader {
return &DockerJSONReader{
stream: stream,
partial: partial,
reader: r,
stream: stream,
partial: partial,
reader: r,
criflags: CRIFlags,
}
}

// parseCRILog parses logs in CRI log format.
// CRI log format example :
// 2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache
func parseCRILog(message reader.Message, msg *crioLog) (reader.Message, error) {
log := strings.SplitN(string(message.Content), " ", 3)
if len(log) < 3 {
return message, errors.New("invalid CRI log")
func (p *DockerJSONReader) parseCRILog(message *reader.Message, msg *logLine) error {
split := 3
// read line tags if split is enabled:
if p.criflags {
split = 4
}
ts, err := time.Parse(time.RFC3339, log[0])

// current field
i := 0

// timestamp
log := strings.SplitN(string(message.Content), " ", split)
if len(log) < split {
return errors.New("invalid CRI log format")
}
ts, err := time.Parse(time.RFC3339, log[i])
if err != nil {
return message, errors.Wrap(err, "parsing CRI timestamp")
return errors.Wrap(err, "parsing CRI timestamp")
}
message.Ts = ts
i++

// stream
msg.Stream = log[i]
i++

// tags
partial := false
if p.criflags {
// currently only P(artial) or F(ull) are available
tags := strings.Split(log[i], ":")
for _, tag := range tags {
if tag == "P" {
partial = true
}
}
i++
}

msg.Timestamp = ts
msg.Stream = log[1]
msg.Log = []byte(log[2])
msg.Partial = partial
message.AddFields(common.MapStr{
"stream": msg.Stream,
})
message.Content = msg.Log
message.Ts = ts
// Remove ending \n for partial messages
message.Content = []byte(log[i])
if partial {
message.Content = bytes.TrimRightFunc(message.Content, func(r rune) bool {
return r == '\n' || r == '\r'
})
}

return message, nil
return nil
}

// parseReaderLog parses logs in Docker JSON log format.
// Docker JSON log format example:
// {"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}
func parseDockerJSONLog(message reader.Message, msg *dockerLog) (reader.Message, error) {
func (p *DockerJSONReader) parseDockerJSONLog(message *reader.Message, msg *logLine) error {
dec := json.NewDecoder(bytes.NewReader(message.Content))

if err := dec.Decode(&msg); err != nil {
return message, errors.Wrap(err, "decoding docker JSON")
return errors.Wrap(err, "decoding docker JSON")
}

// Parse timestamp
ts, err := time.Parse(time.RFC3339, msg.Timestamp)
ts, err := time.Parse(time.RFC3339, msg.Time)
if err != nil {
return message, errors.Wrap(err, "parsing docker timestamp")
return errors.Wrap(err, "parsing docker timestamp")
}

message.AddFields(common.MapStr{
"stream": msg.Stream,
})
message.Content = []byte(msg.Log)
message.Ts = ts
msg.Partial = message.Content[len(message.Content)-1] != byte('\n')

return nil
}

func (p *DockerJSONReader) parseLine(message *reader.Message, msg *logLine) error {
if strings.HasPrefix(string(message.Content), "{") {
return p.parseDockerJSONLog(message, msg)
}

return message, nil
return p.parseCRILog(message, msg)
}

// Next returns the next line.
Expand All @@ -117,32 +159,27 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
return message, err
}

var dockerLine dockerLog
var crioLine crioLog
var logLine logLine
err = p.parseLine(&message, &logLine)
if err != nil {
return message, err
}

if strings.HasPrefix(string(message.Content), "{") {
message, err = parseDockerJSONLog(message, &dockerLine)
// Handle multiline messages, join partial lines
for p.partial && logLine.Partial {
next, err := p.reader.Next()
if err != nil {
return message, err
}
// Handle multiline messages, join lines that don't end with \n
for p.partial && message.Content[len(message.Content)-1] != byte('\n') {
next, err := p.reader.Next()
if err != nil {
return message, err
}
next, err = parseDockerJSONLog(next, &dockerLine)
if err != nil {
return message, err
}
message.Content = append(message.Content, next.Content...)
message.Bytes += next.Bytes
err = p.parseLine(&next, &logLine)
if err != nil {
return message, err
}
} else {
message, err = parseCRILog(message, &crioLine)
message.Content = append(message.Content, next.Content...)
message.Bytes += next.Bytes
}

if p.stream != "all" && p.stream != dockerLine.Stream && p.stream != crioLine.Stream {
if p.stream != "all" && p.stream != logLine.Stream {
continue
}

Expand Down
Loading