diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index bdab0784535..eca8bfdc6a1 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -19,6 +19,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2..master[Check the HEAD di The list below covers the major changes between 7.0.0-alpha2 and master only. ==== Breaking changes +- Outputs receive Index Manager as additional parameter. The index manager can + be used to create an index selector. {pull}10347[10347] ==== Bugfixes @@ -27,3 +29,5 @@ The list below covers the major changes between 7.0.0-alpha2 and master only. - Allow multiple object type configurations per field. {pull}9772[9772] - Move agent metadata addition to a processor. {pull}9952[9952] - Add (*common.Config).Has and (*common.Config).Remove. {pull}10363[10363] +- Introduce ILM and IndexManagment support to beat.Settings. {pull}10347[10347] +- Introduce ILM and IndexManagement support to beat.Settings. {pull}10347[10347] diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 213fb6ac3ec..71cc849cce9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -26,6 +26,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Remove --setup command line flag. {pull}10138[10138] - Remove --version command line flag. {pull}10138[10138] - Remove --configtest command line flag. {pull}10138[10138] +- Move output.elasticsearch.ilm settings to setup.ilm. {pull}10347[10347] +- ILM will be available by default if Elasticsearch > 7.0 is used. {pull}10347[10347] *Auditbeat* @@ -187,6 +189,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `add_fields` processor. {pull}10119[10119] - Add Kibana field formatter to bytes fields. {pull}10184[10184] - Document a few more `auditd.log.*` fields. {pull}10192[10192] +- Add ILM mode `auto` to setup.ilm.enabled setting. This new default value detects if ILM is available {pull}10347[10347] +- Add support to read ILM policy from external JSON file. {pull}10347[10347] +- Add `overwrite` and `check_exists` settings to ILM support. {pull}10347[10347] *Auditbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index b042b2bb6ea..534778657a1 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -358,11 +358,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "auditbeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -1009,6 +1004,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "auditbeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/auditbeat/auditbeat.yml b/auditbeat/auditbeat.yml index 0004039a77c..c8412360cdd 100644 --- a/auditbeat/auditbeat.yml +++ b/auditbeat/auditbeat.yml @@ -121,9 +121,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index e4f68c1df5a..cdfa2b0ca11 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1059,11 +1059,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "filebeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -1710,6 +1705,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "filebeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/filebeat/filebeat.yml b/filebeat/filebeat.yml index 6585d1e49a8..29be4bd93c2 100644 --- a/filebeat/filebeat.yml +++ b/filebeat/filebeat.yml @@ -149,9 +149,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 52e099d8511..22c95ef4b4c 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -105,6 +105,19 @@ filebeat.config.{{ reload_type|default("inputs") }}: {% endif -%} {% endif -%} +{% if ilm %} +setup.ilm: + enabled: {{ ilm.enabled | default("auto") }} + policy_name: libbeat-test-default-policy + {% if ilm.pattern %} + pattern: {{ ilm.pattern }} + {% endif %} + {% if ilm.rollover_alias %} + rollover_alias: {{ ilm.rollover_alias }} + {% endif %} +{% endif %} + + #============================== Autodiscover ================================== {% if autodiscover %} diff --git a/filebeat/tests/system/config/filebeat_inputs.yml.j2 b/filebeat/tests/system/config/filebeat_inputs.yml.j2 index 04ef3c2a6ad..db5628fb603 100644 --- a/filebeat/tests/system/config/filebeat_inputs.yml.j2 +++ b/filebeat/tests/system/config/filebeat_inputs.yml.j2 @@ -7,6 +7,19 @@ filebeat.inputs: {% endfor %} filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}} +{% if ilm %} +setup.ilm: + enabled: {{ ilm.enabled | default("auto") }} + policy_name: libbeat-test-default-policy + {% if ilm.pattern %} + pattern: {{ ilm.pattern }} + {% endif %} + {% if ilm.rollover_alias %} + rollover_alias: {{ ilm.rollover_alias }} + {% endif %} +{% endif %} + + output.file: path: {{ output_file_path|default(beat.working_dir + "/output") }} filename: "{{ output_file_filename|default("filebeat") }}" diff --git a/filebeat/tests/system/config/filebeat_modules.yml.j2 b/filebeat/tests/system/config/filebeat_modules.yml.j2 index 62b66e8a5ff..df3a480277c 100644 --- a/filebeat/tests/system/config/filebeat_modules.yml.j2 +++ b/filebeat/tests/system/config/filebeat_modules.yml.j2 @@ -15,3 +15,16 @@ setup.kibana.host: {{ kibana_url }} {% if kibana_path %} setup.dashboards.directory: {{ kibana_path }} {% endif %} + +{% if ilm %} +setup.ilm: + enabled: {{ ilm.enabled | default("auto") }} + policy_name: libbeat-test-default-policy + {% if ilm.pattern %} + pattern: {{ ilm.pattern }} + {% endif %} + {% if ilm.rollover_alias %} + rollover_alias: {{ ilm.rollover_alias }} + {% endif %} +{% endif %} + diff --git a/filebeat/tests/system/test_modules.py b/filebeat/tests/system/test_modules.py index 0804d8ad28b..8ec1a6578f3 100644 --- a/filebeat/tests/system/test_modules.py +++ b/filebeat/tests/system/test_modules.py @@ -89,7 +89,7 @@ def test_fileset_file(self, module, fileset, test_file): template_name="filebeat_modules", output=cfgfile, index_name=self.index_name, - elasticsearch_url=self.elasticsearch_url + elasticsearch_url=self.elasticsearch_url, ) self.run_on_file( @@ -111,6 +111,7 @@ def run_on_file(self, module, fileset, test_file, cfgfile): self.filebeat, "-systemTest", "-e", "-d", "*", "-once", "-c", cfgfile, + "-E", "setup.ilm.enabled=false", "-modules={}".format(module), "-M", "{module}.*.enabled=false".format(module=module), "-M", "{module}.{fileset}.enabled=true".format( diff --git a/filebeat/tests/system/test_pipeline.py b/filebeat/tests/system/test_pipeline.py index 8058306a95d..7b5b6c381bd 100644 --- a/filebeat/tests/system/test_pipeline.py +++ b/filebeat/tests/system/test_pipeline.py @@ -62,6 +62,7 @@ def test_input_pipeline_config(self): pipeline="test", setup_template_name=index_name, setup_template_pattern=index_name + "*", + ilm={"enabled": False}, ) os.mkdir(self.working_dir + "/log/") diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 741b2229c25..be2ee368934 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -502,11 +502,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "heartbeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -1153,6 +1148,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "heartbeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/heartbeat/heartbeat.yml b/heartbeat/heartbeat.yml index 6e1d31515e7..7424bf421b3 100644 --- a/heartbeat/heartbeat.yml +++ b/heartbeat/heartbeat.yml @@ -106,9 +106,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index f31f7c8b2a9..4ba3987a172 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -292,11 +292,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "journalbeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -943,6 +938,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "journalbeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/journalbeat/journalbeat.yml b/journalbeat/journalbeat.yml index 06326ece081..dbdea0ea016 100644 --- a/journalbeat/journalbeat.yml +++ b/journalbeat/journalbeat.yml @@ -112,9 +112,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml index fb77508c109..c88d26a22f1 100644 --- a/libbeat/_meta/config.reference.yml +++ b/libbeat/_meta/config.reference.yml @@ -246,11 +246,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "beat-index-prefix" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -897,6 +892,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "beat-index-prefix" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/libbeat/_meta/config.yml b/libbeat/_meta/config.yml index c038793ef68..9364f344a39 100644 --- a/libbeat/_meta/config.yml +++ b/libbeat/_meta/config.yml @@ -66,9 +66,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/libbeat/cmd/export.go b/libbeat/cmd/export.go index 07f9cd16da0..2bd37f33adb 100644 --- a/libbeat/cmd/export.go +++ b/libbeat/cmd/export.go @@ -33,7 +33,7 @@ func genExportCmd(settings instance.Settings, name, idxPrefix, beatVersion strin exportCmd.AddCommand(export.GenExportConfigCmd(settings, name, idxPrefix, beatVersion)) exportCmd.AddCommand(export.GenTemplateConfigCmd(settings, name, idxPrefix, beatVersion)) exportCmd.AddCommand(export.GenDashboardCmd(name, idxPrefix, beatVersion)) - exportCmd.AddCommand(export.GenGetILMPolicyCmd()) + exportCmd.AddCommand(export.GenGetILMPolicyCmd(settings, name, idxPrefix, beatVersion)) return exportCmd } diff --git a/libbeat/cmd/export/ilm_policy.go b/libbeat/cmd/export/ilm_policy.go index a9e4542fc29..9850da3527c 100644 --- a/libbeat/cmd/export/ilm_policy.go +++ b/libbeat/cmd/export/ilm_policy.go @@ -19,19 +19,43 @@ package export import ( "fmt" + "os" "github.com/spf13/cobra" "github.com/elastic/beats/libbeat/cmd/instance" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/idxmgmt/ilm" ) // GenGetILMPolicyCmd is the command used to export the ilm policy. -func GenGetILMPolicyCmd() *cobra.Command { +func GenGetILMPolicyCmd(settings instance.Settings, name, idxPrefix, version string) *cobra.Command { genTemplateConfigCmd := &cobra.Command{ Use: "ilm-policy", Short: "Export ILM policy", Run: func(cmd *cobra.Command, args []string) { - fmt.Println(instance.ILMPolicy.StringToPrint()) + b, err := instance.NewBeat(name, idxPrefix, version) + if err != nil { + fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) + os.Exit(1) + } + err = b.InitWithSettings(settings) + if err != nil { + fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) + os.Exit(1) + } + + ilmFactory := settings.ILM + if ilmFactory == nil { + ilmFactory = ilm.DefaultSupport + } + + ilm, err := ilmFactory(nil, b.Info, b.RawConfig) + if err != nil { + fmt.Fprintf(os.Stderr, "Error initializing ILM support: %s\n", err) + } + + fmt.Println(common.MapStr(ilm.Policy().Body).StringToPrint()) }, } diff --git a/libbeat/cmd/export/template.go b/libbeat/cmd/export/template.go index 1f4dea5751a..e804a0df5ce 100644 --- a/libbeat/cmd/export/template.go +++ b/libbeat/cmd/export/template.go @@ -25,6 +25,8 @@ import ( "github.com/elastic/beats/libbeat/cmd/instance" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/idxmgmt" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/template" ) @@ -36,65 +38,83 @@ func GenTemplateConfigCmd(settings instance.Settings, name, idxPrefix, beatVersi Run: func(cmd *cobra.Command, args []string) { version, _ := cmd.Flags().GetString("es.version") index, _ := cmd.Flags().GetString("index") + noILM, _ := cmd.Flags().GetBool("noilm") b, err := instance.NewBeat(name, idxPrefix, beatVersion) if err != nil { - fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) - os.Exit(1) + fatalf("Error initializing beat: %+v", err) } err = b.InitWithSettings(settings) if err != nil { - fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) - os.Exit(1) - } - - cfg := template.DefaultConfig - if b.Config.Template.Enabled() { - err = b.Config.Template.Unpack(&cfg) - if err != nil { - fmt.Fprintf(os.Stderr, "Error getting template settings: %+v", err) - os.Exit(1) - } + fatalf("Error initializing beat: %+v", err) } if version == "" { version = b.Info.Version } - esVersion, err := common.NewVersion(version) if err != nil { - fmt.Fprintf(os.Stderr, "Invalid Elasticsearch version: %s\n", err) + fatalf("Invalid Elasticsearch version: %+v", err) + } + + imFactory := settings.IndexManagement + if imFactory == nil { + imFactory = idxmgmt.MakeDefaultSupport(settings.ILM) + } + indexManager, err := imFactory(logp.NewLogger("index-management"), b.Info, b.RawConfig) + if err != nil { + fatalf("Error initializing the index manager: %+v", err) + } + + tmplCfg, err := indexManager.TemplateConfig(!noILM) + if err != nil { + fatalf("Template error detected: %+v", err) + } + if tmplCfg.Enabled == false { + tmplCfg = template.DefaultConfig() + } + + var withMigration bool + if b.RawConfig.HasField("migration") { + sub, err := b.RawConfig.Child("migration", -1) + if err != nil { + fatalf("Failed to read migration setting: %+v", err) + } + withMigration = sub.Enabled() } - tmpl, err := template.New(b.Info.Version, index, *esVersion, cfg, b.Config.Migration.Enabled()) + tmpl, err := template.New(b.Info.Version, index, *esVersion, tmplCfg, withMigration) if err != nil { - fmt.Fprintf(os.Stderr, "Error generating template: %+v", err) - os.Exit(1) + fatalf("Error generating template: %+v", err) } var templateString common.MapStr - if cfg.Fields != "" { - fieldsPath := paths.Resolve(paths.Config, cfg.Fields) + if tmplCfg.Fields != "" { + fieldsPath := paths.Resolve(paths.Config, tmplCfg.Fields) templateString, err = tmpl.LoadFile(fieldsPath) } else { templateString, err = tmpl.LoadBytes(b.Fields) } - if err != nil { - fmt.Fprintf(os.Stderr, "Error generating template: %+v", err) - os.Exit(1) + fatalf("Error generating template: %+v", err) } _, err = os.Stdout.WriteString(templateString.StringToPrint() + "\n") if err != nil { - fmt.Fprintf(os.Stderr, "Error writing template: %+v", err) - os.Exit(1) + fatalf("Error writing template: %+v", err) } }, } genTemplateConfigCmd.Flags().String("es.version", beatVersion, "Elasticsearch version") genTemplateConfigCmd.Flags().String("index", idxPrefix, "Base index name") + genTemplateConfigCmd.Flags().Bool("noilm", false, "Generate template with ILM disabled") return genTemplateConfigCmd } + +func fatalf(msg string, vs ...interface{}) { + fmt.Fprintf(os.Stderr, msg, vs...) + fmt.Fprintln(os.Stderr) + os.Exit(1) +} diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 2ab590dc900..51c5507a82f 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -43,11 +43,11 @@ import ( "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/cloudid" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/file" "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/common/seccomp" "github.com/elastic/beats/libbeat/dashboards" + "github.com/elastic/beats/libbeat/idxmgmt" "github.com/elastic/beats/libbeat/keystore" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/logp/configure" @@ -56,12 +56,12 @@ import ( "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/monitoring/report" "github.com/elastic/beats/libbeat/monitoring/report/log" + "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/elasticsearch" "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/plugin" "github.com/elastic/beats/libbeat/publisher/pipeline" svc "github.com/elastic/beats/libbeat/service" - "github.com/elastic/beats/libbeat/template" "github.com/elastic/beats/libbeat/version" sysinfo "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" @@ -74,7 +74,9 @@ type Beat struct { Config beatConfig RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data. - keystore keystore.Keystore + + keystore keystore.Keystore + index idxmgmt.Supporter } type beatConfig struct { @@ -103,12 +105,7 @@ type beatConfig struct { // elastic stack 'setup' configurations Dashboards *common.Config `config:"setup.dashboards"` - Template *common.Config `config:"setup.template"` Kibana *common.Config `config:"setup.kibana"` - Migration *common.Config `config:"migration"` - - // ILM Config options - ILM *common.Config `config:"output.elasticsearch.ilm"` } var debugf = logp.MakeDebug("beat") @@ -274,7 +271,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { logSystemInfo(b.Info) logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version) - err = b.registerTemplateLoading() + err = b.registerESIndexManagement() if err != nil { return nil, err } @@ -312,13 +309,14 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { Logger: logp.L().Named("publisher"), }, b.Config.Pipeline, - b.Config.Output) + b.makeOutputFactory(b.Config.Output), + ) if err != nil { return nil, fmt.Errorf("error initializing publisher: %+v", err) } - reload.Register.MustRegister("output", pipeline.OutputReloader()) + reload.Register.MustRegister("output", b.makeOutputReloader(pipeline.OutputReloader())) // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, // but refine publisher to disconnect clients on stop automatically @@ -439,47 +437,29 @@ func (b *Beat) Setup(bt beat.Creator, settings SetupSettings) error { return err } - if settings.Template { + if settings.Template || settings.ILMPolicy { outCfg := b.Config.Output if outCfg.Name() != "elasticsearch" { - return fmt.Errorf("Template loading requested but the Elasticsearch output is not configured/enabled") - } - - if b.Config.ILM.Enabled() { - cfgwarn.Beta("Index lifecycle management is enabled which is in beta.") - - ilmCfg, err := getILMConfig(b) - if err != nil { - return err - } - - err = b.prepareILMTemplate(ilmCfg) - if err != nil { - return err - } + return fmt.Errorf("Index management requested but the Elasticsearch output is not configured/enabled") } esConfig := outCfg.Config() - if tmplCfg := b.Config.Template; tmplCfg == nil || tmplCfg.Enabled() { - loadCallback, err := b.templateLoadingCallback() - if err != nil { - return err - } - + if b.index.Enabled() { esClient, err := elasticsearch.NewConnectedClient(esConfig) if err != nil { return err } - // Load template - err = loadCallback(esClient) + // prepare index by loading templates, lifecycle policies and write aliases + + m := b.index.Manager(esClient, idxmgmt.BeatsAssets(b.Fields)) + err = m.Setup(settings.Template, settings.ILMPolicy) if err != nil { return err } } - - fmt.Println("Loaded index template") + fmt.Println("Index setup complete.") } if settings.Dashboard { @@ -516,13 +496,6 @@ func (b *Beat) Setup(bt beat.Creator, settings SetupSettings) error { fmt.Println("Loaded Ingest pipelines") } - if settings.ILMPolicy { - if err := b.loadILMPolicy(); err != nil { - return err - } - fmt.Println("Loaded Index Lifecycle Management (ILM) policy") - } - return nil }()) } @@ -618,7 +591,12 @@ func (b *Beat) configure(settings Settings) error { return err } - return nil + imFactory := settings.IndexManagement + if imFactory == nil { + imFactory = idxmgmt.MakeDefaultSupport(settings.ILM) + } + b.index, err = imFactory(nil, b.Beat.Info, b.RawConfig) + return err } func (b *Beat) loadMeta() error { @@ -721,159 +699,50 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error { return nil } -// registerTemplateLoading registers the loading of the template as a callback with -// the elasticsearch output. It is important the the registration happens before -// the publisher is created. -func (b *Beat) registerTemplateLoading() error { - var templateCfg template.TemplateConfig - - // Check if outputting to file is enabled, and output to file if it is - if b.Config.Template.Enabled() { - err := b.Config.Template.Unpack(&templateCfg) - if err != nil { - return fmt.Errorf("unpacking template config fails: %v", err) - } - } - - // Loads template by default if esOutput is enabled - if b.Config.Output.Name() == "elasticsearch" { - - // Get ES Index name for comparison - esCfg := struct { - Index string `config:"index"` - }{} - err := b.Config.Output.Config().Unpack(&esCfg) - if err != nil { - return err - } - - if esCfg.Index != "" && - (templateCfg.Name == "" || templateCfg.Pattern == "") && - (b.Config.Template == nil || b.Config.Template.Enabled()) { - return errors.New("setup.template.name and setup.template.pattern have to be set if index name is modified") - } - - if b.Config.Template == nil || (b.Config.Template != nil && b.Config.Template.Enabled()) { - - // load template through callback to make sure it is also loaded - // on reconnecting - callback, err := b.templateLoadingCallback() - if err != nil { - return err - } - elasticsearch.RegisterConnectCallback(callback) - } else if b.Config.ILM.Enabled() { - return errors.New("templates cannot be disable when using ILM") - } - - if b.Config.ILM.Enabled() { - cfgwarn.Beta("Index lifecycle management is enabled which is in beta.") - - ilmCfg, err := getILMConfig(b) - if err != nil { - return err - } - - err = b.prepareILMTemplate(ilmCfg) - if err != nil { - return err - } - - // Set the ingestion index to the rollover alias - logp.Info("Set output.elasticsearch.index to '%s' as ILM is enabled.", ilmCfg.RolloverAlias) - esCfg.Index = ilmCfg.RolloverAlias - err = b.Config.Output.Config().SetString("index", -1, ilmCfg.RolloverAlias) - if err != nil { - return errw.Wrap(err, "error setting output.elasticsearch.index") - } - - writeAliasCallback, err := b.writeAliasLoadingCallback() - if err != nil { - return err - } - - // Load write alias already on - esConfig := b.Config.Output.Config() - - // Check that ILM is enabled and the right elasticsearch version exists - esClient, err := elasticsearch.NewConnectedClient(esConfig) - if err != nil { - return err - } - - err = checkElasticsearchVersionIlm(esClient) - if err != nil { - return err - } - - err = checkILMFeatureEnabled(esClient) - if err != nil { - return err - } - - elasticsearch.RegisterConnectCallback(writeAliasCallback) - } - } - - return nil -} - -func (b *Beat) prepareILMTemplate(ilmCfg *ilmConfig) error { - // In case no template settings are set, config must be created - if b.Config.Template == nil { - b.Config.Template = common.NewConfig() - } - // Template name and pattern can't be configure when using ILM - logp.Info("Set setup.template.name to '%s' as ILM is enabled.", ilmCfg.RolloverAlias) - err := b.Config.Template.SetString("name", -1, ilmCfg.RolloverAlias) - if err != nil { - return errw.Wrap(err, "error setting setup.template.name") - } - pattern := fmt.Sprintf("%s-*", ilmCfg.RolloverAlias) - logp.Info("Set setup.template.pattern to '%s' as ILM is enabled.", pattern) - err = b.Config.Template.SetString("pattern", -1, pattern) - if err != nil { - return errw.Wrap(err, "error setting setup.template.pattern") +// registerESIndexManagement registers the loading of the template and ILM +// policy as a callback with the elasticsearch output. It is important the +// registration happens before the publisher is created. +func (b *Beat) registerESIndexManagement() error { + if b.Config.Output.Name() != "elasticsearch" || !b.index.Enabled() { + return nil } - // rollover_alias and lifecycle.name can't be configured and will be overwritten - logp.Info("Set settings.index.lifecycle.rollover_alias in template to %s as ILM is enabled.", ilmCfg.RolloverAlias) - err = b.Config.Template.SetString("settings.index.lifecycle.rollover_alias", -1, ilmCfg.RolloverAlias) + _, err := elasticsearch.RegisterConnectCallback(b.indexSetupCallback()) if err != nil { - return errw.Wrap(err, "error setting settings.index.lifecycle.rollover_alias") + return fmt.Errorf("failed to register index management with elasticsearch: %+v", err) } - logp.Info("Set settings.index.lifecycle.name in template to %s as ILM is enabled.", ILMPolicyName) - err = b.Config.Template.SetString("settings.index.lifecycle.name", -1, ILMPolicyName) - if err != nil { - return errw.Wrap(err, "error setting settings.index.lifecycle.name") - } - return nil } // Build and return a callback to load index template into ES -func (b *Beat) templateLoadingCallback() (func(esClient *elasticsearch.Client) error, error) { - callback := func(esClient *elasticsearch.Client) error { - if b.Config.Template == nil { - b.Config.Template = common.NewConfig() - } - - loader, err := template.NewLoader(b.Config.Template, esClient, b.Info, b.Fields, b.Config.Migration.Enabled()) - if err != nil { - return fmt.Errorf("Error creating Elasticsearch template loader: %v", err) - } +func (b *Beat) indexSetupCallback() func(esClient *elasticsearch.Client) error { + return func(esClient *elasticsearch.Client) error { + m := b.index.Manager(esClient, idxmgmt.BeatsAssets(b.Fields)) + return m.Setup(true, true) + } +} - err = loader.Load() - if err != nil { - return fmt.Errorf("Error loading Elasticsearch template: %v", err) - } +func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable { + return reload.ReloadableFunc(func(config *reload.ConfigWithMeta) error { + return outReloader.Reload(config, b.createOutput) + }) +} - logp.Info("Template successfully loaded.") +func (b *Beat) makeOutputFactory( + cfg common.ConfigNamespace, +) func(outputs.Observer) (string, outputs.Group, error) { + return func(outStats outputs.Observer) (string, outputs.Group, error) { + out, err := b.createOutput(outStats, cfg) + return cfg.Name(), out, err + } +} - return nil +func (b *Beat) createOutput(stats outputs.Observer, cfg common.ConfigNamespace) (outputs.Group, error) { + if !cfg.IsSet() { + return outputs.Group{}, nil } - return callback, nil + return outputs.Load(b.index, b.Info, stats, cfg.Name(), cfg.Config()) } // handleError handles the given error by logging it and then returning the diff --git a/libbeat/cmd/instance/ilm.go b/libbeat/cmd/instance/ilm.go deleted file mode 100644 index 0ef1f9778a6..00000000000 --- a/libbeat/cmd/instance/ilm.go +++ /dev/null @@ -1,216 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package instance - -import ( - "encoding/json" - "fmt" - "net/url" - - "github.com/pkg/errors" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/outputs/elasticsearch" -) - -type ilmConfig struct { - RolloverAlias string `config:"ilm.rollover_alias" ` - Pattern string `config:"ilm.pattern"` -} - -// ILMPolicy contains the default policy -var ILMPolicy = common.MapStr{ - "policy": common.MapStr{ - "phases": common.MapStr{ - "hot": common.MapStr{ - "actions": common.MapStr{ - "rollover": common.MapStr{ - "max_size": "50gb", - "max_age": "30d", - }, - }, - }, - }, - }, -} - -const ( - // ILMPolicyName is the default policy name - ILMPolicyName = "beats-default-policy" - // ILMDefaultPattern is the default pattern - ILMDefaultPattern = "{now/d}-000001" -) - -// Build and return a callback to load ILM write alias -func (b *Beat) writeAliasLoadingCallback() (func(esClient *elasticsearch.Client) error, error) { - callback := func(esClient *elasticsearch.Client) error { - if b.Config.ILM == nil { - b.Config.ILM = common.NewConfig() - } - - config, err := getILMConfig(b) - if err != nil { - return err - } - - // Escaping because of date pattern - pattern := url.PathEscape(config.Pattern) - // This always assume it's a date pattern by sourrounding it by <...> - firstIndex := fmt.Sprintf("%%3C%s-%s%%3E", config.RolloverAlias, pattern) - - // Check if alias already exists - status, b, err := esClient.Request("HEAD", "/_alias/"+config.RolloverAlias, "", nil, nil) - if err != nil && status != 404 { - logp.Err("Failed to check for alias: %s: %+v", err, string(b)) - return errors.Wrap(err, "failed to check for alias") - } - if status == 200 { - logp.Info("Write alias already exists") - return nil - } - - body := common.MapStr{ - "aliases": common.MapStr{ - config.RolloverAlias: common.MapStr{ - "is_write_index": true, - }, - }, - } - - // Create alias with write index - code, res, err := esClient.Request("PUT", "/"+firstIndex, "", nil, body) - if code == 400 { - logp.Err("Error creating alias with write index. As return code is 400, assuming already exists: %s, %s", err, string(res)) - return nil - - } else if err != nil { - logp.Err("Error creating alias with write index: %s, %s", err, string(res)) - return errors.Wrap(err, "failed to create write alias: "+string(res)) - } - - logp.Info("Alias with write index created: %s", firstIndex) - - return nil - } - - return callback, nil -} - -func (b *Beat) loadILMPolicy() error { - esClient, err := getElasticsearchClient(b) - if err != nil { - return err - } - - err = checkElasticsearchVersionIlm(esClient) - if err != nil { - return err - } - - err = checkILMFeatureEnabled(esClient) - if err != nil { - return err - } - - _, _, err = esClient.Request("PUT", "/_ilm/policy/"+ILMPolicyName, "", nil, ILMPolicy) - return err -} - -func getElasticsearchClient(b *Beat) (*elasticsearch.Client, error) { - outCfg := b.Config.Output - if outCfg.Name() != "elasticsearch" { - return nil, fmt.Errorf("Policy loading requested but the Elasticsearch output is not configured/enabled") - } - - esConfig := outCfg.Config() - - return elasticsearch.NewConnectedClient(esConfig) -} - -func loadConfigWithDefaults(config *ilmConfig, b *Beat) { - if config.RolloverAlias == "" { - config.RolloverAlias = fmt.Sprintf("%s-%s", b.Info.Beat, b.Info.Version) - } - - if config.Pattern == "" { - config.Pattern = ILMDefaultPattern - } -} - -func checkElasticsearchVersionIlm(client *elasticsearch.Client) error { - esV := client.GetVersion() - requiredVersion, err := common.NewVersion("6.6.0") - if err != nil { - return err - } - - if esV.LessThan(requiredVersion) { - return fmt.Errorf("ILM requires at least Elasticsearch 6.6.0. Used version: %s", esV.String()) - } - - return nil -} - -func checkILMFeatureEnabled(client *elasticsearch.Client) error { - code, body, err := client.Request("GET", "/_xpack", "", nil, nil) - - // If we get a 400, it's assumed to be the OSS version of Elasticsearch - if code == 400 { - return fmt.Errorf("ILM feature is not available in this Elasticsearch version") - } - if err != nil { - return err - } - - var response struct { - Features struct { - ILM struct { - Available bool `json:"available"` - Enabled bool `json:"enabled"` - } `json:"ilm"` - } `json:"features"` - } - - err = json.Unmarshal(body, &response) - if err != nil { - return fmt.Errorf("failed to parse JSON response: %v", err) - } - - if !response.Features.ILM.Available { - return fmt.Errorf("ILM feature is not available in Elasticsearch") - } - - if !response.Features.ILM.Enabled { - return fmt.Errorf("ILM feature is not enabled in Elasticsearch") - } - - return nil -} - -func getILMConfig(b *Beat) (*ilmConfig, error) { - config := &ilmConfig{} - err := b.Config.Output.Config().Unpack(config) - if err != nil { - return nil, errors.Wrap(err, "problem unpacking ilm configs") - } - - loadConfigWithDefaults(config, b) - - return config, nil -} diff --git a/libbeat/cmd/instance/settings.go b/libbeat/cmd/instance/settings.go index 8275282daad..9b2bfd34a05 100644 --- a/libbeat/cmd/instance/settings.go +++ b/libbeat/cmd/instance/settings.go @@ -21,16 +21,23 @@ import ( "github.com/spf13/pflag" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/idxmgmt" + "github.com/elastic/beats/libbeat/idxmgmt/ilm" "github.com/elastic/beats/libbeat/monitoring/report" ) // Settings contains basic settings for any beat to pass into GenRootCmd type Settings struct { - Name string - IndexPrefix string - Version string - Monitoring report.Settings - RunFlags *pflag.FlagSet - ConfigOverrides *common.Config + Name string + IndexPrefix string + Version string + Monitoring report.Settings + RunFlags *pflag.FlagSet + ConfigOverrides *common.Config + DisableConfigResolver bool + + // load custom index manager. The config object will be the Beats root configuration. + IndexManagement idxmgmt.SupportFactory + ILM ilm.SupportFactory } diff --git a/libbeat/cmd/test/output.go b/libbeat/cmd/test/output.go index a94f9053898..ee855d8688b 100644 --- a/libbeat/cmd/test/output.go +++ b/libbeat/cmd/test/output.go @@ -24,6 +24,7 @@ import ( "github.com/spf13/cobra" "github.com/elastic/beats/libbeat/cmd/instance" + "github.com/elastic/beats/libbeat/idxmgmt" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/testing" ) @@ -45,7 +46,8 @@ func GenTestOutputCmd(name, beatVersion string) *cobra.Command { os.Exit(1) } - output, err := outputs.Load(b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config()) + im, _ := idxmgmt.DefaultSupport(nil, b.Info, nil) + output, err := outputs.Load(im, b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config()) if err != nil { fmt.Fprintf(os.Stderr, "Error initializing output: %s\n", err) os.Exit(1) diff --git a/libbeat/common/reload/reload.go b/libbeat/common/reload/reload.go index 0e6933889e3..ceb377274b5 100644 --- a/libbeat/common/reload/reload.go +++ b/libbeat/common/reload/reload.go @@ -47,6 +47,9 @@ type Reloadable interface { Reload(config *ConfigWithMeta) error } +// ReloadableFunc wraps a custom function in order to implement the Reloadable interface. +type ReloadableFunc func(config *ConfigWithMeta) error + // Registry of reloadable objects and lists type Registry struct { sync.RWMutex @@ -152,3 +155,8 @@ func (r *Registry) nameTaken(name string) bool { return false } + +// Reload calls the underlying function. +func (fn ReloadableFunc) Reload(config *ConfigWithMeta) error { + return fn(config) +} diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index 1e1e82604c3..57b9776342b 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -342,15 +342,6 @@ This configuration results in indices named `sev1`, `sev2`, and `sev3`. The `mappings` setting simplifies the configuration, but is limited to string values. You cannot specify format strings within the mapping pairs. -//TODO: MOVE ILM OPTIONS TO APPEAR LOGICALLY BASED ON LOCATION IN THE YAML FILE. - -[[ilm-es]] -===== `ilm` - -Configuration options for index lifecycle management. - -See <> for more information. - ifndef::no-pipeline[] [[pipeline-option-es]] ===== `pipeline` diff --git a/libbeat/docs/shared-ilm.asciidoc b/libbeat/docs/shared-ilm.asciidoc index 61cac3791d3..31c4001f27e 100644 --- a/libbeat/docs/shared-ilm.asciidoc +++ b/libbeat/docs/shared-ilm.asciidoc @@ -18,15 +18,12 @@ existing indices. To use index lifecycle management on {beatname_uc} indices: -. Enable index lifecycle management by setting `ilm.enabled: true` in the {es} -output configuration. For example: +. Enable index lifecycle management by setting `setup.ilm.enabled: true`. For example: + -- [source,yaml] ------------------------------------------------------------------------------ -output.elasticsearch: - hosts: ["localhost:9200"] - ilm.enabled: true +setup.ilm.enabled: true ------------------------------------------------------------------------------ This configuration overwrites your index settings and adjusts the {beatname_uc} diff --git a/libbeat/idxmgmt/assets.go b/libbeat/idxmgmt/assets.go new file mode 100644 index 00000000000..e5ca4ca2e08 --- /dev/null +++ b/libbeat/idxmgmt/assets.go @@ -0,0 +1,32 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idxmgmt + +type assets struct { + fields []byte +} + +// BeatsAssets creates an asseter with a predefine set of fields that is always +// reported. +func BeatsAssets(fields []byte) Asseter { + return &assets{fields: fields} +} + +func (a *assets) Fields(name string) []byte { + return a.fields // assume we have the beats global assets +} diff --git a/libbeat/idxmgmt/idxmgmt.go b/libbeat/idxmgmt/idxmgmt.go new file mode 100644 index 00000000000..b2f7f38568e --- /dev/null +++ b/libbeat/idxmgmt/idxmgmt.go @@ -0,0 +1,162 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idxmgmt + +import ( + "errors" + "fmt" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs" + "github.com/elastic/beats/libbeat/template" +) + +// SupportFactory is used to provide custom index management support to libbeat. +type SupportFactory func(*logp.Logger, beat.Info, *common.Config) (Supporter, error) + +// Supporter provides index management and configuration related services +// throughout libbeat. +// The BuildSelector is used by the output to create an IndexSelector. The +// index selector will report the per event index name to be used. +// A manager instantiated via Supporter is responsible for instantiating/configuring +// the index throughout the Elastic Stack. +type Supporter interface { + // Enalbed checks if index management is configured to configure templates, + // ILM, or aliases. + Enabled() bool + + // ILM provides access to the configured ILM support. + ILM() ilm.Supporter + + // TemplateConfig returns the template configuration used by the index supporter. + TemplateConfig(withILM bool) (template.TemplateConfig, error) + + // BuildSelector create an index selector. + // The defaultIndex string is interpreted as format string. It is used + // as default index if the configuration provided does not define an index or + // has no default fallback if all indices are guarded by conditionals. + BuildSelector(cfg *common.Config) (outputs.IndexSelector, error) + + // Manager creates a new manager that can be used to execute the required steps + // for initializing an index, ILM policies, and write aliases. + Manager(client ESClient, assets Asseter) Manager +} + +// Asseter provides access to beats assets required to load the template. +type Asseter interface { + Fields(name string) []byte +} + +// ESClient defines the minimal interface required for the index manager to +// prepare an index. +type ESClient interface { + Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) + GetVersion() common.Version +} + +// Manager is used to initialize indices, ILM policies, and aliases within the +// Elastic Stack. +type Manager interface { + Setup(template, policy bool) error +} + +// DefaultSupport initializes the default index management support used by most Beats. +func DefaultSupport(log *logp.Logger, info beat.Info, configRoot *common.Config) (Supporter, error) { + factory := MakeDefaultSupport(nil) + return factory(log, info, configRoot) +} + +// MakeDefaultSupport creates some default index management support, with a +// custom ILM support implementation. +func MakeDefaultSupport(ilmSupport ilm.SupportFactory) SupportFactory { + if ilmSupport == nil { + ilmSupport = ilm.DefaultSupport + } + + return func(log *logp.Logger, info beat.Info, configRoot *common.Config) (Supporter, error) { + const logName = "index-management" + + cfg := struct { + ILM *common.Config `config:"setup.ilm"` + Template *common.Config `config:"setup.template"` + Output common.ConfigNamespace `config:"output"` + Migration *common.Config `config:"migration"` + }{} + if configRoot != nil { + if err := configRoot.Unpack(&cfg); err != nil { + return nil, err + } + } + + if log == nil { + log = logp.NewLogger(logName) + } else { + log = log.Named(logName) + } + + if err := checkTemplateESSettings(cfg.Template, cfg.Output); err != nil { + return nil, err + } + + return newIndexSupport(log, info, ilmSupport, cfg.Template, cfg.ILM, cfg.Migration.Enabled()) + } +} + +// checkTemplateESSettings validates template settings and output.elasticsearch +// settings to be consistent. +// XXX: This is some legacy check that will not be active if the output is +// configured via Central Config Management. +// In the future we will have CM deal with index setup and providing a +// consistent output configuration. +// TODO: check if it's safe to move this check to the elasticsearch output +// (Not doing so, so to not interfere with outputs being setup via Central +// Management for now). +func checkTemplateESSettings(tmpl *common.Config, out common.ConfigNamespace) error { + if out.Name() != "elasticsearch" { + return nil + } + + enabled := tmpl == nil || tmpl.Enabled() + if !enabled { + return nil + } + + var tmplCfg template.TemplateConfig + if tmpl != nil { + if err := tmpl.Unpack(&tmplCfg); err != nil { + return fmt.Errorf("unpacking template config fails: %v", err) + } + } + + esCfg := struct { + Index string `config:"index"` + }{} + if err := out.Config().Unpack(&esCfg); err != nil { + return err + } + + tmplSet := tmplCfg.Name != "" && tmplCfg.Pattern != "" + if esCfg.Index != "" && !tmplSet { + return errors.New("setup.template.name and setup.template.pattern have to be set if index name is modified") + } + + return nil +} diff --git a/libbeat/idxmgmt/idxmgmt_test.go b/libbeat/idxmgmt/idxmgmt_test.go new file mode 100644 index 00000000000..3d7a841b797 --- /dev/null +++ b/libbeat/idxmgmt/idxmgmt_test.go @@ -0,0 +1,272 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idxmgmt + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/libbeat/template" +) + +func TestDefaultSupport_Enabled(t *testing.T) { + cases := map[string]struct { + ilmCalls []onCall + cfg map[string]interface{} + want bool + }{ + "templates and ilm disabled": { + want: false, + ilmCalls: []onCall{ + onMode().Return(ilm.ModeDisabled), + }, + cfg: map[string]interface{}{ + "setup.template.enabled": false, + }, + }, + "templates only": { + want: true, + ilmCalls: []onCall{ + onMode().Return(ilm.ModeDisabled), + }, + cfg: map[string]interface{}{ + "setup.template.enabled": true, + }, + }, + "ilm only": { + want: true, + ilmCalls: []onCall{ + onMode().Return(ilm.ModeEnabled), + }, + cfg: map[string]interface{}{ + "setup.template.enabled": false, + }, + }, + "ilm tentatively": { + want: true, + ilmCalls: []onCall{ + onMode().Return(ilm.ModeAuto), + }, + cfg: map[string]interface{}{ + "setup.template.enabled": false, + }, + }, + } + for name, test := range cases { + t.Run(name, func(t *testing.T) { + info := beat.Info{Beat: "test", Version: "9.9.9"} + factory := MakeDefaultSupport(makeMockILMSupport(test.ilmCalls...)) + im, err := factory(nil, info, common.MustNewConfigFrom(test.cfg)) + require.NoError(t, err) + assert.Equal(t, test.want, im.Enabled()) + }) + } +} + +func TestDefaultSupport_TemplateConfig(t *testing.T) { + ilmTemplateSettings := func(alias, policy string) []onCall { + return []onCall{ + onMode().Return(ilm.ModeEnabled), + onAlias().Return(ilm.Alias{Name: alias}), + onPolicy().Return(ilm.Policy{Name: policy}), + } + } + + cloneCfg := func(c template.TemplateConfig) template.TemplateConfig { + if c.AppendFields != nil { + tmp := make(common.Fields, len(c.AppendFields)) + copy(tmp, c.AppendFields) + c.AppendFields = tmp + } + + if c.Settings.Index != nil { + c.Settings.Index = (map[string]interface{})(common.MapStr(c.Settings.Index).Clone()) + } + if c.Settings.Index != nil { + c.Settings.Source = (map[string]interface{})(common.MapStr(c.Settings.Source).Clone()) + } + return c + } + + cfgWith := func(s template.TemplateConfig, mods ...map[string]interface{}) template.TemplateConfig { + for _, mod := range mods { + cfg := common.MustNewConfigFrom(mod) + s = cloneCfg(s) + err := cfg.Unpack(&s) + if err != nil { + panic(err) + } + } + return s + } + + cases := map[string]struct { + ilmCalls []onCall + cfg map[string]interface{} + want template.TemplateConfig + fail bool + }{ + "default template config": { + want: template.DefaultConfig(), + }, + "default template with ilm": { + ilmCalls: ilmTemplateSettings("alias", "test-9.9.9"), + want: cfgWith(template.DefaultConfig(), map[string]interface{}{ + "name": "alias", + "pattern": "alias-*", + "settings.index.lifecycle.name": "test-9.9.9", + "settings.index.lifecycle.rollover_alias": "alias", + }), + }, + } + for name, test := range cases { + t.Run(name, func(t *testing.T) { + info := beat.Info{Beat: "test", Version: "9.9.9"} + factory := MakeDefaultSupport(makeMockILMSupport(test.ilmCalls...)) + im, err := factory(nil, info, common.MustNewConfigFrom(test.cfg)) + require.NoError(t, err) + withILM := len(test.ilmCalls) > 0 + + tmpl, err := im.TemplateConfig(withILM) + if test.fail { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.want, tmpl) + } + }) + } +} + +func TestDefaultSupport_BuildSelector(t *testing.T) { + type nameFunc func(time.Time) string + + noILM := []onCall{onMode().Return(ilm.ModeDisabled)} + ilmTemplateSettings := func(alias, policy string) []onCall { + return []onCall{ + onMode().Return(ilm.ModeEnabled), + onAlias().Return(ilm.Alias{Name: alias}), + onPolicy().Return(ilm.Policy{Name: policy}), + } + } + + stable := func(s string) nameFunc { + return func(_ time.Time) string { return s } + } + dateIdx := func(base string) nameFunc { + return func(ts time.Time) string { + ext := fmt.Sprintf("%d.%02d.%02d", ts.Year(), ts.Month(), ts.Day()) + return fmt.Sprintf("%v-%v", base, ext) + } + } + + cases := map[string]struct { + ilmCalls []onCall + imCfg map[string]interface{} + cfg map[string]interface{} + want nameFunc + meta common.MapStr + }{ + "without ilm": { + ilmCalls: noILM, + cfg: map[string]interface{}{"index": "test-%{[agent.version]}"}, + want: stable("test-9.9.9"), + }, + "event alias without ilm": { + ilmCalls: noILM, + cfg: map[string]interface{}{"index": "test-%{[agent.version]}"}, + want: stable("test"), + meta: common.MapStr{ + "alias": "test", + }, + }, + "event index without ilm": { + ilmCalls: noILM, + cfg: map[string]interface{}{"index": "test-%{[agent.version]}"}, + want: dateIdx("test"), + meta: common.MapStr{ + "index": "test", + }, + }, + "with ilm": { + ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"), + cfg: map[string]interface{}{"index": "wrong-%{[agent.version]}"}, + want: stable("test-9.9.9"), + }, + "event alias wit ilm": { + ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"), + cfg: map[string]interface{}{"index": "test-%{[agent.version]}"}, + want: stable("event-alias"), + meta: common.MapStr{ + "alias": "event-alias", + }, + }, + "event index with ilm": { + ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"), + cfg: map[string]interface{}{"index": "test-%{[agent.version]}"}, + want: dateIdx("event-index"), + meta: common.MapStr{ + "index": "event-index", + }, + }, + "use indices": { + ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"), + cfg: map[string]interface{}{ + "index": "test-%{[agent.version]}", + "indices": []map[string]interface{}{ + {"index": "myindex"}, + }, + }, + want: stable("myindex"), + }, + } + for name, test := range cases { + t.Run(name, func(t *testing.T) { + ts := time.Now() + info := beat.Info{Beat: "test", Version: "9.9.9"} + + factory := MakeDefaultSupport(makeMockILMSupport(test.ilmCalls...)) + im, err := factory(nil, info, common.MustNewConfigFrom(test.imCfg)) + require.NoError(t, err) + + sel, err := im.BuildSelector(common.MustNewConfigFrom(test.cfg)) + require.NoError(t, err) + + meta := test.meta + idx, err := sel.Select(&beat.Event{ + Timestamp: ts, + Fields: common.MapStr{ + "test": "value", + "agent": common.MapStr{ + "version": "9.9.9", + }, + }, + Meta: meta, + }) + require.NoError(t, err) + assert.Equal(t, test.want(ts), idx) + }) + } +} diff --git a/libbeat/idxmgmt/ilm/config.go b/libbeat/idxmgmt/ilm/config.go new file mode 100644 index 00000000000..24934c1c6a6 --- /dev/null +++ b/libbeat/idxmgmt/ilm/config.go @@ -0,0 +1,124 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ilm + +import ( + "fmt" + "strconv" + "strings" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" +) + +// Config is used for unpacking a common.Config. +type Config struct { + Mode Mode `config:"enabled"` + PolicyName fmtstr.EventFormatString `config:"policy_name"` + PolicyFile string `config:"policy_file"` + RolloverAlias string `config:"rollover_alias"` + Pattern string `config:"pattern"` + + // CheckExists can disable the check for an existing policy. Check required + // read_ilm privileges. If check is disabled the policy will only be + // installed if Overwrite is enabled. + CheckExists bool `config:"check_exists"` + + // Enable always overwrite policy mode. This required manage_ilm privileges. + Overwrite bool `config:"overwrite"` +} + +//Mode is used for enumerating the ilm mode. +type Mode uint8 + +const ( + //ModeAuto enum 'auto' + ModeAuto Mode = iota + + //ModeEnabled enum 'true' + ModeEnabled + + //ModeDisabled enum 'false' + ModeDisabled +) + +const ilmDefaultPattern = "{now/d}-000001" + +// DefaultPolicy defines the default policy to be used if no custom policy is +// configured. +// By default the policy contains not warm, cold, or delete phase. +// The index is configured to rollover every 50GB or after 30d. +var DefaultPolicy = common.MapStr{ + "policy": common.MapStr{ + "phases": common.MapStr{ + "hot": common.MapStr{ + "actions": common.MapStr{ + "rollover": common.MapStr{ + "max_size": "50gb", + "max_age": "30d", + }, + }, + }, + }, + }, +} + +//Unpack creates enumeration value true, false or auto +func (m *Mode) Unpack(in string) error { + in = strings.ToLower(in) + + if in == "auto" { + *m = ModeAuto + return nil + } + + b, err := strconv.ParseBool(in) + if err != nil { + return fmt.Errorf("ilm.enabled` mode '%v' is invalid (try auto, true, false)", in) + } + + if b { + *m = ModeEnabled + } else { + *m = ModeDisabled + } + return nil +} + +//Validate verifies that expected config options are given and valid +func (cfg *Config) Validate() error { + if cfg.RolloverAlias == "" && cfg.Mode != ModeDisabled { + return fmt.Errorf("rollover_alias must be set when ILM is not disabled") + } + return nil +} + +func defaultConfig(info beat.Info) Config { + name := fmt.Sprintf("%s-%s", info.Beat, info.Version) + nameFmt := fmtstr.MustCompileEvent(name) + + return Config{ + Mode: ModeAuto, + PolicyName: *nameFmt, + RolloverAlias: name, + Pattern: ilmDefaultPattern, + PolicyFile: "", + CheckExists: true, + } +} diff --git a/libbeat/idxmgmt/ilm/error.go b/libbeat/idxmgmt/ilm/error.go new file mode 100644 index 00000000000..4d644de9b98 --- /dev/null +++ b/libbeat/idxmgmt/ilm/error.go @@ -0,0 +1,94 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ilm + +import ( + "errors" + "fmt" +) + +// Error indicates an error + reason describing the last error. +// The Reason() method returns a sentinal error value for comparison. +type Error struct { + reason error + cause error + message string +} + +var ( + ErrESVersionNotSupported = errors.New("ILM is not supported by the Elasticsearch version in use") + ErrILMCheckRequestFailed = errors.New("request checking for ILM availability failed") + ErrInvalidResponse = errors.New("invalid response received") + ErrESILMDisabled = errors.New("ILM is disabled in Elasticsearch") + ErrRequestFailed = errors.New("request failed") + ErrAliasAlreadyExists = errors.New("alias already exists") + ErrAliasCreateFailed = errors.New("failed to create write alias") + ErrOpNotAvailable = errors.New("operation not available") +) + +func errOf(reason error) error { + return &Error{reason: reason} +} + +func errf(reason error, msg string, vs ...interface{}) error { + return wrapErrf(nil, reason, msg, vs...) +} + +func wrapErr(cause, reason error) error { + return wrapErrf(cause, reason, "") +} + +func wrapErrf(cause, reason error, msg string, vs ...interface{}) error { + return &Error{ + cause: cause, + reason: reason, + message: fmt.Sprintf(msg, vs...), + } +} + +// ErrReason calls Reason() if the error implements this method. Otherwise return nil. +func ErrReason(err error) error { + if err == nil { + return nil + } + + ifc, ok := err.(interface{ Reason() error }) + if !ok { + return nil + } + return ifc.Reason() +} + +// Cause returns the errors cause, if present. +func (e *Error) Cause() error { return e.cause } + +// Reason returns a sentinal error value define within the ilm package. +func (e *Error) Reason() error { return e.reason } + +// Error returns the formatted error string. +func (e *Error) Error() string { + msg := e.message + if e.message == "" { + msg = e.reason.Error() + } + + if e.cause != nil { + return fmt.Sprintf("%v: %+v", msg, e.cause) + } + return msg +} diff --git a/libbeat/idxmgmt/ilm/eshandler.go b/libbeat/idxmgmt/ilm/eshandler.go new file mode 100644 index 00000000000..de493721bf7 --- /dev/null +++ b/libbeat/idxmgmt/ilm/eshandler.go @@ -0,0 +1,210 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ilm + +import ( + "encoding/json" + "fmt" + "net/url" + "path" + + "github.com/elastic/beats/libbeat/common" +) + +type esClientHandler struct { + client ESClient +} + +var ( + esMinILMVersion = common.MustNewVersion("6.6.0") + esMinDefaultILMVesion = common.MustNewVersion("7.0.0") +) + +const ( + // esFeaturesPath is used to query Elasticsearch for availability of licensed + // features. + esFeaturesPath = "/_xpack" + + esILMPath = "/_ilm/policy" + + esAliasPath = "/_alias" +) + +// ESClientHandler creates a new APIHandler executing ILM, and alias queries +// against Elasticsearch. +func ESClientHandler(client ESClient) APIHandler { + if client == nil { + return nil + } + return &esClientHandler{client} +} + +// ESClient defines the minimal interface required for the ESClientHandler to +// prepare a policy and write alias. +type ESClient interface { + GetVersion() common.Version + Request( + method, path string, + pipeline string, + params map[string]string, + body interface{}, + ) (int, []byte, error) +} + +func (h *esClientHandler) ILMEnabled(mode Mode) (bool, error) { + if mode == ModeDisabled { + return false, nil + } + + avail, probe := h.checkILMVersion(mode) + if !avail { + if mode == ModeEnabled { + ver := h.client.GetVersion() + return false, errf(ErrESVersionNotSupported, + "Elasticsearch %v does not support ILM", ver.String()) + } + return false, nil + } + + if !probe { + // version potentially supports ILM, but mode + version indicates that we + // want to disable ILM support. + return false, nil + } + + avail, enabled, err := h.checkILMSupport() + if err != nil { + return false, err + } + + if !avail { + if mode == ModeEnabled { + return false, errOf(ErrESVersionNotSupported) + } + return false, nil + } + + if !enabled && mode == ModeEnabled { + return false, errOf(ErrESILMDisabled) + } + return enabled, nil +} + +func (h *esClientHandler) CreateILMPolicy(policy Policy) error { + path := path.Join(esILMPath, policy.Name) + _, _, err := h.client.Request("PUT", path, "", nil, policy.Body) + return err +} + +func (h *esClientHandler) HasILMPolicy(name string) (bool, error) { + // XXX: HEAD method does currently not work for checking if a policy exists + path := path.Join(esILMPath, name) + status, b, err := h.client.Request("GET", path, "", nil, nil) + if err != nil && status != 404 { + return false, wrapErrf(err, ErrRequestFailed, + "failed to check for policy name '%v': (status=%v) %s", name, status, b) + } + return status == 200, nil +} + +func (h *esClientHandler) HasAlias(name string) (bool, error) { + path := path.Join(esAliasPath, name) + status, b, err := h.client.Request("HEAD", path, "", nil, nil) + if err != nil && status != 404 { + return false, wrapErrf(err, ErrRequestFailed, + "failed to check for alias '%v': (status=%v) %s", name, status, b) + } + return status == 200, nil +} + +func (h *esClientHandler) CreateAlias(alias Alias) error { + // Escaping because of date pattern + // This always assume it's a date pattern by sourrounding it by <...> + firstIndex := fmt.Sprintf("<%s-%s>", alias.Name, alias.Pattern) + firstIndex = url.PathEscape(firstIndex) + + body := common.MapStr{ + "aliases": common.MapStr{ + alias.Name: common.MapStr{ + "is_write_index": true, + }, + }, + } + + // Note: actual aliases are accessible via the index + status, res, err := h.client.Request("PUT", "/"+firstIndex, "", nil, body) + if status == 400 { + return errOf(ErrAliasAlreadyExists) + } else if err != nil { + return wrapErrf(err, ErrAliasCreateFailed, "failed to create alias: %s", res) + } + + return nil +} + +func (h *esClientHandler) checkILMVersion(mode Mode) (avail, probe bool) { + ver := h.client.GetVersion() + avail = !ver.LessThan(esMinILMVersion) + if avail { + probe = (mode == ModeEnabled) || + (mode == ModeAuto && !ver.LessThan(esMinDefaultILMVesion)) + } + + return avail, probe +} + +func (h *esClientHandler) checkILMSupport() (avail, enbaled bool, err error) { + var response struct { + Features struct { + ILM struct { + Available bool `json:"available"` + Enabled bool `json:"enabled"` + } `json:"ilm"` + } `json:"features"` + } + status, err := h.queryFeatures(&response) + if status == 400 { + // If we get a 400, it's assumed to be the OSS version of Elasticsearch + return false, false, nil + } + if err != nil { + return false, false, wrapErr(err, ErrILMCheckRequestFailed) + } + + avail = response.Features.ILM.Available + enbaled = response.Features.ILM.Enabled + return avail, enbaled, nil +} + +func (h *esClientHandler) queryFeatures(to interface{}) (int, error) { + status, body, err := h.client.Request("GET", esFeaturesPath, "", nil, nil) + if status >= 400 || err != nil { + return status, err + } + + if to != nil { + if err := json.Unmarshal(body, to); err != nil { + return status, wrapErrf(err, ErrInvalidResponse, "failed to parse JSON response") + } + } + return status, nil +} + +func (h *esClientHandler) access() ESClient { + return h.client +} diff --git a/libbeat/idxmgmt/ilm/eshandler_integration_test.go b/libbeat/idxmgmt/ilm/eshandler_integration_test.go new file mode 100644 index 00000000000..808ab48b47d --- /dev/null +++ b/libbeat/idxmgmt/ilm/eshandler_integration_test.go @@ -0,0 +1,207 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//+build integration + +package ilm_test + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/libbeat/outputs/outil" +) + +const ( + // ElasticsearchDefaultHost is the default host for elasticsearch. + ElasticsearchDefaultHost = "localhost" + // ElasticsearchDefaultPort is the default port for elasticsearch. + ElasticsearchDefaultPort = "9200" +) + +func TestESClientHandler_ILMEnabled(t *testing.T) { + t.Run("no ilm if disabled", func(t *testing.T) { + h := newESClientHandler(t) + b, err := h.ILMEnabled(ilm.ModeDisabled) + assert.NoError(t, err) + assert.False(t, b) + }) + + t.Run("with ilm if auto", func(t *testing.T) { + h := newESClientHandler(t) + b, err := h.ILMEnabled(ilm.ModeAuto) + assert.NoError(t, err) + assert.True(t, b) + }) + + t.Run("with ilm if enabled", func(t *testing.T) { + h := newESClientHandler(t) + b, err := h.ILMEnabled(ilm.ModeEnabled) + assert.NoError(t, err) + assert.True(t, b) + }) +} + +func TestESClientHandler_ILMPolicy(t *testing.T) { + t.Run("does not exist", func(t *testing.T) { + name := makeName("esch-policy-no") + h := newESClientHandler(t) + b, err := h.HasILMPolicy(name) + assert.NoError(t, err) + assert.False(t, b) + }) + + t.Run("create new", func(t *testing.T) { + policy := ilm.Policy{ + Name: makeName("esch-policy-create"), + Body: ilm.DefaultPolicy, + } + h := newESClientHandler(t) + err := h.CreateILMPolicy(policy) + require.NoError(t, err) + + b, err := h.HasILMPolicy(policy.Name) + assert.NoError(t, err) + assert.True(t, b) + }) + + t.Run("overwrite", func(t *testing.T) { + policy := ilm.Policy{ + Name: makeName("esch-policy-overwrite"), + Body: ilm.DefaultPolicy, + } + h := newESClientHandler(t) + + err := h.CreateILMPolicy(policy) + require.NoError(t, err) + + // check second 'create' does not throw (assuming race with other beat) + err = h.CreateILMPolicy(policy) + require.NoError(t, err) + + b, err := h.HasILMPolicy(policy.Name) + assert.NoError(t, err) + assert.True(t, b) + }) +} + +func TestESClientHandler_Alias(t *testing.T) { + makeAlias := func(base string) ilm.Alias { + return ilm.Alias{ + Name: makeName(base), + Pattern: "{now/d}-000001", + } + } + + t.Run("does not exist", func(t *testing.T) { + name := makeName("esch-alias-no") + h := newESClientHandler(t) + b, err := h.HasAlias(name) + assert.NoError(t, err) + assert.False(t, b) + }) + + t.Run("create new", func(t *testing.T) { + alias := makeAlias("esch-alias-create") + h := newESClientHandler(t) + err := h.CreateAlias(alias) + assert.NoError(t, err) + + b, err := h.HasAlias(alias.Name) + assert.NoError(t, err) + assert.True(t, b) + }) + + t.Run("second create", func(t *testing.T) { + alias := makeAlias("esch-alias-2create") + h := newESClientHandler(t) + + err := h.CreateAlias(alias) + assert.NoError(t, err) + + err = h.CreateAlias(alias) + require.Error(t, err) + assert.Equal(t, ilm.ErrAliasAlreadyExists, ilm.ErrReason(err)) + + b, err := h.HasAlias(alias.Name) + assert.NoError(t, err) + assert.True(t, b) + }) +} + +func newESClientHandler(t *testing.T) ilm.APIHandler { + client, err := elasticsearch.NewClient(elasticsearch.ClientSettings{ + URL: getURL(), + Index: outil.MakeSelector(), + Username: getUser(), + Password: getUser(), + Timeout: 60 * time.Second, + CompressionLevel: 3, + }, nil) + if err != nil { + t.Fatal(err) + } + + if err := client.Connect(); err != nil { + t.Fatalf("Failed to connect to Test Elasticsearch instance: %v", err) + } + + return ilm.ESClientHandler(client) +} + +func makeName(base string) string { + id, err := uuid.NewV4() + if err != nil { + panic(err) + } + return fmt.Sprintf("%v-%v", base, id.String()) +} + +func getURL() string { + return fmt.Sprintf("http://%v:%v", getEsHost(), getEsPort()) +} + +// GetEsHost returns the Elasticsearch testing host. +func getEsHost() string { + return getEnv("ES_HOST", ElasticsearchDefaultHost) +} + +// GetEsPort returns the Elasticsearch testing port. +func getEsPort() string { + return getEnv("ES_PORT", ElasticsearchDefaultPort) +} + +// GetUser returns the Elasticsearch testing user. +func getUser() string { return getEnv("ES_USER", "") } + +// GetPass returns the Elasticsearch testing user's password. +func getPass() string { return getEnv("ES_PASS", "") } + +func getEnv(name, def string) string { + if v := os.Getenv(name); v != "" { + return v + } + return def +} diff --git a/libbeat/idxmgmt/ilm/ilm.go b/libbeat/idxmgmt/ilm/ilm.go new file mode 100644 index 00000000000..6ca833b978f --- /dev/null +++ b/libbeat/idxmgmt/ilm/ilm.go @@ -0,0 +1,147 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ilm + +import ( + "encoding/json" + "io/ioutil" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/elastic/beats/libbeat/logp" +) + +// SupportFactory is used to define a policy type to be used. +type SupportFactory func(*logp.Logger, beat.Info, *common.Config) (Supporter, error) + +// Supporter implements ILM support. For loading the policies and creating +// write alias a manager instance must be generated. +type Supporter interface { + Mode() Mode + Alias() Alias + Policy() Policy + Manager(h APIHandler) Manager +} + +// Manager uses an APIHandler to install a policy. +type Manager interface { + Enabled() (bool, error) + EnsureAlias() error + EnsurePolicy(overwrite bool) error +} + +// APIHandler defines the interface between a remote service and the Manager. +type APIHandler interface { + ILMEnabled(Mode) (bool, error) + + HasAlias(name string) (bool, error) + CreateAlias(alias Alias) error + + HasILMPolicy(name string) (bool, error) + CreateILMPolicy(policy Policy) error +} + +// Policy describes a policy to be loaded into Elasticsearch. +// See: [Policy phases and actions documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/ilm-policy-definition.html). +type Policy struct { + Name string + Body map[string]interface{} +} + +// Alias describes the alias to be created in Elasticsearch. +type Alias struct { + Name string + Pattern string +} + +// DefaultSupport configures a new default ILM support implementation. +func DefaultSupport(log *logp.Logger, info beat.Info, config *common.Config) (Supporter, error) { + if log == nil { + log = logp.NewLogger("ilm") + } else { + log = log.Named("ilm") + } + + cfg := defaultConfig(info) + if config != nil { + if err := config.Unpack(&cfg); err != nil { + return nil, err + } + } + + if cfg.Mode == ModeDisabled { + return NoopSupport(info, config) + } + + name, err := applyStaticFmtstr(info, &cfg.PolicyName) + if err != nil { + return nil, errors.Wrap(err, "failed to read ilm policy name") + } + + alias := Alias{ + Name: cfg.RolloverAlias, + Pattern: cfg.Pattern, + } + + policy := Policy{ + Name: name, + Body: DefaultPolicy, + } + if path := cfg.PolicyFile; path != "" { + contents, err := ioutil.ReadFile(path) + if err != nil { + return nil, errors.Wrapf(err, "failed to read policy file '%v'", path) + } + + var body map[string]interface{} + if err := json.Unmarshal(contents, &body); err != nil { + return nil, errors.Wrapf(err, "failed to decode policy file '%v'", path) + } + + policy.Body = body + } + + log.Infof("Policy name: %v", name) + return NewDefaultSupport(log, cfg.Mode, alias, policy, cfg.Overwrite, cfg.CheckExists), nil +} + +func applyStaticFmtstr(info beat.Info, fmt *fmtstr.EventFormatString) (string, error) { + return fmt.Run(&beat.Event{ + Fields: common.MapStr{ + // beat object was left in for backward compatibility reason for older configs. + "beat": common.MapStr{ + "name": info.Beat, + "version": info.Version, + }, + "agent": common.MapStr{ + "name": info.Beat, + "version": info.Version, + }, + // For the Beats that have an observer role + "observer": common.MapStr{ + "name": info.Beat, + "version": info.Version, + }, + }, + Timestamp: time.Now(), + }) +} diff --git a/libbeat/idxmgmt/ilm/ilm_test.go b/libbeat/idxmgmt/ilm/ilm_test.go new file mode 100644 index 00000000000..e4a6ccf2864 --- /dev/null +++ b/libbeat/idxmgmt/ilm/ilm_test.go @@ -0,0 +1,279 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ilm + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestDefaultSupport_Init(t *testing.T) { + info := beat.Info{Beat: "test", Version: "9.9.9"} + + t.Run("mode from config", func(t *testing.T) { + cases := map[string]Mode{ + "true": ModeEnabled, + "false": ModeDisabled, + "auto": ModeAuto, + } + for setting, expected := range cases { + expected := expected + t.Run(setting, func(t *testing.T) { + cfg := common.MustNewConfigFrom(map[string]interface{}{ + "enabled": setting, + "rollover_alias": "test", + }) + + s, err := DefaultSupport(nil, info, cfg) + require.NoError(t, err) + assert.Equal(t, expected, s.Mode()) + }) + } + }) + + t.Run("with custom config", func(t *testing.T) { + tmp, err := DefaultSupport(nil, info, common.MustNewConfigFrom( + map[string]interface{}{ + "enabled": true, + "name": "test-%{[agent.version]}", + "rollover_alias": "alias", + "pattern": "01", + "check_exists": false, + "overwrite": true, + }, + )) + require.NoError(t, err) + + s := tmp.(*ilmSupport) + assert := assert.New(t) + assert.Equal(true, s.overwrite) + assert.Equal(false, s.checkExists) + assert.Equal(ModeEnabled, s.Mode()) + assert.Equal(DefaultPolicy, common.MapStr(s.Policy().Body)) + assert.Equal(Alias{Name: "alias", Pattern: "01"}, s.Alias()) + }) + + t.Run("load external policy", func(t *testing.T) { + s, err := DefaultSupport(nil, info, common.MustNewConfigFrom( + map[string]interface{}{ + "policy_file": "testfiles/custom.json", + }, + )) + require.NoError(t, err) + assert.Equal(t, map[string]interface{}{"hello": "world"}, s.Policy().Body) + }) +} + +func TestDefaultSupport_Manager_Enabled(t *testing.T) { + cases := map[string]struct { + calls []onCall + cfg map[string]interface{} + b bool + fail error + err bool + }{ + "disabled via config": { + cfg: map[string]interface{}{"enabled": false}, + }, + "disabled via handler": { + calls: []onCall{ + onILMEnabled(ModeAuto).Return(false, nil), + }, + }, + "enabled via handler": { + calls: []onCall{ + onILMEnabled(ModeAuto).Return(true, nil), + }, + b: true, + }, + "handler confirms enabled flag": { + calls: []onCall{ + onILMEnabled(ModeEnabled).Return(true, nil), + }, + cfg: map[string]interface{}{"enabled": true}, + b: true, + }, + "fail enabled": { + calls: []onCall{ + onILMEnabled(ModeEnabled).Return(false, nil), + }, + cfg: map[string]interface{}{"enabled": true}, + fail: ErrESILMDisabled, + }, + "io error": { + calls: []onCall{ + onILMEnabled(ModeAuto).Return(false, errors.New("ups")), + }, + cfg: map[string]interface{}{}, + err: true, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + cfg := test.cfg + if cfg == nil { + cfg = map[string]interface{}{} + } + + h := newMockHandler(test.calls...) + m := createManager(t, h, test.cfg) + b, err := m.Enabled() + + if test.fail == nil && !test.err { + require.NoError(t, err) + } + if test.err || test.fail != nil { + require.Error(t, err) + } + if test.fail != nil { + assert.Equal(t, test.fail, ErrReason(err)) + } + + assert.Equal(t, test.b, b) + h.AssertExpectations(t) + }) + } +} + +func TestDefaultSupport_Manager_EnsureAlias(t *testing.T) { + alias := Alias{ + Name: "test-9.9.9", + Pattern: ilmDefaultPattern, + } + + cases := map[string]struct { + calls []onCall + cfg map[string]interface{} + fail error + }{ + "create new alias": { + calls: []onCall{ + onHasAlias(alias.Name).Return(false, nil), + onCreateAlias(alias).Return(nil), + }, + }, + "alias already exists": { + calls: []onCall{ + onHasAlias(alias.Name).Return(true, nil), + }, + }, + "fail": { + calls: []onCall{ + onHasAlias(alias.Name).Return(false, nil), + onCreateAlias(alias).Return(errOf(ErrRequestFailed)), + }, + fail: ErrRequestFailed, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + cfg := test.cfg + if cfg == nil { + cfg = map[string]interface{}{"alias": "test"} + } + + h := newMockHandler(test.calls...) + m := createManager(t, h, test.cfg) + err := m.EnsureAlias() + + if test.fail == nil { + require.NoError(t, err) + } else { + require.Error(t, err) + assert.Equal(t, test.fail, ErrReason(err)) + } + h.AssertExpectations(t) + }) + } +} + +func TestDefaultSupport_Manager_EnsurePolicy(t *testing.T) { + testPolicy := Policy{ + Name: "test-9.9.9", + Body: DefaultPolicy, + } + + cases := map[string]struct { + calls []onCall + overwrite bool + cfg map[string]interface{} + fail error + }{ + "create new policy": { + calls: []onCall{ + onHasILMPolicy(testPolicy.Name).Return(false, nil), + onCreateILMPolicy(testPolicy).Return(nil), + }, + }, + "policy already exists": { + calls: []onCall{ + onHasILMPolicy(testPolicy.Name).Return(true, nil), + }, + }, + "overwrite existing": { + overwrite: true, + calls: []onCall{ + onCreateILMPolicy(testPolicy).Return(nil), + }, + }, + "fail": { + calls: []onCall{ + onHasILMPolicy(testPolicy.Name).Return(false, nil), + onCreateILMPolicy(testPolicy).Return(errOf(ErrRequestFailed)), + }, + fail: ErrRequestFailed, + }, + } + + for name, test := range cases { + test := test + t.Run(name, func(t *testing.T) { + cfg := test.cfg + if cfg == nil { + cfg = map[string]interface{}{"name": "test"} + } + + h := newMockHandler(test.calls...) + m := createManager(t, h, test.cfg) + err := m.EnsurePolicy(test.overwrite) + + if test.fail == nil { + require.NoError(t, err) + } else { + require.Error(t, err) + assert.Equal(t, test.fail, ErrReason(err)) + } + h.AssertExpectations(t) + }) + } +} + +func createManager(t *testing.T, h APIHandler, cfg map[string]interface{}) Manager { + info := beat.Info{Beat: "test", Version: "9.9.9"} + s, err := DefaultSupport(nil, info, common.MustNewConfigFrom(cfg)) + require.NoError(t, err) + return s.Manager(h) +} diff --git a/libbeat/idxmgmt/ilm/mockapihandler_test.go b/libbeat/idxmgmt/ilm/mockapihandler_test.go new file mode 100644 index 00000000000..6c1313b3091 --- /dev/null +++ b/libbeat/idxmgmt/ilm/mockapihandler_test.go @@ -0,0 +1,79 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ilm + +import ( + "github.com/stretchr/testify/mock" +) + +type mockHandler struct { + mock.Mock +} + +type onCall struct { + name string + args []interface{} + returns []interface{} +} + +func (c onCall) Return(values ...interface{}) onCall { + c.returns = values + return c +} + +func newMockHandler(calls ...onCall) *mockHandler { + m := &mockHandler{} + for _, c := range calls { + m.On(c.name, c.args...).Return(c.returns...) + } + return m +} + +func onILMEnabled(m Mode) onCall { return makeOnCall("ILMEnabled", m) } +func (h *mockHandler) ILMEnabled(mode Mode) (bool, error) { + args := h.Called(mode) + return args.Bool(0), args.Error(1) +} + +func onHasAlias(name string) onCall { return makeOnCall("HasAlias", name) } +func (h *mockHandler) HasAlias(name string) (bool, error) { + args := h.Called(name) + return args.Bool(0), args.Error(1) +} + +func onCreateAlias(alias Alias) onCall { return makeOnCall("CreateAlias", alias) } +func (h *mockHandler) CreateAlias(alias Alias) error { + args := h.Called(alias) + return args.Error(0) +} + +func onHasILMPolicy(name string) onCall { return makeOnCall("HasILMPolicy", name) } +func (h *mockHandler) HasILMPolicy(name string) (bool, error) { + args := h.Called(name) + return args.Bool(0), args.Error(1) +} + +func onCreateILMPolicy(policy Policy) onCall { return makeOnCall("CreateILMPolicy", policy) } +func (h *mockHandler) CreateILMPolicy(policy Policy) error { + args := h.Called(policy) + return args.Error(0) +} + +func makeOnCall(name string, args ...interface{}) onCall { + return onCall{name: name, args: args} +} diff --git a/libbeat/idxmgmt/ilm/noop.go b/libbeat/idxmgmt/ilm/noop.go new file mode 100644 index 00000000000..129ca4c9eab --- /dev/null +++ b/libbeat/idxmgmt/ilm/noop.go @@ -0,0 +1,41 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ilm + +import ( + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +type noopSupport struct{} +type noopManager struct{} + +// NoopSupport creates a noop ILM implementation with ILM support being always +// disabled. Attempts to install a policy or create a write alias will fail. +func NoopSupport(info beat.Info, config *common.Config) (Supporter, error) { + return (*noopSupport)(nil), nil +} + +func (*noopSupport) Mode() Mode { return ModeDisabled } +func (*noopSupport) Alias() Alias { return Alias{} } +func (*noopSupport) Policy() Policy { return Policy{} } +func (*noopSupport) Manager(_ APIHandler) Manager { return (*noopManager)(nil) } + +func (*noopManager) Enabled() (bool, error) { return false, nil } +func (*noopManager) EnsureAlias() error { return errOf(ErrOpNotAvailable) } +func (*noopManager) EnsurePolicy(_ bool) error { return errOf(ErrOpNotAvailable) } diff --git a/libbeat/idxmgmt/ilm/std.go b/libbeat/idxmgmt/ilm/std.go new file mode 100644 index 00000000000..8c55e471136 --- /dev/null +++ b/libbeat/idxmgmt/ilm/std.go @@ -0,0 +1,141 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ilm + +import ( + "time" + + "github.com/elastic/beats/libbeat/logp" +) + +type ilmSupport struct { + log *logp.Logger + + mode Mode + overwrite bool + checkExists bool + + alias Alias + policy Policy +} + +type singlePolicyManager struct { + *ilmSupport + client APIHandler + + // cached info + cache infoCache +} + +type infoCache struct { + LastUpdate time.Time + Enabled bool +} + +var defaultCacheDuration = 5 * time.Minute + +// NewDefaultSupport creates an instance of default ILM support implementation. +func NewDefaultSupport( + log *logp.Logger, + mode Mode, + alias Alias, + policy Policy, + overwrite, checkExists bool, +) Supporter { + return &ilmSupport{ + log: log, + mode: mode, + overwrite: overwrite, + checkExists: checkExists, + alias: alias, + policy: policy, + } +} + +func (s *ilmSupport) Mode() Mode { return s.mode } +func (s *ilmSupport) Alias() Alias { return s.alias } +func (s *ilmSupport) Policy() Policy { return s.policy } + +func (s *ilmSupport) Manager(h APIHandler) Manager { + return &singlePolicyManager{ + client: h, + ilmSupport: s, + } +} + +func (m *singlePolicyManager) Enabled() (bool, error) { + if m.mode == ModeDisabled { + return false, nil + } + + if m.cache.Valid() { + return m.cache.Enabled, nil + } + + enabled, err := m.client.ILMEnabled(m.mode) + if err != nil { + return enabled, err + } + + if !enabled && m.mode == ModeEnabled { + return false, errOf(ErrESILMDisabled) + } + + m.cache.Enabled = enabled + m.cache.LastUpdate = time.Now() + return enabled, nil +} + +func (m *singlePolicyManager) EnsureAlias() error { + b, err := m.client.HasAlias(m.alias.Name) + if err != nil { + return err + } + if b { + return nil + } + + // This always assume it's a date pattern by sourrounding it by <...> + return m.client.CreateAlias(m.alias) +} + +func (m *singlePolicyManager) EnsurePolicy(overwrite bool) error { + log := m.log + overwrite = overwrite || m.overwrite + + exists := true + if m.checkExists && !overwrite { + b, err := m.client.HasILMPolicy(m.policy.Name) + if err != nil { + return err + } + exists = b + } + + if !exists || overwrite { + return m.client.CreateILMPolicy(m.policy) + } + + log.Infof("do not generate ilm policy: exists=%v, overwrite=%v", + exists, overwrite) + return nil +} + +func (c *infoCache) Valid() bool { + return !c.LastUpdate.IsZero() && time.Since(c.LastUpdate) < defaultCacheDuration +} diff --git a/libbeat/idxmgmt/ilm/testfiles/custom.json b/libbeat/idxmgmt/ilm/testfiles/custom.json new file mode 100644 index 00000000000..56c8e280338 --- /dev/null +++ b/libbeat/idxmgmt/ilm/testfiles/custom.json @@ -0,0 +1 @@ +{"hello": "world"} diff --git a/libbeat/idxmgmt/mockilm_test.go b/libbeat/idxmgmt/mockilm_test.go new file mode 100644 index 00000000000..5aac9419ace --- /dev/null +++ b/libbeat/idxmgmt/mockilm_test.go @@ -0,0 +1,96 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idxmgmt + +import ( + "github.com/stretchr/testify/mock" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/libbeat/logp" +) + +type mockILMSupport struct { + mock.Mock +} + +type onCall struct { + name string + args []interface{} + returns []interface{} +} + +func makeMockILMSupport(calls ...onCall) ilm.SupportFactory { + return func(_ *logp.Logger, _ beat.Info, _ *common.Config) (ilm.Supporter, error) { + m := &mockILMSupport{} + for _, c := range calls { + m.On(c.name, c.args...).Return(c.returns...) + } + return m, nil + } +} + +func (c onCall) Return(values ...interface{}) onCall { + c.returns = values + return c +} + +func onMode() onCall { return makeOnCall("Mode") } +func (m *mockILMSupport) Mode() ilm.Mode { + args := m.Called() + return args.Get(0).(ilm.Mode) +} + +func onAlias() onCall { return makeOnCall("Alias") } +func (m *mockILMSupport) Alias() ilm.Alias { + args := m.Called() + return args.Get(0).(ilm.Alias) +} + +func onPolicy() onCall { return makeOnCall("Policy") } +func (m *mockILMSupport) Policy() ilm.Policy { + args := m.Called() + return args.Get(0).(ilm.Policy) +} + +func (m *mockILMSupport) Manager(_ ilm.APIHandler) ilm.Manager { + return m +} + +func onEnabled() onCall { return makeOnCall("Enabled") } +func (m *mockILMSupport) Enabled() (bool, error) { + args := m.Called() + return args.Bool(0), args.Error(1) +} + +func onEnsureAlias() onCall { return makeOnCall("EnsureAlias") } +func (m *mockILMSupport) EnsureAlias() error { + args := m.Called() + return args.Error(0) +} + +func onEnsurePolicy() onCall { return makeOnCall("EnsurePolicy") } +func (m *mockILMSupport) EnsurePolicy(overwrite bool) error { + args := m.Called() + return args.Error(0) +} + +func makeOnCall(name string, args ...interface{}) onCall { + return onCall{name: name, args: args} +} diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go new file mode 100644 index 00000000000..a2e8149e7bb --- /dev/null +++ b/libbeat/idxmgmt/std.go @@ -0,0 +1,403 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idxmgmt + +import ( + "errors" + "fmt" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/atomic" + "github.com/elastic/beats/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs" + "github.com/elastic/beats/libbeat/outputs/outil" + "github.com/elastic/beats/libbeat/template" +) + +type indexSupport struct { + log *logp.Logger + ilm ilm.Supporter + info beat.Info + migration bool + templateCfg template.TemplateConfig + defaultIndex string + + st indexState +} + +type indexState struct { + withILM atomic.Bool +} + +type indexManager struct { + support *indexSupport + ilm ilm.Manager + + client ESClient + assets Asseter +} + +type indexSelector outil.Selector + +type ilmIndexSelector struct { + index outil.Selector + alias outil.Selector + st *indexState +} + +func newIndexSupport( + log *logp.Logger, + info beat.Info, + ilmFactory ilm.SupportFactory, + tmplConfig *common.Config, + ilmConfig *common.Config, + migration bool, +) (*indexSupport, error) { + if ilmFactory == nil { + ilmFactory = ilm.DefaultSupport + } + + ilm, err := ilmFactory(log, info, ilmConfig) + if err != nil { + return nil, err + } + + tmplCfg, err := unpackTemplateConfig(tmplConfig) + if err != nil { + return nil, err + } + + return &indexSupport{ + log: log, + ilm: ilm, + info: info, + templateCfg: tmplCfg, + migration: migration, + defaultIndex: fmt.Sprintf("%v-%v-%%{+yyyy.MM.dd}", info.IndexPrefix, info.Version), + }, nil +} + +func (s *indexSupport) Enabled() bool { + return s.templateCfg.Enabled || (s.ilm.Mode() != ilm.ModeDisabled) +} + +func (s *indexSupport) ILM() ilm.Supporter { + return s.ilm +} + +func (s *indexSupport) TemplateConfig(withILM bool) (template.TemplateConfig, error) { + log := s.log + + cfg := s.templateCfg + if withILM { + if mode := s.ilm.Mode(); mode == ilm.ModeDisabled { + withILM = false + } else if mode == ilm.ModeEnabled { + withILM = true + } + } + + var err error + if withILM { + cfg, err = applyILMSettings(log, cfg, s.ilm.Policy(), s.ilm.Alias()) + } + return cfg, err +} + +func (s *indexSupport) Manager( + client ESClient, + assets Asseter, +) Manager { + ilm := s.ilm.Manager(ilm.ESClientHandler(client)) + return &indexManager{ + support: s, + ilm: ilm, + client: client, + assets: assets, + } +} + +func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector, error) { + var err error + log := s.log + + // we construct our own configuration object based on the available settings + // in cfg and defaultIndex. The configuration object provided must not be + // modified. + selCfg := common.NewConfig() + if cfg.HasField("indices") { + sub, err := cfg.Child("indices", -1) + if err != nil { + return nil, err + } + selCfg.SetChild("indices", -1, sub) + } + + var indexName string + if cfg.HasField("index") { + indexName, err = cfg.String("index", -1) + if err != nil { + return nil, err + } + } + + var alias string + mode := s.ilm.Mode() + if mode != ilm.ModeDisabled { + alias = s.ilm.Alias().Name + log.Infof("Set %v to '%s' as ILM is enabled.", cfg.PathOf("index"), alias) + } + if mode == ilm.ModeEnabled { + indexName = alias + } + + // no index name configuration found yet -> define default index name based on + // beat.Info provided to the indexSupport on during setup. + if indexName == "" { + indexName = s.defaultIndex + } + + selCfg.SetString("index", -1, indexName) + buildSettings := outil.Settings{ + Key: "index", + MultiKey: "indices", + EnableSingleOnly: true, + FailEmpty: mode != ilm.ModeEnabled, + } + + indexSel, err := outil.BuildSelectorFromConfig(selCfg, buildSettings) + if err != nil { + return nil, err + } + + if mode != ilm.ModeAuto { + return indexSelector(indexSel), nil + } + + selCfg.SetString("index", -1, alias) + aliasSel, err := outil.BuildSelectorFromConfig(selCfg, buildSettings) + return &ilmIndexSelector{ + index: indexSel, + alias: aliasSel, + st: &s.st, + }, nil +} + +func (m *indexManager) Setup(template, policy bool) error { + return m.load(template, policy) +} + +func (m *indexManager) Load() error { + return m.load(false, false) +} + +func (m *indexManager) load(forceTemplate, forcePolicy bool) error { + var err error + log := m.support.log + + withILM := m.support.st.withILM.Load() + if !withILM { + withILM, err = m.ilm.Enabled() + if err != nil { + return err + } + + if withILM { + log.Info("Auto ILM enable success.") + } + } + + // mark ILM as enabled in indexState if withILM is true + if withILM { + m.support.st.withILM.CAS(false, withILM) + } + + // install ilm policy + if withILM { + if err := m.ilm.EnsurePolicy(forcePolicy); err != nil { + return err + } + log.Info("ILM policy successfully loaded.") + } + + // create and install template + if m.support.templateCfg.Enabled { + tmplCfg := m.support.templateCfg + if withILM { + ilm := m.support.ilm + tmplCfg, err = applyILMSettings(log, tmplCfg, ilm.Policy(), ilm.Alias()) + if err != nil { + return err + } + } + + if forceTemplate { + tmplCfg.Overwrite = true + } + + fields := m.assets.Fields(m.support.info.Beat) + loader, err := template.NewLoader(tmplCfg, m.client, m.support.info, fields, m.support.migration) + if err != nil { + return fmt.Errorf("Error creating Elasticsearch template loader: %v", err) + } + + err = loader.Load() + if err != nil { + return fmt.Errorf("Error loading Elasticsearch template: %v", err) + } + + log.Info("Loaded index template.") + } + + // create alias + if withILM { + if err := m.ilm.EnsureAlias(); err != nil { + if ilm.ErrReason(err) != ilm.ErrAliasAlreadyExists { + return err + } + log.Info("Write alias exists already") + } else { + log.Info("Write alias successfully generated.") + } + } + + return nil +} + +func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) { + if idx := getEventCustomIndex(evt); idx != "" { + return idx, nil + } + + if s.st.withILM.Load() { + idx, err := s.alias.Select(evt) + return idx, err + } + + idx, err := s.index.Select(evt) + return idx, err +} + +func (s indexSelector) Select(evt *beat.Event) (string, error) { + if idx := getEventCustomIndex(evt); idx != "" { + return idx, nil + } + return outil.Selector(s).Select(evt) +} + +func getEventCustomIndex(evt *beat.Event) string { + if len(evt.Meta) == 0 { + return "" + } + + if tmp := evt.Meta["alias"]; tmp != nil { + if alias, ok := tmp.(string); ok { + return alias + } + } + + if tmp := evt.Meta["index"]; tmp != nil { + if idx, ok := tmp.(string); ok { + ts := evt.Timestamp.UTC() + return fmt.Sprintf("%s-%d.%02d.%02d", + idx, ts.Year(), ts.Month(), ts.Day()) + } + } + + return "" +} + +func unpackTemplateConfig(cfg *common.Config) (config template.TemplateConfig, err error) { + config = template.DefaultConfig() + if cfg != nil { + err = cfg.Unpack(&config) + } + return config, err +} + +func applyILMSettings( + log *logp.Logger, + tmpl template.TemplateConfig, + policy ilm.Policy, + alias ilm.Alias, +) (template.TemplateConfig, error) { + if !tmpl.Enabled { + return tmpl, nil + } + + if alias.Name == "" { + return tmpl, errors.New("no ilm rollover alias configured") + } + + if policy.Name == "" { + return tmpl, errors.New("no ilm policy name configured") + } + + tmpl.Name = alias.Name + if log != nil { + log.Infof("Set setup.template.name to '%s' as ILM is enabled.", alias) + } + + tmpl.Pattern = fmt.Sprintf("%s-*", alias.Name) + if log != nil { + log.Infof("Set setup.template.pattern to '%s' as ILM is enabled.", tmpl.Pattern) + } + + // rollover_alias and lifecycle.name can't be configured and will be overwritten + + // init/copy index settings + idxSettings := tmpl.Settings.Index + if idxSettings == nil { + idxSettings = map[string]interface{}{} + } else { + tmp := make(map[string]interface{}, len(idxSettings)) + for k, v := range idxSettings { + tmp[k] = v + } + idxSettings = tmp + } + tmpl.Settings.Index = idxSettings + + // init/copy index.lifecycle settings + var lifecycle map[string]interface{} + if ifcLifecycle := idxSettings["lifecycle"]; ifcLifecycle == nil { + lifecycle = map[string]interface{}{} + } else if tmp, ok := ifcLifecycle.(map[string]interface{}); ok { + lifecycle = make(map[string]interface{}, len(tmp)) + for k, v := range tmp { + lifecycle[k] = v + } + } else { + return tmpl, errors.New("settings.index.lifecycle must be an object") + } + idxSettings["lifecycle"] = lifecycle + + // add rollover_alias and name to index.lifecycle settings + if _, exists := lifecycle["rollover_alias"]; !exists { + log.Infof("Set settings.index.lifecycle.rollover_alias in template to %s as ILM is enabled.", alias) + lifecycle["rollover_alias"] = alias.Name + } + if _, exists := lifecycle["name"]; !exists { + log.Infof("Set settings.index.lifecycle.name in template to %s as ILM is enabled.", policy) + lifecycle["name"] = policy.Name + } + + return tmpl, nil +} diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 66598fd6c53..c91d420674d 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -53,6 +53,7 @@ func init() { } func makeConsole( + _ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *common.Config, diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 18dc9683345..5230f3c57a3 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -43,7 +43,7 @@ type Client struct { Connection tlsConfig *transport.TLSConfig - index outil.Selector + index outputs.IndexSelector pipeline *outil.Selector params map[string]string timeout time.Duration @@ -70,7 +70,7 @@ type ClientSettings struct { EscapeHTML bool Parameters map[string]string Headers map[string]string - Index outil.Selector + Index outputs.IndexSelector Pipeline *outil.Selector Timeout time.Duration CompressionLevel int @@ -366,7 +366,7 @@ func (client *Client) publishEvents( // successfully added to bulk request. func bulkEncodePublishRequest( body bulkWriter, - index outil.Selector, + index outputs.IndexSelector, pipeline *outil.Selector, eventType string, data []publisher.Event, @@ -390,7 +390,7 @@ func bulkEncodePublishRequest( } func createEventBulkMeta( - indexSel outil.Selector, + indexSel outputs.IndexSelector, pipelineSel *outil.Selector, eventType string, event *beat.Event, @@ -401,7 +401,7 @@ func createEventBulkMeta( return nil, err } - index, err := getIndex(event, indexSel) + index, err := indexSel.Select(event) if err != nil { err := fmt.Errorf("failed to select event index: %v", err) return nil, err @@ -447,24 +447,6 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) return "", nil } -// getIndex returns the full index name -// Index is either defined in the config as part of the output -// or can be overload by the event through setting index -func getIndex(event *beat.Event, index outil.Selector) (string, error) { - if event.Meta != nil { - if str, exists := event.Meta["index"]; exists { - idx, ok := str.(string) - if ok { - ts := event.Timestamp.UTC() - return fmt.Sprintf("%s-%d.%02d.%02d", - idx, ts.Year(), ts.Month(), ts.Day()), nil - } - } - } - - return index.Select(event) -} - // bulkCollectPublishFails checks per item errors returning all events // to be tried again due to error code returned for that items. If indexing an // event failed due to some error in the event itself (e.g. does not respect mapping), diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 8a3a7d5f5b9..fb690c23c9f 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/idxmgmt" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/elasticsearch/internal" @@ -262,7 +263,9 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.Client, *Client) { t.Fatal(err) } - output, err := makeES(beat.Info{Beat: "libbeat"}, outputs.NewNilObserver(), config) + info := beat.Info{Beat: "libbeat"} + im, _ := idxmgmt.DefaultSupport(nil, info, nil) + output, err := makeES(im, info, outputs.NewNilObserver(), config) if err != nil { t.Fatal(err) } diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 3d626935caa..27825c65353 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -32,7 +32,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/elastic/beats/libbeat/idxmgmt" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs/outest" "github.com/elastic/beats/libbeat/outputs/outil" @@ -189,47 +189,6 @@ func TestCollectPipelinePublishFail(t *testing.T) { assert.Equal(t, events, res) } -func TestGetIndexStandard(t *testing.T) { - ts := time.Now().UTC() - extension := fmt.Sprintf("%d.%02d.%02d", ts.Year(), ts.Month(), ts.Day()) - fields := common.MapStr{"field": 1} - - pattern := "beatname-%{+yyyy.MM.dd}" - fmtstr := fmtstr.MustCompileEvent(pattern) - indexSel := outil.MakeSelector(outil.FmtSelectorExpr(fmtstr, "")) - - event := &beat.Event{Timestamp: ts, Fields: fields} - index, _ := getIndex(event, indexSel) - assert.Equal(t, index, "beatname-"+extension) -} - -func TestGetIndexOverwrite(t *testing.T) { - time := time.Now().UTC() - extension := fmt.Sprintf("%d.%02d.%02d", time.Year(), time.Month(), time.Day()) - - fields := common.MapStr{ - "@timestamp": common.Time(time), - "field": 1, - "beat": common.MapStr{ - "name": "testbeat", - }, - } - - pattern := "beatname-%%{+yyyy.MM.dd}" - fmtstr := fmtstr.MustCompileEvent(pattern) - indexSel := outil.MakeSelector(outil.FmtSelectorExpr(fmtstr, "")) - - event := &beat.Event{ - Timestamp: time, - Meta: map[string]interface{}{ - "index": "dynamicindex", - }, - Fields: fields} - index, _ := getIndex(event, indexSel) - expected := "dynamicindex-" + extension - assert.Equal(t, expected, index) -} - func BenchmarkCollectPublishFailsNone(b *testing.B) { response := []byte(` { "items": [ @@ -406,11 +365,15 @@ func TestBulkEncodeEvents(t *testing.T) { test := test t.Run(name, func(t *testing.T) { cfg := common.MustNewConfigFrom(test.config) - - index, pipeline, err := buildSelectors(beat.Info{ + info := beat.Info{ IndexPrefix: "test", Version: version.GetDefaultVersion(), - }, cfg) + } + + im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig()) + require.NoError(t, err) + + index, pipeline, err := buildSelectors(im, info, cfg) require.NoError(t, err) events := make([]publisher.Event, len(test.events)) diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 0c9dd08537c..f604190c31c 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -100,6 +100,7 @@ func DeregisterConnectCallback(key uuid.UUID) { } func makeES( + im outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *common.Config, @@ -108,7 +109,7 @@ func makeES( cfg.SetInt("bulk_max_size", -1, defaultBulkSize) } - index, pipeline, err := buildSelectors(beat, cfg) + index, pipeline, err := buildSelectors(im, beat, cfg) if err != nil { return outputs.Fail(err) } @@ -177,20 +178,11 @@ func makeES( } func buildSelectors( + im outputs.IndexManager, beat beat.Info, cfg *common.Config, -) (index outil.Selector, pipeline *outil.Selector, err error) { - if !cfg.HasField("index") { - pattern := fmt.Sprintf("%v-%v-%%{+yyyy.MM.dd}", beat.IndexPrefix, beat.Version) - cfg.SetString("index", -1, pattern) - } - - index, err = outil.BuildSelectorFromConfig(cfg, outil.Settings{ - Key: "index", - MultiKey: "indices", - EnableSingleOnly: true, - FailEmpty: true, - }) +) (index outputs.IndexSelector, pipeline *outil.Selector, err error) { + index, err = im.BuildSelector(cfg) if err != nil { return index, pipeline, err } diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 1b05fbb783f..ab5b040bea5 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -44,6 +44,7 @@ type fileOutput struct { // makeFileout instantiates a new file output instance. func makeFileout( + _ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *common.Config, diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index d9f9e86ac18..dc34e77c558 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -70,6 +70,7 @@ func kafkaMetricsRegistry() gometrics.Registry { } func makeKafka( + _ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *common.Config, diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 78df0a12a08..4d85003a719 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -199,7 +199,7 @@ func TestKafkaPublish(t *testing.T) { } t.Run(name, func(t *testing.T) { - grp, err := makeKafka(beat.Info{Beat: "libbeat"}, outputs.NewNilObserver(), cfg) + grp, err := makeKafka(nil, beat.Info{Beat: "libbeat"}, outputs.NewNilObserver(), cfg) if err != nil { t.Fatal(err) } diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 02928ad0d08..0c14bf5882b 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -40,6 +40,7 @@ func init() { } func makeLogstash( + _ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *common.Config, diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index ba00c59de13..ead21295c76 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/elastic/beats/libbeat/idxmgmt" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/elasticsearch" "github.com/elastic/beats/libbeat/outputs/outest" @@ -175,7 +176,17 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer { "template.enabled": false, }) - grp, err := plugin(beat.Info{Beat: "libbeat"}, outputs.NewNilObserver(), config) + info := beat.Info{Beat: "libbeat"} + im, err := idxmgmt.DefaultSupport(nil, info, common.MustNewConfigFrom( + map[string]interface{}{ + "setup.ilm.enabled": false, + }, + )) + if err != nil { + t.Fatal("init index management:", err) + } + + grp, err := plugin(im, info, outputs.NewNilObserver(), config) if err != nil { t.Fatalf("init elasticsearch output plugin failed: %v", err) } diff --git a/libbeat/outputs/logstash/logstash_test.go b/libbeat/outputs/logstash/logstash_test.go index 9c94a18ae1d..7c1552fcb83 100644 --- a/libbeat/outputs/logstash/logstash_test.go +++ b/libbeat/outputs/logstash/logstash_test.go @@ -179,7 +179,7 @@ func newTestLumberjackOutput( } cfg, _ := common.NewConfigFrom(config) - grp, err := outputs.Load(beat.Info{}, nil, "logstash", cfg) + grp, err := outputs.Load(nil, beat.Info{}, nil, "logstash", cfg) if err != nil { t.Fatalf("init logstash output plugin failed: %v", err) } diff --git a/libbeat/outputs/output_reg.go b/libbeat/outputs/output_reg.go index 5b669c9413b..6625f1125b1 100644 --- a/libbeat/outputs/output_reg.go +++ b/libbeat/outputs/output_reg.go @@ -28,10 +28,25 @@ var outputReg = map[string]Factory{} // Factory is used by output plugins to build an output instance type Factory func( + im IndexManager, beat beat.Info, stats Observer, cfg *common.Config) (Group, error) +// IndexManager provides additional index related services to the outputs. +type IndexManager interface { + // BuildSelector can be used by an output to create an IndexSelector based on + // the outputs configuration. + // The defaultIndex is interpreted as format string and used as default fallback + // if no index is configured or all indices are guarded using conditionals. + BuildSelector(cfg *common.Config) (IndexSelector, error) +} + +// IndexSelector is used to find the index name an event shall be indexed to. +type IndexSelector interface { + Select(event *beat.Event) (string, error) +} + // Group configures and combines multiple clients into load-balanced group of clients // being managed by the publisher pipeline. type Group struct { @@ -54,7 +69,13 @@ func FindFactory(name string) Factory { } // Load creates and configures a output Group using a configuration object.. -func Load(info beat.Info, stats Observer, name string, config *common.Config) (Group, error) { +func Load( + im IndexManager, + info beat.Info, + stats Observer, + name string, + config *common.Config, +) (Group, error) { factory := FindFactory(name) if factory == nil { return Group{}, fmt.Errorf("output type %v undefined", name) @@ -63,5 +84,5 @@ func Load(info beat.Info, stats Observer, name string, config *common.Config) (G if stats == nil { stats = NewNilObserver() } - return factory(info, stats, config) + return factory(im, info, stats, config) } diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 66bcaa0e72d..97a6b9fdafa 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -49,6 +49,7 @@ func init() { } func makeRedis( + _ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *common.Config, diff --git a/libbeat/outputs/redis/redis_integration_test.go b/libbeat/outputs/redis/redis_integration_test.go index 57f491caa43..bf0fcf0ac21 100644 --- a/libbeat/outputs/redis/redis_integration_test.go +++ b/libbeat/outputs/redis/redis_integration_test.go @@ -289,7 +289,7 @@ func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) outputs.Cli t.Fatalf("redis output module not registered") } - out, err := plugin(beat.Info{Beat: testBeatname, Version: testBeatversion}, outputs.NewNilObserver(), config) + out, err := plugin(nil, beat.Info{Beat: testBeatname, Version: testBeatversion}, outputs.NewNilObserver(), config) if err != nil { t.Fatalf("Failed to initialize redis output: %v", err) } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 72e4c1487a4..cbd1b520537 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -146,16 +146,22 @@ func makeWorkQueue() workQueue { } // Reload the output -func (c *outputController) Reload(cfg *reload.ConfigWithMeta) error { - outputCfg := common.ConfigNamespace{} - +func (c *outputController) Reload( + cfg *reload.ConfigWithMeta, + outFactory func(outputs.Observer, common.ConfigNamespace) (outputs.Group, error), +) error { + outCfg := common.ConfigNamespace{} if cfg != nil { - if err := cfg.Config.Unpack(&outputCfg); err != nil { + if err := cfg.Config.Unpack(&outCfg); err != nil { return err } } - output, err := loadOutput(c.beat, c.monitors, outputCfg) + output, err := loadOutput(c.monitors, func(stats outputs.Observer) (string, outputs.Group, error) { + name := outCfg.Name() + out, err := outFactory(stats, outCfg) + return name, out, err + }) if err != nil { return err } diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index e9f50779d02..2c854c13676 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -45,6 +45,11 @@ type Monitors struct { Logger *logp.Logger } +// OutputFactory is used by the publisher pipeline to create an output instance. +// If the group returned can be empty. The pipeline will accept events, but +// eventually block. +type OutputFactory func(outputs.Observer) (string, outputs.Group, error) + func init() { flag.BoolVar(&publishDisabled, "N", false, "Disable actual publishing for testing") } @@ -55,7 +60,7 @@ func Load( beatInfo beat.Info, monitors Monitors, config Config, - outcfg common.ConfigNamespace, + makeOutput func(outputs.Observer) (string, outputs.Group, error), ) (*Pipeline, error) { log := monitors.Logger if log == nil { @@ -95,7 +100,7 @@ func Load( return nil, err } - out, err := loadOutput(beatInfo, monitors, outcfg) + out, err := loadOutput(monitors, makeOutput) if err != nil { return nil, err } @@ -110,9 +115,8 @@ func Load( } func loadOutput( - beatInfo beat.Info, monitors Monitors, - outcfg common.ConfigNamespace, + makeOutput OutputFactory, ) (outputs.Group, error) { log := monitors.Logger if log == nil { @@ -123,7 +127,7 @@ func loadOutput( return outputs.Group{}, nil } - if !outcfg.IsSet() { + if makeOutput == nil { return outputs.Group{}, nil } @@ -141,13 +145,13 @@ func loadOutput( outStats = outputs.NewStats(metrics) } - out, err := outputs.Load(beatInfo, outStats, outcfg.Name(), outcfg.Config()) + outName, out, err := makeOutput(outStats) if err != nil { return outputs.Fail(err) } if metrics != nil { - monitoring.NewString(metrics, "type").Set(outcfg.Name()) + monitoring.NewString(metrics, "type").Set(outName) } if monitors.Telemetry != nil { telemetry := monitors.Telemetry.GetRegistry("output") @@ -156,7 +160,7 @@ func loadOutput( } else { telemetry = monitors.Telemetry.NewRegistry("output") } - monitoring.NewString(telemetry, "name").Set(outcfg.Name()) + monitoring.NewString(telemetry, "name").Set(outName) } return out, nil diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 00c203f3f22..fbe49510e0d 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -131,6 +131,15 @@ const ( WaitOnClientClose ) +// OutputReloader interface, that can be queried from an active publisher pipeline. +// The output reloader can be used to change the active output. +type OutputReloader interface { + Reload( + cfg *reload.ConfigWithMeta, + factory func(outputs.Observer, common.ConfigNamespace) (outputs.Group, error), + ) error +} + type pipelineEventer struct { mutex sync.Mutex modifyable bool @@ -442,6 +451,6 @@ func makePipelineProcessors( } // OutputReloader returns a reloadable object for the output section of this pipeline -func (p *Pipeline) OutputReloader() reload.Reloadable { +func (p *Pipeline) OutputReloader() OutputReloader { return p.output } diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go index 5bc72ed33cd..211e056ce60 100644 --- a/libbeat/publisher/pipeline/stress/out.go +++ b/libbeat/publisher/pipeline/stress/out.go @@ -53,7 +53,7 @@ func init() { outputs.RegisterType("test", makeTestOutput) } -func makeTestOutput(beat beat.Info, observer outputs.Observer, cfg *common.Config) (outputs.Group, error) { +func makeTestOutput(_ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *common.Config) (outputs.Group, error) { config := defaultTestOutputConfig if err := cfg.Unpack(&config); err != nil { return outputs.Fail(err) diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index e21d3f29d26..3ebce0351f1 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/publisher/pipeline" ) @@ -57,13 +58,19 @@ func RunTests( return fmt.Errorf("unpacking config failed: %v", err) } - pipeline, err := pipeline.Load(info, pipeline.Monitors{ - Metrics: nil, - Telemetry: nil, - Logger: logp.L(), - }, + pipeline, err := pipeline.Load(info, + pipeline.Monitors{ + Metrics: nil, + Telemetry: nil, + Logger: logp.L(), + }, config.Pipeline, - config.Output) + func(stat outputs.Observer) (string, outputs.Group, error) { + cfg := config.Output + out, err := outputs.Load(nil, info, stat, cfg.Name(), cfg.Config()) + return cfg.Name(), out, err + }, + ) if err != nil { return fmt.Errorf("loading pipeline failed: %+v", err) } diff --git a/libbeat/template/config.go b/libbeat/template/config.go index 77e93d72011..ed939fcc9a0 100644 --- a/libbeat/template/config.go +++ b/libbeat/template/config.go @@ -39,10 +39,10 @@ type TemplateSettings struct { Source map[string]interface{} `config:"_source"` } -var ( - // DefaultConfig for index template - DefaultConfig = TemplateConfig{ +// DefaultConfig for index template +func DefaultConfig() TemplateConfig { + return TemplateConfig{ Enabled: true, Fields: "", } -) +} diff --git a/libbeat/template/load.go b/libbeat/template/load.go index cf357f952ea..99978796f78 100644 --- a/libbeat/template/load.go +++ b/libbeat/template/load.go @@ -45,14 +45,13 @@ type Loader struct { } // NewLoader creates a new template loader -func NewLoader(cfg *common.Config, client ESClient, beatInfo beat.Info, fields []byte, migration bool) (*Loader, error) { - config := DefaultConfig - - err := cfg.Unpack(&config) - if err != nil { - return nil, err - } - +func NewLoader( + config TemplateConfig, + client ESClient, + beatInfo beat.Info, + fields []byte, + migration bool, +) (*Loader, error) { return &Loader{ config: config, client: client, diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index 741fc684441..b1f4e01c4a5 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -236,17 +236,17 @@ func TestOverwrite(t *testing.T) { client.Request("DELETE", "/_template/"+templateName, "", nil, nil) // Load template - config := newConfigFrom(t, TemplateConfig{ + config := TemplateConfig{ Enabled: true, Fields: absPath + "/fields.yml", - }) + } loader, err := NewLoader(config, client, beatInfo, nil, false) assert.NoError(t, err) err = loader.Load() assert.NoError(t, err) // Load template again, this time with custom settings - config = newConfigFrom(t, TemplateConfig{ + config = TemplateConfig{ Enabled: true, Fields: absPath + "/fields.yml", Settings: TemplateSettings{ @@ -254,7 +254,7 @@ func TestOverwrite(t *testing.T) { "enabled": false, }, }, - }) + } loader, err = NewLoader(config, client, beatInfo, nil, false) assert.NoError(t, err) err = loader.Load() @@ -265,7 +265,7 @@ func TestOverwrite(t *testing.T) { assert.Equal(t, true, templateJSON.SourceEnabled()) // Load template again, this time with custom settings AND overwrite: true - config = newConfigFrom(t, TemplateConfig{ + config = TemplateConfig{ Enabled: true, Overwrite: true, Fields: absPath + "/fields.yml", @@ -274,7 +274,7 @@ func TestOverwrite(t *testing.T) { "enabled": false, }, }, - }) + } loader, err = NewLoader(config, client, beatInfo, nil, false) assert.NoError(t, err) err = loader.Load() @@ -372,12 +372,6 @@ func TestTemplateWithData(t *testing.T) { assert.False(t, loader.CheckTemplate(tmpl.GetName())) } -func newConfigFrom(t *testing.T, from interface{}) *common.Config { - cfg, err := common.NewConfigFrom(from) - assert.NoError(t, err) - return cfg -} - func getTemplate(t *testing.T, client ESClient, templateName string) testTemplate { status, body, err := client.Request("GET", "/_template/"+templateName, "", nil, nil) assert.NoError(t, err) diff --git a/libbeat/template/template.go b/libbeat/template/template.go index 5a03ca770dd..1e829d196b1 100644 --- a/libbeat/template/template.go +++ b/libbeat/template/template.go @@ -53,7 +53,13 @@ type Template struct { } // New creates a new template instance -func New(beatVersion string, beatName string, esVersion common.Version, config TemplateConfig, migration bool) (*Template, error) { +func New( + beatVersion string, + beatName string, + esVersion common.Version, + config TemplateConfig, + migration bool, +) (*Template, error) { bV, err := common.NewVersion(beatVersion) if err != nil { return nil, err diff --git a/libbeat/tests/system/config/libbeat.yml.j2 b/libbeat/tests/system/config/libbeat.yml.j2 index 839836be287..5ac42df8446 100644 --- a/libbeat/tests/system/config/libbeat.yml.j2 +++ b/libbeat/tests/system/config/libbeat.yml.j2 @@ -22,6 +22,18 @@ setup.template.name: "{{setup_template_name}}" setup.template.pattern: "{{setup_template_pattern}}" {%- endif %} +{% if ilm %} +setup.ilm: + enabled: {{ ilm.enabled | default("auto") }} + policy_name: libbeat-test-default-policy + {% if ilm.pattern %} + pattern: {{ ilm.pattern }} + {% endif %} + {% if ilm.rollover_alias %} + rollover_alias: {{ ilm.rollover_alias }} + {% endif %} +{% endif %} + #================================ Processors ===================================== {%- if processors %} diff --git a/libbeat/tests/system/config/mockbeat.yml.j2 b/libbeat/tests/system/config/mockbeat.yml.j2 index 134545b9970..45893b1bddb 100644 --- a/libbeat/tests/system/config/mockbeat.yml.j2 +++ b/libbeat/tests/system/config/mockbeat.yml.j2 @@ -79,6 +79,18 @@ setup.template: path: {{ template_json_path }} name: {{ template_json_name }} +{% if ilm %} +setup.ilm: + enabled: {{ ilm.enabled | default("auto") }} + policy_name: libbeat-test-default-policy + {% if ilm.pattern %} + pattern: {{ ilm.pattern }} + {% endif %} + {% if ilm.rollover_alias %} + rollover_alias: {{ ilm.rollover_alias }} + {% endif %} +{% endif %} + #================================ Logging ===================================== {% if metrics_period -%} diff --git a/libbeat/tests/system/test_ilm.py b/libbeat/tests/system/test_ilm.py index 04d1fb1da55..069093253a2 100644 --- a/libbeat/tests/system/test_ilm.py +++ b/libbeat/tests/system/test_ilm.py @@ -10,6 +10,9 @@ INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False) +testPolicyName = "libbeat-test-default-policy" + + class Test(BaseTest): def setUp(self): @@ -19,7 +22,7 @@ def setUp(self): print("Using elasticsearch: {}".format(self.elasticsearch_url)) self.es = Elasticsearch([self.elasticsearch_url]) self.alias_name = "mockbeat-9.9.9" - self.policy_name = "beats-default-policy" + self.policy_name = testPolicyName logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("elasticsearch").setLevel(logging.ERROR) @@ -31,9 +34,11 @@ def test_enabled(self): """ self.render_config_template( + ilm={ + "enabled": True, + }, elasticsearch={ "hosts": self.get_elasticsearch_url(), - "ilm.enabled": True, }, ) @@ -48,7 +53,8 @@ def test_enabled(self): # Check if template is loaded with settings template = self.es.transport.perform_request('GET', '/_template/' + self.alias_name) - assert template[self.alias_name]["settings"]["index"]["lifecycle"]["name"] == "beats-default-policy" + print(self.alias_name) + assert template[self.alias_name]["settings"]["index"]["lifecycle"]["name"] == testPolicyName assert template[self.alias_name]["settings"]["index"]["lifecycle"]["rollover_alias"] == self.alias_name # Make sure the correct index + alias was created @@ -75,11 +81,13 @@ def test_rollover_alias(self): alias_name = "foo" self.render_config_template( + ilm={ + "enabled": True, + "pattern": "1", + "rollover_alias": alias_name + }, elasticsearch={ "hosts": self.get_elasticsearch_url(), - "ilm.enabled": True, - "ilm.pattern": "1", - "ilm.rollover_alias": alias_name }, ) @@ -109,10 +117,12 @@ def test_pattern(self): """ self.render_config_template( + ilm={ + "enabled": True, + "pattern": "1" + }, elasticsearch={ "hosts": self.get_elasticsearch_url(), - "ilm.enabled": True, - "ilm.pattern": "1" }, ) @@ -142,10 +152,12 @@ def test_pattern_date(self): """ self.render_config_template( + ilm={ + "enabled": True, + "pattern": "'{now/d}'" + }, elasticsearch={ "hosts": self.get_elasticsearch_url(), - "ilm.enabled": True, - "ilm.pattern": "'{now/d}'" }, ) diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 94314930f7f..23d6c0172f0 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -948,11 +948,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "metricbeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -1599,6 +1594,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "metricbeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/metricbeat/metricbeat.yml b/metricbeat/metricbeat.yml index b669f2f7fba..52916c295de 100644 --- a/metricbeat/metricbeat.yml +++ b/metricbeat/metricbeat.yml @@ -93,9 +93,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 6474a5e37b8..09e21d901db 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -726,11 +726,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "packetbeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -1377,6 +1372,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "packetbeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/packetbeat/packetbeat.yml b/packetbeat/packetbeat.yml index ffc36d4beab..4f65cc09310 100644 --- a/packetbeat/packetbeat.yml +++ b/packetbeat/packetbeat.yml @@ -175,9 +175,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 1e797e44582..ec534c84fcf 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -275,11 +275,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "winlogbeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -926,6 +921,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "winlogbeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/winlogbeat/winlogbeat.yml b/winlogbeat/winlogbeat.yml index d8fc5fd1b17..7112c197439 100644 --- a/winlogbeat/winlogbeat.yml +++ b/winlogbeat/winlogbeat.yml @@ -97,9 +97,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index c359e2ad942..239da5691a1 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -393,11 +393,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "auditbeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -1044,6 +1039,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "auditbeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/auditbeat/auditbeat.yml b/x-pack/auditbeat/auditbeat.yml index b52dc649e7e..ca80223cfa4 100644 --- a/x-pack/auditbeat/auditbeat.yml +++ b/x-pack/auditbeat/auditbeat.yml @@ -143,9 +143,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 7400a56d3ef..56279565fb0 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -1123,11 +1123,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "filebeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -1774,6 +1769,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "filebeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/filebeat/filebeat.yml b/x-pack/filebeat/filebeat.yml index 6585d1e49a8..29be4bd93c2 100644 --- a/x-pack/filebeat/filebeat.yml +++ b/x-pack/filebeat/filebeat.yml @@ -149,9 +149,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index 832009f360a..05feb66beec 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -391,11 +391,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "functionbeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -1042,6 +1037,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "functionbeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/functionbeat/functionbeat.yml b/x-pack/functionbeat/functionbeat.yml index 31c90a1d4af..07d3a2ed5d2 100644 --- a/x-pack/functionbeat/functionbeat.yml +++ b/x-pack/functionbeat/functionbeat.yml @@ -212,9 +212,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic" diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 9b9675b01fc..810adc05cde 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -966,11 +966,6 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - #ilm.rollover_alias: "metricbeat" - #ilm.pattern: "{now/d}-000001" - # Set gzip compression level. #compression_level: 0 @@ -1617,6 +1612,25 @@ setup.template.settings: #_source: #enabled: false +#============================== Setup ILM ===================================== + +# Configure Index Lifecycle Management Index Lifecycle Management creates a +# write alias and adds additional settings to the template. +# The elasticsearch.output.index setting will be replaced with the write alias +# if ILM is enabled. + +# Enabled ILM support. Valid values are true, false, and auto. The beat will +# detect availabilty of Index Lifecycle Management in Elasticsearch and enable +# or disable ILM support. +#setup.ilm.enabled: auto + +# Configure the ILM write alias name. +#setup.ilm.rollover_alias: "metricbeat" + +# Configure rollover index pattern. +#setup.ilm.pattern: "{now/d}-000001" + + #============================== Kibana ===================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/metricbeat/metricbeat.yml b/x-pack/metricbeat/metricbeat.yml index b669f2f7fba..52916c295de 100644 --- a/x-pack/metricbeat/metricbeat.yml +++ b/x-pack/metricbeat/metricbeat.yml @@ -93,9 +93,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Enabled ilm (beta) to use index lifecycle management instead daily indices. - #ilm.enabled: false - # Optional protocol and basic auth credentials. #protocol: "https" #username: "elastic"