diff --git a/pkg/block/block.go b/pkg/block/block.go index 41625dc17d..64add7fb51 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -295,6 +295,10 @@ func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, i return m, nil } +func IsBlockMetaFile(path string) bool { + return filepath.Base(path) == MetaFilename +} + func IsBlockDir(path string) (id ulid.ULID, ok bool) { id, err := ulid.Parse(filepath.Base(path)) return id, err == nil diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index e97fc62f8b..d2953ae9c5 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -11,6 +11,7 @@ import ( "path" "path/filepath" "sort" + "strings" "sync" "time" @@ -234,17 +235,6 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met cachedBlockDir = filepath.Join(f.cacheDir, id.String()) ) - // TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items. - // For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix. - // TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work). - ok, err := f.bkt.Exists(ctx, metaFile) - if err != nil { - return nil, errors.Wrapf(err, "meta.json file exists: %v", metaFile) - } - if !ok { - return nil, ErrorSyncMetaNotFound - } - if m, seen := f.cached[id]; seen { return m, nil } @@ -360,14 +350,24 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { }) } + partialBlocks := make(map[ulid.ULID]bool) // Workers scheduled, distribute blocks. eg.Go(func() error { defer close(ch) return f.bkt.Iter(ctx, "", func(name string) error { - id, ok := IsBlockDir(name) + parts := strings.Split(name, "/") + dir, file := parts[0], parts[len(parts)-1] + id, ok := IsBlockDir(dir) if !ok { return nil } + if _, ok := partialBlocks[id]; !ok { + partialBlocks[id] = true + } + if !IsBlockMetaFile(file) { + return nil + } + partialBlocks[id] = false select { case <-ctx.Done(): @@ -376,13 +376,22 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { } return nil - }) + }, objstore.WithRecursiveIter) }) if err := eg.Wait(); err != nil { return nil, errors.Wrap(err, "BaseFetcher: iter bucket") } + mtx.Lock() + for blockULID, isPartial := range partialBlocks { + if isPartial { + resp.partial[blockULID] = errors.Errorf("block %s has no meta file", blockULID) + resp.noMetas++ + } + } + mtx.Unlock() + if len(resp.metaErrs) > 0 { return resp, nil } diff --git a/pkg/cache/groupcache.go b/pkg/cache/groupcache.go index b908b0c7a8..609959d0cb 100644 --- a/pkg/cache/groupcache.go +++ b/pkg/cache/groupcache.go @@ -197,6 +197,26 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf return err } + return dest.UnmarshalBinary(encodedList, time.Now().Add(iterCfg.TTL)) + case cachekey.IterRecursiveVerb: + _, iterCfg := cfg.FindIterConfig(parsedData.Name) + if iterCfg == nil { + panic("caching bucket layer must not call on unconfigured paths") + } + + var list []string + if err := bucket.Iter(ctx, parsedData.Name, func(s string) error { + list = append(list, s) + return nil + }, objstore.WithRecursiveIter); err != nil { + return err + } + + encodedList, err := json.Marshal(list) + if err != nil { + return err + } + return dest.UnmarshalBinary(encodedList, time.Now().Add(iterCfg.TTL)) case cachekey.ContentVerb: _, contentCfg := cfg.FindGetConfig(parsedData.Name) diff --git a/pkg/store/cache/cachekey/cachekey.go b/pkg/store/cache/cachekey/cachekey.go index eb5438be04..0393dbdb02 100644 --- a/pkg/store/cache/cachekey/cachekey.go +++ b/pkg/store/cache/cachekey/cachekey.go @@ -20,11 +20,12 @@ var ( type VerbType string const ( - ExistsVerb VerbType = "exists" - ContentVerb VerbType = "content" - IterVerb VerbType = "iter" - AttributesVerb VerbType = "attrs" - SubrangeVerb VerbType = "subrange" + ExistsVerb VerbType = "exists" + ContentVerb VerbType = "content" + IterVerb VerbType = "iter" + IterRecursiveVerb VerbType = "iter-recursive" + AttributesVerb VerbType = "attrs" + SubrangeVerb VerbType = "subrange" ) type BucketCacheKey struct { @@ -50,6 +51,7 @@ func IsValidVerb(v VerbType) bool { ExistsVerb, ContentVerb, IterVerb, + IterRecursiveVerb, AttributesVerb, SubrangeVerb: return true diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index cf8a2e4cd4..796036a9a2 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -132,7 +132,13 @@ func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) er } cb.operationRequests.WithLabelValues(objstore.OpIter, cfgName).Inc() + iterVerb := cachekey.BucketCacheKey{Verb: cachekey.IterVerb, Name: dir} + opts := objstore.ApplyIterOptions(options...) + if opts.Recursive { + iterVerb.Verb = cachekey.IterRecursiveVerb + } + key := iterVerb.String() data := cfg.Cache.Fetch(ctx, []string{key}) if data[key] != nil {