Skip to content

Commit

Permalink
Speed up VerifyIndex (#206)
Browse files Browse the repository at this point in the history
* Add some tests for VerifyIndex

Signed-off-by: Ludovico de Nittis <ludovico.denittis@collabora.com>

* Reduce code duplication while verifying an index

We can reuse the validate() function of FileSeedSegment and avoid
duplicating the code here.

Signed-off-by: Ludovico de Nittis <ludovico.denittis@collabora.com>

* Feed multiple chunks to the workers while verifying an index

If we send a single chunk to every Goroutine, the workers will complete
their task immediately and then will wait for a new chunk to validate.

If instead we group together multiple chunks we can improve the overall
performance.

For example, on my test machine, verifying an index that pointed to a 5 GB
image took 30 seconds, and with this patch it dropped to 24 seconds.

Signed-off-by: Ludovico de Nittis <ludovico.denittis@collabora.com>
  • Loading branch information
RyuzakiKK authored Dec 25, 2021
1 parent eeffcfc commit b7a13be
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 23 deletions.
34 changes: 34 additions & 0 deletions cmd/desync/verifyindex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"bytes"
"context"
"github.com/stretchr/testify/require"
"testing"
)

// TestVerifyIndexCommand exercises the verify-index command against the
// testdata fixtures: a matching index/blob pair must verify without error
// and without writing anything to stderr, while a mismatched pair must
// return an error.
func TestVerifyIndexCommand(t *testing.T) {
	// Validate the index of blob1, we expect it to complete without any error
	verifyIndex := newVerifyIndexCommand(context.Background())
	verifyIndex.SetArgs([]string{"testdata/blob1.caibx", "testdata/blob1"})
	b := new(bytes.Buffer)
	stderr = b
	_, err := verifyIndex.ExecuteC()
	require.NoError(t, err)
	// A successful run should produce no error output. Note: the previous
	// assertion `require.Contains(t, b.String(), "")` was vacuous — every
	// string contains the empty string — so it could never fail.
	require.Empty(t, b.String())

	// Do the same for blob2
	verifyIndex = newVerifyIndexCommand(context.Background())
	verifyIndex.SetArgs([]string{"testdata/blob2.caibx", "testdata/blob2"})
	b = new(bytes.Buffer)
	stderr = b
	_, err = verifyIndex.ExecuteC()
	require.NoError(t, err)
	require.Empty(t, b.String())

	// Run again against the wrong blob; the checksums cannot match, so the
	// command must report an error.
	verifyIndex = newVerifyIndexCommand(context.Background())
	verifyIndex.SetArgs([]string{"testdata/blob2.caibx", "testdata/blob1"})
	_, err = verifyIndex.ExecuteC()
	require.Error(t, err)
}
47 changes: 24 additions & 23 deletions verifyindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package desync
import (
"context"
"fmt"
"io"
"os"

"golang.org/x/sync/errgroup"
Expand All @@ -12,7 +11,7 @@ import (
// VerifyIndex re-calculates the checksums of a blob comparing it to a given index.
// Fails if the index does not match the blob.
func VerifyIndex(ctx context.Context, name string, idx Index, n int, pb ProgressBar) error {
in := make(chan IndexChunk)
in := make(chan []IndexChunk)
g, ctx := errgroup.WithContext(ctx)

// Setup and start the progressbar if any
Expand All @@ -39,41 +38,43 @@ func VerifyIndex(ctx context.Context, name string, idx Index, n int, pb Progress
defer f.Close()
g.Go(func() error {
for c := range in {
// Update progress bar if any
if pb != nil {
pb.Increment()
}

// Position the filehandle to the place where the chunk is meant to come
// from within the file
if _, err := f.Seek(int64(c.Start), io.SeekStart); err != nil {
return err
}

// Read the whole (uncompressed) chunk into memory
b := make([]byte, c.Size)
if _, err := io.ReadFull(f, b); err != nil {
// Reuse the fileSeedSegment structure, this is really just a seed segment after all
segment := newFileSeedSegment(name, c, false, false)
if err := segment.validate(f); err != nil {
return err
}

// Calculate this chunks checksum and compare to what it's supposed to be
// according to the index
sum := Digest.Sum(b)
if sum != c.ID {
return fmt.Errorf("checksum does not match chunk %s", c.ID)
// Update progress bar, if any
if pb != nil {
pb.Add(len(c))
}
}
return nil
})
}

chunksNum := len(idx.Chunks)

// Number of chunks that will be evaluated in a single Goroutine.
// This helps to reduce the required number of context switches.
// In theory, we could just divide the total number of chunks by the number
// of workers, but instead we reduce that by 10 times to avoid the situation
// where we end up waiting a single worker that was slower to complete (e.g.
// if its chunks were not in cache while the others were).
batch := chunksNum / (n * 10)

// Feed the workers, stop if there are any errors
loop:
for _, c := range idx.Chunks {
for i := 0; i < chunksNum; i = i + batch + 1 {
last := i + batch
if last >= chunksNum {
// We reached the end of the array
last = chunksNum - 1
}
select {
case <-ctx.Done():
break loop
case in <- c:
case in <- idx.Chunks[i : last+1]:
}
}
close(in)
Expand Down

0 comments on commit b7a13be

Please sign in to comment.