
Commit

Enforce max series limit in the distributor when usage-tracker is enabled (#10129)

* Enforce max series limit in the distributor when usage-tracker is enabled

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixes

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fix dependencies

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fix bug in UsageTrackerClient

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fix check

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Added cortex_usage_tracker_client_track_series_duration_seconds

Signed-off-by: Marco Pracucci <marco@pracucci.com>

---------

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci authored Dec 4, 2024
1 parent ae8ba7d commit 4bc4bd3
Showing 7 changed files with 390 additions and 22 deletions.
4 changes: 4 additions & 0 deletions development/mimir-ingest-storage/config/runtime.yaml
@@ -1 +1,5 @@
# This file can be used to set overrides or other runtime config.

# overrides:
#   anonymous:
#     max_global_series_per_user: 10000
111 changes: 99 additions & 12 deletions pkg/distributor/distributor.go
@@ -96,6 +96,11 @@ const (
	writeRequestSlabPoolSize = 512 * 1024
)

type usageTrackerGenericClient interface {
	services.Service
	TrackSeries(ctx context.Context, userID string, series []uint64) ([]uint64, error)
}
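For illustration only (not part of this commit), a minimal in-memory implementation of the usageTrackerGenericClient interface above might look like the sketch below, e.g. for unit tests. The type name, the fixed per-tenant limit, and the map-based tracking are hypothetical; the real client is the one from pkg/usagetrackerclient wired into the distributor further down.

package usagetracker_sketch

import (
	"context"
	"sync"

	"github.com/grafana/dskit/services"
)

// inMemoryUsageTracker is a hypothetical, test-only stand-in for the real usage-tracker client.
type inMemoryUsageTracker struct {
	services.Service

	mtx    sync.Mutex
	series map[string]map[uint64]struct{} // Per-tenant set of tracked series hashes.
	limit  int                            // Hypothetical per-tenant max series limit.
}

func newInMemoryUsageTracker(limit int) *inMemoryUsageTracker {
	t := &inMemoryUsageTracker{
		series: map[string]map[uint64]struct{}{},
		limit:  limit,
	}

	// No background work to run: an idle service satisfies the embedded services.Service.
	t.Service = services.NewIdleService(nil, nil)
	return t
}

// TrackSeries records the given series hashes for the tenant and returns the hashes that
// were not tracked because the tenant is over the limit.
func (t *inMemoryUsageTracker) TrackSeries(_ context.Context, userID string, series []uint64) ([]uint64, error) {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	tracked := t.series[userID]
	if tracked == nil {
		tracked = map[uint64]struct{}{}
		t.series[userID] = tracked
	}

	var rejected []uint64
	for _, hash := range series {
		if _, ok := tracked[hash]; ok {
			continue // Already tracked series are never rejected.
		}
		if len(tracked) >= t.limit {
			rejected = append(rejected, hash)
			continue
		}
		tracked[hash] = struct{}{}
	}

	return rejected, nil
}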

// Distributor forwards appends and queries to individual ingesters.
type Distributor struct {
	services.Service
@@ -150,11 +155,12 @@ type Distributor struct {
	hashCollisionCount prometheus.Counter

	// Metrics for data rejected for hitting per-tenant limits
	discardedSamplesTooManyHaClusters *prometheus.CounterVec
	discardedSamplesRateLimited       *prometheus.CounterVec
	discardedRequestsRateLimited      *prometheus.CounterVec
	discardedExemplarsRateLimited     *prometheus.CounterVec
	discardedMetadataRateLimited      *prometheus.CounterVec
	discardedSamplesTooManyHaClusters  *prometheus.CounterVec
	discardedSamplesPerUserSeriesLimit *prometheus.CounterVec
	discardedSamplesRateLimited        *prometheus.CounterVec
	discardedRequestsRateLimited       *prometheus.CounterVec
	discardedExemplarsRateLimited      *prometheus.CounterVec
	discardedMetadataRateLimited       *prometheus.CounterVec

	// Metrics for data rejected for hitting per-instance limits
	rejectedRequests *prometheus.CounterVec
@@ -187,7 +193,7 @@ type Distributor struct {
	// usageTrackerClient is the client that should be used to track per-tenant series and
	// enforce max series limit in the distributor. This field is nil if usage-tracker
	// is disabled.
	usageTrackerClient *usagetrackerclient.UsageTrackerClient
	usageTrackerClient usageTrackerGenericClient
}

// OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion.
@@ -427,11 +433,12 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
			Help: "Total number of label values with newlines seen at ingestion time.",
		}, []string{"user"}),

		discardedSamplesTooManyHaClusters: validation.DiscardedSamplesCounter(reg, reasonTooManyHAClusters),
		discardedSamplesRateLimited:       validation.DiscardedSamplesCounter(reg, reasonRateLimited),
		discardedRequestsRateLimited:      validation.DiscardedRequestsCounter(reg, reasonRateLimited),
		discardedExemplarsRateLimited:     validation.DiscardedExemplarsCounter(reg, reasonRateLimited),
		discardedMetadataRateLimited:      validation.DiscardedMetadataCounter(reg, reasonRateLimited),
		discardedSamplesTooManyHaClusters:  validation.DiscardedSamplesCounter(reg, reasonTooManyHAClusters),
		discardedSamplesPerUserSeriesLimit: validation.DiscardedSamplesCounter(reg, reasonPerUserSeriesLimit),
		discardedSamplesRateLimited:        validation.DiscardedSamplesCounter(reg, reasonRateLimited),
		discardedRequestsRateLimited:       validation.DiscardedRequestsCounter(reg, reasonRateLimited),
		discardedExemplarsRateLimited:      validation.DiscardedExemplarsCounter(reg, reasonRateLimited),
		discardedMetadataRateLimited:       validation.DiscardedMetadataCounter(reg, reasonRateLimited),

		rejectedRequests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_distributor_instance_rejected_requests_total",
@@ -704,6 +711,7 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
	filter := prometheus.Labels{"user": userID}
	d.dedupedSamples.DeletePartialMatch(filter)
	d.discardedSamplesTooManyHaClusters.DeletePartialMatch(filter)
	d.discardedSamplesPerUserSeriesLimit.DeletePartialMatch(filter)
	d.discardedSamplesRateLimited.DeletePartialMatch(filter)
	d.discardedRequestsRateLimited.DeleteLabelValues(userID)
	d.discardedExemplarsRateLimited.DeleteLabelValues(userID)
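As context for the DeletePartialMatch call added above, the following standalone sketch (not from this commit) shows how prometheus/client_golang's DeletePartialMatch removes every labelset of a metric vector matching a subset of labels, which is what cleanupInactiveUser relies on to drop all of a tenant's series at once. The metric name, labels, and values are made up for the example.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// A counter vector with the same "user"-keyed shape as the discarded samples metrics.
	discarded := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_discarded_samples_total",
		Help: "Example counter; the name and labels are illustrative only.",
	}, []string{"user", "group"})

	discarded.WithLabelValues("tenant-1", "group-a").Add(10)
	discarded.WithLabelValues("tenant-1", "group-b").Add(5)
	discarded.WithLabelValues("tenant-2", "group-a").Add(7)

	// Deletes every series whose "user" label is "tenant-1", regardless of the "group" label.
	deleted := discarded.DeletePartialMatch(prometheus.Labels{"user": "tenant-1"})
	fmt.Println("deleted series:", deleted) // Prints: deleted series: 2
}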
@@ -843,7 +851,8 @@ func (d *Distributor) wrapPushWithMiddlewares(next PushFunc) PushFunc {
	middlewares = append(middlewares, d.prePushRelabelMiddleware)
	middlewares = append(middlewares, d.prePushSortAndFilterMiddleware)
	middlewares = append(middlewares, d.prePushValidationMiddleware)
	middlewares = append(middlewares, d.cfg.PushWrappers...)
	middlewares = append(middlewares, d.cfg.PushWrappers...) // TODO GEM has a BI middleware. It should probably be applied after prePushMaxSeriesLimitMiddleware
	middlewares = append(middlewares, d.prePushMaxSeriesLimitMiddleware) // Should be the very last, to enforce the max series limit on top of all filtering, relabelling and other changes (e.g. GEM aggregations) previous middlewares could do

	for ix := len(middlewares) - 1; ix >= 0; ix-- {
		next = middlewares[ix](next)
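Because the wrapping loop above iterates from the last middleware to the first, the first-appended middleware ends up outermost (it runs first on every push) while prePushMaxSeriesLimitMiddleware, appended last, ends up innermost and runs right before the actual push. A toy sketch of that composition order, with hypothetical names, for anyone unfamiliar with the pattern:

package main

import "fmt"

type pushFunc func() error

type middleware func(pushFunc) pushFunc

func logging(name string) middleware {
	return func(next pushFunc) pushFunc {
		return func() error {
			fmt.Println("enter", name)
			defer fmt.Println("exit", name)
			return next()
		}
	}
}

func main() {
	push := pushFunc(func() error {
		fmt.Println("final push")
		return nil
	})

	middlewares := []middleware{logging("validation"), logging("maxSeriesLimit")}

	// Same backwards wrapping as the loop above: middlewares[0] ends up outermost and runs
	// first, while the last appended middleware ends up innermost, next to the final push.
	for ix := len(middlewares) - 1; ix >= 0; ix-- {
		push = middlewares[ix](push)
	}

	// Prints: enter validation, enter maxSeriesLimit, final push, exit maxSeriesLimit, exit validation.
	_ = push()
}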
@@ -1176,6 +1185,84 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
	}
}

// prePushMaxSeriesLimitMiddleware enforces the per-tenant max series limit when the usage-tracker service is enabled.
func (d *Distributor) prePushMaxSeriesLimitMiddleware(next PushFunc) PushFunc {
	return func(ctx context.Context, pushReq *Request) error {
		// If the usage-tracker client hasn't been created it means usage-tracker is disabled
		// for this instance.
		if d.usageTrackerClient == nil {
			return next(ctx, pushReq)
		}

		next, maybeCleanup := NextOrCleanup(next, pushReq)
		defer maybeCleanup()

		req, err := pushReq.WriteRequest()
		if err != nil {
			return err
		}

		userID, err := tenant.TenantID(ctx)
		if err != nil {
			return err
		}

		// Generate the stable hash of each series.
		var (
			seriesHashes    = make([]uint64, len(req.Timeseries))
			builder         = labels.NewScratchBuilder(100)
			nonCopiedLabels labels.Labels
		)

		for idx, series := range req.Timeseries {
			mimirpb.FromLabelAdaptersOverwriteLabels(&builder, series.Labels, &nonCopiedLabels)
			seriesHashes[idx] = labels.StableHash(nonCopiedLabels)
		}

		// Track the series and check whether any should be rejected because the tenant is over the limit.
		rejectedHashes, err := d.usageTrackerClient.TrackSeries(ctx, userID, seriesHashes)
		if err != nil {
			return errors.Wrap(err, "failed to enforce max series limit")
		}

		if len(rejectedHashes) > 0 {
			// Build a map of rejected hashes so that it's easier to look them up.
			rejectedHashesMap := make(map[uint64]struct{}, len(rejectedHashes))
			for _, hash := range rejectedHashes {
				rejectedHashesMap[hash] = struct{}{}
			}

			// Filter out rejected series.
			discardedSamples := 0
			o := 0
			for i := 0; i < len(req.Timeseries); i++ {
				seriesHash := seriesHashes[i]

				if _, rejected := rejectedHashesMap[seriesHash]; !rejected {
					// Keep this series.
					req.Timeseries[o] = req.Timeseries[i]
					o++
					continue
				}

				// Keep track of the discarded samples and histograms.
				discardedSamples += len(req.Timeseries[i].Samples) + len(req.Timeseries[i].Histograms)

				// This series has been rejected and filtered out from the WriteRequest. We can reuse its memory.
				mimirpb.ReusePreallocTimeseries(&req.Timeseries[i])
			}

			req.Timeseries = req.Timeseries[:o]

			// TODO this metric should be tracked by "group"
			d.discardedSamplesPerUserSeriesLimit.WithLabelValues(userID, "").Add(float64(discardedSamples))
		}

		// TODO inject the soft error in the response
		return next(ctx, pushReq)
	}
}
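The filtering loop above compacts req.Timeseries in place instead of allocating a new slice. A stripped-down sketch of the same idiom with simplified types (not the commit's code):

package main

import "fmt"

// filterRejected keeps items whose hash is not in the rejected set, compacting the slice
// in place, mirroring the write index ("o") pattern used by the middleware above.
func filterRejected(items []string, hashes []uint64, rejected map[uint64]struct{}) []string {
	o := 0
	for i := 0; i < len(items); i++ {
		if _, isRejected := rejected[hashes[i]]; isRejected {
			// Dropped: in the real middleware this is where the series memory is released
			// for reuse and the discarded samples counter is increased.
			continue
		}
		items[o] = items[i]
		o++
	}
	return items[:o]
}

func main() {
	items := []string{"series-a", "series-b", "series-c"}
	hashes := []uint64{1, 2, 3}
	rejected := map[uint64]struct{}{2: {}}

	fmt.Println(filterRejected(items, hashes, rejected)) // Prints: [series-a series-c]
}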

// metricsMiddleware updates metrics which are expected to account for all received data,
// including data that later gets modified or dropped.
func (d *Distributor) metricsMiddleware(next PushFunc) PushFunc {