From e6d6078bee6e159cdff7fb3e5f61464987171f17 Mon Sep 17 00:00:00 2001 From: BrennaEpp Date: Thu, 19 Sep 2024 20:33:54 -0600 Subject: [PATCH 1/2] feat(storage/transfermanager): add SkipIfExists option --- storage/transfermanager/downloader.go | 13 +++- storage/transfermanager/integration_test.go | 70 +++++++++++++++++++++ storage/transfermanager/option.go | 19 ++++++ 3 files changed, 99 insertions(+), 3 deletions(-) diff --git a/storage/transfermanager/downloader.go b/storage/transfermanager/downloader.go index a15c4a418477..d56fe0a28314 100644 --- a/storage/transfermanager/downloader.go +++ b/storage/transfermanager/downloader.go @@ -160,15 +160,22 @@ func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirec } // Check if the file exists. - // TODO: add skip option. + fileExists := false + filePath := filepath.Join(input.LocalDirectory, attrs.Name) if _, err := os.Stat(filePath); err == nil { - return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist) + fileExists = true + if !d.config.skipIfExists { + return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist) + } } else if !errors.Is(err, os.ErrNotExist) { + // Encountered an error other than file does not exist. return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, err) } - objectsToQueue = append(objectsToQueue, attrs.Name) + if !(d.config.skipIfExists && fileExists) { + objectsToQueue = append(objectsToQueue, attrs.Name) + } } outs := make(chan DownloadOutput, len(objectsToQueue)) diff --git a/storage/transfermanager/integration_test.go b/storage/transfermanager/integration_test.go index 1d06b5fc67bb..edc32f23a5f4 100644 --- a/storage/transfermanager/integration_test.go +++ b/storage/transfermanager/integration_test.go @@ -28,6 +28,7 @@ import ( "math/rand" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -295,6 +296,75 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { t.Errorf("unexpected file %q in dir", entry.Name()) } } + + // Now attempt to download the entire directory. + // The existing files should be skipped. + callbacks := make(chan bool) + + d, err = NewDownloader(c, WithWorkers(2), SkipIfExists()) + if err != nil { + t.Fatalf("NewDownloader: %v", err) + } + + if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{ + Bucket: tb.bucket, + LocalDirectory: localDir, + OnObjectDownload: func(got *DownloadOutput) { + callbacks <- true + + if got.Err != nil { + t.Errorf("result.Err: %v", got.Err) + } + + if strings.EqualFold(got.Object, "dir/objA") { + t.Errorf("should have skipped download of object %s", got.Object) + } + + if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got { + t.Errorf("expected object size %d, got %d", want, got) + } + + path := filepath.Join(localDir, got.Object) + f, err := os.Open(path) + if err != nil { + t.Errorf("os.Open(%q): %v", path, err) + } + defer f.Close() + + b := bytes.NewBuffer(make([]byte, 0, got.Attrs.Size)) + if _, err := io.Copy(b, f); err != nil { + t.Errorf("io.Copy: %v", err) + } + + if wantCRC, gotCRC := tb.contentHashes[got.Object], crc32c(b.Bytes()); gotCRC != wantCRC { + t.Errorf("object(%q) at filepath(%q): content crc32c does not match; got: %v, expected: %v", got.Object, path, gotCRC, wantCRC) + } + }, + }); err != nil { + t.Errorf("d.DownloadDirectory: %v", err) + } + + gotCallbacks := 0 + done := make(chan bool) + go func() { + for { + select { + case <-done: + break + case <-callbacks: + gotCallbacks++ + } + } + }() + + if _, err := d.WaitAndClose(); err != nil { + t.Errorf("d.WaitAndClose: %v", err) + } + done <- true + + if want, got := len(tb.objects)-wantObjs, gotCallbacks; want != got { + t.Errorf("expected to receive %d callbacks, got %d", want, got) + } }) } diff --git a/storage/transfermanager/option.go b/storage/transfermanager/option.go index ac521000db76..e687299a0fc2 100644 --- a/storage/transfermanager/option.go +++ b/storage/transfermanager/option.go @@ -91,6 +91,21 @@ func (wps withPartSize) apply(tm *transferManagerConfig) { tm.partSize = wps.partSize } +// SkipIfExists returns a TransferManagerOption that will not download files +// that already exist in the local directory. +// +// By default, if a file already exists the operation will abort and return an error. +func SkipIfExists() Option { + return &skipIfExists{} +} + +type skipIfExists struct { +} + +func (sie skipIfExists) apply(tm *transferManagerConfig) { + tm.skipIfExists = true +} + type transferManagerConfig struct { // Workers in thread pool; default numCPU/2 based on previous benchmarks? numWorkers int @@ -107,6 +122,10 @@ type transferManagerConfig struct { // If true, callbacks are used instead of returning results synchronously // in a slice at the end. asynchronous bool + + // If true, files that already exist in the local directory will not be + // downloaded. + skipIfExists bool } func defaultTransferManagerConfig() *transferManagerConfig { From ecfbda2a477036573e9847d5bd2169e0b67a65bf Mon Sep 17 00:00:00 2001 From: BrennaEpp Date: Mon, 23 Sep 2024 20:29:42 -0600 Subject: [PATCH 2/2] suggestions --- storage/transfermanager/integration_test.go | 82 +++++++++++---------- 1 file changed, 44 insertions(+), 38 deletions(-) diff --git a/storage/transfermanager/integration_test.go b/storage/transfermanager/integration_test.go index edc32f23a5f4..472e6b4226a6 100644 --- a/storage/transfermanager/integration_test.go +++ b/storage/transfermanager/integration_test.go @@ -181,22 +181,37 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, c *storage.Client, tb downloadTestBucket) { localDir := t.TempDir() - numCallbacks := 0 - callbackMu := sync.Mutex{} - d, err := NewDownloader(c, WithWorkers(2), WithPartSize(maxObjectSize/2), WithCallbacks()) if err != nil { t.Fatalf("NewDownloader: %v", err) } + // Test download options - only download directory "dir". // In lex order we have: - // "dir/nested/again/obj1", -- excluded + // "dir/nested/again/obj1", -- excluded by StartOffset // "dir/nested/objA", -- included // "dir/file" -- excluded by MatchGlob // "dir/objA", -- included // "dir/objB", -- included - // "dir/objC", -- excluded - wantObjs := 3 + // "dir/objC", -- excluded by EndOffset + includedObjs := []string{"dir/nested/objA", "dir/objA", "dir/objB"} + + objectDownloaded := make(chan bool) + done := make(chan bool) + + trackObjectsDownloaded := func(numObjects *int) { + for { + select { + case <-done: + return + case <-objectDownloaded: + *numObjects++ + } + } + } + + objectsDownloaded := 0 + go trackObjectsDownloaded(&objectsDownloaded) if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{ Bucket: tb.bucket, @@ -206,9 +221,7 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { EndOffset: "dir/objC", MatchGlob: "**obj**", OnObjectDownload: func(got *DownloadOutput) { - callbackMu.Lock() - numCallbacks++ - callbackMu.Unlock() + objectDownloaded <- true if got.Err != nil { t.Errorf("result.Err: %v", got.Err) @@ -236,8 +249,8 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { got.Object = "modifying this shouldn't be a problem" }, Callback: func(outs []DownloadOutput) { - if len(outs) != wantObjs { - t.Errorf("expected to receive %d results, got %d results", wantObjs, len(outs)) + if len(outs) != len(includedObjs) { + t.Errorf("expected to receive %d results, got %d results", len(includedObjs), len(outs)) } for _, got := range outs { @@ -271,12 +284,13 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { t.Errorf("d.DownloadDirectory: %v", err) } - if _, err = d.WaitAndClose(); err != nil { - t.Fatalf("d.WaitAndClose: %v", err) + if _, err := d.WaitAndClose(); err != nil { + t.Errorf("d.WaitAndClose: %v", err) } + done <- true - if numCallbacks != wantObjs { - t.Errorf("expected to receive %d results, got %d callbacks", (wantObjs), numCallbacks) + if want, got := len(includedObjs), objectsDownloaded; want != got { + t.Errorf("expected to receive %d callbacks, got %d", want, got) } entries, err := os.ReadDir(filepath.Join(localDir, "dir")) @@ -284,8 +298,8 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { t.Fatalf("os.ReadDir: %v", err) } - if len(entries) != wantObjs { - t.Errorf("expected %d entries in dir, got %d", (wantObjs), len(entries)) + if len(entries) != len(includedObjs) { + t.Errorf("expected %d entries in dir, got %d", len(includedObjs), len(entries)) } for _, entry := range entries { @@ -297,27 +311,30 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { } } - // Now attempt to download the entire directory. - // The existing files should be skipped. - callbacks := make(chan bool) - + // Test SkipIfExists() by attempting to download the entire directory. + // The files previously downloaded (ie. includedObjects) should be skipped. d, err = NewDownloader(c, WithWorkers(2), SkipIfExists()) if err != nil { t.Fatalf("NewDownloader: %v", err) } + objectsDownloaded = 0 + go trackObjectsDownloaded(&objectsDownloaded) + if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{ Bucket: tb.bucket, LocalDirectory: localDir, OnObjectDownload: func(got *DownloadOutput) { - callbacks <- true + objectDownloaded <- true if got.Err != nil { t.Errorf("result.Err: %v", got.Err) } - if strings.EqualFold(got.Object, "dir/objA") { - t.Errorf("should have skipped download of object %s", got.Object) + for _, obj := range includedObjs { + if strings.EqualFold(got.Object, obj) { + t.Errorf("should have skipped download of object %s", got.Object) + } } if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got { @@ -344,25 +361,12 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) { t.Errorf("d.DownloadDirectory: %v", err) } - gotCallbacks := 0 - done := make(chan bool) - go func() { - for { - select { - case <-done: - break - case <-callbacks: - gotCallbacks++ - } - } - }() - if _, err := d.WaitAndClose(); err != nil { t.Errorf("d.WaitAndClose: %v", err) } done <- true - if want, got := len(tb.objects)-wantObjs, gotCallbacks; want != got { + if want, got := len(tb.objects)-len(includedObjs), objectsDownloaded; want != got { t.Errorf("expected to receive %d callbacks, got %d", want, got) } }) @@ -1072,6 +1076,8 @@ func (tb *downloadTestBucket) Create(prefix string) error { ctx := context.Background() tb.bucket = prefix + uidSpace.New() + // Note: some tests depend on these object names. + // Verify that tests pass when adding or removing objects. tb.objects = []string{ "!#$&'()*+,:;=,?@,[] and spaces", "./obj",