diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 052322fc57ab..a1ee0cdb47ef 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -200,6 +200,7 @@ field. You can revert this change by configuring tags for the module and omittin - Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197] - The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21258[21258] - Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349] +- Fix memory leak and events duplication in docker autodiscover and add_docker_metadata. {pull}21851[21851] *Auditbeat* diff --git a/libbeat/common/docker/watcher.go b/libbeat/common/docker/watcher.go index 2421c232eeeb..4145423209ab 100644 --- a/libbeat/common/docker/watcher.go +++ b/libbeat/common/docker/watcher.go @@ -20,7 +20,8 @@ package docker import ( - "fmt" + "context" + "io" "net/http" "sync" "time" @@ -29,7 +30,6 @@ import ( "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/go-connections/tlsconfig" - "golang.org/x/net/context" "github.com/elastic/beats/v7/libbeat/common/bus" "github.com/elastic/beats/v7/libbeat/logp" @@ -39,7 +39,6 @@ import ( const ( shortIDLen = 12 dockerRequestTimeout = 10 * time.Second - dockerWatchRequestTimeout = 60 * time.Minute dockerEventsWatchPityTimerInterval = 10 * time.Second dockerEventsWatchPityTimerTimeout = 10 * time.Minute ) @@ -74,20 +73,30 @@ type TLSConfig struct { type watcher struct { sync.RWMutex - log *logp.Logger - client Client - ctx context.Context - stop context.CancelFunc - containers map[string]*Container - deleted map[string]time.Time // deleted annotations key -> last access time - cleanupTimeout time.Duration - lastValidTimestamp int64 - lastWatchReceivedEventTime time.Time - stopped sync.WaitGroup - bus bus.Bus - shortID bool // whether to store short ID in "containers" too + log *logp.Logger + client Client + ctx context.Context + stop context.CancelFunc + containers map[string]*Container + deleted map[string]time.Time // deleted annotations key -> last access time + cleanupTimeout time.Duration + clock clock + stopped sync.WaitGroup + bus bus.Bus + shortID bool // whether to store short ID in "containers" too } +// clock is an interface used to provide mocked time on testing +type clock interface { + Now() time.Time +} + +// systemClock implements the clock interface using the system clock via the time package +type systemClock struct{} + +// Now returns the current time +func (*systemClock) Now() time.Time { return time.Now() } + // Container info retrieved by the watcher type Container struct { ID string @@ -147,8 +156,6 @@ func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool // NewWatcherWithClient creates a new Watcher from a given Docker client func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) { - log = log.Named("docker") - ctx, cancel := context.WithCancel(context.Background()) return &watcher{ log: log, @@ -160,6 +167,7 @@ func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.D cleanupTimeout: cleanupTimeout, bus: bus.New(log, "docker"), shortID: storeShortID, + clock: &systemClock{}, }, nil } @@ -177,7 +185,7 @@ func (w *watcher) Container(ID string) *Container { // Update last access time if it's deleted if ok { w.Lock() - w.deleted[container.ID] = time.Now() + w.deleted[container.ID] = w.clock.Now() w.Unlock() } @@ -201,7 +209,6 @@ func (w *watcher) Containers() map[string]*Container { func (w *watcher) Start() error { // Do initial scan of existing containers w.log.Debug("Start docker containers scanner") - w.lastValidTimestamp = time.Now().Unix() w.Lock() defer w.Unlock() @@ -236,108 +243,124 @@ func (w *watcher) Start() error { func (w *watcher) Stop() { w.stop() + w.stopped.Wait() } func (w *watcher) watch() { - log := w.log + defer w.stopped.Done() filter := filters.NewArgs() filter.Add("type", "container") - for { + // Ticker to restart the watcher when no events are received after some time. + tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval) + defer tickChan.Stop() + + lastValidTimestamp := w.clock.Now() + + watch := func() bool { + lastReceivedEventTime := w.clock.Now() + + w.log.Debugf("Fetching events since %s", lastValidTimestamp) + options := types.EventsOptions{ - Since: fmt.Sprintf("%d", w.lastValidTimestamp), + Since: lastValidTimestamp.Format(time.RFC3339Nano), Filters: filter, } - log.Debugf("Fetching events since %s", options.Since) - ctx, cancel := context.WithTimeout(w.ctx, dockerWatchRequestTimeout) + ctx, cancel := context.WithCancel(w.ctx) defer cancel() events, errors := w.client.Events(ctx, options) - - //ticker for timeout to restart watcher when no events are received - w.lastWatchReceivedEventTime = time.Now() - tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval) - defer tickChan.Stop() - - WATCH: for { select { case event := <-events: - log.Debugf("Got a new docker event: %v", event) - w.lastValidTimestamp = event.Time - w.lastWatchReceivedEventTime = time.Now() - - // Add / update - if event.Action == "start" || event.Action == "update" { - filter := filters.NewArgs() - filter.Add("id", event.Actor.ID) - - containers, err := w.listContainers(types.ContainerListOptions{ - Filters: filter, - }) - if err != nil || len(containers) != 1 { - log.Errorf("Error getting container info: %v", err) - continue - } - container := containers[0] - - w.Lock() - w.containers[event.Actor.ID] = container - if w.shortID { - w.containers[event.Actor.ID[:shortIDLen]] = container - } - // un-delete if it's flagged (in case of update or recreation) - delete(w.deleted, event.Actor.ID) - w.Unlock() - - w.bus.Publish(bus.Event{ - "start": true, - "container": container, - }) - } - - // Delete - if event.Action == "die" { - container := w.Container(event.Actor.ID) - if container != nil { - w.bus.Publish(bus.Event{ - "stop": true, - "container": container, - }) - } - - w.Lock() - w.deleted[event.Actor.ID] = time.Now() - w.Unlock() + w.log.Debugf("Got a new docker event: %v", event) + lastValidTimestamp = time.Unix(event.Time, event.TimeNano) + lastReceivedEventTime = w.clock.Now() + + switch event.Action { + case "start", "update": + w.containerUpdate(event) + case "die": + w.containerDelete(event) } - case err := <-errors: - // Restart watch call - if err == context.DeadlineExceeded { - log.Info("Context deadline exceeded for docker request, restarting watch call") - } else { - log.Errorf("Error watching for docker events: %+v", err) + switch err { + case io.EOF: + // Client disconnected, watch is not done, reconnect + w.log.Debug("EOF received in events stream, restarting watch call") + case context.DeadlineExceeded: + w.log.Debug("Context deadline exceeded for docker request, restarting watch call") + case context.Canceled: + // Parent context has been canceled, watch is done. + return true + default: + w.log.Errorf("Error watching for docker events: %+v", err) } - - time.Sleep(1 * time.Second) - break WATCH - + return false case <-tickChan.C: - if time.Since(w.lastWatchReceivedEventTime) > dockerEventsWatchPityTimerTimeout { - log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout) - time.Sleep(1 * time.Second) - break WATCH + if time.Since(lastReceivedEventTime) > dockerEventsWatchPityTimerTimeout { + w.log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout) + return false } - case <-w.ctx.Done(): - log.Debug("Watcher stopped") - w.stopped.Done() - return + w.log.Debug("Watcher stopped") + return true } } + } + for { + done := watch() + if done { + return + } + // Wait before trying to reconnect + time.Sleep(1 * time.Second) + } +} + +func (w *watcher) containerUpdate(event events.Message) { + filter := filters.NewArgs() + filter.Add("id", event.Actor.ID) + + containers, err := w.listContainers(types.ContainerListOptions{ + Filters: filter, + }) + if err != nil || len(containers) != 1 { + w.log.Errorf("Error getting container info: %v", err) + return + } + container := containers[0] + + w.Lock() + w.containers[event.Actor.ID] = container + if w.shortID { + w.containers[event.Actor.ID[:shortIDLen]] = container + } + // un-delete if it's flagged (in case of update or recreation) + delete(w.deleted, event.Actor.ID) + w.Unlock() + + w.bus.Publish(bus.Event{ + "start": true, + "container": container, + }) +} + +func (w *watcher) containerDelete(event events.Message) { + container := w.Container(event.Actor.ID) + + w.Lock() + w.deleted[event.Actor.ID] = w.clock.Now() + w.Unlock() + + if container != nil { + w.bus.Publish(bus.Event{ + "stop": true, + "container": container, + }) } } @@ -393,49 +416,52 @@ func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Contain // Clean up deleted containers after they are not used anymore func (w *watcher) cleanupWorker() { - log := w.log + defer w.stopped.Done() for { select { case <-w.ctx.Done(): - w.stopped.Done() return // Wait a full period case <-time.After(w.cleanupTimeout): - // Check entries for timeout - var toDelete []string - timeout := time.Now().Add(-w.cleanupTimeout) - w.RLock() - for key, lastSeen := range w.deleted { - if lastSeen.Before(timeout) { - log.Debugf("Removing container %s after cool down timeout", key) - toDelete = append(toDelete, key) - } - } - w.RUnlock() - - // Delete timed out entries: - for _, key := range toDelete { - container := w.Container(key) - if container != nil { - w.bus.Publish(bus.Event{ - "delete": true, - "container": container, - }) - } - } + w.runCleanup() + } + } +} - w.Lock() - for _, key := range toDelete { - delete(w.deleted, key) - delete(w.containers, key) - if w.shortID { - delete(w.containers, key[:shortIDLen]) - } - } - w.Unlock() +func (w *watcher) runCleanup() { + // Check entries for timeout + var toDelete []string + timeout := w.clock.Now().Add(-w.cleanupTimeout) + w.RLock() + for key, lastSeen := range w.deleted { + if lastSeen.Before(timeout) { + w.log.Debugf("Removing container %s after cool down timeout", key) + toDelete = append(toDelete, key) + } + } + w.RUnlock() + + // Delete timed out entries: + for _, key := range toDelete { + container := w.Container(key) + if container != nil { + w.bus.Publish(bus.Event{ + "delete": true, + "container": container, + }) + } + } + + w.Lock() + for _, key := range toDelete { + delete(w.deleted, key) + delete(w.containers, key) + if w.shortID { + delete(w.containers, key[:shortIDLen]) } } + w.Unlock() } // ListenStart returns a bus listener to receive container started events, with a `container` key holding it diff --git a/libbeat/common/docker/watcher_test.go b/libbeat/common/docker/watcher_test.go index ec53fbdeb737..a0de0567af40 100644 --- a/libbeat/common/docker/watcher_test.go +++ b/libbeat/common/docker/watcher_test.go @@ -21,6 +21,7 @@ package docker import ( "errors" + "sync" "testing" "time" @@ -37,7 +38,7 @@ type MockClient struct { containers [][]types.Container // event list to send on Events call events []interface{} - + // done channel is closed when the client has sent all events done chan interface{} } @@ -71,7 +72,7 @@ func (m *MockClient) ContainerInspect(ctx context.Context, container string) (ty } func TestWatcherInitialization(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -90,7 +91,8 @@ func TestWatcherInitialization(t *testing.T) { }, }, }, - nil) + nil, + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -109,7 +111,7 @@ func TestWatcherInitialization(t *testing.T) { } func TestWatcherInitializationShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -128,7 +130,9 @@ func TestWatcherInitializationShortID(t *testing.T) { }, }, }, - nil, true) + nil, + true, + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -154,7 +158,7 @@ func TestWatcherInitializationShortID(t *testing.T) { } func TestWatcherAddEvents(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -188,7 +192,7 @@ func TestWatcherAddEvents(t *testing.T) { }, }, }, - ) + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -207,7 +211,7 @@ func TestWatcherAddEvents(t *testing.T) { } func TestWatcherAddEventsShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -242,7 +246,7 @@ func TestWatcherAddEventsShortID(t *testing.T) { }, }, true, - ) + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -261,7 +265,7 @@ func TestWatcherAddEventsShortID(t *testing.T) { } func TestWatcherUpdateEvent(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -295,7 +299,7 @@ func TestWatcherUpdateEvent(t *testing.T) { }, }, }, - ) + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -309,7 +313,7 @@ func TestWatcherUpdateEvent(t *testing.T) { } func TestWatcherUpdateEventShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -344,7 +348,7 @@ func TestWatcherUpdateEventShortID(t *testing.T) { }, }, true, - ) + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -358,9 +362,7 @@ func TestWatcherUpdateEventShortID(t *testing.T) { } func TestWatcherDie(t *testing.T) { - t.Skip("flaky test: https://github.com/elastic/beats/issues/7906") - - watcher := runWatcher(t, false, + watcher, clientDone := testWatcher(t, false, [][]types.Container{ []types.Container{ types.Container{ @@ -381,32 +383,37 @@ func TestWatcherDie(t *testing.T) { }, }, ) + + clock := newTestClock() + watcher.clock = clock + + stopListener := watcher.ListenStop() + + watcher.Start() defer watcher.Stop() // Check it doesn't get removed while we request meta for the container for i := 0; i < 18; i++ { watcher.Container("0332dbd79e20") - assert.Equal(t, 1, len(watcher.Containers())) - time.Sleep(50 * time.Millisecond) - } - - // Checks a max of 10s for the watcher containers to be updated - for i := 0; i < 100; i++ { - // Now it should get removed - time.Sleep(100 * time.Millisecond) - - if len(watcher.Containers()) == 0 { + clock.Sleep(watcher.cleanupTimeout / 2) + watcher.runCleanup() + if !assert.Equal(t, 1, len(watcher.Containers())) { break } } + // Wait to be sure that the delete event has been processed + <-clientDone + <-stopListener.Events() + + // Check that after the cleanup period the container is removed + clock.Sleep(watcher.cleanupTimeout + 1*time.Second) + watcher.runCleanup() assert.Equal(t, 0, len(watcher.Containers())) } func TestWatcherDieShortID(t *testing.T) { - t.Skip("flaky test: https://github.com/elastic/beats/issues/7906") - - watcher := runWatcherShortID(t, false, + watcher, clientDone := testWatcherShortID(t, false, [][]types.Container{ []types.Container{ types.Container{ @@ -428,33 +435,40 @@ func TestWatcherDieShortID(t *testing.T) { }, true, ) + + clock := newTestClock() + watcher.clock = clock + + stopListener := watcher.ListenStop() + + watcher.Start() defer watcher.Stop() // Check it doesn't get removed while we request meta for the container for i := 0; i < 18; i++ { watcher.Container("0332dbd79e20") - assert.Equal(t, 1, len(watcher.Containers())) - time.Sleep(50 * time.Millisecond) - } - - // Checks a max of 10s for the watcher containers to be updated - for i := 0; i < 100; i++ { - // Now it should get removed - time.Sleep(100 * time.Millisecond) - - if len(watcher.Containers()) == 0 { + clock.Sleep(watcher.cleanupTimeout / 2) + watcher.runCleanup() + if !assert.Equal(t, 1, len(watcher.Containers())) { break } } + // Wait to be sure that the delete event has been processed + <-clientDone + <-stopListener.Events() + + // Check that after the cleanup period the container is removed + clock.Sleep(watcher.cleanupTimeout + 1*time.Second) + watcher.runCleanup() assert.Equal(t, 0, len(watcher.Containers())) } -func runWatcher(t *testing.T, kill bool, containers [][]types.Container, events []interface{}) *watcher { - return runWatcherShortID(t, kill, containers, events, false) +func testWatcher(t *testing.T, kill bool, containers [][]types.Container, events []interface{}) (*watcher, chan interface{}) { + return testWatcherShortID(t, kill, containers, events, false) } -func runWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, events []interface{}, enable bool) *watcher { +func testWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, events []interface{}, enable bool) (*watcher, chan interface{}) { logp.TestingSetup() client := &MockClient{ @@ -472,16 +486,37 @@ func runWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, t.Fatal("'watcher' was supposed to be pointer to the watcher structure") } - err = watcher.Start() - if err != nil { - t.Fatal(err) - } + return watcher, client.done +} - <-client.done - if kill { - watcher.Stop() - watcher.stopped.Wait() - } +func runAndWait(w *watcher, done chan interface{}) *watcher { + w.Start() + <-done + w.Stop() + return w +} + +type testClock struct { + sync.Mutex + + now time.Time +} + +func newTestClock() *testClock { + return &testClock{now: time.Time{}} +} + +func (c *testClock) Now() time.Time { + c.Lock() + defer c.Unlock() + + c.now = c.now.Add(1) + return c.now +} + +func (c *testClock) Sleep(d time.Duration) { + c.Lock() + defer c.Unlock() - return watcher + c.now = c.now.Add(d) }