Skip to content

Commit

Permalink
Migrate registry from previous incorrect path (elastic#10486)
Browse files Browse the repository at this point in the history
In 6.x Journalbeat placed its registry file under the wrong path ignoring the data.path settings.
This patch lets users migrate from registries under such paths when upgrading from 6.x to 7.x.
  • Loading branch information
kvch authored Feb 5, 2019
1 parent 131e24e commit 388b4da
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Journalbeat*

- Migrate registry from previously incorrect path. {pull}10486[10486]

*Metricbeat*

- Add `key` metricset to the Redis module. {issue}9582[9582] {pull}9657[9657] {pull}9746[9746]
Expand Down
53 changes: 52 additions & 1 deletion journalbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package checkpoint

import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -88,7 +89,10 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
save: make(chan JournalState, 1),
}

c.file = paths.Resolve(paths.Data, c.file)
err := c.findRegistryFile()
if err != nil {
return nil, fmt.Errorf("error locating the proper registry file: %+v", err)
}

// Minimum batch size.
if c.maxUpdates < 1 {
Expand Down Expand Up @@ -123,6 +127,53 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
return c, nil
}

// Previously the registry file was written to the root folder. It was fixed on
// 7.x but not on 6.x. Thus, migration is needed, so users avoid losing state info.
func (c *Checkpoint) findRegistryFile() error {
migratedPath := paths.Resolve(paths.Data, c.file)

fs, err := os.Stat(c.file)
if os.IsNotExist(err) {
c.file = migratedPath
return nil
} else if err != nil {
return fmt.Errorf("error accessing previous registry file: %+v", err)
}

f, err := os.Open(c.file)
if err != nil {
return err
}
defer f.Close()

target, err := os.OpenFile(migratedPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fs.Mode())
if err != nil {
return err
}
defer target.Close()

if _, err := io.Copy(target, f); err != nil {
return err
}

err = target.Sync()
if err != nil {
return fmt.Errorf("error while syncing new registry file to disk: %+v", err)
}

c.file = migratedPath

p := filepath.Dir(migratedPath)
pf, err := os.Open(p)
if err != nil {
return nil
}
defer pf.Close()
pf.Sync()

return nil
}

// run is worker loop that reads incoming state information from the save
// channel and persists it when the number of changes reaches maxEvents or
// the amount of time since the last disk write reaches flushInterval.
Expand Down

0 comments on commit 388b4da

Please sign in to comment.