Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for object attributes in Iter call #63

Merged
merged 5 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#79](https://github.com/thanos-io/objstore/pull/79) Metrics: Fix `objstore_bucket_operation_duration_seconds` for `iter` operations.

### Added
- [#63](https://github.com/thanos-io/objstore/pull/63) Implement a `IterWithAttributes` method on the bucket client.
- [#15](https://github.com/thanos-io/objstore/pull/15) Add Oracle Cloud Infrastructure Object Storage Bucket support.
- [#25](https://github.com/thanos-io/objstore/pull/25) S3: Support specifying S3 storage class.
- [#32](https://github.com/thanos-io/objstore/pull/32) Swift: Support authentication using application credentials.
Expand Down
23 changes: 19 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ See [MAINTAINERS.md](https://github.com/thanos-io/thanos/blob/main/MAINTAINERS.m

The core this module is the [`Bucket` interface](objstore.go):

```go mdox-exec="sed -n '37,50p' objstore.go"
```go mdox-exec="sed -n '39,55p' objstore.go"
// Bucket provides read and write access to an object storage bucket.
// NOTE: We assume strong consistency for write-read flow.
type Bucket interface {
Expand All @@ -63,18 +63,31 @@ type Bucket interface {
// If object does not exist in the moment of deletion, Delete should throw error.
Delete(ctx context.Context, name string) error

// Name returns the bucket name for the provider.
Name() string
}
```

All [provider implementations](providers) have to implement `Bucket` interface that allows common read and write operations that all supported by all object providers. If you want to limit the code that will do bucket operation to only read access (smart idea, allowing to limit access permissions), you can use the [`BucketReader` interface](objstore.go):

```go mdox-exec="sed -n '68,93p' objstore.go"

```go mdox-exec="sed -n '71,106p' objstore.go"
// BucketReader provides read access to an object storage bucket.
type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.

// Entries are passed to function in sorted order.
Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error
Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
// In addition to Name, it also includes requested object attributes in the argument to f.
//
// Attributes can be requested using IterOption.
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
SupportedIterOptions() []IterOptionType

// Get returns a reader for the given object name.
Get(ctx context.Context, name string) (io.ReadCloser, error)
Expand Down Expand Up @@ -374,6 +387,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down Expand Up @@ -447,6 +461,7 @@ config:
storage_account: ""
storage_account_key: ""
storage_connection_string: ""
storage_create_container: false
container: ""
endpoint: ""
user_assigned_id: ""
Expand Down
14 changes: 14 additions & 0 deletions inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error,
return nil
}

func (i *InMemBucket) SupportedIterOptions() []IterOptionType {
return []IterOptionType{Recursive}
}

func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error {
if err := ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
return err
}

return b.Iter(ctx, dir, func(name string) error {
return f(IterObjectAttributes{Name: name})
}, options...)
}

// Get returns a reader for the given object name.
func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error) {
if name == "" {
Expand Down
109 changes: 100 additions & 9 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package objstore
import (
"bytes"
"context"
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"slices"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -70,8 +72,19 @@ type InstrumentedBucket interface {
type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.

// Entries are passed to function in sorted order.
Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error
Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
// In addition to Name, it also includes requested object attributes in the argument to f.
//
// Attributes can be requested using IterOption.
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
SupportedIterOptions() []IterOptionType

// Get returns a reader for the given object name.
Get(ctx context.Context, name string) (io.ReadCloser, error)
Expand Down Expand Up @@ -101,24 +114,66 @@ type InstrumentedBucketReader interface {
ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}

var ErrOptionNotSupported = errors.New("iter option is not supported")

// IterOptionType is used for type-safe option support checking.
type IterOptionType int

const (
Recursive IterOptionType = iota
UpdatedAt
)

// IterOption configures the provided params.
type IterOption func(params *IterParams)
type IterOption struct {
Type IterOptionType
Apply func(params *IterParams)
}

// WithRecursiveIter is an option that can be applied to Iter() to recursively list objects
// in the bucket.
func WithRecursiveIter(params *IterParams) {
params.Recursive = true
func WithRecursiveIter() IterOption {
return IterOption{
Type: Recursive,
Apply: func(params *IterParams) {
params.Recursive = true
},
}
}

// WithUpdatedAt is an option that can be applied to Iter() to
// include the last modified time in the attributes.
// NB: Prefixes may not report last modified time.
// This option is currently supported for the azure, s3, bos, gcs and filesystem providers.
func WithUpdatedAt() IterOption {
return IterOption{
Type: UpdatedAt,
Apply: func(params *IterParams) {
params.LastModified = true
},
}
}

// IterParams holds the Iter() parameters and is used by objstore clients implementations.
type IterParams struct {
Recursive bool
Recursive bool
LastModified bool
}

func ValidateIterOptions(supportedOptions []IterOptionType, options ...IterOption) error {
for _, opt := range options {
if !slices.Contains(supportedOptions, opt.Type) {
return fmt.Errorf("%w: %v", ErrOptionNotSupported, opt.Type)
}
}

return nil
}

func ApplyIterOptions(options ...IterOption) IterParams {
out := IterParams{}
for _, opt := range options {
opt(&out)
opt.Apply(&out)
}
return out
}
Expand Down Expand Up @@ -189,6 +244,20 @@ type ObjectAttributes struct {
LastModified time.Time `json:"last_modified"`
}

type IterObjectAttributes struct {
Name string
lastModified time.Time
}

func (i *IterObjectAttributes) SetLastModified(t time.Time) {
i.lastModified = t
}
Comment on lines +252 to +254
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious question: why did we make assignments to lastModified come through a setter but not name?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wanted to make it explicit that lastModified might not always be set, made it a private field so the getter contract can return time.Time, bool


// LastModified returns the timestamp the object was last modified. Returns false if the timestamp is not available.
func (i *IterObjectAttributes) LastModified() (time.Time, bool) {
return i.lastModified, !i.lastModified.IsZero()
}

// TryToGetSize tries to get upfront size from reader.
// Some implementations may return only size of unread data in the reader, so it's best to call this method before
// doing any reading.
Expand Down Expand Up @@ -533,21 +602,43 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket
return b.WithExpectedErrs(fn)
}

func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error {
func (b *metricBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
const op = OpIter
b.metrics.ops.WithLabelValues(op).Inc()

start := time.Now()
timer := prometheus.NewTimer(b.metrics.opsDuration.WithLabelValues(op))
defer timer.ObserveDuration()

err := b.bkt.Iter(ctx, dir, f, options...)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
}
b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
return err
}

func (b *metricBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error {
const op = OpIter
b.metrics.ops.WithLabelValues(op).Inc()

timer := prometheus.NewTimer(b.metrics.opsDuration.WithLabelValues(op))
defer timer.ObserveDuration()

err := b.bkt.IterWithAttributes(ctx, dir, f, options...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updates to b.metrics.opsDuration metric is missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be there now. PTAL again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
}

return err
}

func (b *metricBucket) SupportedIterOptions() []IterOptionType {
return b.bkt.SupportedIterOptions()
}

func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
const op = OpAttributes
b.metrics.ops.WithLabelValues(op).Inc()
Expand Down
13 changes: 13 additions & 0 deletions prefixed_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) er
}, options...)
}

func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error {
pdir := withPrefix(p.prefix, dir)

return p.bkt.IterWithAttributes(ctx, pdir, func(attrs IterObjectAttributes) error {
attrs.Name = strings.TrimPrefix(attrs.Name, p.prefix+DirDelim)
return f(attrs)
}, options...)
}

func (p *PrefixedBucket) SupportedIterOptions() []IterOptionType {
return p.bkt.SupportedIterOptions()
}

// Get returns a reader for the given object name.
func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return p.bkt.Get(ctx, conditionalPrefix(p.prefix, name))
Expand Down
2 changes: 1 addition & 1 deletion prefixed_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
seen = append(seen, fn)
return nil
}, WithRecursiveIter))
}, WithRecursiveIter()))
expected := []string{"dir/file1.jpg", "file1.jpg"}
sort.Strings(expected)
sort.Strings(seen)
Expand Down
47 changes: 41 additions & 6 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,15 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapR
return bkt, nil
}

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt}
}

func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
return err
}

prefix := dir
if prefix != "" && !strings.HasSuffix(prefix, DirDelim) {
prefix += DirDelim
Expand All @@ -211,7 +217,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
return err
}
for _, blob := range resp.Segment.BlobItems {
if err := f(*blob.Name); err != nil {
attrs := objstore.IterObjectAttributes{
Name: *blob.Name,
}
if params.LastModified {
attrs.SetLastModified(*blob.Properties.LastModified)
}
if err := f(attrs); err != nil {
return err
}
}
Expand All @@ -227,19 +239,42 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
return err
}
for _, blobItem := range resp.Segment.BlobItems {
if err := f(*blobItem.Name); err != nil {
attrs := objstore.IterObjectAttributes{
Name: *blobItem.Name,
}
if params.LastModified {
attrs.SetLastModified(*blobItem.Properties.LastModified)
}
if err := f(attrs); err != nil {
return err
}
}
for _, blobPrefix := range resp.Segment.BlobPrefixes {
if err := f(*blobPrefix.Name); err != nil {
if err := f(objstore.IterObjectAttributes{Name: *blobPrefix.Name}); err != nil {
return err
}
}
}
return nil
}

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error {
// Only include recursive option since attributes are not used in this method.
var filteredOpts []objstore.IterOption
for _, opt := range opts {
if opt.Type == objstore.Recursive {
filteredOpts = append(filteredOpts, opt)
break
}
}

return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error {
return f(attrs.Name)
}, filteredOpts...)
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
if err == nil {
Expand Down
Loading
Loading