Fix hash value for Prometheus histogram metrics. (#175)
Through internal investigation, I observed that one of the queue lengths
was consistently high and blocked the whole queue manager. I identified
that a large volume of points with hash value 0 were being enqueued onto
shard #0, which made the sharding imbalanced and left the sharded queue
constantly blocked by shard #0.
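
A minimal sketch of hash-based shard assignment (illustrative only; the
function and shard count are assumptions, not the sidecar's actual
queue-manager code). Every point whose hash is 0 maps to shard 0, so
shard 0 absorbs the entire volume while the other shards sit idle:

package main

import "fmt"

// shardIndex picks a queue shard for a point based on its series hash.
// Modulo-based sharding is assumed here purely for illustration.
func shardIndex(hash uint64, numShards int) int {
	return int(hash % uint64(numShards))
}

func main() {
	// Points that always hash to 0 all land on shard 0, starving the rest.
	for _, h := range []uint64{0, 0, 0, 7, 42} {
		fmt.Printf("hash %d -> shard %d\n", h, shardIndex(h, 4))
	}
}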

When the Stackdriver Prometheus Sidecar transforms Prometheus histogram
metrics into Stackdriver time series with value type distribution, it
should return the hash value derived from the series cache rather than 0.
This change fixes the undesired behavior.
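
The effective change, condensed from the diff below (surrounding code
elided): the hash returned alongside a distribution point is no longer
the literal 0 but entry.hash, the hash kept by the series cache.

// Before: distribution (histogram) points were returned with hash 0,
// so the queue manager enqueued all of them onto shard #0.
return &ts, 0, samples, nil

// After: the hash derived by the series cache is propagated instead,
// and the remaining samples are passed along as tailSamples.
return &ts, entry.hash, tailSamples, nil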
StevenYCChou authored Sep 24, 2019
1 parent cc5ee91 commit cf645f3
Showing 2 changed files with 22 additions and 13 deletions.
22 changes: 11 additions & 11 deletions retrieval/transform.go
@@ -38,21 +38,22 @@ type sampleBuilder struct {
// the remainder of the input.
func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) {
sample := samples[0]
tailSamples := samples[1:]

entry, ok, err := b.series.get(ctx, sample.Ref)
if err != nil {
return nil, 0, samples, errors.Wrap(err, "get series information")
}
if !ok {
return nil, 0, samples[1:], nil
return nil, 0, tailSamples, nil
}

if entry.tracker != nil {
entry.tracker.newPoint(ctx, entry.lset, sample.T, sample.V)
}

if !entry.exported {
return nil, 0, samples[1:], nil
return nil, 0, tailSamples, nil
}
// Get a shallow copy of the proto so we can overwrite the point field
// and safely send it into the remote queues.
@@ -72,7 +73,7 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
var v float64
resetTimestamp, v, ok = b.series.getResetAdjusted(sample.Ref, sample.T, sample.V)
if !ok {
return nil, 0, samples[1:], nil
return nil, 0, tailSamples, nil
}
point.Interval.StartTime = getTimestamp(resetTimestamp)
point.Value = &monitoring_pb.TypedValue{Value: &monitoring_pb.TypedValue_DoubleValue{v}}
@@ -86,46 +87,45 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
var v float64
resetTimestamp, v, ok = b.series.getResetAdjusted(sample.Ref, sample.T, sample.V)
if !ok {
return nil, 0, samples[1:], nil
return nil, 0, tailSamples, nil
}
point.Interval.StartTime = getTimestamp(resetTimestamp)
point.Value = &monitoring_pb.TypedValue{Value: &monitoring_pb.TypedValue_DoubleValue{v}}
case metricSuffixCount:
var v float64
resetTimestamp, v, ok = b.series.getResetAdjusted(sample.Ref, sample.T, sample.V)
if !ok {
return nil, 0, samples[1:], nil
return nil, 0, tailSamples, nil
}
point.Interval.StartTime = getTimestamp(resetTimestamp)
point.Value = &monitoring_pb.TypedValue{Value: &monitoring_pb.TypedValue_Int64Value{int64(v)}}
case "": // Actual quantiles.
point.Value = &monitoring_pb.TypedValue{Value: &monitoring_pb.TypedValue_DoubleValue{sample.V}}
default:
return nil, 0, samples[1:], errors.Errorf("unexpected metric name suffix %q", entry.suffix)
return nil, 0, tailSamples, errors.Errorf("unexpected metric name suffix %q", entry.suffix)
}

case textparse.MetricTypeHistogram:
// We pass in the original lset for matching since Prometheus's target label must
// be the same as well.
var v *distribution_pb.Distribution
v, resetTimestamp, samples, err = b.buildDistribution(ctx, entry.metadata.Metric, entry.lset, samples)
v, resetTimestamp, tailSamples, err = b.buildDistribution(ctx, entry.metadata.Metric, entry.lset, samples)
if v == nil || err != nil {
return nil, 0, samples, err
return nil, 0, tailSamples, err
}
point.Interval.StartTime = getTimestamp(resetTimestamp)
point.Value = &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_DistributionValue{v},
}
return &ts, 0, samples, nil

default:
return nil, 0, samples[1:], errors.Errorf("unexpected metric type %s", entry.metadata.Type)
}

if !b.series.updateSampleInterval(entry.hash, resetTimestamp, sample.T) {
return nil, 0, samples[1:], nil
return nil, 0, tailSamples, nil
}
return &ts, entry.hash, samples[1:], nil
return &ts, entry.hash, tailSamples, nil
}

const (
13 changes: 11 additions & 2 deletions retrieval/transform_test.go
@@ -768,8 +768,10 @@ func TestSampleBuilder(t *testing.T) {
t.Logf("Test case %d", i)

var s *monitoring_pb.TimeSeries
var h uint64
var err error
var result []*monitoring_pb.TimeSeries
var hashes []uint64

aggr, _ := NewCounterAggregator(log.NewNopLogger(), new(CounterAggregatorConfig))
series := newSeriesCache(nil, "", nil, nil, c.targets, c.metadata, resourceMaps, c.metricPrefix, false, aggr)
@@ -780,11 +782,12 @@ func TestSampleBuilder(t *testing.T) {
b := &sampleBuilder{series: series}

for k := 0; len(c.input) > 0; k++ {
s, _, c.input, err = b.next(context.Background(), c.input)
s, h, c.input, err = b.next(context.Background(), c.input)
if err != nil {
break
}
result = append(result, s)
hashes = append(hashes, h)
}
if err == nil && c.fail {
t.Fatal("expected error but got none")
@@ -801,7 +804,13 @@ func TestSampleBuilder(t *testing.T) {
t.Logf("expres %v", c.result)
t.Fatalf("unexpected sample %d: got\n\t%v\nwant\n\t%v", k, res, c.result[k])
}
expectedHash := uint64(0)
if c.result[k] != nil {
expectedHash = hashSeries(c.result[k])
}
if hashes[k] != expectedHash {
t.Fatalf("unexpected hash %v; want %v", hashes[k], expectedHash)
}
}

}
}
