From cb7b0a1b285de9d4182155a123747419232dd35f Mon Sep 17 00:00:00 2001
From: Akansha Maloo
Date: Thu, 19 Sep 2024 14:47:11 -0700
Subject: [PATCH] feat(storage/dataflux): add dataflux interface (#10748)

feat: add dataflux interface and helper functions to storage/dataflux.
Dataflux fast-listing will be used to quickly list objects in a bucket in
parallel.

Fixes #10731
---
 storage/dataflux/README.md           |  65 ++++++
 storage/dataflux/doc.go              |  27 +++
 storage/dataflux/example_test.go     |  69 ++++++
 storage/dataflux/fast_list.go        | 156 ++++++++++++++
 storage/dataflux/integration_test.go | 301 +++++++++++++++++++++++++++
 storage/dataflux/range_splitter.go   |  42 ++++
 storage/dataflux/sequential.go       |  80 +++++++
 storage/dataflux/worksteal.go        |  90 ++++++++
 storage/go.mod                       |   2 +-
 9 files changed, 831 insertions(+), 1 deletion(-)
 create mode 100644 storage/dataflux/README.md
 create mode 100644 storage/dataflux/doc.go
 create mode 100644 storage/dataflux/example_test.go
 create mode 100644 storage/dataflux/fast_list.go
 create mode 100644 storage/dataflux/integration_test.go
 create mode 100644 storage/dataflux/range_splitter.go
 create mode 100644 storage/dataflux/sequential.go
 create mode 100644 storage/dataflux/worksteal.go

diff --git a/storage/dataflux/README.md b/storage/dataflux/README.md
new file mode 100644
index 000000000000..012d68cfdf5b
--- /dev/null
+++ b/storage/dataflux/README.md
@@ -0,0 +1,65 @@
+# Dataflux for Google Cloud Storage Go client library
+
+## Overview
+The purpose of this client is to quickly list data stored in GCS.
+
+## Fast List
+The fast list component of this client leverages the GCS API to parallelize
+the listing of files within a GCS bucket. It does this by implementing a
+worksteal algorithm, where each worker in the list operation can steal work
+from its siblings once it has finished all of its currently assigned listing
+work. This parallelization leads to a significant real-world speedup over
+sequential listing. Note that parallelization is limited by the machine on
+which the client runs.
+
+Benchmarking has demonstrated that the larger the object count, the better
+Dataflux performs when compared to a linear listing. At around 100,000
+objects, users will start to see an improvement in listing speed.
+
+### Example Usage
+
+First create a `storage.Client` to use throughout your application:
+
+[snip]:# (storage-1)
+```go
+ctx := context.Background()
+client, err := storage.NewClient(ctx)
+if err != nil {
+	log.Fatal(err)
+}
+```
+
+[snip]:# (storage-2)
+```go
+// storage.Query to filter objects that the user wants to list.
+query := storage.Query{}
+// Input for fast-listing.
+dfopts := dataflux.ListerInput{
+	BucketName:  "bucket",
+	Parallelism: 500,
+	BatchSize:   500000,
+	Query:       query,
+}
+
+// Construct a dataflux lister.
+df := dataflux.NewLister(client, &dfopts)
+defer df.Close()
+
+// List objects in GCS bucket.
+for {
+	objects, err := df.NextBatch(ctx)
+	if err != nil && err != iterator.Done {
+		log.Fatal(err)
+	}
+	// TODO: process objects. Note that the final batch of objects is
+	// returned together with iterator.Done.
+	if err == iterator.Done {
+		// No more objects in the bucket to list.
+		break
+	}
+}
+```
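+
+Since returning ACLs makes each listing call more expensive, a
+`storage.Query` that omits ACLs lets GCS return more objects per API call.
+A minimal sketch, reusing the `client` from above (the bucket name is a
+placeholder):
+
+```go
+// ProjectionNoACL asks GCS to omit ACLs from each result, which
+// speeds up listing.
+query := storage.Query{Projection: storage.ProjectionNoACL}
+dfopts := dataflux.ListerInput{
+	BucketName: "bucket",
+	Query:      query,
+}
+```
+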
+### Fast List Benchmark Results
+VM used: n2d-standard-48
+Region: us-central1-a
+NIC type: gVNIC
+
+| File Count  | VM Core Count | List Time Without Dataflux | List Time With Dataflux |
+|-------------|---------------|----------------------------|-------------------------|
+| 5000000 Obj | 48 Core       | 319.72s                    | 17.35s                  |
+| 1999032 Obj | 48 Core       | 139.54s                    | 8.98s                   |
+| 578703 Obj  | 48 Core       | 32.90s                     | 5.71s                   |
+| 10448 Obj   | 48 Core       | 750.50ms                   | 637.17ms                |
\ No newline at end of file
diff --git a/storage/dataflux/doc.go b/storage/dataflux/doc.go
new file mode 100644
index 000000000000..752eadd82dad
--- /dev/null
+++ b/storage/dataflux/doc.go
@@ -0,0 +1,27 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+Package dataflux provides an easy way to parallelize listing in Google
+Cloud Storage.
+
+More information about Google Cloud Storage is available at
+https://cloud.google.com/storage/docs.
+
+See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
+connection pooling and similar aspects of this package.
+
+NOTE: This package is in preview. It is not stable, and is likely to change.
+*/
+package dataflux // import "cloud.google.com/go/storage/dataflux"
diff --git a/storage/dataflux/example_test.go b/storage/dataflux/example_test.go
new file mode 100644
index 000000000000..922461b09716
--- /dev/null
+++ b/storage/dataflux/example_test.go
@@ -0,0 +1,69 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflux_test
+
+import (
+	"context"
+	"log"
+
+	"cloud.google.com/go/storage"
+	"cloud.google.com/go/storage/dataflux"
+	"google.golang.org/api/iterator"
+)
+
+func ExampleLister_NextBatch_batch() {
+	ctx := context.Background()
+	// Pass in any client opts or set retry policy here.
+	client, err := storage.NewClient(ctx)
+	if err != nil {
+		// handle error
+	}
+
+	// Create the dataflux fast-list input and provide desired options,
+	// including number of workers, batch size, query to filter objects, etc.
+	in := &dataflux.ListerInput{
+		BucketName: "mybucket",
+		// Optionally specify params to apply to lister.
+		Parallelism:          100,
+		BatchSize:            500000,
+		Query:                storage.Query{},
+		SkipDirectoryObjects: false,
+	}
+
+	// Create a Lister to begin fast-listing, and close it once listing
+	// is done.
+	df := dataflux.NewLister(client, in)
+	defer df.Close()
+
+	var numOfObjects int
+
+	for {
+		objects, err := df.NextBatch(ctx)
+		if err == iterator.Done {
+			numOfObjects += len(objects)
+			// No more objects in the bucket to list.
+			break
+		}
+		if err != nil {
+			// handle error
+		}
+		numOfObjects += len(objects)
+	}
+	log.Printf("listed %d objects in bucket %q.", numOfObjects, in.BucketName)
+}
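+
+// ExampleLister_NextBatch_prefix is an illustrative sketch (not part of the
+// original PR): it assumes a bucket named "mybucket" whose objects live
+// under the prefix "dir/", and lists them in batches while skipping
+// directory placeholder objects. Bucket name and prefix are placeholders.
+func ExampleLister_NextBatch_prefix() {
+	ctx := context.Background()
+	client, err := storage.NewClient(ctx)
+	if err != nil {
+		// handle error
+	}
+
+	in := &dataflux.ListerInput{
+		BucketName:           "mybucket",
+		Query:                storage.Query{Prefix: "dir/"},
+		BatchSize:            5000,
+		SkipDirectoryObjects: true,
+	}
+
+	df := dataflux.NewLister(client, in)
+	defer df.Close()
+
+	for {
+		objects, err := df.NextBatch(ctx)
+		if err != nil && err != iterator.Done {
+			// handle error
+		}
+		for _, o := range objects {
+			log.Printf("object %q has size %d bytes", o.Name, o.Size)
+		}
+		if err == iterator.Done {
+			// No more objects in the bucket to list.
+			break
+		}
+	}
+}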
diff --git a/storage/dataflux/fast_list.go b/storage/dataflux/fast_list.go
new file mode 100644
index 000000000000..45a59758ee0c
--- /dev/null
+++ b/storage/dataflux/fast_list.go
@@ -0,0 +1,156 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflux
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"cloud.google.com/go/storage"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/api/iterator"
+)
+
+// listingMethod represents the method of listing.
+type listingMethod int
+
+const (
+	// open when any method can be used to list.
+	open listingMethod = iota
+	// sequential when the listing is done sequentially.
+	sequential
+	// worksteal when the listing is done using the worksteal algorithm.
+	worksteal
+)
+
+// ListerInput contains options for listing objects.
+type ListerInput struct {
+	// BucketName is the name of the bucket to list objects from. Required.
+	BucketName string
+
+	// Parallelism is the number of parallel workers to use for listing.
+	// Default is 10 times the number of available CPUs. Optional.
+	Parallelism int
+
+	// BatchSize is the number of objects to list in each batch. By default,
+	// all objects are returned at once. The number of objects returned is
+	// rounded up to a multiple of the GCS page size. Optional.
+	BatchSize int
+
+	// Query is the query to filter objects for listing. Optional.
+	// Use ProjectionNoACL for faster listing. Returning ACLs is expensive,
+	// and including them results in fewer objects being returned from GCS
+	// in each API call.
+	Query storage.Query
+
+	// SkipDirectoryObjects indicates whether to skip directory objects
+	// (objects whose names end in "/"). Default is false. Optional.
+	SkipDirectoryObjects bool
+}
+
+// Lister is used for interacting with Dataflux fast-listing. The caller
+// should initialize it with NewLister() instead of creating it directly.
+type Lister struct {
+	// method indicates the listing method (open, sequential, worksteal)
+	// to be used for listing.
+	method listingMethod
+
+	// pageToken is the token to use for sequential listing.
+	pageToken string
+
+	// bucket is the bucket handle to list objects from.
+	bucket *storage.BucketHandle
+
+	// batchSize is the number of objects to list.
+	batchSize int
+
+	// query is the query to filter objects for listing.
+	query storage.Query
+
+	// skipDirectoryObjects indicates whether to skip directory objects.
+	skipDirectoryObjects bool
+}
+
+// NewLister creates a new dataflux Lister to list objects in the given bucket.
+func NewLister(c *storage.Client, in *ListerInput) *Lister {
+	bucket := c.Bucket(in.BucketName)
+	lister := &Lister{
+		method:               open,
+		pageToken:            "",
+		bucket:               bucket,
+		batchSize:            in.BatchSize,
+		query:                in.Query,
+		skipDirectoryObjects: in.SkipDirectoryObjects,
+	}
+	return lister
+}
+
+// NextBatch runs the worksteal algorithm and sequential listing in parallel
+// to quickly return a list of objects in the bucket. For smaller datasets,
+// sequential listing is expected to be faster. For larger datasets,
+// worksteal listing is expected to be faster. The final batch of results is
+// returned together with iterator.Done.
+func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) {
+	// countError tracks the number of failed listing methods.
+	countError := 0
+	var results []*storage.ObjectAttrs
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	// The errgroup takes care of running both methods in parallel. As soon
+	// as one of the methods completes, the other is canceled.
+	g, childCtx := errgroup.WithContext(ctx)
+
+	// On the first call, the method is open and both worksteal and
+	// sequential listing run in parallel. The method that completes first
+	// is used for all subsequent calls.
+	// TODO: Run worksteal listing when the method is open or worksteal.
+	// Run sequential listing when the method is open or sequential.
+	if c.method != worksteal {
+		g.Go(func() error {
+			objects, nextToken, err := c.sequentialListing(childCtx)
+			if err != nil {
+				countError++
+				return fmt.Errorf("error in running sequential listing: %w", err)
+			}
+			// If sequential listing completes first, set the method to
+			// sequential listing and the ranges to nil. The nextToken
+			// will be used to continue sequential listing.
+			results = objects
+			c.pageToken = nextToken
+			c.method = sequential
+			// Cancel the context when sequential listing is complete.
+			cancel()
+			return nil
+		})
+	}
+
+	// Close all functions if either sequential listing or worksteal
+	// listing is complete.
+	err := g.Wait()
+
+	// If the error is not context.Canceled, return the error instead of
+	// falling back to the other method, so that the error can be fixed and
+	// the user can take advantage of fast-listing. As one listing method
+	// completes, it is expected to cancel the context of the other method.
+	// Only if both sequential and worksteal listing fail due to context
+	// cancellation is an error returned.
+	if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
+		return nil, fmt.Errorf("failed waiting for sequential and worksteal listers: %w", err)
+	}
+
+	// If the ranges for worksteal and the pageToken for sequential listing
+	// are empty, then listing is complete.
+	if c.pageToken == "" {
+		return results, iterator.Done
+	}
+	return results, nil
+}
+
+// Close closes the range channel of the Lister.
+func (c *Lister) Close() {
+	// TODO: Close the range channel for the worksteal lister.
+}
diff --git a/storage/dataflux/integration_test.go b/storage/dataflux/integration_test.go
new file mode 100644
index 000000000000..193356dc6c83
--- /dev/null
+++ b/storage/dataflux/integration_test.go
@@ -0,0 +1,301 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflux
+
+import (
+	"context"
+	crand "crypto/rand"
+	"flag"
+	"fmt"
+	"hash/crc32"
+	"io"
+	"log"
+	"math/rand"
+	"testing"
+	"time"
+
+	"cloud.google.com/go/internal/testutil"
+	"cloud.google.com/go/internal/uid"
+	"cloud.google.com/go/storage"
+	"google.golang.org/api/iterator"
+)
+
+const (
+	testPrefix      = "go-integration-test-tm"
+	grpcTestPrefix  = "golang-grpc-test-tm"
+	bucketExpiryAge = 24 * time.Hour
+	minObjectSize   = 1024
+	maxObjectSize   = 1024 * 1024
+)
+
+var (
+	uidSpace = uid.NewSpace("", nil)
+	// These buckets are shared amongst download tests. They are created,
+	// populated with objects and cleaned up in TestMain.
+	httpTestBucket = downloadTestBucket{}
+)
+
+func TestMain(m *testing.M) {
+	flag.Parse()
+	fmt.Println("creating bucket")
+	if err := httpTestBucket.Create(testPrefix); err != nil {
+		log.Fatalf("test bucket creation failed: %v", err)
+	}
+
+	m.Run()
+
+	if err := httpTestBucket.Cleanup(); err != nil {
+		log.Printf("test bucket cleanup failed: %v", err)
+	}
+
+	if err := deleteExpiredBuckets(testPrefix); err != nil {
+		log.Printf("expired http bucket cleanup failed: %v", err)
+	}
+}
+
+// Lists all the objects in the bucket.
+func TestIntegration_NextBatch_All(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Integration tests skipped in short mode")
+	}
+	ctx := context.Background()
+	c, err := storage.NewClient(ctx)
+	if err != nil {
+		t.Fatalf("NewClient: %v", err)
+	}
+	in := &ListerInput{
+		BucketName: httpTestBucket.bucket,
+	}
+
+	df := NewLister(c, in)
+	defer df.Close()
+
+	objects, err := df.NextBatch(ctx)
+	if err != nil && err != iterator.Done {
+		t.Errorf("df.NextBatch: %v", err)
+	}
+
+	if len(objects) != len(httpTestBucket.objects) {
+		t.Errorf("expected to receive %d results, got %d results", len(httpTestBucket.objects), len(objects))
+	}
+}
+
+func TestIntegration_NextBatch(t *testing.T) {
+	// Accessing a public bucket to list a large number of files in batches.
+	// See https://cloud.google.com/storage/docs/public-datasets/landsat
+	if testing.Short() {
+		t.Skip("Integration tests skipped in short mode")
+	}
+	const landsatBucket = "gcp-public-data-landsat"
+	const landsatPrefix = "LC08/01/001/00"
+	wantObjects := 17225
+	ctx := context.Background()
+	c, err := storage.NewClient(ctx)
+	if err != nil {
+		t.Fatalf("NewClient: %v", err)
+	}
+
+	in := &ListerInput{
+		BucketName: landsatBucket,
+		Query:      storage.Query{Prefix: landsatPrefix},
+		BatchSize:  6000,
+	}
+
+	df := NewLister(c, in)
+	defer df.Close()
+	totalObjects := 0
+	for {
+		objects, err := df.NextBatch(ctx)
+		if err != nil && err != iterator.Done {
+			t.Errorf("df.NextBatch: %v", err)
+		}
+		totalObjects += len(objects)
+		if err == iterator.Done {
+			break
+		}
+		if len(objects) > in.BatchSize {
+			t.Errorf("expected to receive at most %d objects in each batch, got %d objects in a batch", in.BatchSize, len(objects))
+		}
+	}
+	if totalObjects != wantObjects {
+		t.Errorf("expected to receive %d objects in results, got %d objects in results", wantObjects, totalObjects)
+	}
+}
+
+// generateFileInGCS uploads a file with random contents to GCS and returns
+// the crc32c hash of the contents.
+func generateFileInGCS(ctx context.Context, o *storage.ObjectHandle, size int64) (uint32, error) {
+	w := o.Retryer(storage.WithPolicy(storage.RetryAlways)).NewWriter(ctx)
+
+	crc32cHash := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+	mw := io.MultiWriter(w, crc32cHash)
+
+	if _, err := io.CopyN(mw, crand.Reader, size); err != nil {
+		w.Close()
+		return 0, err
+	}
+	return crc32cHash.Sum32(), w.Close()
+}
+
+// randomInt64 returns a value in the closed interval [min, max].
+// That is, the endpoints are possible return values.
+func randomInt64(min, max int64) int64 {
+	if min > max {
+		log.Fatalf("min cannot be larger than max; min: %d max: %d", min, max)
+	}
+	return rand.Int63n(max-min+1) + min
+}
+
+func deleteExpiredBuckets(prefix string) error {
+	if testing.Short() {
+		return nil
+	}
+
+	ctx := context.Background()
+	client, err := storage.NewClient(ctx)
+	if err != nil {
+		return fmt.Errorf("NewClient: %v", err)
+	}
+
+	projectID := testutil.ProjID()
+	it := client.Buckets(ctx, projectID)
+	it.Prefix = prefix
+	for {
+		bktAttrs, err := it.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return err
+		}
+		if time.Since(bktAttrs.Created) > bucketExpiryAge {
+			log.Printf("deleting bucket %q, which is more than %s old", bktAttrs.Name, bucketExpiryAge)
+			if err := killBucket(ctx, client, bktAttrs.Name); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// killBucket deletes a bucket and all its objects.
+func killBucket(ctx context.Context, client *storage.Client, bucketName string) error {
+	bkt := client.Bucket(bucketName)
+	// Bucket must be empty to delete.
+	it := bkt.Objects(ctx, nil)
+	for {
+		objAttrs, err := it.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return err
+		}
+		if err := bkt.Object(objAttrs.Name).Delete(ctx); err != nil {
+			return fmt.Errorf("deleting %q: %v", bucketName+"/"+objAttrs.Name, err)
+		}
+	}
+	// GCS is eventually consistent, so this delete may fail because the
+	// replica still sees an object in the bucket. We log the error and
+	// expect a later test run to delete the bucket.
+	if err := bkt.Delete(ctx); err != nil {
+		log.Printf("deleting %q: %v", bucketName, err)
+	}
+	return nil
+}
+
+// downloadTestBucket provides a bucket that can be reused for tests that
+// only download from the bucket.
+type downloadTestBucket struct {
+	bucket        string
+	objects       []string
+	contentHashes map[string]uint32
+	objectSizes   map[string]int64
+}
+
+// Create initializes the downloadTestBucket, creating a bucket and
+// populating objects in it. Objects are written with random sizes and
+// contents, which can be mapped to their respective crc32c hash in
+// contentHashes and size in objectSizes.
+func (tb *downloadTestBucket) Create(prefix string) error {
+	if testing.Short() {
+		return nil
+	}
+	ctx := context.Background()
+
+	tb.bucket = prefix + uidSpace.New()
+	tb.objects = []string{
+		"!#$&'()*+,:;=,?@,[] and spaces",
+		"./obj",
+		"obj1",
+		"obj2",
+		"dir/file",
+		"dir/objA",
+		"dir/objB",
+		"dir/objC",
+		"dir/nested/objA",
+		"dir/nested/again/obj1",
+		"anotherDir/objC",
+	}
+	tb.contentHashes = make(map[string]uint32)
+	tb.objectSizes = make(map[string]int64)
+
+	client, err := storage.NewClient(ctx)
+	if err != nil {
+		return fmt.Errorf("NewClient: %v", err)
+	}
+	defer client.Close()
+
+	b := client.Bucket(tb.bucket)
+	if err := b.Create(ctx, testutil.ProjID(), nil); err != nil {
+		return fmt.Errorf("bucket(%q).Create: %v", tb.bucket, err)
+	}
+
+	// Write objects.
+	for _, obj := range tb.objects {
+		size := randomInt64(minObjectSize, maxObjectSize)
+		crc, err := generateFileInGCS(ctx, b.Object(obj), size)
+		if err != nil {
+			return fmt.Errorf("generateFileInGCS: %v", err)
+		}
+		tb.contentHashes[obj] = crc
+		tb.objectSizes[obj] = size
+	}
+	return nil
+}
+
+// Cleanup deletes the objects and bucket created in Create.
+func (tb *downloadTestBucket) Cleanup() error {
+	if testing.Short() {
+		return nil
+	}
+	ctx := context.Background()
+
+	client, err := storage.NewClient(ctx)
+	if err != nil {
+		return fmt.Errorf("NewClient: %v", err)
+	}
+	defer client.Close()
+
+	b := client.Bucket(tb.bucket)
+
+	for _, obj := range tb.objects {
+		if err := b.Object(obj).Delete(ctx); err != nil {
+			return fmt.Errorf("object.Delete: %v", err)
+		}
+	}
+
+	return b.Delete(ctx)
+}
diff --git a/storage/dataflux/range_splitter.go b/storage/dataflux/range_splitter.go
new file mode 100644
index 000000000000..4c3564ecc54b
--- /dev/null
+++ b/storage/dataflux/range_splitter.go
@@ -0,0 +1,42 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflux
+
+import (
+	"sync"
+)
+
+// rangeSplitter holds the sorted list of alphabets and a map of each
+// alphabet to its index, used to split ranges for parallel listing.
+type rangeSplitter struct {
+	mu             sync.Mutex
+	sortedAlphabet *[]rune
+	alphabetMap    map[rune]int
+}
+
+// listRange specifies the start and end range for the range splitter.
+type listRange struct {
+	startRange string
+	endRange   string
+}
+
+// newRangeSplitter creates a new rangeSplitter with the given alphabets.
+// TODO: populate sortedAlphabet and alphabetMap from the given alphabet.
+func newRangeSplitter(alphabet string) *rangeSplitter {
+	return &rangeSplitter{}
+}
+
+// splitRange creates a given number of splits based on a provided start
+// and end range.
+// TODO: implement the split algorithm.
+func (rs *rangeSplitter) splitRange(startRange, endRange string, numSplits int) ([]string, error) {
+	return nil, nil
+}
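+
+// To illustrate the intended behavior (hypothetical, since splitRange is
+// not yet implemented): with an alphabet of "abc", a call such as
+//
+//	splits, err := rs.splitRange("aa", "ac", 1)
+//
+// would return one split point between "aa" and "ac" (for example "ab"),
+// so that one worker can list ["aa", "ab") while another lists
+// ["ab", "ac"). The exact split points depend on the final implementation.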
diff --git a/storage/dataflux/sequential.go b/storage/dataflux/sequential.go
new file mode 100644
index 000000000000..e68d733df879
--- /dev/null
+++ b/storage/dataflux/sequential.go
@@ -0,0 +1,80 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflux
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"cloud.google.com/go/storage"
+	"google.golang.org/api/iterator"
+)
+
+const (
+	// defaultPageSize specifies the number of object results to include
+	// on a single page.
+	defaultPageSize = 5000
+)
+
+// sequentialListing performs a sequential listing on the given bucket.
+// It returns a list of objects and the next token to use to continue
+// listing. If the next token is empty, then listing is complete.
+func (c *Lister) sequentialListing(ctx context.Context) ([]*storage.ObjectAttrs, string, error) {
+	var result []*storage.ObjectAttrs
+	var objectsListed int
+	var lastToken string
+	objectIterator := c.bucket.Objects(ctx, &c.query)
+	objectIterator.PageInfo().Token = c.pageToken
+	objectIterator.PageInfo().MaxSize = defaultPageSize
+
+	for {
+		objects, nextToken, numObjects, err := doListing(objectIterator, c.skipDirectoryObjects)
+		if err != nil {
+			return nil, "", fmt.Errorf("failed while listing objects: %w", err)
+		}
+		result = append(result, objects...)
+		lastToken = nextToken
+		objectsListed += numObjects
+		if nextToken == "" || (c.batchSize > 0 && objectsListed >= c.batchSize) {
+			break
+		}
+		c.pageToken = nextToken
+	}
+	return result, lastToken, nil
+}
+
+func doListing(objectIterator *storage.ObjectIterator, skipDirectoryObjects bool) (result []*storage.ObjectAttrs, token string, objectsListed int, err error) {
+	for {
+		attrs, errObjectIterator := objectIterator.Next()
+		// Stop listing when all the objects have been listed.
+		if errObjectIterator == iterator.Done {
+			break
+		}
+		if errObjectIterator != nil {
+			err = fmt.Errorf("iterating through objects: %w", errObjectIterator)
+			return
+		}
+		objectsListed++
+		if !(skipDirectoryObjects && strings.HasSuffix(attrs.Name, "/")) {
+			result = append(result, attrs)
+		}
+		// Stop listing when the current page has been exhausted.
+		if objectIterator.PageInfo().Remaining() == 0 {
+			break
+		}
+	}
+	token = objectIterator.PageInfo().Token
+	return
+}
diff --git a/storage/dataflux/worksteal.go b/storage/dataflux/worksteal.go
new file mode 100644
index 000000000000..bd73bcab5043
--- /dev/null
+++ b/storage/dataflux/worksteal.go
@@ -0,0 +1,90 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflux
+
+import (
+	"context"
+	"sync"
+
+	"cloud.google.com/go/storage"
+)
+
+// workerStatus indicates the status of a worker.
+type workerStatus int
+
+const (
+	// idle status shows that the worker is currently not listing.
+	idle workerStatus = iota
+	// active status shows that the worker is currently listing objects
+	// within its assigned range.
+	active
+)
+
+type listerResult struct {
+	mu      sync.Mutex
+	objects []*storage.ObjectAttrs
+}
+
+type worker struct {
+	goroutineID   int
+	startRange    string
+	endRange      string
+	status        workerStatus
+	rangesplitter *rangeSplitter
+	idleChannel   chan int
+	result        *listerResult
+	generation    int64
+}
+
+// workstealListing is the main entry point of the worksteal algorithm.
+// It performs worksteal to achieve highly dynamic object listing.
+// TODO: implement the worksteal algorithm.
+func (c *Lister) workstealListing(ctx context.Context) []*storage.ObjectAttrs {
+	return nil
+}
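+
+// The intended flow, sketched from the worker fields above (the algorithm
+// itself is not yet implemented):
+//  1. A pool of workers is started, and one worker is assigned the full
+//     range of object names.
+//  2. An idle worker signals on idleChannel; an active worker then uses
+//     rangesplitter to split its remaining range and hands part of it over.
+//  3. Each worker appends the objects it lists to result, guarded by mu.
+//  4. Listing is complete when all workers are idle.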
+// newObjectListerOpts specifies options for creating a new object lister.
+type newObjectListerOpts struct {
+	// startRange is the start offset of the objects to be listed.
+	startRange string
+	// endRange is the end offset of the objects to be listed.
+	endRange string
+	// bucketHandle is the handle of the bucket to be listed.
+	bucketHandle *storage.BucketHandle
+	// query is the storage.Query to filter objects for listing.
+	query storage.Query
+	// skipDirectoryObjects indicates whether to skip directory objects.
+	skipDirectoryObjects bool
+	// generation is the generation number of the last object in the page.
+	generation int64
+}
+
+// nextPageResult holds the next page of object names and indicates whether
+// the lister has completed listing (no more objects to retrieve).
+type nextPageResult struct {
+	// items is the list of objects listed.
+	items []*storage.ObjectAttrs
+	// doneListing indicates whether the lister has completed listing.
+	doneListing bool
+	// nextStartRange is the start offset of the next page of objects to
+	// be listed.
+	nextStartRange string
+	// generation is the generation number of the last object in the page.
+	generation int64
+}
+
+// addPrefix prepends the given prefix to name, unless name is empty.
+func addPrefix(name, prefix string) string {
+	if name != "" {
+		return prefix + name
+	}
+	return name
+}
diff --git a/storage/go.mod b/storage/go.mod
index 009888a87639..73ad3c0d97ba 100644
--- a/storage/go.mod
+++ b/storage/go.mod
@@ -18,6 +18,7 @@ require (
 	go.opentelemetry.io/otel/sdk v1.29.0
 	go.opentelemetry.io/otel/sdk/metric v1.29.0
 	golang.org/x/oauth2 v0.23.0
+	golang.org/x/sync v0.8.0
 	google.golang.org/api v0.197.0
 	google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1
 	google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1
@@ -53,7 +54,6 @@ require (
 	go.opentelemetry.io/otel/trace v1.29.0 // indirect
 	golang.org/x/crypto v0.27.0 // indirect
 	golang.org/x/net v0.29.0 // indirect
-	golang.org/x/sync v0.8.0 // indirect
 	golang.org/x/sys v0.25.0 // indirect
 	golang.org/x/text v0.18.0 // indirect
 	golang.org/x/time v0.6.0 // indirect