Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage/transfermanager): add SkipIfExists option #10893

Merged
merged 6 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions storage/transfermanager/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,22 @@ func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirec
}

// Check if the file exists.
// TODO: add skip option.
fileExists := false

filePath := filepath.Join(input.LocalDirectory, attrs.Name)
if _, err := os.Stat(filePath); err == nil {
return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist)
fileExists = true
if !d.config.skipIfExists {
return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist)
}
} else if !errors.Is(err, os.ErrNotExist) {
// Encountered an error other than file does not exist.
return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, err)
}

objectsToQueue = append(objectsToQueue, attrs.Name)
if !(d.config.skipIfExists && fileExists) {
objectsToQueue = append(objectsToQueue, attrs.Name)
}
}

outs := make(chan DownloadOutput, len(objectsToQueue))
Expand Down
109 changes: 92 additions & 17 deletions storage/transfermanager/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,22 +185,37 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, c *storage.Client, tb downloadTestBucket) {
localDir := t.TempDir()

numCallbacks := 0
callbackMu := sync.Mutex{}

d, err := NewDownloader(c, WithWorkers(2), WithPartSize(maxObjectSize/2), WithCallbacks())
if err != nil {
t.Fatalf("NewDownloader: %v", err)
}

// Test download options - only download directory "dir".
// In lex order we have:
// "dir/nested/again/obj1", -- excluded
// "dir/nested/again/obj1", -- excluded by StartOffset
// "dir/nested/objA", -- included
// "dir/file" -- excluded by MatchGlob
// "dir/objA", -- included
// "dir/objB", -- included
// "dir/objC", -- excluded
wantObjs := 3
// "dir/objC", -- excluded by EndOffset
includedObjs := []string{"dir/nested/objA", "dir/objA", "dir/objB"}

objectDownloaded := make(chan bool)
done := make(chan bool)

trackObjectsDownloaded := func(numObjects *int) {
for {
select {
case <-done:
return
case <-objectDownloaded:
*numObjects++
}
}
}

objectsDownloaded := 0
go trackObjectsDownloaded(&objectsDownloaded)

if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{
Bucket: tb.bucket,
Expand All @@ -210,9 +225,7 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
EndOffset: "dir/objC",
MatchGlob: "**obj**",
OnObjectDownload: func(got *DownloadOutput) {
callbackMu.Lock()
numCallbacks++
callbackMu.Unlock()
objectDownloaded <- true

if got.Err != nil {
t.Errorf("result.Err: %v", got.Err)
Expand Down Expand Up @@ -240,8 +253,8 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
got.Object = "modifying this shouldn't be a problem"
},
Callback: func(outs []DownloadOutput) {
if len(outs) != wantObjs {
t.Errorf("expected to receive %d results, got %d results", wantObjs, len(outs))
if len(outs) != len(includedObjs) {
t.Errorf("expected to receive %d results, got %d results", len(includedObjs), len(outs))
}

for _, got := range outs {
Expand Down Expand Up @@ -275,21 +288,22 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
t.Errorf("d.DownloadDirectory: %v", err)
}

if _, err = d.WaitAndClose(); err != nil {
t.Fatalf("d.WaitAndClose: %v", err)
if _, err := d.WaitAndClose(); err != nil {
t.Errorf("d.WaitAndClose: %v", err)
}
done <- true

if numCallbacks != wantObjs {
t.Errorf("expected to receive %d results, got %d callbacks", (wantObjs), numCallbacks)
if want, got := len(includedObjs), objectsDownloaded; want != got {
t.Errorf("expected to receive %d callbacks, got %d", want, got)
}

entries, err := os.ReadDir(filepath.Join(localDir, "dir"))
if err != nil {
t.Fatalf("os.ReadDir: %v", err)
}

if len(entries) != wantObjs {
t.Errorf("expected %d entries in dir, got %d", (wantObjs), len(entries))
if len(entries) != len(includedObjs) {
t.Errorf("expected %d entries in dir, got %d", len(includedObjs), len(entries))
}

for _, entry := range entries {
Expand All @@ -300,6 +314,65 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
t.Errorf("unexpected file %q in dir", entry.Name())
}
}

// Test SkipIfExists() by attempting to download the entire directory.
// The files previously downloaded (ie. includedObjects) should be skipped.
d, err = NewDownloader(c, WithWorkers(2), SkipIfExists())
if err != nil {
t.Fatalf("NewDownloader: %v", err)
}

objectsDownloaded = 0
go trackObjectsDownloaded(&objectsDownloaded)

if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{
Bucket: tb.bucket,
LocalDirectory: localDir,
OnObjectDownload: func(got *DownloadOutput) {
objectDownloaded <- true

if got.Err != nil {
t.Errorf("result.Err: %v", got.Err)
}

for _, obj := range includedObjs {
if strings.EqualFold(got.Object, obj) {
t.Errorf("should have skipped download of object %s", got.Object)
}
}

if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got {
t.Errorf("expected object size %d, got %d", want, got)
}

path := filepath.Join(localDir, got.Object)
f, err := os.Open(path)
if err != nil {
t.Errorf("os.Open(%q): %v", path, err)
}
defer f.Close()

b := bytes.NewBuffer(make([]byte, 0, got.Attrs.Size))
if _, err := io.Copy(b, f); err != nil {
t.Errorf("io.Copy: %v", err)
}

if wantCRC, gotCRC := tb.contentHashes[got.Object], crc32c(b.Bytes()); gotCRC != wantCRC {
t.Errorf("object(%q) at filepath(%q): content crc32c does not match; got: %v, expected: %v", got.Object, path, gotCRC, wantCRC)
}
},
}); err != nil {
t.Errorf("d.DownloadDirectory: %v", err)
}

if _, err := d.WaitAndClose(); err != nil {
t.Errorf("d.WaitAndClose: %v", err)
}
done <- true

if want, got := len(tb.objects)-len(includedObjs), objectsDownloaded; want != got {
t.Errorf("expected to receive %d callbacks, got %d", want, got)
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
}
})
}

Expand Down Expand Up @@ -1007,6 +1080,8 @@ func (tb *downloadTestBucket) Create(prefix string) error {
ctx := context.Background()

tb.bucket = prefix + uidSpace.New()
// Note: some tests depend on these object names.
// Verify that tests pass when adding or removing objects.
tb.objects = []string{
"!#$&'()*+,:;=,?@,[] and spaces",
"./obj",
Expand Down
19 changes: 19 additions & 0 deletions storage/transfermanager/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ func (wps withPartSize) apply(tm *transferManagerConfig) {
tm.partSize = wps.partSize
}

// SkipIfExists returns a TransferManagerOption that will not download files
// that already exist in the local directory.
//
// By default, if a file already exists the operation will abort and return an error.
func SkipIfExists() Option {
return &skipIfExists{}
}

type skipIfExists struct {
}

func (sie skipIfExists) apply(tm *transferManagerConfig) {
tm.skipIfExists = true
}

type transferManagerConfig struct {
// Workers in thread pool; default numCPU/2 based on previous benchmarks?
numWorkers int
Expand All @@ -107,6 +122,10 @@ type transferManagerConfig struct {
// If true, callbacks are used instead of returning results synchronously
// in a slice at the end.
asynchronous bool

// If true, files that already exist in the local directory will not be
// downloaded.
skipIfExists bool
}

func defaultTransferManagerConfig() *transferManagerConfig {
Expand Down
Loading