Skip to content

Commit

Permalink
recreate wal-meta.db if it doesn't exist and segments do (#39)
Browse files Browse the repository at this point in the history
* recreate wal-meta.db if it doesn't exist and segments do

* force sealtime to now

---------

Co-authored-by: ncabatoff <ncabatoff@hashicorp.com>
  • Loading branch information
raskchanky and ncabatoff authored Nov 9, 2023
1 parent 0762519 commit 14af58c
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 22 deletions.
2 changes: 1 addition & 1 deletion fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (fs *FS) Create(dir string, name string, size uint64) (types.WritableFile,
return fi, nil
}

// Delete indicates the file is no longer required. Typically it should be
// Delete indicates the file is no longer required. Typically it shoutld be
// deleted from the underlying system to free disk space.
func (fs *FS) Delete(dir string, name string) error {
if err := os.Remove(filepath.Join(dir, name)); err != nil {
Expand Down
82 changes: 77 additions & 5 deletions metadb/metadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"fmt"
"os"
"path/filepath"
"time"

"github.com/hashicorp/raft-wal/fs"
"github.com/hashicorp/raft-wal/segment"
"github.com/hashicorp/raft-wal/types"
"go.etcd.io/bbolt"
)
Expand Down Expand Up @@ -63,12 +66,12 @@ func (db *BoltMetaDB) ensureOpen(dir string) error {
// BoltDB can get stuck in invalid states if we crash while it's initializing.
// We can't distinguish those as safe to just wipe it and start again because
// we don't know for sure if it's failing due to bad init or later corruption
// (which would loose data if we just wipe and start over). So to ensure
// (which would lose data if we just wipe and start over). So to ensure
// initial creation of the WAL is as crash-safe as possible we will manually
// detect we have an atomic init procedure:
// 1. Check if file exits already. If yes, skip init and just open it.
// 2. Delete any existing DB file with tmp name
// 3. Creat a new BoltDB that is empty and has the buckets with a temp name.
// 3. Create a new BoltDB that is empty and has the buckets with a temp name.
// 4. Once that's committed, rename to final name and Fsync parent dir
_, err := os.Stat(fileName)
if err == nil {
Expand All @@ -80,13 +83,81 @@ func (db *BoltMetaDB) ensureOpen(dir string) error {
return fmt.Errorf("failed to stat %s: %w", FileName, err)
}

// File doesn't exist, initialize a new DB in a crash-safe way
// File doesn't exist, initialize a new DB in a crash-safe way.
if err := safeInitBoltDB(dir); err != nil {
return fmt.Errorf("failed initializing meta DB: %w", err)
}

// All good, now open it!
return open()
// Open the new db, but don't return just yet
err = open()
if err != nil {
return fmt.Errorf("error opening new metadb: %w", err)
}

// Now that we have a brand new metaDB, check to see if segment files exist.
// If they do, then we're probably trying to do a recovery, and we can
// populate the new db with some initial values read from the segment file
// headers, so that we don't error later on when trying to create a new segment
// file that already exists.
sfe, err := segmentFilesExist(dir)
if err != nil {
return fmt.Errorf("failed to check for segment files: %w", err)
}

if sfe {
fmt.Println("rebuilding meta state from segment files")
state := types.PersistentState{}
vfs := fs.New()
f := segment.NewFiler(dir, vfs)
indexes, err := f.List()
if err != nil {
return fmt.Errorf("failed to list segment IDs: %w", err)
}

for id, baseIndex := range indexes {
info, err := f.HeaderInfo(baseIndex, id)
if err != nil {
return fmt.Errorf("failed to read header for file at baseIndex %d id %d: %w", baseIndex, id, err)
}
state.NextSegmentID = info.ID + 1
si := types.SegmentInfo{
ID: info.ID,
BaseIndex: info.BaseIndex,
MinIndex: info.MinIndex,
MaxIndex: info.MaxIndex,
Codec: info.Codec,
IndexStart: info.IndexStart,
CreateTime: info.CreateTime,
SealTime: time.Now(),
SizeLimit: info.SizeLimit,
}
state.Segments = append(state.Segments, si)
}

err = db.CommitState(state)
if err != nil {
return fmt.Errorf("failed to commit state: %w", err)
}
}

return nil
}

func segmentFilesExist(dir string) (bool, error) {
sfe := false
entries, err := os.ReadDir(dir)
if err != nil {
return false, err
}

for _, e := range entries {
if filepath.Ext(e.Name()) == ".wal" {
sfe = true
break
}
}

return sfe, nil
}

func safeInitBoltDB(dir string) error {
Expand Down Expand Up @@ -173,6 +244,7 @@ func (db *BoltMetaDB) Load(dir string) (types.PersistentState, error) {
if err := json.Unmarshal(raw, &state); err != nil {
return state, fmt.Errorf("%w: failed to parse persisted state: %s", types.ErrCorrupt, err)
}
fmt.Printf("state: %#v\n", state)
return state, nil
}

Expand Down
45 changes: 29 additions & 16 deletions segment/filer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,24 @@ func (f *Filer) RecoverTail(info types.SegmentInfo) (types.SegmentWriter, error)
func (f *Filer) Open(info types.SegmentInfo) (types.SegmentReader, error) {
fname := info.FileName()

rf, err := f.vfs.OpenReader(f.dir, fname)
rf, gotInfo, err := f.headerInfo(fname)
if err != nil {
return nil, err
}

if err := validateFileHeader(*gotInfo, info); err != nil {
return nil, err
}

return openReader(info, rf, &f.bufPool)
}

func (f *Filer) headerInfo(name string) (types.ReadableFile, *types.SegmentInfo, error) {
rf, err := f.vfs.OpenReader(f.dir, name)
if err != nil {
return nil, nil, err
}

// Validate header here since openReader is re-used by writer where it's valid
// for the file header not to be committed yet after a crash so we can't check
// it there.
Expand All @@ -87,21 +100,21 @@ func (f *Filer) Open(info types.SegmentInfo) (types.SegmentReader, error) {
// never not have a valid header. (I.e. even if crashes happen it should
// be impossible to seal a segment with no header written so this
// indicates that something truncated the file after the fact)
return nil, fmt.Errorf("%w: failed to read header: %s", types.ErrCorrupt, err)
return nil, nil, fmt.Errorf("%w: failed to read header: %s", types.ErrCorrupt, err)
}
return nil, err
}

gotInfo, err := readFileHeader(hdr[:])
if err != nil {
return nil, err
return nil, nil, err
}

if err := validateFileHeader(*gotInfo, info); err != nil {
return nil, err
}
info, err := readFileHeader(hdr[:])
return rf, info, err
}

return openReader(info, rf, &f.bufPool)
// HeaderInfo takes a baseIndex and ID and returns the information from the header of said file.
// This is useful during recovery when the meta-db doesn't exist.
func (f *Filer) HeaderInfo(baseIndex uint64, ID uint64) (*types.SegmentInfo, error) {
fname := fmt.Sprintf(types.SegmentFileNamePattern, baseIndex, ID)
_, info, err := f.headerInfo(fname)
return info, err
}

// List returns the set of segment IDs currently stored. It's used by the WAL
Expand Down Expand Up @@ -144,10 +157,10 @@ func (f *Filer) listInternal() (map[uint64]uint64, []uint64, error) {
}

// Delete removes the segment with given baseIndex and id if it exists. Note
// that baseIndex is technically redundant since ID is unique on it's own. But
// that baseIndex is technically redundant since ID is unique on its own. But
// in practice we name files (or keys) with both so that they sort correctly.
// This interface allows a simpler implementation where we can just delete
// the file if it exists without having to scan the underlying storage for a.
// This interface allows a simpler implementation where we can just delete
// the file if it exists without having to scan the underlying storage for a - ???.
func (f *Filer) Delete(baseIndex uint64, ID uint64) error {
fname := fmt.Sprintf(types.SegmentFileNamePattern, baseIndex, ID)
return f.vfs.Delete(f.dir, fname)
Expand All @@ -159,7 +172,7 @@ func (f *Filer) Delete(baseIndex uint64, ID uint64) error {
// the correct metadata. This allows dumping log segments in a WAL that is still
// being written to by another process. Without metadata we don't know if the
// file is sealed so always recover by reading through the whole file. If after
// or before are non-zero, the specify a exclusive lower or upper bound on which
// or before are non-zero, they specify an exclusive lower or upper bound on which
// log entries should be emitted. No error checking is done on the read data. fn
// is called for each entry passing the raft info read from the file header (so
// that the caller knows which codec to use for example) the raft index of the
Expand Down
1 change: 1 addition & 0 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func Open(dir string, opts ...walOpt) (*WAL, error) {
// above) there are no readers yet since we are constructing a new WAL so we
// don't need to jump through the mutateState hoops yet!
w.s.Store(&newState)
fmt.Printf("newState: %#v\n", newState)

// Delete any unused segment files left over after a crash.
w.deleteSegments(toDelete)
Expand Down

0 comments on commit 14af58c

Please sign in to comment.