From b748f1826acc88b474ae5fa789171a0c88b7dee6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 6 Feb 2019 02:06:13 +0100 Subject: [PATCH] Cherry-pick #10486 to 6.x: Migrate registry from previous incorrect path (#10597) In 6.x Journalbeat placed its registry file under the wrong path ignoring the data.path settings. This patch lets users migrate from registries under such paths when upgrading from 6.x to 7.x. (cherry picked from commit 94998113c403627e4348b9ed35b7eaef83382e4c) --- checkpoint/checkpoint.go | 54 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/checkpoint/checkpoint.go b/checkpoint/checkpoint.go index f2c3bfacdab..2d0d108a590 100644 --- a/checkpoint/checkpoint.go +++ b/checkpoint/checkpoint.go @@ -22,6 +22,7 @@ package checkpoint import ( "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -32,6 +33,7 @@ import ( "gopkg.in/yaml.v2" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/paths" ) // Checkpoint persists event log state information to disk. @@ -87,6 +89,11 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp save: make(chan JournalState, 1), } + err := c.findRegistryFile() + if err != nil { + return nil, fmt.Errorf("error locating the proper registry file: %+v", err) + } + // Minimum batch size. if c.maxUpdates < 1 { c.maxUpdates = 1 @@ -120,6 +127,53 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp return c, nil } +// Previously the registry file was written to the root folder. It was fixed on +// 7.x but not on 6.x. Thus, migration is needed, so users avoid losing state info. +func (c *Checkpoint) findRegistryFile() error { + migratedPath := paths.Resolve(paths.Data, c.file) + + fs, err := os.Stat(c.file) + if os.IsNotExist(err) { + c.file = migratedPath + return nil + } else if err != nil { + return fmt.Errorf("error accessing previous registry file: %+v", err) + } + + f, err := os.Open(c.file) + if err != nil { + return err + } + defer f.Close() + + target, err := os.OpenFile(migratedPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fs.Mode()) + if err != nil { + return err + } + defer target.Close() + + if _, err := io.Copy(target, f); err != nil { + return err + } + + err = target.Sync() + if err != nil { + return fmt.Errorf("error while syncing new registry file to disk: %+v", err) + } + + c.file = migratedPath + + p := filepath.Dir(migratedPath) + pf, err := os.Open(p) + if err != nil { + return nil + } + defer pf.Close() + pf.Sync() + + return nil +} + // run is worker loop that reads incoming state information from the save // channel and persists it when the number of changes reaches maxEvents or // the amount of time since the last disk write reaches flushInterval.