Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to regenerate invalid seed indexes #216

Merged
merged 2 commits into from
Apr 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions assemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
)

// InvalidSeedAction represents the action that we will take if a seed
// happens to be invalid. There are currently two options: either fail with
// an error or skip the invalid seed and try to continue.
// happens to be invalid. There are currently three options:
// - fail with an error
// - skip the invalid seed and try to continue
// - regenerate the invalid seed index
type InvalidSeedAction int

// Possible values for InvalidSeedAction. AssembleFile consults the chosen
// value whenever validation of the current plan fails.
const (
// InvalidSeedActionBailOut makes AssembleFile stop and return the
// validation error.
InvalidSeedActionBailOut InvalidSeedAction = iota
// InvalidSeedActionSkip makes AssembleFile log the problem and recreate the
// plan; the seed that was marked invalid is skipped this time.
InvalidSeedActionSkip
// InvalidSeedActionRegenerate makes AssembleFile regenerate the indexes of
// the invalid seeds before recreating the plan.
InvalidSeedActionRegenerate
)

type AssembleOptions struct {
Expand Down Expand Up @@ -225,11 +228,21 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
for {
if err := plan.Validate(ctx, options.N); err != nil {
// This plan has at least one invalid seed
if options.InvalidSeedAction == InvalidSeedActionBailOut {
switch options.InvalidSeedAction {
case InvalidSeedActionBailOut:
return stats, err
case InvalidSeedActionRegenerate:
Log.WithError(err).Info("Unable to use one of the chosen seeds, regenerating it")
if err := seq.RegenerateInvalidSeeds(ctx, options.N); err != nil {
return stats, err
}
case InvalidSeedActionSkip:
// Recreate the plan. This time the seed marked as invalid will be skipped
Log.WithError(err).Info("Unable to use one of the chosen seeds, skipping it")
default:
panic("Unhandled InvalidSeedAction")
}
// Skip the invalid seed and try again
Log.WithError(err).Info("Unable to use one of the chosen seeds, skipping it")

seq.Rewind()
plan = seq.Plan()
continue
Expand Down
30 changes: 21 additions & 9 deletions cmd/desync/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (

type extractOptions struct {
cmdStoreOptions
stores []string
cache string
seeds []string
seedDirs []string
inPlace bool
printStats bool
skipInvalidSeeds bool
stores []string
cache string
seeds []string
seedDirs []string
inPlace bool
printStats bool
skipInvalidSeeds bool
regenerateInvalidSeeds bool
}

func newExtractCommand(ctx context.Context) *cobra.Command {
Expand All @@ -33,11 +34,15 @@ func newExtractCommand(ctx context.Context) *cobra.Command {
When using -k, the blob will be extracted in-place utilizing existing data and
the target file will not be deleted on error. This can be used to restart a
failed prior extraction without having to retrieve completed chunks again.
Muptiple optional seed indexes can be given with -seed. The matching blob needs
Multiple optional seed indexes can be given with -seed. The matching blob needs
to have the same name as the indexfile without the .caibx extension. If several
seed files and indexes are available, the -seed-dir option can be used to
automatically select call .caibx files in a directory as seeds. Use '-' to read
the index from STDIN.`,
the index from STDIN. If a seed is invalid, by default the extract operation will be
aborted. With the -skip-invalid-seeds, the invalid seeds will be discarded and the
extraction will continue without them. Otherwise with the -regenerate-invalid-seeds,
the eventual invalid seed indexes will be regenerated, in memory, by using the
available data, and neither data nor indexes will be changed on disk.`,
Example: ` desync extract -s http://192.168.1.1/ -c /path/to/local file.caibx largefile.bin
desync extract -s /mnt/store -s /tmp/other/store file.tar.caibx file.tar
desync extract -s /mnt/store --seed /mnt/v1.caibx v2.caibx v2.vmdk`,
Expand All @@ -52,6 +57,7 @@ the index from STDIN.`,
flags.StringSliceVar(&opt.seeds, "seed", nil, "seed indexes")
flags.StringSliceVar(&opt.seedDirs, "seed-dir", nil, "directory with seed index files")
flags.BoolVar(&opt.skipInvalidSeeds, "skip-invalid-seeds", false, "Skip seeds with invalid chunks")
flags.BoolVar(&opt.regenerateInvalidSeeds, "regenerate-invalid-seeds", false, "Regenerate seed indexes with invalid chunks")
RyuzakiKK marked this conversation as resolved.
Show resolved Hide resolved
flags.StringVarP(&opt.cache, "cache", "c", "", "store to be used as cache")
flags.BoolVarP(&opt.inPlace, "in-place", "k", false, "extract the file in place and keep it in case of error")
flags.BoolVarP(&opt.printStats, "print-stats", "", false, "print statistics")
Expand All @@ -75,6 +81,10 @@ func runExtract(ctx context.Context, opt extractOptions, args []string) error {
return errors.New("no store provided")
}

if opt.skipInvalidSeeds && opt.regenerateInvalidSeeds {
return errors.New("is not possible to use at the same time --skip-invalid-seeds and --regenerate-invalid-seeds")
}

// Parse the store locations, open the stores and add a cache is requested
var s desync.Store
s, err := MultiStoreWithCache(opt.cmdStoreOptions, opt.cache, opt.stores...)
Expand Down Expand Up @@ -106,6 +116,8 @@ func runExtract(ctx context.Context, opt extractOptions, args []string) error {
invalidSeedAction := desync.InvalidSeedActionBailOut
if opt.skipInvalidSeeds {
invalidSeedAction = desync.InvalidSeedActionSkip
} else if opt.regenerateInvalidSeeds {
invalidSeedAction = desync.InvalidSeedActionRegenerate
}
assembleOpt := desync.AssembleOptions{N: opt.n, InvalidSeedAction: invalidSeedAction}

Expand Down
18 changes: 17 additions & 1 deletion cmd/desync/extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,23 @@ func TestExtractCommand(t *testing.T) {
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out1},
// Here we don't need the `--skip-invalid-seeds` because we expect the blob1 seed to always be the chosen one, being
// a 1:1 match with the index that we want to write. So we never reach the point where we validate the corrupted seed.
// Explicitly set blob1 seed because seed-dir skips a seed if it's the same index file we gave in input.
{"extract with seed directory without skipping invalid seeds",
[]string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "testdata/blob1.caibx"}, out1},
[]string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1},
// Same as above, no need for `--skip-invalid-seeds`
{"extract with multiple corrupted seeds",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1},
{"extract with single seed that has all the expected chunks",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1},
// blob2_corrupted is a corrupted blob that doesn't match its seed index. We regenerate the seed index to match
// this corrupted blob
{"extract while regenerating the corrupted seed",
[]string{"--store", "testdata/blob1.store", "--seed", "testdata/blob2_corrupted.caibx", "--regenerate-invalid-seeds", "testdata/blob1.caibx"}, out1},
// blob1_corrupted_index.caibx is a corrupted seed index that points to a valid blob1 file. By regenerating the
// invalid seed we expect to have an index that is equal to blob1.caibx. That should be enough to do the
// extraction without taking chunks from the store
{"extract with corrupted seed and empty store",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob1_corrupted_index.caibx", "--regenerate-invalid-seeds", "testdata/blob1.caibx"}, out1},
} {
t.Run(test.name, func(t *testing.T) {
cmd := newExtractCommand(context.Background())
Expand Down Expand Up @@ -125,6 +135,12 @@ func TestExtractWithInvalidSeeds(t *testing.T) {
[]string{"--store", "testdata/blob1.store", "--seed", "testdata/blob2_corrupted.caibx", "testdata/blob1.caibx"}, out},
{"extract with multiple corrupted seeds",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob2.caibx"}, out},
{"extract with corrupted blob1 seed and a valid seed",
[]string{"--store", "testdata/blob2.store", "--seed", "testdata/blob1_corrupted_index.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob2.caibx"}, out},
{"extract with corrupted blob1 seed",
[]string{"--store", "testdata/blob2.store", "--seed", "testdata/blob1_corrupted_index.caibx", "testdata/blob2.caibx"}, out},
{"extract with both --regenerate-invalid-seed and --skip-invalid-seeds",
[]string{"--store", "testdata/blob1.store", "--seed", "testdata/blob1_corrupted_index.caibx", "--regenerate-invalid-seeds", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out},
} {
t.Run(test.name, func(t *testing.T) {
cmd := newExtractCommand(context.Background())
Expand Down
1 change: 1 addition & 0 deletions cmd/desync/testdata/blob1_corrupted_index
Binary file added cmd/desync/testdata/blob1_corrupted_index.caibx
Binary file not shown.
24 changes: 24 additions & 0 deletions fileseed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package desync

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -63,12 +64,35 @@ func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) {
return max, newFileSeedSegment(s.srcFile, match, s.canReflink)
}

func (s *FileSeed) RegenerateIndex(ctx context.Context, n int) error {
index, _, err := IndexFromFile(ctx, s.srcFile, n, s.index.Index.ChunkSizeMin, s.index.Index.ChunkSizeAvg,
s.index.Index.ChunkSizeMax, nil)
if err != nil {
return err
}

s.index = index
RyuzakiKK marked this conversation as resolved.
Show resolved Hide resolved
s.SetInvalid(false)
s.pos = make(map[ChunkID][]int, len(s.index.Chunks))
for i, c := range s.index.Chunks {
s.pos[c.ID] = append(s.pos[c.ID], i)
}

return nil
}

// SetInvalid records whether this seed is to be treated as invalid. The flag
// is written while holding the seed's mutex.
func (s *FileSeed) SetInvalid(value bool) {
	s.mu.Lock()
	s.isInvalid = value
	s.mu.Unlock()
}

// IsInvalid reports whether this seed is currently flagged as invalid. The
// flag is read while holding the seed's mutex.
func (s *FileSeed) IsInvalid() bool {
	s.mu.Lock()
	invalid := s.isInvalid
	s.mu.Unlock()
	return invalid
}

// Returns a slice of chunks from the seed. Compares chunks from position 0
// with seed chunks starting at p.
func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int) []IndexChunk {
Expand Down
10 changes: 10 additions & 0 deletions nullseed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package desync

import (
"context"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -64,10 +65,19 @@ func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment)
}
}

// RegenerateIndex exists only to satisfy the Seed interface. It always panics:
// calling it on a nullseed is treated as a programming error.
func (s *nullChunkSeed) RegenerateIndex(ctx context.Context, n int) error {
	const reason = "A nullseed can't be regenerated"
	panic(reason)
}

// SetInvalid exists only to satisfy the Seed interface. It always panics,
// whatever value is passed: a nullseed is never expected to be invalid.
func (s *nullChunkSeed) SetInvalid(value bool) {
	const reason = "A nullseed is never expected to be invalid"
	panic(reason)
}

// IsInvalid always reports false, because a nullseed is never expected to be
// invalid.
func (s *nullChunkSeed) IsInvalid() bool {
	const invalid = false
	return invalid
}

type nullChunkSection struct {
from, to uint64
blockfile *os.File
Expand Down
3 changes: 3 additions & 0 deletions seed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package desync

import (
"context"
"os"
)

Expand All @@ -12,7 +13,9 @@ const DefaultBlockSize = 4096
// existing chunks or blocks into the target from.
type Seed interface {
// LongestMatchWith looks for a match between the given chunks and the seed
// and returns an int together with the SeedSegment describing the match.
// NOTE(review): the int is presumably the number of matched chunks —
// confirm against the implementations.
LongestMatchWith(chunks []IndexChunk) (int, SeedSegment)
// RegenerateIndex rebuilds the seed's index from the seed's data. FileSeed
// re-chunks its source file; nullChunkSeed and selfSeed panic because they
// are never expected to be regenerated.
RegenerateIndex(ctx context.Context, n int) error
// SetInvalid sets or clears the seed's invalid flag. nullChunkSeed and
// selfSeed panic because they are never expected to be invalid.
SetInvalid(value bool)
// IsInvalid reports whether the seed is currently flagged as invalid.
// SeedSequencer.RegenerateInvalidSeeds uses it to select the seeds whose
// index has to be regenerated.
IsInvalid() bool
}

// SeedSegment represents a matching range between a Seed and a file being
Expand Down
10 changes: 10 additions & 0 deletions selfseed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package desync

import (
"context"
"sync"
)

Expand Down Expand Up @@ -78,6 +79,15 @@ func (s *selfSeed) getChunk(id ChunkID) SeedSegment {
return newFileSeedSegment(s.file, s.index.Chunks[first:first+1], s.canReflink)
}

// RegenerateIndex exists only to satisfy the Seed interface. It always panics:
// calling it on a selfSeed is treated as a programming error.
func (s *selfSeed) RegenerateIndex(ctx context.Context, n int) error {
	const reason = "A selfSeed can't be regenerated"
	panic(reason)
}

// SetInvalid exists only to satisfy the Seed interface. It always panics,
// whatever value is passed: a selfSeed is never expected to be invalid.
func (s *selfSeed) SetInvalid(value bool) {
	const reason = "A selfSeed is never expected to be invalid"
	panic(reason)
}

// IsInvalid always reports false, because a selfSeed is never expected to be
// invalid.
func (s *selfSeed) IsInvalid() bool {
	const invalid = false
	return invalid
}
12 changes: 12 additions & 0 deletions sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ func (r *SeedSequencer) Rewind() {
r.current = 0
}

// RegenerateInvalidSeeds goes through every seed of the sequencer and, for each
// one currently flagged as invalid, regenerates its index so that it matches
// the seed's actual content. Seeds that are not flagged are left alone. The
// first regeneration error stops the walk and is returned as is.
func (r *SeedSequencer) RegenerateInvalidSeeds(ctx context.Context, n int) error {
	for _, seed := range r.seeds {
		if !seed.IsInvalid() {
			continue
		}
		err := seed.RegenerateIndex(ctx, n)
		if err != nil {
			return err
		}
	}
	return nil
}

// Validate validates a proposed plan by checking if all the chosen chunks
// are correctly provided from the seeds. In case a seed has invalid chunks, the
// entire seed is marked as invalid and an error is returned.
Expand Down