From 7daa1bdc78844adac80f6378b1f6f2dd415b80a8 Mon Sep 17 00:00:00 2001 From: Brenna N Epp Date: Tue, 24 Sep 2024 11:00:22 -0600 Subject: [PATCH] feat(storage/transfermanager): add SkipIfExists option (#10893) --- storage/transfermanager/downloader.go | 13 ++- storage/transfermanager/integration_test.go | 109 +++++++++++++++++--- storage/transfermanager/option.go | 19 ++++ 3 files changed, 121 insertions(+), 20 deletions(-) diff --git a/storage/transfermanager/downloader.go b/storage/transfermanager/downloader.go index d7b7d5a7f5aa..452d83bd865d 100644 --- a/storage/transfermanager/downloader.go +++ b/storage/transfermanager/downloader.go @@ -161,15 +161,22 @@ func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirec } // Check if the file exists. - // TODO: add skip option. + fileExists := false + filePath := filepath.Join(input.LocalDirectory, attrs.Name) if _, err := os.Stat(filePath); err == nil { - return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist) + fileExists = true + if !d.config.skipIfExists { + return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist) + } } else if !errors.Is(err, os.ErrNotExist) { + // Encountered an error other than file does not exist. 
return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, err) } - objectsToQueue = append(objectsToQueue, attrs.Name) + if !(d.config.skipIfExists && fileExists) { + objectsToQueue = append(objectsToQueue, attrs.Name) + } } outs := make(chan DownloadOutput, len(objectsToQueue)) diff --git a/storage/transfermanager/integration_test.go b/storage/transfermanager/integration_test.go index 484798a92525..a1ba80746475 100644 --- a/storage/transfermanager/integration_test.go +++ b/storage/transfermanager/integration_test.go @@ -185,22 +185,37 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, c *storage.Client, tb downloadTestBucket) { localDir := t.TempDir() - numCallbacks := 0 - callbackMu := sync.Mutex{} - d, err := NewDownloader(c, WithWorkers(2), WithPartSize(maxObjectSize/2), WithCallbacks()) if err != nil { t.Fatalf("NewDownloader: %v", err) } + // Test download options - only download directory "dir". 
// In lex order we have: - // "dir/nested/again/obj1", -- excluded + // "dir/nested/again/obj1", -- excluded by StartOffset // "dir/nested/objA", -- included // "dir/file" -- excluded by MatchGlob // "dir/objA", -- included // "dir/objB", -- included - // "dir/objC", -- excluded - wantObjs := 3 + // "dir/objC", -- excluded by EndOffset + includedObjs := []string{"dir/nested/objA", "dir/objA", "dir/objB"} + + objectDownloaded := make(chan bool) + done := make(chan bool) + + trackObjectsDownloaded := func(numObjects *int) { + for { + select { + case <-done: + return + case <-objectDownloaded: + *numObjects++ + } + } + } + + objectsDownloaded := 0 + go trackObjectsDownloaded(&objectsDownloaded) if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{ Bucket: tb.bucket, @@ -210,9 +225,7 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { EndOffset: "dir/objC", MatchGlob: "**obj**", OnObjectDownload: func(got *DownloadOutput) { - callbackMu.Lock() - numCallbacks++ - callbackMu.Unlock() + objectDownloaded <- true if got.Err != nil { t.Errorf("result.Err: %v", got.Err) @@ -240,8 +253,8 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { got.Object = "modifying this shouldn't be a problem" }, Callback: func(outs []DownloadOutput) { - if len(outs) != wantObjs { - t.Errorf("expected to receive %d results, got %d results", wantObjs, len(outs)) + if len(outs) != len(includedObjs) { + t.Errorf("expected to receive %d results, got %d results", len(includedObjs), len(outs)) } for _, got := range outs { @@ -275,12 +288,13 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { t.Errorf("d.DownloadDirectory: %v", err) } - if _, err = d.WaitAndClose(); err != nil { - t.Fatalf("d.WaitAndClose: %v", err) + if _, err := d.WaitAndClose(); err != nil { + t.Errorf("d.WaitAndClose: %v", err) } + done <- true - if numCallbacks != wantObjs { - t.Errorf("expected to receive %d results, got %d callbacks", (wantObjs), numCallbacks) + if want, got := 
len(includedObjs), objectsDownloaded; want != got { + t.Errorf("expected to receive %d callbacks, got %d", want, got) } entries, err := os.ReadDir(filepath.Join(localDir, "dir")) @@ -288,8 +302,8 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { t.Fatalf("os.ReadDir: %v", err) } - if len(entries) != wantObjs { - t.Errorf("expected %d entries in dir, got %d", (wantObjs), len(entries)) + if len(entries) != len(includedObjs) { + t.Errorf("expected %d entries in dir, got %d", len(includedObjs), len(entries)) } for _, entry := range entries { @@ -300,6 +314,65 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { t.Errorf("unexpected file %q in dir", entry.Name()) } } + + // Test SkipIfExists() by attempting to download the entire directory. + // The files previously downloaded (ie. includedObjects) should be skipped. + d, err = NewDownloader(c, WithWorkers(2), SkipIfExists()) + if err != nil { + t.Fatalf("NewDownloader: %v", err) + } + + objectsDownloaded = 0 + go trackObjectsDownloaded(&objectsDownloaded) + + if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{ + Bucket: tb.bucket, + LocalDirectory: localDir, + OnObjectDownload: func(got *DownloadOutput) { + objectDownloaded <- true + + if got.Err != nil { + t.Errorf("result.Err: %v", got.Err) + } + + for _, obj := range includedObjs { + if strings.EqualFold(got.Object, obj) { + t.Errorf("should have skipped download of object %s", got.Object) + } + } + + if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got { + t.Errorf("expected object size %d, got %d", want, got) + } + + path := filepath.Join(localDir, got.Object) + f, err := os.Open(path) + if err != nil { + t.Errorf("os.Open(%q): %v", path, err) + } + defer f.Close() + + b := bytes.NewBuffer(make([]byte, 0, got.Attrs.Size)) + if _, err := io.Copy(b, f); err != nil { + t.Errorf("io.Copy: %v", err) + } + + if wantCRC, gotCRC := tb.contentHashes[got.Object], crc32c(b.Bytes()); gotCRC != wantCRC { + t.Errorf("object(%q) 
at filepath(%q): content crc32c does not match; got: %v, expected: %v", got.Object, path, gotCRC, wantCRC) + } + }, + }); err != nil { + t.Errorf("d.DownloadDirectory: %v", err) + } + + if _, err := d.WaitAndClose(); err != nil { + t.Errorf("d.WaitAndClose: %v", err) + } + done <- true + + if want, got := len(tb.objects)-len(includedObjs), objectsDownloaded; want != got { + t.Errorf("expected to receive %d callbacks, got %d", want, got) + } }) } @@ -1007,6 +1080,8 @@ func (tb *downloadTestBucket) Create(prefix string) error { ctx := context.Background() tb.bucket = prefix + uidSpace.New() + // Note: some tests depend on these object names. + // Verify that tests pass when adding or removing objects. tb.objects = []string{ "!#$&'()*+,:;=,?@,[] and spaces", "./obj", diff --git a/storage/transfermanager/option.go b/storage/transfermanager/option.go index ac521000db76..e687299a0fc2 100644 --- a/storage/transfermanager/option.go +++ b/storage/transfermanager/option.go @@ -91,6 +91,21 @@ func (wps withPartSize) apply(tm *transferManagerConfig) { tm.partSize = wps.partSize } +// SkipIfExists returns an Option that will not download files +// that already exist in the local directory. +// +// By default, if a file already exists the operation will abort and return an error. +func SkipIfExists() Option { + return &skipIfExists{} +} + +type skipIfExists struct { +} + +func (sie skipIfExists) apply(tm *transferManagerConfig) { + tm.skipIfExists = true +} + type transferManagerConfig struct { // Workers in thread pool; default numCPU/2 based on previous benchmarks? numWorkers int @@ -107,6 +122,10 @@ type transferManagerConfig struct { // If true, callbacks are used instead of returning results synchronously // in a slice at the end. asynchronous bool + + // If true, files that already exist in the local directory will not be + // downloaded. + skipIfExists bool } func defaultTransferManagerConfig() *transferManagerConfig {