Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #41762) Use fingerprint file identity by default and migrate file state from native or path #42126

Open
wants to merge 3 commits into
base: 8.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]

- Add kafka compression support for ZSTD.

*Heartbeat*
Expand Down Expand Up @@ -357,6 +356,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support for SSL and Proxy configurations for websoket type in streaming input. {pull}41934[41934]
- AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694]
- The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932]
- The Filestream input can automatically migrate state from files when changing the `file_identity` if the previous file identity was `native` (the default) or `path`. {issue}40197[40197] {pull}41762[41762]
- Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012]

Expand Down
2 changes: 2 additions & 0 deletions filebeat/_meta/config/filebeat.global.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# batch of events has been published successfully. The default value is 1s.
#filebeat.registry.flush: 1s

# The interval which to run the registry clean up
#filebeat.registry.cleanup_interval: 5m

# Starting with Filebeat 7.0, the registry uses a new directory format to store
# Filebeat state. After you upgrade, Filebeat will automatically migrate a 6.x
Expand Down
11 changes: 11 additions & 0 deletions filebeat/docs/faq.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ We do not recommend reading log files from network volumes. Whenever possible, i
send the log files directly from there. Reading files from network volumes (especially on Windows) can have unexpected side
effects. For example, changed file identifiers may result in {beatname_uc} reading a log file from scratch again.

If it is not possible to read from the host, then using the
<<filebeat-input-filestream-file-identity-fingerprint, `fingerprint`>>
file identity is the next best option.

[[filebeat-not-collecting-lines]]
=== {beatname_uc} isn't collecting lines from a file

Expand Down Expand Up @@ -71,6 +75,13 @@ By default states are never removed from the registry file. To resolve the inode

You can use <<{beatname_lc}-input-log-clean-removed,`clean_removed`>> for files that are removed from disk. Be aware that `clean_removed` cleans the file state from the registry whenever a file cannot be found during a scan. If the file shows up again later, it will be sent again from scratch.

Aside from that you should also change the
<<filebeat-input-filestream-file-identity, `file_identity`>> to
<<filebeat-input-filestream-file-identity-fingerprint,
`fingerprint`>>. If you were using `native` (the default) or `path`,
the state of the files will be automatically migrated to
`fingerprint`.

include::filebeat-log-rotation.asciidoc[]

[[windows-file-rotation]]
Expand Down
20 changes: 18 additions & 2 deletions filebeat/docs/inputs/input-filestream-file-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -542,19 +542,33 @@ indirectly set higher priorities on certain inputs by assigning a higher
limit of harvesters.

[float]
[id="{beatname_lc}-input-{type}-file-identity"]
===== `file_identity`

Different `file_identity` methods can be configured to suit the
environment where you are collecting log messages.

WARNING: Changing `file_identity` methods between runs may result in
IMPORTANT: Changing `file_identity` is only supported from `native` or
`path` to `fingerprint`. On those cases {beatname_uc} will
automatically migrate the state of the file when {type} starts.

WARNING: Any unsupported change in `file_identity` methods between
duplicated events in the output.

*`native`*:: The default behaviour of {beatname_uc} is to differentiate
between files using their inodes and device ids.
+
In some cases these values can change during the lifetime of a file.
For example, when using the Linux link:https://en.wikipedia.org/wiki/Logical_Volume_Manager_%28Linux%29[LVM] (Logical Volume Manager), device numbers are allocated dynamically at module load (refer to link:https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/7/html/logical_volume_manager_administration/lv#persistent_numbers[Persistent Device Numbers] in the Red Hat Enterprise Linux documentation). To avoid the possibility of data duplication in this case, you can set `file_identity` to `path` rather than `native`.
For example, when using the Linux
link:https://en.wikipedia.org/wiki/Logical_Volume_Manager_%28Linux%29[LVM]
(Logical Volume Manager), device numbers are allocated dynamically at
module load (refer to
link:https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/7/html/logical_volume_manager_administration/lv#persistent_numbers[Persistent
Device Numbers] in the Red Hat Enterprise Linux documentation). To
avoid the possibility of data duplication in this case, you can set
`file_identity` to `fingerprint` rather than the default `native`.
+
The states of files generated by `native` file identity can be migrated to `fingerprint`.

[source,yaml]
----
Expand All @@ -571,6 +585,8 @@ WARNING: This strategy does not support renaming files.
If an input file is renamed, {beatname_uc} will read it again if the new path
matches the settings of the input.

The states of files generated by `path` file identity can be migrated to `fingerprint`.

