-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add experimental Docker json-file prospector (#5402)
- Loading branch information
Showing
11 changed files
with
206 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package reader | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"time" | ||
|
||
"github.com/elastic/beats/libbeat/common" | ||
|
||
"github.com/pkg/errors" | ||
) | ||
|
||
// DockerJSON processor renames a given field | ||
type DockerJSON struct { | ||
reader Reader | ||
} | ||
|
||
type dockerLog struct { | ||
Timestamp string `json:"time"` | ||
Log string `json:"log"` | ||
Stream string `json:"stream"` | ||
} | ||
|
||
// NewDockerJSON creates a new reader renaming a field | ||
func NewDockerJSON(r Reader) *DockerJSON { | ||
return &DockerJSON{reader: r} | ||
} | ||
|
||
// Next returns the next line. | ||
func (p *DockerJSON) Next() (Message, error) { | ||
message, err := p.reader.Next() | ||
if err != nil { | ||
return message, err | ||
} | ||
|
||
var line dockerLog | ||
dec := json.NewDecoder(bytes.NewReader(message.Content)) | ||
if err = dec.Decode(&line); err != nil { | ||
return message, errors.Wrap(err, "decoding docker JSON") | ||
} | ||
|
||
// Parse timestamp | ||
ts, err := time.Parse(time.RFC3339, line.Timestamp) | ||
if err != nil { | ||
return message, errors.Wrap(err, "parsing docker timestamp") | ||
} | ||
|
||
message.AddFields(common.MapStr{ | ||
"stream": line.Stream, | ||
}) | ||
message.Content = []byte(line.Log) | ||
message.Ts = ts | ||
return message, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package reader | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/elastic/beats/libbeat/common" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestDockerJSON(t *testing.T) { | ||
tests := []struct { | ||
input []byte | ||
expectedError bool | ||
expectedMessage Message | ||
}{ | ||
// Common log message | ||
{ | ||
input: []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), | ||
expectedMessage: Message{ | ||
Content: []byte("1:M 09 Nov 13:27:36.276 # User requested shutdown...\n"), | ||
Fields: common.MapStr{"stream": "stdout"}, | ||
Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), | ||
}, | ||
}, | ||
// Wrong JSON | ||
{ | ||
input: []byte(`this is not JSON`), | ||
expectedError: true, | ||
}, | ||
// Missing time | ||
{ | ||
input: []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`), | ||
expectedError: true, | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
r := mockReader{message: test.input} | ||
json := NewDockerJSON(r) | ||
message, err := json.Next() | ||
|
||
assert.Equal(t, test.expectedError, err != nil) | ||
|
||
if !test.expectedError { | ||
assert.EqualValues(t, test.expectedMessage, message) | ||
} | ||
} | ||
} | ||
|
||
type mockReader struct { | ||
message []byte | ||
} | ||
|
||
func (m mockReader) Next() (Message, error) { | ||
return Message{ | ||
Content: m.message, | ||
}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package docker | ||
|
||
var defaultConfig = config{ | ||
Containers: containers{ | ||
IDs: []string{}, | ||
Path: "/var/lib/docker/containers", | ||
}, | ||
} | ||
|
||
type config struct { | ||
Containers containers `config:"containers"` | ||
} | ||
|
||
type containers struct { | ||
IDs []string `config:"ids"` | ||
Path string `config:"path"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package docker | ||
|
||
import ( | ||
"path" | ||
|
||
"github.com/elastic/beats/filebeat/channel" | ||
"github.com/elastic/beats/filebeat/prospector" | ||
"github.com/elastic/beats/filebeat/prospector/log" | ||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/elastic/beats/libbeat/common/cfgwarn" | ||
|
||
"github.com/pkg/errors" | ||
) | ||
|
||
func init() { | ||
err := prospector.Register("docker", NewProspector) | ||
if err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
// NewProspector creates a new docker prospector | ||
func NewProspector(cfg *common.Config, outletFactory channel.Factory, context prospector.Context) (prospector.Prospectorer, error) { | ||
cfgwarn.Experimental("Docker prospector is enabled.") | ||
|
||
config := defaultConfig | ||
if err := cfg.Unpack(&config); err != nil { | ||
return nil, errors.Wrap(err, "reading docker prospector config") | ||
} | ||
|
||
// Wrap log prospector with custom docker settings | ||
if len(config.Containers.IDs) > 0 { | ||
for idx, containerID := range config.Containers.IDs { | ||
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log")) | ||
} | ||
} | ||
|
||
if err := cfg.SetBool("docker-json", -1, true); err != nil { | ||
return nil, errors.Wrap(err, "update prospector config") | ||
} | ||
return log.NewProspector(cfg, outletFactory, context) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters