Skip to content

Commit

Permalink
Emit error if fileset with multiple pipelines is being used with ES <…
Browse files Browse the repository at this point in the history
… 6.5 (#10001)

Follow up to #8914.

In #8914, we introduced the ability for Filebeat filesets to have multiple Ingest pipelines, the first one being the entry point. This feature relies on the Elasticsearch Ingest node having a `pipeline` processor and `if` conditions for processors, both of which were introduced in Elasticsearch 6.5.0.

This PR implements a check for whether a fileset has multiple Ingest pipelines AND is talking to an Elasticsearch cluster < 6.5.0. If that's the case, we emit an error.
  • Loading branch information
ycombinator authored Jan 13, 2019
1 parent 1ce20cb commit c55226e
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Rename a few `logstash.*` fields to map to ECS, remove logstash.slowlog.message. {pull}9935[9935]
- Rename a few `mysql.*` fields to map to ECS. {pull}10008[10008]
- Rename a few `nginx.error.*` fields to map to ECS. {pull}10007[10007]
- Filesets with multiple ingest pipelines added in {pull}8914[8914] only work with Elasticsearch >= 6.5.0 {pull}10001[10001]

*Heartbeat*

Expand Down
26 changes: 26 additions & 0 deletions filebeat/fileset/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ type PipelineLoader interface {
GetVersion() common.Version
}

// MultiplePipelineUnsupportedError is an error returned when a fileset uses multiple pipelines but is
// running against a version of Elasticsearch that doesn't support this feature.
type MultiplePipelineUnsupportedError struct {
module string
fileset string
esVersion common.Version
minESVersionRequired common.Version
}

func (m MultiplePipelineUnsupportedError) Error() string {
return fmt.Sprintf(
"the %s/%s fileset has multiple pipelines, which are only supported with Elasticsearch >= %s. Currently running with Elasticsearch version %s",
m.module,
m.fileset,
m.minESVersionRequired.String(),
m.esVersion.String(),
)
}

// LoadPipelines loads the pipelines for each configured fileset.
func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool) error {
for module, filesets := range reg.registry {
Expand All @@ -58,6 +77,13 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool
return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
}

// Filesets with multiple pipelines can only be supported by Elasticsearch >= 6.5.0
esVersion := esClient.GetVersion()
minESVersionRequired := common.MustNewVersion("6.5.0")
if len(pipelines) > 1 && esVersion.LessThan(minESVersionRequired) {
return MultiplePipelineUnsupportedError{module, name, esVersion, *minESVersionRequired}
}

var pipelineIDsLoaded []string
for _, pipeline := range pipelines {
err = loadPipeline(esClient, pipeline.id, pipeline.contents, overwrite)
Expand Down
105 changes: 105 additions & 0 deletions filebeat/fileset/pipelines_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !integration

package fileset

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/outputs/elasticsearch"
)

func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) {
cases := []struct {
name string
esVersion string
isErrExpected bool
}{
{
name: "ES < 6.5.0",
esVersion: "6.4.1",
isErrExpected: true,
},
{
name: "ES == 6.5.0",
esVersion: "6.5.0",
isErrExpected: false,
},
{
name: "ES > 6.5.0",
esVersion: "6.6.0",
isErrExpected: false,
},
}

for _, test := range cases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
testFilesetManifest := &manifest{
Requires: struct {
Processors []ProcessorRequirement `config:"processors"`
}{
Processors: []ProcessorRequirement{},
},
IngestPipeline: []string{"pipeline-plain.json", "pipeline-json.json"},
}
testFileset := &Fileset{
name: "fls",
modulePath: "./test/mod",
manifest: testFilesetManifest,
vars: map[string]interface{}{
"builtin": map[string]interface{}{},
},
pipelineIDs: []string{"filebeat-7.0.0-mod-fls-pipeline-plain", "filebeat-7.0.0-mod-fls-pipeline-json"},
}
testRegistry := ModuleRegistry{
registry: map[string]map[string]*Fileset{
"mod": map[string]*Fileset{
"fls": testFileset,
},
},
}

testESServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("{\"version\":{\"number\":\"" + test.esVersion + "\"}}"))
}))
defer testESServer.Close()

testESClient, err := elasticsearch.NewClient(elasticsearch.ClientSettings{
URL: testESServer.URL,
}, nil)
assert.NoError(t, err)

err = testESClient.Connect()
assert.NoError(t, err)

err = testRegistry.LoadPipelines(testESClient, false)
if test.isErrExpected {
assert.IsType(t, MultiplePipelineUnsupportedError{}, err)
} else {
assert.NoError(t, err)
}
})
}
}
1 change: 1 addition & 0 deletions filebeat/fileset/test/mod/fls/pipeline-json.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
1 change: 1 addition & 0 deletions filebeat/fileset/test/mod/fls/pipeline-plain.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Empty file.

0 comments on commit c55226e

Please sign in to comment.