From cf645f30cc87208584bcd02d85d16a554800f38e Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou <3055688+StevenYCChou@users.noreply.github.com> Date: Tue, 24 Sep 2019 08:43:37 -0400 Subject: [PATCH] Fix hash value for Prometheus histogram metrics. (#175) Through internal investigation, I observed that one of the queue length is consistently high and blocked the whole queue manager. I identified that there are large volume of points with hash value 0 tried to be enqueued on to shard #0, which makes the sharding inbalance and the sharded queue are constantly blocked by shard #0. When Stackdriver Prometheus Sidecar tries to transform Prometheus histogram metrics into Stackdriver timeseries with value type distribution, it should return the hash values derived by series cache rather than 0. This change fixes the undesired behavior. --- retrieval/transform.go | 22 +++++++++++----------- retrieval/transform_test.go | 13 +++++++++++-- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/retrieval/transform.go b/retrieval/transform.go index 2a1c5cae..63bea5a8 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -38,13 +38,14 @@ type sampleBuilder struct { // the remainder of the input. func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) { sample := samples[0] + tailSamples := samples[1:] entry, ok, err := b.series.get(ctx, sample.Ref) if err != nil { return nil, 0, samples, errors.Wrap(err, "get series information") } if !ok { - return nil, 0, samples[1:], nil + return nil, 0, tailSamples, nil } if entry.tracker != nil { @@ -52,7 +53,7 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo } if !entry.exported { - return nil, 0, samples[1:], nil + return nil, 0, tailSamples, nil } // Get a shallow copy of the proto so we can overwrite the point field // and safely send it into the remote queues. @@ -72,7 +73,7 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo var v float64 resetTimestamp, v, ok = b.series.getResetAdjusted(sample.Ref, sample.T, sample.V) if !ok { - return nil, 0, samples[1:], nil + return nil, 0, tailSamples, nil } point.Interval.StartTime = getTimestamp(resetTimestamp) point.Value = &monitoring_pb.TypedValue{Value: &monitoring_pb.TypedValue_DoubleValue{v}} @@ -86,7 +87,7 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo var v float64 resetTimestamp, v, ok = b.series.getResetAdjusted(sample.Ref, sample.T, sample.V) if !ok { - return nil, 0, samples[1:], nil + return nil, 0, tailSamples, nil } point.Interval.StartTime = getTimestamp(resetTimestamp) point.Value = &monitoring_pb.TypedValue{Value: &monitoring_pb.TypedValue_DoubleValue{v}} @@ -94,38 +95,37 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo var v float64 resetTimestamp, v, ok = b.series.getResetAdjusted(sample.Ref, sample.T, sample.V) if !ok { - return nil, 0, samples[1:], nil + return nil, 0, tailSamples, nil } point.Interval.StartTime = getTimestamp(resetTimestamp) point.Value = &monitoring_pb.TypedValue{Value: &monitoring_pb.TypedValue_Int64Value{int64(v)}} case "": // Actual quantiles. point.Value = &monitoring_pb.TypedValue{Value: &monitoring_pb.TypedValue_DoubleValue{sample.V}} default: - return nil, 0, samples[1:], errors.Errorf("unexpected metric name suffix %q", entry.suffix) + return nil, 0, tailSamples, errors.Errorf("unexpected metric name suffix %q", entry.suffix) } case textparse.MetricTypeHistogram: // We pass in the original lset for matching since Prometheus's target label must // be the same as well. var v *distribution_pb.Distribution - v, resetTimestamp, samples, err = b.buildDistribution(ctx, entry.metadata.Metric, entry.lset, samples) + v, resetTimestamp, tailSamples, err = b.buildDistribution(ctx, entry.metadata.Metric, entry.lset, samples) if v == nil || err != nil { - return nil, 0, samples, err + return nil, 0, tailSamples, err } point.Interval.StartTime = getTimestamp(resetTimestamp) point.Value = &monitoring_pb.TypedValue{ Value: &monitoring_pb.TypedValue_DistributionValue{v}, } - return &ts, 0, samples, nil default: return nil, 0, samples[1:], errors.Errorf("unexpected metric type %s", entry.metadata.Type) } if !b.series.updateSampleInterval(entry.hash, resetTimestamp, sample.T) { - return nil, 0, samples[1:], nil + return nil, 0, tailSamples, nil } - return &ts, entry.hash, samples[1:], nil + return &ts, entry.hash, tailSamples, nil } const ( diff --git a/retrieval/transform_test.go b/retrieval/transform_test.go index 9094998e..8edddf09 100644 --- a/retrieval/transform_test.go +++ b/retrieval/transform_test.go @@ -768,8 +768,10 @@ func TestSampleBuilder(t *testing.T) { t.Logf("Test case %d", i) var s *monitoring_pb.TimeSeries + var h uint64 var err error var result []*monitoring_pb.TimeSeries + var hashes []uint64 aggr, _ := NewCounterAggregator(log.NewNopLogger(), new(CounterAggregatorConfig)) series := newSeriesCache(nil, "", nil, nil, c.targets, c.metadata, resourceMaps, c.metricPrefix, false, aggr) @@ -780,11 +782,12 @@ func TestSampleBuilder(t *testing.T) { b := &sampleBuilder{series: series} for k := 0; len(c.input) > 0; k++ { - s, _, c.input, err = b.next(context.Background(), c.input) + s, h, c.input, err = b.next(context.Background(), c.input) if err != nil { break } result = append(result, s) + hashes = append(hashes, h) } if err == nil && c.fail { t.Fatal("expected error but got none") @@ -801,7 +804,13 @@ func TestSampleBuilder(t *testing.T) { t.Logf("expres %v", c.result) t.Fatalf("unexpected sample %d: got\n\t%v\nwant\n\t%v", k, res, c.result[k]) } + expectedHash := uint64(0) + if c.result[k] != nil { + expectedHash = hashSeries(c.result[k]) + } + if hashes[k] != expectedHash { + t.Fatalf("unexpected hash %v; want %v", hashes[k], expectedHash) + } } - } }