From d9671e462fe659262e655e6e1cb7a7c1a16ac82d Mon Sep 17 00:00:00 2001 From: Nathan Braid Date: Fri, 18 Nov 2022 13:04:25 -0800 Subject: [PATCH] Adding counter continuity Prometheus requires that counters be monotonic and that counters are emitted periodically, so they are not marked as stale in PromQL queries. This commit changes the Cortex sink to ensure it emits all counters that it has seen, even if they are unchanged during a particular flush interval. --- sinks/cortex/cortex.go | 46 +++++++++------- sinks/cortex/cortex_test.go | 53 +++++++++++++++++++ .../monotonic_counters_missing_keys.json | 28 ++++++++++ 3 files changed, 109 insertions(+), 18 deletions(-) create mode 100644 sinks/cortex/testdata/monotonic_counters_missing_keys.json diff --git a/sinks/cortex/cortex.go b/sinks/cortex/cortex.go index a0a919661..bb6de136b 100644 --- a/sinks/cortex/cortex.go +++ b/sinks/cortex/cortex.go @@ -271,7 +271,7 @@ func (s *CortexMetricSink) writeMetrics(ctx context.Context, metrics []samplers. span, _ := trace.StartSpanFromContext(ctx, "") defer span.ClientFinish(s.traceClient) - wr, updatedCounters := s.makeWriteRequest(metrics) + wr := s.makeWriteRequest(metrics) data, err := proto.Marshal(wr) if err != nil { @@ -318,10 +318,6 @@ func (s *CortexMetricSink) writeMetrics(ctx context.Context, metrics []samplers. 
return fmt.Errorf("cortex_err=\"failed to write batch: error response\", response_code=%d response_body=\"%s\"", r.StatusCode, b) } - for key, val := range updatedCounters { - s.counters[key] = val - } - return nil } @@ -334,33 +330,47 @@ func (s *CortexMetricSink) FlushOtherSamples(context.Context, []ssf.SSFSample) { // makeWriteRequest converts a list of samples from a flush into a single // prometheus remote-write compatible protobuf object -func (s *CortexMetricSink) makeWriteRequest(metrics []samplers.InterMetric) (*prompb.WriteRequest, map[counterMapKey]float64) { - ts := make([]*prompb.TimeSeries, len(metrics)) - updatedCounters := map[counterMapKey]float64{} - for i, metric := range metrics { +func (s *CortexMetricSink) makeWriteRequest(metrics []samplers.InterMetric) *prompb.WriteRequest { + var ts []*prompb.TimeSeries + for _, metric := range metrics { if metric.Type == samplers.CounterMetric && s.convertCountersToMonotonic { - newMetric, counterKey := s.convertToMonotonicCounter(metric) - metric = newMetric - updatedCounters[counterKey] = metric.Value + s.addToMonotonicCounter(metric) + } else { + ts = append(ts, metricToTimeSeries(metric, s.excludedTags)) } + } - ts[i] = metricToTimeSeries(metric, s.excludedTags) + if s.convertCountersToMonotonic { + for key, count := range s.counters { + ts = append(ts, metricToTimeSeries(samplers.InterMetric{ + Name: key.name, + Tags: getCounterMapKeyTags(key), + Value: count, + Timestamp: time.Now().Unix(), + }, s.excludedTags)) + } } return &prompb.WriteRequest{ Timeseries: ts, - }, updatedCounters + } } -func (s *CortexMetricSink) convertToMonotonicCounter(metric samplers.InterMetric) (samplers.InterMetric, counterMapKey) { +func (s *CortexMetricSink) addToMonotonicCounter(metric samplers.InterMetric) { + key := encodeCounterMapKey(metric) + s.counters[key] += metric.Value +} + +func encodeCounterMapKey(metric samplers.InterMetric) counterMapKey { sort.Strings(metric.Tags) - key := counterMapKey{ + return 
counterMapKey{ name: metric.Name, tags: strings.Join(metric.Tags, "|"), } +} - metric.Value += s.counters[key] - return metric, key +func getCounterMapKeyTags(key counterMapKey) []string { + return strings.Split(key.tags, "|") } // SetExcludedTags sets the excluded tag names. Any tags with the diff --git a/sinks/cortex/cortex_test.go b/sinks/cortex/cortex_test.go index 91e58c8c0..1e720272f 100644 --- a/sinks/cortex/cortex_test.go +++ b/sinks/cortex/cortex_test.go @@ -202,6 +202,59 @@ func TestMonotonicCounters(t *testing.T) { assert.Equal(t, 3, matchesDone) } +// Here we test that a monotonic counter _persists_ +// i.e. for a counter to work correctly in prometheus +// it must be sent _with every sample rate_ (not sparse) +// so we're making sure that if a counter is seen on Flush#1 but +// not passed to Flush#2 that we still report it with Flush#2 +func TestMonotonicCounterContinuity(t *testing.T) { + // Listen for prometheus writes + server := NewTestServer(t) + defer server.Close() + + // Set up a sink + sink, err := NewCortexMetricSink(server.URL, 30*time.Second, "", logrus.NewEntry(logrus.New()), "test", map[string]string{}, nil, 15, true) + assert.NoError(t, err) + assert.NoError(t, sink.Start(trace.DefaultClient)) + + // we'll load the monotonic counters file with _all_ keys and flush it + jsInput, err := ioutil.ReadFile("testdata/monotonic_counters.json") + assert.NoError(t, err) + var allMetrics []samplers.InterMetric + assert.NoError(t, json.Unmarshal(jsInput, &allMetrics)) + + _, err = sink.Flush(context.Background(), allMetrics) + assert.NoError(t, err) + + // let's load the counters with missing keys + jsMissingMetricsInput, err := ioutil.ReadFile("testdata/monotonic_counters_missing_keys.json") + assert.NoError(t, err) + var missingMetrics []samplers.InterMetric + assert.NoError(t, json.Unmarshal(jsMissingMetricsInput, &missingMetrics)) + + _, err = sink.Flush(context.Background(), missingMetrics) + assert.NoError(t, err) + + expectedVals := 
map[string]float64{ + "bar": 200, + // this counter is missing from the second flush, but we should still see this value + "baz": 150, + "taz": 100, + } + + matchesDone := 0 + for _, data := range server.history[1].data.Timeseries { + for _, label := range data.Labels { + if label.Name == "foo" { + matchesDone++ + assert.Equal(t, expectedVals[label.GetValue()], data.Samples[0].GetValue()) + } + } + } + + assert.Equal(t, 3, matchesDone) +} + func TestChunkNumOfMetricsLessThanBatchSize(t *testing.T) { // Listen for prometheus writes server := NewTestServer(t) diff --git a/sinks/cortex/testdata/monotonic_counters_missing_keys.json b/sinks/cortex/testdata/monotonic_counters_missing_keys.json new file mode 100644 index 000000000..a3ef359cb --- /dev/null +++ b/sinks/cortex/testdata/monotonic_counters_missing_keys.json @@ -0,0 +1,28 @@ +[ + { + "Name": "a.a.counter.1", + "Timestamp": 1, + "Value": 100, + "Tags": [ + "foo:bar", + "baz:qux" + ], + "Type": 0, + "Message": "", + "HostName": "", + "Sinks": null + }, + { + "Name": "a.a.gauge.1", + "Timestamp": 0, + "Value": 100, + "Tags": [ + "foo:taz", + "baz:qux" + ], + "Type": 1, + "Message": "", + "HostName": "", + "Sinks": null + } +]