Skip to content

Commit

Permalink
Add add_docker_metadata processor (#4352)
Browse files Browse the repository at this point in the history
* Add `add_docker_metadata` processor

* Add docker/client dependency

* Remove duplicated docker vendored libraries
  • Loading branch information
exekias authored and tsg committed May 31, 2017
1 parent 8719a6a commit 30ff394
Show file tree
Hide file tree
Showing 635 changed files with 51,151 additions and 29,694 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
- Add a variable to the SysV init scripts to make it easier to change the user. {pull}4340[4340]
- Add the option to write the generated Elasticsearch mapping template into a file. {pull}4323[4323]
- Add instance_name in GCE add_cloud_metadata processor. {pull}4414[4414]
- Add `add_docker_metadata` processor. {pull}4352[4352]

*Filebeat*
- Add experimental Redis slow log prospector type. {pull}4180[4180]
Expand Down
1,013 changes: 901 additions & 112 deletions NOTICE

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,19 @@ filebeat.prospectors:
#- add_locale:
# format: offset
#
# The following example enriches each event with docker metadata, it matches
# given fields to an existing container id and adds info from that container:
#
#processors:
#- add_docker_metadata:
# match_fields: ["system.process.cgroup.id"]
# host: "unix:///var/run/docker.sock"
# # To connect to Docker over TLS you must specify a client and CA certificate.
# #ssl:
# # certificate_authority: "/etc/pki/root/ca.pem"
# # certificate: "/etc/pki/client/cert.pem"
# # key: "/etc/pki/client/cert.key"
#

#================================ Outputs ======================================

Expand Down
13 changes: 13 additions & 0 deletions heartbeat/heartbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,19 @@ heartbeat.scheduler:
#- add_locale:
# format: offset
#
# The following example enriches each event with docker metadata, it matches
# given fields to an existing container id and adds info from that container:
#
#processors:
#- add_docker_metadata:
# match_fields: ["system.process.cgroup.id"]
# host: "unix:///var/run/docker.sock"
# # To connect to Docker over TLS you must specify a client and CA certificate.
# #ssl:
# # certificate_authority: "/etc/pki/root/ca.pem"
# # certificate: "/etc/pki/client/cert.pem"
# # key: "/etc/pki/client/cert.key"
#

#================================ Outputs ======================================

Expand Down
13 changes: 13 additions & 0 deletions libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@
#- add_locale:
# format: offset
#
# The following example enriches each event with docker metadata, it matches
# given fields to an existing container id and adds info from that container:
#
#processors:
#- add_docker_metadata:
# match_fields: ["system.process.cgroup.id"]
# host: "unix:///var/run/docker.sock"
# # To connect to Docker over TLS you must specify a client and CA certificate.
# #ssl:
# # certificate_authority: "/etc/pki/root/ca.pem"
# # certificate: "/etc/pki/client/cert.pem"
# # key: "/etc/pki/client/cert.key"
#

#================================ Outputs ======================================

Expand Down
1 change: 1 addition & 0 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
// Register default processors.
_ "github.com/elastic/beats/libbeat/processors/actions"
_ "github.com/elastic/beats/libbeat/processors/add_cloud_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_docker_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_locale"
_ "github.com/elastic/beats/libbeat/processors/kubernetes"

Expand Down
96 changes: 96 additions & 0 deletions libbeat/processors/add_docker_metadata/add_docker_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package add_docker_metadata

import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
)

func init() {
processors.RegisterPlugin("add_docker_metadata", newDockerMetadataProcessor)
}

type addDockerMetadata struct {
watcher Watcher
fields []string
}

func newDockerMetadataProcessor(cfg common.Config) (processors.Processor, error) {
return buildDockerMetadataProcessor(cfg, NewWatcher)
}

func buildDockerMetadataProcessor(cfg common.Config, watcherConstructor WatcherConstructor) (processors.Processor, error) {
logp.Beta("The add_docker_metadata processor is beta")

config := defaultConfig()

err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the add_docker_metadata configuration: %s", err)
}

watcher, err := watcherConstructor(config.Host, config.TLS)
if err != nil {
return nil, err
}

if err = watcher.Start(); err != nil {
return nil, err
}

return &addDockerMetadata{
watcher: watcher,
fields: config.Fields,
}, nil
}

