From 921b1150dd1565ff470444f7caeaecff9df23a7c Mon Sep 17 00:00:00 2001 From: Marius Iversen Date: Fri, 29 Jan 2021 16:14:03 +0100 Subject: [PATCH 01/10] adding possibility to remove processor if its unsupported --- filebeat/fileset/pipelines.go | 48 +++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index fe7eb86c884d..156d19d8a406 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -178,6 +178,50 @@ func setECSProcessors(esVersion common.Version, pipelineID string, content map[s return nil } +func checkProcessorMinVersion(esVersion common.Version, pipelineID string, content map[string]interface{}) error { + ecsVersion := common.MustNewVersion("7.0.0") + if !esVersion.LessThan(ecsVersion) { + return nil + } + + p, ok := content["processors"] + if !ok { + return nil + } + processors, ok := p.([]interface{}) + if !ok { + return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + } + + minUserAgentVersion := common.MustNewVersion("6.7.0") + minURIPartsVersion := common.MustNewVersion("7.12") + newProcessors := make([]interface{}, len(processors)) + for i, p := range processors { + processor, ok := p.(map[string]interface{}) + if !ok { + continue + } + if options, ok := processor["user_agent"].(map[string]interface{}); ok { + if esVersion.LessThan(minUserAgentVersion) { + return fmt.Errorf("user_agent processor requires option 'ecs: true', but Elasticsearch %v does not support this option (Elasticsearch %v or newer is required)", esVersion, minUserAgentVersion) + } + logp.Debug("modules", "Setting 'ecs: true' option in user_agent processor for field '%v' in pipeline '%s'", options["field"], pipelineID) + options["ecs"] = true + } + if _, ok := processor["uri_parts"].(map[string]interface{}); ok { + if esVersion.LessThan(minURIPartsVersion) { + logp.Debug("processors", "Current version of Elasticsearch: %v 
does not support the uri_parts processor, minimum version is: %v and newer)", esVersion, minURIPartsVersion) + continue + } + + } + newProcessors = append(newProcessors, processors[i]) + + } + content["processors"] = newProcessors + return nil +} + func deletePipeline(esClient PipelineLoader, pipelineID string) error { path := makeIngestPipelinePath(pipelineID) _, _, err := esClient.Request("DELETE", path, "", nil, nil) @@ -365,3 +409,7 @@ func modifyAppendProcessor(esVersion common.Version, pipelineID string, content } return nil } + +func removeProcessor(processors []interface{}, processor int) []interface{} { + return append(processors[:processor], processors[processor+1:]...) +} From b924adbd6061b50cf3ccf6f85927ecd33335ada3 Mon Sep 17 00:00:00 2001 From: Marius Iversen Date: Fri, 29 Jan 2021 16:14:45 +0100 Subject: [PATCH 02/10] removing unused function while testing --- filebeat/fileset/pipelines.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 156d19d8a406..5ab7fbd0027c 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -409,7 +409,3 @@ func modifyAppendProcessor(esVersion common.Version, pipelineID string, content } return nil } - -func removeProcessor(processors []interface{}, processor int) []interface{} { - return append(processors[:processor], processors[processor+1:]...) 
-} From 4eae0d406daf9e35706b21b501d8a3e72b9a5fe5 Mon Sep 17 00:00:00 2001 From: Marius Iversen Date: Fri, 29 Jan 2021 16:21:13 +0100 Subject: [PATCH 03/10] was initially testing with a separate function, removed that for now --- filebeat/fileset/pipelines.go | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 5ab7fbd0027c..9c7670336789 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -161,38 +161,6 @@ func setECSProcessors(esVersion common.Version, pipelineID string, content map[s return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) } - minUserAgentVersion := common.MustNewVersion("6.7.0") - for _, p := range processors { - processor, ok := p.(map[string]interface{}) - if !ok { - continue - } - if options, ok := processor["user_agent"].(map[string]interface{}); ok { - if esVersion.LessThan(minUserAgentVersion) { - return fmt.Errorf("user_agent processor requires option 'ecs: true', but Elasticsearch %v does not support this option (Elasticsearch %v or newer is required)", esVersion, minUserAgentVersion) - } - logp.Debug("modules", "Setting 'ecs: true' option in user_agent processor for field '%v' in pipeline '%s'", options["field"], pipelineID) - options["ecs"] = true - } - } - return nil -} - -func checkProcessorMinVersion(esVersion common.Version, pipelineID string, content map[string]interface{}) error { - ecsVersion := common.MustNewVersion("7.0.0") - if !esVersion.LessThan(ecsVersion) { - return nil - } - - p, ok := content["processors"] - if !ok { - return nil - } - processors, ok := p.([]interface{}) - if !ok { - return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) - } - minUserAgentVersion := common.MustNewVersion("6.7.0") minURIPartsVersion := common.MustNewVersion("7.12") newProcessors := make([]interface{}, len(processors)) @@ -216,7 
+184,6 @@ func checkProcessorMinVersion(esVersion common.Version, pipelineID string, conte } newProcessors = append(newProcessors, processors[i]) - } content["processors"] = newProcessors return nil From a51985fa4806cd98812f6eee26084c9173f05412 Mon Sep 17 00:00:00 2001 From: Marius Iversen Date: Fri, 29 Jan 2021 16:26:31 +0100 Subject: [PATCH 04/10] moving it to its own function --- filebeat/fileset/pipelines.go | 36 ++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 9c7670336789..6548fa1d84e6 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -132,6 +132,11 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return fmt.Errorf("failed to modify set processor in pipeline: %v", err) } + err = checkProcessorMinVersion(esClient.GetVersion(), pipelineID, content) + if err != nil { + return fmt.Errorf("failed to check minimum processor version in pipeline: %v", err) + } + if err := modifyAppendProcessor(esClient.GetVersion(), pipelineID, content); err != nil { return fmt.Errorf("failed to modify append processor in pipeline: %v", err) } @@ -162,9 +167,7 @@ func setECSProcessors(esVersion common.Version, pipelineID string, content map[s } minUserAgentVersion := common.MustNewVersion("6.7.0") - minURIPartsVersion := common.MustNewVersion("7.12") - newProcessors := make([]interface{}, len(processors)) - for i, p := range processors { + for _, p := range processors { processor, ok := p.(map[string]interface{}) if !ok { continue @@ -176,6 +179,32 @@ func setECSProcessors(esVersion common.Version, pipelineID string, content map[s logp.Debug("modules", "Setting 'ecs: true' option in user_agent processor for field '%v' in pipeline '%s'", options["field"], pipelineID) options["ecs"] = true } + } + return nil +} + +func checkProcessorMinVersion(esVersion common.Version, pipelineID string, content 
map[string]interface{}) error { + ecsVersion := common.MustNewVersion("7.0.0") + if !esVersion.LessThan(ecsVersion) { + return nil + } + + p, ok := content["processors"] + if !ok { + return nil + } + processors, ok := p.([]interface{}) + if !ok { + return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + } + + minURIPartsVersion := common.MustNewVersion("7.12") + newProcessors := make([]interface{}, len(processors)) + for i, p := range processors { + processor, ok := p.(map[string]interface{}) + if !ok { + continue + } if _, ok := processor["uri_parts"].(map[string]interface{}); ok { if esVersion.LessThan(minURIPartsVersion) { logp.Debug("processors", "Current version of Elasticsearch: %v does not support the uri_parts processor, minimum version is: %v and newer)", esVersion, minURIPartsVersion) @@ -184,6 +213,7 @@ func setECSProcessors(esVersion common.Version, pipelineID string, content map[s } newProcessors = append(newProcessors, processors[i]) + } content["processors"] = newProcessors return nil From e8cfd13b05c6bd8ef00117c9fe02f93a9776c56e Mon Sep 17 00:00:00 2001 From: Marius Iversen Date: Tue, 9 Feb 2021 11:36:25 +0100 Subject: [PATCH 05/10] rewriting SetProcessors logic to be more extendable --- filebeat/fileset/pipelines.go | 273 +++++++++++++---------------- filebeat/fileset/pipelines_test.go | 7 +- 2 files changed, 123 insertions(+), 157 deletions(-) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 6548fa1d84e6..3c5e36072eaa 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -19,6 +19,7 @@ package fileset import ( "encoding/json" + "errors" "fmt" "strings" @@ -48,6 +49,19 @@ type MultiplePipelineUnsupportedError struct { minESVersionRequired common.Version } +// Processor defines a single processors minimum version requirements. 
+type Processor struct { + minVersion *common.Version + name string + fn func(pipelineID string, processor map[string]interface{}) error +} + +// Processors is a slice of single processor's, used to check the minimum version +// requirement per processor defined. +type Processors struct { + Processors []Processor +} + func (m MultiplePipelineUnsupportedError) Error() string { return fmt.Sprintf( "the %s/%s fileset has multiple pipelines, which are only supported with Elasticsearch >= %s. Currently running with Elasticsearch version %s", @@ -122,23 +136,9 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string } } - err := setECSProcessors(esClient.GetVersion(), pipelineID, content) - if err != nil { - return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err) - } - - err = modifySetProcessor(esClient.GetVersion(), pipelineID, content) + err := setProcessors(esClient.GetVersion(), pipelineID, content) if err != nil { - return fmt.Errorf("failed to modify set processor in pipeline: %v", err) - } - - err = checkProcessorMinVersion(esClient.GetVersion(), pipelineID, content) - if err != nil { - return fmt.Errorf("failed to check minimum processor version in pipeline: %v", err) - } - - if err := modifyAppendProcessor(esClient.GetVersion(), pipelineID, content); err != nil { - return fmt.Errorf("failed to modify append processor in pipeline: %v", err) + return fmt.Errorf("Failed to adapt pipeline with backwards compatibility changes: %w", err) } body, err := esClient.LoadJSON(path, content) @@ -151,44 +151,11 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string // setECSProcessors sets required ECS options in processors when filebeat version is >= 7.0.0 // and ES is 6.7.X to ease migration to ECS. 
-func setECSProcessors(esVersion common.Version, pipelineID string, content map[string]interface{}) error { - ecsVersion := common.MustNewVersion("7.0.0") - if !esVersion.LessThan(ecsVersion) { - return nil - } - - p, ok := content["processors"] - if !ok { - return nil - } - processors, ok := p.([]interface{}) - if !ok { - return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) - } - - minUserAgentVersion := common.MustNewVersion("6.7.0") - for _, p := range processors { - processor, ok := p.(map[string]interface{}) - if !ok { - continue - } - if options, ok := processor["user_agent"].(map[string]interface{}); ok { - if esVersion.LessThan(minUserAgentVersion) { - return fmt.Errorf("user_agent processor requires option 'ecs: true', but Elasticsearch %v does not support this option (Elasticsearch %v or newer is required)", esVersion, minUserAgentVersion) - } - logp.Debug("modules", "Setting 'ecs: true' option in user_agent processor for field '%v' in pipeline '%s'", options["field"], pipelineID) - options["ecs"] = true - } - } - return nil +func setECSProcessors(pipelineID string, processor map[string]interface{}) error { + return errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required") } -func checkProcessorMinVersion(esVersion common.Version, pipelineID string, content map[string]interface{}) error { - ecsVersion := common.MustNewVersion("7.0.0") - if !esVersion.LessThan(ecsVersion) { - return nil - } - +func setProcessors(esVersion common.Version, pipelineID string, content map[string]interface{}) error { p, ok := content["processors"] if !ok { return nil @@ -198,19 +165,57 @@ func checkProcessorMinVersion(esVersion common.Version, pipelineID string, conte return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) } - minURIPartsVersion := common.MustNewVersion("7.12") - newProcessors := make([]interface{}, len(processors)) + // A list of all 
processor names and versions to be checked. + versionChecks := Processors{ + Processors: []Processor{ + { + name: "uri_parts", + minVersion: common.MustNewVersion("7.12.0"), + fn: nil, + }, + { + name: "set", + minVersion: common.MustNewVersion("7.9.0"), + fn: modifySetProcessor, + }, + { + name: "append", + minVersion: common.MustNewVersion("7.10.0"), + fn: modifyAppendProcessor, + }, + { + name: "user_agent", + minVersion: common.MustNewVersion("6.7.0"), + fn: setECSProcessors, + }, + }, + } + var newProcessors []interface{} for i, p := range processors { processor, ok := p.(map[string]interface{}) if !ok { continue } - if _, ok := processor["uri_parts"].(map[string]interface{}); ok { - if esVersion.LessThan(minURIPartsVersion) { - logp.Debug("processors", "Current version of Elasticsearch: %v does not support the uri_parts processor, minimum version is: %v and newer)", esVersion, minURIPartsVersion) + for _, proc := range versionChecks.Processors { + _, found := processor[proc.name] + if !found { continue } + if options, ok := processor[proc.name].(map[string]interface{}); ok { + if !esVersion.LessThan(proc.minVersion) { + if proc.name == "user_agent" { + logp.Debug("modules", "Setting 'ecs: true' option in user_agent processor for field '%v' in pipeline '%s'", options["field"], pipelineID) + options["ecs"] = true + } + continue + } + if proc.fn != nil { + if err := proc.fn(pipelineID, processor); err != nil { + return err + } + } + } } newProcessors = append(newProcessors, processors[i]) @@ -285,124 +290,84 @@ func interpretError(initialErr error, body []byte) error { // modifySetProcessor replaces ignore_empty_value option with an if statement // so ES less than 7.9 will still work -func modifySetProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error { - flagVersion := common.MustNewVersion("7.9.0") - if !esVersion.LessThan(flagVersion) { - return nil - } - - p, ok := content["processors"] - if !ok { - return nil - } - 
processors, ok := p.([]interface{}) - if !ok { - return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) - } - - for _, p := range processors { - processor, ok := p.(map[string]interface{}) +func modifySetProcessor(pipelineID string, processor map[string]interface{}) error { + if options, ok := processor["set"].(map[string]interface{}); ok { + _, ok := options["ignore_empty_value"].(bool) if !ok { - continue + // don't have ignore_empty_value nothing to do + return nil } - if options, ok := processor["set"].(map[string]interface{}); ok { - _, ok := options["ignore_empty_value"].(bool) - if !ok { - // don't have ignore_empty_value nothing to do - continue - } - logp.Debug("modules", "In pipeline %q removing unsupported 'ignore_empty_value' in set processor", pipelineID) - delete(options, "ignore_empty_value") + logp.Debug("modules", "In pipeline %q removing unsupported 'ignore_empty_value' in set processor", pipelineID) + delete(options, "ignore_empty_value") - _, ok = options["if"].(string) - if ok { - // assume if check is sufficient - continue - } - val, ok := options["value"].(string) - if !ok { - continue - } + _, ok = options["if"].(string) + if ok { + // assume if check is sufficient + return nil + } + val, ok := options["value"].(string) + if !ok { + return nil + } - newIf := strings.TrimLeft(val, "{ ") - newIf = strings.TrimRight(newIf, "} ") - newIf = strings.ReplaceAll(newIf, ".", "?.") - newIf = "ctx?." + newIf + " != null" + newIf := strings.TrimLeft(val, "{ ") + newIf = strings.TrimRight(newIf, "} ") + newIf = strings.ReplaceAll(newIf, ".", "?.") + newIf = "ctx?." 
+ newIf + " != null" - logp.Debug("modules", "In pipeline %q adding if %s to replace 'ignore_empty_value' in set processor", pipelineID, newIf) - options["if"] = newIf - } + logp.Debug("modules", "In pipeline %q adding if %s to replace 'ignore_empty_value' in set processor", pipelineID, newIf) + options["if"] = newIf } return nil } // modifyAppendProcessor replaces allow_duplicates option with an if statement // so ES less than 7.10 will still work -func modifyAppendProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error { - flagVersion := common.MustNewVersion("7.10.0") - if !esVersion.LessThan(flagVersion) { - return nil - } - - p, ok := content["processors"] - if !ok { - return nil - } - processors, ok := p.([]interface{}) - if !ok { - return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) - } - - for _, p := range processors { - processor, ok := p.(map[string]interface{}) +func modifyAppendProcessor(pipelineID string, processor map[string]interface{}) error { + if options, ok := processor["append"].(map[string]interface{}); ok { + allow, ok := options["allow_duplicates"].(bool) if !ok { - continue + // don't have allow_duplicates, nothing to do + return nil } - if options, ok := processor["append"].(map[string]interface{}); ok { - allow, ok := options["allow_duplicates"].(bool) - if !ok { - // don't have allow_duplicates, nothing to do - continue - } - logp.Debug("modules", "In pipeline %q removing unsupported 'allow_duplicates' in append processor", pipelineID) - delete(options, "allow_duplicates") - if allow { - // it was set to true, nothing else to do after removing the option - continue - } + logp.Debug("modules", "In pipeline %q removing unsupported 'allow_duplicates' in append processor", pipelineID) + delete(options, "allow_duplicates") + if allow { + // it was set to true, nothing else to do after removing the option + return nil + } - currIf, _ := options["if"].(string) - 
if strings.Contains(strings.ToLower(currIf), "contains") { - // if it has a contains statement, we assume it is checking for duplicates already - continue - } - field, ok := options["field"].(string) - if !ok { - continue - } - val, ok := options["value"].(string) - if !ok { - continue - } + currIf, _ := options["if"].(string) + if strings.Contains(strings.ToLower(currIf), "contains") { + // if it has a contains statement, we assume it is checking for duplicates already + return nil + } + field, ok := options["field"].(string) + if !ok { + return nil + } + val, ok := options["value"].(string) + if !ok { + return nil + } - field = strings.ReplaceAll(field, ".", "?.") + field = strings.ReplaceAll(field, ".", "?.") - val = strings.TrimLeft(val, "{ ") - val = strings.TrimRight(val, "} ") - val = strings.ReplaceAll(val, ".", "?.") + val = strings.TrimLeft(val, "{ ") + val = strings.TrimRight(val, "} ") + val = strings.ReplaceAll(val, ".", "?.") - if currIf == "" { - // if there is not a previous if we add a value sanity check - currIf = fmt.Sprintf("ctx?.%s != null", val) - } + if currIf == "" { + // if there is not a previous if we add a value sanity check + currIf = fmt.Sprintf("ctx?.%s != null", val) + } - newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val) + newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val) - logp.Debug("modules", "In pipeline %q adding if %s to replace 'allow_duplicates: false' in append processor", pipelineID, newIf) - options["if"] = newIf - } + logp.Debug("modules", "In pipeline %q adding if %s to replace 'allow_duplicates: false' in append processor", pipelineID, newIf) + options["if"] = newIf } return nil } diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 7c617034f107..86074f897edc 100644 --- 
a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -193,6 +193,7 @@ func TestSetEcsProcessors(t *testing.T) { map[string]interface{}{ "user_agent": map[string]interface{}{ "field": "foo.http_user_agent", + "ecs": true, }, }, }, @@ -205,7 +206,7 @@ func TestSetEcsProcessors(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { t.Parallel() - err := setECSProcessors(*test.esVersion, "foo-pipeline", test.content) + err := setProcessors(*test.esVersion, "foo-pipeline", test.content) if test.isErrExpected { assert.Error(t, err) } else { @@ -382,7 +383,7 @@ func TestModifySetProcessor(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { t.Parallel() - err := modifySetProcessor(*test.esVersion, "foo-pipeline", test.content) + err := setProcessors(*test.esVersion, "foo-pipeline", test.content) if test.isErrExpected { assert.Error(t, err) } else { @@ -584,7 +585,7 @@ func TestModifyAppendProcessor(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { t.Parallel() - err := modifyAppendProcessor(*test.esVersion, "foo-pipeline", test.content) + err := setProcessors(*test.esVersion, "foo-pipeline", test.content) if test.isErrExpected { assert.Error(t, err) } else { From 04a2b072fa32654b73e5c567d6d9fab3f8725735 Mon Sep 17 00:00:00 2001 From: Marius Iversen Date: Tue, 9 Feb 2021 11:52:54 +0100 Subject: [PATCH 06/10] added testcases for uri parts --- filebeat/fileset/pipelines.go | 23 ++++-- filebeat/fileset/pipelines_test.go | 125 +++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 7 deletions(-) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 3c5e36072eaa..5acaf6dc6d7c 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -149,12 +149,9 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return nil } -// setECSProcessors sets required ECS options in processors when filebeat version is >= 7.0.0 -// and ES 
is 6.7.X to ease migration to ECS. -func setECSProcessors(pipelineID string, processor map[string]interface{}) error { - return errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required") -} - +// setProcessors iterates over all configured processors and performs the +// function related to it. If no function is set, it will delete the processor if +// the version of ES is under the required version number. func setProcessors(esVersion common.Version, pipelineID string, content map[string]interface{}) error { p, ok := content["processors"] if !ok { @@ -191,7 +188,9 @@ func setProcessors(esVersion common.Version, pipelineID string, content map[stri }, } var newProcessors []interface{} + var appendProcessor bool for i, p := range processors { + appendProcessor = true processor, ok := p.(map[string]interface{}) if !ok { continue @@ -214,16 +213,26 @@ func setProcessors(esVersion common.Version, pipelineID string, content map[stri if err := proc.fn(pipelineID, processor); err != nil { return err } + } else { + appendProcessor = false } } } - newProcessors = append(newProcessors, processors[i]) + if appendProcessor { + newProcessors = append(newProcessors, processors[i]) + } } content["processors"] = newProcessors return nil } +// setECSProcessors sets required ECS options in processors when filebeat version is >= 7.0.0 +// and ES is 6.7.X to ease migration to ECS. 
+func setECSProcessors(pipelineID string, processor map[string]interface{}) error { + return errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required") +} + func deletePipeline(esClient PipelineLoader, pipelineID string) error { path := makeIngestPipelinePath(pipelineID) _, _, err := esClient.Request("DELETE", path, "", nil, nil) diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 86074f897edc..5fcaafcb94f1 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -595,3 +595,128 @@ func TestModifyAppendProcessor(t *testing.T) { }) } } + +func TestRemoveURIPartsProcessor(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.12.0", + esVersion: common.MustNewVersion("7.11.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.12.0", + esVersion: common.MustNewVersion("7.12.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + 
"field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES > 7.12.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := setProcessors(*test.esVersion, "foo-pipeline", test.content) + if test.isErrExpected { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} From e26a9d13aefe1a5601567346124a90a4a73388a6 Mon Sep 17 00:00:00 2001 From: Marius Iversen Date: Thu, 11 Feb 2021 17:34:38 +0100 Subject: [PATCH 07/10] stashing changes --- filebeat/fileset/pipelines.go | 208 +++++++++++++++++----------------- 1 file changed, 107 insertions(+), 101 deletions(-) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 5acaf6dc6d7c..961b37c3b7e7 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -23,6 +23,7 @@ import ( "fmt" "strings" + "github.com/davecgh/go-spew/spew" "github.com/joeshaw/multierror" "github.com/elastic/beats/v7/libbeat/common" @@ -49,17 +50,34 @@ type MultiplePipelineUnsupportedError struct { 
minESVersionRequired common.Version } -// Processor defines a single processors minimum version requirements. -type Processor struct { - minVersion *common.Version - name string - fn func(pipelineID string, processor map[string]interface{}) error +// processorCompatibility defines a single processors minimum version requirements. +type processorCompatibility struct { + minVersion *common.Version + name string + makeConfigCompatible func(log *logp.Logger, processor map[string]interface{}) error } -// Processors is a slice of single processor's, used to check the minimum version -// requirement per processor defined. -type Processors struct { - Processors []Processor +var processorCompatibilityChecks = []processorCompatibility{ + { + name: "uri_parts", + minVersion: common.MustNewVersion("7.12.0"), + makeConfigCompatible: nil, + }, + { + name: "set", + minVersion: common.MustNewVersion("7.9.0"), + makeConfigCompatible: modifySetProcessor, + }, + { + name: "append", + minVersion: common.MustNewVersion("7.10.0"), + makeConfigCompatible: modifyAppendProcessor, + }, + { + name: "user_agent", + minVersion: common.MustNewVersion("6.7.0"), + makeConfigCompatible: setECSProcessors, + }, } func (m MultiplePipelineUnsupportedError) Error() string { @@ -135,7 +153,7 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return nil } } - + spew.Dump(content) err := setProcessors(esClient.GetVersion(), pipelineID, content) if err != nil { return fmt.Errorf("Failed to adapt pipeline with backwards compatibility changes: %w", err) @@ -153,40 +171,19 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string // function related to it. If no function is set, it will delete the processor if // the version of ES is under the required version number. 
func setProcessors(esVersion common.Version, pipelineID string, content map[string]interface{}) error { + log := logp.NewLogger("fileset").With("pipeline", pipelineID) p, ok := content["processors"] if !ok { return nil } + processors, ok := p.([]interface{}) if !ok { return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) } // A list of all processor names and versions to be checked. - versionChecks := Processors{ - Processors: []Processor{ - { - name: "uri_parts", - minVersion: common.MustNewVersion("7.12.0"), - fn: nil, - }, - { - name: "set", - minVersion: common.MustNewVersion("7.9.0"), - fn: modifySetProcessor, - }, - { - name: "append", - minVersion: common.MustNewVersion("7.10.0"), - fn: modifyAppendProcessor, - }, - { - name: "user_agent", - minVersion: common.MustNewVersion("6.7.0"), - fn: setECSProcessors, - }, - }, - } + var newProcessors []interface{} var appendProcessor bool for i, p := range processors { @@ -195,7 +192,7 @@ func setProcessors(esVersion common.Version, pipelineID string, content map[stri if !ok { continue } - for _, proc := range versionChecks.Processors { + for _, proc := range processorCompatibilityChecks { _, found := processor[proc.name] if !found { continue @@ -209,8 +206,9 @@ func setProcessors(esVersion common.Version, pipelineID string, content map[stri } continue } - if proc.fn != nil { - if err := proc.fn(pipelineID, processor); err != nil { + + if proc.makeConfigCompatible != nil { + if err := proc.makeConfigCompatible(log.With("processor_type", proc.name, "processor_index", i), processor); err != nil { return err } } else { @@ -229,7 +227,7 @@ func setProcessors(esVersion common.Version, pipelineID string, content map[stri // setECSProcessors sets required ECS options in processors when filebeat version is >= 7.0.0 // and ES is 6.7.X to ease migration to ECS. 
-func setECSProcessors(pipelineID string, processor map[string]interface{}) error { +func setECSProcessors(log *logp.Logger, processor map[string]interface{}) error { return errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required") } @@ -299,84 +297,92 @@ func interpretError(initialErr error, body []byte) error { // modifySetProcessor replaces ignore_empty_value option with an if statement // so ES less than 7.9 will still work -func modifySetProcessor(pipelineID string, processor map[string]interface{}) error { - if options, ok := processor["set"].(map[string]interface{}); ok { - _, ok := options["ignore_empty_value"].(bool) - if !ok { - // don't have ignore_empty_value nothing to do - return nil - } +func modifySetProcessor(log *logp.Logger, processor map[string]interface{}) error { + options, ok := processor["set"].(map[string]interface{}) - logp.Debug("modules", "In pipeline %q removing unsupported 'ignore_empty_value' in set processor", pipelineID) - delete(options, "ignore_empty_value") - - _, ok = options["if"].(string) - if ok { - // assume if check is sufficient - return nil - } - val, ok := options["value"].(string) - if !ok { - return nil - } + if !ok { + return nil + } + _, ok = options["ignore_empty_value"].(bool) + if !ok { + // don't have ignore_empty_value nothing to do + return nil + } - newIf := strings.TrimLeft(val, "{ ") - newIf = strings.TrimRight(newIf, "} ") - newIf = strings.ReplaceAll(newIf, ".", "?.") - newIf = "ctx?." 
+ newIf + " != null" + log.Debug("Removing unsupported 'ignore_empty_value' in set processor") + delete(options, "ignore_empty_value") - logp.Debug("modules", "In pipeline %q adding if %s to replace 'ignore_empty_value' in set processor", pipelineID, newIf) - options["if"] = newIf + _, ok = options["if"].(string) + if ok { + // assume if check is sufficient + return nil + } + val, ok := options["value"].(string) + if !ok { + return nil } + + newIf := strings.TrimLeft(val, "{ ") + newIf = strings.TrimRight(newIf, "} ") + newIf = strings.ReplaceAll(newIf, ".", "?.") + newIf = "ctx?." + newIf + " != null" + + log.Debug("adding if %s to replace 'ignore_empty_value' in set processor", newIf) + options["if"] = newIf + return nil } // modifyAppendProcessor replaces allow_duplicates option with an if statement // so ES less than 7.10 will still work -func modifyAppendProcessor(pipelineID string, processor map[string]interface{}) error { - if options, ok := processor["append"].(map[string]interface{}); ok { - allow, ok := options["allow_duplicates"].(bool) - if !ok { - // don't have allow_duplicates, nothing to do - return nil - } - - logp.Debug("modules", "In pipeline %q removing unsupported 'allow_duplicates' in append processor", pipelineID) - delete(options, "allow_duplicates") - if allow { - // it was set to true, nothing else to do after removing the option - return nil - } +func modifyAppendProcessor(log *logp.Logger, processor map[string]interface{}) error { + options, ok := processor["append"].(map[string]interface{}) + if !ok { + return nil + } + allow, ok := options["allow_duplicates"].(bool) - currIf, _ := options["if"].(string) - if strings.Contains(strings.ToLower(currIf), "contains") { - // if it has a contains statement, we assume it is checking for duplicates already - return nil - } - field, ok := options["field"].(string) - if !ok { - return nil - } - val, ok := options["value"].(string) - if !ok { - return nil - } + if !ok { + // don't have 
allow_duplicates, nothing to do + return nil + } - field = strings.ReplaceAll(field, ".", "?.") + log.Debug("removing unsupported 'allow_duplicates' in append processor") + delete(options, "allow_duplicates") + if allow { + // it was set to true, nothing else to do after removing the option + return nil + } - val = strings.TrimLeft(val, "{ ") - val = strings.TrimRight(val, "} ") - val = strings.ReplaceAll(val, ".", "?.") + currIf, _ := options["if"].(string) + if strings.Contains(strings.ToLower(currIf), "contains") { + // if it has a contains statement, we assume it is checking for duplicates already + return nil + } + field, ok := options["field"].(string) + if !ok { + return nil + } + val, ok := options["value"].(string) + if !ok { + return nil + } - if currIf == "" { - // if there is not a previous if we add a value sanity check - currIf = fmt.Sprintf("ctx?.%s != null", val) - } + field = strings.ReplaceAll(field, ".", "?.") - newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val) + val = strings.TrimLeft(val, "{ ") + val = strings.TrimRight(val, "} ") + val = strings.ReplaceAll(val, ".", "?.") - logp.Debug("modules", "In pipeline %q adding if %s to replace 'allow_duplicates: false' in append processor", pipelineID, newIf) - options["if"] = newIf + if currIf == "" { + // if there is not a previous if we add a value sanity check + currIf = fmt.Sprintf("ctx?.%s != null", val) } + + newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val) + + log.Debug("adding if %s to replace 'allow_duplicates: false' in append processor", newIf) + options["if"] = newIf + return nil } From 9a89fa985314cded344af90d1bb7c61b630913f4 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Sat, 13 Feb 2021 10:50:53 -0500 Subject: [PATCH 08/10] Fix assert.NoError usages These should have been require.NoError 
otherwise panics occur. --- filebeat/fileset/config_test.go | 3 +- filebeat/fileset/fileset_test.go | 55 ++++++++------- filebeat/fileset/modules_integration_test.go | 25 +++---- filebeat/fileset/modules_test.go | 72 ++++++++++---------- filebeat/fileset/pipelines.go | 3 +- filebeat/fileset/pipelines_test.go | 17 ++--- 6 files changed, 89 insertions(+), 86 deletions(-) diff --git a/filebeat/fileset/config_test.go b/filebeat/fileset/config_test.go index 972d7eff6319..ecefac2e181c 100644 --- a/filebeat/fileset/config_test.go +++ b/filebeat/fileset/config_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common" ) @@ -34,7 +35,7 @@ func TestInputSettings(t *testing.T) { } c, err := common.NewConfigFrom(cfg) - assert.NoError(t, err) + require.NoError(t, err) f, err := NewFilesetConfig(c) if assert.NoError(t, err) { diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 4a8087af2b48..430d71e0db7a 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -26,9 +26,8 @@ import ( "testing" "text/template" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -43,9 +42,9 @@ func makeTestInfo(version string) beat.Info { func getModuleForTesting(t *testing.T, module, fileset string) *Fileset { modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) fs, err := New(modulesPath, fileset, &ModuleConfig{Module: module}, &FilesetConfig{}) - assert.NoError(t, err) + require.NoError(t, err) return fs } @@ -54,7 +53,7 @@ func TestLoadManifestNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") manifest, err := fs.readManifest() - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, 
manifest.ModuleVersion, "1.0") assert.Equal(t, manifest.IngestPipeline, []string{"ingest/pipeline.yml"}) assert.Equal(t, manifest.Input, "config/nginx-access.yml") @@ -69,7 +68,7 @@ func TestGetBuiltinVars(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") vars, err := fs.getBuiltinVars(makeTestInfo("6.6.0")) - assert.NoError(t, err) + require.NoError(t, err) assert.IsType(t, vars["hostname"], "a-mac-with-esc-key") assert.IsType(t, vars["domain"], "local") @@ -83,10 +82,10 @@ func TestEvaluateVarsNginx(t *testing.T) { var err error fs.manifest, err = fs.readManifest() - assert.NoError(t, err) + require.NoError(t, err) vars, err := fs.evaluateVars(makeTestInfo("6.6.0")) - assert.NoError(t, err) + require.NoError(t, err) builtin := vars["builtin"].(map[string]interface{}) assert.IsType(t, "a-mac-with-esc-key", builtin["hostname"]) @@ -97,19 +96,19 @@ func TestEvaluateVarsNginx(t *testing.T) { func TestEvaluateVarsNginxOverride(t *testing.T) { modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{ Var: map[string]interface{}{ "pipeline": "no_plugins", }, }) - assert.NoError(t, err) + require.NoError(t, err) fs.manifest, err = fs.readManifest() - assert.NoError(t, err) + require.NoError(t, err) vars, err := fs.evaluateVars(makeTestInfo("6.6.0")) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "no_plugins", vars["pipeline"]) } @@ -119,10 +118,10 @@ func TestEvaluateVarsMySQL(t *testing.T) { var err error fs.manifest, err = fs.readManifest() - assert.NoError(t, err) + require.NoError(t, err) vars, err := fs.evaluateVars(makeTestInfo("6.6.0")) - assert.NoError(t, err) + require.NoError(t, err) builtin := vars["builtin"].(map[string]interface{}) assert.IsType(t, "a-mac-with-esc-key", builtin["hostname"]) @@ -172,29 +171,29 @@ func TestResolveVariable(t *testing.T) { for _, test := range tests { result, err := 
resolveVariable(test.Vars, test.Value) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, test.Expected, result) } } func TestGetInputConfigNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") - assert.NoError(t, fs.Read(makeTestInfo("5.2.0"))) + require.NoError(t, fs.Read(makeTestInfo("5.2.0"))) cfg, err := fs.getInputConfig() - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, cfg.HasField("paths")) assert.True(t, cfg.HasField("exclude_files")) assert.True(t, cfg.HasField("pipeline")) pipelineID, err := cfg.String("pipeline", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "filebeat-5.2.0-nginx-access-pipeline", pipelineID) } func TestGetInputConfigNginxOverrides(t *testing.T) { modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) tests := map[string]struct { input map[string]interface{} @@ -216,7 +215,7 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { require.True(t, v) pipelineID, err := c.String("pipeline", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "filebeat-5.2.0-nginx-access-pipeline", pipelineID) }, }, @@ -242,12 +241,12 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{ Input: test.input, }) - assert.NoError(t, err) + require.NoError(t, err) - assert.NoError(t, fs.Read(makeTestInfo("5.2.0"))) + require.NoError(t, fs.Read(makeTestInfo("5.2.0"))) cfg, err := fs.getInputConfig() - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, cfg.HasField("paths")) assert.True(t, cfg.HasField("exclude_files")) @@ -256,11 +255,11 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { test.expectedFn(t, cfg) moduleName, err := cfg.String("_module_name", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "nginx", moduleName) filesetName, err := cfg.String("_fileset_name", -1) - assert.NoError(t, err) 
+ require.NoError(t, err) assert.Equal(t, "access", filesetName) }) } @@ -268,11 +267,11 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { func TestGetPipelineNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") - assert.NoError(t, fs.Read(makeTestInfo("5.2.0"))) + require.NoError(t, fs.Read(makeTestInfo("5.2.0"))) version := common.MustNewVersion("5.2.0") pipelines, err := fs.GetPipelines(*version) - assert.NoError(t, err) + require.NoError(t, err) assert.Len(t, pipelines, 1) pipeline := pipelines[0] @@ -286,7 +285,7 @@ func TestGetTemplateFunctions(t *testing.T) { "builtin": map[string]interface{}{}, } templateFunctions, err := getTemplateFunctions(vars) - assert.NoError(t, err) + require.NoError(t, err) assert.IsType(t, template.FuncMap{}, templateFunctions) assert.Contains(t, templateFunctions, "inList") assert.Contains(t, templateFunctions, "tojson") diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index 00ced07f6b8a..2fceb652294e 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" @@ -48,8 +49,8 @@ func TestLoadPipeline(t *testing.T) { content := map[string]interface{}{ "description": "describe pipeline", - "processors": []map[string]interface{}{ - { + "processors": []interface{}{ + map[string]interface{}{ "set": map[string]interface{}{ "field": "foo", "value": "bar", @@ -59,27 +60,27 @@ func TestLoadPipeline(t *testing.T) { } err := loadPipeline(client, "my-pipeline-id", content, false) - assert.NoError(t, err) + require.NoError(t, err) status, _, err := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, 200, status) // loading 
again shouldn't actually update the pipeline content["description"] = "describe pipeline 2" err = loadPipeline(client, "my-pipeline-id", content, false) - assert.NoError(t, err) + require.NoError(t, err) checkUploadedPipeline(t, client, "describe pipeline") // loading again updates the pipeline err = loadPipeline(client, "my-pipeline-id", content, true) - assert.NoError(t, err) + require.NoError(t, err) checkUploadedPipeline(t, client, "describe pipeline 2") } func checkUploadedPipeline(t *testing.T, client *eslegclient.Connection, expectedDescription string) { status, response, err := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, 200, status) var res map[string]interface{} @@ -99,7 +100,7 @@ func TestSetupNginx(t *testing.T) { client.Request("DELETE", "/_ingest/pipeline/filebeat-5.2.0-nginx-error-pipeline", "", nil, nil) modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) configs := []*ModuleConfig{ &ModuleConfig{Module: "nginx"}, @@ -133,7 +134,7 @@ func TestAvailableProcessors(t *testing.T) { } err := checkAvailableProcessors(client, requiredProcessors) - assert.NoError(t, err) + require.NoError(t, err) // these don't exists on our integration test setup requiredProcessors = []ProcessorRequirement{ @@ -172,7 +173,7 @@ func TestLoadMultiplePipelines(t *testing.T) { client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-plain_logs", "", nil, nil) modulesPath, err := filepath.Abs("../_meta/test/module") - assert.NoError(t, err) + require.NoError(t, err) enabled := true disabled := false @@ -217,7 +218,7 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) { client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil) modulesPath, err := filepath.Abs("../_meta/test/module") - assert.NoError(t, err) + require.NoError(t, err) enabled := true disabled := false @@ -226,7 
+227,7 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) { "multibad": &FilesetConfig{Enabled: &enabled}, } configs := []*ModuleConfig{ - &ModuleConfig{"foo", &enabled, filesetConfigs}, + {"foo", &enabled, filesetConfigs}, } reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0")) diff --git a/filebeat/fileset/modules_test.go b/filebeat/fileset/modules_test.go index 371b051084db..11556e85e6c3 100644 --- a/filebeat/fileset/modules_test.go +++ b/filebeat/fileset/modules_test.go @@ -26,6 +26,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -42,17 +43,17 @@ func load(t *testing.T, from interface{}) *common.Config { func TestNewModuleRegistry(t *testing.T) { modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) configs := []*ModuleConfig{ - &ModuleConfig{Module: "nginx"}, - &ModuleConfig{Module: "mysql"}, - &ModuleConfig{Module: "system"}, - &ModuleConfig{Module: "auditd"}, + {Module: "nginx"}, + {Module: "mysql"}, + {Module: "system"}, + {Module: "auditd"}, } reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}) - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, reg) expectedModules := map[string][]string{ @@ -77,14 +78,14 @@ func TestNewModuleRegistry(t *testing.T) { for module, filesets := range reg.registry { for name, fileset := range filesets { cfg, err := fileset.getInputConfig() - assert.NoError(t, err, fmt.Sprintf("module: %s, fileset: %s", module, name)) + require.NoError(t, err, fmt.Sprintf("module: %s, fileset: %s", module, name)) moduleName, err := cfg.String("_module_name", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, module, moduleName) filesetName, err := cfg.String("_fileset_name", -1) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, name, 
filesetName) } } @@ -92,12 +93,12 @@ func TestNewModuleRegistry(t *testing.T) { func TestNewModuleRegistryConfig(t *testing.T) { modulesPath, err := filepath.Abs("../module") - assert.NoError(t, err) + require.NoError(t, err) falseVar := false configs := []*ModuleConfig{ - &ModuleConfig{ + { Module: "nginx", Filesets: map[string]*FilesetConfig{ "access": { @@ -110,29 +111,30 @@ func TestNewModuleRegistryConfig(t *testing.T) { }, }, }, - &ModuleConfig{ + { Module: "mysql", Enabled: &falseVar, }, } reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}) - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, reg) nginxAccess := reg.registry["nginx"]["access"] - assert.NotNil(t, nginxAccess) - assert.Equal(t, []interface{}{"/hello/test"}, nginxAccess.vars["paths"]) + if assert.NotNil(t, nginxAccess) { + assert.Equal(t, []interface{}{"/hello/test"}, nginxAccess.vars["paths"]) + } assert.NotContains(t, reg.registry["nginx"], "error") } func TestMovedModule(t *testing.T) { modulesPath, err := filepath.Abs("./test/moved_module") - assert.NoError(t, err) + require.NoError(t, err) configs := []*ModuleConfig{ - &ModuleConfig{ + { Module: "old", Filesets: map[string]*FilesetConfig{ "test": {}, @@ -141,7 +143,7 @@ func TestMovedModule(t *testing.T) { } reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}) - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, reg) } @@ -232,7 +234,7 @@ func TestApplyOverrides(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { result, err := applyOverrides(&test.fcfg, test.module, test.fileset, test.overrides) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, &test.expected, result) }) } @@ -251,15 +253,15 @@ func TestAppendWithoutDuplicates(t *testing.T) { configs: []*ModuleConfig{}, modules: []string{"moduleA", "moduleB", "moduleC"}, expected: []*ModuleConfig{ - &ModuleConfig{Module: "moduleA"}, - 
&ModuleConfig{Module: "moduleB"}, - &ModuleConfig{Module: "moduleC"}, + {Module: "moduleA"}, + {Module: "moduleB"}, + {Module: "moduleC"}, }, }, { name: "eliminate a duplicate, no override", configs: []*ModuleConfig{ - &ModuleConfig{ + { Module: "moduleB", Filesets: map[string]*FilesetConfig{ "fileset": { @@ -272,7 +274,7 @@ func TestAppendWithoutDuplicates(t *testing.T) { }, modules: []string{"moduleA", "moduleB", "moduleC"}, expected: []*ModuleConfig{ - &ModuleConfig{ + { Module: "moduleB", Filesets: map[string]*FilesetConfig{ "fileset": { @@ -282,14 +284,14 @@ func TestAppendWithoutDuplicates(t *testing.T) { }, }, }, - &ModuleConfig{Module: "moduleA"}, - &ModuleConfig{Module: "moduleC"}, + {Module: "moduleA"}, + {Module: "moduleC"}, }, }, { name: "disabled config", configs: []*ModuleConfig{ - &ModuleConfig{ + { Module: "moduleB", Enabled: &falseVar, Filesets: map[string]*FilesetConfig{ @@ -303,7 +305,7 @@ func TestAppendWithoutDuplicates(t *testing.T) { }, modules: []string{"moduleA", "moduleB", "moduleC"}, expected: []*ModuleConfig{ - &ModuleConfig{ + { Module: "moduleB", Enabled: &falseVar, Filesets: map[string]*FilesetConfig{ @@ -314,9 +316,9 @@ func TestAppendWithoutDuplicates(t *testing.T) { }, }, }, - &ModuleConfig{Module: "moduleA"}, - &ModuleConfig{Module: "moduleB"}, - &ModuleConfig{Module: "moduleC"}, + {Module: "moduleA"}, + {Module: "moduleB"}, + {Module: "moduleC"}, }, }, } @@ -324,7 +326,7 @@ func TestAppendWithoutDuplicates(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { result, err := appendWithoutDuplicates(test.configs, test.modules) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, test.expected, result) }) } @@ -377,7 +379,7 @@ func TestMcfgFromConfig(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { result, err := mcfgFromConfig(test.config) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, test.expected.Module, result.Module) 
assert.Equal(t, len(test.expected.Filesets), len(result.Filesets)) for name, fileset := range test.expected.Filesets { @@ -397,12 +399,12 @@ func TestMissingModuleFolder(t *testing.T) { } reg, err := NewModuleRegistry(configs, beat.Info{Version: "5.2.0"}, true) - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, reg) // this should return an empty list, but no error inputs, err := reg.GetInputConfigs() - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, 0, len(inputs)) } diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 961b37c3b7e7..16c1395218ec 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -23,7 +23,6 @@ import ( "fmt" "strings" - "github.com/davecgh/go-spew/spew" "github.com/joeshaw/multierror" "github.com/elastic/beats/v7/libbeat/common" @@ -153,7 +152,7 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return nil } } - spew.Dump(content) + err := setProcessors(esClient.GetVersion(), pipelineID, content) if err != nil { return fmt.Errorf("Failed to adapt pipeline with backwards compatibility changes: %w", err) diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 5fcaafcb94f1..de61e80156d8 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -25,10 +25,11 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" - - "github.com/stretchr/testify/assert" ) func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { @@ -92,10 +93,10 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { URL: testESServer.URL, Timeout: 90 * time.Second, }) - assert.NoError(t, err) + require.NoError(t, err) err = testESClient.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = 
testRegistry.LoadPipelines(testESClient, false) if test.isErrExpected { @@ -210,7 +211,7 @@ func TestSetEcsProcessors(t *testing.T) { if test.isErrExpected { assert.Error(t, err) } else { - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, test.expected, test.content) } }) @@ -387,7 +388,7 @@ func TestModifySetProcessor(t *testing.T) { if test.isErrExpected { assert.Error(t, err) } else { - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, test.expected, test.content, test.name) } }) @@ -589,7 +590,7 @@ func TestModifyAppendProcessor(t *testing.T) { if test.isErrExpected { assert.Error(t, err) } else { - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, test.expected, test.content, test.name) } }) @@ -714,7 +715,7 @@ func TestRemoveURIPartsProcessor(t *testing.T) { if test.isErrExpected { assert.Error(t, err) } else { - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, test.expected, test.content, test.name) } }) From 56b6a09bb91b1593594cd9bdf9b8f87178fe015f Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Sat, 13 Feb 2021 16:21:09 -0500 Subject: [PATCH 09/10] Package cleanup Errors should not begin with capitals. Avoid double logging errors (both logging and returning them). Fix usage of deprecated logp functions. Remove unused code (ML module). Removed unneeded import alias. Avoid allocating empty slices. Add missing error check for config Unpack. 
--- filebeat/fileset/compatibility.go | 224 +++++++ filebeat/fileset/compatibility_test.go | 643 +++++++++++++++++++ filebeat/fileset/factory.go | 18 +- filebeat/fileset/modules.go | 63 +- filebeat/fileset/modules_integration_test.go | 18 +- filebeat/fileset/modules_test.go | 6 +- filebeat/fileset/pipelines.go | 214 +----- filebeat/fileset/pipelines_test.go | 617 +----------------- filebeat/fileset/setup.go | 9 +- 9 files changed, 940 insertions(+), 872 deletions(-) create mode 100644 filebeat/fileset/compatibility.go create mode 100644 filebeat/fileset/compatibility_test.go diff --git a/filebeat/fileset/compatibility.go b/filebeat/fileset/compatibility.go new file mode 100644 index 000000000000..210c93e1b4c4 --- /dev/null +++ b/filebeat/fileset/compatibility.go @@ -0,0 +1,224 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fileset + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +// processorCompatibility defines a processor's minimum version requirements or +// a transformation to make it compatible. 
+type processorCompatibility struct { + checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies. + procType string // Elasticsearch Ingest Node processor type. + adaptConfig func(processor map[string]interface{}, log *logp.Logger) (drop bool, err error) // Adapt the configuration to make it compatible. +} + +var processorCompatibilityChecks = []processorCompatibility{ + { + procType: "append", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.10.0")) + }, + adaptConfig: replaceAppendAllowDuplicates, + }, + { + procType: "community_id", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.12.0")) + }, + adaptConfig: deleteProcessor, + }, + { + procType: "set", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.9.0")) + }, + adaptConfig: replaceSetIgnoreEmptyValue, + }, + { + procType: "uri_parts", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.12.0")) + }, + adaptConfig: deleteProcessor, + }, + { + procType: "user_agent", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("7.0.0")) && + !esVersion.LessThan(common.MustNewVersion("6.7.0")) + }, + adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) { + config["ecs"] = true + return false, nil + }, + }, + { + procType: "user_agent", + checkVersion: func(esVersion *common.Version) bool { + return esVersion.LessThan(common.MustNewVersion("6.7.0")) + }, + adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) { + return false, errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required") + }, + }, +} + +// adaptPipelineForCompatibility iterates over all processors in the pipeline +// and adapts them for version 
of Elasticsearch used. Adapt can mean modifying +// processor options or removing the processor. +func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) error { + p, ok := content["processors"] + if !ok { + return errors.New("'processors' is missing from the pipeline definition") + } + + processors, ok := p.([]interface{}) + if !ok { + return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + } + + var filteredProcs []interface{} + +nextProcessor: + for i, obj := range processors { + processor, ok := obj.(map[string]interface{}) + if !ok { + return fmt.Errorf("processor at index %d is not an object, got %T", i, obj) + } + + for _, proc := range processorCompatibilityChecks { + configIfc, found := processor[proc.procType] + if !found { + continue + } + config, ok := configIfc.(map[string]interface{}) + if !ok { + return fmt.Errorf("processor config at index %d is not an object, got %T", i, obj) + } + + if !proc.checkVersion(&esVersion) { + continue + } + + drop, err := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i)) + if err != nil { + return fmt.Errorf("failed to adapt %q processor at index %d: %w", proc.procType, i, err) + } + if drop { + continue nextProcessor + } + } + + filteredProcs = append(filteredProcs, processors[i]) + } + + content["processors"] = filteredProcs + return nil +} + +// deleteProcessor returns true to indicate that the processor should be deleted +// in order to adapt the pipeline for backwards compatibility to Elasticsearch. +func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) (bool, error) { return true, nil } + +// replaceSetIgnoreEmptyValue replaces ignore_empty_value option with an if +// statement so ES less than 7.9 will work. 
+func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) (bool, error) {
+	_, ok := config["ignore_empty_value"].(bool)
+	if !ok {
+		return false, nil
+	}
+
+	log.Debug("Removing unsupported 'ignore_empty_value' from set processor.")
+	delete(config, "ignore_empty_value")
+
+	_, ok = config["if"].(string)
+	if ok {
+		// Assume the existing if check is sufficient.
+		return false, nil
+	}
+	val, ok := config["value"].(string)
+	if !ok {
+		return false, nil
+	}
+
+	newIf := strings.TrimLeft(val, "{ ")
+	newIf = strings.TrimRight(newIf, "} ")
+	newIf = strings.ReplaceAll(newIf, ".", "?.")
+	newIf = "ctx?." + newIf + " != null"
+
+	log.Debugf("Adding if %s to replace 'ignore_empty_value' in set processor.", newIf)
+	config["if"] = newIf
+	return false, nil
+}
+
+// replaceAppendAllowDuplicates replaces allow_duplicates option with an if statement
+// so ES less than 7.10 will work.
+func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) (bool, error) {
+	allow, ok := config["allow_duplicates"].(bool)
+	if !ok {
+		return false, nil
+	}
+
+	log.Debug("Removing unsupported 'allow_duplicates' from append processor.")
+	delete(config, "allow_duplicates")
+
+	if allow {
+		// It was set to true, nothing else to do after removing the option.
+		return false, nil
+	}
+
+	currIf, _ := config["if"].(string)
+	if strings.Contains(strings.ToLower(currIf), "contains") {
+		// If it has a contains statement, we assume it is checking for duplicates already.
+		return false, nil
+	}
+	field, ok := config["field"].(string)
+	if !ok {
+		return false, nil
+	}
+	val, ok := config["value"].(string)
+	if !ok {
+		return false, nil
+	}
+
+	field = strings.ReplaceAll(field, ".", "?.")
+
+	val = strings.TrimLeft(val, "{ ")
+	val = strings.TrimRight(val, "} ")
+	val = strings.ReplaceAll(val, ".", "?.")
+
+	if currIf == "" {
+		// If there is no previous if, add a sanity check on the value.
+		currIf = fmt.Sprintf("ctx?.%s != null", val)
+	}
+
+	newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val)
+
+	log.Debugf("Adding if %s to replace 'allow_duplicates: false' in append processor.", newIf)
+	config["if"] = newIf
+
+	return false, nil
+}
diff --git a/filebeat/fileset/compatibility_test.go b/filebeat/fileset/compatibility_test.go
new file mode 100644
index 000000000000..bc089879082b
--- /dev/null
+++ b/filebeat/fileset/compatibility_test.go
@@ -0,0 +1,643 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +// +build !integration + +package fileset + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestAdaptPipelineForCompatibility(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 6.7.0", + esVersion: common.MustNewVersion("6.6.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }}, + isErrExpected: true, + }, + { + name: "ES == 6.7.0", + esVersion: common.MustNewVersion("6.7.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + "ecs": true, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES >= 7.0.0", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": 
map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }, + }, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content) + } + }) + } +} + +func TestReplaceSetIgnoreEmptyValue(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.9.0", + esVersion: common.MustNewVersion("7.8.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.9.0", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES > 7.9.0", + 
esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "existing if", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ignore_empty_value is false", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": false, + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "no value", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": 
map[string]interface{}{ + "field": "rule.name", + "ignore_empty_value": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} + +func TestReplaceAppendAllowDuplicates(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.10.0: set to true", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: set to false", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && 
((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.10.0", + esVersion: common.MustNewVersion("7.10.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES > 7.10.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: existing if", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + "if": "ctx?.host?.hostname != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && 
!ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: existing if with contains", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + "if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES < 7.10.0: no value", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "allow_duplicates": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} + +func TestRemoveURIPartsProcessor(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.12.0", + esVersion: common.MustNewVersion("7.11.0"), + content: 
map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.12.0", + esVersion: common.MustNewVersion("7.12.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES > 7.12.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "uri_parts": map[string]interface{}{ + "field": "test.url", + "target_field": "url", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "test.field", + "value": "testvalue", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, 
test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index b0656205321e..43aa8e1636ed 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -57,6 +57,7 @@ type inputsRunner struct { pipelineLoaderFactory PipelineLoaderFactory pipelineCallbackID uuid.UUID overwritePipelines bool + log *logp.Logger } // NewFactory instantiates a new Factory @@ -84,7 +85,9 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Ru // Hash module ID var h map[string]interface{} - c.Unpack(&h) + if err = c.Unpack(&h); err != nil { + return nil, fmt.Errorf("failed to unpack config: %w", err) + } id, err := hashstructure.Hash(h, nil) if err != nil { return nil, err @@ -94,8 +97,7 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Ru for i, pConfig := range pConfigs { inputs[i], err = f.inputFactory.Create(p, pConfig) if err != nil { - logp.Err("Error creating input: %s", err) - return nil, err + return nil, fmt.Errorf("failed to create input: %w", err) } } @@ -106,6 +108,7 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Ru pipelineLoaderFactory: f.pipelineLoaderFactory, pipelineCallbackID: f.pipelineCallbackID, overwritePipelines: f.overwritePipelines, + log: logp.NewLogger(logName), }, nil } @@ -118,8 +121,7 @@ func (f *Factory) CheckConfig(c *common.Config) error { for _, pConfig := range pConfigs { err = f.inputFactory.CheckConfig(pConfig) if err != nil { - logp.Err("Error checking input configuration: %s", err) - return err + return fmt.Errorf("error checking input configuration: %w", err) } } @@ 
-153,12 +155,12 @@ func (p *inputsRunner) Start() { // makes it possible to try to load pipeline when ES becomes reachable. pipelineLoader, err := p.pipelineLoaderFactory() if err != nil { - logp.Err("Error loading pipeline: %s", err) + p.log.Errorf("Error loading pipeline: %s", err) } else { err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.overwritePipelines) if err != nil { // Log error and continue - logp.Err("Error loading pipeline: %s", err) + p.log.Errorf("Error loading pipeline: %s", err) } } @@ -168,7 +170,7 @@ func (p *inputsRunner) Start() { } p.pipelineCallbackID, err = elasticsearch.RegisterConnectCallback(callback) if err != nil { - logp.Err("Error registering connect callback for Elasticsearch to load pipelines: %v", err) + p.log.Errorf("Error registering connect callback for Elasticsearch to load pipelines: %v", err) } } diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index aa0260031cea..3df41999f8fb 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -26,7 +26,7 @@ import ( "strings" "github.com/pkg/errors" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -34,13 +34,11 @@ import ( "github.com/elastic/beats/v7/libbeat/paths" ) -var availableMLModules = map[string]string{ - "apache": "access", - "nginx": "access", -} +const logName = "modules" type ModuleRegistry struct { registry map[string]map[string]*Fileset // module -> fileset -> Fileset + log *logp.Logger } // newModuleRegistry reads and loads the configured module into the registry. 
@@ -49,25 +47,26 @@ func newModuleRegistry(modulesPath string, overrides *ModuleOverrides, beatInfo beat.Info, ) (*ModuleRegistry, error) { - - var reg ModuleRegistry - reg.registry = map[string]map[string]*Fileset{} + reg := ModuleRegistry{ + registry: map[string]map[string]*Fileset{}, + log: logp.NewLogger(logName), + } for _, mcfg := range moduleConfigs { - if mcfg.Enabled != nil && (*mcfg.Enabled) == false { + if mcfg.Enabled != nil && !(*mcfg.Enabled) { continue } // Look for moved modules if module, moved := getCurrentModuleName(modulesPath, mcfg.Module); moved { - logp.Warn("Using old name '%s' for module '%s', please update your configuration", mcfg.Module, module) + reg.log.Warnf("Configuration uses the old name %q for module %q, please update your configuration.", mcfg.Module, module) mcfg.Module = module } reg.registry[mcfg.Module] = map[string]*Fileset{} moduleFilesets, err := getModuleFilesets(modulesPath, mcfg.Module) if err != nil { - return nil, fmt.Errorf("Error getting filesets for module %s: %v", mcfg.Module, err) + return nil, fmt.Errorf("error getting filesets for module %s: %v", mcfg.Module, err) } for _, filesetName := range moduleFilesets { @@ -78,10 +77,10 @@ func newModuleRegistry(modulesPath string, fcfg, err = applyOverrides(fcfg, mcfg.Module, filesetName, overrides) if err != nil { - return nil, fmt.Errorf("Error applying overrides on fileset %s/%s: %v", mcfg.Module, filesetName, err) + return nil, fmt.Errorf("error applying overrides on fileset %s/%s: %v", mcfg.Module, filesetName, err) } - if fcfg.Enabled != nil && (*fcfg.Enabled) == false { + if fcfg.Enabled != nil && !(*fcfg.Enabled) { continue } @@ -89,16 +88,15 @@ func newModuleRegistry(modulesPath string, if err != nil { return nil, err } - err = fileset.Read(beatInfo) - if err != nil { - return nil, fmt.Errorf("Error reading fileset %s/%s: %v", mcfg.Module, filesetName, err) + if err = fileset.Read(beatInfo); err != nil { + return nil, fmt.Errorf("error reading fileset %s/%s: 
%v", mcfg.Module, filesetName, err) } reg.registry[mcfg.Module][filesetName] = fileset } // check that no extra filesets are configured for filesetName, fcfg := range mcfg.Filesets { - if fcfg.Enabled != nil && (*fcfg.Enabled) == false { + if fcfg.Enabled != nil && !(*fcfg.Enabled) { continue } found := false @@ -122,8 +120,9 @@ func NewModuleRegistry(moduleConfigs []*common.Config, beatInfo beat.Info, init stat, err := os.Stat(modulesPath) if err != nil || !stat.IsDir() { - logp.Err("Not loading modules. Module directory not found: %s", modulesPath) - return &ModuleRegistry{}, nil // empty registry, no error + log := logp.NewLogger(logName) + log.Errorf("Not loading modules. Module directory not found: %s", modulesPath) + return &ModuleRegistry{log: log}, nil // empty registry, no error } var modulesCLIList []string @@ -217,7 +216,7 @@ func getModuleFilesets(modulePath, module string) ([]string, error) { return []string{}, err } - filesets := []string{} + var filesets []string for _, fi := range fileInfos { if fi.IsDir() { // check also that the `manifest.yml` file exists @@ -246,7 +245,7 @@ func applyOverrides(fcfg *FilesetConfig, config, err := common.NewConfigFrom(fcfg) if err != nil { - return nil, fmt.Errorf("Error creating vars config object: %v", err) + return nil, fmt.Errorf("error creating vars config object: %v", err) } toMerge := []*common.Config{config} @@ -254,12 +253,12 @@ func applyOverrides(fcfg *FilesetConfig, resultConfig, err := common.MergeConfigs(toMerge...) 
if err != nil { - return nil, fmt.Errorf("Error merging configs: %v", err) + return nil, fmt.Errorf("error merging configs: %v", err) } res, err := NewFilesetConfig(resultConfig) if err != nil { - return nil, fmt.Errorf("Error unpacking configs: %v", err) + return nil, fmt.Errorf("error unpacking configs: %v", err) } return res, nil @@ -275,7 +274,7 @@ func appendWithoutDuplicates(moduleConfigs []*ModuleConfig, modules []string) ([ // built a dictionary with the configured modules modulesMap := map[string]bool{} for _, mcfg := range moduleConfigs { - if mcfg.Enabled != nil && (*mcfg.Enabled) == false { + if mcfg.Enabled != nil && !(*mcfg.Enabled) { continue } modulesMap[mcfg.Module] = true @@ -291,12 +290,12 @@ func appendWithoutDuplicates(moduleConfigs []*ModuleConfig, modules []string) ([ } func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) { - result := []*common.Config{} + var result []*common.Config for module, filesets := range reg.registry { for name, fileset := range filesets { fcfg, err := fileset.getInputConfig() if err != nil { - return result, fmt.Errorf("Error getting config for fileset %s/%s: %v", + return result, fmt.Errorf("error getting config for fileset %s/%s: %v", module, name, err) } result = append(result, fcfg) @@ -340,17 +339,17 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc } status, body, err := esClient.Request("GET", "/_nodes/ingest", "", nil, nil) if err != nil { - return fmt.Errorf("Error querying _nodes/ingest: %v", err) + return fmt.Errorf("error querying _nodes/ingest: %v", err) } if status > 299 { - return fmt.Errorf("Error querying _nodes/ingest. Status: %d. Response body: %s", status, body) + return fmt.Errorf("error querying _nodes/ingest. Status: %d. Response body: %s", status, body) } err = json.Unmarshal(body, &response) if err != nil { - return fmt.Errorf("Error unmarshaling json when querying _nodes/ingest. 
Body: %s", body) + return fmt.Errorf("error unmarshaling json when querying _nodes/ingest. Body: %s", body) } - missing := []ProcessorRequirement{} + var missing []ProcessorRequirement for _, requiredProcessor := range requiredProcessors { for _, node := range response.Nodes { available := false @@ -368,11 +367,11 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc } if len(missing) > 0 { - missingPlugins := []string{} + var missingPlugins []string for _, proc := range missing { missingPlugins = append(missingPlugins, proc.Plugin) } - errorMsg := fmt.Sprintf("This module requires the following Elasticsearch plugins: %s. "+ + errorMsg := fmt.Sprintf("this module requires the following Elasticsearch plugins: %s. "+ "You can install them by running the following commands on all the Elasticsearch nodes:", strings.Join(missingPlugins, ", ")) for _, plugin := range missingPlugins { diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index 2fceb652294e..b4b5c40bb754 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/esleg/eslegtest" + "github.com/elastic/beats/v7/libbeat/logp" ) func makeTestInfo(version string) beat.Info { @@ -59,7 +60,8 @@ func TestLoadPipeline(t *testing.T) { }, } - err := loadPipeline(client, "my-pipeline-id", content, false) + log := logp.NewLogger(logName) + err := loadPipeline(client, "my-pipeline-id", content, false, log) require.NoError(t, err) status, _, err := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil) @@ -68,12 +70,12 @@ func TestLoadPipeline(t *testing.T) { // loading again shouldn't actually update the pipeline content["description"] = "describe pipeline 2" - err = loadPipeline(client, "my-pipeline-id", content, 
false) + err = loadPipeline(client, "my-pipeline-id", content, false, log) require.NoError(t, err) checkUploadedPipeline(t, client, "describe pipeline") // loading again updates the pipeline - err = loadPipeline(client, "my-pipeline-id", content, true) + err = loadPipeline(client, "my-pipeline-id", content, true, log) require.NoError(t, err) checkUploadedPipeline(t, client, "describe pipeline 2") } @@ -103,7 +105,7 @@ func TestSetupNginx(t *testing.T) { require.NoError(t, err) configs := []*ModuleConfig{ - &ModuleConfig{Module: "nginx"}, + {Module: "nginx"}, } reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("5.2.0")) @@ -178,8 +180,8 @@ func TestLoadMultiplePipelines(t *testing.T) { enabled := true disabled := false filesetConfigs := map[string]*FilesetConfig{ - "multi": &FilesetConfig{Enabled: &enabled}, - "multibad": &FilesetConfig{Enabled: &disabled}, + "multi": {Enabled: &enabled}, + "multibad": {Enabled: &disabled}, } configs := []*ModuleConfig{ &ModuleConfig{"foo", &enabled, filesetConfigs}, @@ -223,8 +225,8 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) { enabled := true disabled := false filesetConfigs := map[string]*FilesetConfig{ - "multi": &FilesetConfig{Enabled: &disabled}, - "multibad": &FilesetConfig{Enabled: &enabled}, + "multi": {Enabled: &disabled}, + "multibad": {Enabled: &enabled}, } configs := []*ModuleConfig{ {"foo", &enabled, filesetConfigs}, diff --git a/filebeat/fileset/modules_test.go b/filebeat/fileset/modules_test.go index 11556e85e6c3..f69db27648c5 100644 --- a/filebeat/fileset/modules_test.go +++ b/filebeat/fileset/modules_test.go @@ -417,19 +417,19 @@ func TestInterpretError(t *testing.T) { { Test: "other plugin not installed", Input: `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}}],"type":"parse_exception","reason":"No processor type exists with name 
[hello_test]","header":{"processor_type":"hello_test"}},"status":400}`, - Output: "This module requires an Elasticsearch plugin that provides the hello_test processor. " + + Output: "this module requires an Elasticsearch plugin that provides the hello_test processor. " + "Please visit the Elasticsearch documentation for instructions on how to install this plugin. " + "Response body: " + `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}}],"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}},"status":400}`, }, { Test: "Elasticsearch 2.4", Input: `{"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`, - Output: `The Ingest Node functionality seems to be missing from Elasticsearch. The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`, + Output: `the Ingest Node functionality seems to be missing from Elasticsearch. The Filebeat modules require Elasticsearch >= 5.0. 
This is the response I got from Elasticsearch: {"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`, }, { Test: "Elasticsearch 1.7", Input: `{"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`, - Output: `The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`, + Output: `the Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`, }, { Test: "bad json", diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 16c1395218ec..a3ce0b3de423 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -19,7 +19,6 @@ package fileset import ( "encoding/json" - "errors" "fmt" "strings" @@ -49,36 +48,6 @@ type MultiplePipelineUnsupportedError struct { minESVersionRequired common.Version } -// processorCompatibility defines a single processors minimum version requirements. 
-type processorCompatibility struct { - minVersion *common.Version - name string - makeConfigCompatible func(log *logp.Logger, processor map[string]interface{}) error -} - -var processorCompatibilityChecks = []processorCompatibility{ - { - name: "uri_parts", - minVersion: common.MustNewVersion("7.12.0"), - makeConfigCompatible: nil, - }, - { - name: "set", - minVersion: common.MustNewVersion("7.9.0"), - makeConfigCompatible: modifySetProcessor, - }, - { - name: "append", - minVersion: common.MustNewVersion("7.10.0"), - makeConfigCompatible: modifyAppendProcessor, - }, - { - name: "user_agent", - minVersion: common.MustNewVersion("6.7.0"), - makeConfigCompatible: setECSProcessors, - }, -} - func (m MultiplePipelineUnsupportedError) Error() string { return fmt.Sprintf( "the %s/%s fileset has multiple pipelines, which are only supported with Elasticsearch >= %s. Currently running with Elasticsearch version %s", @@ -95,17 +64,17 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool for name, fileset := range filesets { // check that all the required Ingest Node plugins are available requiredProcessors := fileset.GetRequiredProcessors() - logp.Debug("modules", "Required processors: %s", requiredProcessors) + reg.log.Debugf("Required processors: %s", requiredProcessors) if len(requiredProcessors) > 0 { err := checkAvailableProcessors(esClient, requiredProcessors) if err != nil { - return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + return fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module, name, err) } } pipelines, err := fileset.GetPipelines(esClient.GetVersion()) if err != nil { - return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) + return fmt.Errorf("error getting pipeline for fileset %s/%s: %v", module, name, err) } // Filesets with multiple pipelines can only be supported by Elasticsearch >= 6.5.0 @@ -117,9 +86,9 @@ func (reg *ModuleRegistry) 
LoadPipelines(esClient PipelineLoader, overwrite bool var pipelineIDsLoaded []string for _, pipeline := range pipelines { - err = loadPipeline(esClient, pipeline.id, pipeline.contents, overwrite) + err = loadPipeline(esClient, pipeline.id, pipeline.contents, overwrite, reg.log.With("pipeline", pipeline.id)) if err != nil { - err = fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + err = fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module, name, err) break } pipelineIDsLoaded = append(pipelineIDsLoaded, pipeline.id) @@ -143,93 +112,28 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool return nil } -func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool) error { +func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool, log *logp.Logger) error { path := makeIngestPipelinePath(pipelineID) if !overwrite { status, _, _ := esClient.Request("GET", path, "", nil, nil) if status == 200 { - logp.Debug("modules", "Pipeline %s already loaded", pipelineID) + log.Debug("Pipeline already exists in Elasticsearch.") return nil } } - err := setProcessors(esClient.GetVersion(), pipelineID, content) - if err != nil { - return fmt.Errorf("Failed to adapt pipeline with backwards compatibility changes: %w", err) + if err := adaptPipelineForCompatibility(esClient.GetVersion(), pipelineID, content, log); err != nil { + return fmt.Errorf("failed to adapt pipeline with backwards compatibility changes: %w", err) } body, err := esClient.LoadJSON(path, content) if err != nil { return interpretError(err, body) } - logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID) - return nil -} - -// setProcessors iterates over all configured processors and performs the -// function related to it. If no function is set, it will delete the processor if -// the version of ES is under the required version number. 
-func setProcessors(esVersion common.Version, pipelineID string, content map[string]interface{}) error { - log := logp.NewLogger("fileset").With("pipeline", pipelineID) - p, ok := content["processors"] - if !ok { - return nil - } - - processors, ok := p.([]interface{}) - if !ok { - return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) - } - - // A list of all processor names and versions to be checked. - - var newProcessors []interface{} - var appendProcessor bool - for i, p := range processors { - appendProcessor = true - processor, ok := p.(map[string]interface{}) - if !ok { - continue - } - for _, proc := range processorCompatibilityChecks { - _, found := processor[proc.name] - if !found { - continue - } - - if options, ok := processor[proc.name].(map[string]interface{}); ok { - if !esVersion.LessThan(proc.minVersion) { - if proc.name == "user_agent" { - logp.Debug("modules", "Setting 'ecs: true' option in user_agent processor for field '%v' in pipeline '%s'", options["field"], pipelineID) - options["ecs"] = true - } - continue - } - - if proc.makeConfigCompatible != nil { - if err := proc.makeConfigCompatible(log.With("processor_type", proc.name, "processor_index", i), processor); err != nil { - return err - } - } else { - appendProcessor = false - } - } - } - if appendProcessor { - newProcessors = append(newProcessors, processors[i]) - } - - } - content["processors"] = newProcessors + log.Info("Elasticsearch pipeline loaded.") return nil } -// setECSProcessors sets required ECS options in processors when filebeat version is >= 7.0.0 -// and ES is 6.7.X to ease migration to ECS. 
-func setECSProcessors(log *logp.Logger, processor map[string]interface{}) error { - return errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required") -} - func deletePipeline(esClient PipelineLoader, pipelineID string) error { path := makeIngestPipelinePath(pipelineID) _, _, err := esClient.Request("DELETE", path, "", nil, nil) @@ -261,7 +165,7 @@ func interpretError(initialErr error, body []byte) error { } err1x := json.Unmarshal(body, &response1x) if err1x == nil && response1x.Error != "" { - return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+ + return fmt.Errorf("the Filebeat modules require Elasticsearch >= 5.0. "+ "This is the response I got from Elasticsearch: %s", body) } @@ -275,7 +179,7 @@ func interpretError(initialErr error, body []byte) error { strings.HasPrefix(response.Error.RootCause[0].Reason, "No processor type exists with name") && response.Error.RootCause[0].Header.ProcessorType != "" { - return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+ + return fmt.Errorf("this module requires an Elasticsearch plugin that provides the %s processor. "+ "Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+ "Response body: %s", response.Error.RootCause[0].Header.ProcessorType, body) @@ -286,102 +190,10 @@ func interpretError(initialErr error, body []byte) error { response.Error.RootCause[0].Type == "invalid_index_name_exception" && response.Error.RootCause[0].Index == "_ingest" { - return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+ + return fmt.Errorf("the Ingest Node functionality seems to be missing from Elasticsearch. "+ "The Filebeat modules require Elasticsearch >= 5.0. "+ "This is the response I got from Elasticsearch: %s", body) } return fmt.Errorf("couldn't load pipeline: %v. 
Response body: %s", initialErr, body) } - -// modifySetProcessor replaces ignore_empty_value option with an if statement -// so ES less than 7.9 will still work -func modifySetProcessor(log *logp.Logger, processor map[string]interface{}) error { - options, ok := processor["set"].(map[string]interface{}) - - if !ok { - return nil - } - _, ok = options["ignore_empty_value"].(bool) - if !ok { - // don't have ignore_empty_value nothing to do - return nil - } - - log.Debug("Removing unsupported 'ignore_empty_value' in set processor") - delete(options, "ignore_empty_value") - - _, ok = options["if"].(string) - if ok { - // assume if check is sufficient - return nil - } - val, ok := options["value"].(string) - if !ok { - return nil - } - - newIf := strings.TrimLeft(val, "{ ") - newIf = strings.TrimRight(newIf, "} ") - newIf = strings.ReplaceAll(newIf, ".", "?.") - newIf = "ctx?." + newIf + " != null" - - log.Debug("adding if %s to replace 'ignore_empty_value' in set processor", newIf) - options["if"] = newIf - - return nil -} - -// modifyAppendProcessor replaces allow_duplicates option with an if statement -// so ES less than 7.10 will still work -func modifyAppendProcessor(log *logp.Logger, processor map[string]interface{}) error { - options, ok := processor["append"].(map[string]interface{}) - if !ok { - return nil - } - allow, ok := options["allow_duplicates"].(bool) - - if !ok { - // don't have allow_duplicates, nothing to do - return nil - } - - log.Debug("removing unsupported 'allow_duplicates' in append processor") - delete(options, "allow_duplicates") - if allow { - // it was set to true, nothing else to do after removing the option - return nil - } - - currIf, _ := options["if"].(string) - if strings.Contains(strings.ToLower(currIf), "contains") { - // if it has a contains statement, we assume it is checking for duplicates already - return nil - } - field, ok := options["field"].(string) - if !ok { - return nil - } - val, ok := options["value"].(string) - if !ok 
{ - return nil - } - - field = strings.ReplaceAll(field, ".", "?.") - - val = strings.TrimLeft(val, "{ ") - val = strings.TrimRight(val, "} ") - val = strings.ReplaceAll(val, ".", "?.") - - if currIf == "" { - // if there is not a previous if we add a value sanity check - currIf = fmt.Sprintf("ctx?.%s != null", val) - } - - newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val) - - log.Debug("adding if %s to replace 'allow_duplicates: false' in append processor", newIf) - options["if"] = newIf - - return nil -} diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index de61e80156d8..04d48c67f019 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -28,8 +28,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/logp" ) func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { @@ -82,6 +82,7 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { "fls": testFileset, }, }, + log: logp.NewLogger(logName), } testESServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -107,617 +108,3 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { }) } } - -func TestSetEcsProcessors(t *testing.T) { - cases := []struct { - name string - esVersion *common.Version - content map[string]interface{} - expected map[string]interface{} - isErrExpected bool - }{ - { - name: "ES < 6.7.0", - esVersion: common.MustNewVersion("6.6.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "user_agent": map[string]interface{}{ - "field": "foo.http_user_agent", - }, - }, - }}, - isErrExpected: true, - }, - { - name: "ES == 6.7.0", - esVersion: 
common.MustNewVersion("6.7.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "rename": map[string]interface{}{ - "field": "foo.src_ip", - "target_field": "source.ip", - }, - }, - map[string]interface{}{ - "user_agent": map[string]interface{}{ - "field": "foo.http_user_agent", - }, - }, - }, - }, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "rename": map[string]interface{}{ - "field": "foo.src_ip", - "target_field": "source.ip", - }, - }, - map[string]interface{}{ - "user_agent": map[string]interface{}{ - "field": "foo.http_user_agent", - "ecs": true, - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES >= 7.0.0", - esVersion: common.MustNewVersion("7.0.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "rename": map[string]interface{}{ - "field": "foo.src_ip", - "target_field": "source.ip", - }, - }, - map[string]interface{}{ - "user_agent": map[string]interface{}{ - "field": "foo.http_user_agent", - }, - }, - }, - }, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "rename": map[string]interface{}{ - "field": "foo.src_ip", - "target_field": "source.ip", - }, - }, - map[string]interface{}{ - "user_agent": map[string]interface{}{ - "field": "foo.http_user_agent", - "ecs": true, - }, - }, - }, - }, - isErrExpected: false, - }, - } - - for _, test := range cases { - test := test - t.Run(test.name, func(t *testing.T) { - t.Parallel() - err := setProcessors(*test.esVersion, "foo-pipeline", test.content) - if test.isErrExpected { - assert.Error(t, err) - } else { - require.NoError(t, err) - assert.Equal(t, test.expected, test.content) - } - }) - } -} - -func TestModifySetProcessor(t *testing.T) { - cases := []struct { - name string - esVersion *common.Version - content map[string]interface{} - expected map[string]interface{} - isErrExpected bool - }{ - { - name: "ES < 
7.9.0", - esVersion: common.MustNewVersion("7.8.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "if": "ctx?.panw?.panos?.ruleset != null", - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES == 7.9.0", - esVersion: common.MustNewVersion("7.9.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES > 7.9.0", - esVersion: common.MustNewVersion("8.0.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "existing if", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": true, - 
"if": "ctx?.panw?.panos?.ruleset != null", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "if": "ctx?.panw?.panos?.ruleset != null", - }, - }, - }}, - isErrExpected: false, - }, - { - name: "ignore_empty_value is false", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "ignore_empty_value": false, - "if": "ctx?.panw?.panos?.ruleset != null", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "value": "{{panw.panos.ruleset}}", - "if": "ctx?.panw?.panos?.ruleset != null", - }, - }, - }}, - isErrExpected: false, - }, - { - name: "no value", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - "ignore_empty_value": false, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "rule.name", - }, - }, - }}, - isErrExpected: false, - }, - } - - for _, test := range cases { - test := test - t.Run(test.name, func(t *testing.T) { - t.Parallel() - err := setProcessors(*test.esVersion, "foo-pipeline", test.content) - if test.isErrExpected { - assert.Error(t, err) - } else { - require.NoError(t, err) - assert.Equal(t, test.expected, test.content, test.name) - } - }) - } -} - -func TestModifyAppendProcessor(t *testing.T) { - cases := []struct { - name string - esVersion *common.Version - content map[string]interface{} - expected map[string]interface{} - isErrExpected bool - }{ - { - name: "ES 
< 7.10.0: set to true", - esVersion: common.MustNewVersion("7.9.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": true, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES < 7.10.0: set to false", - esVersion: common.MustNewVersion("7.9.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES == 7.10.0", - esVersion: common.MustNewVersion("7.10.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES > 7.10.0", - esVersion: common.MustNewVersion("8.0.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - 
map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES < 7.10.0: existing if", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - "if": "ctx?.host?.hostname != null", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", - }, - }, - }}, - isErrExpected: false, - }, - { - name: "ES < 7.10.0: existing if with contains", - esVersion: common.MustNewVersion("7.7.7"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "allow_duplicates": false, - "if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "value": "{{host.hostname}}", - "if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)", - }, - }, - }}, - isErrExpected: false, - }, - { - name: "ES < 7.10.0: no value", - esVersion: common.MustNewVersion("7.7.7"), - content: 
map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - "allow_duplicates": false, - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "append": map[string]interface{}{ - "field": "related.hosts", - }, - }, - }}, - isErrExpected: false, - }, - } - - for _, test := range cases { - test := test - t.Run(test.name, func(t *testing.T) { - t.Parallel() - err := setProcessors(*test.esVersion, "foo-pipeline", test.content) - if test.isErrExpected { - assert.Error(t, err) - } else { - require.NoError(t, err) - assert.Equal(t, test.expected, test.content, test.name) - } - }) - } -} - -func TestRemoveURIPartsProcessor(t *testing.T) { - cases := []struct { - name string - esVersion *common.Version - content map[string]interface{} - expected map[string]interface{} - isErrExpected bool - }{ - { - name: "ES < 7.12.0", - esVersion: common.MustNewVersion("7.11.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "uri_parts": map[string]interface{}{ - "field": "test.url", - "target_field": "url", - }, - }, - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "test.field", - "value": "testvalue", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "test.field", - "value": "testvalue", - }, - }, - }, - }, - isErrExpected: false, - }, - { - name: "ES == 7.12.0", - esVersion: common.MustNewVersion("7.12.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "uri_parts": map[string]interface{}{ - "field": "test.url", - "target_field": "url", - }, - }, - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "test.field", - "value": "testvalue", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": 
[]interface{}{ - map[string]interface{}{ - "uri_parts": map[string]interface{}{ - "field": "test.url", - "target_field": "url", - }, - }, - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "test.field", - "value": "testvalue", - }, - }, - }}, - isErrExpected: false, - }, - { - name: "ES > 7.12.0", - esVersion: common.MustNewVersion("8.0.0"), - content: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "uri_parts": map[string]interface{}{ - "field": "test.url", - "target_field": "url", - }, - }, - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "test.field", - "value": "testvalue", - }, - }, - }}, - expected: map[string]interface{}{ - "processors": []interface{}{ - map[string]interface{}{ - "uri_parts": map[string]interface{}{ - "field": "test.url", - "target_field": "url", - }, - }, - map[string]interface{}{ - "set": map[string]interface{}{ - "field": "test.field", - "value": "testvalue", - }, - }, - }}, - isErrExpected: false, - }, - } - - for _, test := range cases { - test := test - t.Run(test.name, func(t *testing.T) { - t.Parallel() - err := setProcessors(*test.esVersion, "foo-pipeline", test.content) - if test.isErrExpected { - assert.Error(t, err) - } else { - require.NoError(t, err) - assert.Equal(t, test.expected, test.content, test.name) - } - }) - } -} diff --git a/filebeat/fileset/setup.go b/filebeat/fileset/setup.go index b76cf2719662..c90916fbac9d 100644 --- a/filebeat/fileset/setup.go +++ b/filebeat/fileset/setup.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline" ) @@ -69,20 +68,20 @@ type SetupCfgRunner struct { // Start loads module pipelines for configured modules. 
func (sr *SetupCfgRunner) Start() { - logp.Debug("fileset", "Loading ingest pipelines for modules from modules.d") + sr.moduleRegistry.log.Debug("Loading ingest pipelines for modules from modules.d") pipelineLoader, err := sr.pipelineLoaderFactory() if err != nil { - logp.Err("Error loading pipeline: %+v", err) + sr.moduleRegistry.log.Errorf("Error loading pipeline: %+v", err) return } err = sr.moduleRegistry.LoadPipelines(pipelineLoader, sr.overwritePipelines) if err != nil { - logp.Err("Error loading pipeline: %s", err) + sr.moduleRegistry.log.Errorf("Error loading pipeline: %s", err) } } -// Stopp of SetupCfgRunner. +// Stop of SetupCfgRunner. func (sr *SetupCfgRunner) Stop() {} // String returns information on the Runner From 68f14b759338f2baea4238e85984c4225dc78d7e Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Sun, 14 Feb 2021 09:25:58 -0500 Subject: [PATCH 10/10] Add changelog --- CHANGELOG.next.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a2cc2c10da88..0f61090ddb43 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -826,6 +826,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Added `application/x-www-form-urlencoded` as encode option for httpjson input {pull}23521[23521] - Move aws-s3 input to GA. {pull}23631[23631] - Populate `source.mac` and `destination.mac` for Suricata EVE events. {issue}23706[23706] {pull}23721[23721] +- Added feature to modules to adapt Ingest Node pipelines for compatibility with older Elasticsearch versions by + removing unsupported processors. {pull}23763[23763] - Added RFC6587 framing option for tcp and unix inputs {issue}23663[23663] {pull}23724[23724] *Heartbeat*