Remove Exists call in meta fetcher #6474

Merged (2 commits) on Jul 3, 2023
4 changes: 4 additions & 0 deletions pkg/block/block.go
@@ -295,6 +295,10 @@ func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, i
 	return m, nil
 }
 
+func IsBlockMetaFile(path string) bool {
+	return filepath.Base(path) == MetaFilename
+}
+
 func IsBlockDir(path string) (id ulid.ULID, ok bool) {
 	id, err := ulid.Parse(filepath.Base(path))
 	return id, err == nil
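
The new helper matches on the final path element only, so it works for recursively listed object names such as "<ULID>/meta.json". A minimal standalone sketch of the same check; the real helper compares against block.MetaFilename, which is inlined here as a local constant:

package main

import (
	"fmt"
	"path/filepath"
)

// metaFilename mirrors block.MetaFilename ("meta.json") for this standalone sketch.
const metaFilename = "meta.json"

// isBlockMetaFile reports whether an object path points at a block's meta.json,
// looking only at the last path element.
func isBlockMetaFile(path string) bool {
	return filepath.Base(path) == metaFilename
}

func main() {
	fmt.Println(isBlockMetaFile("01ARZ3NDEKTSV4RRFFQ69G5FAV/meta.json"))     // true
	fmt.Println(isBlockMetaFile("01ARZ3NDEKTSV4RRFFQ69G5FAV/chunks/000001")) // false
}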
35 changes: 22 additions & 13 deletions pkg/block/fetcher.go
@@ -11,6 +11,7 @@ import (
 	"path"
 	"path/filepath"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -234,17 +235,6 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
 		cachedBlockDir = filepath.Join(f.cacheDir, id.String())
 	)
 
-	// TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items.
-	// For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix.
-	// TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work).
-	ok, err := f.bkt.Exists(ctx, metaFile)
-	if err != nil {
-		return nil, errors.Wrapf(err, "meta.json file exists: %v", metaFile)
-	}
-	if !ok {
-		return nil, ErrorSyncMetaNotFound
-	}
-
 	if m, seen := f.cached[id]; seen {
 		return m, nil
 	}
@@ -360,14 +350,24 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
 		})
 	}
 
+	partialBlocks := make(map[ulid.ULID]bool)
 	// Workers scheduled, distribute blocks.
 	eg.Go(func() error {
 		defer close(ch)
 		return f.bkt.Iter(ctx, "", func(name string) error {
-			id, ok := IsBlockDir(name)
+			parts := strings.Split(name, "/")
+			dir, file := parts[0], parts[len(parts)-1]
+			id, ok := IsBlockDir(dir)
 			if !ok {
 				return nil
 			}
+			if _, ok := partialBlocks[id]; !ok {
+				partialBlocks[id] = true
+			}
+			if !IsBlockMetaFile(file) {
+				return nil
+			}
+			partialBlocks[id] = false
 
 			select {
 			case <-ctx.Done():
@@ -376,13 +376,22 @@
 			}
 
 			return nil
-		})
+		}, objstore.WithRecursiveIter)
 	})
 
 	if err := eg.Wait(); err != nil {
 		return nil, errors.Wrap(err, "BaseFetcher: iter bucket")
 	}
 
+	mtx.Lock()
+	for blockULID, isPartial := range partialBlocks {
+		if isPartial {
+			resp.partial[blockULID] = errors.Errorf("block %s has no meta file", blockULID)
+			resp.noMetas++
+		}
+	}
+	mtx.Unlock()
+
 	if len(resp.metaErrs) > 0 {
 		return resp, nil
 	}
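
The fetcher now makes a single recursive listing pass: each object name is split into its block directory and file name, a block is assumed partial when first seen, and the flag is cleared once its meta.json shows up. Blocks still flagged after the walk are reported as partial, instead of issuing a per-block Exists (HEAD) call as before. A standalone sketch of that classification, assuming the oklog/ulid module and feeding a plain slice of names instead of a real objstore.Bucket:

package main

import (
	"fmt"
	"strings"

	"github.com/oklog/ulid"
)

// classify walks recursively listed object names once and records, per block ULID,
// whether a meta.json has been seen. It mirrors the callback added to fetchMetadata,
// but takes a plain slice instead of a real objstore.Bucket.
func classify(names []string) map[ulid.ULID]bool {
	partial := make(map[ulid.ULID]bool) // true = no meta.json seen yet
	for _, name := range names {
		parts := strings.Split(name, "/")
		dir, file := parts[0], parts[len(parts)-1]
		id, err := ulid.Parse(dir)
		if err != nil {
			continue // not a block directory
		}
		if _, ok := partial[id]; !ok {
			partial[id] = true // first sighting: assume partial until meta.json shows up
		}
		if file == "meta.json" {
			partial[id] = false
		}
	}
	return partial
}

func main() {
	names := []string{
		"01ARZ3NDEKTSV4RRFFQ69G5FAV/meta.json",
		"01ARZ3NDEKTSV4RRFFQ69G5FAV/chunks/000001",
		"01BX5ZZKBKACTAV9WEVGEMMVRZ/chunks/000001", // no meta.json: stays partial
	}
	for id, isPartial := range classify(names) {
		fmt.Println(id, "partial:", isPartial)
	}
}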
20 changes: 20 additions & 0 deletions pkg/cache/groupcache.go
@@ -197,6 +197,26 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
 				return err
 			}
 
 			return dest.UnmarshalBinary(encodedList, time.Now().Add(iterCfg.TTL))
+		case cachekey.IterRecursiveVerb:
+			_, iterCfg := cfg.FindIterConfig(parsedData.Name)
+			if iterCfg == nil {
+				panic("caching bucket layer must not call on unconfigured paths")
+			}
+
+			var list []string
+			if err := bucket.Iter(ctx, parsedData.Name, func(s string) error {
+				list = append(list, s)
+				return nil
+			}, objstore.WithRecursiveIter); err != nil {
+				return err
+			}
+
+			encodedList, err := json.Marshal(list)
+			if err != nil {
+				return err
+			}
+
+			return dest.UnmarshalBinary(encodedList, time.Now().Add(iterCfg.TTL))
 		case cachekey.ContentVerb:
 			_, contentCfg := cfg.FindGetConfig(parsedData.Name)
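
The new IterRecursiveVerb branch mirrors the existing iter branch: the recursive listing is collected into a string slice, JSON-encoded, and stored with the iter config's TTL. A minimal sketch of that encode/decode round trip, with the cache transport replaced by a plain byte slice and hypothetical object names:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// What a recursive Iter over one block prefix might return (hypothetical names).
	list := []string{
		"01ARZ3NDEKTSV4RRFFQ69G5FAV/meta.json",
		"01ARZ3NDEKTSV4RRFFQ69G5FAV/chunks/000001",
	}

	// The getter stores the listing JSON-encoded, tagged with the iter config's TTL.
	encoded, err := json.Marshal(list)
	if err != nil {
		panic(err)
	}

	// A cache hit decodes the same JSON back into names to replay to the caller.
	var decoded []string
	if err := json.Unmarshal(encoded, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded)
}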
12 changes: 7 additions & 5 deletions pkg/store/cache/cachekey/cachekey.go
@@ -20,11 +20,12 @@
 type VerbType string
 
 const (
-	ExistsVerb     VerbType = "exists"
-	ContentVerb    VerbType = "content"
-	IterVerb       VerbType = "iter"
-	AttributesVerb VerbType = "attrs"
-	SubrangeVerb   VerbType = "subrange"
+	ExistsVerb        VerbType = "exists"
+	ContentVerb       VerbType = "content"
+	IterVerb          VerbType = "iter"
+	IterRecursiveVerb VerbType = "iter-recursive"
+	AttributesVerb    VerbType = "attrs"
+	SubrangeVerb      VerbType = "subrange"
 )
 
 type BucketCacheKey struct {
@@ -50,6 +51,7 @@ func IsValidVerb(v VerbType) bool {
 		ExistsVerb,
 		ContentVerb,
 		IterVerb,
+		IterRecursiveVerb,
 		AttributesVerb,
 		SubrangeVerb:
 		return true
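
Keeping recursive listings under their own verb means a cached flat listing can never be served for a recursive request on the same prefix. An illustrative sketch; the verb:name key layout below is an assumption for the example, not necessarily the exact format produced by BucketCacheKey.String():

package main

import "fmt"

// VerbType mirrors the cachekey verb enum for this sketch.
type VerbType string

const (
	IterVerb          VerbType = "iter"
	IterRecursiveVerb VerbType = "iter-recursive"
)

// key builds an illustrative cache key. The real BucketCacheKey.String() may
// format keys differently; the point is only that the verb is part of the key.
func key(verb VerbType, name string) string {
	return fmt.Sprintf("%s:%s", verb, name)
}

func main() {
	// Flat and recursive listings of the same prefix get distinct keys.
	fmt.Println(key(IterVerb, "01ARZ3NDEKTSV4RRFFQ69G5FAV/"))          // iter:...
	fmt.Println(key(IterRecursiveVerb, "01ARZ3NDEKTSV4RRFFQ69G5FAV/")) // iter-recursive:...
}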
6 changes: 6 additions & 0 deletions pkg/store/cache/caching_bucket.go
@@ -132,7 +132,13 @@ func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) er
 	}
 
 	cb.operationRequests.WithLabelValues(objstore.OpIter, cfgName).Inc()
+
 	iterVerb := cachekey.BucketCacheKey{Verb: cachekey.IterVerb, Name: dir}
+	opts := objstore.ApplyIterOptions(options...)
+	if opts.Recursive {
+		iterVerb.Verb = cachekey.IterRecursiveVerb
+	}
+
 	key := iterVerb.String()
 	data := cfg.Cache.Fetch(ctx, []string{key})
 	if data[key] != nil {
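
CachingBucket.Iter inspects the iter options it was called with and switches the cache-key verb when recursion is requested. A small sketch of that selection, assuming the thanos-io/objstore module whose ApplyIterOptions and WithRecursiveIter appear in the diff above:

package main

import (
	"fmt"

	"github.com/thanos-io/objstore"
)

// pickIterVerb mirrors the new branch in CachingBucket.Iter: recursive listings
// are cached under a different verb than flat ones.
func pickIterVerb(options ...objstore.IterOption) string {
	if objstore.ApplyIterOptions(options...).Recursive {
		return "iter-recursive"
	}
	return "iter"
}

func main() {
	fmt.Println(pickIterVerb())                           // iter
	fmt.Println(pickIterVerb(objstore.WithRecursiveIter)) // iter-recursive
}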