Skip to content

Commit

Permalink
feat(storage/transfermanager): add SkipIfExists option (#10893)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennaEpp authored Sep 24, 2024
1 parent 607534c commit 7daa1bd
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 20 deletions.
13 changes: 10 additions & 3 deletions storage/transfermanager/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,22 @@ func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirec
}

// Check if the file exists.
// TODO: add skip option.
fileExists := false

filePath := filepath.Join(input.LocalDirectory, attrs.Name)
if _, err := os.Stat(filePath); err == nil {
return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist)
fileExists = true
if !d.config.skipIfExists {
return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist)
}
} else if !errors.Is(err, os.ErrNotExist) {
// Encountered an error other than file does not exist.
return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, err)
}

objectsToQueue = append(objectsToQueue, attrs.Name)
if !(d.config.skipIfExists && fileExists) {
objectsToQueue = append(objectsToQueue, attrs.Name)
}
}

outs := make(chan DownloadOutput, len(objectsToQueue))
Expand Down
109 changes: 92 additions & 17 deletions storage/transfermanager/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,22 +185,37 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, c *storage.Client, tb downloadTestBucket) {
localDir := t.TempDir()

numCallbacks := 0
callbackMu := sync.Mutex{}

d, err := NewDownloader(c, WithWorkers(2), WithPartSize(maxObjectSize/2), WithCallbacks())
if err != nil {
t.Fatalf("NewDownloader: %v", err)
}

// Test download options - only download directory "dir".
// In lex order we have:
// "dir/nested/again/obj1", -- excluded
// "dir/nested/again/obj1", -- excluded by StartOffset
// "dir/nested/objA", -- included
// "dir/file" -- excluded by MatchGlob
// "dir/objA", -- included
// "dir/objB", -- included
// "dir/objC", -- excluded
wantObjs := 3
// "dir/objC", -- excluded by EndOffset
includedObjs := []string{"dir/nested/objA", "dir/objA", "dir/objB"}

objectDownloaded := make(chan bool)
done := make(chan bool)

trackObjectsDownloaded := func(numObjects *int) {
for {
select {
case <-done:
return
case <-objectDownloaded:
*numObjects++
}
}
}

objectsDownloaded := 0
go trackObjectsDownloaded(&objectsDownloaded)

if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{
Bucket: tb.bucket,
Expand All @@ -210,9 +225,7 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
EndOffset: "dir/objC",
MatchGlob: "**obj**",
OnObjectDownload: func(got *DownloadOutput) {
callbackMu.Lock()
numCallbacks++
callbackMu.Unlock()
objectDownloaded <- true

if got.Err != nil {
t.Errorf("result.Err: %v", got.Err)
Expand Down Expand Up @@ -240,8 +253,8 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
got.Object = "modifying this shouldn't be a problem"
},
Callback: func(outs []DownloadOutput) {
if len(outs) != wantObjs {
t.Errorf("expected to receive %d results, got %d results", wantObjs, len(outs))
if len(outs) != len(includedObjs) {
t.Errorf("expected to receive %d results, got %d results", len(includedObjs), len(outs))
}

for _, got := range outs {
Expand Down Expand Up @@ -275,21 +288,22 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
t.Errorf("d.DownloadDirectory: %v", err)
}

if _, err = d.WaitAndClose(); err != nil {
t.Fatalf("d.WaitAndClose: %v", err)
if _, err := d.WaitAndClose(); err != nil {
t.Errorf("d.WaitAndClose: %v", err)
}
done <- true

if numCallbacks != wantObjs {
t.Errorf("expected to receive %d results, got %d callbacks", (wantObjs), numCallbacks)
if want, got := len(includedObjs), objectsDownloaded; want != got {
t.Errorf("expected to receive %d callbacks, got %d", want, got)
}

entries, err := os.ReadDir(filepath.Join(localDir, "dir"))
if err != nil {
t.Fatalf("os.ReadDir: %v", err)
}

if len(entries) != wantObjs {
t.Errorf("expected %d entries in dir, got %d", (wantObjs), len(entries))
if len(entries) != len(includedObjs) {
t.Errorf("expected %d entries in dir, got %d", len(includedObjs), len(entries))
}

for _, entry := range entries {
Expand All @@ -300,6 +314,65 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
t.Errorf("unexpected file %q in dir", entry.Name())
}
}

// Test SkipIfExists() by attempting to download the entire directory.
// The files previously downloaded (ie. includedObjects) should be skipped.
d, err = NewDownloader(c, WithWorkers(2), SkipIfExists())
if err != nil {
t.Fatalf("NewDownloader: %v", err)
}

objectsDownloaded = 0
go trackObjectsDownloaded(&objectsDownloaded)

if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{
Bucket: tb.bucket,
LocalDirectory: localDir,
OnObjectDownload: func(got *DownloadOutput) {
objectDownloaded <- true

if got.Err != nil {
t.Errorf("result.Err: %v", got.Err)
}

for _, obj := range includedObjs {
if strings.EqualFold(got.Object, obj) {
t.Errorf("should have skipped download of object %s", got.Object)
}
}

if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got {
t.Errorf("expected object size %d, got %d", want, got)
}

path := filepath.Join(localDir, got.Object)
f, err := os.Open(path)
if err != nil {
t.Errorf("os.Open(%q): %v", path, err)
}
defer f.Close()

b := bytes.NewBuffer(make([]byte, 0, got.Attrs.Size))
if _, err := io.Copy(b, f); err != nil {
t.Errorf("io.Copy: %v", err)
}

if wantCRC, gotCRC := tb.contentHashes[got.Object], crc32c(b.Bytes()); gotCRC != wantCRC {
t.Errorf("object(%q) at filepath(%q): content crc32c does not match; got: %v, expected: %v", got.Object, path, gotCRC, wantCRC)
}
},
}); err != nil {
t.Errorf("d.DownloadDirectory: %v", err)
}

if _, err := d.WaitAndClose(); err != nil {
t.Errorf("d.WaitAndClose: %v", err)
}
done <- true

if want, got := len(tb.objects)-len(includedObjs), objectsDownloaded; want != got {
t.Errorf("expected to receive %d callbacks, got %d", want, got)
}
})
}

Expand Down Expand Up @@ -1007,6 +1080,8 @@ func (tb *downloadTestBucket) Create(prefix string) error {
ctx := context.Background()

tb.bucket = prefix + uidSpace.New()
// Note: some tests depend on these object names.
// Verify that tests pass when adding or removing objects.
tb.objects = []string{
"!#$&'()*+,:;=,?@,[] and spaces",
"./obj",
Expand Down
19 changes: 19 additions & 0 deletions storage/transfermanager/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ func (wps withPartSize) apply(tm *transferManagerConfig) {
tm.partSize = wps.partSize
}

// SkipIfExists returns a TransferManagerOption that will not download files
// that already exist in the local directory.
//
// By default, if a file already exists the operation will abort and return an error.
func SkipIfExists() Option {
return &skipIfExists{}
}

type skipIfExists struct {
}

func (sie skipIfExists) apply(tm *transferManagerConfig) {
tm.skipIfExists = true
}

type transferManagerConfig struct {
// Workers in thread pool; default numCPU/2 based on previous benchmarks?
numWorkers int
Expand All @@ -107,6 +122,10 @@ type transferManagerConfig struct {
// If true, callbacks are used instead of returning results synchronously
// in a slice at the end.
asynchronous bool

// If true, files that already exist in the local directory will not be
// downloaded.
skipIfExists bool
}

func defaultTransferManagerConfig() *transferManagerConfig {
Expand Down

0 comments on commit 7daa1bd

Please sign in to comment.