Skip to content

Commit

Permalink
[Elastic Agent] Add docker composable dynamic provider. (elastic#20842)
Browse files Browse the repository at this point in the history
* Add docker provider.

* Add changelog.

* Update docker start message to info.

(cherry picked from commit f017e24)
  • Loading branch information
blakerouse committed Sep 16, 2020
1 parent 111ad83 commit cf3da16
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 0 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- Add restart CLI cmd {pull}20359[20359]
- Add new `synthetics/*` inputs to run Heartbeat {pull}20387[20387]
- Users of the Docker image can now pass `FLEET_ENROLL_INSECURE=1` to include the `--insecure` flag with the `elastic-agent enroll` command {issue}20312[20312] {pull}20713[20713]
- Add `docker` composable dynamic provider. {pull}20842[20842]
- Add support for dynamic inputs with providers and `{{variable|"default"}}` substitution. {pull}20839[20839]
- Add support for EQL based condition on inputs {pull}20994[20994]
- Send `fleet.host.id` to Endpoint Security {pull}21042[21042]
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/include.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package cmd
import (
// include the composable providers
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/agent"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/docker"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local"
Expand Down
24 changes: 24 additions & 0 deletions x-pack/elastic-agent/pkg/composable/providers/docker/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package docker

import (
"time"

"github.com/elastic/beats/v7/libbeat/common/docker"
)

// Config for docker provider
type Config struct {
Host string `config:"host"`
TLS *docker.TLSConfig `config:"ssl"`
CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"`
}

// InitDefaults initializes the default values for the config.
func (c *Config) InitDefaults() {
c.Host = "unix:///var/run/docker.sock"
c.CleanupTimeout = 60 * time.Second
}
157 changes: 157 additions & 0 deletions x-pack/elastic-agent/pkg/composable/providers/docker/docker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package docker

import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/bus"
"github.com/elastic/beats/v7/libbeat/common/docker"
"github.com/elastic/beats/v7/libbeat/common/safemapstr"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

func init() {
composable.Providers.AddDynamicProvider("docker", DynamicProviderBuilder)
}

type dockerContainerData struct {
container *docker.Container
mapping map[string]interface{}
processors []map[string]interface{}
}
type dynamicProvider struct {
logger *logger.Logger
config *Config
}

// Run runs the environment context provider.
func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
watcher, err := docker.NewWatcher(c.logger, c.config.Host, c.config.TLS, false)
if err != nil {
// info only; return nil (do nothing)
c.logger.Infof("Docker provider skipped, unable to connect: %s", err)
return nil
}
startListener := watcher.ListenStart()
stopListener := watcher.ListenStop()
stoppers := map[string]*time.Timer{}
stopTrigger := make(chan *dockerContainerData)

if err := watcher.Start(); err != nil {
// info only; return nil (do nothing)
c.logger.Infof("Docker provider skipped, unable to connect: %s", err)
return nil
}

go func() {
for {
select {
case <-comm.Done():
startListener.Stop()
stopListener.Stop()

// Stop all timers before closing the channel
for _, stopper := range stoppers {
stopper.Stop()
}
close(stopTrigger)
return
case event := <-startListener.Events():
data, err := generateData(event)
if err != nil {
c.logger.Errorf("%s", err)
continue
}
if stopper, ok := stoppers[data.container.ID]; ok {
c.logger.Debugf("container %s is restarting, aborting pending stop", data.container.ID)
stopper.Stop()
delete(stoppers, data.container.ID)
return
}
comm.AddOrUpdate(data.container.ID, data.mapping, data.processors)
case event := <-stopListener.Events():
data, err := generateData(event)
if err != nil {
c.logger.Errorf("%s", err)
continue
}
stopper := time.AfterFunc(c.config.CleanupTimeout, func() {
stopTrigger <- data
})
stoppers[data.container.ID] = stopper
case data := <-stopTrigger:
if _, ok := stoppers[data.container.ID]; ok {
delete(stoppers, data.container.ID)
}
comm.Remove(data.container.ID)
}
}
}()

return nil
}

// DynamicProviderBuilder builds the dynamic provider.
func DynamicProviderBuilder(c *config.Config) (composable.DynamicProvider, error) {
logger, err := logger.New("composable.providers.docker")
if err != nil {
return nil, err
}
var cfg Config
if c == nil {
c = config.New()
}
err = c.Unpack(&cfg)
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}
return &dynamicProvider{logger, &cfg}, nil
}

func generateData(event bus.Event) (*dockerContainerData, error) {
container, ok := event["container"].(*docker.Container)
if !ok {
return nil, fmt.Errorf("unable to get container from watcher event")
}

labelMap := common.MapStr{}
processorLabelMap := common.MapStr{}
for k, v := range container.Labels {
safemapstr.Put(labelMap, k, v)
processorLabelMap.Put(common.DeDot(k), v)
}

data := &dockerContainerData{
container: container,
mapping: map[string]interface{}{
"container": map[string]interface{}{
"id": container.ID,
"name": container.Name,
"image": container.Image,
"labels": labelMap,
},
},
processors: []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": map[string]interface{}{
"id": container.ID,
"name": container.Name,
"image": container.Image,
"labels": processorLabelMap,
},
"to": "container",
},
},
},
}
return data, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package docker

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/bus"
"github.com/elastic/beats/v7/libbeat/common/docker"
)

func TestGenerateData(t *testing.T) {
container := &docker.Container{
ID: "abc",
Name: "foobar",
Labels: map[string]string{
"do.not.include": "true",
"co.elastic.logs/disable": "true",
},
}
event := bus.Event{
"container": container,
}

data, err := generateData(event)
require.NoError(t, err)
mapping := map[string]interface{}{
"container": map[string]interface{}{
"id": container.ID,
"name": container.Name,
"image": container.Image,
"labels": common.MapStr{
"do": common.MapStr{"not": common.MapStr{"include": "true"}},
"co": common.MapStr{"elastic": common.MapStr{"logs/disable": "true"}},
},
},
}
processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": map[string]interface{}{
"id": container.ID,
"name": container.Name,
"image": container.Image,
"labels": common.MapStr{
"do_not_include": "true",
"co_elastic_logs/disable": "true",
},
},
"to": "container",
},
},
}

assert.Equal(t, container, data.container)
assert.Equal(t, mapping, data.mapping)
assert.Equal(t, processors, data.processors)
}
5 changes: 5 additions & 0 deletions x-pack/elastic-agent/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func LoadYAML(path string, opts ...ucfg.Option) (*Config, error) {
return newConfigFrom(config), nil
}

// New creates a new empty config.
func New() *Config {
return newConfigFrom(ucfg.New())
}

// NewConfigFrom takes a interface and read the configuration like it was YAML.
func NewConfigFrom(from interface{}, opts ...ucfg.Option) (*Config, error) {
if len(opts) == 0 {
Expand Down

0 comments on commit cf3da16

Please sign in to comment.