[source,yaml]
----
file_identity.path: ~
Expand Down
51 changes: 36 additions & 15 deletions filebeat/docs/inputs/input-filestream.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ The `log` writes the complete file state.

7. Stale entries can be removed from the registry, even if there is no active input.

8. The input can identify files based on their contents when using the
<<filebeat-input-filestream-file-identity-fingerprint, `fingerprint`>>
<<filebeat-input-filestream-file-identity, `file_identity`>> instead
of the default inode and device ID. This solves data duplication
caused by inode reuse.

To configure this input, specify a list of glob-based <<filestream-input-paths,`paths`>>
that must be crawled to locate and fetch the log lines.

Expand Down Expand Up @@ -86,20 +92,42 @@ multiple input sections:
[[filestream-file-identity]]
==== Reading files on network shares and cloud providers

WARNING: Filebeat does not support reading from network shares and cloud providers.

However, one of the limitations of these data sources can be mitigated
if you configure Filebeat adequately.
WARNING: Some file identity methods do not support reading from
network shares and cloud providers, to avoid duplicating events, use
`fingerprint` when reading from network shares or cloud providers.

By default, {beatname_uc} identifies files based on their inodes and
device IDs. However, on network shares and cloud providers these
values might change during the lifetime of the file. If this happens
{beatname_uc} thinks that file is new and resends the whole content
of the file. To solve this problem you can configure the `file_identity` option. Possible
values besides the default `inode_deviceid` are `path`, `inode_marker` and `fingerprint`.

WARNING: Changing `file_identity` methods between runs may result in
duplicated events in the output.
values besides the default `native` (inode + device ID) are
`fingerprint`, `path` and `inode_marker`.

IMPORTANT: Changing `file_identity` is only supported when
migrating from `native` or `path` to `fingerprint`.

WARNING: Any unsupported change in `file_identity` methods between
runs may result in duplicated events in the output.

`fingerprint` is the recommended file identity because it does not
rely on the file system/OS, it generates a hash from a portion of the
file (the first 1024 bytes, by default) and uses that to identify the
file. This works well with log rotation strategies that move/rename
the file and on Windows as file identifiers might be more
volatile. The downside is that {beatname_uc} will wait until the file
reaches 1024 bytes before start ingesting any file.

WARNING: In order to use this file identity option, one must enable
the <<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint
option in the scanner>>. Once this file identity is enabled, changing
the fingerprint configuration (offset, length, etc) will lead to a
global re-ingestion of all files that match the paths configuration of
the input.

Please refer to the
<<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint
configuration for details>>.

