Skip to content

Commit

Permalink
[7.17](backport #36736) Re-use buffers to optimise memory allocation …
Browse files Browse the repository at this point in the history
…in fingerprint (#36738)

* Re-use buffers to optimise memory allocation in fingerprint (#36736)

This dramatically drops the memory usage, particularly on large amount of files.

(cherry picked from commit 429b38f)

* Fix changelog

---------

Co-authored-by: Denis <denis.rechkunov@elastic.co>
  • Loading branch information
mergify[bot] and rdner authored Oct 4, 2023
1 parent d3713b3 commit 06e5261
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Filebeat*

- Re-use buffers to optimise memory allocation in fingerprint mode of filestream {pull}36736[36736]

*Heartbeat*

Expand Down
30 changes: 17 additions & 13 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package filestream

import (
"bufio"
"crypto/sha256"
"encoding/hex"
"fmt"
"hash"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -282,16 +282,19 @@ func defaultFileScannerConfig() fileScannerConfig {
// fileScanner looks for files which match the patterns in paths.
// It is able to exclude files and symlinks.
type fileScanner struct {
paths []string
cfg fileScannerConfig
log *logp.Logger
paths []string
cfg fileScannerConfig
log *logp.Logger
hasher hash.Hash
readBuffer []byte
}

func newFileScanner(paths []string, config fileScannerConfig) (loginp.FSScanner, error) {
func newFileScanner(paths []string, config fileScannerConfig) (*fileScanner, error) {
s := fileScanner{
paths: paths,
cfg: config,
log: logp.NewLogger(scannerDebugKey),
paths: paths,
cfg: config,
log: logp.NewLogger(scannerDebugKey),
hasher: sha256.New(),
}

if s.cfg.Fingerprint.Enabled {
Expand All @@ -300,6 +303,7 @@ func newFileScanner(paths []string, config fileScannerConfig) (loginp.FSScanner,
return nil, fmt.Errorf("error while reading configuration of fingerprint: %w", err)
}
s.log.Debugf("fingerprint mode enabled: offset %d, length %d", s.cfg.Fingerprint.Offset, s.cfg.Fingerprint.Length)
s.readBuffer = make([]byte, s.cfg.Fingerprint.Length)
}

err := s.resolveRecursiveGlobs(config)
Expand Down Expand Up @@ -463,6 +467,7 @@ func (s *fileScanner) toFileDescriptor(it *ingestTarget) (fd loginp.FileDescript

if s.cfg.Fingerprint.Enabled {
fileSize := it.info.Size()
// we should not open the file if we know it's too small
minSize := s.cfg.Fingerprint.Offset + s.cfg.Fingerprint.Length
if fileSize < minSize {
return fd, fmt.Errorf("filesize of %q is %d bytes, expected at least %d bytes for fingerprinting", fd.Filename, fileSize, minSize)
Expand All @@ -481,18 +486,17 @@ func (s *fileScanner) toFileDescriptor(it *ingestTarget) (fd loginp.FileDescript
}
}

bfile := bufio.NewReaderSize(file, int(s.cfg.Fingerprint.Length))
r := io.LimitReader(bfile, s.cfg.Fingerprint.Length)
h := sha256.New()
written, err := io.Copy(h, r)
s.hasher.Reset()
lr := io.LimitReader(file, s.cfg.Fingerprint.Length)
written, err := io.CopyBuffer(s.hasher, lr, s.readBuffer)
if err != nil {
return fd, fmt.Errorf("failed to compute hash for first %d bytes of %q: %w", s.cfg.Fingerprint.Length, fd.Filename, err)
}
if written != s.cfg.Fingerprint.Length {
return fd, fmt.Errorf("failed to read %d bytes from %q to compute fingerprint, read only %d", written, fd.Filename, s.cfg.Fingerprint.Length)
}

fd.Fingerprint = hex.EncodeToString(h.Sum(nil))
fd.Fingerprint = hex.EncodeToString(s.hasher.Sum(nil))
}

return fd, nil
Expand Down
46 changes: 22 additions & 24 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,15 +832,14 @@ func BenchmarkGetFiles(b *testing.B) {
err := os.WriteFile(filename, []byte(strings.Repeat(content, 1024)), 0777)
require.NoError(b, err)
}

s := fileScanner{
paths: []string{filepath.Join(dir, "*.log")},
cfg: fileScannerConfig{
Fingerprint: fingerprintConfig{
Enabled: false,
},
paths := []string{filepath.Join(dir, "*.log")}
cfg := fileScannerConfig{
Fingerprint: fingerprintConfig{
Enabled: false,
},
}
s, err := newFileScanner(paths, cfg)
require.NoError(b, err)

for i := 0; i < b.N; i++ {
files := s.GetFiles()
Expand All @@ -858,17 +857,16 @@ func BenchmarkGetFilesWithFingerprint(b *testing.B) {
err := os.WriteFile(filename, []byte(strings.Repeat(content, 1024)), 0777)
require.NoError(b, err)
}

s := fileScanner{
paths: []string{filepath.Join(dir, "*.log")},
cfg: fileScannerConfig{
Fingerprint: fingerprintConfig{
Enabled: true,
Offset: 0,
Length: 1024,
},
paths := []string{filepath.Join(dir, "*.log")}
cfg := fileScannerConfig{
Fingerprint: fingerprintConfig{
Enabled: true,
Offset: 0,
Length: 1024,
},
}
s, err := newFileScanner(paths, cfg)
require.NoError(b, err)

for i := 0; i < b.N; i++ {
files := s.GetFiles()
Expand Down Expand Up @@ -948,16 +946,16 @@ func BenchmarkToFileDescriptor(b *testing.B) {
err := os.WriteFile(filename, []byte(strings.Repeat("a", 1024)), 0777)
require.NoError(b, err)

s := fileScanner{
paths: []string{filename},
cfg: fileScannerConfig{
Fingerprint: fingerprintConfig{
Enabled: true,
Offset: 0,
Length: 1024,
},
paths := []string{filename}
cfg := fileScannerConfig{
Fingerprint: fingerprintConfig{
Enabled: true,
Offset: 0,
Length: 1024,
},
}
s, err := newFileScanner(paths, cfg)
require.NoError(b, err)

it, err := s.getIngestTarget(filename)
require.NoError(b, err)
Expand Down

0 comments on commit 06e5261

Please sign in to comment.