Re-use buffers to optimise memory allocation in fingerprint #36736

Merged · 2 commits · Oct 4, 2023

1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -227,6 +227,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Improve template evaluation logging for HTTPJSON input. {pull}36668[36668]
 - Add CEL partial value debug function. {pull}36652[36652]
 - Added support for new features and removed partial save mechanism in the GCS input. {issue}35847[35847] {pull}36713[36713]
+- Re-use buffers to optimise memory allocation in fingerprint mode of filestream {pull}36736[36736]

 *Auditbeat*

30 changes: 17 additions & 13 deletions filebeat/input/filestream/fswatch.go
@@ -18,10 +18,10 @@
 package filestream

 import (
-    "bufio"
     "crypto/sha256"
     "encoding/hex"
     "fmt"
+    "hash"
     "io"
     "os"
     "path/filepath"
@@ -282,16 +282,19 @@ func defaultFileScannerConfig() fileScannerConfig {
 // fileScanner looks for files which match the patterns in paths.
 // It is able to exclude files and symlinks.
 type fileScanner struct {
-    paths []string
-    cfg   fileScannerConfig
-    log   *logp.Logger
+    paths      []string
+    cfg        fileScannerConfig
+    log        *logp.Logger
+    hasher     hash.Hash
+    readBuffer []byte
 }

-func newFileScanner(paths []string, config fileScannerConfig) (loginp.FSScanner, error) {
+func newFileScanner(paths []string, config fileScannerConfig) (*fileScanner, error) {
     s := fileScanner{
-        paths: paths,
-        cfg:   config,
-        log:   logp.NewLogger(scannerDebugKey),
+        paths:  paths,
+        cfg:    config,
+        log:    logp.NewLogger(scannerDebugKey),
+        hasher: sha256.New(),
     }

     if s.cfg.Fingerprint.Enabled {
@@ -300,6 +303,7 @@ func newFileScanner(paths []string, config fileScannerConfig) (loginp.FSScanner,
             return nil, fmt.Errorf("error while reading configuration of fingerprint: %w", err)
         }
         s.log.Debugf("fingerprint mode enabled: offset %d, length %d", s.cfg.Fingerprint.Offset, s.cfg.Fingerprint.Length)
+        s.readBuffer = make([]byte, s.cfg.Fingerprint.Length)
     }

     err := s.resolveRecursiveGlobs(config)
@@ -463,6 +467,7 @@ func (s *fileScanner) toFileDescriptor(it *ingestTarget) (fd loginp.FileDescript

     if s.cfg.Fingerprint.Enabled {
         fileSize := it.info.Size()
+        // we should not open the file if we know it's too small
         minSize := s.cfg.Fingerprint.Offset + s.cfg.Fingerprint.Length
         if fileSize < minSize {
             return fd, fmt.Errorf("filesize of %q is %d bytes, expected at least %d bytes for fingerprinting", fd.Filename, fileSize, minSize)
@@ -481,18 +486,17 @@
             }
         }

-        bfile := bufio.NewReaderSize(file, int(s.cfg.Fingerprint.Length))
-        r := io.LimitReader(bfile, s.cfg.Fingerprint.Length)
-        h := sha256.New()
-        written, err := io.Copy(h, r)
+        s.hasher.Reset()
+        lr := io.LimitReader(file, s.cfg.Fingerprint.Length)
+        written, err := io.CopyBuffer(s.hasher, lr, s.readBuffer)

Member commented:

Do you need to reset the length of s.readBuffer before calling CopyBuffer? If s.cfg.Fingerprint.Length were to be made smaller, there would still be data left in s.readBuffer from the previous read that is never cleared.

The CopyBuffer implementation does not clear the buffer before it copies: https://cs.opensource.google/go/go/+/refs/tags/go1.21.3:src/io/io.go;l=399

Member (Author) replied:

> Do you need to reset the length of s.readBuffer before calling CopyBuffer?

No, because the buffer is created per file watcher (per prospector, eventually per filestream input).

prospector, err := newProspector(config)

func newProspector(config config) (loginp.Prospector, error) {
    err := checkConfigCompatibility(config.FileWatcher, config.FileIdentity)
    if err != nil {
        return nil, err
    }
    filewatcher, err := newFileWatcher(config.Paths, config.FileWatcher)

If the input configuration changes (e.g. fingerprint size), the file watcher gets re-created with a new buffer size.

> The CopyBuffer implementation does not clear the buffer before it copies

That's true, but it does not matter since this is just a scratch buffer: when Read fills it, it also returns the number of bytes it wrote, and only that many bytes are passed to Write on the destination Writer (https://cs.opensource.google/go/go/+/refs/tags/go1.21.3:src/io/io.go;l=432).
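
A minimal standalone sketch (illustrative, not code from the PR or this thread) of why stale bytes in a reused buffer cannot leak into the hash: io.CopyBuffer only writes the n bytes that each Read reports.

```go
package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "io"
    "strings"
)

func main() {
    // A reused scratch buffer, deliberately pre-filled with stale data
    // as if a previous, larger copy had used it.
    buf := make([]byte, 16)
    copy(buf, []byte("LEFTOVER-GARBAGE"))

    // Mirrors the shape of the PR code: hash a length-limited reader
    // through io.CopyBuffer using the dirty buffer.
    h := sha256.New()
    src := io.LimitReader(strings.NewReader("abc"), 1024)
    if _, err := io.CopyBuffer(h, src, buf); err != nil {
        panic(err)
    }

    // Hashing the same bytes directly yields the identical digest:
    // only buf[:n] of each Read is ever written to the hash.
    direct := sha256.Sum256([]byte("abc"))
    fmt.Println(hex.EncodeToString(h.Sum(nil)) == hex.EncodeToString(direct[:])) // prints: true
}
```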

There are existing tests, unchanged in this PR, that would fail if the previous buffer value were re-used or the buffer got corrupted in general:

{
    name: "returns all files except too small to fingerprint",
    cfgStr: `
scanner:
  symlinks: true
  recursive_glob: true
  fingerprint:
    enabled: true
    offset: 0
    length: 1024
`,
    expDesc: map[string]loginp.FileDescriptor{
        normalFilename: {
            Filename:    normalFilename,
            Fingerprint: "2edc986847e209b4016e141a6dc8716d3207350f416969382d431539bf292e4a",
            Info: testFileInfo{
                size: sizes[normalFilename],
                name: normalBasename,
            },
        },
        excludedFilename: {
            Filename:    excludedFilename,
            Fingerprint: "bd151321c3bbdb44185414a1b56b5649a00206dd4792e7230db8904e43987336",
            Info: testFileInfo{
                size: sizes[excludedFilename],
                name: excludedBasename,
            },
        },
        excludedIncludedFilename: {
            Filename:    excludedIncludedFilename,
            Fingerprint: "bfdb99a65297062658c26dfcea816d76065df2a2da2594bfd9b96e9e405da1c2",
            Info: testFileInfo{
                size: sizes[excludedIncludedFilename],
                name: excludedIncludedBasename,
            },
        },
        travelerSymlinkFilename: {
            Filename:    travelerSymlinkFilename,
            Fingerprint: "c4058942bffcea08810a072d5966dfa5c06eb79b902bf0011890dd8d22e1a5f8",
            Info: testFileInfo{
                size: sizes[travelerFilename],
                name: travelerSymlinkBasename,
            },
        },
    },
},

         if err != nil {
             return fd, fmt.Errorf("failed to compute hash for first %d bytes of %q: %w", s.cfg.Fingerprint.Length, fd.Filename, err)
         }
         if written != s.cfg.Fingerprint.Length {
             return fd, fmt.Errorf("failed to read %d bytes from %q to compute fingerprint, read only %d", written, fd.Filename, s.cfg.Fingerprint.Length)
         }

-        fd.Fingerprint = hex.EncodeToString(h.Sum(nil))
+        fd.Fingerprint = hex.EncodeToString(s.hasher.Sum(nil))
     }

     return fd, nil
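
The net effect of the fswatch.go change, condensed into a hedged, self-contained sketch (the scanner, newScanner, and fingerprint names below are illustrative, not the actual Filebeat identifiers): the SHA-256 hasher and the read buffer are created once per scanner and reused across files via hasher.Reset() and io.CopyBuffer, replacing the per-file sha256.New(), bufio reader, and io.Copy allocations.

```go
package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "hash"
    "io"
    "os"
)

// scanner loosely mirrors the reuse pattern adopted by the PR: the hasher and
// the read buffer are allocated once and reused for every scanned file.
type scanner struct {
    hasher     hash.Hash
    readBuffer []byte
    length     int64
}

func newScanner(length int64) *scanner {
    return &scanner{
        hasher:     sha256.New(),
        readBuffer: make([]byte, length), // one allocation for the scanner's lifetime
        length:     length,
    }
}

func (s *scanner) fingerprint(path string) (string, error) {
    f, err := os.Open(path)
    if err != nil {
        return "", err
    }
    defer f.Close()

    s.hasher.Reset() // reuse the hash state instead of sha256.New() per file
    written, err := io.CopyBuffer(s.hasher, io.LimitReader(f, s.length), s.readBuffer)
    if err != nil {
        return "", err
    }
    if written != s.length {
        return "", fmt.Errorf("read only %d of %d bytes", written, s.length)
    }
    return hex.EncodeToString(s.hasher.Sum(nil)), nil
}

func main() {
    s := newScanner(1024)
    for _, path := range os.Args[1:] {
        fp, err := s.fingerprint(path)
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            continue
        }
        fmt.Println(path, fp)
    }
}
```

As noted in the review thread above, this reuse is safe because the file watcher, and with it the buffer, is re-created whenever the input configuration (for example the fingerprint length) changes.
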
46 changes: 22 additions & 24 deletions filebeat/input/filestream/fswatch_test.go
@@ -831,15 +831,14 @@ func BenchmarkGetFiles(b *testing.B) {
         err := os.WriteFile(filename, []byte(strings.Repeat(content, 1024)), 0777)
         require.NoError(b, err)
     }
-
-    s := fileScanner{
-        paths: []string{filepath.Join(dir, "*.log")},
-        cfg: fileScannerConfig{
-            Fingerprint: fingerprintConfig{
-                Enabled: false,
-            },
+    paths := []string{filepath.Join(dir, "*.log")}
+    cfg := fileScannerConfig{
+        Fingerprint: fingerprintConfig{
+            Enabled: false,
         },
     }
+    s, err := newFileScanner(paths, cfg)
+    require.NoError(b, err)

     for i := 0; i < b.N; i++ {
         files := s.GetFiles()
@@ -857,17 +856,16 @@ func BenchmarkGetFilesWithFingerprint(b *testing.B) {
         err := os.WriteFile(filename, []byte(strings.Repeat(content, 1024)), 0777)
         require.NoError(b, err)
     }
-
-    s := fileScanner{
-        paths: []string{filepath.Join(dir, "*.log")},
-        cfg: fileScannerConfig{
-            Fingerprint: fingerprintConfig{
-                Enabled: true,
-                Offset:  0,
-                Length:  1024,
-            },
+    paths := []string{filepath.Join(dir, "*.log")}
+    cfg := fileScannerConfig{
+        Fingerprint: fingerprintConfig{
+            Enabled: true,
+            Offset:  0,
+            Length:  1024,
         },
     }
+    s, err := newFileScanner(paths, cfg)
+    require.NoError(b, err)

     for i := 0; i < b.N; i++ {
         files := s.GetFiles()
@@ -947,16 +945,16 @@ func BenchmarkToFileDescriptor(b *testing.B) {
     err := os.WriteFile(filename, []byte(strings.Repeat("a", 1024)), 0777)
     require.NoError(b, err)

-    s := fileScanner{
-        paths: []string{filename},
-        cfg: fileScannerConfig{
-            Fingerprint: fingerprintConfig{
-                Enabled: true,
-                Offset:  0,
-                Length:  1024,
-            },
+    paths := []string{filename}
+    cfg := fileScannerConfig{
+        Fingerprint: fingerprintConfig{
+            Enabled: true,
+            Offset:  0,
+            Length:  1024,
         },
     }
+    s, err := newFileScanner(paths, cfg)
+    require.NoError(b, err)

     it, err := s.getIngestTarget(filename)
     require.NoError(b, err)