Skip to content

Commit

Permalink
fix: service name detection on push
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Aug 1, 2024
1 parent afb8513 commit fe447f5
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 29 deletions.
66 changes: 43 additions & 23 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@ var (
Namespace: constants.Loki,
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant. Includes structured metadata bytes.",
}, []string{"tenant", "retention_hours"})
}, []string{"tenant", "retention_hours", "aggregated_metric"})

structuredMetadataBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_structured_metadata_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant for entries' structured metadata",
}, []string{"tenant", "retention_hours"})
}, []string{"tenant", "retention_hours", "aggregated_metric"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant"})
}, []string{"tenant", "aggregated_metric"})

bytesReceivedStats = analytics.NewCounter("distributor_bytes_received")
structuredMetadataBytesReceivedStats = analytics.NewCounter("distributor_structured_metadata_bytes_received")
Expand All @@ -70,6 +70,7 @@ type TenantsRetention interface {

type Limits interface {
OTLPConfig(userID string) OTLPConfig
DiscoverServiceName(userID string) []string
}

type EmptyLimits struct{}
Expand All @@ -78,6 +79,10 @@ func (EmptyLimits) OTLPConfig(string) OTLPConfig {
return DefaultOTLPConfig(GlobalOTLPConfig{})
}

func (EmptyLimits) DiscoverServiceName(string) []string {
return nil
}

type (
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error)
RequestParserWrapper func(inner RequestParser) RequestParser
Expand Down Expand Up @@ -112,33 +117,30 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
structuredMetadataSize int64
)

isAggregatedMetric := fmt.Sprintf("%t", pushStats.IsAggregatedMetric)

for retentionPeriod, size := range pushStats.LogLinesBytes {
retentionHours := RetentionPeriodToString(retentionPeriod)

if !pushStats.IsAggregatedMetric {
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size))
bytesReceivedStats.Inc(size)
}
bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size))
bytesReceivedStats.Inc(size)
entriesSize += size
}

for retentionPeriod, size := range pushStats.StructuredMetadataBytes {
retentionHours := RetentionPeriodToString(retentionPeriod)

if !pushStats.IsAggregatedMetric {
structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size))
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size))
bytesReceivedStats.Inc(size)
structuredMetadataBytesReceivedStats.Inc(size)
}
structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size))
bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size))
bytesReceivedStats.Inc(size)
structuredMetadataBytesReceivedStats.Inc(size)

entriesSize += size
structuredMetadataSize += size
}

// incrementing tenant metrics if we have a tenant.
if pushStats.NumLines != 0 && userID != "" {
linesIngested.WithLabelValues(userID).Add(float64(pushStats.NumLines))
linesIngested.WithLabelValues(userID, isAggregatedMetric).Add(float64(pushStats.NumLines))
}
linesReceivedStats.Inc(pushStats.NumLines)

Expand All @@ -162,7 +164,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
return req, nil
}

func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, _ Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
// Body
var body io.Reader
// bodySize should always reflect the compressed size of the request body
Expand Down Expand Up @@ -231,21 +233,37 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
pushStats.ContentType = contentType
pushStats.ContentEncoding = contentEncoding

for _, s := range req.Streams {
discoverServiceName := limits.DiscoverServiceName(userID)
for i := range req.Streams {
s := req.Streams[i]
pushStats.StreamLabelsSize += int64(len(s.Labels))

var lbs labels.Labels
if tenantsRetention != nil || tracker != nil {
lbs, err = syntax.ParseLabels(s.Labels)
if err != nil {
return nil, nil, fmt.Errorf("couldn't parse labels: %w", err)
}
lbs, err := syntax.ParseLabels(s.Labels)
if err != nil {
return nil, nil, fmt.Errorf("couldn't parse labels: %w", err)
}

if lbs.Has(AggregatedMetricLabel) {
pushStats.IsAggregatedMetric = true
}

if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 && !pushStats.IsAggregatedMetric {
serviceName := ServiceUnknown
for _, labelName := range discoverServiceName {
if labelVal := lbs.Get(labelName); labelVal != "" {
serviceName = labelVal
break
}
}

lb := labels.NewBuilder(lbs)
lbs = lb.Set(LabelServiceName, serviceName).Labels()
s.Labels = lbs.String()

// Remove the added label after it's added to the stream so it's not consumed by subsequent steps
lbs = lb.Del(LabelServiceName).Labels()
}

var retentionPeriod time.Duration
if tenantsRetention != nil {
retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs)
Expand All @@ -268,6 +286,8 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
pushStats.MostRecentEntryTimestamp = e.Timestamp
}
}

req.Streams[i] = s
}

return &req, pushStats, nil
Expand Down
48 changes: 42 additions & 6 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestParseRequest(t *testing.T) {
expectedLines int
expectedBytesUsageTracker map[string]float64
expectedLabels labels.Labels
aggregatedMetric bool
}{
{
path: `/loki/api/v1/push`,
Expand Down Expand Up @@ -228,6 +229,18 @@ func TestParseRequest(t *testing.T) {
expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))},
expectedLabels: labels.FromStrings("foo", "bar2", LabelServiceName, ServiceUnknown),
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "__aggregated_metric__": "stuff", "foo": "bar2", "job": "stuff" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
valid: true,
enableServiceDiscovery: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
expectedBytesUsageTracker: map[string]float64{`{__aggregated_metric__="stuff", foo="bar2", job="stuff"}`: float64(len("fizzbuss"))},
expectedLabels: labels.FromStrings("__aggregated_metric__", "stuff", "foo", "bar2", "job", "stuff"),
aggregatedMetric: true,
},
} {
t.Run(fmt.Sprintf("test %d", index), func(t *testing.T) {
structuredMetadataBytesIngested.Reset()
Expand Down Expand Up @@ -259,9 +272,32 @@ func TestParseRequest(t *testing.T) {
require.Equal(t, test.expectedBytes, bytesReceived)
require.Equalf(t, tracker.Total(), float64(bytesReceived), "tracked usage bytes must equal bytes received metric")
require.Equal(t, test.expectedLines, linesReceived)
require.Equal(t, float64(test.expectedStructuredMetadataBytes), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(test.expectedLines), testutil.ToFloat64(linesIngested.WithLabelValues("fake")))
require.Equal(
t,
float64(test.expectedStructuredMetadataBytes),
testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric))),
)
require.Equal(
t,
float64(test.expectedBytes),
testutil.ToFloat64(
bytesIngested.WithLabelValues(
"fake",
"",
fmt.Sprintf("%t", test.aggregatedMetric),
),
),
)
require.Equal(
t,
float64(test.expectedLines),
testutil.ToFloat64(
linesIngested.WithLabelValues(
"fake",
fmt.Sprintf("%t", test.aggregatedMetric),
),
),
)
require.Equal(t, test.expectedLabels.String(), data.Streams[0].Labels)
require.InDeltaMapValuesf(t, test.expectedBytesUsageTracker, tracker.receivedBytes, 0.0, "%s != %s", test.expectedBytesUsageTracker, tracker.receivedBytes)
} else {
Expand All @@ -270,9 +306,9 @@ func TestParseRequest(t *testing.T) {
require.Equal(t, 0, structuredMetadataBytesReceived)
require.Equal(t, 0, bytesReceived)
require.Equal(t, 0, linesReceived)
require.Equal(t, float64(0), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake")))
require.Equal(t, float64(0), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric))))
require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric))))
require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake", fmt.Sprintf("%t", test.aggregatedMetric))))
}
})
}
Expand Down

0 comments on commit fe447f5

Please sign in to comment.