distributor: Drop flag for non-direct OTLP translation
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
aknuds1 committed Oct 16, 2024
1 parent 9d8252f commit 6f91fcc
Showing 10 changed files with 49 additions and 165 deletions.
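For anyone running with the flag set explicitly, the practical impact is that the option has to be removed when upgrading, since Mimir will normally refuse to start on an unknown CLI flag or an unrecognized YAML field. Below is a minimal sketch of the configuration to delete; the field name and its placement under the distributor section follow the reference-configuration hunk further down, and after this commit direct OTLP translation is simply always on.

```yaml
# Remove this field from the distributor section of the Mimir configuration file.
# It was experimental, defaulted to true, and no longer exists after this commit.
distributor:
  direct_otlp_translation_enabled: true
```

The equivalent CLI flag, `-distributor.direct-otlp-translation-enabled`, should likewise be dropped from any command line or deployment tooling that still passes it.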
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
* `cortex_alertmanager_state_replication_failed_total`
* `cortex_alertmanager_alerts`
* `cortex_alertmanager_silences`
* [CHANGE] Distributor: Drop experimental `-distributor.direct-otlp-translation-enabled` flag, since direct OTLP translation is well tested at this point. #9647
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] gRPC: Support S2 compression. #9322
11 changes: 0 additions & 11 deletions cmd/mimir/config-descriptor.json
@@ -1701,17 +1701,6 @@
"fieldFlag": "distributor.reusable-ingester-push-workers",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "direct_otlp_translation_enabled",
"required": false,
"desc": "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "distributor.direct-otlp-translation-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 0 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
@@ -1145,8 +1145,6 @@ Usage of ./cmd/mimir/mimir:
Fraction of mutex contention events that are reported in the mutex profile. On average 1/rate events are reported. 0 to disable.
-distributor.client-cleanup-period duration
How frequently to clean up clients for ingesters that have gone away. (default 15s)
-distributor.direct-otlp-translation-enabled
[experimental] When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance. (default true)
-distributor.drop-label string
This flag can be used to specify label names to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.
-distributor.ha-tracker.cluster string
2 changes: 0 additions & 2 deletions docs/sources/mimir/configure/about-versioning.md
@@ -82,8 +82,6 @@ The following features are currently experimental:
- `-distributor.max-otlp-request-size`
- Enforce a maximum pool buffer size for write requests
- `-distributor.max-request-pool-buffer-size`
- Enable direct translation from OTLP write requests to Mimir equivalents
- `-distributor.direct-otlp-translation-enabled`
- Enable conversion of OTel start timestamps to Prometheus zero samples to mark series start
- `-distributor.otel-created-timestamp-zero-ingestion-enabled`
- Hash ring
@@ -963,11 +963,6 @@ instance_limits:
# limiting feature.)
# CLI flag: -distributor.reusable-ingester-push-workers
[reusable_ingester_push_workers: <int> | default = 2000]
# (experimental) When enabled, OTLP write requests are directly translated to
# Mimir equivalents, for optimum performance.
# CLI flag: -distributor.direct-otlp-translation-enabled
[direct_otlp_translation_enabled: <boolean> | default = true]
```

### ingester
2 changes: 1 addition & 1 deletion pkg/api/api.go
@@ -265,7 +265,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.SkipLabelCountValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
4 changes: 0 additions & 4 deletions pkg/distributor/distributor.go
@@ -227,9 +227,6 @@ type Config struct {

WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"`

// DirectOTLPTranslationEnabled allows reverting to the older way of translating from OTLP write requests via Prometheus, in case of problems.
DirectOTLPTranslationEnabled bool `yaml:"direct_otlp_translation_enabled" category:"experimental"`
}

// PushWrapper wraps around a push. It is similar to middleware.Interface.
@@ -248,7 +245,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")
f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)")
f.BoolVar(&cfg.DirectOTLPTranslationEnabled, "distributor.direct-otlp-translation-enabled", true, "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.")

cfg.DefaultLimits.RegisterFlags(f)
}
136 changes: 3 additions & 133 deletions pkg/distributor/otel.go
@@ -24,7 +24,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
@@ -63,7 +62,6 @@ func OTLPHandler(
pushMetrics *PushMetrics,
reg prometheus.Registerer,
logger log.Logger,
directTranslation bool,
) http.Handler {
discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

@@ -169,16 +167,9 @@
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))

var metrics []mimirpb.PreallocTimeseries
if directTranslation {
metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, enableCTZeroIngestion, discardedDueToOtelParseError, spanLogger, otlpReq.Metrics())
if err != nil {
return err
}
} else {
metrics, err = otelMetricsToTimeseriesOld(ctx, tenantID, addSuffixes, enableCTZeroIngestion, discardedDueToOtelParseError, spanLogger, otlpReq.Metrics())
if err != nil {
return err
}
metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, enableCTZeroIngestion, discardedDueToOtelParseError, spanLogger, otlpReq.Metrics())
if err != nil {
return err
}

metricCount := len(metrics)
@@ -429,127 +420,6 @@ func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes,
return mimirTS, nil
}

// Old, less efficient, version of otelMetricsToTimeseries.
func otelMetricsToTimeseriesOld(ctx context.Context, tenantID string, addSuffixes, enableCTZeroIngestion bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := prometheusremotewrite.NewPrometheusConverter()
annots, errs := converter.FromMetrics(ctx, md, prometheusremotewrite.Settings{
AddMetricSuffixes: addSuffixes,
EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion,
}, logger)
promTS := converter.TimeSeries()
if errs != nil {
dropped := len(multierr.Errors(errs))
discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed

parseErrs := errs.Error()
if len(parseErrs) > maxErrMsgLen {
parseErrs = parseErrs[:maxErrMsgLen]
}

if len(promTS) == 0 {
return nil, errors.New(parseErrs)
}

level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs)
}
ws, _ := annots.AsStrings("", 0, 0)
if len(ws) > 0 {
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
}

mimirTS := mimirpb.PreallocTimeseriesSliceFromPool()
for _, ts := range promTS {
mimirTS = append(mimirTS, promToMimirTimeseries(&ts))
}

return mimirTS, nil
}

func promToMimirTimeseries(promTs *prompb.TimeSeries) mimirpb.PreallocTimeseries {
labels := make([]mimirpb.LabelAdapter, 0, len(promTs.Labels))
for _, label := range promTs.Labels {
labels = append(labels, mimirpb.LabelAdapter{
Name: label.Name,
Value: label.Value,
})
}

samples := make([]mimirpb.Sample, 0, len(promTs.Samples))
for _, sample := range promTs.Samples {
samples = append(samples, mimirpb.Sample{
TimestampMs: sample.Timestamp,
Value: sample.Value,
})
}

histograms := make([]mimirpb.Histogram, 0, len(promTs.Histograms))
for idx := range promTs.Histograms {
histograms = append(histograms, promToMimirHistogram(&promTs.Histograms[idx]))
}

exemplars := make([]mimirpb.Exemplar, 0, len(promTs.Exemplars))
for _, exemplar := range promTs.Exemplars {
labels := make([]mimirpb.LabelAdapter, 0, len(exemplar.Labels))
for _, label := range exemplar.Labels {
labels = append(labels, mimirpb.LabelAdapter{
Name: label.Name,
Value: label.Value,
})
}

exemplars = append(exemplars, mimirpb.Exemplar{
Labels: labels,
Value: exemplar.Value,
TimestampMs: exemplar.Timestamp,
})
}

ts := mimirpb.TimeseriesFromPool()
ts.Labels = labels
ts.Samples = samples
ts.Histograms = histograms
ts.Exemplars = exemplars

return mimirpb.PreallocTimeseries{TimeSeries: ts}
}

func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram {
pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans))
for _, span := range h.PositiveSpans {
pSpans = append(
pSpans, mimirpb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
},
)
}
nSpans := make([]mimirpb.BucketSpan, 0, len(h.NegativeSpans))
for _, span := range h.NegativeSpans {
nSpans = append(
nSpans, mimirpb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
},
)
}

return mimirpb.Histogram{
Count: &mimirpb.Histogram_CountInt{CountInt: h.GetCountInt()},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()},
NegativeSpans: nSpans,
NegativeDeltas: h.NegativeDeltas,
NegativeCounts: h.NegativeCounts,
PositiveSpans: pSpans,
PositiveDeltas: h.PositiveDeltas,
PositiveCounts: h.PositiveCounts,
Timestamp: h.Timestamp,
ResetHint: mimirpb.Histogram_ResetHint(h.ResetHint),
}
}

// TimeseriesToOTLPRequest is used in tests.
func TimeseriesToOTLPRequest(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()
12 changes: 6 additions & 6 deletions pkg/distributor/otel_test.go
@@ -78,7 +78,7 @@ func BenchmarkOTLPHandler(b *testing.B) {
validation.NewMockTenantLimits(map[string]*validation.Limits{}),
)
require.NoError(b, err)
handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger(), true)
handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())

b.Run("protobuf", func(b *testing.B) {
req := createOTLPProtoRequest(b, exportReq, false)
@@ -369,7 +369,7 @@ func TestHandlerOTLPPush(t *testing.T) {

logs := &concurrency.SyncBuffer{}
retryConfig := RetryConfig{Enabled: true, MinBackoff: 5 * time.Second, MaxBackoff: 5 * time.Second}
handler := OTLPHandler(tt.maxMsgSize, nil, nil, limits, retryConfig, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo()), true)
handler := OTLPHandler(tt.maxMsgSize, nil, nil, limits, retryConfig, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo()))

resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
@@ -449,7 +449,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) {
assert.False(t, request.SkipLabelValidation)
pushReq.CleanUp()
return nil
}, nil, nil, log.NewNopLogger(), true)
}, nil, nil, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
@@ -495,7 +495,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
assert.Len(t, request.Timeseries, 1)
assert.False(t, request.SkipLabelValidation)
return nil
}, nil, nil, log.NewNopLogger(), true)
}, nil, nil, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)

@@ -521,7 +521,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
assert.Len(t, request.Timeseries, 9) // 6 buckets (including +Inf) + 2 sum/count + 2 from the first case
assert.False(t, request.SkipLabelValidation)
return nil
}, nil, nil, log.NewNopLogger(), true)
}, nil, nil, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
@@ -542,7 +542,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) {

resp := httptest.NewRecorder()

handler := OTLPHandler(140, nil, nil, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger(), true)
handler := OTLPHandler(140, nil, nil, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusRequestEntityTooLarge, resp.Code)
body, err := io.ReadAll(resp.Body)
39 changes: 38 additions & 1 deletion pkg/distributor/push_test.go
@@ -1183,7 +1183,7 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {

return nil
}
h := OTLPHandler(200, util.NewBufferPool(), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger(), true)
h := OTLPHandler(200, util.NewBufferPool(), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
srv.HTTP.Handle("/otlp", h)

// start the server
Expand Down Expand Up @@ -1354,3 +1354,40 @@ func (o otlpLimitsMock) OTelMetricSuffixesEnabled(_ string) bool {
func (o otlpLimitsMock) OTelCreatedTimestampZeroIngestionEnabled(_ string) bool {
return false
}

func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram {
pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans))
for _, span := range h.PositiveSpans {
pSpans = append(
pSpans, mimirpb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
},
)
}
nSpans := make([]mimirpb.BucketSpan, 0, len(h.NegativeSpans))
for _, span := range h.NegativeSpans {
nSpans = append(
nSpans, mimirpb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
},
)
}

return mimirpb.Histogram{
Count: &mimirpb.Histogram_CountInt{CountInt: h.GetCountInt()},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()},
NegativeSpans: nSpans,
NegativeDeltas: h.NegativeDeltas,
NegativeCounts: h.NegativeCounts,
PositiveSpans: pSpans,
PositiveDeltas: h.PositiveDeltas,
PositiveCounts: h.PositiveCounts,
Timestamp: h.Timestamp,
ResetHint: mimirpb.Histogram_ResetHint(h.ResetHint),
}
}
