-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Elastic Agent] Add docker composable dynamic provider. #20842
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package docker | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common/docker" | ||
) | ||
|
||
// Config for docker provider | ||
type Config struct { | ||
Host string `config:"host"` | ||
TLS *docker.TLSConfig `config:"ssl"` | ||
CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"` | ||
} | ||
|
||
// InitDefaults initializes the default values for the config. | ||
func (c *Config) InitDefaults() { | ||
c.Host = "unix:///var/run/docker.sock" | ||
c.CleanupTimeout = 60 * time.Second | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package docker | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/common/bus" | ||
"github.com/elastic/beats/v7/libbeat/common/docker" | ||
"github.com/elastic/beats/v7/libbeat/common/safemapstr" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" | ||
) | ||
|
||
func init() { | ||
composable.Providers.AddDynamicProvider("docker", DynamicProviderBuilder) | ||
} | ||
|
||
type dockerContainerData struct { | ||
container *docker.Container | ||
mapping map[string]interface{} | ||
processors []map[string]interface{} | ||
} | ||
type dynamicProvider struct { | ||
logger *logger.Logger | ||
config *Config | ||
} | ||
|
||
// Run runs the environment context provider. | ||
func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error { | ||
watcher, err := docker.NewWatcher(c.logger, c.config.Host, c.config.TLS, false) | ||
if err != nil { | ||
// info only; return nil (do nothing) | ||
c.logger.Infof("Docker provider skipped, unable to connect: %s", err) | ||
return nil | ||
} | ||
startListener := watcher.ListenStart() | ||
stopListener := watcher.ListenStop() | ||
stoppers := map[string]*time.Timer{} | ||
stopTrigger := make(chan *dockerContainerData) | ||
|
||
if err := watcher.Start(); err != nil { | ||
// info only; return nil (do nothing) | ||
c.logger.Infof("Docker provider skipped, unable to connect: %s", err) | ||
return nil | ||
} | ||
|
||
go func() { | ||
for { | ||
select { | ||
case <-comm.Done(): | ||
startListener.Stop() | ||
stopListener.Stop() | ||
|
||
// Stop all timers before closing the channel | ||
for _, stopper := range stoppers { | ||
stopper.Stop() | ||
} | ||
close(stopTrigger) | ||
return | ||
case event := <-startListener.Events(): | ||
ChrsMark marked this conversation as resolved.
Show resolved
Hide resolved
|
||
data, err := generateData(event) | ||
if err != nil { | ||
c.logger.Errorf("%s", err) | ||
continue | ||
} | ||
if stopper, ok := stoppers[data.container.ID]; ok { | ||
c.logger.Debugf("container %s is restarting, aborting pending stop", data.container.ID) | ||
stopper.Stop() | ||
delete(stoppers, data.container.ID) | ||
return | ||
} | ||
comm.AddOrUpdate(data.container.ID, data.mapping, data.processors) | ||
case event := <-stopListener.Events(): | ||
data, err := generateData(event) | ||
if err != nil { | ||
c.logger.Errorf("%s", err) | ||
continue | ||
} | ||
stopper := time.AfterFunc(c.config.CleanupTimeout, func() { | ||
stopTrigger <- data | ||
}) | ||
stoppers[data.container.ID] = stopper | ||
case data := <-stopTrigger: | ||
if _, ok := stoppers[data.container.ID]; ok { | ||
delete(stoppers, data.container.ID) | ||
} | ||
comm.Remove(data.container.ID) | ||
} | ||
} | ||
}() | ||
|
||
return nil | ||
} | ||
|
||
// DynamicProviderBuilder builds the dynamic provider. | ||
func DynamicProviderBuilder(c *config.Config) (composable.DynamicProvider, error) { | ||
logger, err := logger.New("composable.providers.docker") | ||
if err != nil { | ||
return nil, err | ||
} | ||
var cfg Config | ||
if c == nil { | ||
c = config.New() | ||
} | ||
err = c.Unpack(&cfg) | ||
if err != nil { | ||
return nil, errors.New(err, "failed to unpack configuration") | ||
} | ||
return &dynamicProvider{logger, &cfg}, nil | ||
} | ||
|
||
func generateData(event bus.Event) (*dockerContainerData, error) { | ||
container, ok := event["container"].(*docker.Container) | ||
if !ok { | ||
return nil, fmt.Errorf("unable to get container from watcher event") | ||
} | ||
|
||
labelMap := common.MapStr{} | ||
processorLabelMap := common.MapStr{} | ||
for k, v := range container.Labels { | ||
safemapstr.Put(labelMap, k, v) | ||
processorLabelMap.Put(common.DeDot(k), v) | ||
} | ||
|
||
data := &dockerContainerData{ | ||
container: container, | ||
mapping: map[string]interface{}{ | ||
"container": map[string]interface{}{ | ||
"id": container.ID, | ||
"name": container.Name, | ||
"image": container.Image, | ||
"labels": labelMap, | ||
}, | ||
}, | ||
processors: []map[string]interface{}{ | ||
{ | ||
"add_fields": map[string]interface{}{ | ||
"fields": map[string]interface{}{ | ||
"id": container.ID, | ||
"name": container.Name, | ||
"image": container.Image, | ||
"labels": processorLabelMap, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should revisit later on what the default info is that we ship, if we potentially should reduce it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Works for me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @blakerouse Could you follow up with an issue? I'm pretty sure we will forget otherwise :-D There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Filed #21135 |
||
}, | ||
"to": "container", | ||
}, | ||
}, | ||
}, | ||
} | ||
return data, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package docker | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/common/bus" | ||
"github.com/elastic/beats/v7/libbeat/common/docker" | ||
) | ||
|
||
func TestGenerateData(t *testing.T) { | ||
container := &docker.Container{ | ||
ID: "abc", | ||
Name: "foobar", | ||
Labels: map[string]string{ | ||
"do.not.include": "true", | ||
"co.elastic.logs/disable": "true", | ||
}, | ||
} | ||
event := bus.Event{ | ||
"container": container, | ||
} | ||
|
||
data, err := generateData(event) | ||
require.NoError(t, err) | ||
mapping := map[string]interface{}{ | ||
"container": map[string]interface{}{ | ||
"id": container.ID, | ||
"name": container.Name, | ||
"image": container.Image, | ||
"labels": common.MapStr{ | ||
"do": common.MapStr{"not": common.MapStr{"include": "true"}}, | ||
"co": common.MapStr{"elastic": common.MapStr{"logs/disable": "true"}}, | ||
}, | ||
}, | ||
} | ||
processors := []map[string]interface{}{ | ||
{ | ||
"add_fields": map[string]interface{}{ | ||
"fields": map[string]interface{}{ | ||
"id": container.ID, | ||
"name": container.Name, | ||
"image": container.Image, | ||
"labels": common.MapStr{ | ||
"do_not_include": "true", | ||
"co_elastic_logs/disable": "true", | ||
}, | ||
}, | ||
"to": "container", | ||
}, | ||
}, | ||
} | ||
|
||
assert.Equal(t, container, data.container) | ||
assert.Equal(t, mapping, data.mapping) | ||
assert.Equal(t, processors, data.processors) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I would call it
Start
instead ofRun
as it is non blocking, but we change that later.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its not called
Start
because there is noStop
. It is stopped only by the cancelling of the passed context. It is up to the provider on how it wants to implement. Example is thelocal_dynamic
provider it just reads the config and returns, as it does need a goroutine to always be running.