diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 73150f5b584..822a62decde 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -85,6 +85,10 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d still used for matcher. {issue}10505[10505] {pull}10506[10506] - Change type of haproxy.source from text to keyword. {pull}10506[10506] - Rename `event.type` to `suricata.eve.event_type` in Suricata module because event.type is reserved for future use by ECS. {pull}10575[10575] +- Populate more ECS fields in the Suricata module. {pull}10006[10006] +- Rename setting `filebeat.registry_flush` to `filebeat.registry.flush`. {pull}10504[10504] +- Rename setting `filebeat.registry_file_permission` to `filebeat.registry.file_permission`. {pull}10504[10504] +- Remove setting `filebeat.registry_file` in favor of `filebeat.registry.path`. The registry file will be stored in a sub-directory by now. {pull}10504[10504] *Heartbeat* diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 62f23a3a0da..2296ea1235a 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -17,20 +17,20 @@ #========================= Filebeat global options ============================ -# Name of the registry file. If a relative path is used, it is considered relative to the +# Registry data path. If a relative path is used, it is considered relative to the # data path. -#filebeat.registry_file: ${path.data}/registry +#filebeat.registry.path: ${path.data}/registry -# The permissions mask to apply on registry file. The default value is 0600. -# Must be a valid Unix-style file permissions mask expressed in octal notation. -# This option is not supported on Windows. -#filebeat.registry_file_permissions: 0600 +# The permissions mask to apply on registry data, and meta files. The default +# value is 0600. Must be a valid Unix-style file permissions mask expressed in +# octal notation. This option is not supported on Windows. +#filebeat.registry.file_permissions: 0600 # The timeout value that controls when registry entries are written to disk -# (flushed). When an unwritten update exceeds this value, it triggers a write to -# disk. When registry_flush is set to 0s, the registry is written to disk after -# each batch of events has been published successfully. The default value is 0s. -#filebeat.registry_flush: 0s +# (flushed). When an unwritten update exceeds this value, it triggers a write +# to disk. When flush is set to 0s, the registry is written to disk after each +# batch of events has been published successfully. The default value is 0s. +#filebeat.registry.flush: 0s # By default Ingest pipelines are not updated if a pipeline with the same ID # already exists. If this option is enabled Filebeat overwrites pipelines diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 9cb36dbddfd..6c36061ad52 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -76,6 +76,9 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { rawConfig, "prospectors", "config.prospectors", + "registry_file", + "registry_file_permissions", + "registry_flush", ); err != nil { return nil, err } @@ -293,7 +296,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { finishedLogger := newFinishedLogger(wgEvents) // Setup registrar to persist state - registrar, err := registrar.New(config.RegistryFile, config.RegistryFilePermissions, config.RegistryFlush, finishedLogger) + registrar, err := registrar.New(config.Registry, finishedLogger) if err != nil { logp.Err("Could not init registrar: %v", err) return err diff --git a/filebeat/config/config.go b/filebeat/config/config.go index 748208f864a..47faaf44294 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -39,25 +39,33 @@ const ( ) type Config struct { - Inputs []*common.Config `config:"inputs"` - RegistryFile string `config:"registry_file"` - RegistryFilePermissions os.FileMode `config:"registry_file_permissions"` - RegistryFlush time.Duration `config:"registry_flush"` - ConfigDir string `config:"config_dir"` - ShutdownTimeout time.Duration `config:"shutdown_timeout"` - Modules []*common.Config `config:"modules"` - ConfigInput *common.Config `config:"config.inputs"` - ConfigModules *common.Config `config:"config.modules"` - Autodiscover *autodiscover.Config `config:"autodiscover"` - OverwritePipelines bool `config:"overwrite_pipelines"` + Inputs []*common.Config `config:"inputs"` + Registry Registry `config:"registry"` + ConfigDir string `config:"config_dir"` + ShutdownTimeout time.Duration `config:"shutdown_timeout"` + Modules []*common.Config `config:"modules"` + ConfigInput *common.Config `config:"config.inputs"` + ConfigModules *common.Config `config:"config.modules"` + Autodiscover *autodiscover.Config `config:"autodiscover"` + OverwritePipelines bool `config:"overwrite_pipelines"` +} + +type Registry struct { + Path string `config:"path"` + Permissions os.FileMode `config:"file_permissions"` + FlushTimeout time.Duration `config:"flush"` + MigrateFile string `config:"migrate_file"` } var ( DefaultConfig = Config{ - RegistryFile: "registry", - RegistryFilePermissions: 0600, - ShutdownTimeout: 0, - OverwritePipelines: false, + Registry: Registry{ + Path: "registry", + Permissions: 0600, + MigrateFile: "", + }, + ShutdownTimeout: 0, + OverwritePipelines: false, } ) diff --git a/filebeat/docs/faq.asciidoc b/filebeat/docs/faq.asciidoc index 327339ea848..bf4867d075b 100644 --- a/filebeat/docs/faq.asciidoc +++ b/filebeat/docs/faq.asciidoc @@ -53,7 +53,7 @@ Make sure that you read the documentation for these configuration options before [[reduce-registry-size]] === Registry file is too large? -{beatname_uc} keeps the state of each file and persists the state to disk in the `registry_file`. The file state is used to continue file reading at a previous position when {beatname_uc} is restarted. If a large number of new files are produced every day, the registry file might grow to be too large. To reduce the size of the registry file, there are two configuration options available: <<{beatname_lc}-input-log-clean-removed,`clean_removed`>> and <<{beatname_lc}-input-log-clean-inactive,`clean_inactive`>>. +{beatname_uc} keeps the state of each file and persists the state to disk in the registry file. The file state is used to continue file reading at a previous position when {beatname_uc} is restarted. If a large number of new files are produced every day, the registry file might grow to be too large. To reduce the size of the registry file, there are two configuration options available: <<{beatname_lc}-input-log-clean-removed,`clean_removed`>> and <<{beatname_lc}-input-log-clean-inactive,`clean_inactive`>>. For old files that you no longer touch and are ignored (see <<{beatname_lc}-input-log-ignore-older,`ignore_older`>>), we recommended that you use `clean_inactive`. If old files get removed from disk, then use the `clean_removed` option. diff --git a/filebeat/docs/filebeat-general-options.asciidoc b/filebeat/docs/filebeat-general-options.asciidoc index 5e68a8a165e..bd846145ebe 100644 --- a/filebeat/docs/filebeat-general-options.asciidoc +++ b/filebeat/docs/filebeat-general-options.asciidoc @@ -17,25 +17,30 @@ Beats. These options are in the `filebeat` namespace. [float] -==== `registry_file` +==== `registry.path` -The name of the registry file. If a relative path is used, it is considered relative to the -data path. See the <> section for details. The default is `${path.data}/registry`. +The root path of the registry. If a relative path is used, it is considered +relative to the data path. See the <> section for details. +The default is `${path.data}/registry`. [source,yaml] ------------------------------------------------------------------------------------- -filebeat.registry_file: registry +filebeat.registry.path: registry ------------------------------------------------------------------------------------- -It is not possible to use a symlink as registry file. - -NOTE: The registry file is only updated when new events are flushed and not on a predefined period. +NOTE: The registry is only updated when new events are flushed and not on a predefined period. That means in case there are some states where the TTL expired, these are only removed when new event are processed. +NOTE: The registry stores it's data in the subdirectory filebeat/data.json. It +also contains a meta data file named filebeat/meta.json. The meta file contains +the file format version number. + +NOTE: The content stored in filebeat/data.json is compatible to the old registry file data format. + [float] -==== `registry_file_permissions` +==== `registry.file_permissions` -The permissions mask to apply on registry file. The default value is 0600. The permissions option must be a valid Unix-style file permissions mask expressed in octal notation. In Go, numbers in octal notation must start with 0. +The permissions mask to apply on registry data file. The default value is 0600. The permissions option must be a valid Unix-style file permissions mask expressed in octal notation. In Go, numbers in octal notation must start with 0. This option is not supported on Windows. @@ -47,25 +52,25 @@ Examples: [source,yaml] ------------------------------------------------------------------------------------- -filebeat.registry_file_permissions: 0600 +filebeat.registry.file_permissions: 0600 ------------------------------------------------------------------------------------- [float] -==== `registry_flush` +==== `registry.flush` The timeout value that controls when registry entries are written to disk (flushed). When an unwritten update exceeds this value, it triggers a write to -disk. When registry_flush is set to 0s, the registry is written to disk after +disk. When `registry.flush` is set to 0s, the registry is written to disk after each batch of events has been published successfully. The default value is 0s. NOTE: The registry is always updated when Filebeat shuts down normally. After an -abnormal shutdown, the registry will not be up-to-date if the registry_flush +abnormal shutdown, the registry will not be up-to-date if the `registry.flush` value is >0s. Filebeat will send published events again (depending on values in the last updated registry file). NOTE: Filtering out a huge number of logs can cause many registry updates, slowing -down processing. Setting registry_flush to a value >0s reduces write operations, +down processing. Setting `registry.flush` to a value >0s reduces write operations, helping Filebeat process more events. diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index cdfa2b0ca11..f854e0b87fb 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -775,20 +775,20 @@ filebeat.inputs: #========================= Filebeat global options ============================ -# Name of the registry file. If a relative path is used, it is considered relative to the +# Registry data path. If a relative path is used, it is considered relative to the # data path. -#filebeat.registry_file: ${path.data}/registry +#filebeat.registry.path: ${path.data}/registry -# The permissions mask to apply on registry file. The default value is 0600. -# Must be a valid Unix-style file permissions mask expressed in octal notation. -# This option is not supported on Windows. -#filebeat.registry_file_permissions: 0600 +# The permissions mask to apply on registry data, and meta files. The default +# value is 0600. Must be a valid Unix-style file permissions mask expressed in +# octal notation. This option is not supported on Windows. +#filebeat.registry.file_permissions: 0600 # The timeout value that controls when registry entries are written to disk -# (flushed). When an unwritten update exceeds this value, it triggers a write to -# disk. When registry_flush is set to 0s, the registry is written to disk after -# each batch of events has been published successfully. The default value is 0s. -#filebeat.registry_flush: 0s +# (flushed). When an unwritten update exceeds this value, it triggers a write +# to disk. When flush is set to 0s, the registry is written to disk after each +# batch of events has been published successfully. The default value is 0s. +#filebeat.registry.flush: 0s # By default Ingest pipelines are not updated if a pipeline with the same ID # already exists. If this option is enabled Filebeat overwrites pipelines diff --git a/filebeat/registrar/migrate.go b/filebeat/registrar/migrate.go new file mode 100644 index 00000000000..a48df0477c4 --- /dev/null +++ b/filebeat/registrar/migrate.go @@ -0,0 +1,187 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package registrar + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" + + "github.com/pkg/errors" + + helper "github.com/elastic/beats/libbeat/common/file" + "github.com/elastic/beats/libbeat/logp" +) + +const ( + legacyVersion = "" + currentVersion = "0" +) + +func ensureCurrent(home, migrateFile string, perm os.FileMode) error { + if migrateFile == "" { + if isFile(home) { + migrateFile = home + } + } + + fbRegHome := filepath.Join(home, "filebeat") + version, err := readVersion(fbRegHome, migrateFile) + if err != nil { + return err + } + + logp.Debug("registrar", "Registry type '%v' found", version) + + switch version { + case legacyVersion: + return migrateLegacy(home, fbRegHome, migrateFile, perm) + case currentVersion: + return nil + case "": + backupFile := migrateFile + ".bak" + if isFile(backupFile) { + return migrateLegacy(home, fbRegHome, backupFile, perm) + } + return initRegistry(fbRegHome, perm) + default: + return fmt.Errorf("registry file version %v not supported", version) + } +} + +func migrateLegacy(home, regHome, migrateFile string, perm os.FileMode) error { + logp.Info("Migrate registry file to registry directory") + + if home == migrateFile { + backupFile := migrateFile + ".bak" + if isFile(migrateFile) { + logp.Info("Move registry file to backup file: %v", backupFile) + if err := helper.SafeFileRotate(backupFile, migrateFile); err != nil { + return err + } + migrateFile = backupFile + } else if isFile(backupFile) { + logp.Info("Old registry backup file found, continue migration") + migrateFile = backupFile + } + } + + if err := initRegistry(regHome, perm); err != nil { + return err + } + + dataFile := filepath.Join(regHome, "data.json") + if !isFile(dataFile) && isFile(migrateFile) { + logp.Info("Migrate old registry file to new data file") + err := helper.SafeFileRotate(dataFile, migrateFile) + if err != nil { + return err + } + } + + return nil +} + +func initRegistry(regHome string, perm os.FileMode) error { + if !isDir(regHome) { + logp.Info("No registry home found. Create: %v", regHome) + if err := os.MkdirAll(regHome, 0750); err != nil { + return errors.Wrapf(err, "failed to create registry dir '%v'", regHome) + } + } + + metaFile := filepath.Join(regHome, "meta.json") + if !isFile(metaFile) { + logp.Info("Initialize registry meta file") + err := safeWriteFile(metaFile, []byte(`{"version": "0"}`), perm) + if err != nil { + return errors.Wrap(err, "failed writing registry meta.json") + } + } + + return nil +} + +func readVersion(regHome, migrateFile string) (string, error) { + if isFile(migrateFile) { + return legacyVersion, nil + } + + if !isDir(regHome) { + return "", nil + } + + metaFile := filepath.Join(regHome, "meta.json") + if !isFile(metaFile) { + return "", nil + } + + tmp, err := ioutil.ReadFile(metaFile) + if err != nil { + return "", errors.Wrap(err, "failed to open meta file") + } + + meta := struct{ Version string }{} + if err := json.Unmarshal(tmp, &meta); err != nil { + return "", errors.Wrap(err, "failed reading meta file") + } + + return meta.Version, nil +} + +func isDir(path string) bool { + fi, err := os.Stat(path) + exists := err == nil && fi.IsDir() + logp.Debug("test", "isDir(%v) -> %v", path, exists) + return exists +} + +func isFile(path string) bool { + fi, err := os.Stat(path) + exists := err == nil && fi.Mode().IsRegular() + logp.Debug("test", "isFile(%v) -> %v", path, exists) + return exists +} + +func safeWriteFile(path string, data []byte, perm os.FileMode) error { + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + + for len(data) > 0 { + var n int + n, err = f.Write(data) + if err != nil { + break + } + + data = data[n:] + } + + if err == nil { + err = f.Sync() + } + + if err1 := f.Close(); err == nil { + err = err1 + } + return err +} diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index c5d78b49079..f128107b5fc 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/input/file" helper "github.com/elastic/beats/libbeat/common/file" "github.com/elastic/beats/libbeat/logp" @@ -63,20 +64,30 @@ var ( // New creates a new Registrar instance, updating the registry file on // `file.State` updates. New fails if the file can not be opened or created. -func New(registryFile string, fileMode os.FileMode, flushTimeout time.Duration, out successLogger) (*Registrar, error) { +func New(cfg config.Registry, out successLogger) (*Registrar, error) { + home := paths.Resolve(paths.Data, cfg.Path) + migrateFile := cfg.MigrateFile + if migrateFile != "" { + migrateFile = paths.Resolve(paths.Data, migrateFile) + } + + err := ensureCurrent(home, migrateFile, cfg.Permissions) + if err != nil { + return nil, err + } + + dataFile := filepath.Join(home, "filebeat", "data.json") r := &Registrar{ - registryFile: registryFile, - fileMode: fileMode, + registryFile: dataFile, + fileMode: cfg.Permissions, done: make(chan struct{}), states: file.NewStates(), Channel: make(chan []file.State, 1), - flushTimeout: flushTimeout, + flushTimeout: cfg.FlushTimeout, out: out, wg: sync.WaitGroup{}, } - err := r.Init() - - return r, err + return r, r.Init() } // Init sets up the Registrar and make sure the registry file is setup correctly diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 22c95ef4b4c..91042de561c 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -91,9 +91,14 @@ filebeat.{{input_config | default("inputs")}}: {% endif %} filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }} + {% if not skip_registry_config %} -filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}} -filebeat.registry_file_permissions: {{ registryFilePermissions|default("0600") }} +filebeat.registry: + path: {{ beat.working_dir + '/' }}{{ registry_home|default("registry")}} + file_permissions: {{ registry_file_permissions|default("0600") }} + {% if registry_migrate_file %} + migrate_file: {{ beat.working_dir + '/' + registry_migrate_file }} + {% endif %} {%endif%} {% if reload or reload_path -%} diff --git a/filebeat/tests/system/config/filebeat_inputs.yml.j2 b/filebeat/tests/system/config/filebeat_inputs.yml.j2 index db5628fb603..f9a6d725a92 100644 --- a/filebeat/tests/system/config/filebeat_inputs.yml.j2 +++ b/filebeat/tests/system/config/filebeat_inputs.yml.j2 @@ -5,7 +5,15 @@ filebeat.inputs: scan_frequency: 0.5s encoding: {{input.encoding | default("plain") }} {% endfor %} -filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}} + +{% if not skip_registry_config %} +filebeat.registry: + path: {{ beat.working_dir + '/' }}{{ registry_home|default("registry")}} + file_permissions: {{ registry_file_permissions|default("0600") }} + {% if registry_migrate_file %} + migrate_file: {{ beat.working_dir + '/' + registry_migrate_file }} + {% endif %} +{%endif%} {% if ilm %} setup.ilm: diff --git a/filebeat/tests/system/config/filebeat_modules.yml.j2 b/filebeat/tests/system/config/filebeat_modules.yml.j2 index df3a480277c..710a3609ea4 100644 --- a/filebeat/tests/system/config/filebeat_modules.yml.j2 +++ b/filebeat/tests/system/config/filebeat_modules.yml.j2 @@ -1,4 +1,12 @@ -filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}} +{% if not skip_registry_config %} +filebeat.registry: + path: {{ beat.working_dir + '/' }}{{ registry_home|default("registry")}} + file_permissions: {{ registry_file_permissions|default("0600") }} + {% if registry_migrate_file %} + migrate_file: {{ beat.working_dir + '/' + registry_migrate_file }} + {% endif %} +{%endif%} + filebeat.overwrite_pipelines: true filebeat.config.modules: diff --git a/filebeat/tests/system/filebeat.py b/filebeat/tests/system/filebeat.py index b22183bc210..df2b8483855 100644 --- a/filebeat/tests/system/filebeat.py +++ b/filebeat/tests/system/filebeat.py @@ -1,11 +1,15 @@ import json import os +import stat import sys -sys.path.append(os.path.join(os.path.dirname(__file__), '../../../libbeat/tests/system')) +curdir = os.path.dirname(__file__) +sys.path.append(os.path.join(curdir, '../../../libbeat/tests/system')) from beat.beat import TestCase +default_registry_file = 'registry/filebeat/data.json' + class BaseTest(TestCase): @@ -14,13 +18,27 @@ def setUpClass(self): if not hasattr(self, "beat_name"): self.beat_name = "filebeat" if not hasattr(self, "beat_path"): - self.beat_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) + self.beat_path = os.path.abspath(os.path.join(curdir, "../../")) super(BaseTest, self).setUpClass() - def get_registry(self): + def has_registry(self, name=None, data_path=None): + if not name: + name = default_registry_file + if not data_path: + data_path = self.working_dir + + dotFilebeat = os.path.join(data_path, name) + return os.path.isfile(dotFilebeat) + + def get_registry(self, name=None, data_path=None): + if not name: + name = default_registry_file + if not data_path: + data_path = self.working_dir + # Returns content of the registry file - dotFilebeat = self.working_dir + '/registry' + dotFilebeat = os.path.join(data_path, name) self.wait_until(cond=lambda: os.path.isfile(dotFilebeat)) with open(dotFilebeat) as file: @@ -47,3 +65,7 @@ def get_registry_entry_by_path(self, path): tmp_entry = entry return tmp_entry + + def file_permissions(self, path): + full_path = os.path.join(self.working_dir, path) + return oct(stat.S_IMODE(os.lstat(full_path).st_mode)) diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 5927a586d91..e6aef2ff3a1 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -53,10 +53,7 @@ def test_registrar_file_content(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. - self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - "registry")), - max_timeout=1) + self.wait_until(self.has_registry, max_timeout=1) filebeat.check_kill_and_wait() # Check that a single file exists in the registry. @@ -124,10 +121,7 @@ def test_registrar_files(self): max_timeout=15) # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. - self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - "registry")), - max_timeout=1) + self.wait_until(self.has_registry, max_timeout=1) filebeat.check_kill_and_wait() # Check that file exist @@ -143,7 +137,7 @@ def test_custom_registry_file_location(self): """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - registryFile="a/b/c/registry", + registry_home="a/b/c/registry", ) os.mkdir(self.working_dir + "/log/") testfile_path = self.working_dir + "/log/test.log" @@ -156,13 +150,10 @@ def test_custom_registry_file_location(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - "a/b/c/registry")), - + lambda: self.has_registry("a/b/c/registry/filebeat/data.json"), max_timeout=1) filebeat.check_kill_and_wait() - - assert os.path.isfile(os.path.join(self.working_dir, "a/b/c/registry")) + assert self.has_registry("a/b/c/registry/filebeat/data.json") def test_registry_file_default_permissions(self): """ @@ -174,9 +165,12 @@ def test_registry_file_default_permissions(self): # configuration isn't implemented on Windows yet raise SkipTest + registry_home = "a/b/c/registry" + registry_file = os.path.join(registry_home, "filebeat/data.json") + self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - registryFile="a/b/c/registry", + registry_home=registry_home, ) os.mkdir(self.working_dir + "/log/") testfile_path = self.working_dir + "/log/test.log" @@ -189,14 +183,11 @@ def test_registry_file_default_permissions(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - "a/b/c/registry")), + lambda: self.has_registry(registry_file), max_timeout=1) filebeat.check_kill_and_wait() - registry_file_perm_mask = oct(stat.S_IMODE(os.lstat(os.path.join(self.working_dir, - "a/b/c/registry")).st_mode)) - self.assertEqual(registry_file_perm_mask, "0600") + self.assertEqual(self.file_permissions(registry_file), "0600") def test_registry_file_custom_permissions(self): """ @@ -208,10 +199,13 @@ def test_registry_file_custom_permissions(self): # configuration isn't implemented on Windows yet raise SkipTest + registry_home = "a/b/c/registry" + registry_file = os.path.join(registry_home, "filebeat/data.json") + self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - registryFile="a/b/c/registry", - registryFilePermissions=0644 + registry_home=registry_home, + registry_file_permissions=0644, ) os.mkdir(self.working_dir + "/log/") testfile_path = self.working_dir + "/log/test.log" @@ -224,14 +218,11 @@ def test_registry_file_custom_permissions(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - "a/b/c/registry")), + lambda: self.has_registry(registry_file), max_timeout=1) filebeat.check_kill_and_wait() - registry_file_perm_mask = oct(stat.S_IMODE(os.lstat(os.path.join(self.working_dir, - "a/b/c/registry")).st_mode)) - self.assertEqual(registry_file_perm_mask, "0644") + self.assertEqual(self.file_permissions(registry_file), "0644") def test_registry_file_update_permissions(self): """ @@ -243,9 +234,12 @@ def test_registry_file_update_permissions(self): # configuration isn't implemented on Windows yet raise SkipTest + registry_home = "a/b/c/registry_x" + registry_file = os.path.join(registry_home, "filebeat/data.json") + self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - registryFile="a/b/c/registry_x", + registry_home=registry_home, ) os.mkdir(self.working_dir + "/log/") testfile_path = self.working_dir + "/log/test.log" @@ -258,19 +252,16 @@ def test_registry_file_update_permissions(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - "a/b/c/registry_x")), + lambda: self.has_registry(registry_file), max_timeout=1) filebeat.check_kill_and_wait() - registry_file_perm_mask = oct(stat.S_IMODE(os.lstat(os.path.join(self.working_dir, - "a/b/c/registry_x")).st_mode)) - self.assertEqual(registry_file_perm_mask, "0600") + self.assertEqual(self.file_permissions(registry_file), "0600") self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - registryFile="a/b/c/registry_x", - registryFilePermissions=0644 + registry_home="a/b/c/registry_x", + registry_file_permissions=0644 ) filebeat = self.start_beat() @@ -280,8 +271,7 @@ def test_registry_file_update_permissions(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - "a/b/c/registry_x")), + lambda: self.has_registry(registry_file), max_timeout=1) # Wait a moment to make sure registry is completely written @@ -289,9 +279,7 @@ def test_registry_file_update_permissions(self): filebeat.check_kill_and_wait() - registry_file_perm_mask = oct(stat.S_IMODE(os.lstat(os.path.join(self.working_dir, - "a/b/c/registry_x")).st_mode)) - self.assertEqual(registry_file_perm_mask, "0644") + self.assertEqual(self.file_permissions(registry_file), "0644") def test_rotating_file(self): """ @@ -361,7 +349,7 @@ def test_data_path(self): self.wait_until(lambda: self.output_has(lines=1)) filebeat.check_kill_and_wait() - assert os.path.isfile(self.working_dir + "/datapath/registry") + assert self.has_registry(data_path=self.working_dir+"/datapath") def test_rotating_file_inode(self): """ @@ -488,7 +476,11 @@ def test_restart_continue(self): filebeat.check_kill_and_wait() # Store first registry file - shutil.copyfile(self.working_dir + "/registry", self.working_dir + "/registry.first") + registry_file = "registry/filebeat/data.json" + shutil.copyfile( + self.working_dir + "/" + registry_file, + self.working_dir + "/registry.first", + ) # Append file with open(testfile_path, 'a') as testfile: @@ -585,7 +577,11 @@ def test_rotating_file_with_restart(self): filebeat.check_kill_and_wait() # Store first registry file - shutil.copyfile(self.working_dir + "/registry", self.working_dir + "/registry.first") + registry_file = "registry/filebeat/data.json" + shutil.copyfile( + self.working_dir + "/" + registry_file, + self.working_dir + "/registry.first", + ) # Rotate log file, create a new empty one and remove it afterwards testfilerenamed2 = self.working_dir + "/log/input.2" @@ -975,38 +971,12 @@ def test_clean_removed_with_clean_inactive(self): else: assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") - def test_directory_failure(self): - """ - Test that filebeat does not start if a directory is set as registry file - """ - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/*", - registryFile="registrar", - ) - os.mkdir(self.working_dir + "/log/") - os.mkdir(self.working_dir + "/registrar/") - - testfile_path = self.working_dir + "/log/test.log" - with open(testfile_path, 'w') as testfile: - testfile.write("Hello World\n") - - filebeat = self.start_beat() - - # Make sure states written appears one more time - self.wait_until( - lambda: self.log_contains("Exiting: Registry file path must be a file"), - max_timeout=10) - - filebeat.check_kill_and_wait(exit_code=1) - def test_symlink_failure(self): """ Test that filebeat does not start if a symlink is set as registry file """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - registryFile="registry_symlink", ) os.mkdir(self.working_dir + "/log/") @@ -1014,15 +984,18 @@ def test_symlink_failure(self): with open(testfile_path, 'w') as testfile: testfile.write("Hello World\n") - registryfile = self.working_dir + "/registry" - with open(registryfile, 'w') as testfile: - testfile.write("[]") + registry_file = self.working_dir + "/registry/filebeat/data.json" + link_to_file = self.working_dir + "registry.data" + os.makedirs(self.working_dir + "/registry/filebeat") + + with open(link_to_file, 'w') as f: + f.write("[]") if os.name == "nt": import win32file # pylint: disable=import-error - win32file.CreateSymbolicLink(self.working_dir + "/registry_symlink", registryfile, 0) + win32file.CreateSymbolicLink(registry_file, link_to_file, 0) else: - os.symlink(registryfile, self.working_dir + "/registry_symlink") + os.symlink(link_to_file, registry_file) filebeat = self.start_beat() @@ -1466,8 +1439,7 @@ def test_registrar_files_with_input_level_processors(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - "registry")), + self.has_registry, max_timeout=10) # Wait a moment to make sure registry is completely written @@ -1549,10 +1521,7 @@ def test_registrar_meta(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. - self.wait_until( - lambda: os.path.isfile(os.path.join(self.working_dir, - "registry")), - max_timeout=1) + self.wait_until(self.has_registry, max_timeout=1) filebeat.check_kill_and_wait() diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 56279565fb0..b9b7b0d4588 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -839,20 +839,20 @@ filebeat.inputs: #========================= Filebeat global options ============================ -# Name of the registry file. If a relative path is used, it is considered relative to the +# Registry data path. If a relative path is used, it is considered relative to the # data path. -#filebeat.registry_file: ${path.data}/registry +#filebeat.registry.path: ${path.data}/registry -# The permissions mask to apply on registry file. The default value is 0600. -# Must be a valid Unix-style file permissions mask expressed in octal notation. -# This option is not supported on Windows. -#filebeat.registry_file_permissions: 0600 +# The permissions mask to apply on registry data, and meta files. The default +# value is 0600. Must be a valid Unix-style file permissions mask expressed in +# octal notation. This option is not supported on Windows. +#filebeat.registry.file_permissions: 0600 # The timeout value that controls when registry entries are written to disk -# (flushed). When an unwritten update exceeds this value, it triggers a write to -# disk. When registry_flush is set to 0s, the registry is written to disk after -# each batch of events has been published successfully. The default value is 0s. -#filebeat.registry_flush: 0s +# (flushed). When an unwritten update exceeds this value, it triggers a write +# to disk. When flush is set to 0s, the registry is written to disk after each +# batch of events has been published successfully. The default value is 0s. +#filebeat.registry.flush: 0s # By default Ingest pipelines are not updated if a pipeline with the same ID # already exists. If this option is enabled Filebeat overwrites pipelines diff --git a/x-pack/filebeat/tests/system/config/filebeat_modules.yml.j2 b/x-pack/filebeat/tests/system/config/filebeat_modules.yml.j2 index 62b66e8a5ff..6df0f3ba0d9 100644 --- a/x-pack/filebeat/tests/system/config/filebeat_modules.yml.j2 +++ b/x-pack/filebeat/tests/system/config/filebeat_modules.yml.j2 @@ -1,9 +1,17 @@ -filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}} -filebeat.overwrite_pipelines: true - filebeat.config.modules: path: {{ beat.working_dir + '/modules.d/*.yml' }} +filebeat.overwrite_pipelines: true + +{% if not skip_registry_config %} +filebeat.registry: + path: {{ beat.working_dir + '/' }}{{ registry_home|default("registry")}} + file_permissions: {{ registry_file_permissions|default("0600") }} + {% if registry_migrate_file %} + migrate_file: {{ beat.working_dir + '/' + registry_migrate_file }} + {% endif %} +{%endif%} + output.elasticsearch.hosts: ["{{ elasticsearch_url }}"] output.elasticsearch.index: {{ index_name }}