diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c605148521d7..67fcdfcb7d7e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -270,6 +270,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Journalbeat* +- Migrate registry from previously incorrect path. {pull}10486[10486] + *Metricbeat* - Add `key` metricset to the Redis module. {issue}9582[9582] {pull}9657[9657] {pull}9746[9746] diff --git a/journalbeat/checkpoint/checkpoint.go b/journalbeat/checkpoint/checkpoint.go index 0f29861040b4..2d0d108a5906 100644 --- a/journalbeat/checkpoint/checkpoint.go +++ b/journalbeat/checkpoint/checkpoint.go @@ -22,6 +22,7 @@ package checkpoint import ( "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -88,7 +89,10 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp save: make(chan JournalState, 1), } - c.file = paths.Resolve(paths.Data, c.file) + err := c.findRegistryFile() + if err != nil { + return nil, fmt.Errorf("error locating the proper registry file: %+v", err) + } // Minimum batch size. if c.maxUpdates < 1 { @@ -123,6 +127,53 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp return c, nil } +// Previously the registry file was written to the root folder. It was fixed on +// 7.x but not on 6.x. Thus, migration is needed, so users avoid losing state info. +func (c *Checkpoint) findRegistryFile() error { + migratedPath := paths.Resolve(paths.Data, c.file) + + fs, err := os.Stat(c.file) + if os.IsNotExist(err) { + c.file = migratedPath + return nil + } else if err != nil { + return fmt.Errorf("error accessing previous registry file: %+v", err) + } + + f, err := os.Open(c.file) + if err != nil { + return err + } + defer f.Close() + + target, err := os.OpenFile(migratedPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fs.Mode()) + if err != nil { + return err + } + defer target.Close() + + if _, err := io.Copy(target, f); err != nil { + return err + } + + err = target.Sync() + if err != nil { + return fmt.Errorf("error while syncing new registry file to disk: %+v", err) + } + + c.file = migratedPath + + p := filepath.Dir(migratedPath) + pf, err := os.Open(p) + if err != nil { + return nil + } + defer pf.Close() + pf.Sync() + + return nil +} + // run is worker loop that reads incoming state information from the save // channel and persists it when the number of changes reaches maxEvents or // the amount of time since the last disk write reaches flushInterval.