Skip to content

Commit

Permalink
Avoid LabelValues unless necessary
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov committed Jan 9, 2023
1 parent d3f323e commit 90bcc56
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 77 deletions.
133 changes: 100 additions & 33 deletions pkg/storegateway/bucket_index_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,54 +29,131 @@ type postingGroup struct {
removeKeys []labels.Label
}

func newPostingGroup(addAll bool, addKeys, removeKeys []labels.Label) *postingGroup {
return &postingGroup{
func newPostingGroup(addAll bool, addKeys, removeKeys []labels.Label) postingGroup {
return postingGroup{
addAll: addAll,
addKeys: addKeys,
removeKeys: removeKeys,
}
}

// NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
func toPostingGroup(lvr indexheader.Reader, m *labels.Matcher) (*postingGroup, error) {
if setMatches := m.SetMatches(); len(setMatches) > 0 && (m.Type == labels.MatchRegexp || m.Type == labels.MatchNotRegexp) {
keys := make([]labels.Label, 0, len(setMatches))
for _, val := range setMatches {
if _, err := lvr.PostingsOffset(m.Name, val); errors.Is(err, indexheader.NotFoundRangeErr) {
func (g postingGroup) filterNonExistentKeys(preader postingsOffsetReader) (_ postingGroup, retErr error) {
filterKeys := func(keys []labels.Label) []labels.Label {
existingKeys := 0
for i, l := range keys {
if _, err := preader.PostingsOffset(l.Name, l.Value); errors.Is(err, indexheader.NotFoundRangeErr) {
// This label name and value doesn't exist in this block, so there are 0 postings we can match.
// Set it to an empty value, so we can filter it out later.
keys[i] = labels.Label{}
// Try with the rest of the set matchers, maybe they can match some series.
continue
} else if err != nil {
retErr = err
return nil
}
existingKeys++
}
if existingKeys == len(keys) {
return keys
}
if existingKeys == 0 {
return nil
}

filtered := make([]labels.Label, 0, existingKeys)
for _, k := range keys {
if k == (labels.Label{}) {
continue
}
filtered = append(filtered, k)
}
return filtered
}

return postingGroup{
addAll: g.addAll,
addKeys: filterKeys(g.addKeys),
removeKeys: filterKeys(g.removeKeys),
}, retErr
}

type lazyPostingGroup struct {
isRemoveGroup bool
labelName string
matcher func(string) bool
addAll bool
}

func newLazyPostingGroup(addAll bool, isRemoveGroup bool, labelName string, matcher func(string) bool) lazyPostingGroup {
return lazyPostingGroup{
isRemoveGroup: isRemoveGroup,
labelName: labelName,
matcher: matcher,
addAll: addAll,
}
}

// toPostingGroup guarantees that all the keys in the returned postingGroup exist in the index
func (g lazyPostingGroup) toPostingGroup(lvr labelValuesReader) (postingGroup, error) {
vals, err := lvr.LabelValues(g.labelName, g.matcher)
if err != nil {
return postingGroup{}, err
}
keys := make([]labels.Label, len(vals))
for i := range vals {
keys[i] = labels.Label{Name: g.labelName, Value: vals[i]}
}

if g.isRemoveGroup {
return postingGroup{
addAll: g.addAll,
removeKeys: keys,
}, nil
}
return postingGroup{
addAll: g.addAll,
addKeys: keys,
}, nil
}

type postingsOffsetReader interface {
PostingsOffset(name string, value string) (index.Range, error)
}

type labelValuesReader interface {
LabelValues(name string, filter func(string) bool) ([]string, error)
}

// toPostingGroup returns either a postingGroup or a lazyPostingGroup. It returns a postingGroup when it can
// derive all the possible values that would satisfy the matcher without looking up the index
// (e.g. pod="compactor=1"` or pod=~"compactor-(0|1|2|3)")
// If the matcher needs to use the index, then toPostingGroup returns a lazyPostingGroup (e.g. pod=~"compactor.*").
// NOTE: Derived from tsdb.postingsForMatcher
func toPostingGroup(m *labels.Matcher) (_ postingGroup, _ lazyPostingGroup, isLazy bool) {
if setMatches := m.SetMatches(); len(setMatches) > 0 && (m.Type == labels.MatchRegexp || m.Type == labels.MatchNotRegexp) {
keys := make([]labels.Label, 0, len(setMatches))
for _, val := range setMatches {
keys = append(keys, labels.Label{Name: m.Name, Value: val})
}
if m.Type == labels.MatchNotRegexp {
return newPostingGroup(true, nil, keys), nil
return newPostingGroup(true, nil, keys), lazyPostingGroup{}, false
}
return newPostingGroup(false, keys, nil), nil
return newPostingGroup(false, keys, nil), lazyPostingGroup{}, false
}

if m.Value != "" {
// Fast-path for equal matching.
// Works for every case except for `foo=""`, which is a special case, see below.
if m.Type == labels.MatchEqual {
if _, err := lvr.PostingsOffset(m.Name, m.Value); errors.Is(err, indexheader.NotFoundRangeErr) {
// This label name or value doesn't exist in this block, so there are 0 postings we can match.
return newPostingGroup(false, nil, nil), nil
}
return newPostingGroup(false, []labels.Label{{Name: m.Name, Value: m.Value}}, nil), nil
return newPostingGroup(false, []labels.Label{{Name: m.Name, Value: m.Value}}, nil), lazyPostingGroup{}, false
}

// If matcher is `label!="foo"`, we select an empty label value too,
// i.e., series that don't have this label.
// So this matcher selects all series in the storage,
// except for the ones that do have `label="foo"`
if m.Type == labels.MatchNotEqual {
if _, err := lvr.PostingsOffset(m.Name, m.Value); errors.Is(err, indexheader.NotFoundRangeErr) {
// This label name or value doesn't exist in this block, so we should include all postings
// because all series match m.
return newPostingGroup(true, nil, nil), nil
}
return newPostingGroup(true, nil, []labels.Label{{Name: m.Name, Value: m.Value}}), nil
return newPostingGroup(true, nil, []labels.Label{{Name: m.Name, Value: m.Value}}), lazyPostingGroup{}, false
}
}

Expand All @@ -87,22 +164,12 @@ func toPostingGroup(lvr indexheader.Reader, m *labels.Matcher) (*postingGroup, e
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555.
if m.Matches("") {
vals, err := lvr.LabelValues(m.Name, not(m.Matches))
toRemove := make([]labels.Label, len(vals))
for i := range vals {
toRemove[i] = labels.Label{Name: m.Name, Value: vals[i]}
}
return newPostingGroup(true, nil, toRemove), err
return postingGroup{}, newLazyPostingGroup(true, true, m.Name, not(m.Matches)), true
}

// Our matcher does not match the empty value, so we just need the postings that correspond
// to label values matched by the matcher.
vals, err := lvr.LabelValues(m.Name, m.Matches)
toAdd := make([]labels.Label, len(vals))
for i := range vals {
toAdd[i] = labels.Label{Name: m.Name, Value: vals[i]}
}
return newPostingGroup(false, toAdd, nil), err
return postingGroup{}, newLazyPostingGroup(false, false, m.Name, m.Matches), true
}

func not(filter func(string) bool) func(string) bool {
Expand Down
137 changes: 93 additions & 44 deletions pkg/storegateway/bucket_index_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,54 +169,14 @@ func (r *bucketIndexReader) fetchCachedExpandedPostings(ctx context.Context, use

// expandedPostings is the main logic of ExpandedPostings, without the promise wrapper.
func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, returnErr error) {
var (
postingGroups []*postingGroup
allRequested = false
hasAdds = false
keys []labels.Label
)

// NOTE: Derived from tsdb.PostingsForMatchers.
for _, m := range ms {
// Each group is separate to tell later what postings are intersecting with what.
pg, err := toPostingGroup(r.block.indexHeaderReader, m)
if err != nil {
return nil, errors.Wrap(err, "toPostingGroup")
}

// If this groups adds nothing, it's an empty group. We can shortcut this, since intersection with empty
// postings would return no postings anyway.
// E.g. label="non-existing-value" returns empty group.
if !pg.addAll && len(pg.addKeys) == 0 {
return nil, nil
}

postingGroups = append(postingGroups, pg)
allRequested = allRequested || pg.addAll
hasAdds = hasAdds || len(pg.addKeys) > 0

// Postings returned by fetchPostings will be in the same order as keys
// so it's important that we iterate them in the same order later.
// We don't have any other way of pairing keys and fetched postings.
keys = append(keys, pg.addKeys...)
keys = append(keys, pg.removeKeys...)
postingGroups, keys, err := toPostingGroups(ms, r.block.indexHeaderReader)
if err != nil {
return nil, errors.Wrap(err, "toPostingGroups")
}

if len(postingGroups) == 0 {
if len(keys) == 0 {
return nil, nil
}

// We only need special All postings if there are no other adds. If there are, we can skip fetching
// special All postings completely.
if allRequested && !hasAdds {
// add group with label to fetch "special All postings".
name, value := index.AllPostingsKey()
allPostingsLabel := labels.Label{Name: name, Value: value}

postingGroups = append(postingGroups, newPostingGroup(true, []labels.Label{allPostingsLabel}, nil))
keys = append(keys, allPostingsLabel)
}

fetchedPostings, err := r.fetchPostings(ctx, keys, stats)
if err != nil {
return nil, errors.Wrap(err, "get postings")
Expand Down Expand Up @@ -268,6 +228,95 @@ func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.M
return ps, nil
}

// toPostingGroups returns a set of labels for which to look up postings lists. It guarantees that
// each postingGroup's addKeys and removeKeys exist in the index. The order of the returned labels
// is the same as iterating through the posting groups and for each group adding all addKeys and then adding all removeKeys.
func toPostingGroups(ms []*labels.Matcher, indexhdr indexheader.Reader) ([]postingGroup, []labels.Label, error) {
var (
postingGroups = make([]postingGroup, 0, len(ms))
lazyPostingGroups []lazyPostingGroup
allRequested = false
hasAdds = false
)

// NOTE: Derived from tsdb.PostingsForMatchers.
for _, m := range ms {
// Each group is separate to tell later what postings are intersecting with what.
pg, lazyPg, isLazy := toPostingGroup(m)
if isLazy {
lazyPostingGroups = append(lazyPostingGroups, lazyPg)
} else {
postingGroups = append(postingGroups, pg)
}
}

// Next we check whether the posting groups won't select an empty set of postings.
// We start with the ones that have a known set of values because it's less expensive to check them in
// the index header.
numKeys := 0
for i, pg := range postingGroups {
var err error
pg, err = pg.filterNonExistentKeys(indexhdr)
if err != nil {
return nil, nil, errors.Wrap(err, "filtering posting group")
}
postingGroups[i] = pg

// If this groups adds nothing, it's an empty group. We can shortcut this, since intersection with empty
// postings would return no postings anyway.
// E.g. label="non-existing-value" returns empty group.
if !pg.addAll && len(pg.addKeys) == 0 {
return nil, nil, nil
}

allRequested = allRequested || pg.addAll
hasAdds = hasAdds || len(pg.addKeys) > 0

numKeys += len(pg.addKeys)
numKeys += len(pg.removeKeys)
}

// We continue with the posting groups that require to scan all the label values from the index.
for _, lazyPg := range lazyPostingGroups {
pg, err := lazyPg.toPostingGroup(indexhdr)
if err != nil {
return nil, nil, errors.Wrap(err, "toPostingGroup")
}
postingGroups = append(postingGroups, pg)
// If this groups adds nothing, it's an empty group. We can shortcut this, since intersection with empty
// postings would return no postings anyway.
// E.g. label="non-existing-value" returns empty group.
if !pg.addAll && len(pg.addKeys) == 0 {
return nil, nil, nil
}

allRequested = allRequested || pg.addAll
hasAdds = hasAdds || len(pg.addKeys) > 0

numKeys += len(pg.addKeys)
numKeys += len(pg.removeKeys)
}

// We only need special All postings if there are no other adds. If there are, we can skip fetching
// special All postings completely.
if allRequested && !hasAdds {
// add group with label to fetch "special All postings".
name, value := index.AllPostingsKey()
allPostingsLabel := labels.Label{Name: name, Value: value}

postingGroups = append(postingGroups, newPostingGroup(true, []labels.Label{allPostingsLabel}, nil))
numKeys++
}

keys := make([]labels.Label, 0, numKeys)
for _, pg := range postingGroups {
keys = append(keys, pg.addKeys...)
keys = append(keys, pg.removeKeys...)
}

return postingGroups, keys, nil
}

// FetchPostings fills postings requested by posting groups.
// It returns one postings for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be an ErrPostings
Expand Down

0 comments on commit 90bcc56

Please sign in to comment.