diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index d50f72369cd..d9475d35be3 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -17,6 +17,7 @@ - Add restart CLI cmd {pull}20359[20359] - Add new `synthetics/*` inputs to run Heartbeat {pull}20387[20387] - Users of the Docker image can now pass `FLEET_ENROLL_INSECURE=1` to include the `--insecure` flag with the `elastic-agent enroll` command {issue}20312[20312] {pull}20713[20713] +- Add `docker` composable dynamic provider. {pull}20842[20842] - Add support for dynamic inputs with providers and `{{variable|"default"}}` substitution. {pull}20839[20839] - Add support for EQL based condition on inputs {pull}20994[20994] - Send `fleet.host.id` to Endpoint Security {pull}21042[21042] diff --git a/x-pack/elastic-agent/pkg/agent/cmd/include.go b/x-pack/elastic-agent/pkg/agent/cmd/include.go index a28d47490d5..87506b88415 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/include.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/include.go @@ -7,6 +7,7 @@ package cmd import ( // include the composable providers _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/agent" + _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/docker" _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env" _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host" _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local" diff --git a/x-pack/elastic-agent/pkg/composable/providers/docker/config.go b/x-pack/elastic-agent/pkg/composable/providers/docker/config.go new file mode 100644 index 00000000000..adb5e01f037 --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/docker/config.go @@ -0,0 +1,24 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package docker + +import ( + "time" + + "github.com/elastic/beats/v7/libbeat/common/docker" +) + +// Config for docker provider +type Config struct { + Host string `config:"host"` + TLS *docker.TLSConfig `config:"ssl"` + CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"` +} + +// InitDefaults initializes the default values for the config. +func (c *Config) InitDefaults() { + c.Host = "unix:///var/run/docker.sock" + c.CleanupTimeout = 60 * time.Second +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go b/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go new file mode 100644 index 00000000000..333728ee578 --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go @@ -0,0 +1,157 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package docker + +import ( + "fmt" + "time" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/bus" + "github.com/elastic/beats/v7/libbeat/common/docker" + "github.com/elastic/beats/v7/libbeat/common/safemapstr" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" +) + +func init() { + composable.Providers.AddDynamicProvider("docker", DynamicProviderBuilder) +} + +type dockerContainerData struct { + container *docker.Container + mapping map[string]interface{} + processors []map[string]interface{} +} +type dynamicProvider struct { + logger *logger.Logger + config *Config +} + +// Run runs the environment context provider. +func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error { + watcher, err := docker.NewWatcher(c.logger, c.config.Host, c.config.TLS, false) + if err != nil { + // info only; return nil (do nothing) + c.logger.Infof("Docker provider skipped, unable to connect: %s", err) + return nil + } + startListener := watcher.ListenStart() + stopListener := watcher.ListenStop() + stoppers := map[string]*time.Timer{} + stopTrigger := make(chan *dockerContainerData) + + if err := watcher.Start(); err != nil { + // info only; return nil (do nothing) + c.logger.Infof("Docker provider skipped, unable to connect: %s", err) + return nil + } + + go func() { + for { + select { + case <-comm.Done(): + startListener.Stop() + stopListener.Stop() + + // Stop all timers before closing the channel + for _, stopper := range stoppers { + stopper.Stop() + } + close(stopTrigger) + return + case event := <-startListener.Events(): + data, err := generateData(event) + if err != nil { + c.logger.Errorf("%s", err) + continue + } + if stopper, ok := stoppers[data.container.ID]; ok { + c.logger.Debugf("container %s is restarting, aborting pending stop", data.container.ID) + stopper.Stop() + delete(stoppers, data.container.ID) + return + } + comm.AddOrUpdate(data.container.ID, data.mapping, data.processors) + case event := <-stopListener.Events(): + data, err := generateData(event) + if err != nil { + c.logger.Errorf("%s", err) + continue + } + stopper := time.AfterFunc(c.config.CleanupTimeout, func() { + stopTrigger <- data + }) + stoppers[data.container.ID] = stopper + case data := <-stopTrigger: + if _, ok := stoppers[data.container.ID]; ok { + delete(stoppers, data.container.ID) + } + comm.Remove(data.container.ID) + } + } + }() + + return nil +} + +// DynamicProviderBuilder builds the dynamic provider. +func DynamicProviderBuilder(c *config.Config) (composable.DynamicProvider, error) { + logger, err := logger.New("composable.providers.docker") + if err != nil { + return nil, err + } + var cfg Config + if c == nil { + c = config.New() + } + err = c.Unpack(&cfg) + if err != nil { + return nil, errors.New(err, "failed to unpack configuration") + } + return &dynamicProvider{logger, &cfg}, nil +} + +func generateData(event bus.Event) (*dockerContainerData, error) { + container, ok := event["container"].(*docker.Container) + if !ok { + return nil, fmt.Errorf("unable to get container from watcher event") + } + + labelMap := common.MapStr{} + processorLabelMap := common.MapStr{} + for k, v := range container.Labels { + safemapstr.Put(labelMap, k, v) + processorLabelMap.Put(common.DeDot(k), v) + } + + data := &dockerContainerData{ + container: container, + mapping: map[string]interface{}{ + "container": map[string]interface{}{ + "id": container.ID, + "name": container.Name, + "image": container.Image, + "labels": labelMap, + }, + }, + processors: []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "id": container.ID, + "name": container.Name, + "image": container.Image, + "labels": processorLabelMap, + }, + "to": "container", + }, + }, + }, + } + return data, nil +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/docker/docker_test.go b/x-pack/elastic-agent/pkg/composable/providers/docker/docker_test.go new file mode 100644 index 00000000000..55cf2f3a238 --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/docker/docker_test.go @@ -0,0 +1,64 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package docker + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/bus" + "github.com/elastic/beats/v7/libbeat/common/docker" +) + +func TestGenerateData(t *testing.T) { + container := &docker.Container{ + ID: "abc", + Name: "foobar", + Labels: map[string]string{ + "do.not.include": "true", + "co.elastic.logs/disable": "true", + }, + } + event := bus.Event{ + "container": container, + } + + data, err := generateData(event) + require.NoError(t, err) + mapping := map[string]interface{}{ + "container": map[string]interface{}{ + "id": container.ID, + "name": container.Name, + "image": container.Image, + "labels": common.MapStr{ + "do": common.MapStr{"not": common.MapStr{"include": "true"}}, + "co": common.MapStr{"elastic": common.MapStr{"logs/disable": "true"}}, + }, + }, + } + processors := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "id": container.ID, + "name": container.Name, + "image": container.Image, + "labels": common.MapStr{ + "do_not_include": "true", + "co_elastic_logs/disable": "true", + }, + }, + "to": "container", + }, + }, + } + + assert.Equal(t, container, data.container) + assert.Equal(t, mapping, data.mapping) + assert.Equal(t, processors, data.processors) +} diff --git a/x-pack/elastic-agent/pkg/config/config.go b/x-pack/elastic-agent/pkg/config/config.go index 35bdac74aae..d2d366e526c 100644 --- a/x-pack/elastic-agent/pkg/config/config.go +++ b/x-pack/elastic-agent/pkg/config/config.go @@ -36,6 +36,11 @@ func LoadYAML(path string, opts ...ucfg.Option) (*Config, error) { return newConfigFrom(config), nil } +// New creates a new empty config. +func New() *Config { + return newConfigFrom(ucfg.New()) +} + // NewConfigFrom takes a interface and read the configuration like it was YAML. func NewConfigFrom(from interface{}, opts ...ucfg.Option) (*Config, error) { if len(opts) == 0 {