From 9dbfd44594c94fd759a05a507ce1d04e67d6d7b6 Mon Sep 17 00:00:00 2001 From: Marius Iversen Date: Mon, 15 Feb 2021 16:51:12 +0100 Subject: [PATCH] [Filebeat] Check if processor is supported by ES version (#23763) * adding possibility to remove processor if its unsupported * removing unused function while testing * was initially testing with a separate function, removed that for now * moving it to its own function * rewriting SetProcessors logic to be more extendable * added testcases for uri parts * stashing changes * Fix assert.NoError usages These should have been require.NoError otherwise panics occur. * Package cleanup Errors should not begin with capitals. Avoid double logging errors (both logging and returning them). Fix usage of deprecated logp functions. Remove unused code (ML module). Removed unneeded import alias. Avoid allocating empty slices. Add missing error check for config Unpack. * Add changelog Co-authored-by: Andrew Kroh --- CHANGELOG.next.asciidoc | 2 + filebeat/fileset/compatibility.go | 224 +++++++ filebeat/fileset/compatibility_test.go | 643 +++++++++++++++++++ filebeat/fileset/config_test.go | 3 +- filebeat/fileset/factory.go | 18 +- filebeat/fileset/fileset_test.go | 55 +- filebeat/fileset/modules.go | 63 +- filebeat/fileset/modules_integration_test.go | 43 +- filebeat/fileset/modules_test.go | 78 +-- filebeat/fileset/pipelines.go | 194 +----- filebeat/fileset/pipelines_test.go | 500 +------------- filebeat/fileset/setup.go | 9 +- 12 files changed, 1026 insertions(+), 806 deletions(-) create mode 100644 filebeat/fileset/compatibility.go create mode 100644 filebeat/fileset/compatibility_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 21ce698db828..1759d22c0282 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -825,6 +825,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Added `application/x-www-form-urlencoded` as encode option for httpjson input {pull}23521[23521] - Move aws-s3 input to GA. {pull}23631[23631] - Populate `source.mac` and `destination.mac` for Suricata EVE events. {issue}23706[23706] {pull}23721[23721] +- Added feature to modules to adapt Ingest Node pipelines for compatibility with older Elasticsearch versions by + removing unsupported processors. {pull}23763[23763] - Added RFC6587 framing option for tcp and unix inputs {issue}23663[23663] {pull}23724[23724] - Added string splitting for httpjson input {pull}24022[24022] diff --git a/filebeat/fileset/compatibility.go b/filebeat/fileset/compatibility.go new file mode 100644 index 000000000000..210c93e1b4c4 --- /dev/null +++ b/filebeat/fileset/compatibility.go @@ -0,0 +1,224 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fileset + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +// processorCompatibility defines a processor's minimum version requirements or +// a transformation to make it compatible. +type processorCompatibility struct { + checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies. + procType string // Elasticsearch Ingest Node processor type. + adaptConfig func(processor map[string]interface{}, log *logp.Logger) (drop bool, err error) // Adapt the configuration to make it compatible. +} + +var processorCompatibilityChecks = []processorCompatibility{ + { + procType: "append", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.10.0")) + }, + adaptConfig: replaceAppendAllowDuplicates, + }, + { + procType: "community_id", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.12.0")) + }, + adaptConfig: deleteProcessor, + }, + { + procType: "set", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.9.0")) + }, + adaptConfig: replaceSetIgnoreEmptyValue, + }, + { + procType: "uri_parts", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.12.0")) + }, + adaptConfig: deleteProcessor, + }, + { + procType: "user_agent", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.0.0")) && + !esVersion.LessThan(common.MustNewVersion("6.7.0")) + }, + adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) { + config["ecs"] = true + return false, nil + }, + }, + { + procType: "user_agent", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("6.7.0")) + }, + adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) { + return false, errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required") + }, + }, +} + +// adaptPipelineForCompatibility iterates over all processors in the pipeline +// and adapts them for version of Elasticsearch used. Adapt can mean modifying +// processor options or removing the processor. +func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) error { + p, ok := content["processors"] + if !ok { + return errors.New("'processors' is missing from the pipeline definition") + } + + processors, ok := p.([]interface{}) + if !ok { + return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + } + + var filteredProcs []interface{} + +nextProcessor: + for i, obj := range processors { + processor, ok := obj.(map[string]interface{}) + if !ok { + return fmt.Errorf("processor at index %d is not an object, got %T", i, obj) + } + + for _, proc := range processorCompatibilityChecks { + configIfc, found := processor[proc.procType] + if !found { + continue + } + config, ok := configIfc.(map[string]interface{}) + if !ok { + return fmt.Errorf("processor config at index %d is not an object, got %T", i, obj) + } + + if !proc.checkVersion(&esVersion) { + continue + } + + drop, err := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i)) + if err != nil { + return fmt.Errorf("failed to adapt %q processor at index %d: %w", proc.procType, i, err) + } + if drop { + continue nextProcessor + } + } + + filteredProcs = append(filteredProcs, processors[i]) + } + + content["processors"] = filteredProcs + return nil +} + +// deleteProcessor returns true to indicate that the processor should be deleted +// in order to adapt the pipeline for backwards compatibility to Elasticsearch. +func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) (bool, error) { return true, nil } + +// replaceSetIgnoreEmptyValue replaces ignore_empty_value option with an if +// statement so ES less than 7.9 will work. +func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) (bool, error) { + _, ok := config["ignore_empty_value"].(bool) + if !ok { + return false, nil + } + + log.Debug("Removing unsupported 'ignore_empty_value' from set processor.") + delete(config, "ignore_empty_value") + + _, ok = config["if"].(string) + if ok { + // assume if check is sufficient + return false, nil + } + val, ok := config["value"].(string) + if !ok { + return false, nil + } + + newIf := strings.TrimLeft(val, "{ ") + newIf = strings.TrimRight(newIf, "} ") + newIf = strings.ReplaceAll(newIf, ".", "?.") + newIf = "ctx?." + newIf + " != null" + + log.Debug("Adding if %s to replace 'ignore_empty_value' in set processor.", newIf) + config["if"] = newIf + return false, nil +} + +// replaceAppendAllowDuplicates replaces allow_duplicates option with an if statement +// so ES less than 7.10 will work. +func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) (bool, error) { + allow, ok := config["allow_duplicates"].(bool) + if !ok { + return false, nil + } + + log.Debug("Removing unsupported 'allow_duplicates' from append processor.") + delete(config, "allow_duplicates") + + if allow { + // It was set to true, nothing else to do after removing the option. + return false, nil + } + + currIf, _ := config["if"].(string) + if strings.Contains(strings.ToLower(currIf), "contains") { + // If it has a contains statement, we assume it is checking for duplicates already. + return false, nil + } + field, ok := config["field"].(string) + if !ok { + return false, nil + } + val, ok := config["value"].(string) + if !ok { + return false, nil + } + + field = strings.ReplaceAll(field, ".", "?.") + + val = strings.TrimLeft(val, "{ ") + val = strings.TrimRight(val, "} ") + val = strings.ReplaceAll(val, ".", "?.") + + if currIf == "" { + // if there is not a previous if we add a value sanity check + currIf = fmt.Sprintf("ctx?.%s != null", val) + } + + newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val) + + log.Debug("Adding if %s to replace 'allow_duplicates: false' in append processor.", newIf) + config["if"] = newIf + + return false, nil +} diff --git a/filebeat/fileset/compatibility_test.go b/filebeat/fileset/compatibility_test.go new file mode 100644 index 000000000000..bc089879082b --- /dev/null +++ b/filebeat/fileset/compatibility_test.go @@ -0,0 +1,643 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package fileset + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestAdaptPipelineForCompatibility(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 6.7.0", + esVersion: common.MustNewVersion("6.6.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }}, + isErrExpected: true, + }, + { + name: "ES == 6.7.0", + esVersion: common.MustNewVersion("6.7.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + "ecs": true, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES >= 7.0.0", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }, + }, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content) + } + }) + } +} + +func TestReplaceSetIgnoreEmptyValue(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.9.0", + esVersion: common.MustNewVersion("7.8.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.9.0", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES > 7.9.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "existing if", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ignore_empty_value is false", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": false, + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "no value", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "ignore_empty_value": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} + +func TestReplaceAppendAllowDuplicates(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.10.0: set to true", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: set to false", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.10.0", + esVersion: common.MustNewVersion("7.10.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES > 7.10.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: existing if", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + "if": "ctx?.host?.hostname != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: existing if with contains", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + "if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: no value", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} + +func TestRemoveURIPartsProcessor(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.12.0", + esVersion: common.MustNewVersion("7.11.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.12.0", + esVersion: common.MustNewVersion("7.12.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES > 7.12.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} diff --git a/filebeat/fileset/config_test.go b/filebeat/fileset/config_test.go index 972d7eff6319..ecefac2e181c 100644 --- a/filebeat/fileset/config_test.go +++ b/filebeat/fileset/config_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common" ) @@ -34,7 +35,7 @@ func TestInputSettings(t *testing.T) { } c, err := common.NewConfigFrom(cfg) - assert.NoError(t, err) + require.NoError(t, err) f, err := NewFilesetConfig(c) if assert.NoError(t, err) { diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index b0656205321e..43aa8e1636ed 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -57,6 +57,7 @@ type inputsRunner struct { pipelineLoaderFactory PipelineLoaderFactory pipelineCallbackID uuid.UUID overwritePipelines bool + log *logp.Logger } // NewFactory instantiates a new Factory @@ -84,7 +85,9 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Ru // Hash module ID var h map[string]interface{} - c.Unpack(&h) + if err = c.Unpack(&h); err != nil { + return nil, fmt.Errorf("failed to unpack config: %w", err) + } id, err := hashstructure.Hash(h, nil) if err != nil { return nil, err @@ -94,8 +97,7 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Ru for i, pConfig := range pConfigs { inputs[i], err = f.inputFactory.Create(p, pConfig) if err != nil { - logp.Err("Error creating input: %s", err) - return nil, err + return nil, fmt.Errorf("failed to create input: %w", err) } } @@ -106,6 +108,7 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Ru pipelineLoaderFactory: f.pipelineLoaderFactory, pipelineCallbackID: f.pipelineCallbackID, overwritePipelines: f.overwritePipelines, + log: logp.NewLogger(logName), }, nil } @@ -118,8 +121,7 @@ func (f *Factory) CheckConfig(c *common.Config) error { for _, pConfig := range pConfigs { err = f.inputFactory.CheckConfig(pConfig) if err != nil { - logp.Err("Error checking input configuration: %s", err) - return err + return fmt.Errorf("error checking input configuration: %w", err) } } @@ -153,12 +155,12 @@ func (p *inputsRunner) Start() { // makes it possible to try to load pipeline when ES becomes reachable. pipelineLoader, err := p.pipelineLoaderFactory() if err != nil { - logp.Err("Error loading pipeline: %s", err) + p.log.Errorf("Error loading pipeline: %s", err) } else { err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.overwritePipelines) if err != nil { // Log error and continue - logp.Err("Error loading pipeline: %s", err) + p.log.Errorf("Error loading pipeline: %s", err) } } @@ -168,7 +170,7 @@ func (p *inputsRunner) Start() { } p.pipelineCallbackID, err = elasticsearch.RegisterConnectCallback(callback) if err != nil { - logp.Err("Error registering connect callback for Elasticsearch to load pipelines: %v", err) + p.log.Errorf("Error registering connect callback for Elasticsearch to load pipelines: %v", err) } } diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 4a8087af2b48..430d71e0db7a 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -26,9 +26,8 @@ import ( "testing" "text/template" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -43,9 +42,9 @@ func makeTestInfo(version string) beat.Info { func getModuleForTesting(t *testing.T, module, fileset string) *Fileset { modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) fs, err := New(modulesPath, fileset, &ModuleConfig{Module: module}, &FilesetConfig{}) - assert.NoError(t, err) + require.NoError(t, err) return fs } @@ -54,7 +53,7 @@ func TestLoadManifestNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") manifest, err := fs.readManifest() - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, manifest.ModuleVersion, "1.0") assert.Equal(t, manifest.IngestPipeline, []string{"ingest/pipeline.yml"}) assert.Equal(t, manifest.Input, "config/nginx-access.yml") @@ -69,7 +68,7 @@ func TestGetBuiltinVars(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") vars, err := fs.getBuiltinVars(makeTestInfo("6.6.0")) - assert.NoError(t, err) + require.NoError(t, err) assert.IsType(t, vars["hostname"], "a-mac-with-esc-key") assert.IsType(t, vars["domain"], "local") @@ -83,10 +82,10 @@ func TestEvaluateVarsNginx(t *testing.T) { var err error fs.manifest, err = fs.readManifest() - assert.NoError(t, err) + require.NoError(t, err) vars, err := fs.evaluateVars(makeTestInfo("6.6.0")) - assert.NoError(t, err) + require.NoError(t, err) builtin := vars["builtin"].(map[string]interface{}) assert.IsType(t, "a-mac-with-esc-key", builtin["hostname"]) @@ -97,19 +96,19 @@ func TestEvaluateVarsNginx(t *testing.T) { func TestEvaluateVarsNginxOverride(t *testing.T) { modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{ Var: map[string]interface{}{ "pipeline": "no_plugins", }, }) - assert.NoError(t, err) + require.NoError(t, err) fs.manifest, err = fs.readManifest() - assert.NoError(t, err) + require.NoError(t, err) vars, err := fs.evaluateVars(makeTestInfo("6.6.0")) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "no_plugins", vars["pipeline"]) } @@ -119,10 +118,10 @@ func TestEvaluateVarsMySQL(t *testing.T) { var err error fs.manifest, err = fs.readManifest() - assert.NoError(t, err) + require.NoError(t, err) vars, err := fs.evaluateVars(makeTestInfo("6.6.0")) - assert.NoError(t, err) + require.NoError(t, err) builtin := vars["builtin"].(map[string]interface{}) assert.IsType(t, "a-mac-with-esc-key", builtin["hostname"]) @@ -172,29 +171,29 @@ func TestResolveVariable(t *testing.T) { for _, test := range tests { result, err := resolveVariable(test.Vars, test.Value) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, test.Expected, result) } } func TestGetInputConfigNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") - assert.NoError(t, fs.Read(makeTestInfo("5.2.0"))) + require.NoError(t, fs.Read(makeTestInfo("5.2.0"))) cfg, err := fs.getInputConfig() - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, cfg.HasField("paths")) assert.True(t, cfg.HasField("exclude_files")) assert.True(t, cfg.HasField("pipeline")) pipelineID, err := cfg.String("pipeline", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "filebeat-5.2.0-nginx-access-pipeline", pipelineID) } func TestGetInputConfigNginxOverrides(t *testing.T) { modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) tests := map[string]struct { input map[string]interface{} @@ -216,7 +215,7 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { require.True(t, v) pipelineID, err := c.String("pipeline", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "filebeat-5.2.0-nginx-access-pipeline", pipelineID) }, }, @@ -242,12 +241,12 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{ Input: test.input, }) - assert.NoError(t, err) + require.NoError(t, err) - assert.NoError(t, fs.Read(makeTestInfo("5.2.0"))) + require.NoError(t, fs.Read(makeTestInfo("5.2.0"))) cfg, err := fs.getInputConfig() - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, cfg.HasField("paths")) assert.True(t, cfg.HasField("exclude_files")) @@ -256,11 +255,11 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { test.expectedFn(t, cfg) moduleName, err := cfg.String("_module_name", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "nginx", moduleName) filesetName, err := cfg.String("_fileset_name", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "access", filesetName) }) } @@ -268,11 +267,11 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { func TestGetPipelineNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") - assert.NoError(t, fs.Read(makeTestInfo("5.2.0"))) + require.NoError(t, fs.Read(makeTestInfo("5.2.0"))) version := common.MustNewVersion("5.2.0") pipelines, err := fs.GetPipelines(*version) - assert.NoError(t, err) + require.NoError(t, err) assert.Len(t, pipelines, 1) pipeline := pipelines[0] @@ -286,7 +285,7 @@ func TestGetTemplateFunctions(t *testing.T) { "builtin": map[string]interface{}{}, } templateFunctions, err := getTemplateFunctions(vars) - assert.NoError(t, err) + require.NoError(t, err) assert.IsType(t, template.FuncMap{}, templateFunctions) assert.Contains(t, templateFunctions, "inList") assert.Contains(t, templateFunctions, "tojson") diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index aa0260031cea..3df41999f8fb 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -26,7 +26,7 @@ import ( "strings" "github.com/pkg/errors" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -34,13 +34,11 @@ import ( "github.com/elastic/beats/v7/libbeat/paths" ) -var availableMLModules = map[string]string{ - "apache": "access", - "nginx": "access", -} +const logName = "modules" type ModuleRegistry struct { registry map[string]map[string]*Fileset // module -> fileset -> Fileset + log *logp.Logger } // newModuleRegistry reads and loads the configured module into the registry. @@ -49,25 +47,26 @@ func newModuleRegistry(modulesPath string, overrides *ModuleOverrides, beatInfo beat.Info, ) (*ModuleRegistry, error) { - - var reg ModuleRegistry - reg.registry = map[string]map[string]*Fileset{} + reg := ModuleRegistry{ + registry: map[string]map[string]*Fileset{}, + log: logp.NewLogger(logName), + } for _, mcfg := range moduleConfigs { - if mcfg.Enabled != nil && (*mcfg.Enabled) == false { + if mcfg.Enabled != nil && !(*mcfg.Enabled) { continue } // Look for moved modules if module, moved := getCurrentModuleName(modulesPath, mcfg.Module); moved { - logp.Warn("Using old name '%s' for module '%s', please update your configuration", mcfg.Module, module) + reg.log.Warnf("Configuration uses the old name %q for module %q, please update your configuration.", mcfg.Module, module) mcfg.Module = module } reg.registry[mcfg.Module] = map[string]*Fileset{} moduleFilesets, err := getModuleFilesets(modulesPath, mcfg.Module) if err != nil { - return nil, fmt.Errorf("Error getting filesets for module %s: %v", mcfg.Module, err) + return nil, fmt.Errorf("error getting filesets for module %s: %v", mcfg.Module, err) } for _, filesetName := range moduleFilesets { @@ -78,10 +77,10 @@ func newModuleRegistry(modulesPath string, fcfg, err = applyOverrides(fcfg, mcfg.Module, filesetName, overrides) if err != nil { - return nil, fmt.Errorf("Error applying overrides on fileset %s/%s: %v", mcfg.Module, filesetName, err) + return nil, fmt.Errorf("error applying overrides on fileset %s/%s: %v", mcfg.Module, filesetName, err) } - if fcfg.Enabled != nil && (*fcfg.Enabled) == false { + if fcfg.Enabled != nil && !(*fcfg.Enabled) { continue } @@ -89,16 +88,15 @@ func newModuleRegistry(modulesPath string, if err != nil { return nil, err } - err = fileset.Read(beatInfo) - if err != nil { - return nil, fmt.Errorf("Error reading fileset %s/%s: %v", mcfg.Module, filesetName, err) + if err = fileset.Read(beatInfo); err != nil { + return nil, fmt.Errorf("error reading fileset %s/%s: %v", mcfg.Module, filesetName, err) } reg.registry[mcfg.Module][filesetName] = fileset } // check that no extra filesets are configured for filesetName, fcfg := range mcfg.Filesets { - if fcfg.Enabled != nil && (*fcfg.Enabled) == false { + if fcfg.Enabled != nil && !(*fcfg.Enabled) { continue } found := false @@ -122,8 +120,9 @@ func NewModuleRegistry(moduleConfigs []*common.Config, beatInfo beat.Info, init stat, err := os.Stat(modulesPath) if err != nil || !stat.IsDir() { - logp.Err("Not loading modules. Module directory not found: %s", modulesPath) - return &ModuleRegistry{}, nil // empty registry, no error + log := logp.NewLogger(logName) + log.Errorf("Not loading modules. Module directory not found: %s", modulesPath) + return &ModuleRegistry{log: log}, nil // empty registry, no error } var modulesCLIList []string @@ -217,7 +216,7 @@ func getModuleFilesets(modulePath, module string) ([]string, error) { return []string{}, err } - filesets := []string{} + var filesets []string for _, fi := range fileInfos { if fi.IsDir() { // check also that the `manifest.yml` file exists @@ -246,7 +245,7 @@ func applyOverrides(fcfg *FilesetConfig, config, err := common.NewConfigFrom(fcfg) if err != nil { - return nil, fmt.Errorf("Error creating vars config object: %v", err) + return nil, fmt.Errorf("error creating vars config object: %v", err) } toMerge := []*common.Config{config} @@ -254,12 +253,12 @@ func applyOverrides(fcfg *FilesetConfig, resultConfig, err := common.MergeConfigs(toMerge...) if err != nil { - return nil, fmt.Errorf("Error merging configs: %v", err) + return nil, fmt.Errorf("error merging configs: %v", err) } res, err := NewFilesetConfig(resultConfig) if err != nil { - return nil, fmt.Errorf("Error unpacking configs: %v", err) + return nil, fmt.Errorf("error unpacking configs: %v", err) } return res, nil @@ -275,7 +274,7 @@ func appendWithoutDuplicates(moduleConfigs []*ModuleConfig, modules []string) ([ // built a dictionary with the configured modules modulesMap := map[string]bool{} for _, mcfg := range moduleConfigs { - if mcfg.Enabled != nil && (*mcfg.Enabled) == false { + if mcfg.Enabled != nil && !(*mcfg.Enabled) { continue } modulesMap[mcfg.Module] = true @@ -291,12 +290,12 @@ func appendWithoutDuplicates(moduleConfigs []*ModuleConfig, modules []string) ([ } func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) { - result := []*common.Config{} + var result []*common.Config for module, filesets := range reg.registry { for name, fileset := range filesets { fcfg, err := fileset.getInputConfig() if err != nil { - return result, fmt.Errorf("Error getting config for fileset %s/%s: %v", + return result, fmt.Errorf("error getting config for fileset %s/%s: %v", module, name, err) } result = append(result, fcfg) @@ -340,17 +339,17 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc } status, body, err := esClient.Request("GET", "/_nodes/ingest", "", nil, nil) if err != nil { - return fmt.Errorf("Error querying _nodes/ingest: %v", err) + return fmt.Errorf("error querying _nodes/ingest: %v", err) } if status > 299 { - return fmt.Errorf("Error querying _nodes/ingest. Status: %d. Response body: %s", status, body) + return fmt.Errorf("error querying _nodes/ingest. Status: %d. Response body: %s", status, body) } err = json.Unmarshal(body, &response) if err != nil { - return fmt.Errorf("Error unmarshaling json when querying _nodes/ingest. Body: %s", body) + return fmt.Errorf("error unmarshaling json when querying _nodes/ingest. Body: %s", body) } - missing := []ProcessorRequirement{} + var missing []ProcessorRequirement for _, requiredProcessor := range requiredProcessors { for _, node := range response.Nodes { available := false @@ -368,11 +367,11 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc } if len(missing) > 0 { - missingPlugins := []string{} + var missingPlugins []string for _, proc := range missing { missingPlugins = append(missingPlugins, proc.Plugin) } - errorMsg := fmt.Sprintf("This module requires the following Elasticsearch plugins: %s. "+ + errorMsg := fmt.Sprintf("this module requires the following Elasticsearch plugins: %s. "+ "You can install them by running the following commands on all the Elasticsearch nodes:", strings.Join(missingPlugins, ", ")) for _, plugin := range missingPlugins { diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index 00ced07f6b8a..b4b5c40bb754 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -25,10 +25,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/esleg/eslegtest" + "github.com/elastic/beats/v7/libbeat/logp" ) func makeTestInfo(version string) beat.Info { @@ -48,8 +50,8 @@ func TestLoadPipeline(t *testing.T) { content := map[string]interface{}{ "description": "describe pipeline", - "processors": []map[string]interface{}{ - { + "processors": []interface{}{ + map[string]interface{}{ "set": map[string]interface{}{ "field": "foo", "value": "bar", @@ -58,28 +60,29 @@ func TestLoadPipeline(t *testing.T) { }, } - err := loadPipeline(client, "my-pipeline-id", content, false) - assert.NoError(t, err) + log := logp.NewLogger(logName) + err := loadPipeline(client, "my-pipeline-id", content, false, log) + require.NoError(t, err) status, _, err := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, 200, status) // loading again shouldn't actually update the pipeline content["description"] = "describe pipeline 2" - err = loadPipeline(client, "my-pipeline-id", content, false) - assert.NoError(t, err) + err = loadPipeline(client, "my-pipeline-id", content, false, log) + require.NoError(t, err) checkUploadedPipeline(t, client, "describe pipeline") // loading again updates the pipeline - err = loadPipeline(client, "my-pipeline-id", content, true) - assert.NoError(t, err) + err = loadPipeline(client, "my-pipeline-id", content, true, log) + require.NoError(t, err) checkUploadedPipeline(t, client, "describe pipeline 2") } func checkUploadedPipeline(t *testing.T, client *eslegclient.Connection, expectedDescription string) { status, response, err := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, 200, status) var res map[string]interface{} @@ -99,10 +102,10 @@ func TestSetupNginx(t *testing.T) { client.Request("DELETE", "/_ingest/pipeline/filebeat-5.2.0-nginx-error-pipeline", "", nil, nil) modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) configs := []*ModuleConfig{ - &ModuleConfig{Module: "nginx"}, + {Module: "nginx"}, } reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("5.2.0")) @@ -133,7 +136,7 @@ func TestAvailableProcessors(t *testing.T) { } err := checkAvailableProcessors(client, requiredProcessors) - assert.NoError(t, err) + require.NoError(t, err) // these don't exists on our integration test setup requiredProcessors = []ProcessorRequirement{ @@ -172,13 +175,13 @@ func TestLoadMultiplePipelines(t *testing.T) { client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-plain_logs", "", nil, nil) modulesPath, err := filepath.Abs("../_meta/test/module") - assert.NoError(t, err) + require.NoError(t, err) enabled := true disabled := false filesetConfigs := map[string]*FilesetConfig{ - "multi": &FilesetConfig{Enabled: &enabled}, - "multibad": &FilesetConfig{Enabled: &disabled}, + "multi": {Enabled: &enabled}, + "multibad": {Enabled: &disabled}, } configs := []*ModuleConfig{ &ModuleConfig{"foo", &enabled, filesetConfigs}, @@ -217,16 +220,16 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) { client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil) modulesPath, err := filepath.Abs("../_meta/test/module") - assert.NoError(t, err) + require.NoError(t, err) enabled := true disabled := false filesetConfigs := map[string]*FilesetConfig{ - "multi": &FilesetConfig{Enabled: &disabled}, - "multibad": &FilesetConfig{Enabled: &enabled}, + "multi": {Enabled: &disabled}, + "multibad": {Enabled: &enabled}, } configs := []*ModuleConfig{ - &ModuleConfig{"foo", &enabled, filesetConfigs}, + {"foo", &enabled, filesetConfigs}, } reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0")) diff --git a/filebeat/fileset/modules_test.go b/filebeat/fileset/modules_test.go index 371b051084db..f69db27648c5 100644 --- a/filebeat/fileset/modules_test.go +++ b/filebeat/fileset/modules_test.go @@ -26,6 +26,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -42,17 +43,17 @@ func load(t *testing.T, from interface{}) *common.Config { func TestNewModuleRegistry(t *testing.T) { modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) configs := []*ModuleConfig{ - &ModuleConfig{Module: "nginx"}, - &ModuleConfig{Module: "mysql"}, - &ModuleConfig{Module: "system"}, - &ModuleConfig{Module: "auditd"}, + {Module: "nginx"}, + {Module: "mysql"}, + {Module: "system"}, + {Module: "auditd"}, } reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}) - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, reg) expectedModules := map[string][]string{ @@ -77,14 +78,14 @@ func TestNewModuleRegistry(t *testing.T) { for module, filesets := range reg.registry { for name, fileset := range filesets { cfg, err := fileset.getInputConfig() - assert.NoError(t, err, fmt.Sprintf("module: %s, fileset: %s", module, name)) + require.NoError(t, err, fmt.Sprintf("module: %s, fileset: %s", module, name)) moduleName, err := cfg.String("_module_name", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, module, moduleName) filesetName, err := cfg.String("_fileset_name", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, name, filesetName) } } @@ -92,12 +93,12 @@ func TestNewModuleRegistry(t *testing.T) { func TestNewModuleRegistryConfig(t *testing.T) { modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) falseVar := false configs := []*ModuleConfig{ - &ModuleConfig{ + { Module: "nginx", Filesets: map[string]*FilesetConfig{ "access": { @@ -110,29 +111,30 @@ func TestNewModuleRegistryConfig(t *testing.T) { }, }, }, - &ModuleConfig{ + { Module: "mysql", Enabled: &falseVar, }, } reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}) - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, reg) nginxAccess := reg.registry["nginx"]["access"] - assert.NotNil(t, nginxAccess) - assert.Equal(t, []interface{}{"/hello/test"}, nginxAccess.vars["paths"]) + if assert.NotNil(t, nginxAccess) { + assert.Equal(t, []interface{}{"/hello/test"}, nginxAccess.vars["paths"]) + } assert.NotContains(t, reg.registry["nginx"], "error") } func TestMovedModule(t *testing.T) { modulesPath, err := filepath.Abs("./test/moved_module") - assert.NoError(t, err) + require.NoError(t, err) configs := []*ModuleConfig{ - &ModuleConfig{ + { Module: "old", Filesets: map[string]*FilesetConfig{ "test": {}, @@ -141,7 +143,7 @@ func TestMovedModule(t *testing.T) { } reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}) - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, reg) } @@ -232,7 +234,7 @@ func TestApplyOverrides(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { result, err := applyOverrides(&test.fcfg, test.module, test.fileset, test.overrides) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, &test.expected, result) }) } @@ -251,15 +253,15 @@ func TestAppendWithoutDuplicates(t *testing.T) { configs: []*ModuleConfig{}, modules: []string{"moduleA", "moduleB", "moduleC"}, expected: []*ModuleConfig{ - &ModuleConfig{Module: "moduleA"}, - &ModuleConfig{Module: "moduleB"}, - &ModuleConfig{Module: "moduleC"}, + {Module: "moduleA"}, + {Module: "moduleB"}, + {Module: "moduleC"}, }, }, { name: "eliminate a duplicate, no override", configs: []*ModuleConfig{ - &ModuleConfig{ + { Module: "moduleB", Filesets: map[string]*FilesetConfig{ "fileset": { @@ -272,7 +274,7 @@ func TestAppendWithoutDuplicates(t *testing.T) { }, modules: []string{"moduleA", "moduleB", "moduleC"}, expected: []*ModuleConfig{ - &ModuleConfig{ + { Module: "moduleB", Filesets: map[string]*FilesetConfig{ "fileset": { @@ -282,14 +284,14 @@ func TestAppendWithoutDuplicates(t *testing.T) { }, }, }, - &ModuleConfig{Module: "moduleA"}, - &ModuleConfig{Module: "moduleC"}, + {Module: "moduleA"}, + {Module: "moduleC"}, }, }, { name: "disabled config", configs: []*ModuleConfig{ - &ModuleConfig{ + { Module: "moduleB", Enabled: &falseVar, Filesets: map[string]*FilesetConfig{ @@ -303,7 +305,7 @@ func TestAppendWithoutDuplicates(t *testing.T) { }, modules: []string{"moduleA", "moduleB", "moduleC"}, expected: []*ModuleConfig{ - &ModuleConfig{ + { Module: "moduleB", Enabled: &falseVar, Filesets: map[string]*FilesetConfig{ @@ -314,9 +316,9 @@ func TestAppendWithoutDuplicates(t *testing.T) { }, }, }, - &ModuleConfig{Module: "moduleA"}, - &ModuleConfig{Module: "moduleB"}, - &ModuleConfig{Module: "moduleC"}, + {Module: "moduleA"}, + {Module: "moduleB"}, + {Module: "moduleC"}, }, }, } @@ -324,7 +326,7 @@ func TestAppendWithoutDuplicates(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { result, err := appendWithoutDuplicates(test.configs, test.modules) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, test.expected, result) }) } @@ -377,7 +379,7 @@ func TestMcfgFromConfig(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { result, err := mcfgFromConfig(test.config) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, test.expected.Module, result.Module) assert.Equal(t, len(test.expected.Filesets), len(result.Filesets)) for name, fileset := range test.expected.Filesets { @@ -397,12 +399,12 @@ func TestMissingModuleFolder(t *testing.T) { } reg, err := NewModuleRegistry(configs, beat.Info{Version: "5.2.0"}, true) - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, reg) // this should return an empty list, but no error inputs, err := reg.GetInputConfigs() - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, 0, len(inputs)) } @@ -415,19 +417,19 @@ func TestInterpretError(t *testing.T) { { Test: "other plugin not installed", Input: `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}}],"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}},"status":400}`, - Output: "This module requires an Elasticsearch plugin that provides the hello_test processor. " + + Output: "this module requires an Elasticsearch plugin that provides the hello_test processor. " + "Please visit the Elasticsearch documentation for instructions on how to install this plugin. " + "Response body: " + `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}}],"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}},"status":400}`, }, { Test: "Elasticsearch 2.4", Input: `{"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`, - Output: `The Ingest Node functionality seems to be missing from Elasticsearch. The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`, + Output: `the Ingest Node functionality seems to be missing from Elasticsearch. The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`, }, { Test: "Elasticsearch 1.7", Input: `{"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`, - Output: `The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`, + Output: `the Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`, }, { Test: "bad json", diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index fe7eb86c884d..a3ce0b3de423 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -64,17 +64,17 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool for name, fileset := range filesets { // check that all the required Ingest Node plugins are available requiredProcessors := fileset.GetRequiredProcessors() - logp.Debug("modules", "Required processors: %s", requiredProcessors) + reg.log.Debugf("Required processors: %s", requiredProcessors) if len(requiredProcessors) > 0 { err := checkAvailableProcessors(esClient, requiredProcessors) if err != nil { - return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + return fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module, name, err) } } pipelines, err := fileset.GetPipelines(esClient.GetVersion()) if err != nil { - return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) + return fmt.Errorf("error getting pipeline for fileset %s/%s: %v", module, name, err) } // Filesets with multiple pipelines can only be supported by Elasticsearch >= 6.5.0 @@ -86,9 +86,9 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool var pipelineIDsLoaded []string for _, pipeline := range pipelines { - err = loadPipeline(esClient, pipeline.id, pipeline.contents, overwrite) + err = loadPipeline(esClient, pipeline.id, pipeline.contents, overwrite, reg.log.With("pipeline", pipeline.id)) if err != nil { - err = fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + err = fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module, name, err) break } pipelineIDsLoaded = append(pipelineIDsLoaded, pipeline.id) @@ -112,69 +112,25 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool return nil } -func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool) error { +func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool, log *logp.Logger) error { path := makeIngestPipelinePath(pipelineID) if !overwrite { status, _, _ := esClient.Request("GET", path, "", nil, nil) if status == 200 { - logp.Debug("modules", "Pipeline %s already loaded", pipelineID) + log.Debug("Pipeline already exists in Elasticsearch.") return nil } } - err := setECSProcessors(esClient.GetVersion(), pipelineID, content) - if err != nil { - return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err) - } - - err = modifySetProcessor(esClient.GetVersion(), pipelineID, content) - if err != nil { - return fmt.Errorf("failed to modify set processor in pipeline: %v", err) - } - - if err := modifyAppendProcessor(esClient.GetVersion(), pipelineID, content); err != nil { - return fmt.Errorf("failed to modify append processor in pipeline: %v", err) + if err := adaptPipelineForCompatibility(esClient.GetVersion(), pipelineID, content, log); err != nil { + return fmt.Errorf("failed to adapt pipeline with backwards compatibility changes: %w", err) } body, err := esClient.LoadJSON(path, content) if err != nil { return interpretError(err, body) } - logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID) - return nil -} - -// setECSProcessors sets required ECS options in processors when filebeat version is >= 7.0.0 -// and ES is 6.7.X to ease migration to ECS. -func setECSProcessors(esVersion common.Version, pipelineID string, content map[string]interface{}) error { - ecsVersion := common.MustNewVersion("7.0.0") - if !esVersion.LessThan(ecsVersion) { - return nil - } - - p, ok := content["processors"] - if !ok { - return nil - } - processors, ok := p.([]interface{}) - if !ok { - return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) - } - - minUserAgentVersion := common.MustNewVersion("6.7.0") - for _, p := range processors { - processor, ok := p.(map[string]interface{}) - if !ok { - continue - } - if options, ok := processor["user_agent"].(map[string]interface{}); ok { - if esVersion.LessThan(minUserAgentVersion) { - return fmt.Errorf("user_agent processor requires option 'ecs: true', but Elasticsearch %v does not support this option (Elasticsearch %v or newer is required)", esVersion, minUserAgentVersion) - } - logp.Debug("modules", "Setting 'ecs: true' option in user_agent processor for field '%v' in pipeline '%s'", options["field"], pipelineID) - options["ecs"] = true - } - } + log.Info("Elasticsearch pipeline loaded.") return nil } @@ -209,7 +165,7 @@ func interpretError(initialErr error, body []byte) error { } err1x := json.Unmarshal(body, &response1x) if err1x == nil && response1x.Error != "" { - return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+ + return fmt.Errorf("the Filebeat modules require Elasticsearch >= 5.0. "+ "This is the response I got from Elasticsearch: %s", body) } @@ -223,7 +179,7 @@ func interpretError(initialErr error, body []byte) error { strings.HasPrefix(response.Error.RootCause[0].Reason, "No processor type exists with name") && response.Error.RootCause[0].Header.ProcessorType != "" { - return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+ + return fmt.Errorf("this module requires an Elasticsearch plugin that provides the %s processor. "+ "Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+ "Response body: %s", response.Error.RootCause[0].Header.ProcessorType, body) @@ -234,134 +190,10 @@ func interpretError(initialErr error, body []byte) error { response.Error.RootCause[0].Type == "invalid_index_name_exception" && response.Error.RootCause[0].Index == "_ingest" { - return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+ + return fmt.Errorf("the Ingest Node functionality seems to be missing from Elasticsearch. "+ "The Filebeat modules require Elasticsearch >= 5.0. "+ "This is the response I got from Elasticsearch: %s", body) } return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body) } - -// modifySetProcessor replaces ignore_empty_value option with an if statement -// so ES less than 7.9 will still work -func modifySetProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error { - flagVersion := common.MustNewVersion("7.9.0") - if !esVersion.LessThan(flagVersion) { - return nil - } - - p, ok := content["processors"] - if !ok { - return nil - } - processors, ok := p.([]interface{}) - if !ok { - return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) - } - - for _, p := range processors { - processor, ok := p.(map[string]interface{}) - if !ok { - continue - } - if options, ok := processor["set"].(map[string]interface{}); ok { - _, ok := options["ignore_empty_value"].(bool) - if !ok { - // don't have ignore_empty_value nothing to do - continue - } - - logp.Debug("modules", "In pipeline %q removing unsupported 'ignore_empty_value' in set processor", pipelineID) - delete(options, "ignore_empty_value") - - _, ok = options["if"].(string) - if ok { - // assume if check is sufficient - continue - } - val, ok := options["value"].(string) - if !ok { - continue - } - - newIf := strings.TrimLeft(val, "{ ") - newIf = strings.TrimRight(newIf, "} ") - newIf = strings.ReplaceAll(newIf, ".", "?.") - newIf = "ctx?." + newIf + " != null" - - logp.Debug("modules", "In pipeline %q adding if %s to replace 'ignore_empty_value' in set processor", pipelineID, newIf) - options["if"] = newIf - } - } - return nil -} - -// modifyAppendProcessor replaces allow_duplicates option with an if statement -// so ES less than 7.10 will still work -func modifyAppendProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error { - flagVersion := common.MustNewVersion("7.10.0") - if !esVersion.LessThan(flagVersion) { - return nil - } - - p, ok := content["processors"] - if !ok { - return nil - } - processors, ok := p.([]interface{}) - if !ok { - return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) - } - - for _, p := range processors { - processor, ok := p.(map[string]interface{}) - if !ok { - continue - } - if options, ok := processor["append"].(map[string]interface{}); ok { - allow, ok := options["allow_duplicates"].(bool) - if !ok { - // don't have allow_duplicates, nothing to do - continue - } - - logp.Debug("modules", "In pipeline %q removing unsupported 'allow_duplicates' in append processor", pipelineID) - delete(options, "allow_duplicates") - if allow { - // it was set to true, nothing else to do after removing the option - continue - } - - currIf, _ := options["if"].(string) - if strings.Contains(strings.ToLower(currIf), "contains") { - // if it has a contains statement, we assume it is checking for duplicates already - continue - } - field, ok := options["field"].(string) - if !ok { - continue - } - val, ok := options["value"].(string) - if !ok { - continue - } - - field = strings.ReplaceAll(field, ".", "?.") - - val = strings.TrimLeft(val, "{ ") - val = strings.TrimRight(val, "} ") - val = strings.ReplaceAll(val, ".", "?.") - - if currIf == "" { - // if there is not a previous if we add a value sanity check - currIf = fmt.Sprintf("ctx?.%s != null", val) - } - - newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val) - - logp.Debug("modules", "In pipeline %q adding if %s to replace 'allow_duplicates: false' in append processor", pipelineID, newIf) - options["if"] = newIf - } - } - return nil -} diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 7c617034f107..04d48c67f019 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -25,10 +25,11 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/logp" ) func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { @@ -81,6 +82,7 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { "fls": testFileset, }, }, + log: logp.NewLogger(logName), } testESServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -92,10 +94,10 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { URL: testESServer.URL, Timeout: 90 * time.Second, }) - assert.NoError(t, err) + require.NoError(t, err) err = testESClient.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = testRegistry.LoadPipelines(testESClient, false) if test.isErrExpected { @@ -106,491 +108,3 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { }) } } - -func TestSetEcsProcessors(t *testing.T) { - cases := []struct { - name string - esVersion *common.Version - content map[string]interface{} - expected map[string]interface{} - isErrExpected bool - }{ - { - name: "ES < 6.7.0", - esVersion: common.MustNewVersion("6.6.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "user_agent": map[string]interface{}{ - "field": "foo.http_user_agent", - }, - }, - }}, - isErrExpected: true, - }, - { - name: "ES == 6.7.0", - esVersion: common.MustNewVersion("6.7.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "rename": map[string]interface{}{ - "field": "foo.src_ip", - "target_field": "source.ip", - }, - }, - map[string]interface{}{ - "user_agent": map[string]interface{}{ - "field": "foo.http_user_agent", - }, - }, - }, - }, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "rename": map[string]interface{}{ - "field": "foo.src_ip", - "target_field": "source.ip", - }, - }, - map[string]interface{}{ - "user_agent": map[string]interface{}{ - "field": "foo.http_user_agent", - "ecs": true, - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES >= 7.0.0", - esVersion: common.MustNewVersion("7.0.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "rename": map[string]interface{}{ - "field": "foo.src_ip", - "target_field": "source.ip", - }, - }, - map[string]interface{}{ - "user_agent": map[string]interface{}{ - "field": "foo.http_user_agent", - }, - }, - }, - }, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "rename": map[string]interface{}{ - "field": "foo.src_ip", - "target_field": "source.ip", - }, - }, - map[string]interface{}{ - "user_agent": map[string]interface{}{ - "field": "foo.http_user_agent", - }, - }, - }, - }, - isErrExpected: false, - }, - } - - for _, test := range cases { - test := test - t.Run(test.name, func(t *testing.T) { - t.Parallel() - err := setECSProcessors(*test.esVersion, "foo-pipeline", test.content) - if test.isErrExpected { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, test.expected, test.content) - } - }) - } -} - -func TestModifySetProcessor(t *testing.T) { - cases := []struct { - name string - esVersion *common.Version - content map[string]interface{} - expected map[string]interface{} - isErrExpected bool - }{ - { - name: "ES < 7.9.0", - esVersion: common.MustNewVersion("7.8.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "if": "ctx?.panw?.panos?.ruleset != null", - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES == 7.9.0", - esVersion: common.MustNewVersion("7.9.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES > 7.9.0", - esVersion: common.MustNewVersion("8.0.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "existing if", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - "if": "ctx?.panw?.panos?.ruleset != null", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "if": "ctx?.panw?.panos?.ruleset != null", - }, - }, - }}, - isErrExpected: false, - }, - { - name: "ignore_empty_value is false", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": false, - "if": "ctx?.panw?.panos?.ruleset != null", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "if": "ctx?.panw?.panos?.ruleset != null", - }, - }, - }}, - isErrExpected: false, - }, - { - name: "no value", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "ignore_empty_value": false, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - }, - }, - }}, - isErrExpected: false, - }, - } - - for _, test := range cases { - test := test - t.Run(test.name, func(t *testing.T) { - t.Parallel() - err := modifySetProcessor(*test.esVersion, "foo-pipeline", test.content) - if test.isErrExpected { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, test.expected, test.content, test.name) - } - }) - } -} - -func TestModifyAppendProcessor(t *testing.T) { - cases := []struct { - name string - esVersion *common.Version - content map[string]interface{} - expected map[string]interface{} - isErrExpected bool - }{ - { - name: "ES < 7.10.0: set to true", - esVersion: common.MustNewVersion("7.9.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": true, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES < 7.10.0: set to false", - esVersion: common.MustNewVersion("7.9.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES == 7.10.0", - esVersion: common.MustNewVersion("7.10.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES > 7.10.0", - esVersion: common.MustNewVersion("8.0.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES < 7.10.0: existing if", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - "if": "ctx?.host?.hostname != null", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", - }, - }, - }}, - isErrExpected: false, - }, - { - name: "ES < 7.10.0: existing if with contains", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - "if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)", - }, - }, - }}, - isErrExpected: false, - }, - { - name: "ES < 7.10.0: no value", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "allow_duplicates": false, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - }, - }, - }}, - isErrExpected: false, - }, - } - - for _, test := range cases { - test := test - t.Run(test.name, func(t *testing.T) { - t.Parallel() - err := modifyAppendProcessor(*test.esVersion, "foo-pipeline", test.content) - if test.isErrExpected { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, test.expected, test.content, test.name) - } - }) - } -} diff --git a/filebeat/fileset/setup.go b/filebeat/fileset/setup.go index b76cf2719662..c90916fbac9d 100644 --- a/filebeat/fileset/setup.go +++ b/filebeat/fileset/setup.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline" ) @@ -69,20 +68,20 @@ type SetupCfgRunner struct { // Start loads module pipelines for configured modules. func (sr *SetupCfgRunner) Start() { - logp.Debug("fileset", "Loading ingest pipelines for modules from modules.d") + sr.moduleRegistry.log.Debug("Loading ingest pipelines for modules from modules.d") pipelineLoader, err := sr.pipelineLoaderFactory() if err != nil { - logp.Err("Error loading pipeline: %+v", err) + sr.moduleRegistry.log.Errorf("Error loading pipeline: %+v", err) return } err = sr.moduleRegistry.LoadPipelines(pipelineLoader, sr.overwritePipelines) if err != nil { - logp.Err("Error loading pipeline: %s", err) + sr.moduleRegistry.log.Errorf("Error loading pipeline: %s", err) } } -// Stopp of SetupCfgRunner. +// Stop of SetupCfgRunner. func (sr *SetupCfgRunner) Stop() {} // String returns information on the Runner