Do not immediately abort the extraction if a seed chunk is invalid (#220)

* Factor out writeChunk to a separate function

Signed-off-by: Ludovico de Nittis <ludovico.denittis@collabora.com>

* Do not immediately abort the extraction if a seed chunk is invalid

If the seed points to a read-write (RW) location, some files may change
while we are in the middle of an extraction.

When using the "InvalidSeedActionRegenerate" option, we can try harder
and attempt to take the invalid chunks from the self seed or the store.
If both of those fail too, then we abort the entire operation.

Signed-off-by: Ludovico de Nittis <ludovico.denittis@collabora.com>

* Limit the maximum number of chunks in a single seed sequence

If a seed is nearly identical to the output, we may end up with a few
SeedSequencers that are very long and others that are only one or a few
chunks long.

Because each SeedSequencer is handled by a goroutine (from a pool), we
may reach a situation where most of the goroutines have finished their
work and are just waiting for the few long SeedSequencer jobs to end.

By limiting the maximum number of chunks in a SeedSequencer, we get jobs
that are more evenly balanced in terms of the amount of work; the sketch
below illustrates the idea.

Signed-off-by: Ludovico de Nittis <ludovico.denittis@collabora.com>
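
As an illustration of the capped matching this introduces (a standalone sketch, not the desync implementation; cappedMatch, seed, and target are hypothetical names, while IndexChunk is the chunk descriptor used throughout desync):

	// cappedMatch counts how many consecutive chunks of target, starting at
	// position p of the seed index, are identical. A limit of zero means
	// unlimited; a non-zero limit caps the sequence so that no single job
	// handed to the worker pool dwarfs the others.
	func cappedMatch(seed, target []IndexChunk, p, limit int) int {
		n := 0
		for p+n < len(seed) && n < len(target) {
			if limit != 0 && n == limit {
				break // cap reached; the remainder becomes separate jobs
			}
			if seed[p+n].ID != target[n].ID {
				break
			}
			n++
		}
		return n
	}

With a cap of 100, a seed that matches 10,000 consecutive chunks yields roughly a hundred similarly sized jobs instead of one enormous one.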

* When taking a chunk from the self seed, immediately return

If we were able to take a chunk from the self seed, there is no need to
continue looking into the existing file and the store.

Signed-off-by: Ludovico de Nittis <ludovico.denittis@collabora.com>
RyuzakiKK authored Apr 18, 2022
1 parent 32d4018 commit b545768
Showing 4 changed files with 100 additions and 58 deletions.
118 changes: 67 additions & 51 deletions assemble.go
@@ -25,6 +25,58 @@ type AssembleOptions struct {
 	InvalidSeedAction InvalidSeedAction
 }
 
+// writeChunk tries to write a chunk by taking it from the self seed, by reusing data that is
+// already present in the destination file, or by pulling it from the store
+func writeChunk(c IndexChunk, ss *selfSeed, f *os.File, blocksize uint64, s Store, stats *ExtractStats, isBlank bool) error {
+	// If we already took this chunk from the store we can reuse it by looking
+	// into the selfSeed.
+	if segment := ss.getChunk(c.ID); segment != nil {
+		copied, cloned, err := segment.WriteInto(f, c.Start, c.Size, blocksize, isBlank)
+		if err != nil {
+			return err
+		}
+		stats.addBytesCopied(copied)
+		stats.addBytesCloned(cloned)
+		return nil
+	}
+
+	// If we operate on an existing file there's a good chance we already
+	// have the data written for this chunk. Let's read it from disk and
+	// compare to what is expected.
+	if !isBlank {
+		b := make([]byte, c.Size)
+		if _, err := f.ReadAt(b, int64(c.Start)); err != nil {
+			return err
+		}
+		sum := Digest.Sum(b)
+		if sum == c.ID {
+			// Record we kept this chunk in the file (when using in-place extract)
+			stats.incChunksInPlace()
+			return nil
+		}
+	}
+	// Record this chunk having been pulled from the store
+	stats.incChunksFromStore()
+	// Pull the (compressed) chunk from the store
+	chunk, err := s.GetChunk(c.ID)
+	if err != nil {
+		return err
+	}
+	b, err := chunk.Data()
+	if err != nil {
+		return err
+	}
+	// Might as well verify the chunk size while we're at it
+	if c.Size != uint64(len(b)) {
+		return fmt.Errorf("unexpected size for chunk %s", c.ID)
+	}
+	// Write the decompressed chunk into the file at the right position
+	if _, err = f.WriteAt(b, int64(c.Start)); err != nil {
+		return err
+	}
+	return nil
+}
+
 // AssembleFile re-assembles a file based on a list of index chunks. It runs n
 // goroutines, creating one filehandle for the file "name" per goroutine
 // and writes to the file simultaneously. If progress is provided, it'll be
@@ -136,7 +188,15 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
 			}
 			sum := Digest.Sum(b)
 			if sum != c.ID {
-				return fmt.Errorf("written data in %s doesn't match its expected hash value, seed may have changed during processing", name)
+				if options.InvalidSeedAction == InvalidSeedActionRegenerate {
+					// Try harder before giving up and aborting
+					Log.WithField("ID", c.ID).Info("The seed may have changed during processing, trying to take the chunk from the self seed or the store")
+					if err := writeChunk(c, ss, f, blocksize, s, stats, isBlank); err != nil {
+						return err
+					}
+				} else {
+					return fmt.Errorf("written data in %s doesn't match its expected hash value, seed may have changed during processing", name)
+				}
 			}
 		}

@@ -156,58 +216,14 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
 			}
 			c := job.segment.chunks()[0]
 
-			// If we already took this chunk from the store we can reuse it by looking
-			// into the selfSeed.
-			if segment := ss.getChunk(c.ID); segment != nil {
-				copied, cloned, err := segment.WriteInto(f, c.Start, c.Size, blocksize, isBlank)
-				if err != nil {
-					return err
-				}
-				stats.addBytesCopied(copied)
-				stats.addBytesCloned(cloned)
-				// Even if we already confirmed that this chunk is present in the
-				// self-seed, we still need to record it as being written, otherwise
-				// the self-seed position pointer doesn't advance as we expect.
-				ss.add(job.segment)
-			}
-
-			// If we operate on an existing file there's a good chance we already
-			// have the data written for this chunk. Let's read it from disk and
-			// compare to what is expected.
-			if !isBlank {
-				b := make([]byte, c.Size)
-				if _, err := f.ReadAt(b, int64(c.Start)); err != nil {
-					return err
-				}
-				sum := Digest.Sum(b)
-				if sum == c.ID {
-					// Record this chunk's been written in the self-seed
-					ss.add(job.segment)
-					// Record we kept this chunk in the file (when using in-place extract)
-					stats.incChunksInPlace()
-					continue
-				}
-			}
-			// Record this chunk having been pulled from the store
-			stats.incChunksFromStore()
-			// Pull the (compressed) chunk from the store
-			chunk, err := s.GetChunk(c.ID)
-			if err != nil {
-				return err
-			}
-			b, err := chunk.Data()
-			if err != nil {
+			if err := writeChunk(c, ss, f, blocksize, s, stats, isBlank); err != nil {
 				return err
 			}
-			// Might as well verify the chunk size while we're at it
-			if c.Size != uint64(len(b)) {
-				return fmt.Errorf("unexpected size for chunk %s", c.ID)
-			}
-			// Write the decompressed chunk into the file at the right position
-			if _, err = f.WriteAt(b, int64(c.Start)); err != nil {
-				return err
-			}
-			// Record this chunk's been written in the self-seed
+
+			// Record this chunk's been written in the self-seed.
+			// Even if we already confirmed that this chunk is present in the
+			// self-seed, we still need to record it as being written, otherwise
+			// the self-seed position pointer doesn't advance as we expect.
 			ss.add(job.segment)
 		}
 		return nil
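For library users, the fallback is opt-in through AssembleOptions. A minimal sketch of a call site, assuming AssembleFile returns (*ExtractStats, error) and AssembleOptions carries the goroutine count in a field N, as in desync around this commit (store, index, and seed setup elided):

	opts := desync.AssembleOptions{
		N:                 4, // worker goroutines
		InvalidSeedAction: desync.InvalidSeedActionRegenerate,
	}
	stats, err := desync.AssembleFile(ctx, "largefile.bin", idx, store, seeds, opts)
	if err != nil {
		// With InvalidSeedActionRegenerate this only fires when neither the
		// self seed nor the store could supply the mismatching chunk.
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", stats)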
4 changes: 3 additions & 1 deletion cmd/desync/extract.go
@@ -42,7 +42,9 @@ the index from STDIN. If a seed is invalid, by default the extract operation will
 aborted. With the -skip-invalid-seeds, the invalid seeds will be discarded and the
 extraction will continue without them. Otherwise with the -regenerate-invalid-seeds,
 the eventual invalid seed indexes will be regenerated, in memory, by using the
-available data, and neither data nor indexes will be changed on disk.`,
+available data, and neither data nor indexes will be changed on disk. Also, if the seed
+changes while processing, its invalid chunks will be taken from the self seed or the
+store instead of aborting.`,
 	Example: `  desync extract -s http://192.168.1.1/ -c /path/to/local file.caibx largefile.bin
   desync extract -s /mnt/store -s /tmp/other/store file.tar.caibx file.tar
   desync extract -s /mnt/store --seed /mnt/v1.caibx v2.caibx v2.vmdk`,
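In the spirit of the Example block above, a run that opts into the new behavior might look like this (hypothetical paths; flag spelling taken from the help text):

  desync extract --regenerate-invalid-seeds -s /mnt/store --seed /mnt/v1.caibx v2.caibx v2.vmdk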
25 changes: 20 additions & 5 deletions fileseed.go
@@ -35,8 +35,9 @@ func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error
 }
 
 // LongestMatchWith returns the longest sequence of chunks anywhere in Source
-// that match `chunks` starting at chunks[0]. If there is no match, it returns a
-// length of zero and a nil SeedSegment.
+// that match `chunks` starting at chunks[0], limiting the maximum number of chunks
+// if reflinks are not supported. If there is no match, it returns a length of zero
+// and a nil SeedSegment.
 func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) {
 	s.mu.RLock()
 	// isInvalid can be concurrently read or wrote. Use a mutex to avoid a race
@@ -53,13 +54,24 @@ func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) {
 	var (
 		match []IndexChunk
 		max   int
+		limit int
 	)
+	if !s.canReflink {
+		// Limit the maximum number of chunks in a single sequence to avoid
+		// having jobs that are too unbalanced.
+		// However, if reflinks are supported, we don't apply a limit, since
+		// cloning long extents is faster and takes less space.
+		limit = 100
+	}
 	for _, p := range pos {
-		m := s.maxMatchFrom(chunks, p)
+		m := s.maxMatchFrom(chunks, p, limit)
 		if len(m) > max {
 			match = m
 			max = len(m)
 		}
+		if limit != 0 && limit == max {
+			break
+		}
 	}
 	return max, newFileSeedSegment(s.srcFile, match, s.canReflink)
 }
@@ -95,8 +107,8 @@ func (s *FileSeed) IsInvalid() bool {
 }
 
 // Returns a slice of chunks from the seed. Compares chunks from position 0
-// with seed chunks starting at p.
-func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int) []IndexChunk {
+// with seed chunks starting at p. A "limit" value of zero means that there is no limit.
+func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) []IndexChunk {
 	if len(chunks) == 0 {
 		return nil
 	}
@@ -105,6 +117,9 @@ func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int) []IndexChunk {
 		dp = p
 	)
 	for {
+		if limit != 0 && sp == limit {
+			break
+		}
 		if dp >= len(s.index.Chunks) || sp >= len(chunks) {
 			break
 		}
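To make the cap concrete: assuming a FileSeed whose source is identical to a 1,000-chunk target index and a filesystem without reflink support, a caller would observe something like the following (hypothetical sketch):

	// All 1,000 chunks match, but the sequence is capped at 100 so the
	// resulting jobs stay balanced across the worker pool.
	n, segment := seed.LongestMatchWith(idx.Chunks)
	fmt.Println(n) // at most 100
	_ = segment    // covers only the first n chunks

When reflinks are available, canReflink is true, no limit is applied, and the same call may return the full 1,000-chunk match, since cloning one long extent is cheap in both time and space. The null seed below applies the same cap for the same reason.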
11 changes: 10 additions & 1 deletion nullseed.go
@@ -47,8 +47,17 @@ func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment)
 	if len(chunks) == 0 {
 		return 0, nil
 	}
-	var n int
+	var (
+		n     int
+		limit int
+	)
+	if !s.canReflink {
+		limit = 100
+	}
 	for _, c := range chunks {
+		if limit != 0 && limit == n {
+			break
+		}
 		if c.ID != s.id {
 			break
 		}