diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index d27a203b4aa..1e3ae568e30 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -49,7 +49,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
 - The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase the number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
 - Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
-- Add kafka compression support for ZSTD.
 
 *Heartbeat*
 
@@ -357,6 +356,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Add support for SSL and Proxy configurations for websocket type in streaming input. {pull}41934[41934]
 - AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694]
 - The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932]
+- The Filestream input can automatically migrate state from files when changing the `file_identity` if the previous file identity was `native` (the default) or `path`. {issue}40197[40197] {pull}41762[41762]
 - Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
 - Added default values in the streaming input for websocket retries and put a cap on retry wait time to be less than or equal to the maximum defined wait time. {pull}42012[42012]
diff --git a/filebeat/_meta/config/filebeat.global.reference.yml.tmpl b/filebeat/_meta/config/filebeat.global.reference.yml.tmpl
index 0287fb3f9f5..9d0a3c23974 100644
--- a/filebeat/_meta/config/filebeat.global.reference.yml.tmpl
+++ b/filebeat/_meta/config/filebeat.global.reference.yml.tmpl
@@ -15,6 +15,8 @@
 # batch of events has been published successfully. The default value is 1s.
 #filebeat.registry.flush: 1s
 
+# The interval at which to run the registry cleanup
+#filebeat.registry.cleanup_interval: 5m
 
 # Starting with Filebeat 7.0, the registry uses a new directory format to store
 # Filebeat state. After you upgrade, Filebeat will automatically migrate a 6.x
diff --git a/filebeat/docs/faq.asciidoc b/filebeat/docs/faq.asciidoc
index ddcdb6a8898..ee7ceeabad8 100644
--- a/filebeat/docs/faq.asciidoc
+++ b/filebeat/docs/faq.asciidoc
@@ -19,6 +19,24 @@ We do not recommend reading log files from network volumes. Whenever possible, i
 send the log files directly from there. Reading files from network volumes
 (especially on Windows) can have unexpected side effects. For example, changed
 file identifiers may result in {beatname_uc} reading a log file from scratch again.
+If it is not possible to read from the host, then using the
+<<filestream-file-identity,`fingerprint`>>
+file identity is the next best option.
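+
+For example, a filestream input reading from a network volume could
+enable the `fingerprint` file identity like this (a minimal sketch;
+the `id` and `paths` values are placeholders):
+
+[source,yaml]
+----
+- type: filestream
+  id: network-share-logs        # placeholder ID
+  paths:
+    - /mnt/share/*.log          # placeholder path
+  prospector.scanner.fingerprint.enabled: true
+  file_identity.fingerprint: ~
+----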
+
 
 [[filebeat-not-collecting-lines]]
 === {beatname_uc} isn't collecting lines from a file
@@ -71,6 +89,13 @@ By default states are never removed from the registry file. To resolve the inode
 You can use <<{beatname_lc}-input-log-clean-removed,`clean_removed`>> for files that are removed from
 disk. Be aware that `clean_removed` cleans the file state from the registry
 whenever a file cannot be found during a scan. If the file shows up again later, it
 will be sent again from scratch.
+Aside from that you should also change the
+<<{beatname_lc}-input-filestream-file-identity,`file_identity`>> to
+`fingerprint`. If you were using `native` (the default) or `path`,
+the state of the files will be automatically migrated to
+`fingerprint`.
+
 include::filebeat-log-rotation.asciidoc[]
 
 [[windows-file-rotation]]
diff --git a/filebeat/docs/inputs/input-filestream-file-options.asciidoc b/filebeat/docs/inputs/input-filestream-file-options.asciidoc
index 5436d3863dc..5278e013334 100644
--- a/filebeat/docs/inputs/input-filestream-file-options.asciidoc
+++ b/filebeat/docs/inputs/input-filestream-file-options.asciidoc
@@ -542,19 +542,34 @@ indirectly set higher priorities on certain inputs by assigning a higher limit
 of harvesters.
 
 [float]
+[id="{beatname_lc}-input-{type}-file-identity"]
 ===== `file_identity`
 
 Different `file_identity` methods can be configured to suit the
 environment where you are collecting log messages.
 
-WARNING: Changing `file_identity` methods between runs may result in
+IMPORTANT: Changing `file_identity` is only supported from `native` or
+`path` to `fingerprint`. In those cases {beatname_uc} will
+automatically migrate the state of the file when {type} starts.
+
+WARNING: Any unsupported change in `file_identity` methods between
+runs may result in
 duplicated events in the output.
 
 *`native`*:: The default behaviour of {beatname_uc} is to differentiate
 between files using their inodes and device ids.
 +
 In some cases these values can change during the lifetime of a file.
-For example, when using the Linux link:https://en.wikipedia.org/wiki/Logical_Volume_Manager_%28Linux%29[LVM] (Logical Volume Manager), device numbers are allocated dynamically at module load (refer to link:https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/7/html/logical_volume_manager_administration/lv#persistent_numbers[Persistent Device Numbers] in the Red Hat Enterprise Linux documentation). To avoid the possibility of data duplication in this case, you can set `file_identity` to `path` rather than `native`.
+For example, when using the Linux
+link:https://en.wikipedia.org/wiki/Logical_Volume_Manager_%28Linux%29[LVM]
+(Logical Volume Manager), device numbers are allocated dynamically at
+module load (refer to
+link:https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/7/html/logical_volume_manager_administration/lv#persistent_numbers[Persistent
+Device Numbers] in the Red Hat Enterprise Linux documentation). To
+avoid the possibility of data duplication in this case, you can set
+`file_identity` to `fingerprint` rather than the default `native`.
++
+The states of files generated by `native` file identity can be migrated to `fingerprint`.
 
 [source,yaml]
 ----
@@ -571,6 +586,8 @@ WARNING: This strategy does not support renaming files.
 If an input file is renamed, {beatname_uc} will read it again if the new path
 matches the settings of the input.
 
+The states of files generated by `path` file identity can be migrated to `fingerprint`.
+
 [source,yaml]
 ----
 file_identity.path: ~
diff --git a/filebeat/docs/inputs/input-filestream.asciidoc b/filebeat/docs/inputs/input-filestream.asciidoc
index 54283d6cce7..96ba5e273e5 100644
--- a/filebeat/docs/inputs/input-filestream.asciidoc
+++ b/filebeat/docs/inputs/input-filestream.asciidoc
@@ -34,6 +34,12 @@ The `log` writes the complete file state.
 7. Stale entries can be removed from the registry, even if there is no active
 input.
 
+8. The input can identify files based on their contents when using the
+<<{beatname_lc}-input-filestream-scan-fingerprint,`fingerprint`>>
+<<{beatname_lc}-input-filestream-file-identity,file identity>> instead
+of the default inode and device ID. This solves data duplication
+caused by inode reuse.
+
 To configure this input, specify a list of glob-based <<filestream-input-paths,`paths`>>
 that must be crawled to locate and fetch the log lines.
 
@@ -86,20 +92,42 @@ multiple input sections:
 
 [[filestream-file-identity]]
 ==== Reading files on network shares and cloud providers
 
-WARNING: Filebeat does not support reading from network shares and cloud providers.
-
-However, one of the limitations of these data sources can be mitigated
-if you configure Filebeat adequately.
+WARNING: Some file identity methods do not support reading from
+network shares and cloud providers. To avoid duplicating events, use
+`fingerprint` when reading from network shares or cloud providers.
 
 By default, {beatname_uc} identifies files based on their inodes and
 device IDs. However, on network shares and cloud providers these
 values might change during the lifetime of the file. If this happens
 {beatname_uc} thinks that file is new and resends the whole content of the file.
 
 To solve this problem you can configure the `file_identity` option. Possible
-values besides the default `inode_deviceid` are `path`, `inode_marker` and `fingerprint`.
-
-WARNING: Changing `file_identity` methods between runs may result in
-duplicated events in the output.
+values besides the default `native` (inode + device ID) are
+`fingerprint`, `path` and `inode_marker`.
+
+IMPORTANT: Changing `file_identity` is only supported when
+migrating from `native` or `path` to `fingerprint`.
+
+WARNING: Any unsupported change in `file_identity` methods between
+runs may result in duplicated events in the output.
+
+`fingerprint` is the recommended file identity because it does not
+rely on the file system/OS: it generates a hash from a portion of the
+file (the first 1024 bytes, by default) and uses that to identify the
+file. This works well with log rotation strategies that move/rename
+the file, and on Windows, where file identifiers might be more
+volatile. The downside is that {beatname_uc} will wait until the file
+reaches 1024 bytes before it starts ingesting it.
+
+WARNING: In order to use this file identity option, one must enable
+the <<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint
+option in the scanner>>. Once this file identity is enabled, changing
+the fingerprint configuration (offset, length, etc.) will lead to a
+global re-ingestion of all files that match the paths configuration of
+the input.
+
+Please refer to the
+<<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint
+configuration>> for details.
 
 Selecting `path` instructs {beatname_uc} to identify files based on their
 paths. This is a quick way to avoid rereading files if inode and device ids
@@ -117,13 +145,6 @@ example oneliner generates a hidden marker file for the selected mountpoint `/lo
 Please note that you should not use this option on Windows as file identifiers
 might be more volatile.
-Selecting `fingerprint` instructs {beatname_uc} to identify files based on their
-content byte range.
-
-WARNING: In order to use this file identity option, one must enable the <<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint option in the scanner>>. Once this file identity is enabled, changing the fingerprint configuration (offset, length, etc) will lead to a global re-ingestion of all files that match the paths configuration of the input.
-
-Please refer to the <<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint configuration for details>>.
-
 ["source","sh",subs="attributes"]
 ----
 $ lsblk -o MOUNTPOINT,UUID | grep /logs | awk '{print $2}' >> /logs/.filebeat-marker
diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml
index 700f3d8e788..be189fdfd1c 100644
--- a/filebeat/filebeat.reference.yml
+++ b/filebeat/filebeat.reference.yml
@@ -1266,6 +1266,8 @@ filebeat.inputs:
 # batch of events has been published successfully. The default value is 1s.
 #filebeat.registry.flush: 1s
 
+# The interval at which to run the registry cleanup
+#filebeat.registry.cleanup_interval: 5m
 
 # Starting with Filebeat 7.0, the registry uses a new directory format to store
 # Filebeat state. After you upgrade, Filebeat will automatically migrate a 6.x
diff --git a/filebeat/include/list.go b/filebeat/include/list.go
index e2a656a2a85..ae05c332eaa 100644
--- a/filebeat/include/list.go
+++ b/filebeat/include/list.go
@@ -28,6 +28,7 @@ import (
 	// Import packages that perform 'func init()'.
 	_ "github.com/elastic/beats/v7/filebeat/input"
 	_ "github.com/elastic/beats/v7/filebeat/input/container"
+	_ "github.com/elastic/beats/v7/filebeat/input/filestream"
 	_ "github.com/elastic/beats/v7/filebeat/input/log"
 	_ "github.com/elastic/beats/v7/filebeat/input/mqtt"
 	_ "github.com/elastic/beats/v7/filebeat/input/redis"
diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go
index f9804bb16f3..80460d6b3b4 100644
--- a/filebeat/input/filestream/environment_test.go
+++ b/filebeat/input/filestream/environment_test.go
@@ -386,6 +386,7 @@ func getIDFromPath(filepath, inputID string, fi os.FileInfo) string {
 
 // waitUntilEventCount waits until total count events arrive to the client.
 func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
+	e.t.Helper()
 	msg := &strings.Builder{}
 	require.Eventuallyf(e.t, func() bool {
 		msg.Reset()
@@ -448,9 +449,14 @@ func (e *inputTestingEnvironment) waitUntilAtLeastEventCount(count int) {
 // waitUntilHarvesterIsDone detects Harvester stop by checking if the last client has been closed
 // as when a Harvester stops the client is closed.
 func (e *inputTestingEnvironment) waitUntilHarvesterIsDone() {
-	for !e.pipeline.clients[len(e.pipeline.clients)-1].closed {
-		time.Sleep(10 * time.Millisecond)
-	}
+	require.Eventually(
+		e.t,
+		func() bool {
+			return e.pipeline.clients[len(e.pipeline.clients)-1].closed
+		},
+		time.Second*10,
+		time.Millisecond*10,
+		"The last connected client has not closed its connection")
 }
 
 // requireEventsReceived requires that the list of messages has made it into the output.
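The test updates that follow repeatedly pair `prospector.scanner.fingerprint.enabled: false` with `file_identity.native` to pin the pre-fingerprint behaviour. Expressed as a standalone input configuration, that combination looks like this (a sketch; the `id` and `paths` values are placeholders):

[source,yaml]
----
- type: filestream
  id: pinned-native-identity    # placeholder ID
  paths:
    - /var/log/app/*.log        # placeholder path
  # keep the inode + device ID (native) identity instead of fingerprint
  prospector.scanner.fingerprint.enabled: false
  file_identity.native: ~
----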
diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index 9fae0481ca6..03674772e0d 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -222,6 +222,7 @@ scanner: paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: + fingerprint.enabled: false check_interval: 10ms ` @@ -260,6 +261,7 @@ scanner: paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: + fingerprint.enabled: false check_interval: 50ms ` @@ -370,6 +372,7 @@ scanner: } cfgStr := ` scanner: + fingerprint.enabled: false check_interval: 100ms ` @@ -615,6 +618,7 @@ scanner: name: "returns no symlink if the original file is excluded", cfgStr: ` scanner: + fingerprint.enabled: false exclude_files: ['.*exclude.*', '.*traveler.*'] symlinks: true `, @@ -661,6 +665,7 @@ scanner: name: "returns no included symlink if the original file is not included", cfgStr: ` scanner: + fingerprint.enabled: false include_files: ['.*include.*', '.*portal.*'] symlinks: true `, @@ -678,6 +683,7 @@ scanner: name: "returns an included symlink if the original file is included", cfgStr: ` scanner: + fingerprint.enabled: false include_files: ['.*include.*', '.*portal.*', '.*traveler.*'] symlinks: true `, diff --git a/filebeat/input/filestream/identifier_test.go b/filebeat/input/filestream/identifier_test.go index 1fcd4d73efa..f2cd0102823 100644 --- a/filebeat/input/filestream/identifier_test.go +++ b/filebeat/input/filestream/identifier_test.go @@ -18,7 +18,6 @@ package filestream import ( - "io/ioutil" "os" "testing" @@ -35,12 +34,17 @@ type testFileIdentifierConfig struct { } func TestFileIdentifier(t *testing.T) { - t.Run("default file identifier", func(t *testing.T) { - identifier, err := newFileIdentifier(nil, "") + t.Run("native file identifier", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(`native: ~`) + ns := conf.Namespace{} + if err := cfg.Unpack(&ns); err != nil { + t.Fatalf("cannot unpack config into conf.Namespace: %s", err) + } + identifier, err := newFileIdentifier(&ns, "") require.NoError(t, err) assert.Equal(t, DefaultIdentifierName, identifier.Name()) - tmpFile, err := ioutil.TempFile("", "test_file_identifier_native") + tmpFile, err := os.CreateTemp("", "test_file_identifier_native") if err != nil { t.Fatalf("cannot create temporary file for test: %v", err) } @@ -59,12 +63,17 @@ func TestFileIdentifier(t *testing.T) { assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String(), src.Name()) }) - t.Run("default file identifier with suffix", func(t *testing.T) { - identifier, err := newFileIdentifier(nil, "my-suffix") + t.Run("native file identifier with suffix", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(`native: ~`) + ns := conf.Namespace{} + if err := cfg.Unpack(&ns); err != nil { + t.Fatalf("cannot unpack config into conf.Namespace: %s", err) + } + identifier, err := newFileIdentifier(&ns, "my-suffix") require.NoError(t, err) assert.Equal(t, DefaultIdentifierName, identifier.Name()) - tmpFile, err := ioutil.TempFile("", "test_file_identifier_native") + tmpFile, err := os.CreateTemp("", "test_file_identifier_native") if err != nil { t.Fatalf("cannot create temporary file for test: %v", err) } diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 80327d8bcf2..5c063481dd5 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -52,11 +52,13 @@ func 
TestFilestreamCloseRenamed(t *testing.T) { // the output to receive the event and then close the source file. id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName) + "*"}, - "prospector.scanner.check_interval": "10ms", - "close.on_state_change.check_interval": "1ms", - "close.on_state_change.renamed": "true", + "id": id, + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.check_interval": "10ms", + "close.on_state_change.check_interval": "1ms", + "close.on_state_change.renamed": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first log line\n") @@ -94,9 +96,11 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName) + "*"}, - "prospector.scanner.check_interval": "1ms", + "id": id, + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testline := []byte("log line\n") @@ -132,11 +136,13 @@ func TestFilestreamCloseRemoved(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName) + "*"}, - "prospector.scanner.check_interval": "24h", - "close.on_state_change.check_interval": "1ms", - "close.on_state_change.removed": "true", + "id": id, + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "1ms", + "close.on_state_change.removed": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first log line\n") @@ -173,10 +179,12 @@ func TestFilestreamCloseEOF(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "24h", - "close.reader.on_eof": "true", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "24h", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, + "close.reader.on_eof": "true", }) testlines := []byte("first log line\n") @@ -209,9 +217,11 @@ func TestFilestreamEmptyLine(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -248,9 +258,11 @@ func TestFilestreamEmptyLinesOnly(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": id, + 
"paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -272,8 +284,10 @@ func TestFilestreamBOMUTF8(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) // BOM: 0xEF,0xBB,0xBF @@ -315,9 +329,11 @@ func TestFilestreamUTF16BOMs(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "encoding": name, + "id": id, + "paths": []string{env.abspath(testlogName)}, + "encoding": name, + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) line := []byte("first line\n") @@ -348,11 +364,13 @@ func TestFilestreamCloseTimeout(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "24h", - "close.on_state_change.check_interval": "100ms", - "close.reader.after_interval": "500ms", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "100ms", + "close.reader.after_interval": "500ms", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\n") @@ -382,11 +400,13 @@ func TestFilestreamCloseAfterInterval(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "24h", - "close.on_state_change.check_interval": "100ms", - "close.on_state_change.inactive": "2s", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "100ms", + "close.on_state_change.inactive": "2s", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\nthird line\n") @@ -417,7 +437,9 @@ func TestFilestreamCloseAfterIntervalRemoved(t *testing.T) { "close.on_state_change.inactive": "100ms", // reader is not stopped when file is removed to see if the reader can still detect // if the file has been inactive even if it have been removed in the meantime - "close.on_state_change.removed": "false", + "close.on_state_change.removed": "false", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\nthird line\n") @@ -450,7 +472,9 @@ func TestFilestreamCloseAfterIntervalRenamed(t *testing.T) { "close.on_state_change.inactive": "100ms", // reader is not stopped when file is removed to see if the reader can still detect // if the file has been inactive even if it have been removed in the meantime - "close.on_state_change.removed": "false", + "close.on_state_change.removed": 
"false", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\nthird line\n") @@ -485,7 +509,9 @@ func TestFilestreamCloseAfterIntervalRotatedAndRemoved(t *testing.T) { "close.on_state_change.inactive": "100ms", // reader is not stopped when file is removed to see if the reader can still detect // if the file has been inactive even if it have been removed in the meantime - "close.on_state_change.removed": "false", + "close.on_state_change.removed": "false", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\nthird line\n") @@ -514,11 +540,13 @@ func TestFilestreamCloseAfterIntervalRotatedAndNewRemoved(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", - "close.on_state_change.check_interval": "10ms", - "close.on_state_change.inactive": "100ms", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": "false", + "prospector.scanner.check_interval": "1ms", + "close.on_state_change.check_interval": "10ms", + "close.on_state_change.inactive": "100ms", // reader is not stopped when file is removed to see if the reader can still detect // if the file has been inactive even if it have been removed in the meantime "close.on_state_change.removed": "false", @@ -558,10 +586,12 @@ func TestFilestreamTruncatedFileOpen(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.resend_on_touch": "true", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -592,11 +622,13 @@ func TestFilestreamTruncatedFileClosed(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.resend_on_touch": "true", - "close.reader.on_eof": "true", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + "close.reader.on_eof": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -635,9 +667,11 @@ func TestFilestreamTruncateWithSymlink(t *testing.T) { env.abspath(testlogName), env.abspath(symlinkName), }, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.resend_on_touch": "true", - "prospector.scanner.symlinks": "true", + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + "prospector.scanner.symlinks": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) lines := 
[]byte("first line\nsecond line\nthird line\n") @@ -675,10 +709,12 @@ func TestFilestreamTruncateBigScannerInterval(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "5s", - "prospector.scanner.resend_on_touch": "true", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "5s", + "prospector.scanner.resend_on_touch": "true", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -707,10 +743,12 @@ func TestFilestreamTruncateCheckOffset(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.resend_on_touch": "true", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -737,9 +775,11 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "200ms", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "200ms", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\n") @@ -792,7 +832,9 @@ func TestFilestreamSymlinksEnabled(t *testing.T) { "paths": []string{ env.abspath(symlinkName), }, - "prospector.scanner.symlinks": "true", + "prospector.scanner.symlinks": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\n") @@ -824,10 +866,12 @@ func TestFilestreamSymlinkRotated(t *testing.T) { "paths": []string{ env.abspath(symlinkName), }, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.symlinks": "true", - "close.on_state_change.removed": "false", - "clean_removed": "false", + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.symlinks": "true", + "close.on_state_change.removed": "false", + "clean_removed": "false", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) commonLine := "first line in file " @@ -874,10 +918,12 @@ func TestFilestreamSymlinkRemoved(t *testing.T) { "paths": []string{ env.abspath(symlinkName), }, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.symlinks": "true", - "close.on_state_change.removed": "false", - "clean_removed": "false", + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.symlinks": "true", + "close.on_state_change.removed": "false", + "clean_removed": "false", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) line := []byte("first line\n") @@ -918,9 +964,11 @@ func TestFilestreamTruncate(t *testing.T) { "paths": []string{ env.abspath("*"), }, - 
"prospector.scanner.check_interval": "1ms", - "prospector.scanner.resend_on_touch": "true", - "prospector.scanner.symlinks": "true", + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + "prospector.scanner.symlinks": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) lines := []byte("first line\nsecond line\nthird line\n") @@ -978,6 +1026,8 @@ func TestFilestreamHarvestAllFilesWhenHarvesterLimitExceeded(t *testing.T) { "paths": []string{ env.abspath(logFiles[0].path), env.abspath(logFiles[1].path)}, + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) @@ -994,8 +1044,10 @@ func TestGlobalIDCannotBeUsed(t *testing.T) { env := newInputTestingEnvironment(t) testlogName := "test.log" _, err := env.createInput(map[string]interface{}{ - "id": ".global", - "paths": []string{env.abspath(testlogName) + "*"}, + "id": ".global", + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) if err == nil { t.Fatal("expecting an error because '.global' cannot be used as input ID") @@ -1013,10 +1065,12 @@ func TestRotatingCloseInactiveLargerWriteRate(t *testing.T) { "paths": []string{ env.abspath("*"), }, - "prospector.scanner.check_interval": "100ms", - "close.on_state_change.check_interval": "1s", - "close.on_state_change.inactive": "5s", - "ignore_older": "10s", + "prospector.scanner.check_interval": "100ms", + "close.on_state_change.check_interval": "1s", + "close.on_state_change.inactive": "5s", + "ignore_older": "10s", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -1060,10 +1114,12 @@ func TestRotatingCloseInactiveLowWriteRate(t *testing.T) { "paths": []string{ env.abspath("*"), }, - "prospector.scanner.check_interval": "1ms", - "close.on_state_change.check_interval": "1ms", - "close.on_state_change.inactive": "1s", - "ignore_older": "10s", + "prospector.scanner.check_interval": "1ms", + "close.on_state_change.check_interval": "1ms", + "close.on_state_change.inactive": "1s", + "ignore_older": "10s", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go index 3dfe176ac01..735ea0d0ffe 100644 --- a/filebeat/input/filestream/input_test.go +++ b/filebeat/input/filestream/input_test.go @@ -50,6 +50,7 @@ func BenchmarkFilestream(b *testing.B) { cfg := ` type: filestream prospector.scanner.check_interval: 1s +prospector.scanner.fingerprint.enabled: false paths: - ` + filename + ` ` @@ -91,6 +92,7 @@ paths: cfg := ` type: filestream prospector.scanner.check_interval: 1s +prospector.scanner.fingerprint.enabled: false paths: - ` + ingestPath + ` ` @@ -146,6 +148,7 @@ func TestTakeOverTags(t *testing.T) { cfg := fmt.Sprintf(` type: filestream prospector.scanner.check_interval: 1s +prospector.scanner.fingerprint.enabled: false take_over: %t paths: - %s`, testCase.takeOver, filename) diff --git a/filebeat/input/filestream/internal/input-logfile/prospector.go b/filebeat/input/filestream/internal/input-logfile/prospector.go index 733e55fe26e..2f90d440e36 100644 --- 
a/filebeat/input/filestream/internal/input-logfile/prospector.go
+++ b/filebeat/input/filestream/internal/input-logfile/prospector.go
@@ -56,14 +56,13 @@ type ProspectorCleaner interface {
 	// The function passed to UpdateIdentifiers must return an empty string if the key
 	// remains the same.
 	UpdateIdentifiers(func(v Value) (string, interface{}))
-
-	// FixUpIdentifiers migrates IDs in the registry from inputs
-	// that used the deprecated `.global` ID.
-	FixUpIdentifiers(func(v Value) (string, interface{}))
 }
 
 // Value contains the cursor metadata.
 type Value interface {
 	// UnpackCursorMeta returns the cursor metadata required by the prospector.
 	UnpackCursorMeta(to interface{}) error
+
+	// Key returns the registry's key for this resource.
+	Key() string
 }
diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go
index 024ca5c9bfd..eb4f9cd7354 100644
--- a/filebeat/input/filestream/internal/input-logfile/store.go
+++ b/filebeat/input/filestream/internal/input-logfile/store.go
@@ -212,13 +212,28 @@ func (s *sourceStore) CleanIf(pred func(v Value) bool) {
 	}
 }
 
-// FixUpIdentifiers copies an existing resource to a new ID and marks the previous one
+// UpdateIdentifiers copies an existing resource to a new ID and marks the previous one
 // for removal.
-func (s *sourceStore) FixUpIdentifiers(getNewID func(v Value) (string, interface{})) {
+func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, interface{})) {
 	s.store.ephemeralStore.mu.Lock()
 	defer s.store.ephemeralStore.mu.Unlock()
 
 	for key, res := range s.store.ephemeralStore.table {
+		// Entries in the registry are soft deleted; once the gcStore runs,
+		// they're actually removed from the in-memory registry (ephemeralStore)
+		// and marked as removed in the registry operations log. So we need
+		// to skip all entries that were soft deleted.
+		//
+		// - res.internalState.TTL == 0: entry has been deleted
+		// - res.internalState.TTL == -1: entry will never be removed by TTL
+		// - res.internalState.TTL > 0: entry will be removed once its TTL
+		//   is reached
+		//
+		// If the entry has been deleted, skip it
+		if res.internalState.TTL == 0 {
+			continue
+		}
+
 		if !s.identifier.MatchesInput(key) {
 			continue
 		}
@@ -229,68 +244,30 @@
 		}
 
 		newKey, updatedMeta := getNewID(res)
-		if len(newKey) > 0 && res.internalState.TTL > 0 {
+		if len(newKey) > 0 {
 			if _, ok := s.store.ephemeralStore.table[newKey]; ok {
 				res.lock.Unlock()
 				continue
 			}
 
-			// Pending updates due to events that have not yet been ACKed
-			// are not included in the copy. Collection on
-			// the copy start from the last known ACKed position.
-			// This might lead to data duplication because the harvester
-			// will pickup from the last ACKed position using the new key
-			// and the pending updates will affect the entry with the oldKey.
 			r := res.copyWithNewKey(newKey)
			r.cursorMeta = updatedMeta
 			r.stored = false
+			// writeState only writes to the log file (disk);
+			// the write is synchronous
 			s.store.writeState(r)
 
 			// Add the new resource to the ephemeralStore so the rest of the
 			// codebase can have access to the new value
 			s.store.ephemeralStore.table[newKey] = r
 
-			// Remove the old key from the store
-			s.store.UpdateTTL(res, 0) // aka delete. See store.remove for details
-			s.store.log.Infof("migrated entry in registry from '%s' to '%s'", key, newKey)
-		}
-
-		res.lock.Unlock()
-	}
-}
-
-// UpdateIdentifiers copies an existing resource to a new ID and marks the previous one
-// for removal.
-func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, interface{})) {
-	s.store.ephemeralStore.mu.Lock()
-	defer s.store.ephemeralStore.mu.Unlock()
-
-	for key, res := range s.store.ephemeralStore.table {
-		if !s.identifier.MatchesInput(key) {
-			continue
-		}
-
-		if !res.lock.TryLock() {
-			continue
-		}
-
-		newKey, updatedMeta := getNewID(res)
-		if len(newKey) > 0 && res.internalState.TTL > 0 {
-			if _, ok := s.store.ephemeralStore.table[newKey]; ok {
-				res.lock.Unlock()
-				continue
-			}
-
-			// Pending updates due to events that have not yet been ACKed
-			// are not included in the copy. Collection on
-			// the copy start from the last known ACKed position.
-			// This might lead to data duplication because the harvester
-			// will pickup from the last ACKed position using the new key
-			// and the pending updates will affect the entry with the oldKey.
-			r := res.copyWithNewKey(newKey)
-			r.cursorMeta = updatedMeta
-			r.stored = false
-			s.store.writeState(r)
+			// Remove the old key from the store, aka delete. This is also
+			// synchronously written to the disk.
+			// We cannot use store.remove because it will
+			// acquire the same lock we hold, causing a deadlock.
+			// See store.remove for details.
+			s.store.UpdateTTL(res, 0)
+			s.store.log.Infof("migrated entry in registry from '%s' to '%s'. Cursor: %v", key, newKey, r.cursor)
 		}
 
 		res.lock.Unlock()
@@ -482,10 +459,16 @@ func (r *resource) UnpackCursor(to interface{}) error {
 	return typeconv.Convert(to, r.activeCursor())
 }
 
+// UnpackCursorMeta unpacks the cursor metadata into the provided struct.
 func (r *resource) UnpackCursorMeta(to interface{}) error {
 	return typeconv.Convert(to, r.cursorMeta)
 }
 
+// Key returns the resource's key.
+func (r *resource) Key() string {
+	return r.key
+}
+
 // syncStateSnapshot returns the current insync state based on already ACKed update operations.
 func (r *resource) inSyncStateSnapshot() state {
 	return state{
diff --git a/filebeat/input/filestream/internal/input-logfile/store_test.go b/filebeat/input/filestream/internal/input-logfile/store_test.go
index 6f19e1afad7..2d4f98b5d29 100644
--- a/filebeat/input/filestream/internal/input-logfile/store_test.go
+++ b/filebeat/input/filestream/internal/input-logfile/store_test.go
@@ -347,11 +347,11 @@ type testMeta struct {
 
 func TestSourceStore_UpdateIdentifiers(t *testing.T) {
 	t.Run("update identifiers when TTL is bigger than zero", func(t *testing.T) {
 		backend := createSampleStore(t, map[string]state{
-			"test::key1": {
+			"test::key1": { // Active resource
 				TTL:  60 * time.Second,
 				Meta: testMeta{IdentifierName: "method"},
 			},
-			"test::key2": {
+			"test::key2": { // Deleted resource
 				TTL:  0 * time.Second,
 				Meta: testMeta{IdentifierName: "method"},
 			},
@@ -372,22 +375,25 @@
 			return "", nil
 		})
 
-		var newState state
-		s.persistentStore.Get("test::key1::updated", &newState)
+		// The persistentStore is a mock that does not consider whether a state has
+		// been removed before returning it, thus allowing us to get the Updated
+		// timestamp from when the resource was deleted.
+ var deletedState state + s.persistentStore.Get("test::key1", &deletedState) want := map[string]state{ - "test::key1": { - Updated: s.Get("test::key1").internalState.Updated, - TTL: 60 * time.Second, + "test::key1": { // old resource is deleted, TTL must be zero + Updated: deletedState.Updated, + TTL: 0 * time.Second, Meta: map[string]interface{}{"identifiername": "method"}, }, - "test::key2": { + "test::key2": { // Unchanged Updated: s.Get("test::key2").internalState.Updated, TTL: 0 * time.Second, Meta: map[string]interface{}{"identifiername": "method"}, }, - "test::key1::updated": { - Updated: newState.Updated, + "test::key1::updated": { // Updated resource + Updated: s.Get("test::key1::updated").internalState.Updated, TTL: 60 * time.Second, Meta: map[string]interface{}{"identifiername": "something"}, }, diff --git a/filebeat/input/filestream/legacy_metrics_integration_test.go b/filebeat/input/filestream/legacy_metrics_integration_test.go index 649ede41f3e..ec2e18a3706 100644 --- a/filebeat/input/filestream/legacy_metrics_integration_test.go +++ b/filebeat/input/filestream/legacy_metrics_integration_test.go @@ -41,6 +41,8 @@ filebeat.inputs: enabled: true close.reader.after_interval: 1s prospector.scanner.check_interval: 500ms + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false paths: - %s/*.filestream - type: log @@ -48,6 +50,8 @@ filebeat.inputs: enabled: true close_timeout: 1s scan_frequency: 500ms + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false paths: - %s/*.log @@ -71,7 +75,9 @@ func TestLegacyMetrics(t *testing.T) { filebeat.WriteConfigFile(cfg) filebeat.Start() - filebeat.WaitForLogs("Metrics endpoint listening on:", 10*time.Second) + filebeat.WaitForLogs("Metrics endpoint listening on:", + 10*time.Second, + "metrics endpoint did not start") // After starting Filebeat all counters must be zero waitForMetrics(t, diff --git a/filebeat/input/filestream/metrics_integration_test.go b/filebeat/input/filestream/metrics_integration_test.go index 3671f076d0e..b551b2321b7 100644 --- a/filebeat/input/filestream/metrics_integration_test.go +++ b/filebeat/input/filestream/metrics_integration_test.go @@ -33,11 +33,13 @@ func TestFilestreamMetrics(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "24h", - "close.on_state_change.check_interval": "100ms", - "close.on_state_change.inactive": "2s", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "100ms", + "close.on_state_change.inactive": "2s", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\nthird line\n") diff --git a/filebeat/input/filestream/parsers_integration_test.go b/filebeat/input/filestream/parsers_integration_test.go index 619d39f0512..858f4e6d1ce 100644 --- a/filebeat/input/filestream/parsers_integration_test.go +++ b/filebeat/input/filestream/parsers_integration_test.go @@ -29,9 +29,11 @@ func TestParsersAgentLogs(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + 
"file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "ndjson": map[string]interface{}{ @@ -65,9 +67,11 @@ func TestParsersIncludeMessage(t *testing.T) { testlogName := "test.log" readLine := "include this" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "100ms", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "100ms", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "include_message": map[string]interface{}{ @@ -98,9 +102,11 @@ func TestParsersDockerLogsFiltering(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "ndjson": map[string]interface{}{ @@ -137,9 +143,11 @@ func TestParsersSimpleJSONOverwrite(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "ndjson": map[string]interface{}{ @@ -173,9 +181,11 @@ func TestParsersTimestampInJSONMessage(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "ndjson": map[string]interface{}{ @@ -214,9 +224,11 @@ func TestParsersJavaElasticsearchLogs(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "multiline": map[string]interface{}{ @@ -249,9 +261,11 @@ func TestParsersCStyleLog(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "multiline": map[string]interface{}{ @@ -290,9 +304,11 @@ func TestParsersRabbitMQMultilineLog(t *testing.T) { testlogName := "test.log" inp 
:= env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "multiline": map[string]interface{}{ @@ -335,9 +351,11 @@ func TestParsersMultilineMaxLines(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "multiline": map[string]interface{}{ @@ -379,9 +397,11 @@ func TestParsersMultilineTimeout(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "multiline": map[string]interface{}{ @@ -444,10 +464,12 @@ func TestParsersMultilineMaxBytes(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", - "message_max_bytes": 50, + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "message_max_bytes": 50, + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "multiline": map[string]interface{}{ @@ -486,10 +508,12 @@ func TestParsersCloseTimeoutWithMultiline(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", - "close.reader.after_interval": "1s", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "close.reader.after_interval": "1s", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "multiline": map[string]interface{}{ @@ -551,10 +575,12 @@ func TestParsersConsecutiveNewline(t *testing.T) { testlogName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ - "id": "fake-ID", - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", - "close.reader.after_interval": "1s", + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "close.reader.after_interval": "1s", + "file_identity.native": map[string]any{}, + "prospector.scanner.fingerprint.enabled": false, "parsers": []map[string]interface{}{ { "multiline": map[string]interface{}{ diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 2bf737a86fd..1e3b7cb6c69 100644 --- 
a/filebeat/input/filestream/prospector.go
+++ b/filebeat/input/filestream/prospector.go
@@ -47,11 +47,31 @@ var ignoreInactiveSettings = map[string]ignoreInactiveType{
 	ignoreInactiveSinceFirstStartStr: IgnoreInactiveSinceFirstStart,
 }
 
+var identifiersMap = map[string]fileIdentifier{}
+
+func init() {
+	for name, factory := range identifierFactories {
+		if name == inodeMarkerName {
+			// inode marker requires a specific config we cannot infer.
+			continue
+		}
+
+		identifier, err := factory(nil)
+		if err != nil {
+			// Skip identifiers we cannot create. E.g., inode_marker is not
+			// supported on Windows.
+			continue
+		}
+		identifiersMap[name] = identifier
+	}
+}
+
 // fileProspector implements the Prospector interface.
 // It contains a file scanner which returns file system events.
 // The FS events then trigger either new Harvester runs or updates
 // the statestore.
 type fileProspector struct {
+	logger              *logp.Logger
 	filewatcher         loginp.FSWatcher
 	identifier          fileIdentifier
 	ignoreOlder         time.Duration
@@ -70,7 +90,7 @@ func (p *fileProspector) Init(
 	// If this fileProspector belongs to an input that did not have an ID
 	// this will find its files in the registry and update them to use the
 	// new ID.
-	globalCleaner.FixUpIdentifiers(func(v loginp.Value) (id string, val interface{}) {
+	globalCleaner.UpdateIdentifiers(func(v loginp.Value) (id string, val interface{}) {
 		var fm fileMeta
 		err := v.UnpackCursorMeta(&fm)
 		if err != nil {
@@ -101,6 +121,16 @@ func (p *fileProspector) Init(
 	}
 
 	identifierName := p.identifier.Name()
+
+	// If the file identity has changed to fingerprint, update the registry
+	// keys so we can keep the state. This is only supported from file
+	// identities that do not require configuration:
+	// - native (inode + device ID)
+	// - path
+	if identifierName != fingerprintName {
+		p.logger.Debugf("file identity is '%s', will not migrate registry", identifierName)
+		return nil
+	}
 	cleaner.UpdateIdentifiers(func(v loginp.Value) (string, interface{}) {
 		var fm fileMeta
 		err := v.UnpackCursorMeta(&fm)
@@ -113,12 +143,58 @@
 			return "", fm
 		}
 
-		if fm.IdentifierName != identifierName {
-			newKey := p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd}).Name()
-			fm.IdentifierName = identifierName
-			return newKey, fm
+		// Return early (do nothing) if:
+		// - The identifiers are the same
+		// - The old identifier is neither native nor path
+		oldIdentifierName := fm.IdentifierName
+		if oldIdentifierName == identifierName ||
+			!(oldIdentifierName == nativeName || oldIdentifierName == pathName) {
+			return "", nil
+		}
+
+		// Our current file (source) is in the registry; now we need to ensure
+		// this registry entry (resource) actually refers to our file. Sources
+		// are identified by path; however, as log files rotate, the same path
+		// can point to different files.
+		//
+		// So, to ensure we're dealing with the resource for our current file,
+		// we use the old identifier to generate a registry key for the current
+		// file we're trying to migrate; if this key matches the key in the
+		// registry, then we proceed to update the registry.
+		registryKey := v.Key()
+		oldIdentifier, ok := identifiersMap[oldIdentifierName]
+		if !ok {
+			// This should never happen, but just in case we handle it properly.
+			// If we cannot find the identifier, move on to the next entry;
+			// some identifiers cannot be migrated.
+			p.logger.Errorf(
+				"old file identity '%s' not found while migrating entry to "+
+					"new file identity '%s'. If the file still exists, it will be re-ingested",
+				oldIdentifierName,
+				identifierName,
+			)
+			return "", nil
 		}
-		return "", fm
+		previousIdentifierKey := newID(oldIdentifier.GetSource(
+			loginp.FSEvent{
+				NewPath:    fm.Source,
+				Descriptor: fd,
+			}))
+
+		// If the registry key and the key generated by the old identifier
+		// do not match, this entry does not refer to our file; do nothing.
+		if previousIdentifierKey != registryKey {
+			return "", fm
+		}
+
+		// The resource matches the file we found in the file system, generate
+		// a new registry key and return it alongside the updated meta.
+		newKey := newID(p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd}))
+		fm.IdentifierName = identifierName
+		p.logger.Infof("registry key '%s' and previous file identity key '%s' are the same, migrating. Source: '%s'",
+			registryKey, previousIdentifierKey, fm.Source)
+
+		return newKey, fm
 	})
 
 	return nil
diff --git a/filebeat/input/filestream/prospector_creator.go b/filebeat/input/filestream/prospector_creator.go
index 5142704a614..91a5e0b30d3 100644
--- a/filebeat/input/filestream/prospector_creator.go
+++ b/filebeat/input/filestream/prospector_creator.go
@@ -53,9 +53,8 @@ func newProspector(config config) (loginp.Prospector, error) {
 		return nil, fmt.Errorf("error while creating file identifier: %w", err)
 	}
 
-	logp.L().
-		With("filestream_id", config.ID).
-		Debugf("file identity is set to %s", identifier.Name())
+	logger := logp.L().Named("input.filestream").With("filestream_id", config.ID)
+	logger.Debugf("file identity is set to %s", identifier.Name())
 
 	fileprospector := fileProspector{
 		filewatcher:         filewatcher,
@@ -64,6 +63,7 @@ func newProspector(config config) (loginp.Prospector, error) {
 		ignoreInactiveSince: config.IgnoreInactive,
 		cleanRemoved:        config.CleanRemoved,
 		stateChangeCloser:   config.Close.OnStateChange,
+		logger:              logger.Named("prospector"),
 	}
 	if config.Rotation == nil {
 		return &fileprospector, nil
diff --git a/filebeat/input/filestream/prospector_test.go b/filebeat/input/filestream/prospector_test.go
index 552b4218c78..c1e806e3948 100644
--- a/filebeat/input/filestream/prospector_test.go
+++ b/filebeat/input/filestream/prospector_test.go
@@ -22,13 +22,14 @@ import (
 	"context"
 	"fmt"
 	"io/fs"
-	"io/ioutil"
 	"os"
+	"path/filepath"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
 	loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
 	input "github.com/elastic/beats/v7/filebeat/input/v2"
@@ -54,7 +55,8 @@ func TestProspector_InitCleanIfRemoved(t *testing.T) {
 		"prospector init with clean_removed disabled with entries": {
 			entries: map[string]loginp.Value{
 				"key1": &mockUnpackValue{
-					fileMeta{
+					key: "key1",
+					fileMeta: fileMeta{
 						Source:         "/no/such/path",
 						IdentifierName: "path",
 					},
@@ -67,7 +69,8 @@
 		"prospector init with clean_removed enabled with entries": {
 			entries: map[string]loginp.Value{
 				"key1": &mockUnpackValue{
-					fileMeta{
+					key: "key1",
+					fileMeta: fileMeta{
 						Source:         "/no/such/path",
 						IdentifierName: "path",
 					},
@@ -85,6 +88,7 @@
 		t.Run(name, func(t *testing.T) {
 			testStore := newMockProspectorCleaner(testCase.entries)
 			p := fileProspector{
+				logger:       logp.L(),
 				identifier:   mustPathIdentifier(false),
 				cleanRemoved: testCase.cleanRemoved,
 				filewatcher:  newMockFileWatcherWithFiles(testCase.filesOnDisk),
@@ -97,13 +101,13 @@
 	}
 }
 
 func TestProspector_InitUpdateIdentifiers(t *testing.T) {
-	f, err := ioutil.TempFile("", "existing_file")
+	f, err := os.CreateTemp(t.TempDir(), "existing_file")
 	if err != nil {
 		t.Fatalf("cannot create temp file")
 	}
 	defer f.Close()
 	tmpFileName := f.Name()
-	fi, err := f.Stat()
+	fi, err := f.Stat() //nolint:typecheck // It is used on L151
 	if err != nil {
 		t.Fatalf("cannot stat test file: %v", err)
 	}
@@ -112,6 +116,7 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) {
 		entries             map[string]loginp.Value
 		filesOnDisk         map[string]loginp.FileDescriptor
 		expectedUpdatedKeys map[string]string
+		newKey              string
 	}{
 		"prospector init does not update keys if there are no entries": {
 			entries: nil,
@@ -121,7 +126,8 @@
 		"prospector init does not update keys of not existing files": {
 			entries: map[string]loginp.Value{
 				"not_path::key1": &mockUnpackValue{
-					fileMeta{
+					key: "not_path::key1",
+					fileMeta: fileMeta{
 						Source:         "/no/such/path",
 						IdentifierName: "not_path",
 					},
@@ -130,10 +136,11 @@
 			filesOnDisk:         nil,
 			expectedUpdatedKeys: map[string]string{},
 		},
-		"prospector init updates keys of existing files": {
+		"prospector init does not update keys if new file identity is not fingerprint": {
 			entries: map[string]loginp.Value{
 				"not_path::key1": &mockUnpackValue{
-					fileMeta{
+					key: "not_path::key1",
+					fileMeta: fileMeta{
 						Source:         tmpFileName,
 						IdentifierName: "not_path",
 					},
@@ -142,7 +149,7 @@
 			filesOnDisk: map[string]loginp.FileDescriptor{
 				tmpFileName: {Info: file.ExtendFileInfo(fi)},
 			},
-			expectedUpdatedKeys: map[string]string{"not_path::key1": "path::" + tmpFileName},
+			expectedUpdatedKeys: map[string]string{},
 		},
 	}
 
@@ -152,16 +159,139 @@
 		t.Run(name, func(t *testing.T) {
 			testStore := newMockProspectorCleaner(testCase.entries)
 			p := fileProspector{
+				logger:      logp.L(),
 				identifier:  mustPathIdentifier(false),
 				filewatcher: newMockFileWatcherWithFiles(testCase.filesOnDisk),
 			}
-			p.Init(testStore, newMockProspectorCleaner(nil), func(loginp.Source) string { return "" })
-
+			err := p.Init(testStore, newMockProspectorCleaner(nil), func(loginp.Source) string { return testCase.newKey })
+			require.NoError(t, err, "prospector Init must succeed")
 			assert.EqualValues(t, testCase.expectedUpdatedKeys, testStore.updatedKeys)
 		})
 	}
}
 
+func TestMigrateRegistryToFingerprint(t *testing.T) {
+	const mockFingerprint = "the fingerprint from this file"
+	const mockInputPrefix = "test-input"
+
+	logFileFullPath, err := filepath.Abs(filepath.Join("testdata", "log.log"))
+	if err != nil {
+		t.Fatalf("cannot get absolute path from test file: %s", err)
+	}
+	f, err := os.Open(logFileFullPath)
+	if err != nil {
+		t.Fatalf("cannot open test file")
+	}
+	defer f.Close()
+	tmpFileName := f.Name()
+	fi, err := f.Stat()
+	if err != nil {
+		t.Fatalf("cannot stat test file: %s", err)
+	}
+
+	fd := loginp.FileDescriptor{
+		Filename:    tmpFileName,
+		Info:        file.ExtendFileInfo(fi),
+		Fingerprint: mockFingerprint,
+	}
+
+	fingerprintIdentifier, _ := newFingerprintIdentifier(nil)
+	nativeIdentifier, _ := newINodeDeviceIdentifier(nil)
+	pathIdentifier, _ := newPathIdentifier(nil)
+	newIDFunc := func(s loginp.Source) string {
+		return mockInputPrefix + "-" + s.Name()
+	}
+
+	fsEvent := loginp.FSEvent{
+		OldPath:    logFileFullPath,
+		NewPath:    logFileFullPath,
+		Op:         loginp.OpCreate,
+		Descriptor: fd,
+	}
+
+	expectedNewKey := newIDFunc(fingerprintIdentifier.GetSource(fsEvent))
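+
+	// Illustrative only (assumed key shapes, nothing here is asserted by
+	// the test): with newIDFunc above, the keys involved in this test look
+	// roughly like:
+	//
+	//	native:      "test-input-native::<inode>-<device>"
+	//	path:        "test-input-path::" + logFileFullPath
+	//	fingerprint: "test-input-fingerprint::the fingerprint from this file"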
+	testCases := map[string]struct {
+		oldIdentifier           fileIdentifier
+		newIdentifier           fileIdentifier
+		expectRegistryMigration bool
+	}{
+		"inode to fingerprint succeeds": {
+			oldIdentifier:           nativeIdentifier,
+			newIdentifier:           fingerprintIdentifier,
+			expectRegistryMigration: true,
+		},
+		"path to fingerprint succeeds": {
+			oldIdentifier:           pathIdentifier,
+			newIdentifier:           fingerprintIdentifier,
+			expectRegistryMigration: true,
+		},
+		"fingerprint to fingerprint is not migrated": {
+			oldIdentifier: fingerprintIdentifier,
+			newIdentifier: fingerprintIdentifier,
+		},
+
+		// If the new identifier is not fingerprint, no migration ever
+		// happens, so we only test a couple of combinations.
+		"fingerprint to native is not migrated": {
+			oldIdentifier: fingerprintIdentifier,
+			newIdentifier: nativeIdentifier,
+		},
+		"path to native is not migrated": {
+			oldIdentifier: pathIdentifier,
+			newIdentifier: nativeIdentifier,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			oldKey := newIDFunc(tc.oldIdentifier.GetSource(fsEvent))
+			entries := map[string]loginp.Value{
+				oldKey: &mockUnpackValue{
+					key: oldKey,
+					fileMeta: fileMeta{
+						Source:         logFileFullPath,
+						IdentifierName: tc.oldIdentifier.Name(),
+					},
+				},
+			}
+
+			testStore := newMockProspectorCleaner(entries)
+			filesOnDisk := map[string]loginp.FileDescriptor{
+				tmpFileName: fd,
+			}
+
+			p := fileProspector{
+				logger:      logp.L(),
+				identifier:  tc.newIdentifier,
+				filewatcher: newMockFileWatcherWithFiles(filesOnDisk),
+			}
+
+			err = p.Init(
+				testStore,
+				newMockProspectorCleaner(nil),
+				newIDFunc,
+			)
+			require.NoError(t, err, "prospector Init must succeed")
+
+			// testStore.updatedKeys is in the format
+			// oldKey -> newKey
+			if tc.expectRegistryMigration {
+				assert.Equal(
+					t,
+					map[string]string{
+						oldKey: expectedNewKey,
+					},
+					testStore.updatedKeys,
+					"the registry entries were not correctly migrated")
+			} else {
+				assert.Equal(
+					t,
+					map[string]string{},
+					testStore.updatedKeys,
+					"expecting no migration")
+			}
+		})
+	}
+}
+
 func TestProspectorNewAndUpdatedFiles(t *testing.T) {
 	minuteAgo := time.Now().Add(-1 * time.Minute)
 
@@ -246,6 +376,7 @@
 		t.Run(name, func(t *testing.T) {
 			p := fileProspector{
+				logger:      logp.L(),
 				filewatcher: newMockFileWatcher(test.events, len(test.events)),
 				identifier:  mustPathIdentifier(false),
 				ignoreOlder: test.ignoreOlder,
@@ -283,6 +414,7 @@
 	filewatcher := newMockFileWatcher([]loginp.FSEvent{eventCreate}, 2)
 
 	p := fileProspector{
+		logger:      logp.L(),
 		filewatcher: filewatcher,
 		identifier:  mustPathIdentifier(false),
 		ignoreOlder: 10 * time.Second,
@@ -347,6 +479,7 @@
 		t.Run(name, func(t *testing.T) {
 			p := fileProspector{
+				logger:       logp.L(),
 				filewatcher:  newMockFileWatcher(test.events, len(test.events)),
 				identifier:   mustPathIdentifier(false),
 				cleanRemoved: test.cleanRemoved,
@@ -428,6 +561,7 @@
 		t.Run(name, func(t *testing.T) {
 			p := fileProspector{
+				logger:            logp.L(),
 				filewatcher:       newMockFileWatcher(test.events, len(test.events)),
 				identifier:        mustPathIdentifier(test.trackRename),
 				stateChangeCloser: stateChangeCloserConfig{Renamed: test.closeRenamed},
@@ -600,12 +734,17 @@ func (mu *mockMetadataUpdater) Remove(s loginp.Source) error {
 
 type mockUnpackValue struct {
 	fileMeta
+	key string
 }
 
 func (u *mockUnpackValue) UnpackCursorMeta(to interface{}) error {
 	return typeconv.Convert(to, u.fileMeta)
 }
 
+func (u *mockUnpackValue) Key() string {
+	return u.key
+}
+
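+// A sketch of the contract the UpdateIdentifiers callbacks in
+// fileProspector.Init are assumed to follow (inferred from the prospector
+// code above, not an official API description):
+//
+//	func(v loginp.Value) (newKey string, meta interface{}) {
+//		// Return "" as newKey to leave the registry entry untouched;
+//		// return a non-empty key plus updated metadata to have the
+//		// entry migrated to that key.
+//	}
+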
 type mockProspectorCleaner struct {
 	available   map[string]loginp.Value
 	cleanedKeys []string
@@ -695,6 +834,7 @@ func TestOnRenameFileIdentity(t *testing.T) {
 	for k, tc := range testCases {
 		t.Run(k, func(t *testing.T) {
 			p := fileProspector{
+				logger:            logp.L(),
 				filewatcher:       newMockFileWatcher(tc.events, len(tc.events)),
 				identifier:        mustPathIdentifier(true),
 				stateChangeCloser: stateChangeCloserConfig{Renamed: true},
diff --git a/filebeat/input/filestream/testdata/log.log b/filebeat/input/filestream/testdata/log.log
new file mode 100644
index 00000000000..733afc5a1aa
--- /dev/null
+++ b/filebeat/input/filestream/testdata/log.log
@@ -0,0 +1,10 @@
+51.157.82.254 - collins3480 [06/Dec/2024:17:03:34 -0500] "GET /enable/transparent HTTP/2.0" 503 29836
+128.72.132.219 - - [06/Dec/2024:17:03:34 -0500] "PATCH /redefine/paradigms/front-end/synergies HTTP/2.0" 200 2307
+153.167.184.78 - - [06/Dec/2024:17:03:34 -0500] "HEAD /leading-edge/interactive/interactive/one-to-one HTTP/2.0" 204 18593
+175.195.94.204 - - [06/Dec/2024:17:03:34 -0500] "PUT /incentivize HTTP/2.0" 301 3998
+235.228.211.66 - hoppe3344 [06/Dec/2024:17:03:34 -0500] "DELETE /proactive/customized/action-items/killer HTTP/2.0" 203 24605
+6.175.232.33 - - [06/Dec/2024:17:03:34 -0500] "HEAD /extensible/productize/b2b HTTP/1.0" 503 15893
+146.190.210.171 - - [06/Dec/2024:17:03:34 -0500] "HEAD /architect/embrace/evolve HTTP/1.0" 502 9833
+224.125.203.225 - - [06/Dec/2024:17:03:34 -0500] "DELETE /turn-key/infrastructures/vortals HTTP/1.0" 100 17062
+194.157.121.128 - nicolas3550 [06/Dec/2024:17:03:34 -0500] "PATCH /vortals/scalable/experiences/deploy HTTP/1.1" 503 8034
+88.58.87.19 - - [06/Dec/2024:17:03:34 -0500] "GET /vertical/schemas HTTP/2.0" 405 2034
diff --git a/filebeat/tests/integration/event_log_file_test.go b/filebeat/tests/integration/event_log_file_test.go
index fce7672199f..793a3386af7 100644
--- a/filebeat/tests/integration/event_log_file_test.go
+++ b/filebeat/tests/integration/event_log_file_test.go
@@ -37,6 +37,8 @@ filebeat.inputs:
   - type: filestream
     id: filestream-input-id
     enabled: true
+    file_identity.native: ~
+    prospector.scanner.fingerprint.enabled: false
     parsers:
       - ndjson:
           target: ""
diff --git a/filebeat/tests/integration/filestream_test.go b/filebeat/tests/integration/filestream_test.go
index 3ddb04a2c20..24125469dd8 100644
--- a/filebeat/tests/integration/filestream_test.go
+++ b/filebeat/tests/integration/filestream_test.go
@@ -20,12 +20,18 @@ package integration
 import (
+	"errors"
 	"fmt"
+	"os"
 	"path"
 	"path/filepath"
+	"strings"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
 	"github.com/elastic/beats/v7/libbeat/tests/integration"
 )
 
@@ -36,6 +42,8 @@ filebeat.inputs:
     paths:
       - %s
 
+    file_identity.native: ~
+    prospector.scanner.fingerprint.enabled: false
     clean_inactive: 3s
     ignore_older: 2s
     close.on_state_change.inactive: 1s
@@ -105,3 +113,450 @@ func TestFilestreamCleanInactive(t *testing.T) {
 	registryFile := filepath.Join(filebeat.TempDir(), "data", "registry", "filebeat", "log.json")
 	filebeat.WaitFileContains(registryFile, `"op":"remove"`, time.Second)
 }
+
+func TestFilestreamValidationPreventsFilebeatStart(t *testing.T) {
+	duplicatedIDs := `
+filebeat.inputs:
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	emptyID := `
+filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	multipleDuplicatedIDs := `
+filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /tmp/duplicated-id-1.log
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /tmp/duplicated-id-1-2.log
+
+  - type: filestream
+    id: unique-id-1
+    enabled: true
+    paths:
+      - /tmp/unique-id-1.log
+  - type: filestream
+    id: unique-id-2
+    enabled: true
+    paths:
+      - /var/log/unique-id-2.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	tcs := []struct {
+		name string
+		cfg  string
+	}{
+		{
+			name: "duplicated IDs",
+			cfg:  duplicatedIDs,
+		},
+		{
+			name: "duplicated empty ID",
+			cfg:  emptyID,
+		},
+		{
+			name: "two inputs without ID and duplicated IDs",
+			cfg:  multipleDuplicatedIDs,
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			filebeat := integration.NewBeat(
+				t,
+				"filebeat",
+				"../../filebeat.test",
+			)
+
+			// Write configuration file and start Filebeat
+			filebeat.WriteConfigFile(tc.cfg)
+			filebeat.Start()
+
+			// Wait for error log
+			filebeat.WaitForLogs(
+				"filestream inputs validation error",
+				10*time.Second,
+				"Filebeat did not log a filestream input validation error")
+
+			proc, err := filebeat.Process.Wait()
+			require.NoError(t, err, "filebeat process.Wait returned an error")
+			assert.False(t, proc.Success(), "filebeat should have failed to start")
+		})
+	}
+}
+
+func TestFilestreamValidationSucceeds(t *testing.T) {
+	cfg := `
+filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+  - type: filestream
+    id: unique-id-1
+    enabled: true
+    paths:
+      - /tmp/unique-id-1.log
+  - type: filestream
+    id: unique-id-2
+    enabled: true
+    paths:
+      - /var/log/unique-id-2.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	filebeat := integration.NewBeat(
+		t,
+		"filebeat",
+		"../../filebeat.test",
+	)
+
+	// Write configuration file and start Filebeat
+	filebeat.WriteConfigFile(cfg)
+	filebeat.Start()
+
+	// Wait for the filestream input to start
+	filebeat.WaitForLogs(
+		"Input 'filestream' starting",
+		10*time.Second,
+		"Filebeat did not log the filestream input starting")
+}
+
+func TestFilestreamCanMigrateIdentity(t *testing.T) {
+	cfgTemplate := `
+filebeat.inputs:
+  - type: filestream
+    id: "test-migrate-ID"
+    paths:
+      - %s
+%s
+
+queue.mem:
+  flush.timeout: 0s
+
+path.home: %s
+
+output.file:
+  path: ${path.home}
+  filename: "output-file"
+  rotate_on_startup: false
+
+logging:
+  level: debug
+  selectors:
+    - input
+    - input.filestream
+    - input.filestream.prospector
+  metrics:
+    enabled: false
+`
+	nativeCfg := `
+    file_identity.native: ~
+`
+	pathCfg := `
+    file_identity.path: ~
+`
+	fingerprintCfg := `
+    file_identity.fingerprint: ~
+    prospector:
+      scanner:
+        fingerprint.enabled: true
+        check_interval: 0.1s
+`
+
+	testCases := map[string]struct {
+		oldIdentityCfg  string
+		oldIdentityName string
+		newIdentityCfg  string
+		notMigrateMsg   string
+		expectMigration bool
+	}{
+		"native to fingerprint": {
+			oldIdentityCfg:  nativeCfg,
+			oldIdentityName: "native",
+			newIdentityCfg:  fingerprintCfg,
+			expectMigration: true,
+		},
+
+		"path to fingerprint": {
+			oldIdentityCfg:  pathCfg,
+			oldIdentityName: "path",
+			newIdentityCfg:  fingerprintCfg,
+			expectMigration: true,
+		},
+
+		"path to native": {
+			oldIdentityCfg:  pathCfg,
+			newIdentityCfg:  nativeCfg,
+			oldIdentityName: "path",
+			expectMigration: false,
+			notMigrateMsg:   "file identity is 'native', will not migrate registry",
+		},
+	}
+
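+	// The expected event counts asserted below work out as follows
+	// (summary only; the actual assertions live in the test body):
+	//
+	//	migration:    25 ingested, state kept, 17 appended       -> 42 total
+	//	no migration: 25 ingested, 25 re-ingested (new identity),
+	//	              10 appended                                 -> 60 total
+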
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			filebeat := integration.NewBeat(
+				t,
+				"filebeat",
+				"../../filebeat.test",
+			)
+			workDir := filebeat.TempDir()
+			outputFile := filepath.Join(workDir, "output-file*")
+			logFilepath := filepath.Join(workDir, "log.log")
+			integration.GenerateLogFile(t, logFilepath, 25, false)
+
+			cfgYAML := fmt.Sprintf(cfgTemplate, logFilepath, tc.oldIdentityCfg, workDir)
+			filebeat.WriteConfigFile(cfgYAML)
+			filebeat.Start()
+
+			// Wait for the file to be fully ingested
+			eofMsg := fmt.Sprintf("End of file reached: %s; Backoff now.", logFilepath)
+			filebeat.WaitForLogs(eofMsg, time.Second*10, "EOF was not reached")
+			requirePublishedEvents(t, filebeat, 25, outputFile)
+			filebeat.Stop()
+
+			newCfg := fmt.Sprintf(cfgTemplate, logFilepath, tc.newIdentityCfg, workDir)
+			if err := os.WriteFile(filebeat.ConfigFilePath(), []byte(newCfg), 0o644); err != nil {
+				t.Fatalf("cannot write new configuration file: %s", err)
+			}
+
+			filebeat.Start()
+
+			// The happy path is to migrate keys, so we assert it first
+			if tc.expectMigration {
+				// Test the case where the registry migration happens
+				migratingMsg := fmt.Sprintf("are the same, migrating. Source: '%s'", logFilepath)
+				filebeat.WaitForLogs(migratingMsg, time.Second*5, "prospector did not migrate registry entry")
+				filebeat.WaitForLogs("migrated entry in registry from", time.Second*10, "store did not update registry key")
+				filebeat.WaitForLogs(eofMsg, time.Second*10, "EOF was not reached the second time")
+				requirePublishedEvents(t, filebeat, 25, outputFile)
+
+				// Ingest more data to ensure the offset was migrated
+				integration.GenerateLogFile(t, logFilepath, 17, true)
+				filebeat.WaitForLogs(eofMsg, time.Second*5, "EOF was not reached the third time")
+
+				requirePublishedEvents(t, filebeat, 42, outputFile)
+				requireRegistryEntryRemoved(t, workDir, tc.oldIdentityName)
+				return
+			}
+
+			// Another option is for no keys to be migrated because the current
+			// file identity is not fingerprint
+			if tc.notMigrateMsg != "" {
+				filebeat.WaitForLogs(tc.notMigrateMsg, time.Second*5, "the registry should not have been migrated")
+			}
+
+			// The last thing to test when there is no migration is to assert
+			// the file has been fully re-ingested because the file identity
+			// changed
+			filebeat.WaitForLogs(eofMsg, time.Second*10, "EOF was not reached the second time")
+			requirePublishedEvents(t, filebeat, 50, outputFile)
+
+			// Ingest more data to ensure the offset is correctly tracked
+			integration.GenerateLogFile(t, logFilepath, 10, true)
+			filebeat.WaitForLogs(eofMsg, time.Second*5, "EOF was not reached the third time")
+			requirePublishedEvents(t, filebeat, 60, outputFile)
+		})
+	}
+}
+
+func TestFilestreamMigrateIdentityCornerCases(t *testing.T) {
+	cfgTemplate := `
+filebeat.inputs:
+  - type: filestream
+    id: "test-migrate-ID"
+    paths:
+      - %s
+%s
+
+queue.mem:
+  flush.timeout: 0s
+
+path.home: %s
+
+output.file:
+  path: ${path.home}
+  filename: "output-file"
+  rotate_on_startup: false
+
+logging:
+  level: debug
+  selectors:
+    - input
+    - input.filestream
+    - input.filestream.prospector
+  metrics:
+    enabled: false
+`
+	nativeCfg := `
+    file_identity.native: ~
+    prospector:
+      scanner:
+        fingerprint.enabled: false
+        check_interval: 0.1s
+`
+	fingerprintCfg := `
+    file_identity.fingerprint: ~
+    prospector:
+      scanner:
+        fingerprint.enabled: true
+        check_interval: 0.1s
+`
+
+	filebeat := integration.NewBeat(
+		t,
+		"filebeat",
+		"../../filebeat.test",
+	)
+	workDir := filebeat.TempDir()
+
+	logFilepath := filepath.Join(workDir, "log.log")
+	outputFile := filepath.Join(workDir, "output-file*")
+
+	cfgYAML := fmt.Sprintf(cfgTemplate, logFilepath, nativeCfg, workDir)
+	filebeat.WriteConfigFile(cfgYAML)
+	filebeat.Start()
+
+	// Create and ingest 4 different files, all with the same path
+	// to simulate log rotation
+	createFileAndWaitIngestion(t, logFilepath, outputFile, filebeat, 50, 50)
+	createFileAndWaitIngestion(t, logFilepath, outputFile, filebeat, 50, 100)
+	createFileAndWaitIngestion(t, logFilepath, outputFile, filebeat, 50, 150)
+	createFileAndWaitIngestion(t, logFilepath, outputFile, filebeat, 50, 200)
+
+	filebeat.Stop()
+	cfgYAML = fmt.Sprintf(cfgTemplate, logFilepath, fingerprintCfg, workDir)
+	if err := os.WriteFile(filebeat.ConfigFilePath(), []byte(cfgYAML), 0666); err != nil {
+		t.Fatalf("cannot write config file: %s", err)
+	}
+
+	filebeat.Start()
+
+	migratingMsg := fmt.Sprintf("are the same, migrating. Source: '%s'", logFilepath)
+	eofMsg := fmt.Sprintf("End of file reached: %s; Backoff now.", logFilepath)
+
+	filebeat.WaitForLogs(migratingMsg, time.Second*10, "prospector did not migrate registry entry")
+	filebeat.WaitForLogs("migrated entry in registry from", time.Second*10, "store did not update registry key")
+	// Filebeat logs the EOF message when it starts and the file had already been fully ingested.
+	filebeat.WaitForLogs(eofMsg, time.Second*10, "EOF was not reached after restart")
+
+	requirePublishedEvents(t, filebeat, 200, outputFile)
+	// Ingest more data to ensure the offset was migrated
+	integration.GenerateLogFile(t, logFilepath, 20, true)
+	filebeat.WaitForLogs(eofMsg, time.Second*5, "EOF was not reached after adding data")
+
+	requirePublishedEvents(t, filebeat, 220, outputFile)
+	requireRegistryEntryRemoved(t, workDir, "native")
+}
+
+func requireRegistryEntryRemoved(t *testing.T, workDir, identity string) {
+	t.Helper()
+
+	registryLogFile := filepath.Join(workDir, "data", "registry", "filebeat", "log.json")
+	entries := readFilestreamRegistryLog(t, registryLogFile)
+	inputEntries := []registryEntry{}
+	for _, currentEntry := range entries {
+		if strings.Contains(currentEntry.Key, identity) {
+			inputEntries = append(inputEntries, currentEntry)
+		}
+	}
+
+	if len(inputEntries) == 0 {
+		t.Fatalf("no registry entries found for file identity '%s'", identity)
+	}
+
+	lastEntry := inputEntries[len(inputEntries)-1]
+	if lastEntry.TTL != 0 {
+		t.Errorf("'%s' has not been removed from the registry", lastEntry.Key)
+	}
+}
+
+func requirePublishedEvents(
+	t *testing.T,
+	filebeat *integration.BeatProc,
+	expected int,
+	outputFile string) {
+
+	t.Helper()
+	publishedEvents := filebeat.CountFileLines(outputFile)
+	if publishedEvents != expected {
+		t.Fatalf("expecting %d published events after file migration, got %d instead", expected, publishedEvents)
+	}
+}
+
+func createFileAndWaitIngestion(
+	t *testing.T,
+	logFilepath, outputFilepath string,
+	fb *integration.BeatProc,
+	n, outputTotal int) {
+
+	t.Helper()
+	_, err := os.Stat(logFilepath)
+	if err != nil && !errors.Is(err, os.ErrNotExist) {
+		t.Fatalf("cannot stat log file: %s", err)
+	}
+	// Remove the file if it exists
+	if err == nil {
+		if err := os.Remove(logFilepath); err != nil {
+			t.Fatalf("cannot remove log file: %s", err)
+		}
+	}
+
+	integration.GenerateLogFile(t, logFilepath, n, false)
+
+	eofMsg := fmt.Sprintf("End of file reached: %s; Backoff now.", logFilepath)
+	fb.WaitForLogs(eofMsg, time.Second*10, "EOF was not reached")
+	requirePublishedEvents(t, fb, outputTotal, outputFilepath)
+}
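For context, the registry helpers above read the filestream registry journal (`data/registry/filebeat/log.json`). A rough sketch of what the relevant entries could look like after a successful migration follows; the shapes are illustrative assumptions and the exact field layout may differ between versions:

[source,go]
----
// Hypothetical log.json lines after migrating from native to fingerprint:
//
//	{"k":"filestream::test-migrate-ID::native::2056-64768",
//	 "v":{"ttl":0,"cursor":{"offset":12345},
//	      "meta":{"source":"/path/log.log","identifier_name":"native"}}}
//	{"k":"filestream::test-migrate-ID::fingerprint::2edc3ade...",
//	 "v":{"ttl":1800000000000,"cursor":{"offset":12345},
//	      "meta":{"source":"/path/log.log","identifier_name":"fingerprint"}}}
//
// requireRegistryEntryRemoved asserts that the last entry whose key still
// contains the old identity name has TTL == 0, i.e. the old state was
// marked for removal once it had been migrated to the fingerprint key.
----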
now.", logFilepath) + fb.WaitForLogs(eofMsg, time.Second*10, "EOF was not reached") + requirePublishedEvents(t, fb, outputTotal, outputFilepath) +} diff --git a/filebeat/tests/integration/filestream_truncation_test.go b/filebeat/tests/integration/filestream_truncation_test.go index 98db9a6ad23..f495c72f141 100644 --- a/filebeat/tests/integration/filestream_truncation_test.go +++ b/filebeat/tests/integration/filestream_truncation_test.go @@ -38,6 +38,8 @@ filebeat.inputs: id: a-unique-filestream-input-id enabled: true prospector.scanner.check_interval: 30s + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false paths: - %s output: diff --git a/filebeat/tests/integration/store_test.go b/filebeat/tests/integration/store_test.go index d4ee36298d5..e187c682676 100644 --- a/filebeat/tests/integration/store_test.go +++ b/filebeat/tests/integration/store_test.go @@ -41,6 +41,8 @@ filebeat.inputs: close.on_state_change.inactive: 8s ignore_older: 9s prospector.scanner.check_interval: 1s + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false paths: - %s diff --git a/filebeat/tests/integration/translate_ldap_attribute_test.go b/filebeat/tests/integration/translate_ldap_attribute_test.go index 376be5e36a2..d7c4f129593 100644 --- a/filebeat/tests/integration/translate_ldap_attribute_test.go +++ b/filebeat/tests/integration/translate_ldap_attribute_test.go @@ -45,6 +45,8 @@ const translateguidCfg = ` filebeat.inputs: - type: filestream id: "test-translateguidCfg" + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false paths: - %s diff --git a/filebeat/tests/system/config/filestream-fixup-id.yml.j2 b/filebeat/tests/system/config/filestream-fixup-id.yml.j2 index 7617429286d..446b7db9723 100644 --- a/filebeat/tests/system/config/filestream-fixup-id.yml.j2 +++ b/filebeat/tests/system/config/filestream-fixup-id.yml.j2 @@ -1,6 +1,8 @@ filebeat.inputs: - type: filestream id: test-fix-global-id + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false enabled: true paths: - {{path}} diff --git a/filebeat/tests/system/test_reload_inputs.py b/filebeat/tests/system/test_reload_inputs.py index dd81a60ffe8..cf58557f3ac 100644 --- a/filebeat/tests/system/test_reload_inputs.py +++ b/filebeat/tests/system/test_reload_inputs.py @@ -49,6 +49,8 @@ def test_filestream_reload_not_duplicate_id(self): input_config_template = """ - type: filestream id: my-unique-id + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false paths: - {} """ diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 904fc1e302a..186d8483f9f 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -994,3 +994,8 @@ func (b *BeatProc) CountFileLines(glob string) int { return bytes.Count(data, []byte{'\n'}) } + +// ConfigFilePath returns the config file path +func (b *BeatProc) ConfigFilePath() string { + return b.configFile +} diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 743a31a8610..0c2eb0c0c51 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3185,6 +3185,8 @@ filebeat.inputs: # batch of events has been published successfully. The default value is 1s. #filebeat.registry.flush: 1s +# The interval which to run the registry clean up +#filebeat.registry.cleanup_interval: 5m # Starting with Filebeat 7.0, the registry uses a new directory format to store # Filebeat state. 
After you upgrade, Filebeat will automatically migrate a 6.x