Deregister pipeline loader callback when inputsRunner is stopped #7893

Merged
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Fix potential data loss on OS X in spool file by using fcntl with F_FULLFSYNC. {pull}7859[7859]
- Improve fsync on linux, by assuming the kernel resets error flags of failed writes. {pull}7859[7859]
- Remove unix-like permission checks on Windows, so files can be opened. {issue}7849[7849]
- Deregister pipeline loader callback when inputsRunner is stopped. {pull}7893[7893]

*Auditbeat*

21 changes: 18 additions & 3 deletions filebeat/fileset/factory.go
@@ -18,6 +18,8 @@
package fileset

import (
uuid "github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/channel"
input "github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
@@ -46,6 +48,7 @@ type Factory struct {
beatVersion string
pipelineLoaderFactory PipelineLoaderFactory
overwritePipelines bool
pipelineCallbackID uuid.UUID
beatDone chan struct{}
}

@@ -55,6 +58,7 @@ type inputsRunner struct {
moduleRegistry *ModuleRegistry
inputs []*input.Runner
pipelineLoaderFactory PipelineLoaderFactory
pipelineCallbackID uuid.UUID
overwritePipelines bool
}

@@ -67,6 +71,7 @@ func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVers
beatVersion: beatVersion,
beatDone: beatDone,
pipelineLoaderFactory: pipelineLoaderFactory,
pipelineCallbackID: uuid.Nil,
overwritePipelines: overwritePipelines,
}
}
@@ -107,14 +112,20 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
moduleRegistry: m,
inputs: inputs,
pipelineLoaderFactory: f.pipelineLoaderFactory,
pipelineCallbackID: f.pipelineCallbackID,
overwritePipelines: f.overwritePipelines,
}, nil
}

func (p *inputsRunner) Start() {
// Load pipelines
if p.pipelineLoaderFactory != nil {
-// Load pipelines instantly and then setup a callback for reconnections:
+// Attempt to load pipelines instantly when starting or after reload.
+// Thus, if ES was not available previously, it could be loaded this time.
+// If the function below fails, it means that ES is not available
+// at the moment, so the pipeline loader cannot be created.
+// Registering a callback regardless of the availability of ES
+// makes it possible to try to load pipeline when ES becomes reachable.
pipelineLoader, err := p.pipelineLoaderFactory()
if err != nil {
logp.Err("Error loading pipeline: %s", err)
@@ -126,11 +137,11 @@ func (p *inputsRunner) Start() {
}
}

-// Callback:
+// Register callback to try to load pipelines when connecting to ES.
callback := func(esClient *elasticsearch.Client) error {
Contributor

Reading the code on lines 124 and 129: in case of an error, should we register a callback at all? This was not introduced in this PR, but it is probably something we should clean up.

Contributor Author

I think this code is correct. The lines you are referring to try to load the pipelines immediately in Start. That attempt can fail, e.g. because ES is not available.
But that is the whole point of using OnConnect callbacks: if pipeline loading failed previously, it can be attempted again on the next connection to ES.

@kvch I got tricked by this as well. Can you add a comment explaining why the callback must always be registered:

  • if pipeline loading fails, retry on reload
  • always retry on reload, in case someone removed it by accident or a new ES cluster is in place
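
The retry behaviour discussed in this thread can be illustrated with a minimal, self-contained sketch. It is not the code from this PR: `loader`, `start`, and the `reachable` flag are hypothetical names, and the real callback receives an `*elasticsearch.Client`, which is left out here to keep the example runnable on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// loader is a hypothetical stand-in for pipeline loading: it fails while
// the backend is unreachable.
type loader struct {
	reachable bool
	loaded    bool
}

func (l *loader) load() error {
	if !l.reachable {
		return errors.New("backend not reachable")
	}
	l.loaded = true
	return nil
}

// start tries to load immediately and, whether or not that worked,
// returns the callback that should run on every later connect.
func start(l *loader) func() error {
	if err := l.load(); err != nil {
		// A failed first attempt is only logged, never fatal.
		fmt.Println("initial load failed:", err)
	}
	return l.load
}

func main() {
	l := &loader{}
	onConnect := start(l)

	// Later the backend becomes reachable and the connect callback fires.
	l.reachable = true
	if err := onConnect(); err != nil {
		fmt.Println("retry failed:", err)
	}
	fmt.Println("loaded:", l.loaded)
}
```

Under these assumptions, skipping the registration after a failed first attempt would leave nothing to retry once the backend comes up.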

return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines)
}
-elasticsearch.RegisterConnectCallback(callback)
+p.pipelineCallbackID = elasticsearch.RegisterConnectCallback(callback)
}

for _, input := range p.inputs {
@@ -143,6 +154,10 @@ func (p *inputsRunner) Start() {
}
}
func (p *inputsRunner) Stop() {
if p.pipelineCallbackID != uuid.Nil {
elasticsearch.DeregisterConnectCallback(p.pipelineCallbackID)
}

for _, input := range p.inputs {
input.Stop()
}
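
A rough sketch of the lifecycle this file change aims for, assuming a runner that is created, started, and stopped on every config reload. The `hooks` and `runner` types are hypothetical, single-goroutine stand-ins for the package-level registry and `inputsRunner`; the integer key `0` plays the role of `uuid.Nil`.

```go
package main

import "fmt"

// hooks is a hypothetical stand-in for the connect-callback registry.
type hooks struct {
	next int
	fns  map[int]func() error
}

// add stores fn and returns the key needed to remove it again.
func (h *hooks) add(fn func() error) int {
	h.next++
	h.fns[h.next] = fn
	return h.next
}

// remove deletes the callback stored under id, if any.
func (h *hooks) remove(id int) {
	delete(h.fns, id)
}

// runner mimics inputsRunner: it remembers the key of its own callback.
type runner struct {
	hooks      *hooks
	callbackID int
}

func (r *runner) Start() {
	r.callbackID = r.hooks.add(func() error { return nil })
}

func (r *runner) Stop() {
	// Only deregister if Start actually registered something.
	if r.callbackID != 0 {
		r.hooks.remove(r.callbackID)
		r.callbackID = 0
	}
}

func main() {
	h := &hooks{fns: map[int]func() error{}}
	// Simulate three config reloads: each one starts and stops a runner.
	for i := 0; i < 3; i++ {
		r := &runner{hooks: h}
		r.Start()
		r.Stop()
	}
	fmt.Println(len(h.fns)) // prints 0; without remove in Stop it would print 3
}
```

In this sketch, callbacks of stopped runners would otherwise pile up in the registry and keep firing on every connect.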
38 changes: 34 additions & 4 deletions libbeat/outputs/elasticsearch/elasticsearch.go
@@ -22,6 +22,8 @@ import (
"fmt"
"sync"

uuid "github.com/satori/go.uuid"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
@@ -49,20 +51,48 @@ var (
ErrResponseRead = errors.New("bulk item status parse failed")
)

// Callbacks must not depend on the result of a previous one,
// because the ordering is not fixed.
type callbacksRegistry struct {
-callbacks []connectCallback
+callbacks map[uuid.UUID]connectCallback
mutex sync.Mutex
}

// XXX: it would be fantastic to do this without a package global
-var connectCallbackRegistry callbacksRegistry
+var connectCallbackRegistry = newCallbacksRegistry()

func newCallbacksRegistry() callbacksRegistry {
return callbacksRegistry{
callbacks: make(map[uuid.UUID]connectCallback),
}
}

// RegisterConnectCallback registers a callback for the elasticsearch output
// The callback is called each time the client connects to elasticsearch.
-func RegisterConnectCallback(callback connectCallback) {
+// It returns the key of the newly added callback, so it can be deregistered later.
+func RegisterConnectCallback(callback connectCallback) uuid.UUID {
connectCallbackRegistry.mutex.Lock()
defer connectCallbackRegistry.mutex.Unlock()

// find the next unique key
var key uuid.UUID
exists := true
for exists {
key = uuid.NewV4()
_, exists = connectCallbackRegistry.callbacks[key]
}

connectCallbackRegistry.callbacks[key] = callback
return key
}

// DeregisterConnectCallback deregisters a callback for the elasticsearch output
// specified by its key. If a callback does not exist, nothing happens.
func DeregisterConnectCallback(key uuid.UUID) {
connectCallbackRegistry.mutex.Lock()
defer connectCallbackRegistry.mutex.Unlock()
-connectCallbackRegistry.callbacks = append(connectCallbackRegistry.callbacks, callback)

delete(connectCallbackRegistry.callbacks, key)
}

func makeES(
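
The move from a slice to a keyed map can be sketched in isolation as follows. This is an illustrative simplification, not the PR's implementation: it uses a hypothetical counter-based `callbackID` instead of `uuid.UUID` so that it needs only the standard library, and the callback takes no client argument.

```go
package main

import (
	"fmt"
	"sync"
)

// callbackID is a hypothetical stand-in for the uuid.UUID key used in the PR.
type callbackID uint64

// connectCallback takes no client argument so the sketch stays self-contained.
type connectCallback func() error

// registry stores callbacks by key so that a single one can be removed later.
type registry struct {
	mu        sync.Mutex
	nextID    callbackID
	callbacks map[callbackID]connectCallback
}

func newRegistry() *registry {
	return &registry{callbacks: make(map[callbackID]connectCallback)}
}

// register adds a callback and returns the key needed to remove it.
func (r *registry) register(cb connectCallback) callbackID {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	r.callbacks[r.nextID] = cb
	return r.nextID
}

// deregister removes a callback; unknown keys are silently ignored.
func (r *registry) deregister(id callbackID) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.callbacks, id)
}

// size reports how many callbacks are currently registered.
func (r *registry) size() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.callbacks)
}

func main() {
	r := newRegistry()
	a := r.register(func() error { return nil })
	b := r.register(func() error { return nil })
	r.deregister(a)
	r.deregister(a) // second call is a no-op
	fmt.Println(r.size(), a != b) // prints "1 true"
}
```

Because Go does not define an iteration order for maps, callbacks kept this way cannot rely on running in registration order, which matches the comment added above `callbacksRegistry`.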
39 changes: 39 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch_test.go
@@ -0,0 +1,39 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elasticsearch

import (
"fmt"
"testing"
)

func TestConnectCallbacksManagement(t *testing.T) {
f0 := func(client *Client) error { fmt.Println("i am function #0"); return nil }
f1 := func(client *Client) error { fmt.Println("i am function #1"); return nil }
f2 := func(client *Client) error { fmt.Println("i am function #2"); return nil }

_ = RegisterConnectCallback(f0)
id1 := RegisterConnectCallback(f1)
id2 := RegisterConnectCallback(f2)

t.Logf("removing second callback")
DeregisterConnectCallback(id1)
if _, ok := connectCallbackRegistry.callbacks[id2]; !ok {
t.Fatalf("third callback cannot be retrieved")
}
}