diff --git a/checkpoint/checkpoint.go b/checkpoint/checkpoint.go index f2c3bfacdab..2d0d108a590 100644 --- a/checkpoint/checkpoint.go +++ b/checkpoint/checkpoint.go @@ -22,6 +22,7 @@ package checkpoint import ( "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -32,6 +33,7 @@ import ( "gopkg.in/yaml.v2" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/paths" ) // Checkpoint persists event log state information to disk. @@ -87,6 +89,11 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp save: make(chan JournalState, 1), } + err := c.findRegistryFile() + if err != nil { + return nil, fmt.Errorf("error locating the proper registry file: %+v", err) + } + // Minimum batch size. if c.maxUpdates < 1 { c.maxUpdates = 1 @@ -120,6 +127,53 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp return c, nil } +// Previously the registry file was written to the root folder. It was fixed on +// 7.x but not on 6.x. Thus, migration is needed, so users avoid losing state info. +func (c *Checkpoint) findRegistryFile() error { + migratedPath := paths.Resolve(paths.Data, c.file) + + fs, err := os.Stat(c.file) + if os.IsNotExist(err) { + c.file = migratedPath + return nil + } else if err != nil { + return fmt.Errorf("error accessing previous registry file: %+v", err) + } + + f, err := os.Open(c.file) + if err != nil { + return err + } + defer f.Close() + + target, err := os.OpenFile(migratedPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fs.Mode()) + if err != nil { + return err + } + defer target.Close() + + if _, err := io.Copy(target, f); err != nil { + return err + } + + err = target.Sync() + if err != nil { + return fmt.Errorf("error while syncing new registry file to disk: %+v", err) + } + + c.file = migratedPath + + p := filepath.Dir(migratedPath) + pf, err := os.Open(p) + if err != nil { + return nil + } + defer pf.Close() + pf.Sync() + + return nil +} + // run is worker loop that reads incoming state information from the save // channel and persists it when the number of changes reaches maxEvents or // the amount of time since the last disk write reaches flushInterval.