Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a parameter to docker prospector to filter on stream #6057

Merged
merged 3 commits into from
Jan 16, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion filebeat/docs/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -534,4 +534,6 @@ When using `docker` prospector type you must define `containers.ids`, these are

*`ids`*:: Required, the list of Docker container IDs to read logs from, `'*'` can be used as ID to read from all containers.

*`path`*:: Base path where Docker logs are located. The default is `/var/lib/docker/containers`.
*`path`*:: Base path where Docker logs are located. The default is `/var/lib/docker/containers`.

*`stream`*:: Only read the given stream, this can be: `all`, `stdout` or `stderr`. The default is `all`.
55 changes: 33 additions & 22 deletions filebeat/harvester/reader/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
// DockerJSON processor renames a given field
type DockerJSON struct {
reader Reader
// stream filter, `all`, `stderr` or `stdout`
stream string
}

type dockerLog struct {
Expand All @@ -22,33 +24,42 @@ type dockerLog struct {
}

// NewDockerJSON creates a new reader renaming a field
func NewDockerJSON(r Reader) *DockerJSON {
return &DockerJSON{reader: r}
func NewDockerJSON(r Reader, stream string) *DockerJSON {
return &DockerJSON{
stream: stream,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should validate that it's a valid stream name.

reader: r,
}
}

// Next returns the next line.
func (p *DockerJSON) Next() (Message, error) {
message, err := p.reader.Next()
if err != nil {
return message, err
}
for {
message, err := p.reader.Next()
if err != nil {
return message, err
}

var line dockerLog
dec := json.NewDecoder(bytes.NewReader(message.Content))
if err = dec.Decode(&line); err != nil {
return message, errors.Wrap(err, "decoding docker JSON")
}
var line dockerLog
dec := json.NewDecoder(bytes.NewReader(message.Content))
if err = dec.Decode(&line); err != nil {
return message, errors.Wrap(err, "decoding docker JSON")
}

// Parse timestamp
ts, err := time.Parse(time.RFC3339, line.Timestamp)
if err != nil {
return message, errors.Wrap(err, "parsing docker timestamp")
}
if p.stream != "all" && p.stream != line.Stream {
continue
}

// Parse timestamp
ts, err := time.Parse(time.RFC3339, line.Timestamp)
if err != nil {
return message, errors.Wrap(err, "parsing docker timestamp")
}

message.AddFields(common.MapStr{
"stream": line.Stream,
})
message.Content = []byte(line.Log)
message.Ts = ts
return message, nil
message.AddFields(common.MapStr{
"stream": line.Stream,
})
message.Content = []byte(line.Log)
message.Ts = ts
return message, nil
}
}
40 changes: 30 additions & 10 deletions filebeat/harvester/reader/docker_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (

func TestDockerJSON(t *testing.T) {
tests := []struct {
input []byte
input [][]byte
stream string
expectedError bool
expectedMessage Message
}{
// Common log message
{
input: []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)},
stream: "all",
expectedMessage: Message{
Content: []byte("1:M 09 Nov 13:27:36.276 # User requested shutdown...\n"),
Fields: common.MapStr{"stream": "stdout"},
Expand All @@ -26,35 +28,53 @@ func TestDockerJSON(t *testing.T) {
},
// Wrong JSON
{
input: []byte(`this is not JSON`),
input: [][]byte{[]byte(`this is not JSON`)},
stream: "all",
expectedError: true,
},
// Missing time
{
input: []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`),
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
expectedError: true,
},
// Filtering stream
{
input: [][]byte{
[]byte(`{"log":"filtered\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
[]byte(`{"log":"unfiltered\n","stream":"stderr","time":"2017-11-09T13:27:36.277747246Z"}`),
[]byte(`{"log":"unfiltered\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
},
stream: "stderr",
expectedMessage: Message{
Content: []byte("unfiltered\n"),
Fields: common.MapStr{"stream": "stderr"},
Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC),
},
},
}

for _, test := range tests {
r := mockReader{message: test.input}
json := NewDockerJSON(r)
r := &mockReader{messages: test.input}
json := NewDockerJSON(r, test.stream)
message, err := json.Next()

assert.Equal(t, test.expectedError, err != nil)

if !test.expectedError {
if err == nil {
assert.EqualValues(t, test.expectedMessage, message)
}
}
}

type mockReader struct {
message []byte
messages [][]byte
}

func (m mockReader) Next() (Message, error) {
func (m *mockReader) Next() (Message, error) {
message := m.messages[0]
m.messages = m.messages[1:]
return Message{
Content: m.message,
Content: message,
}, nil
}
8 changes: 6 additions & 2 deletions filebeat/prospector/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package docker

var defaultConfig = config{
Containers: containers{
IDs: []string{},
Path: "/var/lib/docker/containers",
IDs: []string{},
Path: "/var/lib/docker/containers",
Stream: "all",
},
}

Expand All @@ -14,4 +15,7 @@ type config struct {
type containers struct {
IDs []string `config:"ids"`
Path string `config:"path"`

// Stream can be all,stdout or stderr
Stream string `config:"stream"`
}
12 changes: 7 additions & 5 deletions filebeat/prospector/docker/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ func NewProspector(cfg *common.Config, outletFactory channel.Factory, context pr
}

// Wrap log prospector with custom docker settings
if len(config.Containers.IDs) > 0 {
for idx, containerID := range config.Containers.IDs {
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
}
if len(config.Containers.IDs) == 0 {
return nil, errors.New("Docker prospector requires at least one entry under 'containers.ids'")
}

if err := cfg.SetBool("docker-json", -1, true); err != nil {
for idx, containerID := range config.Containers.IDs {
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
}

if err := cfg.SetString("docker-json", -1, config.Containers.Stream); err != nil {
return nil, errors.Wrap(err, "update prospector config")
}
return log.NewProspector(cfg, outletFactory, context)
Expand Down
2 changes: 1 addition & 1 deletion filebeat/prospector/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type config struct {
JSON *reader.JSONConfig `config:"json"`

// Hidden on purpose, used by the docker prospector:
DockerJSON bool `config:"docker-json"`
DockerJSON string `config:"docker-json"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using the internal parameter to enable docker parsing and pass the desired stream through it, possible values:

  • "" - default, docker parsing is disabled
  • all - default for docker prospector, read all streams
  • stdout
  • stderr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, didn't realise you passed through the stream param.

}

type LogConfig struct {
Expand Down
4 changes: 2 additions & 2 deletions filebeat/prospector/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,9 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

if h.config.DockerJSON {
if h.config.DockerJSON != "" {
// Docker json-file format, add custom parsing to the pipeline
r = reader.NewDockerJSON(r)
r = reader.NewDockerJSON(r, h.config.DockerJSON)
}

if h.config.JSON != nil {
Expand Down