Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #21851 to 7.x: Refactor docker watcher to fix flaky test and other small issues #21918

Merged
merged 1 commit into from
Oct 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197]
- The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21258[21258]
- Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349]
- Fix memory leak and events duplication in docker autodiscover and add_docker_metadata. {pull}21851[21851]

*Auditbeat*

Expand Down
282 changes: 154 additions & 128 deletions libbeat/common/docker/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
package docker

import (
"fmt"
"context"
"io"
"net/http"
"sync"
"time"
Expand All @@ -29,7 +30,6 @@ import (
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/go-connections/tlsconfig"
"golang.org/x/net/context"

"github.com/elastic/beats/v7/libbeat/common/bus"
"github.com/elastic/beats/v7/libbeat/logp"
Expand All @@ -39,7 +39,6 @@ import (
const (
shortIDLen = 12
dockerRequestTimeout = 10 * time.Second
dockerWatchRequestTimeout = 60 * time.Minute
dockerEventsWatchPityTimerInterval = 10 * time.Second
dockerEventsWatchPityTimerTimeout = 10 * time.Minute
)
Expand Down Expand Up @@ -74,20 +73,30 @@ type TLSConfig struct {

type watcher struct {
sync.RWMutex
log *logp.Logger
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
lastValidTimestamp int64
lastWatchReceivedEventTime time.Time
stopped sync.WaitGroup
bus bus.Bus
shortID bool // whether to store short ID in "containers" too
log *logp.Logger
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
clock clock
stopped sync.WaitGroup
bus bus.Bus
shortID bool // whether to store short ID in "containers" too
}

// clock is an interface used to provide mocked time on testing
type clock interface {
Now() time.Time
}

// systemClock implements the clock interface using the system clock via the time package
type systemClock struct{}

// Now returns the current time
func (*systemClock) Now() time.Time { return time.Now() }

// Container info retrieved by the watcher
type Container struct {
ID string
Expand Down Expand Up @@ -147,8 +156,6 @@ func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool

// NewWatcherWithClient creates a new Watcher from a given Docker client
func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) {
log = log.Named("docker")

ctx, cancel := context.WithCancel(context.Background())
return &watcher{
log: log,
Expand All @@ -160,6 +167,7 @@ func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.D
cleanupTimeout: cleanupTimeout,
bus: bus.New(log, "docker"),
shortID: storeShortID,
clock: &systemClock{},
}, nil
}

Expand All @@ -177,7 +185,7 @@ func (w *watcher) Container(ID string) *Container {
// Update last access time if it's deleted
if ok {
w.Lock()
w.deleted[container.ID] = time.Now()
w.deleted[container.ID] = w.clock.Now()
w.Unlock()
}

Expand All @@ -201,7 +209,6 @@ func (w *watcher) Containers() map[string]*Container {
func (w *watcher) Start() error {
// Do initial scan of existing containers
w.log.Debug("Start docker containers scanner")
w.lastValidTimestamp = time.Now().Unix()

w.Lock()
defer w.Unlock()
Expand Down Expand Up @@ -236,108 +243,124 @@ func (w *watcher) Start() error {

func (w *watcher) Stop() {
w.stop()
w.stopped.Wait()
}

func (w *watcher) watch() {
log := w.log
defer w.stopped.Done()

filter := filters.NewArgs()
filter.Add("type", "container")

for {
// Ticker to restart the watcher when no events are received after some time.
tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval)
defer tickChan.Stop()

lastValidTimestamp := w.clock.Now()

watch := func() bool {
lastReceivedEventTime := w.clock.Now()

w.log.Debugf("Fetching events since %s", lastValidTimestamp)

options := types.EventsOptions{
Since: fmt.Sprintf("%d", w.lastValidTimestamp),
Since: lastValidTimestamp.Format(time.RFC3339Nano),
Filters: filter,
}

log.Debugf("Fetching events since %s", options.Since)
ctx, cancel := context.WithTimeout(w.ctx, dockerWatchRequestTimeout)
ctx, cancel := context.WithCancel(w.ctx)
defer cancel()

events, errors := w.client.Events(ctx, options)

//ticker for timeout to restart watcher when no events are received
w.lastWatchReceivedEventTime = time.Now()
tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval)
defer tickChan.Stop()

WATCH:
for {
select {
case event := <-events:
log.Debugf("Got a new docker event: %v", event)
w.lastValidTimestamp = event.Time
w.lastWatchReceivedEventTime = time.Now()

// Add / update
if event.Action == "start" || event.Action == "update" {
filter := filters.NewArgs()
filter.Add("id", event.Actor.ID)

containers, err := w.listContainers(types.ContainerListOptions{
Filters: filter,
})
if err != nil || len(containers) != 1 {
log.Errorf("Error getting container info: %v", err)
continue
}
container := containers[0]

w.Lock()
w.containers[event.Actor.ID] = container
if w.shortID {
w.containers[event.Actor.ID[:shortIDLen]] = container
}
// un-delete if it's flagged (in case of update or recreation)
delete(w.deleted, event.Actor.ID)
w.Unlock()

w.bus.Publish(bus.Event{
"start": true,
"container": container,
})
}

// Delete
if event.Action == "die" {
container := w.Container(event.Actor.ID)
if container != nil {
w.bus.Publish(bus.Event{
"stop": true,
"container": container,
})
}

w.Lock()
w.deleted[event.Actor.ID] = time.Now()
w.Unlock()
w.log.Debugf("Got a new docker event: %v", event)
lastValidTimestamp = time.Unix(event.Time, event.TimeNano)
lastReceivedEventTime = w.clock.Now()

switch event.Action {
case "start", "update":
w.containerUpdate(event)
case "die":
w.containerDelete(event)
}

case err := <-errors:
// Restart watch call
if err == context.DeadlineExceeded {
log.Info("Context deadline exceeded for docker request, restarting watch call")
} else {
log.Errorf("Error watching for docker events: %+v", err)
switch err {
case io.EOF:
// Client disconnected, watch is not done, reconnect
w.log.Debug("EOF received in events stream, restarting watch call")
case context.DeadlineExceeded:
w.log.Debug("Context deadline exceeded for docker request, restarting watch call")
case context.Canceled:
// Parent context has been canceled, watch is done.
return true
default:
w.log.Errorf("Error watching for docker events: %+v", err)
}

time.Sleep(1 * time.Second)
break WATCH

return false
case <-tickChan.C:
if time.Since(w.lastWatchReceivedEventTime) > dockerEventsWatchPityTimerTimeout {
log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout)
time.Sleep(1 * time.Second)
break WATCH
if time.Since(lastReceivedEventTime) > dockerEventsWatchPityTimerTimeout {
w.log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout)
return false
}

case <-w.ctx.Done():
log.Debug("Watcher stopped")
w.stopped.Done()
return
w.log.Debug("Watcher stopped")
return true
}
}
}

for {
done := watch()
if done {
return
}
// Wait before trying to reconnect
time.Sleep(1 * time.Second)
}
}

func (w *watcher) containerUpdate(event events.Message) {
filter := filters.NewArgs()
filter.Add("id", event.Actor.ID)

containers, err := w.listContainers(types.ContainerListOptions{
Filters: filter,
})
if err != nil || len(containers) != 1 {
w.log.Errorf("Error getting container info: %v", err)
return
}
container := containers[0]

w.Lock()
w.containers[event.Actor.ID] = container
if w.shortID {
w.containers[event.Actor.ID[:shortIDLen]] = container
}
// un-delete if it's flagged (in case of update or recreation)
delete(w.deleted, event.Actor.ID)
w.Unlock()

w.bus.Publish(bus.Event{
"start": true,
"container": container,
})
}

func (w *watcher) containerDelete(event events.Message) {
container := w.Container(event.Actor.ID)

w.Lock()
w.deleted[event.Actor.ID] = w.clock.Now()
w.Unlock()

if container != nil {
w.bus.Publish(bus.Event{
"stop": true,
"container": container,
})
}
}

Expand Down Expand Up @@ -393,49 +416,52 @@ func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Contain

// Clean up deleted containers after they are not used anymore
func (w *watcher) cleanupWorker() {
log := w.log
defer w.stopped.Done()

for {
select {
case <-w.ctx.Done():
w.stopped.Done()
return
// Wait a full period
case <-time.After(w.cleanupTimeout):
// Check entries for timeout
var toDelete []string
timeout := time.Now().Add(-w.cleanupTimeout)
w.RLock()
for key, lastSeen := range w.deleted {
if lastSeen.Before(timeout) {
log.Debugf("Removing container %s after cool down timeout", key)
toDelete = append(toDelete, key)
}
}
w.RUnlock()

// Delete timed out entries:
for _, key := range toDelete {
container := w.Container(key)
if container != nil {
w.bus.Publish(bus.Event{
"delete": true,
"container": container,
})
}
}
w.runCleanup()
}
}
}

w.Lock()
for _, key := range toDelete {
delete(w.deleted, key)
delete(w.containers, key)
if w.shortID {
delete(w.containers, key[:shortIDLen])
}
}
w.Unlock()
func (w *watcher) runCleanup() {
// Check entries for timeout
var toDelete []string
timeout := w.clock.Now().Add(-w.cleanupTimeout)
w.RLock()
for key, lastSeen := range w.deleted {
if lastSeen.Before(timeout) {
w.log.Debugf("Removing container %s after cool down timeout", key)
toDelete = append(toDelete, key)
}
}
w.RUnlock()

// Delete timed out entries:
for _, key := range toDelete {
container := w.Container(key)
if container != nil {
w.bus.Publish(bus.Event{
"delete": true,
"container": container,
})
}
}

w.Lock()
for _, key := range toDelete {
delete(w.deleted, key)
delete(w.containers, key)
if w.shortID {
delete(w.containers, key[:shortIDLen])
}
}
w.Unlock()
}

// ListenStart returns a bus listener to receive container started events, with a `container` key holding it
Expand Down
Loading