From 15ff8ce9dee07868b5b87699da89490a55df2872 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 10 Jan 2019 09:16:29 -0800 Subject: [PATCH 1/6] Emit error if fileset with multiple pipelines is being used with ES < 6.5 --- filebeat/fileset/pipelines.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 0b6e853ce22..93e5fe04b3c 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -58,6 +58,12 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) } + // Filesets with multiple pipelines can only be supported by Elasticsearch >= 6.5.0 + esVersion := esClient.GetVersion() + if len(pipelines) > 1 && esVersion.LessThan(common.MustNewVersion("6.5.0")) { + return fmt.Errorf("filesets with multiple pipelines require Elasticsearch >= 6.5.0. Currently running with Elasticsearch version %s", esVersion.String()) + } + var pipelineIDsLoaded []string for _, pipeline := range pipelines { err = loadPipeline(esClient, pipeline.id, pipeline.contents, overwrite) From c2b7cafc41dbd192266eeaf9dc056191a9a8c890 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 10 Jan 2019 09:24:05 -0800 Subject: [PATCH 2/6] Better error message --- filebeat/fileset/pipelines.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 93e5fe04b3c..df7742a4f6d 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -60,8 +60,9 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool // Filesets with multiple pipelines can only be supported by Elasticsearch >= 6.5.0 esVersion := esClient.GetVersion() - if len(pipelines) > 1 && esVersion.LessThan(common.MustNewVersion("6.5.0")) { - return fmt.Errorf("filesets with multiple pipelines require Elasticsearch >= 6.5.0. Currently running with Elasticsearch version %s", esVersion.String()) + minESVersionRequired := common.MustNewVersion("6.5.0") + if len(pipelines) > 1 && esVersion.LessThan(minESVersionRequired) { + return fmt.Errorf("the %s/%s fileset has multiple pipelines, which are only supported with Elasticsearch >= %s. Currently running with Elasticsearch version %s", module, name, minESVersionRequired.String(), esVersion.String()) } var pipelineIDsLoaded []string From 3a0f456d3e02833a5e559ac8663ebaf95efc2dd8 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 11 Jan 2019 02:51:48 -0800 Subject: [PATCH 3/6] Adding CHANGELOG entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8600f545e6f..77355006d6e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Rename a few `logstash.*` fields to map to ECS, remove logstash.slowlog.message. {pull}9935[9935] - Rename a few `mysql.*` fields to map to ECS. {pull}10008[10008] - Rename a few `nginx.error.*` fields to map to ECS. {pull}10007[10007] +- Filesets with multiple ingest pipelines added in {pull}8914[8914] only work with Elasticsearch >= 6.5.0 {pull}10001[10001] *Heartbeat* From 28edeaafe0444c0b24379a1174c0413a489225ca Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 11 Jan 2019 03:42:05 -0800 Subject: [PATCH 4/6] Adding unit test for various version checks --- filebeat/fileset/pipelines_test.go | 101 ++++++++++++++++++ .../fileset/test/mod/fls/pipeline-json.json | 1 + .../fileset/test/mod/fls/pipeline-plain.json | 1 + filebeat/fileset/test/pipeline-plain.json | 0 4 files changed, 103 insertions(+) create mode 100644 filebeat/fileset/pipelines_test.go create mode 100644 filebeat/fileset/test/mod/fls/pipeline-json.json create mode 100644 filebeat/fileset/test/mod/fls/pipeline-plain.json create mode 100644 filebeat/fileset/test/pipeline-plain.json diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go new file mode 100644 index 00000000000..409b13f9993 --- /dev/null +++ b/filebeat/fileset/pipelines_test.go @@ -0,0 +1,101 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package fileset + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/outputs/elasticsearch" +) + +func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { + tests := []struct { + name string + esVersion string + expectedErr string + }{ + { + name: "ES < 6.5.0", + esVersion: "6.4.1", + expectedErr: "the mod/fls fileset has multiple pipelines, which are only supported with Elasticsearch >= 6.5.0. Currently running with Elasticsearch version 6.4.1", + }, + { + name: "ES == 6.5.0", + esVersion: "6.5.0", + expectedErr: "", + }, + { + name: "ES > 6.5.0", + esVersion: "6.6.0", + expectedErr: "", + }, + } + + for _, test := range tests { + testFilesetManifest := &manifest{ + Requires: struct { + Processors []ProcessorRequirement `config:"processors"` + }{ + Processors: []ProcessorRequirement{}, + }, + IngestPipeline: []string{"pipeline-plain.json", "pipeline-json.json"}, + } + testFileset := &Fileset{ + name: "fls", + modulePath: "./test/mod", + manifest: testFilesetManifest, + vars: map[string]interface{}{ + "builtin": map[string]interface{}{}, + }, + pipelineIDs: []string{"filebeat-7.0.0-mod-fls-pipeline-plain", "filebeat-7.0.0-mod-fls-pipeline-json"}, + } + testRegistry := ModuleRegistry{ + registry: map[string]map[string]*Fileset{ + "mod": map[string]*Fileset{ + "fls": testFileset, + }, + }, + } + + testESServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("{\"version\":{\"number\":\"" + test.esVersion + "\"}}")) + })) + defer testESServer.Close() + + testESClient, err := elasticsearch.NewClient(elasticsearch.ClientSettings{ + URL: testESServer.URL, + }, nil) + assert.NoError(t, err) + + err = testESClient.Connect() + assert.NoError(t, err) + + err = testRegistry.LoadPipelines(testESClient, false) + if test.expectedErr == "" { + assert.NoError(t, err) + } else { + assert.Error(t, err, test.expectedErr) + } + } +} diff --git a/filebeat/fileset/test/mod/fls/pipeline-json.json b/filebeat/fileset/test/mod/fls/pipeline-json.json new file mode 100644 index 00000000000..0967ef424bc --- /dev/null +++ b/filebeat/fileset/test/mod/fls/pipeline-json.json @@ -0,0 +1 @@ +{} diff --git a/filebeat/fileset/test/mod/fls/pipeline-plain.json b/filebeat/fileset/test/mod/fls/pipeline-plain.json new file mode 100644 index 00000000000..0967ef424bc --- /dev/null +++ b/filebeat/fileset/test/mod/fls/pipeline-plain.json @@ -0,0 +1 @@ +{} diff --git a/filebeat/fileset/test/pipeline-plain.json b/filebeat/fileset/test/pipeline-plain.json new file mode 100644 index 00000000000..e69de29bb2d From e837254aacab48d1ff9b2d6b240ebd07afe013aa Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 11 Jan 2019 08:34:48 -0800 Subject: [PATCH 5/6] Using t.Run() to shutdown HTTP server at the end of each test case --- filebeat/fileset/pipelines_test.go | 86 ++++++++++++++++-------------- 1 file changed, 45 insertions(+), 41 deletions(-) diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 409b13f9993..13b2fb3522e 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -30,7 +30,7 @@ import ( ) func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { - tests := []struct { + cases := []struct { name string esVersion string expectedErr string @@ -52,50 +52,54 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { }, } - for _, test := range tests { - testFilesetManifest := &manifest{ - Requires: struct { - Processors []ProcessorRequirement `config:"processors"` - }{ - Processors: []ProcessorRequirement{}, - }, - IngestPipeline: []string{"pipeline-plain.json", "pipeline-json.json"}, - } - testFileset := &Fileset{ - name: "fls", - modulePath: "./test/mod", - manifest: testFilesetManifest, - vars: map[string]interface{}{ - "builtin": map[string]interface{}{}, - }, - pipelineIDs: []string{"filebeat-7.0.0-mod-fls-pipeline-plain", "filebeat-7.0.0-mod-fls-pipeline-json"}, - } - testRegistry := ModuleRegistry{ - registry: map[string]map[string]*Fileset{ - "mod": map[string]*Fileset{ - "fls": testFileset, + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + testFilesetManifest := &manifest{ + Requires: struct { + Processors []ProcessorRequirement `config:"processors"` + }{ + Processors: []ProcessorRequirement{}, }, - }, - } - - testESServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("{\"version\":{\"number\":\"" + test.esVersion + "\"}}")) - })) - defer testESServer.Close() + IngestPipeline: []string{"pipeline-plain.json", "pipeline-json.json"}, + } + testFileset := &Fileset{ + name: "fls", + modulePath: "./test/mod", + manifest: testFilesetManifest, + vars: map[string]interface{}{ + "builtin": map[string]interface{}{}, + }, + pipelineIDs: []string{"filebeat-7.0.0-mod-fls-pipeline-plain", "filebeat-7.0.0-mod-fls-pipeline-json"}, + } + testRegistry := ModuleRegistry{ + registry: map[string]map[string]*Fileset{ + "mod": map[string]*Fileset{ + "fls": testFileset, + }, + }, + } - testESClient, err := elasticsearch.NewClient(elasticsearch.ClientSettings{ - URL: testESServer.URL, - }, nil) - assert.NoError(t, err) + testESServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("{\"version\":{\"number\":\"" + test.esVersion + "\"}}")) + })) + defer testESServer.Close() - err = testESClient.Connect() - assert.NoError(t, err) + testESClient, err := elasticsearch.NewClient(elasticsearch.ClientSettings{ + URL: testESServer.URL, + }, nil) + assert.NoError(t, err) - err = testRegistry.LoadPipelines(testESClient, false) - if test.expectedErr == "" { + err = testESClient.Connect() assert.NoError(t, err) - } else { - assert.Error(t, err, test.expectedErr) - } + + err = testRegistry.LoadPipelines(testESClient, false) + if test.expectedErr == "" { + assert.NoError(t, err) + } else { + assert.Error(t, err, test.expectedErr) + } + }) } } From 27c3e972e321d6c6dfd5cb3a1f69f0a4023ea70b Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 11 Jan 2019 08:54:35 -0800 Subject: [PATCH 6/6] Use custom error --- filebeat/fileset/pipelines.go | 21 ++++++++++++++++++++- filebeat/fileset/pipelines_test.go | 30 +++++++++++++++--------------- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index df7742a4f6d..07e520d64f8 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -39,6 +39,25 @@ type PipelineLoader interface { GetVersion() common.Version } +// MultiplePipelineUnsupportedError is an error returned when a fileset uses multiple pipelines but is +// running against a version of Elasticsearch that doesn't support this feature. +type MultiplePipelineUnsupportedError struct { + module string + fileset string + esVersion common.Version + minESVersionRequired common.Version +} + +func (m MultiplePipelineUnsupportedError) Error() string { + return fmt.Sprintf( + "the %s/%s fileset has multiple pipelines, which are only supported with Elasticsearch >= %s. Currently running with Elasticsearch version %s", + m.module, + m.fileset, + m.minESVersionRequired.String(), + m.esVersion.String(), + ) +} + // LoadPipelines loads the pipelines for each configured fileset. func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool) error { for module, filesets := range reg.registry { @@ -62,7 +81,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool esVersion := esClient.GetVersion() minESVersionRequired := common.MustNewVersion("6.5.0") if len(pipelines) > 1 && esVersion.LessThan(minESVersionRequired) { - return fmt.Errorf("the %s/%s fileset has multiple pipelines, which are only supported with Elasticsearch >= %s. Currently running with Elasticsearch version %s", module, name, minESVersionRequired.String(), esVersion.String()) + return MultiplePipelineUnsupportedError{module, name, esVersion, *minESVersionRequired} } var pipelineIDsLoaded []string diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 13b2fb3522e..194df5e9f14 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -31,24 +31,24 @@ import ( func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { cases := []struct { - name string - esVersion string - expectedErr string + name string + esVersion string + isErrExpected bool }{ { - name: "ES < 6.5.0", - esVersion: "6.4.1", - expectedErr: "the mod/fls fileset has multiple pipelines, which are only supported with Elasticsearch >= 6.5.0. Currently running with Elasticsearch version 6.4.1", + name: "ES < 6.5.0", + esVersion: "6.4.1", + isErrExpected: true, }, { - name: "ES == 6.5.0", - esVersion: "6.5.0", - expectedErr: "", + name: "ES == 6.5.0", + esVersion: "6.5.0", + isErrExpected: false, }, { - name: "ES > 6.5.0", - esVersion: "6.6.0", - expectedErr: "", + name: "ES > 6.5.0", + esVersion: "6.6.0", + isErrExpected: false, }, } @@ -95,10 +95,10 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { assert.NoError(t, err) err = testRegistry.LoadPipelines(testESClient, false) - if test.expectedErr == "" { - assert.NoError(t, err) + if test.isErrExpected { + assert.IsType(t, MultiplePipelineUnsupportedError{}, err) } else { - assert.Error(t, err, test.expectedErr) + assert.NoError(t, err) } }) }