From b91c509af3fe573af635dbc286558a82eb98e777 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 16 Oct 2024 16:33:48 +0200 Subject: [PATCH] distributor: Drop flag for non-direct OTLP translation Signed-off-by: Arve Knudsen --- CHANGELOG.md | 1 + cmd/mimir/config-descriptor.json | 11 ---- cmd/mimir/help-all.txt.tmpl | 2 - .../configuration-parameters/index.md | 5 -- pkg/api/api.go | 2 +- pkg/distributor/distributor.go | 4 -- pkg/distributor/otel.go | 51 ++----------------- 7 files changed, 5 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b1775ea6b6..8ec8c1d5771 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * `cortex_alertmanager_state_replication_failed_total` * `cortex_alertmanager_alerts` * `cortex_alertmanager_silences` +* [CHANGE] Distributor: Drop experimental `-distributor.direct-otlp-translation-enabled` flag, since direct OTLP translation is well tested at this point. #9647 * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 * [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028 * [FEATURE] gRPC: Support S2 compression. 
#9322 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 056c87e0e17..8fdab4aca6e 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -1701,17 +1701,6 @@ "fieldFlag": "distributor.reusable-ingester-push-workers", "fieldType": "int", "fieldCategory": "advanced" - }, - { - "kind": "field", - "name": "direct_otlp_translation_enabled", - "required": false, - "desc": "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.", - "fieldValue": null, - "fieldDefaultValue": true, - "fieldFlag": "distributor.direct-otlp-translation-enabled", - "fieldType": "boolean", - "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index afed2ba684a..4aceb36c5dc 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1145,8 +1145,6 @@ Usage of ./cmd/mimir/mimir: Fraction of mutex contention events that are reported in the mutex profile. On average 1/rate events are reported. 0 to disable. -distributor.client-cleanup-period duration How frequently to clean up clients for ingesters that have gone away. (default 15s) - -distributor.direct-otlp-translation-enabled - [experimental] When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance. (default true) -distributor.drop-label string This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels. 
-distributor.ha-tracker.cluster string diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 4ab17090b42..f757630bae2 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -963,11 +963,6 @@ instance_limits: # limiting feature.) # CLI flag: -distributor.reusable-ingester-push-workers [reusable_ingester_push_workers: | default = 2000] - -# (experimental) When enabled, OTLP write requests are directly translated to -# Mimir equivalents, for optimum performance. -# CLI flag: -distributor.direct-otlp-translation-enabled -[direct_otlp_translation_enabled: | default = true] ``` ### ingester diff --git a/pkg/api/api.go b/pkg/api/api.go index e5f6b68a8d5..4342a65bd52 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -265,7 +265,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib distributorpb.RegisterDistributorServer(a.server.GRPC, d) a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.SkipLabelCountValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger), true, false, "POST") - a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST") + a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger), true, false, "POST") a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{ {Desc: "Ring status", Path: "/distributor/ring"}, diff --git 
a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 67fab01c3f4..d7feae79ef1 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -227,9 +227,6 @@ type Config struct { WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"` ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"` - - // DirectOTLPTranslationEnabled allows reverting to the older way of translating from OTLP write requests via Prometheus, in case of problems. - DirectOTLPTranslationEnabled bool `yaml:"direct_otlp_translation_enabled" category:"experimental"` } // PushWrapper wraps around a push. It is similar to middleware.Interface. @@ -248,7 +245,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.") f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. 
(Note: this is a performance optimization, not a limiting feature.)") - f.BoolVar(&cfg.DirectOTLPTranslationEnabled, "distributor.direct-otlp-translation-enabled", true, "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.") cfg.DefaultLimits.RegisterFlags(f) } diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index 449d0474348..7f8e297148e 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -24,7 +24,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" - "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" @@ -63,7 +62,6 @@ func OTLPHandler( pushMetrics *PushMetrics, reg prometheus.Registerer, logger log.Logger, - directTranslation bool, ) http.Handler { discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError) @@ -169,16 +167,9 @@ func OTLPHandler( pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize)) var metrics []mimirpb.PreallocTimeseries - if directTranslation { - metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, enableCTZeroIngestion, discardedDueToOtelParseError, spanLogger, otlpReq.Metrics()) - if err != nil { - return err - } - } else { - metrics, err = otelMetricsToTimeseriesOld(ctx, tenantID, addSuffixes, enableCTZeroIngestion, discardedDueToOtelParseError, spanLogger, otlpReq.Metrics()) - if err != nil { - return err - } + metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, enableCTZeroIngestion, discardedDueToOtelParseError, spanLogger, otlpReq.Metrics()) + if err != nil { + return err } metricCount := len(metrics) @@ -429,42 +420,6 @@ func 
otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes, return mimirTS, nil } -// Old, less efficient, version of otelMetricsToTimeseries. -func otelMetricsToTimeseriesOld(ctx context.Context, tenantID string, addSuffixes, enableCTZeroIngestion bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) { - converter := prometheusremotewrite.NewPrometheusConverter() - annots, errs := converter.FromMetrics(ctx, md, prometheusremotewrite.Settings{ - AddMetricSuffixes: addSuffixes, - EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion, - }, logger) - promTS := converter.TimeSeries() - if errs != nil { - dropped := len(multierr.Errors(errs)) - discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed - - parseErrs := errs.Error() - if len(parseErrs) > maxErrMsgLen { - parseErrs = parseErrs[:maxErrMsgLen] - } - - if len(promTS) == 0 { - return nil, errors.New(parseErrs) - } - - level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs) - } - ws, _ := annots.AsStrings("", 0, 0) - if len(ws) > 0 { - level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws) - } - - mimirTS := mimirpb.PreallocTimeseriesSliceFromPool() - for _, ts := range promTS { - mimirTS = append(mimirTS, promToMimirTimeseries(&ts)) - } - - return mimirTS, nil -} - func promToMimirTimeseries(promTs *prompb.TimeSeries) mimirpb.PreallocTimeseries { labels := make([]mimirpb.LabelAdapter, 0, len(promTs.Labels)) for _, label := range promTs.Labels {