diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d1fda94899..ed6f1f26500 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,7 +34,6 @@ * [BUGFIX] Query-frontend: properly close gRPC streams to the query-scheduler to stop memory and goroutines leak. #3302 * [BUGFIX] Ruler: persist evaluation delay configured in the rulegroup. #3392 * [BUGFIX] Ring status pages: show 100% ownership as "100%", not "1e+02%". #3435 -* [BUGFIX] Fix panics in OTLP ingest path when parse errors exist. #3538 ### Mixin diff --git a/go.mod b/go.mod index f2a7a2a3ce9..3efa854f944 100644 --- a/go.mod +++ b/go.mod @@ -67,7 +67,6 @@ require ( go.opentelemetry.io/collector/pdata v0.54.0 go.opentelemetry.io/otel v1.10.0 go.opentelemetry.io/otel/trace v1.10.0 - go.uber.org/multierr v1.8.0 golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 google.golang.org/api v0.97.0 gopkg.in/alecthomas/kingpin.v2 v2.2.6 @@ -208,6 +207,7 @@ require ( go.opentelemetry.io/collector/semconv v0.54.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.0 // indirect go.opentelemetry.io/otel/metric v0.32.0 // indirect + go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect diff --git a/pkg/util/push/otel.go b/pkg/util/push/otel.go index d51057eb3a1..3189eb80e20 100644 --- a/pkg/util/push/otel.go +++ b/pkg/util/push/otel.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" - "go.uber.org/multierr" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/util" @@ -133,7 +132,7 @@ func otelMetricsToTimeseries(ctx context.Context, discardedDueToOtelParseError * return nil, err } - dropped := len(multierr.Errors(errs)) + dropped := md.DataPointCount() - sampleCountInMap(tsMap) discardedDueToOtelParseError.WithLabelValues(userID).Add(float64(dropped)) parseErrs := errs.Error() @@ -229,3 +228,12 @@ func TimeseriesToOTLPRequest(timeseries []prompb.TimeSeries) pmetricotlp.Request return pmetricotlp.NewRequestFromMetrics(d) } + +func sampleCountInMap(tsMap map[string]*prompb.TimeSeries) int { + count := 0 + for _, ts := range tsMap { + count += len(ts.Samples) + } + + return count +} diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 79d43eb1eaf..6546cd45c7c 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -100,72 +100,6 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) { assert.Equal(t, 200, resp.Code) } -func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) { - // After the above test, the panic occurred again. - // This test is to ensure that the panic is fixed for the new cases as well. - - // First case is to make sure that target_info is counted correctly. - md := pmetric.NewMetrics() - const name = "foo" - attributes := pcommon.NewMap() - attributes.InsertString(model.MetricNameLabel, name) - - resource1 := md.ResourceMetrics().AppendEmpty() - resource1.Resource().Attributes().InsertString("region", "us-central1") - - metric1 := resource1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - metric1.SetName(name) - metric1.SetDataType(pmetric.MetricDataTypeGauge) - datapoint1 := metric1.Gauge().DataPoints().AppendEmpty() - datapoint1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - datapoint1.SetDoubleVal(0) - attributes.CopyTo(datapoint1.Attributes()) - datapoint1.Attributes().InsertString("diff_label", "bar") - - metric2 := resource1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - metric2.SetName(name) - metric2.SetDataType(pmetric.MetricDataTypeGauge) - - req := createOTLPRequest(t, pmetricotlp.NewRequestFromMetrics(md), false) - resp := httptest.NewRecorder() - handler := OTLPHandler(100000, nil, false, nil, func(ctx context.Context, pushReq *Request) (response *mimirpb.WriteResponse, err error) { - request, err := pushReq.WriteRequest() - assert.NoError(t, err) - assert.Len(t, request.Timeseries, 2) - assert.False(t, request.SkipLabelNameValidation) - pushReq.CleanUp() - return &mimirpb.WriteResponse{}, nil - }) - handler.ServeHTTP(resp, req) - assert.Equal(t, 200, resp.Code) - - // Second case is to make sure that histogram metrics are counted correctly. - metric3 := resource1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - metric3.SetName("http_request_duration_seconds") - metric3.SetDataType(pmetric.MetricDataTypeHistogram) - metric3.Histogram().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) - datapoint3 := metric3.Histogram().DataPoints().AppendEmpty() - datapoint3.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - datapoint3.SetCount(50) - datapoint3.SetSum(100) - datapoint3.SetMExplicitBounds([]float64{0.1, 0.2, 0.3, 0.4, 0.5}) - datapoint3.SetMBucketCounts([]uint64{10, 20, 30, 40, 50}) - attributes.CopyTo(datapoint3.Attributes()) - - req = createOTLPRequest(t, pmetricotlp.NewRequestFromMetrics(md), false) - resp = httptest.NewRecorder() - handler = OTLPHandler(100000, nil, false, nil, func(ctx context.Context, pushReq *Request) (response *mimirpb.WriteResponse, err error) { - request, err := pushReq.WriteRequest() - assert.NoError(t, err) - assert.Len(t, request.Timeseries, 10) // 6 buckets (including +Inf) + 2 sum/count + 2 from the first case - assert.False(t, request.SkipLabelNameValidation) - pushReq.CleanUp() - return &mimirpb.WriteResponse{}, nil - }) - handler.ServeHTTP(resp, req) - assert.Equal(t, 200, resp.Code) -} - func TestHandler_otlpWriteWithCompression(t *testing.T) { req := createOTLPRequest(t, createOTLPMetricRequest(t), true) resp := httptest.NewRecorder()