Skip to content

Commit

Permalink
Cherry-pick #10486 to 6.x: Migrate registry from previous incorrect p…
Browse files Browse the repository at this point in the history
…ath (#10597)

In 6.x Journalbeat placed its registry file under the wrong path ignoring the data.path settings.
This patch lets users migrate from registries under such paths when upgrading from 6.x to 7.x.
(cherry picked from commit 9499811)
  • Loading branch information
kvch authored Feb 6, 2019
1 parent 9464d2e commit fa29c37
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff]

- Add the ability to check against JSON HTTP bodies with conditions. {pull}8667[8667]
- Add cursor_seek_fallback option. {pull}9234[9234]
- Migrate registry from previously incorrect path. {pull}10486[10486]

*Metricbeat*

Expand Down
54 changes: 54 additions & 0 deletions journalbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package checkpoint

import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand All @@ -32,6 +33,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
)

// Checkpoint persists event log state information to disk.
Expand Down Expand Up @@ -87,6 +89,11 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
save: make(chan JournalState, 1),
}

err := c.findRegistryFile()
if err != nil {
return nil, fmt.Errorf("error locating the proper registry file: %+v", err)
}

// Minimum batch size.
if c.maxUpdates < 1 {
c.maxUpdates = 1
Expand Down Expand Up @@ -120,6 +127,53 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
return c, nil
}

// Previously the registry file was written to the root folder. It was fixed on
// 7.x but not on 6.x. Thus, migration is needed, so users avoid losing state info.
func (c *Checkpoint) findRegistryFile() error {
migratedPath := paths.Resolve(paths.Data, c.file)

fs, err := os.Stat(c.file)
if os.IsNotExist(err) {
c.file = migratedPath
return nil
} else if err != nil {
return fmt.Errorf("error accessing previous registry file: %+v", err)
}

f, err := os.Open(c.file)
if err != nil {
return err
}
defer f.Close()

target, err := os.OpenFile(migratedPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fs.Mode())
if err != nil {
return err
}
defer target.Close()

if _, err := io.Copy(target, f); err != nil {
return err
}

err = target.Sync()
if err != nil {
return fmt.Errorf("error while syncing new registry file to disk: %+v", err)
}

c.file = migratedPath

p := filepath.Dir(migratedPath)
pf, err := os.Open(p)
if err != nil {
return nil
}
defer pf.Close()
pf.Sync()

return nil
}

// run is worker loop that reads incoming state information from the save
// channel and persists it when the number of changes reaches maxEvents or
// the amount of time since the last disk write reaches flushInterval.
Expand Down

0 comments on commit fa29c37

Please sign in to comment.