Keep docker & k8s pod annotations while they are needed (elastic#5084)
In some cases pod annotations are needed after the pod is deleted, for
instance when Filebeat is still reading logs left behind by the container.

This change makes sure we keep metadata after a pod is gone. By storing
access times, we ensure that the metadata stays available for as long as it is
being used.
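
Distilled into a standalone Go sketch, the mechanism looks like this (names
and types are illustrative, not the ones used in this patch):

package main

import (
    "fmt"
    "sync"
    "time"
)

// cache keeps metadata of dead containers alive while it is still being read.
// Dead entries are flagged with a last-access time that every lookup
// refreshes; a periodic cleanup forgets an entry only once it has gone
// unused for a full timeout.
type cache struct {
    sync.Mutex
    entries map[string]string    // container ID -> metadata
    deleted map[string]time.Time // dead container ID -> last access time
    timeout time.Duration
}

func newCache(timeout time.Duration) *cache {
    return &cache{
        entries: make(map[string]string),
        deleted: make(map[string]time.Time),
        timeout: timeout,
    }
}

// Get returns the metadata for a container and, if the container is already
// gone, refreshes its last-access time so it survives the next cleanup pass.
func (c *cache) Get(id string) string {
    c.Lock()
    defer c.Unlock()
    if _, ok := c.deleted[id]; ok {
        c.deleted[id] = time.Now()
    }
    return c.entries[id]
}

// MarkDeleted flags a container as gone without dropping its metadata.
func (c *cache) MarkDeleted(id string) {
    c.Lock()
    defer c.Unlock()
    c.deleted[id] = time.Now()
}

// Cleanup forgets entries that have gone unaccessed for a full timeout.
func (c *cache) Cleanup() {
    c.Lock()
    defer c.Unlock()
    deadline := time.Now().Add(-c.timeout)
    for id, lastAccess := range c.deleted {
        if lastAccess.Before(deadline) {
            delete(c.deleted, id)
            delete(c.entries, id)
        }
    }
}

func main() {
    c := newCache(100 * time.Millisecond)
    c.entries["abc123"] = `{"name":"nginx"}`
    c.MarkDeleted("abc123")

    fmt.Println(c.Get("abc123")) // still readable right after the container dies

    time.Sleep(150 * time.Millisecond)
    c.Cleanup()
    fmt.Println(c.Get("abc123") == "") // true: forgotten once unused for a full timeout
}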
exekias authored and ruflin committed Sep 7, 2017
1 parent e1060ff commit a49463c
Showing 10 changed files with 420 additions and 34 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
@@ -35,6 +35,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add support for `initContainers` in `add_kubernetes_metadata` processor. {issue}4825[4825]
- Fix the `/usr/bin/beatname` script to accept `-d "*"` as a parameter. {issue}5040[5040]
- Combine `fields.yml` properties when they are defined in different sources. {issue}5075[5075]
- Keep Docker & Kubernetes pod metadata after the container dies, while it is needed by processors. {pull}5084[5084]
- Fix `fields.yml` lookup when using `export template` with a custom `path.config` param. {issue}5089[5089]

*Auditbeat*
@@ -79,6 +80,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Don't start filebeat if external modules/prospectors config is wrong and reload is disabled {pull}5053[5053]
- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085]
- Changed the number of shards in the default configuration to 3. {issue}5095[5095]
- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085]

*Heartbeat*

23 changes: 19 additions & 4 deletions libbeat/docs/processors-using.asciidoc
@@ -562,6 +562,19 @@ processors:
lookup_fields: ["metricset.host"]
-------------------------------------------------------------------------------

The `add_kubernetes_metadata` processor has the following configuration settings:

`in_cluster`:: (Optional) Use in-cluster settings for the Kubernetes client,
`true` by default.
`host`:: (Optional) If `in_cluster` is `false`, use this host to connect to the
Kubernetes API.
`kube_config`:: (Optional) Use the given config file as configuration for the
Kubernetes client.
`default_indexers.enabled`:: (Optional) Enable/disable the default pod indexers,
in case you want to specify your own.
`default_matchers.enabled`:: (Optional) Enable/disable the default pod matchers,
in case you want to specify your own.
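
For example, a minimal configuration sketch for running the processor from
outside the cluster, authenticating through a kubeconfig file (the path below
is an illustrative placeholder):

[source,yaml]
-------------------------------------------------------------------------------
processors:
- add_kubernetes_metadata:
    in_cluster: false
    kube_config: /home/user/.kube/config
-------------------------------------------------------------------------------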

[[add-docker-metadata]]
=== Add Docker metadata

@@ -578,10 +591,10 @@ from Docker containers:
processors:
- add_docker_metadata:
    host: "unix:///var/run/docker.sock"
    match_fields: ["system.process.cgroup.id"]
    match_source: true
    match_source_index: 4
    #match_fields: ["system.process.cgroup.id"]
    #match_source: true
    #match_source_index: 4
    #cleanup_timeout: 60
    # To connect to Docker over TLS you must specify a client and CA certificate.
    #ssl:
    #  certificate_authority: "/etc/pki/root/ca.pem"
@@ -600,3 +613,5 @@ It has the following settings:
`match_source_index`:: (Optional) Index of the container ID in the source path,
when split by `/`. It defaults to 4, to match
`/var/lib/docker/containers/<container_id>/*.log`.
`cleanup_timeout`:: (Optional) Time of inactivity after which metadata for a
container is cleaned up and forgotten, 60s by default.
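
For instance, a configuration sketch that retains metadata of dead containers
for five minutes of inactivity instead of the default 60s (the value is
illustrative):

[source,yaml]
-------------------------------------------------------------------------------
processors:
- add_docker_metadata:
    host: "unix:///var/run/docker.sock"
    cleanup_timeout: 5m
-------------------------------------------------------------------------------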
libbeat/processors/add_docker_metadata/add_docker_metadata.go
@@ -79,7 +79,8 @@ func (d *addDockerMetadata) Run(event *beat.Event) (*beat.Event, error) {
if event.Fields["source"] != nil {
event, err = d.sourceProcessor.Run(event)
if err != nil {
return nil, err
logp.Debug("docker", "Error while extracting container ID from source path: %v", err)
return event, nil
}
}
}
libbeat/processors/add_docker_metadata/add_docker_metadata_test.go
@@ -191,6 +191,8 @@ func (m *mockWatcher) Start() error {
return nil
}

func (m *mockWatcher) Stop() {}

func (m *mockWatcher) Container(ID string) *Container {
return m.containers[ID]
}
6 changes: 6 additions & 0 deletions libbeat/processors/add_docker_metadata/config.go
@@ -1,12 +1,18 @@
package add_docker_metadata

import "time"

// Config for docker processor
type Config struct {
Host string `config:"host"`
TLS *TLSConfig `config:"ssl"`
Fields []string `config:"match_fields"`
MatchSource bool `config:"match_source"`
SourceIndex int `config:"match_source_index"`

// Annotations are kept after the container is killed, until they have not been
// accessed for a full `cleanup_timeout`:
CleanupTimeout time.Duration `config:"cleanup_timeout"`
}

// TLSConfig for docker socket connection
111 changes: 100 additions & 11 deletions libbeat/processors/add_docker_metadata/watcher.go
@@ -1,15 +1,17 @@
package add_docker_metadata

import (
"context"
"fmt"
"net/http"
"sync"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/docker/go-connections/tlsconfig"
"golang.org/x/net/context"

"github.com/elastic/beats/libbeat/logp"
)
@@ -22,6 +24,9 @@ type Watcher interface {
// Start watching docker API for new containers
Start() error

// Stop watching docker API for new containers
Stop()

// Container returns the running container with the given ID or nil if unknown
Container(ID string) *Container

@@ -30,11 +35,15 @@ }
}

type watcher struct {
client *client.Client
sync.RWMutex
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
lastValidTimestamp int64
stopped sync.WaitGroup
}

// Container info retrieved by the watcher
@@ -45,6 +54,12 @@ type Container struct {
Labels map[string]string
}

// Client for docker interface
type Client interface {
ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
}

// WatcherConstructor is a function that creates a new Watcher
type WatcherConstructor func(host string, tls *TLSConfig) (Watcher, error)

// NewWatcher returns a watcher running for the given settings
@@ -69,28 +84,51 @@ func NewWatcher(host string, tls *TLSConfig) (Watcher, error) {
}
}

cli, err := client.NewClient(host, dockerAPIVersion, httpClient, nil)
client, err := client.NewClient(host, dockerAPIVersion, httpClient, nil)
if err != nil {
return nil, err
}

return NewWatcherWithClient(client, 60*time.Second)
}

// NewWatcherWithClient creates a Watcher from the given Docker client and cleanup timeout
func NewWatcherWithClient(client Client, cleanupTimeout time.Duration) (*watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
return &watcher{
client: cli,
ctx: ctx,
stop: cancel,
containers: make(map[string]*Container),
client: client,
ctx: ctx,
stop: cancel,
containers: make(map[string]*Container),
deleted: make(map[string]time.Time),
cleanupTimeout: cleanupTimeout,
}, nil
}

// Container returns the running container with the given ID or nil if unknown
func (w *watcher) Container(ID string) *Container {
return w.containers[ID]
w.RLock()
container := w.containers[ID]
w.RUnlock()

// Update last access time if it's deleted; take the write lock up front,
// since other goroutines mutate this map concurrently
w.Lock()
if _, ok := w.deleted[ID]; ok {
w.deleted[ID] = time.Now()
}
w.Unlock()

return container
}

// Containers returns the list of known containers
func (w *watcher) Containers() map[string]*Container {
return w.containers
w.RLock()
defer w.RUnlock()
res := make(map[string]*Container)
for k, v := range w.containers {
res[k] = v
}
return res
}

// Start watching docker API for new containers
@@ -99,6 +137,8 @@ func (w *watcher) Start() error {
logp.Debug("docker", "Start docker containers scanner")
w.lastValidTimestamp = time.Now().Unix()

w.Lock()
defer w.Unlock()
containers, err := w.client.ContainerList(w.ctx, types.ContainerListOptions{})
if err != nil {
return err
@@ -113,11 +153,17 @@ func (w *watcher) Start() error {
}
}

w.stopped.Add(2)
go w.watch()
go w.cleanupWorker()

return nil
}

func (w *watcher) Stop() {
w.stop()
}

func (w *watcher) watch() {
filters := filters.NewArgs()
filters.Add("type", "container")
@@ -138,22 +184,30 @@ func (w *watcher) watch() {
w.lastValidTimestamp = event.Time

// Add / update
if event.Action == "create" || event.Action == "update" {
if event.Action == "start" || event.Action == "update" {
name := event.Actor.Attributes["name"]
image := event.Actor.Attributes["image"]
delete(event.Actor.Attributes, "name")
delete(event.Actor.Attributes, "image")

w.Lock()
w.containers[event.Actor.ID] = &Container{
ID: event.Actor.ID,
Name: name,
Image: image,
Labels: event.Actor.Attributes,
}

// un-delete if it's flagged (in case of update or recreation)
delete(w.deleted, event.Actor.ID)
w.Unlock()
}

// Delete
if event.Action == "die" || event.Action == "kill" {
delete(w.containers, event.Actor.ID)
w.Lock()
w.deleted[event.Actor.ID] = time.Now()
w.Unlock()
}

case err := <-errors:
@@ -164,8 +218,43 @@ func (w *watcher) watch() {

case <-w.ctx.Done():
logp.Debug("docker", "Watcher stopped")
w.stopped.Done()
return
}
}
}
}

// Clean up deleted containers once they are no longer used
func (w *watcher) cleanupWorker() {
for {
// Wait a full period
time.Sleep(w.cleanupTimeout)

select {
case <-w.ctx.Done():
w.stopped.Done()
return
default:
// Check entries for timeout
var toDelete []string
timeout := time.Now().Add(-w.cleanupTimeout)
w.RLock()
for key, lastSeen := range w.deleted {
if lastSeen.Before(timeout) {
logp.Debug("docker", "Removing container %s after cool down timeout")
toDelete = append(toDelete, key)
}
}
w.RUnlock()

// Delete timed out entries:
w.Lock()
for _, key := range toDelete {
delete(w.deleted, key)
delete(w.containers, key)
}
w.Unlock()
}
}
}
