Skip to content

Commit

Permalink
Authenticate using DefaultAzureCredential enabling support for Azure …
Browse files Browse the repository at this point in the history
…Workload Identity resolves thanos-io#54
  • Loading branch information
rikhil-s committed Oct 19, 2023
1 parent 1b257a3 commit 78288d8
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#33](https://github.com/thanos-io/objstore/pull/33) Tracing: Add `ContextWithTracer()` to inject the tracer into the context.
- [#34](https://github.com/thanos-io/objstore/pull/34) Fix ignored options when creating shared credential Azure client.
- [#62](https://github.com/thanos-io/objstore/pull/62) S3: Fix ignored context cancellation in `Iter` method.
- [#77](https://github.com/thanos-io/objstore/pull/77) Fix buckets wrapped with metrics from being unable to determine object sizes in `Upload`.
- [#78](https://github.com/thanos-io/objstore/pull/78) S3: Fix possible concurrent modification of the PutUserMetadata map.

### Added
- [#15](https://github.com/thanos-io/objstore/pull/15) Add Oracle Cloud Infrastructure Object Storage Bucket support.
Expand All @@ -27,9 +29,11 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#61](https://github.com/thanos-io/objstore/pull/61) Add OpenTelemetry TracingBucket.
> This also changes the behaviour of `client.NewBucket`. Now it returns an uninstrumented and untraced bucket.
You can combine `objstore.WrapWithMetrics` and `tracing/{opentelemetry,opentracing}.WrapWithTraces` to have old behavior.
- [#69](https://github.com/thanos-io/objstore/pull/69) [#66](https://github.com/thanos-io/objstore/pull/66) Add `objstore_bucket_operation_transferred_bytes` that counts the number of total bytes read from the bucket operation Get/GetRange and also counts the number of total bytes written to the bucket operation Upload.
- [#64](https://github.com/thanos-io/objstore/pull/64) OCI: OKE Workload Identity support.
- [#73](https://github.com/thanos-io/objstore/pull/73) Added file path to errors from DownloadFile
- [#51](https://github.com/thanos-io/objstore/pull/51) Azure: Support using connection string authentication.
- [#76](https://github.com/thanos-io/objstore/pull/76) GCS: Query for object names only in `Iter` to possibly improve performance when listing objects.

### Changed
- [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`.
Expand Down
26 changes: 20 additions & 6 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,12 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
bkt.opsDuration.WithLabelValues(op)
bkt.opsFetchedBytes.WithLabelValues(op)
}
// fetched bytes only relevant for get and getrange

// fetched bytes only relevant for get, getrange and upload
for _, op := range []string{
OpGet,
OpGetRange,
// TODO: Add uploads
OpUpload,
} {
bkt.opsTransferredBytes.WithLabelValues(op)
}
Expand Down Expand Up @@ -592,15 +593,25 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
const op = OpUpload
b.ops.WithLabelValues(op).Inc()

start := time.Now()
if err := b.bkt.Upload(ctx, name, r); err != nil {
trc := newTimingReadCloser(
NopCloserWithSize(r),
op,
b.opsDuration,
b.opsFailures,
b.isOpFailureExpected,
nil,
b.opsTransferredBytes,
)
defer trc.Close()
err := b.bkt.Upload(ctx, name, trc)
if err != nil {
if !b.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
b.opsFailures.WithLabelValues(op).Inc()
}
return err
}
b.lastSuccessfulUploadTime.SetToCurrentTime()
b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())

return nil
}

Expand Down Expand Up @@ -692,7 +703,10 @@ func (rc *timingReadCloser) Close() error {

func (rc *timingReadCloser) Read(b []byte) (n int, err error) {
n, err = rc.ReadCloser.Read(b)
rc.fetchedBytes.WithLabelValues(rc.op).Add(float64(n))
if rc.fetchedBytes != nil {
rc.fetchedBytes.WithLabelValues(rc.op).Add(float64(n))
}

rc.readBytes += int64(n)
// Report metric just once.
if !rc.alreadyGotErr && err != nil && err != io.EOF {
Expand Down
19 changes: 19 additions & 0 deletions objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,25 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="+Inf"} 0
objstore_bucket_operation_transferred_bytes_sum{bucket="",operation="get_range"} 0
objstore_bucket_operation_transferred_bytes_count{bucket="",operation="get_range"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="32768"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="65536"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="131072"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="262144"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="524288"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="1.048576e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="2.097152e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="4.194304e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="8.388608e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="1.6777216e+07"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="3.3554432e+07"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="6.7108864e+07"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="1.34217728e+08"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="2.68435456e+08"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="5.36870912e+08"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="1.073741824e+09"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="upload",le="+Inf"} 3
objstore_bucket_operation_transferred_bytes_sum{bucket="",operation="upload"} 1.048578e+06
objstore_bucket_operation_transferred_bytes_count{bucket="",operation="upload"} 3
`), `objstore_bucket_operation_transferred_bytes`))

testutil.Ok(t, UploadDir(context.Background(), log.NewNopLogger(), m, tempDir, "/dir-copy", WithUploadConcurrency(10)))
Expand Down
9 changes: 6 additions & 3 deletions providers/azure/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package azure
import (
"fmt"
"net/http"
"os"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/pkg/errors"

"github.com/thanos-io/objstore/exthttp"
)
Expand Down Expand Up @@ -64,11 +66,12 @@ func getContainerClient(conf Config) (*container.Client, error) {
}

// Use MSI for authentication.
msiOpt := &azidentity.ManagedIdentityCredentialOptions{}
if conf.UserAssignedID != "" {
msiOpt.ID = azidentity.ClientID(conf.UserAssignedID)
if err := os.Setenv("AZURE_CLIENT_ID", conf.UserAssignedID); err != nil {
return nil, errors.Wrapf(err, "unable to set environment variable for AZURE_CLIENT_ID")
}
}
cred, err := azidentity.NewManagedIdentityCredential(msiOpt)
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, err
}
Expand Down
10 changes: 8 additions & 2 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,16 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
delimiter = ""
}

it := b.bkt.Objects(ctx, &storage.Query{
query := &storage.Query{
Prefix: dir,
Delimiter: delimiter,
})
}
err := query.SetAttrSelection([]string{"Name"})
if err != nil {
return err
}

it := b.bkt.Objects(ctx, query)
for {
select {
case <-ctx.Done():
Expand Down
9 changes: 8 additions & 1 deletion providers/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,13 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
if size < int64(partSize) {
partSize = 0
}

// Cloning map since minio may modify it
userMetadata := make(map[string]string, len(b.putUserMetadata))
for k, v := range b.putUserMetadata {
userMetadata[k] = v
}

if _, err := b.client.PutObject(
ctx,
b.name,
Expand All @@ -501,7 +508,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
minio.PutObjectOptions{
PartSize: partSize,
ServerSideEncryption: sse,
UserMetadata: b.putUserMetadata,
UserMetadata: userMetadata,
StorageClass: b.storageClass,
// 4 is what minio-go have as the default. To be certain we do micro benchmark before any changes we
// ensure we pin this number to four.
Expand Down

0 comments on commit 78288d8

Please sign in to comment.