Skip to content

Commit

Permalink
feat: track discarded data by usageTracker (#14081)
Browse files Browse the repository at this point in the history
Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
  • Loading branch information
vlad-diachenko authored Sep 10, 2024
1 parent 20ac1ba commit c65721e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 24 deletions.
57 changes: 34 additions & 23 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
now := time.Now()

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
validation.DiscardedSamples.WithLabelValues(validation.BlockedIngestion, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(validation.BlockedIngestion, tenantID).Add(float64(validatedLineSize))
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion)

err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)
Expand All @@ -472,30 +471,11 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
// Return a 429 to indicate to the client they are being rate limited
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineSize))

if d.usageTracker != nil {
for _, stream := range req.Streams {
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream)
if err != nil {
continue
}

discardedStreamBytes := 0
for _, e := range stream.Entries {
discardedStreamBytes += len(e.Line)
}

if d.usageTracker != nil {
d.usageTracker.DiscardedBytesAdd(ctx, tenantID, validation.RateLimited, lbs, float64(discardedStreamBytes))
}
}
}
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.RateLimited)

err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
d.writeFailuresManager.Log(tenantID, err)
// Return a 429 to indicate to the client they are being rate limited
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
}

Expand Down Expand Up @@ -569,6 +549,37 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

func (d *Distributor) trackDiscardedData(
ctx context.Context,
req *logproto.PushRequest,
validationContext validationContext,
tenantID string,
validatedLineCount int,
validatedLineSize int,
reason string,
) {
validation.DiscardedSamples.WithLabelValues(reason, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(reason, tenantID).Add(float64(validatedLineSize))

if d.usageTracker != nil {
for _, stream := range req.Streams {
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream)
if err != nil {
continue
}

discardedStreamBytes := 0
for _, e := range stream.Entries {
discardedStreamBytes += len(e.Line)
}

if d.usageTracker != nil {
d.usageTracker.DiscardedBytesAdd(ctx, tenantID, reason, lbs, float64(discardedStreamBytes))
}
}
}
}

func hasAnyLevelLabels(l labels.Labels) (string, bool) {
for lbl := range allowedLabelsForLevel {
if l.Has(lbl) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestDistributor(t *testing.T) {
if len(tc.expectedErrors) > 0 {
for _, expectedError := range tc.expectedErrors {
if len(tc.expectedErrors) == 1 {
assert.Equal(t, err, expectedError)
assert.Equal(t, expectedError, err)
} else {
assert.Contains(t, err.Error(), expectedError.Error())
}
Expand Down

0 comments on commit c65721e

Please sign in to comment.