func (d *addDockerMetadata) Run(event common.MapStr) (common.MapStr, error) {
var cid string
for _, field := range d.fields {
value, err := event.GetValue(field)
if err != nil {
continue
}

if strValue, ok := value.(string); ok {
cid = strValue
}
}

if cid == "" {
return event, nil
}

container := d.watcher.Container(cid)
if container != nil {
meta := common.MapStr{}
metaIface, ok := event["docker"]
if ok {
meta = metaIface.(common.MapStr)
}

if len(container.Labels) > 0 {
labels := common.MapStr{}
for k, v := range container.Labels {
labels.Put(k, v)
}
meta.Put("container.labels", labels)
}

meta.Put("container.id", container.ID)
meta.Put("container.image", container.Image)
meta.Put("container.name", container.Name)
event["docker"] = meta
} else {
logp.Debug("docker", "Container not found: %s", cid)
}

return event, nil
}

func (d *addDockerMetadata) String() string {
return "add_docker_metadata=[fields=" + strings.Join(d.fields, ", ") + "]"
}
126 changes: 126 additions & 0 deletions libbeat/processors/add_docker_metadata/add_docker_metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package add_docker_metadata

import (
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"
)

func TestInitialization(t *testing.T) {
var testConfig = common.NewConfig()

p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
assert.NoError(t, err, "initializing add_docker_metadata processor")

input := common.MapStr{}
result, err := p.Run(input)
assert.NoError(t, err, "processing an event")

assert.Equal(t, common.MapStr{}, result)
}

func TestNoMatch(t *testing.T) {
testConfig, err := common.NewConfigFrom(map[string]interface{}{
"match_fields": []string{"foo"},
})
assert.NoError(t, err)

p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
assert.NoError(t, err, "initializing add_docker_metadata processor")

input := common.MapStr{
"field": "value",
}
result, err := p.Run(input)
assert.NoError(t, err, "processing an event")

assert.Equal(t, common.MapStr{"field": "value"}, result)
}

func TestMatchNoContainer(t *testing.T) {
testConfig, err := common.NewConfigFrom(map[string]interface{}{
"match_fields": []string{"foo"},
})
assert.NoError(t, err)

p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil))
assert.NoError(t, err, "initializing add_docker_metadata processor")

input := common.MapStr{
"foo": "garbage",
}
result, err := p.Run(input)
assert.NoError(t, err, "processing an event")

assert.Equal(t, common.MapStr{"foo": "garbage"}, result)
}

func TestMatchContainer(t *testing.T) {
testConfig, err := common.NewConfigFrom(map[string]interface{}{
"match_fields": []string{"foo"},
})
assert.NoError(t, err)

p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(
map[string]*Container{
"container_id": &Container{
ID: "container_id",
Image: "image",
Name: "name",
Labels: map[string]string{
"a": "1",
"b": "2",
},
},
}))
assert.NoError(t, err, "initializing add_docker_metadata processor")

input := common.MapStr{
"foo": "container_id",
}
result, err := p.Run(input)
assert.NoError(t, err, "processing an event")

assert.EqualValues(t, common.MapStr{
"docker": common.MapStr{
"container": common.MapStr{
"id": "container_id",
"image": "image",
"labels": common.MapStr{
"a": "1",
"b": "2",
},
"name": "name",
},
},
"foo": "container_id",
}, result)
}

// Mock container watcher

func MockWatcherFactory(containers map[string]*Container) WatcherConstructor {
if containers == nil {
containers = make(map[string]*Container)
}
return func(host string, tls *TLSConfig) (Watcher, error) {
return &mockWatcher{containers: containers}, nil
}
}

type mockWatcher struct {
containers map[string]*Container
}

func (m *mockWatcher) Start() error {
return nil
}

func (m *mockWatcher) Container(ID string) *Container {
return m.containers[ID]
}

func (m *mockWatcher) Containers() map[string]*Container {
return m.containers
}
21 changes: 21 additions & 0 deletions libbeat/processors/add_docker_metadata/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package add_docker_metadata

// Config for docker processor
type Config struct {
Host string `config:"host"`
TLS *TLSConfig `config:"ssl"`
Fields []string `config:"match_fields"`
}

// TLSConfig for docker socket connection
type TLSConfig struct {
CA string `config:"certificate_authority"`
Certificate string `config:"certificate"`
Key string `config:"key"`
}

func defaultConfig() Config {
return Config{
Host: "unix:///var/run/docker.sock",
}
}
Loading

0 comments on commit 30ff394

Please sign in to comment.