Selecting `path` instructs {beatname_uc} to identify files based on their
paths. This is a quick way to avoid rereading files if inode and device ids
Expand All @@ -117,13 +145,6 @@ example oneliner generates a hidden marker file for the selected mountpoint `/lo
Please note that you should not use this option on Windows as file identifiers might be
more volatile.

Selecting `fingerprint` instructs {beatname_uc} to identify files based on their
content byte range.

WARNING: In order to use this file identity option, one must enable the <<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint option in the scanner>>. Once this file identity is enabled, changing the fingerprint configuration (offset, length, etc) will lead to a global re-ingestion of all files that match the paths configuration of the input.

Please refer to the <<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint configuration for details>>.

["source","sh",subs="attributes"]
----
$ lsblk -o MOUNTPOINT,UUID | grep /logs | awk '{print $2}' >> /logs/.filebeat-marker
Expand Down
2 changes: 2 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,8 @@ filebeat.inputs:
# batch of events has been published successfully. The default value is 1s.
#filebeat.registry.flush: 1s

# The interval which to run the registry clean up
#filebeat.registry.cleanup_interval: 5m

# Starting with Filebeat 7.0, the registry uses a new directory format to store
# Filebeat state. After you upgrade, Filebeat will automatically migrate a 6.x
Expand Down
1 change: 1 addition & 0 deletions filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@

// waitUntilEventCount waits until total count events arrive to the client.
func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
e.t.Helper()
msg := &strings.Builder{}
require.Eventuallyf(e.t, func() bool {
msg.Reset()
Expand Down Expand Up @@ -418,9 +419,9 @@
for _, e := range e.pipeline.GetAllEvents() {
flat := e.Fields.Flatten()
pathi, _ := flat.GetValue("log.file.path")
path := pathi.(string)

Check failure on line 422 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value is not checked (errcheck)
msgi, _ := flat.GetValue("message")
msg := msgi.(string)

Check failure on line 424 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value is not checked (errcheck)
logLines[path] = append(logLines[path], msg)
}

Expand Down Expand Up @@ -448,9 +449,14 @@
// waitUntilHarvesterIsDone detects Harvester stop by checking if the last client has been closed
// as when a Harvester stops the client is closed.
func (e *inputTestingEnvironment) waitUntilHarvesterIsDone() {
for !e.pipeline.clients[len(e.pipeline.clients)-1].closed {
time.Sleep(10 * time.Millisecond)
}
require.Eventually(
e.t,
func() bool {
return e.pipeline.clients[len(e.pipeline.clients)-1].closed
},
time.Second*10,
time.Millisecond*10,
"The last connected client has not closed it's connection")
}

// requireEventsReceived requires that the list of messages has made it into the output.
Expand All @@ -462,7 +468,7 @@
if len(events) == checkedEventCount {
e.t.Fatalf("not enough expected elements")
}
message := evt.Fields["message"].(string)

Check failure on line 471 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value is not checked (errcheck)
if message == events[checkedEventCount] {
foundEvents[checkedEventCount] = true
}
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ scanner:
paths := []string{filepath.Join(dir, "*.log")}
cfgStr := `
scanner:
fingerprint.enabled: false
check_interval: 10ms
`

Expand Down Expand Up @@ -260,6 +261,7 @@ scanner:
paths := []string{filepath.Join(dir, "*.log")}
cfgStr := `
scanner:
fingerprint.enabled: false
check_interval: 50ms
`

Expand Down Expand Up @@ -370,6 +372,7 @@ scanner:
}
cfgStr := `
scanner:
fingerprint.enabled: false
check_interval: 100ms
`

Expand Down Expand Up @@ -615,6 +618,7 @@ scanner:
name: "returns no symlink if the original file is excluded",
cfgStr: `
scanner:
fingerprint.enabled: false
exclude_files: ['.*exclude.*', '.*traveler.*']
symlinks: true
`,
Expand Down Expand Up @@ -661,6 +665,7 @@ scanner:
name: "returns no included symlink if the original file is not included",
cfgStr: `
scanner:
fingerprint.enabled: false
include_files: ['.*include.*', '.*portal.*']
symlinks: true
`,
Expand All @@ -678,6 +683,7 @@ scanner:
name: "returns an included symlink if the original file is included",
cfgStr: `
scanner:
fingerprint.enabled: false
include_files: ['.*include.*', '.*portal.*', '.*traveler.*']
symlinks: true
`,
Expand Down
23 changes: 16 additions & 7 deletions filebeat/input/filestream/identifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package filestream

import (
"io/ioutil"
"os"
"testing"

Expand All @@ -35,12 +34,17 @@ type testFileIdentifierConfig struct {
}

func TestFileIdentifier(t *testing.T) {
t.Run("default file identifier", func(t *testing.T) {
identifier, err := newFileIdentifier(nil, "")
t.Run("native file identifier", func(t *testing.T) {
cfg := conf.MustNewConfigFrom(`native: ~`)
ns := conf.Namespace{}
if err := cfg.Unpack(&ns); err != nil {
t.Fatalf("cannot unpack config into conf.Namespace: %s", err)
}
identifier, err := newFileIdentifier(&ns, "")
require.NoError(t, err)
assert.Equal(t, DefaultIdentifierName, identifier.Name())

tmpFile, err := ioutil.TempFile("", "test_file_identifier_native")
tmpFile, err := os.CreateTemp("", "test_file_identifier_native")
if err != nil {
t.Fatalf("cannot create temporary file for test: %v", err)
}
Expand All @@ -59,12 +63,17 @@ func TestFileIdentifier(t *testing.T) {
assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String(), src.Name())
})

t.Run("default file identifier with suffix", func(t *testing.T) {
identifier, err := newFileIdentifier(nil, "my-suffix")
t.Run("native file identifier with suffix", func(t *testing.T) {
cfg := conf.MustNewConfigFrom(`native: ~`)
ns := conf.Namespace{}
if err := cfg.Unpack(&ns); err != nil {
t.Fatalf("cannot unpack config into conf.Namespace: %s", err)
}
identifier, err := newFileIdentifier(&ns, "my-suffix")
require.NoError(t, err)
assert.Equal(t, DefaultIdentifierName, identifier.Name())

tmpFile, err := ioutil.TempFile("", "test_file_identifier_native")
tmpFile, err := os.CreateTemp("", "test_file_identifier_native")
if err != nil {
t.Fatalf("cannot create temporary file for test: %v", err)
}
Expand Down
Loading
Loading