Skip to content

Commit

Permalink
feat(thanos): make use of the new function IterWithAttributes (#14793)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoaoBraveCoding authored Nov 11, 2024
1 parent ab2721d commit cfc3819
Show file tree
Hide file tree
Showing 19 changed files with 414 additions and 59 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ require (
github.com/richardartoul/molecule v1.0.0
github.com/schollz/progressbar/v3 v3.17.0
github.com/shirou/gopsutil/v4 v4.24.10
github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13
github.com/twmb/franz-go v1.17.1
github.com/twmb/franz-go/pkg/kadm v1.13.0
github.com/twmb/franz-go/pkg/kfake v0.0.0-20241015013301-cea7aa5d8037
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2585,8 +2585,8 @@ github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.480/go.mod
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm v1.0.480/go.mod h1:zaBIuDDs+rC74X8Aog+LSu91GFtHYRYDC196RGTm2jk=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d h1:k+SLTP1mjNqXxsCiq4UYeKCe07le0ieffyuHm/YfmH8=
github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4=
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13 h1:PQd6xZs18KGoCZJgL9eyYsrRGzzRwYCr4iXuehZm++w=
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4=
github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/rulestore/bucketclient/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rul
Name: group,
})
return nil
}, objstore.WithRecursiveIter)
}, objstore.WithRecursiveIter())

if err != nil {
return nil, err
Expand Down Expand Up @@ -156,7 +156,7 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context,
Name: group,
})
return nil
}, objstore.WithRecursiveIter)
}, objstore.WithRecursiveIter())
if err != nil {
return nil, err
}
Expand Down
36 changes: 26 additions & 10 deletions pkg/storage/bucket/object_client_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bucket
import (
"context"
"io"
"slices"
"strings"

"github.com/go-kit/log"
Expand All @@ -16,6 +17,7 @@ import (
type ObjectClientAdapter struct {
bucket, hedgedBucket objstore.Bucket
logger log.Logger
supportsUpdatedAt bool
isRetryableErr func(err error) bool
}

Expand All @@ -25,9 +27,10 @@ func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Log
}

o := &ObjectClientAdapter{
bucket: bucket,
hedgedBucket: hedgedBucket,
logger: log.With(logger, "component", "bucket_to_object_client_adapter"),
bucket: bucket,
hedgedBucket: hedgedBucket,
logger: log.With(logger, "component", "bucket_to_object_client_adapter"),
supportsUpdatedAt: slices.Contains(bucket.SupportedIterOptions(), objstore.UpdatedAt),
// default to no retryable errors. Override with WithRetryableErrFunc
isRetryableErr: func(_ error) bool {
return false
Expand Down Expand Up @@ -103,26 +106,39 @@ func (o *ObjectClientAdapter) List(ctx context.Context, prefix, delimiter string

// If delimiter is empty we want to list all files
if delimiter == "" {
iterParams = append(iterParams, objstore.WithRecursiveIter)
iterParams = append(iterParams, objstore.WithRecursiveIter())
}

err := o.bucket.Iter(ctx, prefix, func(objectKey string) error {
if o.supportsUpdatedAt {
iterParams = append(iterParams, objstore.WithUpdatedAt())
}

err := o.bucket.IterWithAttributes(ctx, prefix, func(attrs objstore.IterObjectAttributes) error {
// CommonPrefixes are keys that have the prefix and have the delimiter
// as a suffix
objectKey := attrs.Name
if delimiter != "" && strings.HasSuffix(objectKey, delimiter) {
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey))
return nil
}

// TODO: remove this once thanos support IterWithAttributes
attr, err := o.bucket.Attributes(ctx, objectKey)
if err != nil {
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
lastModified, ok := attrs.LastModified()
if o.supportsUpdatedAt && !ok {
return errors.Errorf("failed to get lastModified for %s", objectKey)
}
// Some providers do not support supports UpdatedAt option. For those we need
// to make an additional request to get the last modified time.
if !o.supportsUpdatedAt {
attr, err := o.bucket.Attributes(ctx, objectKey)
if err != nil {
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}
lastModified = attr.LastModified
}

storageObjects = append(storageObjects, client.StorageObject{
Key: objectKey,
ModifiedAt: attr.LastModified,
ModifiedAt: lastModified,
})

return nil
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/bucket/prefixed_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func (b *PrefixedBucketClient) Delete(ctx context.Context, name string) error {
// Name returns the bucket name for the provider.
func (b *PrefixedBucketClient) Name() string { return b.bucket.Name() }

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
func (b *PrefixedBucketClient) SupportedIterOptions() []objstore.IterOptionType {
return b.bucket.SupportedIterOptions()
}

// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory. The configured prefix will be stripped
// before supplied function is applied.
Expand All @@ -53,6 +58,18 @@ func (b *PrefixedBucketClient) Iter(ctx context.Context, dir string, f func(stri
}, options...)
}

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
// In addition to Name, it also includes requested object attributes in the argument to f.
//
// Attributes can be requested using IterOption.
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
func (b *PrefixedBucketClient) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
return b.bucket.IterWithAttributes(ctx, b.fullName(dir), func(attrs objstore.IterObjectAttributes) error {
attrs.Name = strings.TrimPrefix(attrs.Name, b.prefix+objstore.DirDelim)
return f(attrs)
}, options...)
}

// Get returns a reader for the given object name.
func (b *PrefixedBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bucket.Get(ctx, b.fullName(name))
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/bucket/sse_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,25 @@ func (b *SSEBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) {
return sse, nil
}

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
func (b *SSEBucketClient) SupportedIterOptions() []objstore.IterOptionType {
return b.bucket.SupportedIterOptions()
}

// Iter implements objstore.Bucket.
func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
return b.bucket.Iter(ctx, dir, f, options...)
}

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
// In addition to Name, it also includes requested object attributes in the argument to f.
//
// Attributes can be requested using IterOption.
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
func (b *SSEBucketClient) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
return b.bucket.IterWithAttributes(ctx, dir, f, options...)
}

// Get implements objstore.Bucket.
func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bucket.Get(ctx, name)
Expand Down
1 change: 1 addition & 0 deletions vendor/github.com/thanos-io/objstore/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 19 additions & 4 deletions vendor/github.com/thanos-io/objstore/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/github.com/thanos-io/objstore/inmem.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cfc3819

Please sign in to comment.