Skip to content

Commit

Permalink
fix: Fixes pattern pruning stability (#13429)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Jul 8, 2024
1 parent 2affa48 commit 7c86e65
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 220 deletions.
116 changes: 72 additions & 44 deletions pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,27 @@ import (
"errors"
"math"
"net/http"
"sort"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/drain"

loki_iter "github.com/grafana/loki/v3/pkg/iter"
pattern_iter "github.com/grafana/loki/v3/pkg/pattern/iter"
)

// TODO(kolesnikovae): parametrise QueryPatternsRequest
const minClusterSize = 30
const (
minClusterSize = 30
maxPatterns = 300
)

var ErrParseQuery = errors.New("only byte_over_time and count_over_time queries without filters are supported")

Expand Down Expand Up @@ -132,36 +136,63 @@ func (q *IngesterQuerier) querySample(ctx context.Context, req *logproto.QuerySa
return iterators, nil
}

func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
pruneConfig := drain.DefaultConfig()
pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them

func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int64, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
patternsBefore := len(resp.Series)
d := drain.New(pruneConfig, "", nil)
for _, p := range resp.Series {
d.TrainPattern(p.GetPattern(), p.Samples)
total := make([]int64, len(resp.Series))

for i, p := range resp.Series {
for _, s := range p.Samples {
total[i] += s.Value
}
}

resp.Series = resp.Series[:0]
for _, cluster := range d.Clusters() {
if cluster.Size < minClusterSize {
continue
// Create a slice of structs to keep Series and total together
type SeriesWithTotal struct {
Series *logproto.PatternSeries
Total int64
}

seriesWithTotals := make([]SeriesWithTotal, len(resp.Series))
for i := range resp.Series {
seriesWithTotals[i] = SeriesWithTotal{
Series: resp.Series[i],
Total: total[i],
}
pattern := d.PatternString(cluster)
if pattern == "" {
continue
}

// Sort the slice of structs by the Total field
sort.Slice(seriesWithTotals, func(i, j int) bool {
return seriesWithTotals[i].Total > seriesWithTotals[j].Total
})

// Initialize a variable to keep track of the position for valid series
pos := 0

// Iterate over the seriesWithTotals
for i := range seriesWithTotals {
if seriesWithTotals[i].Total >= minClusterSize {
// Place the valid series at the current position
resp.Series[pos] = seriesWithTotals[i].Series
pos++
}
resp.Series = append(resp.Series,
logproto.NewPatternSeries(pattern, cluster.Samples()))
}

// Slice the resp.Series to include only the valid series
resp.Series = resp.Series[:pos]

if len(resp.Series) > maxPatterns {
resp.Series = resp.Series[:maxPatterns]
}

metrics.patternsPrunedTotal.Add(float64(patternsBefore - len(resp.Series)))
metrics.patternsRetainedTotal.Add(float64(len(resp.Series)))

return resp
}

// ForAllIngesters runs f, in parallel, for all ingesters
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) {
replicationSet, err := q.ringClient.Ring().GetReplicationSetForOperation(ring.Read)
replicationSet, err := q.ringClient.Ring().GetAllHealthy(ring.Read)
if err != nil {
return nil, err
}
Expand All @@ -174,32 +205,29 @@ type ResponseFromIngesters struct {
response interface{}
}

// forGivenIngesters runs f, in parallel, for given ingesters
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) {
cfg := ring.DoUntilQuorumConfig{
// Nothing here
}
results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (ResponseFromIngesters, error) {
client, err := q.ringClient.Pool().GetClientFor(ingester.Addr)
if err != nil {
return ResponseFromIngesters{addr: ingester.Addr}, err
}

resp, err := f(ctx, client.(logproto.PatternClient))
if err != nil {
return ResponseFromIngesters{addr: ingester.Addr}, err
}

return ResponseFromIngesters{ingester.Addr, resp}, nil
}, func(ResponseFromIngesters) {
// Nothing to do
})
if err != nil {
g, ctx := errgroup.WithContext(ctx)
responses := make([]ResponseFromIngesters, len(replicationSet.Instances))

for i, ingester := range replicationSet.Instances {
ingester := ingester
i := i
g.Go(func() error {
client, err := q.ringClient.Pool().GetClientFor(ingester.Addr)
if err != nil {
return err
}

resp, err := f(ctx, client.(logproto.PatternClient))
if err != nil {
return err
}
responses[i] = ResponseFromIngesters{addr: ingester.Addr, response: resp}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}

responses := make([]ResponseFromIngesters, 0, len(results))
responses = append(responses, results...)

return responses, err
return responses, nil
}
Loading

0 comments on commit 7c86e65

Please sign in to comment.