Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Registry file fsync improvements #6988

Merged
merged 2 commits into from
May 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Fix panic when log prospector configuration fails to load. {issue}6800[6800]
- Fix memory leak in log prospector when files cannot be read. {issue}6797[6797]
- Add raw JSON to message field when JSON parsing fails. {issue}6516[6516]
- Commit registry writes to stable storage to avoid corrupt registry files. {pull}6877[6877]
- Commit registry writes to stable storage to avoid corrupt registry files. {issue}6792[6792]
- Fix a data race between stopping and starting of the harverters. {issue}#6879[6879]

*Heartbeat*
Expand Down
87 changes: 54 additions & 33 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
)

type Registrar struct {
Channel chan []file.State
out successLogger
done chan struct{}
registryFile string // Path to the Registry File
registryFilePermissions os.FileMode // Permissions to apply on the Registry File
wg sync.WaitGroup
Channel chan []file.State
out successLogger
done chan struct{}
registryFile string // Path to the Registry File
fileMode os.FileMode // Permissions to apply on the Registry File
wg sync.WaitGroup

states *file.States // Map with all file paths inside and the corresponding state
gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write
Expand All @@ -35,16 +35,20 @@ type successLogger interface {
}

var (
statesUpdate = monitoring.NewInt(nil, "registrar.states.update")
statesCleanup = monitoring.NewInt(nil, "registrar.states.cleanup")
statesCurrent = monitoring.NewInt(nil, "registrar.states.current")
registryWrites = monitoring.NewInt(nil, "registrar.writes")
statesUpdate = monitoring.NewInt(nil, "registrar.states.update")
statesCleanup = monitoring.NewInt(nil, "registrar.states.cleanup")
statesCurrent = monitoring.NewInt(nil, "registrar.states.current")
registryWrites = monitoring.NewInt(nil, "registrar.writes.total")
registryFails = monitoring.NewInt(nil, "registrar.writes.fail")
registrySuccess = monitoring.NewInt(nil, "registrar.writes.success")
)

func New(registryFile string, registryFilePermissions os.FileMode, flushTimeout time.Duration, out successLogger) (*Registrar, error) {
// New creates a new Registrar instance, updating the registry file on
// `file.State` updates. New fails if the file can not be opened or created.
func New(registryFile string, fileMode os.FileMode, flushTimeout time.Duration, out successLogger) (*Registrar, error) {
r := &Registrar{
registryFile: registryFile,
registryFilePermissions: registryFilePermissions,
registryFile: registryFile,
fileMode: fileMode,
done: make(chan struct{}),
states: file.NewStates(),
Channel: make(chan []file.State, 1),
Expand Down Expand Up @@ -258,38 +262,55 @@ func (r *Registrar) flushRegistry() {

// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
// First clean up states
r.gcStates()
states := r.states.GetStates()
statesCurrent.Set(int64(len(states)))

logp.Debug("registrar", "Write registry file: %s", r.registryFile)
registryWrites.Inc()

tempfile := r.registryFile + ".new"
f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, r.registryFilePermissions)
tempfile, err := writeTmpFile(r.registryFile, r.fileMode, states)
if err != nil {
logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err)
registryFails.Inc()
return err
}

// First clean up states
states := r.states.GetStates()

encoder := json.NewEncoder(f)
err = encoder.Encode(states)
err = helper.SafeFileRotate(r.registryFile, tempfile)
if err != nil {
f.Close()
logp.Err("Error when encoding the states: %s", err)
registryFails.Inc()
return err
}

// Commit the changes to storage to avoid corrupt registry files
f.Sync()
// Directly close file because of windows
f.Close()
logp.Debug("registrar", "Registry file updated. %d states written.", len(states))
registrySuccess.Inc()

err = helper.SafeFileRotate(r.registryFile, tempfile)
return nil
}

logp.Debug("registrar", "Registry file updated. %d states written.", len(states))
registryWrites.Add(1)
statesCurrent.Set(int64(len(states)))
func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) {
logp.Debug("registrar", "Write registry file: %s", baseName)

tempfile := baseName + ".new"
f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm)
if err != nil {
logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err)
return "", err
}

defer f.Close()

encoder := json.NewEncoder(f)

if err := encoder.Encode(states); err != nil {
logp.Err("Error when encoding the states: %s", err)
return "", err
}

// Commit the changes to storage to avoid corrupt registry files
if err = f.Sync(); err != nil {
logp.Err("Error when syncing new registry file contents: %s", err)
return "", err
}

return err
return tempfile, nil
}
14 changes: 14 additions & 0 deletions libbeat/common/file/helper_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,26 @@ package file

import (
"os"
"path/filepath"
)

// SafeFileRotate safely rotates an existing file under path and replaces it with the tempfile
func SafeFileRotate(path, tempfile string) error {
parent := filepath.Dir(path)

if e := os.Rename(tempfile, path); e != nil {
return e
}

// best-effort fsync on parent directory. The fsync is required by some
// filesystems, so to update the parents directory metadata to actually
// contain the new file being rotated in.
f, err := os.Open(parent)
if err != nil {
return nil // ignore error
}
defer f.Close()
f.Sync()

return nil
}