diff --git a/assemble.go b/assemble.go
index 95f9d44..e1fbf17 100644
--- a/assemble.go
+++ b/assemble.go
@@ -25,6 +25,58 @@ type AssembleOptions struct {
 	InvalidSeedAction InvalidSeedAction
 }
 
+// writeChunk tries to write a chunk by taking it from the self seed, by keeping it if it
+// already exists in the destination file, or by fetching it from the store
+func writeChunk(c IndexChunk, ss *selfSeed, f *os.File, blocksize uint64, s Store, stats *ExtractStats, isBlank bool) error {
+	// If we already took this chunk from the store we can reuse it by looking
+	// into the selfSeed.
+	if segment := ss.getChunk(c.ID); segment != nil {
+		copied, cloned, err := segment.WriteInto(f, c.Start, c.Size, blocksize, isBlank)
+		if err != nil {
+			return err
+		}
+		stats.addBytesCopied(copied)
+		stats.addBytesCloned(cloned)
+		return nil
+	}
+
+	// If we operate on an existing file there's a good chance we already
+	// have the data written for this chunk. Let's read it from disk and
+	// compare to what is expected.
+	if !isBlank {
+		b := make([]byte, c.Size)
+		if _, err := f.ReadAt(b, int64(c.Start)); err != nil {
+			return err
+		}
+		sum := Digest.Sum(b)
+		if sum == c.ID {
+			// Record we kept this chunk in the file (when using in-place extract)
+			stats.incChunksInPlace()
+			return nil
+		}
+	}
+	// Record this chunk having been pulled from the store
+	stats.incChunksFromStore()
+	// Pull the (compressed) chunk from the store
+	chunk, err := s.GetChunk(c.ID)
+	if err != nil {
+		return err
+	}
+	b, err := chunk.Data()
+	if err != nil {
+		return err
+	}
+	// Might as well verify the chunk size while we're at it
+	if c.Size != uint64(len(b)) {
+		return fmt.Errorf("unexpected size for chunk %s", c.ID)
+	}
+	// Write the decompressed chunk into the file at the right position
+	if _, err = f.WriteAt(b, int64(c.Start)); err != nil {
+		return err
+	}
+	return nil
+}
+
 // AssembleFile re-assembles a file based on a list of index chunks. It runs n
 // goroutines, creating one filehandle for the file "name" per goroutine
 // and writes to the file simultaneously. If progress is provided, it'll be
@@ -136,7 +188,15 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
 				}
 				sum := Digest.Sum(b)
 				if sum != c.ID {
-					return fmt.Errorf("written data in %s doesn't match its expected hash value, seed may have changed during processing", name)
+					if options.InvalidSeedAction == InvalidSeedActionRegenerate {
+						// Try harder before giving up and aborting
+						Log.WithField("ID", c.ID).Info("The seed may have changed during processing, trying to take the chunk from the self seed or the store")
+						if err := writeChunk(c, ss, f, blocksize, s, stats, isBlank); err != nil {
+							return err
+						}
+					} else {
+						return fmt.Errorf("written data in %s doesn't match its expected hash value, seed may have changed during processing", name)
+					}
 				}
 			}
 
@@ -156,58 +216,14 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
 			}
 			c := job.segment.chunks()[0]
 
-			// If we already took this chunk from the store we can reuse it by looking
-			// into the selfSeed.
-			if segment := ss.getChunk(c.ID); segment != nil {
-				copied, cloned, err := segment.WriteInto(f, c.Start, c.Size, blocksize, isBlank)
-				if err != nil {
-					return err
-				}
-				stats.addBytesCopied(copied)
-				stats.addBytesCloned(cloned)
-				// Even if we already confirmed that this chunk is present in the
-				// self-seed, we still need to record it as being written, otherwise
-				// the self-seed position pointer doesn't advance as we expect.
-				ss.add(job.segment)
-			}
-
-			// If we operate on an existing file there's a good chance we already
-			// have the data written for this chunk. Let's read it from disk and
-			// compare to what is expected.
-			if !isBlank {
-				b := make([]byte, c.Size)
-				if _, err := f.ReadAt(b, int64(c.Start)); err != nil {
-					return err
-				}
-				sum := Digest.Sum(b)
-				if sum == c.ID {
-					// Record this chunk's been written in the self-seed
-					ss.add(job.segment)
-					// Record we kept this chunk in the file (when using in-place extract)
-					stats.incChunksInPlace()
-					continue
-				}
-			}
-			// Record this chunk having been pulled from the store
-			stats.incChunksFromStore()
-			// Pull the (compressed) chunk from the store
-			chunk, err := s.GetChunk(c.ID)
-			if err != nil {
-				return err
-			}
-			b, err := chunk.Data()
-			if err != nil {
+			if err := writeChunk(c, ss, f, blocksize, s, stats, isBlank); err != nil {
 				return err
 			}
-			// Might as well verify the chunk size while we're at it
-			if c.Size != uint64(len(b)) {
-				return fmt.Errorf("unexpected size for chunk %s", c.ID)
-			}
-			// Write the decompressed chunk into the file at the right position
-			if _, err = f.WriteAt(b, int64(c.Start)); err != nil {
-				return err
-			}
-			// Record this chunk's been written in the self-seed
+
+			// Record this chunk's been written in the self-seed.
+			// Even if we already confirmed that this chunk is present in the
+			// self-seed, we still need to record it as being written, otherwise
+			// the self-seed position pointer doesn't advance as we expect.
 			ss.add(job.segment)
 		}
 		return nil
diff --git a/cmd/desync/extract.go b/cmd/desync/extract.go
index 26f2e54..b3aa101 100644
--- a/cmd/desync/extract.go
+++ b/cmd/desync/extract.go
@@ -42,7 +42,9 @@ the index from STDIN. If a seed is invalid, by default the extract operation wil
 aborted. With the -skip-invalid-seeds, the invalid seeds will be discarded and the
 extraction will continue without them. Otherwise with the -regenerate-invalid-seeds, the
 eventual invalid seed indexes will be regenerated, in memory, by using the
-available data, and neither data nor indexes will be changed on disk.`,
+available data, and neither data nor indexes will be changed on disk. Also, if a seed changes
+while being processed, its invalid chunks will be taken from the self seed or the store instead
+of aborting the extraction.`,
 	Example: `  desync extract -s http://192.168.1.1/ -c /path/to/local file.caibx largefile.bin
   desync extract -s /mnt/store -s /tmp/other/store file.tar.caibx file.tar
   desync extract -s /mnt/store --seed /mnt/v1.caibx v2.caibx v2.vmdk`,
diff --git a/fileseed.go b/fileseed.go
index fc350e3..6962a8f 100644
--- a/fileseed.go
+++ b/fileseed.go
@@ -35,8 +35,9 @@ func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error)
 }
 
 // LongestMatchWith returns the longest sequence of chunks anywhere in Source
-// that match `chunks` starting at chunks[0]. If there is no match, it returns a
-// length of zero and a nil SeedSegment.
+// that match `chunks` starting at chunks[0], capping the number of matched chunks
+// when reflinks are not supported. If there is no match, it returns a length of zero
+// and a nil SeedSegment.
 func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) {
 	s.mu.RLock()
 	// isInvalid can be concurrently read or wrote. Use a mutex to avoid a race
@@ -53,13 +54,24 @@ func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) {
 	var (
 		match []IndexChunk
 		max   int
+		limit int
 	)
+	if !s.canReflink {
+		// Limit the maximum number of chunks in a single sequence to avoid
+		// having jobs that are too unbalanced. However, if reflinks are
+		// supported, we don't apply a limit, since longer cloned sequences
+		// are faster and take less space.
+		limit = 100
+	}
 	for _, p := range pos {
-		m := s.maxMatchFrom(chunks, p)
+		m := s.maxMatchFrom(chunks, p, limit)
 		if len(m) > max {
 			match = m
 			max = len(m)
 		}
+		if limit != 0 && limit == max {
+			break
+		}
 	}
 	return max, newFileSeedSegment(s.srcFile, match, s.canReflink)
 }
@@ -95,8 +107,8 @@ func (s *FileSeed) IsInvalid() bool {
 }
 
 // Returns a slice of chunks from the seed. Compares chunks from position 0
-// with seed chunks starting at p.
-func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int) []IndexChunk {
+// with seed chunks starting at p. A "limit" of zero means there is no limit.
+func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) []IndexChunk {
 	if len(chunks) == 0 {
 		return nil
 	}
@@ -105,6 +117,9 @@ func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int) []IndexChunk {
 		dp = p
 	)
 	for {
+		if limit != 0 && sp == limit {
+			break
+		}
 		if dp >= len(s.index.Chunks) || sp >= len(chunks) {
 			break
 		}
diff --git a/nullseed.go b/nullseed.go
index f70a26b..26033e7 100644
--- a/nullseed.go
+++ b/nullseed.go
@@ -47,8 +47,17 @@ func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment)
 	if len(chunks) == 0 {
 		return 0, nil
 	}
-	var n int
+	var (
+		n     int
+		limit int
+	)
+	if !s.canReflink {
+		limit = 100
+	}
 	for _, c := range chunks {
+		if limit != 0 && limit == n {
+			break
+		}
 		if c.ID != s.id {
 			break
 		}
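Reviewer note: for anyone driving this from library code rather than the desync CLI, here is a minimal sketch of how the new fallback is selected. It is an assumption-laden illustration, not part of this change: only the InvalidSeedAction field and InvalidSeedActionRegenerate are confirmed by this diff; the N concurrency field and the (*ExtractStats, error) return of AssembleFile are assumed from the surrounding codebase.

package main

import (
	"context"
	"fmt"

	"github.com/folbricht/desync"
)

// assemble is a hypothetical driver. With InvalidSeedActionRegenerate, a seed
// that changes during processing no longer aborts the run: mismatched chunks
// are re-taken from the self seed or the store, as writeChunk above implements.
func assemble(ctx context.Context, idx desync.Index, s desync.Store, seeds []desync.Seed) error {
	stats, err := desync.AssembleFile(ctx, "largefile.bin", idx, s, seeds,
		desync.AssembleOptions{
			N:                 4, // goroutine count; field assumed, not shown in this diff
			InvalidSeedAction: desync.InvalidSeedActionRegenerate,
		})
	if err != nil {
		return err
	}
	fmt.Printf("extract stats: %+v\n", stats)
	return nil
}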
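And a self-contained sketch of the match limit introduced in fileseed.go and nullseed.go, with plain ints standing in for chunk IDs: a limit of zero means no cap (the reflink case), while a cap of 100 keeps any single matched sequence, and therefore any single job, from growing disproportionately large.

package main

import "fmt"

// maxMatchFrom mirrors the loop changed in fileseed.go: compare chunks
// starting at position 0 against seed chunks starting at p, stopping after
// limit matches. A limit of zero means there is no limit.
func maxMatchFrom(seed, chunks []int, p, limit int) int {
	sp, dp := 0, p
	for {
		if limit != 0 && sp == limit {
			break // cap reached: keep jobs balanced when we can't reflink
		}
		if dp >= len(seed) || sp >= len(chunks) {
			break
		}
		if seed[dp] != chunks[sp] {
			break
		}
		dp++
		sp++
	}
	return sp
}

func main() {
	seed := []int{9, 2, 3, 4, 5, 6}
	chunks := []int{2, 3, 4, 5, 6}
	fmt.Println(maxMatchFrom(seed, chunks, 1, 0)) // 5: no limit (reflinks supported)
	fmt.Println(maxMatchFrom(seed, chunks, 1, 3)) // 3: capped (no reflinks)
}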