From fc8359cc62dfbae650755f93399ea8c06461981e Mon Sep 17 00:00:00 2001
From: Rikhil Shah
Date: Thu, 19 Oct 2023 16:53:18 +0100
Subject: [PATCH] Authenticate using DefaultAzureCredential enabling support
 for Azure Workload Identity

resolves #54
---
 CHANGELOG.md               |  4 ++++
 objstore.go                | 26 ++++++++++++++++++++------
 objstore_test.go           | 19 +++++++++++++++++++
 providers/azure/helpers.go |  9 ++++++---
 providers/gcs/gcs.go       | 10 ++++++++--
 providers/s3/s3.go         |  9 ++++++++-
 6 files changed, 65 insertions(+), 12 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 32d0f742..0dc65033 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#33](https://github.com/thanos-io/objstore/pull/33) Tracing: Add `ContextWithTracer()` to inject the tracer into the context.
 - [#34](https://github.com/thanos-io/objstore/pull/34) Fix ignored options when creating shared credential Azure client.
 - [#62](https://github.com/thanos-io/objstore/pull/62) S3: Fix ignored context cancellation in `Iter` method.
+- [#77](https://github.com/thanos-io/objstore/pull/77) Fix buckets wrapped with metrics from being unable to determine object sizes in `Upload`.
+- [#78](https://github.com/thanos-io/objstore/pull/78) S3: Fix possible concurrent modification of the PutUserMetadata map.
 
 ### Added
 - [#15](https://github.com/thanos-io/objstore/pull/15) Add Oracle Cloud Infrastructure Object Storage Bucket support.
@@ -27,9 +29,11 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#61](https://github.com/thanos-io/objstore/pull/61) Add OpenTelemetry TracingBucket.
   > This also changes the behaviour of `client.NewBucket`. Now it returns, uninstrumented and untraced bucket. You can combine `objstore.WrapWithMetrics` and `tracing/{opentelemetry,opentracing}.WrapWithTraces` to have old behavior.
+- [#69](https://github.com/thanos-io/objstore/pull/69) [#66](https://github.com/thanos-io/objstore/pull/66) Add `objstore_bucket_operation_transferred_bytes` that counts the number of total bytes read from the bucket operation Get/GetRange and also counts the number of total bytes written to the bucket operation Upload.
 - [#64](https://github.com/thanos-io/objstore/pull/64) OCI: OKE Workload Identity support.
 - [#73](https://github.com/thanos-io/objstore/pull/73) Аdded file path to erros from DownloadFile
 - [#51](https://github.com/thanos-io/objstore/pull/51) Azure: Support using connection string authentication.
+- [#76](https://github.com/thanos-io/objstore/pull/76) GCS: Query for object names only in `Iter` to possibly improve performance when listing objects.
 
 ### Changed
 - [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`.
diff --git a/objstore.go b/objstore.go
index 84eafce3..20723704 100644
--- a/objstore.go
+++ b/objstore.go
@@ -458,11 +458,12 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
 		bkt.opsDuration.WithLabelValues(op)
 		bkt.opsFetchedBytes.WithLabelValues(op)
 	}
-	// fetched bytes only relevant for get and getrange
+
+	// fetched bytes only relevant for get, getrange and upload
 	for _, op := range []string{
 		OpGet,
 		OpGetRange,
-		// TODO: Add uploads
+		OpUpload,
 	} {
 		bkt.opsTransferredBytes.WithLabelValues(op)
 	}
@@ -592,15 +593,25 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
 	const op = OpUpload
 	b.ops.WithLabelValues(op).Inc()
 
-	start := time.Now()
-	if err := b.bkt.Upload(ctx, name, r); err != nil {
+	trc := newTimingReadCloser(
+		NopCloserWithSize(r),
+		op,
+		b.opsDuration,
+		b.opsFailures,
+		b.isOpFailureExpected,
+		nil,
+		b.opsTransferredBytes,
+	)
+	defer trc.Close()
+	err := b.bkt.Upload(ctx, name, trc)
+	if err != nil {
 		if !b.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
 			b.opsFailures.WithLabelValues(op).Inc()
 		}
 		return err
 	}
 	b.lastSuccessfulUploadTime.SetToCurrentTime()
-	b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
+
 	return nil
 }
@@ -692,7 +703,10 @@ func (rc *timingReadCloser) Close() error {
 
 func (rc *timingReadCloser) Read(b []byte) (n int, err error) {
 	n, err = rc.ReadCloser.Read(b)
-	rc.fetchedBytes.WithLabelValues(rc.op).Add(float64(n))
+	if rc.fetchedBytes != nil {
+		rc.fetchedBytes.WithLabelValues(rc.op).Add(float64(n))
+	}
+	rc.readBytes += int64(n)
 
 	// Report metric just once.
 	if !rc.alreadyGotErr && err != nil && err != io.EOF {
diff --git a/objstore_test.go b/objstore_test.go
index 4b105281..5feb089b 100644
--- a/objstore_test.go
+++ b/objstore_test.go
@@ -170,6 +170,25 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
 		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="+Inf"} 0
 		objstore_bucket_operation_transferred_bytes_sum{bucket="",operation="get_range"} 0
 		objstore_bucket_operation_transferred_bytes_count{bucket="",operation="get_range"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="32768"} 2
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="65536"} 2
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="131072"} 2
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="262144"} 2
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="524288"} 2
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="1.048576e+06"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="2.097152e+06"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="4.194304e+06"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="8.388608e+06"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="1.6777216e+07"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="3.3554432e+07"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="6.7108864e+07"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="1.34217728e+08"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="2.68435456e+08"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="5.36870912e+08"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="1.073741824e+09"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="+Inf"} 3
+		objstore_bucket_operation_transferred_bytes_sum{bucket="",operation="upload"} 1.048578e+06
+		objstore_bucket_operation_transferred_bytes_count{bucket="",operation="upload"} 3
 	`), `objstore_bucket_operation_transferred_bytes`))
 
 	testutil.Ok(t, UploadDir(context.Background(), log.NewNopLogger(), m, tempDir, "/dir-copy", WithUploadConcurrency(10)))
diff --git a/providers/azure/helpers.go b/providers/azure/helpers.go
index 7b4a5fbe..01708770 100644
--- a/providers/azure/helpers.go
+++ b/providers/azure/helpers.go
@@ -6,12 +6,14 @@ package azure
 import (
 	"fmt"
 	"net/http"
+	"os"
 	"time"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
 	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
+	"github.com/pkg/errors"
 
 	"github.com/thanos-io/objstore/exthttp"
 )
@@ -64,11 +66,12 @@ func getContainerClient(conf Config) (*container.Client, error) {
 	}
 
 	// Use MSI for authentication.
-	msiOpt := &azidentity.ManagedIdentityCredentialOptions{}
 	if conf.UserAssignedID != "" {
-		msiOpt.ID = azidentity.ClientID(conf.UserAssignedID)
+		if err := os.Setenv("AZURE_CLIENT_ID", conf.UserAssignedID); err != nil {
+			return nil, errors.Wrapf(err, "unable to set environment variable for AZURE_CLIENT_ID")
+		}
 	}
-	cred, err := azidentity.NewManagedIdentityCredential(msiOpt)
+	cred, err := azidentity.NewDefaultAzureCredential(nil)
 	if err != nil {
 		return nil, err
 	}
diff --git a/providers/gcs/gcs.go b/providers/gcs/gcs.go
index 5ea45c7e..ad305d6e 100644
--- a/providers/gcs/gcs.go
+++ b/providers/gcs/gcs.go
@@ -108,10 +108,16 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
 		delimiter = ""
 	}
 
-	it := b.bkt.Objects(ctx, &storage.Query{
+	query := &storage.Query{
 		Prefix:    dir,
 		Delimiter: delimiter,
-	})
+	}
+	err := query.SetAttrSelection([]string{"Name"})
+	if err != nil {
+		return err
+	}
+
+	it := b.bkt.Objects(ctx, query)
 	for {
 		select {
 		case <-ctx.Done():
diff --git a/providers/s3/s3.go b/providers/s3/s3.go
index f92d3973..83e3a2de 100644
--- a/providers/s3/s3.go
+++ b/providers/s3/s3.go
@@ -492,6 +492,13 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 	if size < int64(partSize) {
 		partSize = 0
 	}
+
+	// Cloning map since minio may modify it
+	userMetadata := make(map[string]string, len(b.putUserMetadata))
+	for k, v := range b.putUserMetadata {
+		userMetadata[k] = v
+	}
+
 	if _, err := b.client.PutObject(
 		ctx,
 		b.name,
@@ -501,7 +508,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 		minio.PutObjectOptions{
 			PartSize:             partSize,
 			ServerSideEncryption: sse,
-			UserMetadata:         b.putUserMetadata,
+			UserMetadata:         userMetadata,
 			StorageClass:         b.storageClass,
 			// 4 is what minio-go have as the default. To be certain we do micro benchmark before any changes we
 			// ensure we pin this number to four.
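Below, outside the patch itself, is a minimal sketch of the credential flow the Azure change relies on: azidentity.NewDefaultAzureCredential chains environment, workload identity, and managed identity credentials, and honours the AZURE_CLIENT_ID environment variable when selecting a user-assigned identity, which is why getContainerClient now exports conf.UserAssignedID as that variable instead of passing ManagedIdentityCredentialOptions. The storage account, container URL, and the EXAMPLE_USER_ASSIGNED_ID variable are placeholders for illustration, not values from the patch.

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {
	// Mirror the patched getContainerClient: a user-assigned identity is
	// selected via AZURE_CLIENT_ID rather than ManagedIdentityCredentialOptions.
	// EXAMPLE_USER_ASSIGNED_ID is a hypothetical input, not part of the patch.
	if id := os.Getenv("EXAMPLE_USER_ASSIGNED_ID"); id != "" {
		if err := os.Setenv("AZURE_CLIENT_ID", id); err != nil {
			log.Fatal(err)
		}
	}

	// DefaultAzureCredential tries environment, workload identity, and managed
	// identity credentials in turn, so the same code path works on AKS with
	// Azure Workload Identity and on VMs using MSI.
	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder storage account and container URL.
	client, err := container.NewClient("https://exampleaccount.blob.core.windows.net/examplecontainer", cred, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("container client ready:", client.URL())
}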