diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 54912a62ae5..a1693983e5d 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Improve fsync on linux, by assuming the kernel resets error flags of failed writes. {pull}7859[7859] - Remove unix-like permission checks on Windows, so files can be opened. {issue}7849[7849] - Replace index patterns in TSVB visualizations. {pull}7929[7929] +- Deregister pipeline loader callback when inputsRunner is stopped. {pull}[7893][7893] *Auditbeat* diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index a759ac3bab3..5685c43cb6e 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -18,6 +18,8 @@ package fileset import ( + uuid "github.com/satori/go.uuid" + "github.com/elastic/beats/filebeat/channel" input "github.com/elastic/beats/filebeat/prospector" "github.com/elastic/beats/filebeat/registrar" @@ -46,6 +48,7 @@ type Factory struct { beatVersion string pipelineLoaderFactory PipelineLoaderFactory overwritePipelines bool + pipelineCallbackID uuid.UUID beatDone chan struct{} } @@ -55,6 +58,7 @@ type inputsRunner struct { moduleRegistry *ModuleRegistry inputs []*input.Runner pipelineLoaderFactory PipelineLoaderFactory + pipelineCallbackID uuid.UUID overwritePipelines bool } @@ -67,6 +71,7 @@ func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVers beatVersion: beatVersion, beatDone: beatDone, pipelineLoaderFactory: pipelineLoaderFactory, + pipelineCallbackID: uuid.Nil, overwritePipelines: overwritePipelines, } } @@ -107,6 +112,7 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP moduleRegistry: m, inputs: inputs, pipelineLoaderFactory: f.pipelineLoaderFactory, + pipelineCallbackID: f.pipelineCallbackID, overwritePipelines: f.overwritePipelines, }, nil } @@ -114,7 +120,12 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP func (p *inputsRunner) Start() { // Load pipelines if p.pipelineLoaderFactory != nil { - // Load pipelines instantly and then setup a callback for reconnections: + // Attempt to load pipelines instantly when starting or after reload. + // Thus, if ES was not available previously, it could be loaded this time. + // If the function below fails, it means that ES is not available + // at the moment, so the pipeline loader cannot be created. + // Registering a callback regardless of the availability of ES + // makes it possible to try to load pipeline when ES becomes reachable. pipelineLoader, err := p.pipelineLoaderFactory() if err != nil { logp.Err("Error loading pipeline: %s", err) @@ -126,11 +137,11 @@ func (p *inputsRunner) Start() { } } - // Callback: + // Register callback to try to load pipelines when connecting to ES. callback := func(esClient *elasticsearch.Client) error { return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines) } - elasticsearch.RegisterConnectCallback(callback) + p.pipelineCallbackID = elasticsearch.RegisterConnectCallback(callback) } for _, input := range p.inputs { @@ -143,6 +154,10 @@ func (p *inputsRunner) Start() { } } func (p *inputsRunner) Stop() { + if p.pipelineCallbackID != uuid.Nil { + elasticsearch.DeregisterConnectCallback(p.pipelineCallbackID) + } + for _, input := range p.inputs { input.Stop() } diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index dd50990a9ad..b058b6f4094 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -22,6 +22,8 @@ import ( "fmt" "sync" + uuid "github.com/satori/go.uuid" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/transport/tlscommon" @@ -49,20 +51,48 @@ var ( ErrResponseRead = errors.New("bulk item status parse failed") ) +// Callbacks must not depend on the result of a previous one, +// because the ordering is not fixed. type callbacksRegistry struct { - callbacks []connectCallback + callbacks map[uuid.UUID]connectCallback mutex sync.Mutex } // XXX: it would be fantastic to do this without a package global -var connectCallbackRegistry callbacksRegistry +var connectCallbackRegistry = newCallbacksRegistry() + +func newCallbacksRegistry() callbacksRegistry { + return callbacksRegistry{ + callbacks: make(map[uuid.UUID]connectCallback), + } +} // RegisterConnectCallback registers a callback for the elasticsearch output // The callback is called each time the client connects to elasticsearch. -func RegisterConnectCallback(callback connectCallback) { +// It returns the key of the newly added callback, so it can be deregistered later. +func RegisterConnectCallback(callback connectCallback) uuid.UUID { + connectCallbackRegistry.mutex.Lock() + defer connectCallbackRegistry.mutex.Unlock() + + // find the next unique key + var key uuid.UUID + exists := true + for exists { + key = uuid.NewV4() + _, exists = connectCallbackRegistry.callbacks[key] + } + + connectCallbackRegistry.callbacks[key] = callback + return key +} + +// DeregisterConnectCallback deregisters a callback for the elasticsearch output +// specified by its key. If a callback does not exist, nothing happens. +func DeregisterConnectCallback(key uuid.UUID) { connectCallbackRegistry.mutex.Lock() defer connectCallbackRegistry.mutex.Unlock() - connectCallbackRegistry.callbacks = append(connectCallbackRegistry.callbacks, callback) + + delete(connectCallbackRegistry.callbacks, key) } func makeES( diff --git a/libbeat/outputs/elasticsearch/elasticsearch_test.go b/libbeat/outputs/elasticsearch/elasticsearch_test.go new file mode 100644 index 00000000000..0ad2b761269 --- /dev/null +++ b/libbeat/outputs/elasticsearch/elasticsearch_test.go @@ -0,0 +1,39 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearch + +import ( + "fmt" + "testing" +) + +func TestConnectCallbacksManagement(t *testing.T) { + f0 := func(client *Client) error { fmt.Println("i am function #0"); return nil } + f1 := func(client *Client) error { fmt.Println("i am function #1"); return nil } + f2 := func(client *Client) error { fmt.Println("i am function #2"); return nil } + + _ = RegisterConnectCallback(f0) + id1 := RegisterConnectCallback(f1) + id2 := RegisterConnectCallback(f2) + + t.Logf("removing second callback") + DeregisterConnectCallback(id1) + if _, ok := connectCallbackRegistry.callbacks[id2]; !ok { + t.Fatalf("third callback cannot be retrieved") + } +}