diff --git a/go.mod b/go.mod index b397adca67c5d..fad94e325aa59 100644 --- a/go.mod +++ b/go.mod @@ -137,7 +137,7 @@ require ( github.com/richardartoul/molecule v1.0.0 github.com/schollz/progressbar/v3 v3.17.0 github.com/shirou/gopsutil/v4 v4.24.10 - github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d + github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 github.com/twmb/franz-go v1.17.1 github.com/twmb/franz-go/pkg/kadm v1.13.0 github.com/twmb/franz-go/pkg/kfake v0.0.0-20241015013301-cea7aa5d8037 diff --git a/go.sum b/go.sum index bbd72482c41b5..ac7fede0d1f79 100644 --- a/go.sum +++ b/go.sum @@ -2585,8 +2585,8 @@ github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.480/go.mod github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm v1.0.480/go.mod h1:zaBIuDDs+rC74X8Aog+LSu91GFtHYRYDC196RGTm2jk= github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM= github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw= -github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d h1:k+SLTP1mjNqXxsCiq4UYeKCe07le0ieffyuHm/YfmH8= -github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4= +github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0= +github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw= github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client.go b/pkg/ruler/rulestore/bucketclient/bucket_client.go index 89ad69f2e3c62..22acb28f4a9c1 100644 --- a/pkg/ruler/rulestore/bucketclient/bucket_client.go +++ b/pkg/ruler/rulestore/bucketclient/bucket_client.go @@ -119,7 +119,7 @@ func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rul Name: group, }) return nil - }, objstore.WithRecursiveIter) + }, objstore.WithRecursiveIter()) if err != nil { return nil, err @@ -156,7 +156,7 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, Name: group, }) return nil - }, objstore.WithRecursiveIter) + }, objstore.WithRecursiveIter()) if err != nil { return nil, err } diff --git a/pkg/storage/bucket/azure/bucket_client.go b/pkg/storage/bucket/azure/bucket_client.go index e0910b9dbed22..f7157cf1bca9a 100644 --- a/pkg/storage/bucket/azure/bucket_client.go +++ b/pkg/storage/bucket/azure/bucket_client.go @@ -22,16 +22,12 @@ func newBucketClient(cfg Config, name string, logger log.Logger, factory func(lo bucketConfig.ContainerName = cfg.ContainerName bucketConfig.MaxRetries = cfg.MaxRetries bucketConfig.UserAssignedID = cfg.UserAssignedID + bucketConfig.HTTPConfig.Transport = cfg.Transport if cfg.Endpoint != "" { // azure.DefaultConfig has the default Endpoint, overwrite it only if a different one was explicitly provided. bucketConfig.Endpoint = cfg.Endpoint } - return factory(logger, bucketConfig, name, func(rt http.RoundTripper) http.RoundTripper { - if cfg.Transport != nil { - rt = cfg.Transport - } - return rt - }) + return factory(logger, bucketConfig, name, nil) } diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 06f8d128f850d..64338e8f02a18 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -4,6 +4,8 @@ import ( "context" "errors" "flag" + "fmt" + "net/http" "regexp" "github.com/go-kit/log" @@ -126,6 +128,44 @@ func (cfg *Config) Validate() error { return cfg.StorageBackendConfig.Validate() } +func (cfg *Config) disableRetries(backend string) error { + switch backend { + case S3: + cfg.S3.MaxRetries = 1 + case GCS: + cfg.GCS.MaxRetries = 1 + case Azure: + cfg.Azure.MaxRetries = 1 + case Swift: + cfg.Swift.MaxRetries = 1 + case Filesystem: + // do nothing + default: + return fmt.Errorf("cannot disable retries for backend: %s", backend) + } + + return nil +} + +func (cfg *Config) configureTransport(backend string, rt http.RoundTripper) error { + switch backend { + case S3: + cfg.S3.HTTP.Transport = rt + case GCS: + cfg.GCS.Transport = rt + case Azure: + cfg.Azure.Transport = rt + case Swift: + cfg.Swift.Transport = rt + case Filesystem: + // do nothing + default: + return fmt.Errorf("cannot configure transport for backend: %s", backend) + } + + return nil +} + // NewClient creates a new bucket client based on the configured backend func NewClient(ctx context.Context, backend string, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) { var ( diff --git a/pkg/storage/bucket/gcs/bucket_client.go b/pkg/storage/bucket/gcs/bucket_client.go index b5a8ce541e1d7..950202ea540e9 100644 --- a/pkg/storage/bucket/gcs/bucket_client.go +++ b/pkg/storage/bucket/gcs/bucket_client.go @@ -15,6 +15,7 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo bucketConfig.Bucket = cfg.BucketName bucketConfig.ServiceAccount = cfg.ServiceAccount.String() bucketConfig.ChunkSizeBytes = cfg.ChunkBufferSize + bucketConfig.MaxRetries = cfg.MaxRetries bucketConfig.HTTPConfig.Transport = cfg.Transport return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, nil) diff --git a/pkg/storage/bucket/gcs/config.go b/pkg/storage/bucket/gcs/config.go index a46c5030e4413..23ac4b409137f 100644 --- a/pkg/storage/bucket/gcs/config.go +++ b/pkg/storage/bucket/gcs/config.go @@ -12,6 +12,7 @@ type Config struct { BucketName string `yaml:"bucket_name"` ServiceAccount flagext.Secret `yaml:"service_account" doc:"description_method=GCSServiceAccountLongDescription"` ChunkBufferSize int `yaml:"chunk_buffer_size"` + MaxRetries int `yaml:"max_retries"` // Allow upstream callers to inject a round tripper Transport http.RoundTripper `yaml:"-"` @@ -27,6 +28,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.BucketName, prefix+"gcs.bucket-name", "", "GCS bucket name") f.Var(&cfg.ServiceAccount, prefix+"gcs.service-account", cfg.GCSServiceAccountShortDescription()) f.IntVar(&cfg.ChunkBufferSize, prefix+"gcs.chunk-buffer-size", 0, "The maximum size of the buffer that GCS client for a single PUT request. 0 to disable buffering.") + f.IntVar(&cfg.MaxRetries, prefix+"gcs.max-retries", 10, "The maximum number of retries for idempotent operations. Overrides the default gcs storage client behavior if this value is greater than 0. Set this to 1 to disable retries.") } func (cfg *Config) GCSServiceAccountShortDescription() string { diff --git a/pkg/storage/bucket/object_client_adapter.go b/pkg/storage/bucket/object_client_adapter.go index 094f0ad2ea7ac..011ad0ed624ad 100644 --- a/pkg/storage/bucket/object_client_adapter.go +++ b/pkg/storage/bucket/object_client_adapter.go @@ -2,51 +2,78 @@ package bucket import ( "context" + "fmt" "io" + "slices" "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" "github.com/grafana/loki/v3/pkg/storage/chunk/client" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/aws" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/gcp" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" ) type ObjectClientAdapter struct { bucket, hedgedBucket objstore.Bucket logger log.Logger + supportsUpdatedAt bool isRetryableErr func(err error) bool } -func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger, opts ...ClientOptions) *ObjectClientAdapter { - if hedgedBucket == nil { - hedgedBucket = bucket +func NewObjectClient(ctx context.Context, backend string, cfg Config, component string, hedgingCfg hedging.Config, disableRetries bool, logger log.Logger) (*ObjectClientAdapter, error) { + if disableRetries { + if err := cfg.disableRetries(backend); err != nil { + return nil, fmt.Errorf("create bucket: %w", err) + } + } + + bucket, err := NewClient(ctx, backend, cfg, component, logger) + if err != nil { + return nil, fmt.Errorf("create bucket: %w", err) + } + + hedgedBucket := bucket + if hedgingCfg.At != 0 { + hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) + if err != nil { + return nil, fmt.Errorf("create hedged transport: %w", err) + } + + if err := cfg.configureTransport(backend, hedgedTrasport); err != nil { + return nil, fmt.Errorf("create hedged bucket: %w", err) + } + + hedgedBucket, err = NewClient(ctx, backend, cfg, component, logger) + if err != nil { + return nil, fmt.Errorf("create hedged bucket: %w", err) + } } o := &ObjectClientAdapter{ - bucket: bucket, - hedgedBucket: hedgedBucket, - logger: log.With(logger, "component", "bucket_to_object_client_adapter"), + bucket: bucket, + hedgedBucket: hedgedBucket, + logger: log.With(logger, "component", "bucket_to_object_client_adapter"), + supportsUpdatedAt: slices.Contains(bucket.SupportedIterOptions(), objstore.UpdatedAt), // default to no retryable errors. Override with WithRetryableErrFunc isRetryableErr: func(_ error) bool { return false }, } - for _, opt := range opts { - opt(o) + switch backend { + case GCS: + o.isRetryableErr = gcp.IsRetryableErr + case S3: + o.isRetryableErr = aws.IsRetryableErr } - return o -} - -type ClientOptions func(*ObjectClientAdapter) - -func WithRetryableErrFunc(f func(err error) bool) ClientOptions { - return func(o *ObjectClientAdapter) { - o.isRetryableErr = f - } + return o, nil } func (o *ObjectClientAdapter) Stop() { @@ -103,26 +130,39 @@ func (o *ObjectClientAdapter) List(ctx context.Context, prefix, delimiter string // If delimiter is empty we want to list all files if delimiter == "" { - iterParams = append(iterParams, objstore.WithRecursiveIter) + iterParams = append(iterParams, objstore.WithRecursiveIter()) + } + + if o.supportsUpdatedAt { + iterParams = append(iterParams, objstore.WithUpdatedAt()) } - err := o.bucket.Iter(ctx, prefix, func(objectKey string) error { + err := o.bucket.IterWithAttributes(ctx, prefix, func(attrs objstore.IterObjectAttributes) error { // CommonPrefixes are keys that have the prefix and have the delimiter // as a suffix + objectKey := attrs.Name if delimiter != "" && strings.HasSuffix(objectKey, delimiter) { commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey)) return nil } - // TODO: remove this once thanos support IterWithAttributes - attr, err := o.bucket.Attributes(ctx, objectKey) - if err != nil { - return errors.Wrapf(err, "failed to get attributes for %s", objectKey) + lastModified, ok := attrs.LastModified() + if o.supportsUpdatedAt && !ok { + return errors.Errorf("failed to get lastModified for %s", objectKey) + } + // Some providers do not support supports UpdatedAt option. For those we need + // to make an additional request to get the last modified time. + if !o.supportsUpdatedAt { + attr, err := o.bucket.Attributes(ctx, objectKey) + if err != nil { + return errors.Wrapf(err, "failed to get attributes for %s", objectKey) + } + lastModified = attr.LastModified } storageObjects = append(storageObjects, client.StorageObject{ Key: objectKey, - ModifiedAt: attr.LastModified, + ModifiedAt: lastModified, }) return nil diff --git a/pkg/storage/bucket/object_client_adapter_test.go b/pkg/storage/bucket/object_client_adapter_test.go index 1ce6de26856bf..341b59566333a 100644 --- a/pkg/storage/bucket/object_client_adapter_test.go +++ b/pkg/storage/bucket/object_client_adapter_test.go @@ -6,10 +6,12 @@ import ( "sort" "testing" + "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/storage/bucket/filesystem" "github.com/grafana/loki/v3/pkg/storage/chunk/client" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" ) func TestObjectClientAdapter_List(t *testing.T) { @@ -95,8 +97,12 @@ func TestObjectClientAdapter_List(t *testing.T) { require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff)) require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff)) - client := NewObjectClientAdapter(newBucket, nil, nil) - client.bucket = newBucket + client, err := NewObjectClient(context.Background(), "filesystem", Config{ + StorageBackendConfig: StorageBackendConfig{ + Filesystem: config, + }, + }, "test", hedging.Config{}, false, log.NewNopLogger()) + require.NoError(t, err) storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter) if tt.wantErr != nil { diff --git a/pkg/storage/bucket/prefixed_bucket_client.go b/pkg/storage/bucket/prefixed_bucket_client.go index 45680d744c021..b97eb0247f21e 100644 --- a/pkg/storage/bucket/prefixed_bucket_client.go +++ b/pkg/storage/bucket/prefixed_bucket_client.go @@ -44,6 +44,11 @@ func (b *PrefixedBucketClient) Delete(ctx context.Context, name string) error { // Name returns the bucket name for the provider. func (b *PrefixedBucketClient) Name() string { return b.bucket.Name() } +// SupportedIterOptions returns a list of supported IterOptions by the underlying provider. +func (b *PrefixedBucketClient) SupportedIterOptions() []objstore.IterOptionType { + return b.bucket.SupportedIterOptions() +} + // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. The configured prefix will be stripped // before supplied function is applied. @@ -53,6 +58,18 @@ func (b *PrefixedBucketClient) Iter(ctx context.Context, dir string, f func(stri }, options...) } +// IterWithAttributes calls f for each entry in the given directory similar to Iter. +// In addition to Name, it also includes requested object attributes in the argument to f. +// +// Attributes can be requested using IterOption. +// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported. +func (b *PrefixedBucketClient) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + return b.bucket.IterWithAttributes(ctx, b.fullName(dir), func(attrs objstore.IterObjectAttributes) error { + attrs.Name = strings.TrimPrefix(attrs.Name, b.prefix+objstore.DirDelim) + return f(attrs) + }, options...) +} + // Get returns a reader for the given object name. func (b *PrefixedBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) { return b.bucket.Get(ctx, b.fullName(name)) diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index 5d904d8e5fe9b..381f3436f53d4 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -82,5 +82,6 @@ func newS3Config(cfg Config) (s3.Config, error) { Enable: cfg.TraceConfig.Enabled, }, STSEndpoint: cfg.STSEndpoint, + MaxRetries: cfg.MaxRetries, }, nil } diff --git a/pkg/storage/bucket/s3/config.go b/pkg/storage/bucket/s3/config.go index 792f93f752b32..67c412de6d606 100644 --- a/pkg/storage/bucket/s3/config.go +++ b/pkg/storage/bucket/s3/config.go @@ -118,6 +118,7 @@ type Config struct { PartSize uint64 `yaml:"part_size" category:"experimental"` SendContentMd5 bool `yaml:"send_content_md5" category:"experimental"` STSEndpoint string `yaml:"sts_endpoint"` + MaxRetries int `yaml:"max_retries"` SSE SSEConfig `yaml:"sse"` HTTP HTTPConfig `yaml:"http"` @@ -146,6 +147,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.Var(newBucketLookupTypeValue(s3.AutoLookup, &cfg.BucketLookupType), prefix+"s3.bucket-lookup-type", fmt.Sprintf("Bucket lookup style type, used to access bucket in S3-compatible service. Default is auto. Supported values are: %s.", strings.Join(supportedBucketLookupTypes, ", "))) f.BoolVar(&cfg.DualstackEnabled, prefix+"s3.dualstack-enabled", true, "When enabled, direct all AWS S3 requests to the dual-stack IPv4/IPv6 endpoint for the configured region.") f.StringVar(&cfg.STSEndpoint, prefix+"s3.sts-endpoint", "", "Accessing S3 resources using temporary, secure credentials provided by AWS Security Token Service.") + f.IntVar(&cfg.MaxRetries, prefix+"s3.max-retries", 10, "The maximum number of retries for S3 requests that are retryable. Default is 10, set this to 1 to disable retries.") cfg.SSE.RegisterFlagsWithPrefix(prefix+"s3.sse.", f) cfg.HTTP.RegisterFlagsWithPrefix(prefix, f) cfg.TraceConfig.RegisterFlagsWithPrefix(prefix+"s3.trace.", f) diff --git a/pkg/storage/bucket/sse_bucket_client.go b/pkg/storage/bucket/sse_bucket_client.go index 04c3d71a68e10..f713ae64df007 100644 --- a/pkg/storage/bucket/sse_bucket_client.go +++ b/pkg/storage/bucket/sse_bucket_client.go @@ -94,11 +94,25 @@ func (b *SSEBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) { return sse, nil } +// SupportedIterOptions returns a list of supported IterOptions by the underlying provider. +func (b *SSEBucketClient) SupportedIterOptions() []objstore.IterOptionType { + return b.bucket.SupportedIterOptions() +} + // Iter implements objstore.Bucket. func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { return b.bucket.Iter(ctx, dir, f, options...) } +// IterWithAttributes calls f for each entry in the given directory similar to Iter. +// In addition to Name, it also includes requested object attributes in the argument to f. +// +// Attributes can be requested using IterOption. +// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported. +func (b *SSEBucketClient) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + return b.bucket.IterWithAttributes(ctx, dir, f, options...) +} + // Get implements objstore.Bucket. func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) { return b.bucket.Get(ctx, name) diff --git a/pkg/storage/bucket/swift/bucket_client.go b/pkg/storage/bucket/swift/bucket_client.go index b36c07e506b87..28f3c922c4254 100644 --- a/pkg/storage/bucket/swift/bucket_client.go +++ b/pkg/storage/bucket/swift/bucket_client.go @@ -4,8 +4,8 @@ import ( "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/exthttp" "github.com/thanos-io/objstore/providers/swift" - yaml "gopkg.in/yaml.v2" ) // NewBucketClient creates a new Swift bucket client @@ -33,14 +33,9 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket, // Hard-coded defaults. ChunkSize: swift.DefaultConfig.ChunkSize, UseDynamicLargeObjects: false, + HTTPConfig: exthttp.DefaultHTTPConfig, } + bucketConfig.HTTPConfig.Transport = cfg.Transport - // Thanos currently doesn't support passing the config as is, but expects a YAML, - // so we're going to serialize it. - serialized, err := yaml.Marshal(bucketConfig) - if err != nil { - return nil, err - } - - return swift.NewContainer(logger, serialized, nil) + return swift.NewContainerFromConfig(logger, &bucketConfig, false, nil) } diff --git a/pkg/storage/bucket/swift/config.go b/pkg/storage/bucket/swift/config.go index a30dd7319e8c9..22717efcc8e59 100644 --- a/pkg/storage/bucket/swift/config.go +++ b/pkg/storage/bucket/swift/config.go @@ -2,6 +2,7 @@ package swift import ( "flag" + "net/http" "time" ) @@ -26,6 +27,9 @@ type Config struct { MaxRetries int `yaml:"max_retries"` ConnectTimeout time.Duration `yaml:"connect_timeout"` RequestTimeout time.Duration `yaml:"request_timeout"` + + // Allow upstream callers to inject a round tripper + Transport http.RoundTripper `yaml:"-"` } // RegisterFlags registers the flags for Swift storage diff --git a/pkg/storage/chunk/client/aws/s3_thanos_object_client.go b/pkg/storage/chunk/client/aws/s3_thanos_object_client.go deleted file mode 100644 index e00ded920d552..0000000000000 --- a/pkg/storage/chunk/client/aws/s3_thanos_object_client.go +++ /dev/null @@ -1,44 +0,0 @@ -package aws - -import ( - "context" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/objstore" - - "github.com/grafana/loki/v3/pkg/storage/bucket" - "github.com/grafana/loki/v3/pkg/storage/chunk/client" - "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" -) - -func NewS3ThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (client.ObjectClient, error) { - b, err := newS3ThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg) - if err != nil { - return nil, err - } - - var hedged objstore.Bucket - if hedgingCfg.At != 0 { - hedged, err = newS3ThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg) - if err != nil { - return nil, err - } - } - - o := bucket.NewObjectClientAdapter(b, hedged, logger, bucket.WithRetryableErrFunc(IsRetryableErr)) - return o, nil -} - -func newS3ThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) { - if hedging { - hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) - if err != nil { - return nil, err - } - - cfg.S3.HTTP.Transport = hedgedTrasport - } - - return bucket.NewClient(ctx, bucket.S3, cfg, component, logger) -} diff --git a/pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go b/pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go deleted file mode 100644 index 4bf2137433064..0000000000000 --- a/pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go +++ /dev/null @@ -1,44 +0,0 @@ -package azure - -import ( - "context" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/objstore" - - "github.com/grafana/loki/v3/pkg/storage/bucket" - "github.com/grafana/loki/v3/pkg/storage/chunk/client" - "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" -) - -// NewBlobStorageObjectClient makes a new BlobStorage-backed ObjectClient. -func NewBlobStorageThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (client.ObjectClient, error) { - b, err := newBlobStorageThanosObjClient(ctx, cfg, component, logger, false, hedgingCfg) - if err != nil { - return nil, err - } - - var hedged objstore.Bucket - if hedgingCfg.At != 0 { - hedged, err = newBlobStorageThanosObjClient(ctx, cfg, component, logger, true, hedgingCfg) - if err != nil { - return nil, err - } - } - - return bucket.NewObjectClientAdapter(b, hedged, logger), nil -} - -func newBlobStorageThanosObjClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) { - if hedging { - hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) - if err != nil { - return nil, err - } - - cfg.Azure.Transport = hedgedTrasport - } - - return bucket.NewClient(ctx, bucket.Azure, cfg, component, logger) -} diff --git a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go b/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go deleted file mode 100644 index b4190be2d6943..0000000000000 --- a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go +++ /dev/null @@ -1,44 +0,0 @@ -package gcp - -import ( - "context" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/objstore" - - "github.com/grafana/loki/v3/pkg/storage/bucket" - "github.com/grafana/loki/v3/pkg/storage/chunk/client" - "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" -) - -func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (client.ObjectClient, error) { - b, err := newGCSThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg) - if err != nil { - return nil, err - } - - var hedged objstore.Bucket - if hedgingCfg.At != 0 { - hedged, err = newGCSThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg) - if err != nil { - return nil, err - } - } - - o := bucket.NewObjectClientAdapter(b, hedged, logger, bucket.WithRetryableErrFunc(IsRetryableErr)) - return o, nil -} - -func newGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) { - if hedging { - hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) - if err != nil { - return nil, err - } - - cfg.GCS.Transport = hedgedTrasport - } - - return bucket.NewClient(ctx, bucket.GCS, cfg, component, logger) -} diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 7f4046a47d868..7475ab83f88be 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -611,12 +611,16 @@ func (c *ClientMetrics) Unregister() { // NewObjectClient makes a new StorageClient with the prefix in the front. func NewObjectClient(name, component string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { + if cfg.UseThanosObjstore { + return bucket.NewObjectClient(context.Background(), name, cfg.ObjectStore, component, cfg.Hedging, cfg.CongestionControl.Enabled, util_log.Logger) + } + actual, err := internalNewObjectClient(name, component, cfg, clientMetrics) if err != nil { return nil, err } - if cfg.UseThanosObjstore || cfg.ObjectPrefix == "" { + if cfg.ObjectPrefix == "" { return actual, nil } else { prefix := strings.Trim(cfg.ObjectPrefix, "/") + "/" @@ -655,9 +659,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr s3Cfg.BackoffConfig.MaxRetries = 1 } - if cfg.UseThanosObjstore { - return aws.NewS3ThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging) - } return aws.NewS3ObjectClient(s3Cfg, cfg.Hedging) case types.StorageTypeAlibabaCloud: @@ -687,9 +688,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr if cfg.CongestionControl.Enabled { gcsCfg.EnableRetries = false } - if cfg.UseThanosObjstore { - return gcp.NewGCSThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging) - } return gcp.NewGCSObjectClient(context.Background(), gcsCfg, cfg.Hedging) case types.StorageTypeAzure: @@ -701,9 +699,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr } azureCfg = (azure.BlobStorageConfig)(nsCfg) } - if cfg.UseThanosObjstore { - return azure.NewBlobStorageThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging) - } return azure.NewBlobStorage(&azureCfg, clientMetrics.AzureMetrics, cfg.Hedging) case types.StorageTypeSwift: @@ -753,10 +748,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr return ibmcloud.NewCOSObjectClient(cosCfg, cfg.Hedging) default: - if cfg.UseThanosObjstore { - return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: %s", storeName, strings.Join(cfg.ObjectStore.SupportedBackends(), ", ")) - } - return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: %v, %v, %v, %v, %v, %v, %v, %v, %v", storeName, types.StorageTypeAWS, types.StorageTypeS3, types.StorageTypeGCS, types.StorageTypeAzure, types.StorageTypeAlibabaCloud, types.StorageTypeSwift, types.StorageTypeBOS, types.StorageTypeCOS, types.StorageTypeFileSystem) } } diff --git a/vendor/github.com/thanos-io/objstore/CHANGELOG.md b/vendor/github.com/thanos-io/objstore/CHANGELOG.md index d2b1aaabdab19..f0904faa198b4 100644 --- a/vendor/github.com/thanos-io/objstore/CHANGELOG.md +++ b/vendor/github.com/thanos-io/objstore/CHANGELOG.md @@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#79](https://github.com/thanos-io/objstore/pull/79) Metrics: Fix `objstore_bucket_operation_duration_seconds` for `iter` operations. ### Added +- [#63](https://github.com/thanos-io/objstore/pull/63) Implement a `IterWithAttributes` method on the bucket client. - [#15](https://github.com/thanos-io/objstore/pull/15) Add Oracle Cloud Infrastructure Object Storage Bucket support. - [#25](https://github.com/thanos-io/objstore/pull/25) S3: Support specifying S3 storage class. - [#32](https://github.com/thanos-io/objstore/pull/32) Swift: Support authentication using application credentials. @@ -53,6 +54,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#116](https://github.com/thanos-io/objstore/pull/116) Azure: Add new storage_create_container configuration property - [#128](https://github.com/thanos-io/objstore/pull/128) GCS: Add support for `ChunkSize` for writer. - [#130](https://github.com/thanos-io/objstore/pull/130) feat: Decouple creating bucket metrics from instrumenting the bucket +- [#147](https://github.com/thanos-io/objstore/pull/147) feat: Add MaxRetries config to cos, gcs and obs. - [#150](https://github.com/thanos-io/objstore/pull/150) Add support for roundtripper wrapper. ### Changed diff --git a/vendor/github.com/thanos-io/objstore/README.md b/vendor/github.com/thanos-io/objstore/README.md index 6d848e797465a..d8f5802313739 100644 --- a/vendor/github.com/thanos-io/objstore/README.md +++ b/vendor/github.com/thanos-io/objstore/README.md @@ -48,7 +48,7 @@ See [MAINTAINERS.md](https://github.com/thanos-io/thanos/blob/main/MAINTAINERS.m The core this module is the [`Bucket` interface](objstore.go): -```go mdox-exec="sed -n '37,50p' objstore.go" +```go mdox-exec="sed -n '39,55p' objstore.go" // Bucket provides read and write access to an object storage bucket. // NOTE: We assume strong consistency for write-read flow. type Bucket interface { @@ -63,18 +63,31 @@ type Bucket interface { // If object does not exist in the moment of deletion, Delete should throw error. Delete(ctx context.Context, name string) error + // Name returns the bucket name for the provider. + Name() string +} ``` All [provider implementations](providers) have to implement `Bucket` interface that allows common read and write operations that all supported by all object providers. If you want to limit the code that will do bucket operation to only read access (smart idea, allowing to limit access permissions), you can use the [`BucketReader` interface](objstore.go): -```go mdox-exec="sed -n '68,93p' objstore.go" - +```go mdox-exec="sed -n '71,106p' objstore.go" // BucketReader provides read access to an object storage bucket. type BucketReader interface { // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. + // Entries are passed to function in sorted order. - Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error + Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error + + // IterWithAttributes calls f for each entry in the given directory similar to Iter. + // In addition to Name, it also includes requested object attributes in the argument to f. + // + // Attributes can be requested using IterOption. + // Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported. + IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error + + // SupportedIterOptions returns a list of supported IterOptions by the underlying provider. + SupportedIterOptions() []IterOptionType // Get returns a reader for the given object name. Get(ctx context.Context, name string) (io.ReadCloser, error) @@ -374,6 +387,7 @@ config: server_name: "" insecure_skip_verify: false disable_compression: false + chunk_size_bytes: 0 prefix: "" ``` @@ -447,6 +461,7 @@ config: storage_account: "" storage_account_key: "" storage_connection_string: "" + storage_create_container: false container: "" endpoint: "" user_assigned_id: "" diff --git a/vendor/github.com/thanos-io/objstore/inmem.go b/vendor/github.com/thanos-io/objstore/inmem.go index ed256c9cd9de8..d550e283ce0a6 100644 --- a/vendor/github.com/thanos-io/objstore/inmem.go +++ b/vendor/github.com/thanos-io/objstore/inmem.go @@ -106,6 +106,20 @@ func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error, return nil } +func (i *InMemBucket) SupportedIterOptions() []IterOptionType { + return []IterOptionType{Recursive} +} + +func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error { + if err := ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + + return b.Iter(ctx, dir, func(name string) error { + return f(IterObjectAttributes{Name: name}) + }, options...) +} + // Get returns a reader for the given object name. func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error) { if name == "" { diff --git a/vendor/github.com/thanos-io/objstore/objstore.go b/vendor/github.com/thanos-io/objstore/objstore.go index 62f1c655db742..33c6e5e867554 100644 --- a/vendor/github.com/thanos-io/objstore/objstore.go +++ b/vendor/github.com/thanos-io/objstore/objstore.go @@ -6,11 +6,13 @@ package objstore import ( "bytes" "context" + "fmt" "io" "io/fs" "os" "path" "path/filepath" + "slices" "strings" "sync" "time" @@ -70,8 +72,19 @@ type InstrumentedBucket interface { type BucketReader interface { // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. + // Entries are passed to function in sorted order. - Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error + Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error + + // IterWithAttributes calls f for each entry in the given directory similar to Iter. + // In addition to Name, it also includes requested object attributes in the argument to f. + // + // Attributes can be requested using IterOption. + // Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported. + IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error + + // SupportedIterOptions returns a list of supported IterOptions by the underlying provider. + SupportedIterOptions() []IterOptionType // Get returns a reader for the given object name. Get(ctx context.Context, name string) (io.ReadCloser, error) @@ -101,24 +114,66 @@ type InstrumentedBucketReader interface { ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader } +var ErrOptionNotSupported = errors.New("iter option is not supported") + +// IterOptionType is used for type-safe option support checking. +type IterOptionType int + +const ( + Recursive IterOptionType = iota + UpdatedAt +) + // IterOption configures the provided params. -type IterOption func(params *IterParams) +type IterOption struct { + Type IterOptionType + Apply func(params *IterParams) +} // WithRecursiveIter is an option that can be applied to Iter() to recursively list objects // in the bucket. -func WithRecursiveIter(params *IterParams) { - params.Recursive = true +func WithRecursiveIter() IterOption { + return IterOption{ + Type: Recursive, + Apply: func(params *IterParams) { + params.Recursive = true + }, + } +} + +// WithUpdatedAt is an option that can be applied to Iter() to +// include the last modified time in the attributes. +// NB: Prefixes may not report last modified time. +// This option is currently supported for the azure, s3, bos, gcs and filesystem providers. +func WithUpdatedAt() IterOption { + return IterOption{ + Type: UpdatedAt, + Apply: func(params *IterParams) { + params.LastModified = true + }, + } } // IterParams holds the Iter() parameters and is used by objstore clients implementations. type IterParams struct { - Recursive bool + Recursive bool + LastModified bool +} + +func ValidateIterOptions(supportedOptions []IterOptionType, options ...IterOption) error { + for _, opt := range options { + if !slices.Contains(supportedOptions, opt.Type) { + return fmt.Errorf("%w: %v", ErrOptionNotSupported, opt.Type) + } + } + + return nil } func ApplyIterOptions(options ...IterOption) IterParams { out := IterParams{} for _, opt := range options { - opt(&out) + opt.Apply(&out) } return out } @@ -189,6 +244,20 @@ type ObjectAttributes struct { LastModified time.Time `json:"last_modified"` } +type IterObjectAttributes struct { + Name string + lastModified time.Time +} + +func (i *IterObjectAttributes) SetLastModified(t time.Time) { + i.lastModified = t +} + +// LastModified returns the timestamp the object was last modified. Returns false if the timestamp is not available. +func (i *IterObjectAttributes) LastModified() (time.Time, bool) { + return i.lastModified, !i.lastModified.IsZero() +} + // TryToGetSize tries to get upfront size from reader. // Some implementations may return only size of unread data in the reader, so it's best to call this method before // doing any reading. @@ -211,6 +280,8 @@ func TryToGetSize(r io.Reader) (int64, error) { return f.Size(), nil case ObjectSizer: return f.ObjectSize() + case *io.LimitedReader: + return f.N, nil } return 0, errors.Errorf("unsupported type of io.Reader: %T", r) } @@ -531,21 +602,43 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket return b.WithExpectedErrs(fn) } -func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error { +func (b *metricBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error { const op = OpIter b.metrics.ops.WithLabelValues(op).Inc() - start := time.Now() + timer := prometheus.NewTimer(b.metrics.opsDuration.WithLabelValues(op)) + defer timer.ObserveDuration() + err := b.bkt.Iter(ctx, dir, f, options...) if err != nil { if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled { b.metrics.opsFailures.WithLabelValues(op).Inc() } } - b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) return err } +func (b *metricBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + const op = OpIter + b.metrics.ops.WithLabelValues(op).Inc() + + timer := prometheus.NewTimer(b.metrics.opsDuration.WithLabelValues(op)) + defer timer.ObserveDuration() + + err := b.bkt.IterWithAttributes(ctx, dir, f, options...) + if err != nil { + if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled { + b.metrics.opsFailures.WithLabelValues(op).Inc() + } + } + + return err +} + +func (b *metricBucket) SupportedIterOptions() []IterOptionType { + return b.bkt.SupportedIterOptions() +} + func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { const op = OpAttributes b.metrics.ops.WithLabelValues(op).Inc() diff --git a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go index f2b714346831b..a76b34c360ef2 100644 --- a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go +++ b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go @@ -54,6 +54,19 @@ func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) er }, options...) } +func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + pdir := withPrefix(p.prefix, dir) + + return p.bkt.IterWithAttributes(ctx, pdir, func(attrs IterObjectAttributes) error { + attrs.Name = strings.TrimPrefix(attrs.Name, p.prefix+DirDelim) + return f(attrs) + }, options...) +} + +func (p *PrefixedBucket) SupportedIterOptions() []IterOptionType { + return p.bkt.SupportedIterOptions() +} + // Get returns a reader for the given object name. func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { return p.bkt.Get(ctx, conditionalPrefix(p.prefix, name)) diff --git a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go index e125ca3511c97..5689dc62b7be5 100644 --- a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go +++ b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go @@ -193,9 +193,15 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapR return bkt, nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + prefix := dir if prefix != "" && !strings.HasSuffix(prefix, DirDelim) { prefix += DirDelim @@ -211,7 +217,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return err } for _, blob := range resp.Segment.BlobItems { - if err := f(*blob.Name); err != nil { + attrs := objstore.IterObjectAttributes{ + Name: *blob.Name, + } + if params.LastModified { + attrs.SetLastModified(*blob.Properties.LastModified) + } + if err := f(attrs); err != nil { return err } } @@ -227,12 +239,18 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return err } for _, blobItem := range resp.Segment.BlobItems { - if err := f(*blobItem.Name); err != nil { + attrs := objstore.IterObjectAttributes{ + Name: *blobItem.Name, + } + if params.LastModified { + attrs.SetLastModified(*blobItem.Properties.LastModified) + } + if err := f(attrs); err != nil { return err } } for _, blobPrefix := range resp.Segment.BlobPrefixes { - if err := f(*blobPrefix.Name); err != nil { + if err := f(objstore.IterObjectAttributes{Name: *blobPrefix.Name}); err != nil { return err } } @@ -240,6 +258,23 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return nil } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. func (b *Bucket) IsObjNotFoundErr(err error) bool { if err == nil { diff --git a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go index 2ed42ee8b64a3..01dca4bbd3402 100644 --- a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go +++ b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go @@ -50,13 +50,19 @@ func NewBucket(rootDir string) (*Bucket, error) { return &Bucket{rootDir: absDir}, nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { if ctx.Err() != nil { return ctx.Err() } + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + params := objstore.ApplyIterOptions(options...) absDir := filepath.Join(b.rootDir, dir) info, err := os.Stat(absDir) @@ -92,7 +98,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if params.Recursive { // Recursively list files in the subdirectory. - if err := b.Iter(ctx, name, f, options...); err != nil { + if err := b.IterWithAttributes(ctx, name, f, options...); err != nil { return err } @@ -101,13 +107,42 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt continue } } - if err := f(name); err != nil { + + attrs := objstore.IterObjectAttributes{ + Name: name, + } + if params.LastModified { + absPath := filepath.Join(absDir, file.Name()) + stat, err := os.Stat(absPath) + if err != nil { + return errors.Wrapf(err, "stat %s", name) + } + attrs.SetLastModified(stat.ModTime()) + } + if err := f(attrs); err != nil { return err } } return nil } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { return b.GetRange(ctx, name, 0, -1) diff --git a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go index efb208e60e4d5..d54a6782f4215 100644 --- a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go +++ b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go @@ -54,6 +54,11 @@ type Config struct { // Used as storage.Writer.ChunkSize of https://pkg.go.dev/google.golang.org/cloud/storage#Writer ChunkSizeBytes int `yaml:"chunk_size_bytes"` noAuth bool `yaml:"no_auth"` + + // MaxRetries controls the number of retries for idempotent operations. + // Overrides the default gcs storage client behavior if this value is greater than 0. + // Set this to 1 to disable retries. + MaxRetries int `yaml:"max_retries"` } // Bucket implements the store.Bucket and shipper.Bucket interfaces against GCS. @@ -173,6 +178,11 @@ func newBucket(ctx context.Context, logger log.Logger, gc Config, opts []option. name: gc.Bucket, chunkSize: gc.ChunkSizeBytes, } + + if gc.MaxRetries > 0 { + bkt.bkt = bkt.bkt.Retryer(storage.WithMaxAttempts(gc.MaxRetries)) + } + return bkt, nil } @@ -181,18 +191,26 @@ func (b *Bucket) Name() string { return b.name } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the // object itself as one prefix item. if dir != "" { dir = strings.TrimSuffix(dir, DirDelim) + DirDelim } + appliedOpts := objstore.ApplyIterOptions(options...) + // If recursive iteration is enabled we should pass an empty delimiter. delimiter := DirDelim - if objstore.ApplyIterOptions(options...).Recursive { + if appliedOpts.Recursive { delimiter = "" } @@ -200,11 +218,15 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt Prefix: dir, Delimiter: delimiter, } - err := query.SetAttrSelection([]string{"Name"}) - if err != nil { - return err + if appliedOpts.LastModified { + if err := query.SetAttrSelection([]string{"Name", "Updated"}); err != nil { + return err + } + } else { + if err := query.SetAttrSelection([]string{"Name"}); err != nil { + return err + } } - it := b.bkt.Objects(ctx, query) for { select { @@ -219,12 +241,34 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if err != nil { return err } - if err := f(attrs.Prefix + attrs.Name); err != nil { + + objAttrs := objstore.IterObjectAttributes{Name: attrs.Prefix + attrs.Name} + if appliedOpts.LastModified { + objAttrs.SetLastModified(attrs.Updated) + } + if err := f(objAttrs); err != nil { return err } } } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { r, err := b.bkt.Object(name).NewReader(ctx) diff --git a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go index 8e5b8b56402d6..27e82ffbafb33 100644 --- a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go +++ b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go @@ -136,6 +136,7 @@ type Config struct { PartSize uint64 `yaml:"part_size"` SSEConfig SSEConfig `yaml:"sse_config"` STSEndpoint string `yaml:"sts_endpoint"` + MaxRetries int `yaml:"max_retries"` } // SSEConfig deals with the configuration of SSE for Minio. The following options are valid: @@ -263,6 +264,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string, wra Region: config.Region, Transport: tpt, BucketLookup: config.BucketLookupType.MinioType(), + MaxRetries: config.MaxRetries, }) if err != nil { return nil, errors.Wrap(err, "initialize s3 client") @@ -387,18 +389,26 @@ func ValidateForTests(conf Config) error { return nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the // object itself as one prefix item. if dir != "" { dir = strings.TrimSuffix(dir, DirDelim) + DirDelim } + appliedOpts := objstore.ApplyIterOptions(options...) + opts := minio.ListObjectsOptions{ Prefix: dir, - Recursive: objstore.ApplyIterOptions(options...).Recursive, + Recursive: appliedOpts.Recursive, UseV1: b.listObjectsV1, } @@ -415,7 +425,15 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if object.Key == dir { continue } - if err := f(object.Key); err != nil { + + attr := objstore.IterObjectAttributes{ + Name: object.Key, + } + if appliedOpts.LastModified { + attr.SetLastModified(object.LastModified) + } + + if err := f(attr); err != nil { return err } } @@ -423,6 +441,21 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return ctx.Err() } +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { sse, err := b.getServerSideEncryption(ctx) if err != nil { @@ -629,7 +662,7 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke bktToCreate := c.Bucket if c.Bucket != "" && reuseBucket { - if err := b.Iter(ctx, "", func(f string) error { + if err := b.Iter(ctx, "", func(string) error { return errors.Errorf("bucket %s is not empty", c.Bucket) }); err != nil { return nil, nil, errors.Wrapf(err, "s3 check bucket %s", c.Bucket) diff --git a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go index e872728e4d69e..86caa0c1edfd7 100644 --- a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go +++ b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go @@ -21,6 +21,7 @@ import ( "github.com/ncw/swift" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/exthttp" "gopkg.in/yaml.v2" @@ -222,9 +223,13 @@ func (c *Container) Name() string { return c.name } +func (c *Container) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive} +} + // Iter calls f for each entry in the given directory. The argument to f is the full // object name including the prefix of the inspected directory. -func (c *Container) Iter(_ context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (c *Container) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { if dir != "" { dir = strings.TrimSuffix(dir, string(DirDelim)) + string(DirDelim) } @@ -242,6 +247,7 @@ func (c *Container) Iter(_ context.Context, dir string, f func(string) error, op if err != nil { return objects, errors.Wrap(err, "list object names") } + for _, object := range objects { if object == SegmentsDir { continue @@ -254,6 +260,16 @@ func (c *Container) Iter(_ context.Context, dir string, f func(string) error, op }) } +func (c *Container) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(c.SupportedIterOptions(), options...); err != nil { + return err + } + + return c.Iter(ctx, dir, func(name string) error { + return f(objstore.IterObjectAttributes{Name: name}) + }, options...) +} + func (c *Container) get(name string, headers swift.Headers, checkHash bool) (io.ReadCloser, error) { if name == "" { return nil, errors.New("object name cannot be empty") diff --git a/vendor/github.com/thanos-io/objstore/testing.go b/vendor/github.com/thanos-io/objstore/testing.go index 28cbd65889494..d3fa1def44344 100644 --- a/vendor/github.com/thanos-io/objstore/testing.go +++ b/vendor/github.com/thanos-io/objstore/testing.go @@ -195,7 +195,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) expected = []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some", "id2/obj_4.some", "obj_5.some"} sort.Strings(expected) sort.Strings(seen) @@ -214,7 +214,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some"}, seen) // Can we iter over items from id1 dir? @@ -230,7 +230,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "id1", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some"}, seen) // Can we iter over items from not existing dir? @@ -295,6 +295,15 @@ func (d *delayingBucket) Iter(ctx context.Context, dir string, f func(string) er return d.bkt.Iter(ctx, dir, f, options...) } +func (d *delayingBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + time.Sleep(d.delay) + return d.bkt.IterWithAttributes(ctx, dir, f, options...) +} + +func (d *delayingBucket) SupportedIterOptions() []IterOptionType { + return d.bkt.SupportedIterOptions() +} + func (d *delayingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { time.Sleep(d.delay) return d.bkt.GetRange(ctx, name, off, length) diff --git a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go index 0a26ceeb66f50..cabe07b2cf0e5 100644 --- a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go +++ b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go @@ -52,6 +52,18 @@ func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) erro return } +func (t TracingBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) (err error) { + doWithSpan(ctx, "bucket_iter_with_attrs", func(spanCtx context.Context, span opentracing.Span) { + span.LogKV("dir", dir) + err = t.bkt.IterWithAttributes(spanCtx, dir, f, options...) + }) + return +} + +func (t TracingBucket) SupportedIterOptions() []objstore.IterOptionType { + return t.bkt.SupportedIterOptions() +} + func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { span, spanCtx := startSpan(ctx, "bucket_get") span.LogKV("name", name) diff --git a/vendor/modules.txt b/vendor/modules.txt index 4ceca3f031268..e1978ed10b5a7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1582,8 +1582,8 @@ github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require github.com/stretchr/testify/suite -# github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d -## explicit; go 1.21 +# github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 +## explicit; go 1.22 github.com/thanos-io/objstore github.com/thanos-io/objstore/exthttp github.com/thanos-io/objstore/providers/azure