Add a parameter to docker prospector to filter on stream
Sometimes you are only interested in stdout or stderr; this parameter
allows filtering the input so that only one of them is read.
exekias committed Jan 12, 2018
1 parent a3970ee commit e21a021
Showing 7 changed files with 82 additions and 43 deletions.
4 changes: 3 additions & 1 deletion filebeat/docs/filebeat-options.asciidoc
@@ -534,4 +534,6 @@ When using `docker` prospector type you must define `containers.ids`, these are

*`ids`*:: Required, the list of Docker container IDs to read logs from, `'*'` can be used as ID to read from all containers.

*`path`*:: Base path where Docker logs are located. The default is `/var/lib/docker/containers`.

*`stream`*:: Only read from the given stream. This can be `all`, `stdout`, or `stderr`. The default is `all`.
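
For context, a minimal prospector configuration using this option might look like the sketch below; the `filebeat.prospectors` layout is assumed from the Filebeat docs of this era, and the container ID is made up:

```yaml
filebeat.prospectors:
- type: docker
  containers:
    # Hypothetical container ID; '*' would read from all containers
    ids:
      - "3da9e4cd2719"
    # New option: only read stderr (the default is `all`)
    stream: stderr
```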
55 changes: 33 additions & 22 deletions filebeat/harvester/reader/docker_json.go
@@ -13,6 +13,8 @@ import (
// DockerJSON processor parses lines in Docker's json-file log format
type DockerJSON struct {
reader Reader
// stream filter, `all`, `stderr` or `stdout`
stream string
}

type dockerLog struct {
@@ -22,33 +24,42 @@ type dockerLog struct {
}

// NewDockerJSON creates a new reader that parses Docker json-file logs
func NewDockerJSON(r Reader) *DockerJSON {
return &DockerJSON{reader: r}
func NewDockerJSON(r Reader, stream string) *DockerJSON {
return &DockerJSON{
stream: stream,
reader: r,
}
}

// Next returns the next line.
func (p *DockerJSON) Next() (Message, error) {
message, err := p.reader.Next()
if err != nil {
return message, err
}
for {
message, err := p.reader.Next()
if err != nil {
return message, err
}

var line dockerLog
dec := json.NewDecoder(bytes.NewReader(message.Content))
if err = dec.Decode(&line); err != nil {
return message, errors.Wrap(err, "decoding docker JSON")
}
var line dockerLog
dec := json.NewDecoder(bytes.NewReader(message.Content))
if err = dec.Decode(&line); err != nil {
return message, errors.Wrap(err, "decoding docker JSON")
}

// Parse timestamp
ts, err := time.Parse(time.RFC3339, line.Timestamp)
if err != nil {
return message, errors.Wrap(err, "parsing docker timestamp")
}
if p.stream != "all" && p.stream != line.Stream {
continue
}

// Parse timestamp
ts, err := time.Parse(time.RFC3339, line.Timestamp)
if err != nil {
return message, errors.Wrap(err, "parsing docker timestamp")
}

message.AddFields(common.MapStr{
"stream": line.Stream,
})
message.Content = []byte(line.Log)
message.Ts = ts
return message, nil
message.AddFields(common.MapStr{
"stream": line.Stream,
})
message.Content = []byte(line.Log)
message.Ts = ts
return message, nil
}
}
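
To make the new `Next` loop easier to follow, here is a small, self-contained Go sketch of the same idea: decode a json-file line, skip it when its stream does not match the configured filter, otherwise validate the timestamp and hand it on. The `filterLine` helper and the `main` driver are illustrative only, not Filebeat code:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// dockerLog mirrors the fields decoded from Docker's json-file log driver.
type dockerLog struct {
	Timestamp string `json:"time"`
	Log       string `json:"log"`
	Stream    string `json:"stream"`
}

// filterLine decodes one json-file line and reports whether it passes the
// stream filter ("all" lets every line through).
func filterLine(raw []byte, stream string) (dockerLog, bool, error) {
	var line dockerLog
	if err := json.Unmarshal(raw, &line); err != nil {
		return line, false, err
	}
	if stream != "all" && stream != line.Stream {
		// Filtered out: the real reader simply continues with the next line.
		return line, false, nil
	}
	if _, err := time.Parse(time.RFC3339, line.Timestamp); err != nil {
		return line, false, err
	}
	return line, true, nil
}

func main() {
	raw := []byte(`{"log":"hello\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)
	line, ok, err := filterLine(raw, "stderr")
	if err == nil && !ok {
		fmt.Printf("dropped %q coming from %s\n", line.Log, line.Stream)
	}
}
```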
40 changes: 30 additions & 10 deletions filebeat/harvester/reader/docker_json_test.go
@@ -11,13 +11,15 @@ import (

func TestDockerJSON(t *testing.T) {
tests := []struct {
input []byte
input [][]byte
stream string
expectedError bool
expectedMessage Message
}{
// Common log message
{
input: []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)},
stream: "all",
expectedMessage: Message{
Content: []byte("1:M 09 Nov 13:27:36.276 # User requested shutdown...\n"),
Fields: common.MapStr{"stream": "stdout"},
@@ -26,35 +28,53 @@ },
},
// Wrong JSON
{
input: []byte(`this is not JSON`),
input: [][]byte{[]byte(`this is not JSON`)},
stream: "all",
expectedError: true,
},
// Missing time
{
input: []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`),
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
expectedError: true,
},
// Filtering stream
{
input: [][]byte{
[]byte(`{"log":"filtered\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
[]byte(`{"log":"unfiltered\n","stream":"stderr","time":"2017-11-09T13:27:36.277747246Z"}`),
[]byte(`{"log":"unfiltered\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
},
stream: "stderr",
expectedMessage: Message{
Content: []byte("unfiltered\n"),
Fields: common.MapStr{"stream": "stderr"},
Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC),
},
},
}

for _, test := range tests {
r := mockReader{message: test.input}
json := NewDockerJSON(r)
r := &mockReader{messages: test.input}
json := NewDockerJSON(r, test.stream)
message, err := json.Next()

assert.Equal(t, test.expectedError, err != nil)

if !test.expectedError {
if err == nil {
assert.EqualValues(t, test.expectedMessage, message)
}
}
}

type mockReader struct {
message []byte
messages [][]byte
}

func (m mockReader) Next() (Message, error) {
func (m *mockReader) Next() (Message, error) {
message := m.messages[0]
m.messages = m.messages[1:]
return Message{
Content: m.message,
Content: message,
}, nil
}
8 changes: 6 additions & 2 deletions filebeat/prospector/docker/config.go
@@ -2,8 +2,9 @@ package docker

var defaultConfig = config{
Containers: containers{
IDs: []string{},
Path: "/var/lib/docker/containers",
IDs: []string{},
Path: "/var/lib/docker/containers",
Stream: "all",
},
}

@@ -14,4 +15,7 @@ type config struct {
type containers struct {
IDs []string `config:"ids"`
Path string `config:"path"`

// Stream can be all, stdout or stderr
Stream string `config:"stream"`
}
12 changes: 7 additions & 5 deletions filebeat/prospector/docker/prospector.go
@@ -29,13 +29,15 @@ func NewProspector(cfg *common.Config, outletFactory channel.Factory, context pr
}

// Wrap log prospector with custom docker settings
if len(config.Containers.IDs) > 0 {
for idx, containerID := range config.Containers.IDs {
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
}
if len(config.Containers.IDs) == 0 {
return nil, errors.New("Docker prospector requires at least one entry under 'containers.ids'")
}

if err := cfg.SetBool("docker-json", -1, true); err != nil {
for idx, containerID := range config.Containers.IDs {
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
}

if err := cfg.SetString("docker-json", -1, config.Containers.Stream); err != nil {
return nil, errors.Wrap(err, "update prospector config")
}
return log.NewProspector(cfg, outletFactory, context)
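
As a side note on the path handling above, each entry under `containers.ids` simply becomes a glob under the base path. A tiny sketch (the container ID is made up) of the join the prospector performs:

```go
package main

import (
	"fmt"
	"path"
)

func main() {
	base := "/var/lib/docker/containers" // default containers.path
	ids := []string{"3da9e4cd2719", "*"} // '*' matches every container
	for _, id := range ids {
		// Same join as the prospector: one glob per configured container ID.
		fmt.Println(path.Join(base, id, "*.log"))
	}
}
```

This prints `/var/lib/docker/containers/3da9e4cd2719/*.log` and `/var/lib/docker/containers/*/*.log`, which are the patterns handed to the underlying log prospector.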
2 changes: 1 addition & 1 deletion filebeat/prospector/log/config.go
@@ -85,7 +85,7 @@ type config struct {
JSON *reader.JSONConfig `config:"json"`

// Hidden on purpose, used by the docker prospector:
DockerJSON bool `config:"docker-json"`
DockerJSON string `config:"docker-json"`
}

type LogConfig struct {
4 changes: 2 additions & 2 deletions filebeat/prospector/log/harvester.go
@@ -518,9 +518,9 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

if h.config.DockerJSON {
if h.config.DockerJSON != "" {
// Docker json-file format, add custom parsing to the pipeline
r = reader.NewDockerJSON(r)
r = reader.NewDockerJSON(r, h.config.DockerJSON)
}

if h.config.JSON != nil {
