Skip to content

Commit

Permalink
Cherry-pick elastic#10486 to 6.x: Migrate registry from previous inco…
Browse files Browse the repository at this point in the history
…rrect path (elastic#10597)

In 6.x Journalbeat placed its registry file under the wrong path ignoring the data.path settings.
This patch lets users migrate from registries under such paths when upgrading from 6.x to 7.x.
(cherry picked from commit 9499811)
  • Loading branch information
kvch authored Feb 6, 2019
1 parent 20602d1 commit b748f18
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package checkpoint

import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand All @@ -32,6 +33,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
)

// Checkpoint persists event log state information to disk.
Expand Down Expand Up @@ -87,6 +89,11 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
save: make(chan JournalState, 1),
}

err := c.findRegistryFile()
if err != nil {
return nil, fmt.Errorf("error locating the proper registry file: %+v", err)
}

// Minimum batch size.
if c.maxUpdates < 1 {
c.maxUpdates = 1
Expand Down Expand Up @@ -120,6 +127,53 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
return c, nil
}

// Previously the registry file was written to the root folder. It was fixed on
// 7.x but not on 6.x. Thus, migration is needed, so users avoid losing state info.
func (c *Checkpoint) findRegistryFile() error {
migratedPath := paths.Resolve(paths.Data, c.file)

fs, err := os.Stat(c.file)
if os.IsNotExist(err) {
c.file = migratedPath
return nil
} else if err != nil {
return fmt.Errorf("error accessing previous registry file: %+v", err)
}

f, err := os.Open(c.file)
if err != nil {
return err
}
defer f.Close()

target, err := os.OpenFile(migratedPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fs.Mode())
if err != nil {
return err
}
defer target.Close()

if _, err := io.Copy(target, f); err != nil {
return err
}

err = target.Sync()
if err != nil {
return fmt.Errorf("error while syncing new registry file to disk: %+v", err)
}

c.file = migratedPath

p := filepath.Dir(migratedPath)
pf, err := os.Open(p)
if err != nil {
return nil
}
defer pf.Close()
pf.Sync()

return nil
}

// run is worker loop that reads incoming state information from the save
// channel and persists it when the number of changes reaches maxEvents or
// the amount of time since the last disk write reaches flushInterval.
Expand Down

0 comments on commit b748f18

Please sign in to comment.